2016-07-26 146 views
1

我試圖使用KStreamBuilder將數據從一個主題移動到另一個主題。我嘗試下面的代碼,與異常KStreamBuilder無法將數據從一個主題傳輸到另一個主題

import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.common.serialization.Serdes; 
import org.apache.kafka.streams.kstream.KStreamBuilder; 
import org.apache.kafka.streams.KafkaStreams; 
import org.apache.kafka.streams.StreamsConfig; 

import java.util.Properties; 

public class StreamsInTopic { 

public static void main(String[] args) throws Exception { 
    Properties props = new Properties(); 
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); 
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094"); 
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 

    KStreamBuilder builder = new KStreamBuilder(); 
    System.out.println("KStreamBuilder initialized!!"); 

    builder.stream("nil_PF1_P1").to("nil_RF1_P1_1"); 
    System.out.println("Streaming prepared!!"); 

    KafkaStreams streams = new KafkaStreams(builder, props); 
    System.out.println("KafkaStreams Initialised!!"); 

    streams.start(); 
    System.out.println("Streams started!!"); 

    Thread.sleep(30000L); 
    streams.close(); 
    System.out.println("Streams closed!!"); 
} 
} 

輸出:

KStreamBuilder initialized!! 
Streaming prepared!! 
log4j:WARN No appenders could be found for logger (org.apache.kafka.streams.StreamsConfig). 
log4j:WARN Please initialize the log4j system properly. 
KafkaStreams Initialised!! 
Streams started!! 
Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1 
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60) 
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72) 
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338) 
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) 
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64) 
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174) 
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320) 
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) 
Streams closed!! 

然後我試圖消耗數據。

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nil_RF1_P1_1 --from-beginning 

任何想法?我需要任何輔助配置嗎? 我正在使用kafka 0.10.0.0羣集和客戶端。

使用依賴關係。

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-streams</artifactId> 
    <version>0.10.0.0</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.10.0.0</version> 
</dependency> 
+1

java.lang.IllegalArgumentException:Invalid timestamp -1'錯誤的原因很可能是您一直在使用非0.10生產者將數據寫入輸入主題。 –

回答

2

看着你在你的問題中分享的內容,這個問題似乎是,你是不是寫(=生產)的任何數據到輸入主題「nil_PF1_P1」:

  • 卡夫卡流應用程序配置爲將來自Kafka輸入主題「nil_PF1_P1」的數據寫入Kafka輸出主題「nil_RF1_P1_1」。
  • 控制檯消費讀取(應用程序的輸出話題)「」 nil_RF1_P1_1" 的任何數據。
  • 但你不提是否你是如何將數據輸入輸入主題‘nil_PF1_P1’。

另外:你會立即關閉卡夫卡流實例在你的代碼:

streams.start(); 
System.out.println("Streams started!!"); 

//Thread.sleep(1000L); 
streams.close(); 

這會不會給應用程序足夠的時間來實際執行任何處理通常情況下,你只在你main方法ABO調用streams.start()。並且在您的Java應用程序中註冊一個關閉鉤子,在觸發時會調用streams.close()

爲了測試/開發的目的,當然你也可以在main()以內撥打電話streams.close(),但是我會增加開始和結束之間的睡眠時間(例如嘗試30秒而不是1秒) - 但當然您還需要確保在該時間窗口期間實際上是將一些數據寫入應用程序的輸入主題。

編輯:java.lang.IllegalArgumentException: Invalid timestamp -1錯誤的原因很可能是您用非0.10生產者將數據寫入輸入主題。細節在http://docs.confluent.io/current/streams/faq.html#invalid-timestamp-exception解釋。

+0

感謝Miguno。我增加了睡眠時間。它給我的例外,我已轉貼。請看看,如果你可以幫助! – Nilotpal

1

Kafka Stream首次發佈版本爲0.10,因此要求寫入主題的所有記錄都有相關的時間戳記(v0.10中引入的關鍵字和值旁邊的附加字段)。對於Streams,此時間戳不能爲負數(即使代理沒有檢查並允許插入具有負時間戳的數據)。

因此,可能會發生這樣的情況:使用較早的Java生產者(即,生產前0.10)編寫的主題會寫入缺少時間戳記字段的記錄。您也可以使用「舊」主題,即寫入0.9代理的主題,然後將代理升級到0.10 - 所有這些消息都不會設置時間戳。出於兼容性原因,KafkaConsumer(v0.10)將缺少的時間戳設置爲值-1

在卡夫卡流,在內部,從輸入消息時間戳「轉發」接收通知的輸出,因此,如果使用消息沒有時間戳,卡夫卡流嘗試寫入時間戳-1消息轉換成輸出的主題,從而導致上述錯誤。 (Kafka Streams使用0.10 Java生產者檢查時間戳是否有效,並針對負的時間戳值引發上述異常)。

爲避免此問題,您需要通過流配置參數timestamp.extractor(請參閱http://docs.confluent.io/3.0.0/streams/developer-guide.html#optional-configuration-parameters)更改使用的時間戳提取器。根據您的語義,您可以使用WallclockTimestampExtractor或提供定製的提取器。

相關問題