2016-07-06 87 views
0

數據寫到一個簡單的例子弗林克讀取卡夫卡

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val properties = new Properties() 
properties.setProperty("bootstrap.servers","xxxxxx") 
properties.setProperty("zookeeper.connect","xxxxxx") 
properties.setProperty("group.id", "caffrey") 
val stream = env 
    .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties)) 
.print() 

env.execute("Flink Kafka Example") 

當運行此代碼我收到一個錯誤這樣的:

[錯誤]類 org.apache.flink .streaming.api.checkpoint.CheckpointNotifier未找到 - 繼續使用存根。

我谷歌這個錯誤,找到CheckpointNotifier是一個interface。 我真的不明白我在哪裏做錯了。

回答

2

由於CheckpointNotifier是舊版Flink版本的一個類,我懷疑你在你的pom文件中混合了不同的Flink依賴關係。

確保所有Flink依賴項具有相同的版本。

+0

我更改降級flink版本,它的作品謝謝! – user2341602