2017-03-01 71 views
0

我使用的是舊版本的Flink。我升級到1.2.0,我有一些過濾器問題。如何在Scala中應用Flink的簡單過濾器

我有記錄的數據流,其工作得很好:

val logs: DataStream[Log] = env.addSource(new LogSource(
     data, delay, factor)) 

// DISPLAY TUPLE IN CONSOLE 
logs.print() 

// EXECUTE SCRIPT 
env.execute("stream") 

我當然讀這說明文檔:

dataStream.filter { _ != 0 } 

我試着像這樣的一堆東西:

val cleanLogs = logs.filter { _.isComplete } 

但我得到了以下錯誤:

類型不匹配,預計:的filterFunction [日誌],實際:(任意)=>的

所以我沒有看到文件,但此錯誤的鏈接。 有什麼幫助嗎?例子 ?

感謝

+0

什麼是'isComplete'方法的簽名? –

+0

這不是一種方法,Log的第一個屬性是一個布爾值:isComplete。它與Flink 0.10一起工作得很好,但它可能不再可能了? – ImbaBalboa

+1

我無法真正重現您的問題。我唯一想到的是一些錯誤的進口產品。確保你正在導入'DataStream'和'StreamExecutionEnvironment'的scala版本。在scala中最好總是導入'org.apache.flink.streaming.api.scala._' –

回答

0

的問題是第一的StreamExecutionEnvironment這導致了這個問題,像filter基本功能,一個錯誤的進口。

然後,當我使用老版本的Flink時,我使用的是Flink 1.X中不再提供的類別LocalExecutionEnvironment

相反:StreamExecutionEnvironment.createLocalEnvironment(1)

相關問題