0
我使用的是舊版本的Flink。我升級到1.2.0,我有一些過濾器問題。如何在Scala中應用Flink的簡單過濾器
我有記錄的數據流,其工作得很好:
val logs: DataStream[Log] = env.addSource(new LogSource(
data, delay, factor))
// DISPLAY TUPLE IN CONSOLE
logs.print()
// EXECUTE SCRIPT
env.execute("stream")
我當然讀這說明文檔:
dataStream.filter { _ != 0 }
我試着像這樣的一堆東西:
val cleanLogs = logs.filter { _.isComplete }
但我得到了以下錯誤:
類型不匹配,預計:的filterFunction [日誌],實際:(任意)=>的
所以我沒有看到文件,但此錯誤的鏈接。 有什麼幫助嗎?例子 ?
感謝
什麼是'isComplete'方法的簽名? –
這不是一種方法,Log的第一個屬性是一個布爾值:isComplete。它與Flink 0.10一起工作得很好,但它可能不再可能了? – ImbaBalboa
我無法真正重現您的問題。我唯一想到的是一些錯誤的進口產品。確保你正在導入'DataStream'和'StreamExecutionEnvironment'的scala版本。在scala中最好總是導入'org.apache.flink.streaming.api.scala._' –