2017-08-09

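kafka-flink: Kafka producer error depending on Flink jobs

While doing a proof of concept with Kafka and Flink, I ran into the following: it seems that Kafka producer errors can occur because of the workload on the Flink side.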

Here are the details:

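I have sample files (named like sample01.EDR) of roughly 700,000 rows, each row holding "entity", "value", and "timestamp".

I created the Kafka topic with the following command: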

~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic gprs 

I load a sample file into the topic with the following command:

[13:00] [email protected]: ~/fms 
% /home/kafka/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic gprs < ~/sample/sample01.EDR 

On the Flink side I have jobs that aggregate the value per entity over sliding windows of 6 hours and 72 hours (aggregationeachsix, aggregationeachsentytwo).
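
As a rough way to compare the cost of the two jobs described above: a Flink sliding window assigner places each record into size/slide overlapping windows, so a longer window with the same slide means more window updates per record. The slide interval is not given in the question, so the 1-hour slide below is purely an assumption, and the sketch ignores partially filled windows at the edges of the data.

```shell
#!/bin/sh
# windows_per_record SIZE_HOURS SLIDE_HOURS
# Number of overlapping sliding windows each record is assigned to.
windows_per_record() {
    echo $(( $1 / $2 ))
}

# window_updates ROWS SIZE_HOURS SLIDE_HOURS
# Total per-window aggregate updates triggered by ROWS input records.
window_updates() {
    per_record=$(windows_per_record "$2" "$3")
    echo $(( $1 * per_record ))
}

ROWS=700000      # approximate rows per file, from the question
SLIDE_HOURS=1    # ASSUMPTION: the real slide interval is not stated

echo "6h window:  $(window_updates "$ROWS" 6 "$SLIDE_HOURS") updates per file"
echo "72h window: $(window_updates "$ROWS" 72 "$SLIDE_HOURS") updates per file"
```

Under these assumptions the 72-hour job does 12 times as many window updates per file as the 6-hour job (50400000 versus 4200000), which would be consistent with only the third scenario overloading the machine.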

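I ran three scenarios:

  1. Load the files into the topic with no job running
  2. Load the files into the topic with the aggregationeachsix job running
  3. Load the files into the topic with the aggregationeachsentytwo job running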

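The first two scenarios work, but in the third scenario I get the following errors on the Kafka producer side while loading the files (not always on the same file; it can happen on the first, second, third, or even a later file):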

[plenty of lines before this part] 
    [2017-08-09 12:56:53,409] ERROR Error when sending message to topic gprs with key: null, value: 35 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 
    org.apache.kafka.common.errors.TimeoutException: Expiring 233 record(s) for gprs-0: 1560 ms has passed since last append 
    [2017-08-09 12:56:53,409] ERROR Error when sending message to topic gprs with key: null, value: 37 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 
    org.apache.kafka.common.errors.TimeoutException: Expiring 233 record(s) for gprs-0: 1560 ms has passed since last append 
    [2017-08-09 12:56:53,409] ERROR Error when sending message to topic gprs with key: null, value: 37 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 
    org.apache.kafka.common.errors.TimeoutException: Expiring 233 record(s) for gprs-0: 1560 ms has passed since last append 
    [2017-08-09 12:56:53,412] WARN Got error produce response with correlation id 1627 on topic-partition gprs-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender) 
    [2017-08-09 12:56:53,412] WARN Got error produce response with correlation id 1626 on topic-partition gprs-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender) 
    [2017-08-09 12:56:53,412] WARN Got error produce response with correlation id 1625 on topic-partition gprs-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender) 
    [2017-08-09 12:56:53,412] WARN Got error produce response with correlation id 1624 on topic-partition gprs-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender) 
    [2017-08-09 12:56:53,515] ERROR Error when sending message to topic gprs with key: null, value: 35 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 
    org.apache.kafka.common.errors.TimeoutException: Expiring 8 record(s) for gprs-0: 27850 ms has passed since batch creation plus linger time 
    [2017-08-09 12:56:53,515] ERROR Error when sending message to topic gprs with key: null, value: 37 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 
    org.apache.kafka.common.errors.TimeoutException: Expiring 8 record(s) for gprs-0: 27850 ms has passed since batch creation plus linger time 
    [plenty of lines after this part] 

My question is: why can Flink have an impact on the Kafka producer, and what do I need to change to avoid this error?

Answers

It looks like you are saturating the network when both Flink and the kafka-producer are using it, which is why you get TimeoutExceptions.

Everything runs on the same server (Kafka and Flink)...

Then you are probably hitting the limits of running Kafka on a single machine.
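
If the machine is simply too busy to acknowledge batches in time, one thing that may be worth trying is giving the console producer a longer request timeout. The sketch below wraps the load command from the question in a function and adds the `--request-timeout-ms` option. The paths, broker address, and topic come from the question; the 30000 ms value is an illustrative assumption, not a recommendation, and the option should be checked against the option list printed by your Kafka version's console producer. This does not remove the underlying resource contention; it only gives the broker more time to respond.

```shell
#!/bin/sh
# ASSUMPTION: defaults below mirror the question; override via environment.
KAFKA_BIN="${KAFKA_BIN:-/home/kafka/kafka/bin}"
BROKER="${BROKER:-localhost:9092}"
TOPIC="${TOPIC:-gprs}"
# ASSUMPTION: 30000 ms is a hypothetical value chosen for illustration.
REQUEST_TIMEOUT_MS="${REQUEST_TIMEOUT_MS:-30000}"

# load_edr FILE
# Feed one sample file to the topic with a longer request timeout.
load_edr() {
    "$KAFKA_BIN/kafka-console-producer.sh" \
        --broker-list "$BROKER" \
        --topic "$TOPIC" \
        --request-timeout-ms "$REQUEST_TIMEOUT_MS" < "$1"
}

# Example (not run here): load_edr ~/sample/sample01.EDR
```

If the errors persist even with a generous timeout, that would point toward the single machine being out of CPU, disk, or memory headroom while the 72-hour job is running.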