
Kafka/Confluent: changing the default max.request.size ProducerConfig with a Docker image

I'm using Kafka/Confluent (3.2.0) to capture the changes happening on one of our MongoDB instances.

The source process is handled by a Debezium source connector, which uses the Kafka Connect source API and is deployed on our system with Mesos (DC/OS), using an image that extends the Confluent Connect Docker image. Kafka itself is deployed on the same DC/OS using the framework version.

Since some of our messages are bigger than the default size, I changed these Kafka installation parameters:

• replica.fetch.max.bytes
• message.max.bytes

Both are set to 4 MB (a sketch of the equivalent broker properties follows).
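For reference, this is roughly what the change looks like expressed as plain broker properties (a minimal sketch; on DC/OS the values actually go through the Kafka framework's configuration rather than a hand-edited file):

# server.properties (broker side)
# accept messages up to 4 MB
message.max.bytes=4194304
# replicas must be able to fetch what the broker accepts
replica.fetch.max.bytes=4194304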

Then I launch the Connect Docker image with this command:

docker run -d --rm --net=bridge --name=kafka-connect-mongodb \
    -e CONNECT_BOOTSTRAP_SERVERS=${KAFKA_BOOTSTRAP_SERVERS} \
    -e CONNECT_REST_PORT=${CONNECT_REST_PORT} \
    -e CONNECT_GROUP_ID="mongo-to-kafka-source-connector" \
    -e CONNECT_CONFIG_STORAGE_TOPIC="${CONFIG.TOPIC}" \
    -e CONNECT_OFFSET_STORAGE_TOPIC="${OFFSETS.TOPIC}" \
    -e CONNECT_STATUS_STORAGE_TOPIC="${STATUS.TOPIC}" \
    -e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
    -e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
    -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="${SCHEMA_REGISTRY_LISTENERS}" \
    -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="${SCHEMA_REGISTRY_LISTENERS}" \
    -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_REST_ADVERTISED_HOST_NAME="${CONNECTOR_HOME}" \
    -e CONNECT_LOG4J_ROOT_LOGLEVEL=INFO \
    -e CONNECT_MAX_REQUEST_SIZE=4194304 \
    -e KAFKA_MAX_REQUEST_SIZE=4194304 \
    mongodb-source-connector:1.1

I try to change the default max.request.size producer value by passing both KAFKA_MAX_REQUEST_SIZE and CONNECT_MAX_REQUEST_SIZE when starting the Connect Docker image, and the startup log reports the change to 4 MB correctly.

The problem shows up when I start pulling data from MongoDB. To do that I issue this POST:

curl -X POST \
    -H "Content-Type: application/json" \
    http://hostname:8083/connectors \
    -d '{
        "name": "source_connector",
        "config": {
            "tasks.max": "1",
            "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
            "mongodb.hosts": "mongodbhost:27017",
            "mongodb.name": "replica",
            "collection.whitelist": "db[.]table",
            "max.request.size": "4194304"
        }
    }'
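Independently of the connector start, the configuration Connect actually stored can be read back through the standard Connect REST API, which is a quick way to confirm the POST body arrived as intended:

curl http://hostname:8083/connectors/source_connector/config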

But then the log says:

[2017-10-09 12:22:56,036] INFO ProducerConfig values: 
    acks = all 
    batch.size = 16384 
    block.on.buffer.full = false 
    bootstrap.servers = [PLAINTEXT://172.17.0.3:9093] 
    buffer.memory = 33554432 
    client.id = 
    compression.type = none 
    connections.max.idle.ms = 540000 
    interceptor.classes = null 
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer 
    linger.ms = 0 
    max.block.ms = 9223372036854775807 
    max.in.flight.requests.per.connection = 1 
    max.request.size = 1048576 
    metadata.fetch.timeout.ms = 60000 
    metadata.max.age.ms = 300000 
    metric.reporters = [] 
    metrics.num.samples = 2 
    metrics.sample.window.ms = 30000 
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner 
    receive.buffer.bytes = 32768 
    reconnect.backoff.ms = 50 
    request.timeout.ms = 2147483647 
    retries = 2147483647 
    retry.backoff.ms = 100 
    sasl.jaas.config = null 
    sasl.kerberos.kinit.cmd = /usr/bin/kinit 
    sasl.kerberos.min.time.before.relogin = 60000 
    sasl.kerberos.service.name = null 
    sasl.kerberos.ticket.renew.jitter = 0.05 
    sasl.kerberos.ticket.renew.window.factor = 0.8 
    sasl.mechanism = GSSAPI 
    security.protocol = PLAINTEXT 
    send.buffer.bytes = 131072 
    ssl.cipher.suites = null 
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
    ssl.endpoint.identification.algorithm = null 
    ssl.key.password = null 
    ssl.keymanager.algorithm = SunX509 
    ssl.keystore.location = null 
    ssl.keystore.password = null 
    ssl.keystore.type = JKS 
    ssl.protocol = TLS 
    ssl.provider = null 
    ssl.secure.random.implementation = null 
    ssl.trustmanager.algorithm = PKIX 
    ssl.truststore.location = null 
    ssl.truststore.password = null 
    ssl.truststore.type = JKS 
    timeout.ms = 30000 
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer 

So when I start the source process, the default value for max.request.size is used.

Here is the full log.

I don't know what I am missing.

Answer


The IRC chat helped me out: I had to specify both KAFKA_PRODUCER_MAX_REQUEST_SIZE and CONNECT_PRODUCER_MAX_REQUEST_SIZE when starting the Docker image.
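As far as I can tell, the producer override only takes effect once it carries the producer. prefix after the Confluent image translates the environment variable into the worker config (CONNECT_PRODUCER_MAX_REQUEST_SIZE becomes producer.max.request.size), whereas CONNECT_MAX_REQUEST_SIZE only produces a worker-level max.request.size that the producer never sees. The delta against the docker run above is just the last two environment variables (a sketch, everything else unchanged; I kept both variables because that is the combination that worked for me):

    # replace
    -e CONNECT_MAX_REQUEST_SIZE=4194304 \
    -e KAFKA_MAX_REQUEST_SIZE=4194304 \
    # with
    -e CONNECT_PRODUCER_MAX_REQUEST_SIZE=4194304 \
    -e KAFKA_PRODUCER_MAX_REQUEST_SIZE=4194304 \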