2016-12-29

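I am trying to rewrite the Wikipedia edit stream analysis code from the Apache Flink tutorial at https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/run_example_quickstart.html in Scala.

The tutorial's Java code for the Wikipedia edit analysis is: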

import org.apache.flink.api.common.functions.FoldFunction; 
import org.apache.flink.api.java.functions.KeySelector; 
import org.apache.flink.api.java.tuple.Tuple2; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.datastream.KeyedStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.windowing.time.Time; 
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent; 
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource; 

public class WikipediaAnalysis {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

        KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
            .keyBy(new KeySelector<WikipediaEditEvent, String>() {
                @Override
                public String getKey(WikipediaEditEvent event) {
                    return event.getUser();
                }
            });

        DataStream<Tuple2<String, Long>> result = keyedEdits
            .timeWindow(Time.seconds(5))
            .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
                    acc.f0 = event.getUser();
                    acc.f1 += event.getByteDiff();
                    return acc;
                }
            });

        result.print();

        see.execute();
    }
}
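For reference, here is a plain-Scala sketch (no Flink involved) of what the tutorial job computes: edits grouped by user and by 5-second window, each group folded into (user, total byte diff). It assumes a hypothetical Edit case class carrying a millisecond timestamp and aligns windows to multiples of 5 seconds; in the real job Flink assigns windows itself according to the stream's time characteristic, not from a field on the event.

```scala
// Plain-Scala approximation of keyBy(user) + timeWindow(5 s) + fold.
object WindowedFoldSketch {
  // Hypothetical stand-in for WikipediaEditEvent with only the fields used here.
  final case class Edit(user: String, timestampMillis: Long, byteDiff: Int)

  val windowSizeMillis: Long = 5000L

  // Key = (user, start of the 5-second window the edit falls into).
  def sumPerUserPerWindow(edits: Seq[Edit]): Map[(String, Long), (String, Long)] =
    edits
      .groupBy(e => (e.user, e.timestampMillis / windowSizeMillis * windowSizeMillis))
      .map { case (key, group) =>
        // Same logic as the tutorial's FoldFunction: start from ("", 0L),
        // record the user and add each edit's byte diff to the running total.
        key -> group.foldLeft(("", 0L)) { (acc, e) => (e.user, acc._2 + e.byteDiff) }
      }

  val sample: Seq[Edit] = Seq(
    Edit("alice", 1000L, 10),
    Edit("alice", 4000L, -3),
    Edit("bob", 2000L, 7),
    Edit("alice", 6000L, 5)
  )

  def main(args: Array[String]): Unit =
    sumPerUserPerWindow(sample).toSeq.sortBy(_._1).foreach { case (_, r) => println(r) }
}
```

Running main prints (alice,7), (alice,5) and (bob,7): alice's first two edits share the window starting at 0 ms, while her third edit lands in the window starting at 5000 ms.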
Below is my attempt in Scala:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} 
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource} 
import org.apache.flink.streaming.api.windowing.time.Time 


object WikipediaAnalytics extends App{ 

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 

    val edits = env.addSource(new WikipediaEditsSource()); 

    val keyedEdits = edits.keyBy(event => event.getUser) 

    val result = keyedEdits.timeWindow(Time.seconds(5)).fold(("", 0L), (we: WikipediaEditEvent, t: (String, Long)) => 
    (we.getUser, t._2 + we.getByteDiff)) 

} 

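This is a more or less word-for-word translation into Scala. Based on it, val result should have type DataStream[(String, Long)], but the type actually inferred after fold() is nowhere near that.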
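The inference issue can be reproduced without Flink. The sketch below uses two hypothetical fold signatures over a Seq[Int], assumed here to be simplified versions of a one-parameter-list form and the curried form (no TypeInformation involved). In Scala 2, a lambda passed in the same parameter list as the value that determines R cannot have its parameter types inferred ("missing parameter type"), so they must be written out; in the curried form, R is fixed by the first list before the lambda is type-checked. Note also the argument order: the accumulator comes first and the element second, the reverse of the lambda in the attempt above.

```scala
object FoldInference {
  val byteDiffs: Seq[Int] = Seq(5, -2, 9)

  // Hypothetical one-list signature: R and the lambda are typed together.
  def foldOneList[R](initial: R, f: (R, Int) => R): R = byteDiffs.foldLeft(initial)(f)

  // Hypothetical curried signature: R is inferred from the first list,
  // then used as the expected type of the lambda in the second list.
  def foldCurried[R](initial: R)(f: (R, Int) => R): R = byteDiffs.foldLeft(initial)(f)

  // One list: parameter types written explicitly, accumulator first.
  val oneList: (String, Long) =
    foldOneList(("", 0L), (acc: (String, Long), d: Int) => ("user", acc._2 + d))

  // Curried: acc is inferred as (String, Long) and d as Int.
  val curried: (String, Long) =
    foldCurried(("", 0L))((acc, d) => ("user", acc._2 + d))

  def main(args: Array[String]): Unit = {
    println(oneList)
    println(curried)
  }
}
```

Both values come out as (user,12), since 5 - 2 + 9 = 12.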

Please help me identify what is wrong with the Scala code.

EDIT1: I made the following change, calling fold[R] in its curried form. The result now has the required type, but I still don't understand why.

val result_1: (((String, Long), WikipediaEditEvent) => (String, Long)) => DataStream[(String, Long)] =
    keyedEdits.timeWindow(Time.seconds(5)).fold(("", 0L))

val result_2: DataStream[(String, Long)] = result_1((t: (String, Long), we: WikipediaEditEvent) =>
    (we.getUser, t._2 + we.getByteDiff))
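What EDIT1 does with result_1 is partial application: only the first parameter list is supplied, and the declared function type makes Scala eta-expand the remaining list into a function value. A minimal plain-Scala sketch with a hypothetical foldCurried (no Flink, no TypeInformation):

```scala
object CurriedStep {
  val byteDiffs: Seq[Int] = Seq(5, -2, 9)

  // Hypothetical curried fold, simplified from the shape used in the question.
  def foldCurried[R](initial: R)(f: (R, Int) => R): R = byteDiffs.foldLeft(initial)(f)

  // Only the first list is applied; the expected function type triggers
  // eta-expansion, as with result_1 in EDIT1. R is fixed to (String, Long) here.
  val withInitial: (((String, Long), Int) => (String, Long)) => (String, Long) =
    foldCurried(("", 0L))

  // The argument type of withInitial is fully known, so the lambda's
  // parameter types can be omitted.
  val total: (String, Long) = withInitial((acc, d) => ("user", acc._2 + d))

  def main(args: Array[String]): Unit = println(total)
}
```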

I think that if there are many possibilities for resolving the final type, you have to give the type inference system a hint. – Ashalynd
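One way to give such a hint is an explicit type argument. In this plain-Scala sketch (hypothetical foldOneList, not Flink's API), writing [(String, Long)] makes the lambda's expected type fully known up front, so its parameter types can be omitted even with a single parameter list:

```scala
object ExplicitTypeHint {
  val byteDiffs: Seq[Int] = Seq(5, -2, 9)

  // Hypothetical one-list fold: without a hint, Scala 2 cannot type the lambda.
  def foldOneList[R](initial: R, f: (R, Int) => R): R = byteDiffs.foldLeft(initial)(f)

  // The explicit type argument fixes R before the lambda is checked.
  val hinted: (String, Long) =
    foldOneList[(String, Long)](("", 0L), (acc, d) => ("user", acc._2 + d))

  def main(args: Array[String]): Unit = println(hinted)
}
```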

Answer


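The problem seems to be with the fold: you need a closing parenthesis right after the accumulator's initial value, so the call becomes fold(initialValue)(function), and the function takes the accumulator first and the event second. Once you fix that, the code still won't compile, because no TypeInformation is available for WikipediaEditEvent. The simplest way to fix this is to import more of the Flink Scala API, i.e. org.apache.flink.streaming.api.scala._ instead of only DataStream and StreamExecutionEnvironment. See the complete example below: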

import org.apache.flink.streaming.api.scala._  // also brings implicit TypeInformation derivation into scope
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource
import org.apache.flink.streaming.api.windowing.time.Time

object WikipediaAnalytics extends App {
    val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val edits = see.addSource(new WikipediaEditsSource())
    // Per user, sum the byte diffs of all edits within each 5-second window.
    val userEditsVolume: DataStream[(String, Int)] = edits
      .keyBy(_.getUser)
      .timeWindow(Time.seconds(5))
      // Curried fold: initial accumulator first, then (accumulator, event) => new accumulator.
      .fold(("", 0))((acc, event) => (event.getUser, acc._2 + event.getByteDiff))
    userEditsVolume.print()
    see.execute("Wikipedia User Edit Volume")
}
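The TypeInformation part comes down to Scala implicits: Flink's Scala operators take a context bound such as R: TypeInformation, and, as far as I know, the wildcard import of org.apache.flink.streaming.api.scala._ is what brings the implicit createTypeInformation macro into scope to satisfy it. The sketch below imitates that pattern with an invented TypeInfo type class and an api object standing in for the imported package; none of these names are Flink's.

```scala
object TypeInfoSketch {
  // Invented stand-in for Flink's TypeInformation[T].
  trait TypeInfo[T] { def describe: String }

  // Plays the role of the imported package: it holds the implicit provider.
  object api {
    implicit def tupleInfo[A, B]: TypeInfo[(A, B)] = new TypeInfo[(A, B)] {
      def describe: String = "Tuple2"
    }
  }

  // Same shape as fold[R: TypeInformation](initial)(f): the context bound adds
  // a trailing implicit parameter that must be resolvable at the call site.
  def fold[R: TypeInfo](initial: R)(f: (R, Int) => R): (R, String) =
    (Seq(5, -2, 9).foldLeft(initial)(f), implicitly[TypeInfo[R]].describe)

  def run(): ((String, Long), String) = {
    // Without this import, the call below does not compile: no implicit
    // TypeInfo[(String, Long)] would be in scope.
    import api._
    fold(("", 0L))((acc, d) => ("user", acc._2 + d))
  }

  def main(args: Array[String]): Unit = println(run())
}
```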