2017-01-30 76 views
3

我是flink和流媒體的新手。我想將每個分區的某個函數應用到流的每個窗口(使用事件時間)。什麼迄今爲止我所做的是這樣的:Flink流媒體 - 在Windows應用功能

val env = StreamExecutionEnvironment.getExecutionEnvironment 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

val inputStream = env.readTextFile("dataset.txt") 
     .map(transformStream(_)) 
     .assignAscendingTimestamps(_.eventTime) 
     .keyBy(_.id) 
     .timeWindow(Time.seconds(windowSize),Time.seconds(slidingStep)) 

def transformStream(input: String): EventStream = {...} 

case class EventStream(val eventTime: Long, val id: String, actualEvent: String) 

我想要做的是一般的功能應用到每個窗口批次中的每個分區,也許應用複雜的處理算法或類似的東西。我已經看到該方法適用於DataStream API,但我不明白它是如何工作的。在弗林克API它說,它是用來一樣,在斯卡拉:

inputStream.apply { WindowFunction } 

誰能解釋這是什麼或如何使用它的應用方法是什麼?斯卡拉的一個例子是可取的。應用方法做我想要的嗎?

回答

5

所以基本上有兩種可能的方向來根據您想要做的計算類型。可以使用:fold/reduce/aggregate或更通用的,您已經提到 - apply。他們都適用於窗口的一個關鍵。

至於apply這是應用計算的非常通用的方法。最基本的版本(在斯卡拉)是:

def apply[R: TypeInformation](function: (K, W, Iterable[T],Collector[R]) => Unit): DataStream[R] 

其中函數有4個參數:

  • 窗口鍵(記住您正在使用的keyedStream)
  • 窗口(你可以提取即摹開始或從窗口的結束)
  • 已分配給這個特殊的窗口中的元素和關鍵
  • 一個收藏家,你應該發出的處理結果

必須記住,雖然這版本必須保持每個元素都處於狀態,直到窗口發出。一個更好的內存性能解決方案將使用帶有preAgreggator的版本,該版本在啓動上述函數之前執行一些計算。

在這裏,您可以看到與預先聚合一小片段:

val stream: DataStream[(String,Int)] = ... 

stream.keyBy(_._1) 
     .window(EventTimeSessionWindows.withGap(Time.seconds(conf.sessionGap()))) 
     .apply((e1, e2) => (e1._1, e1._2 + e2._2), 
      (key, window, in, out: Collector[(String, Long, Long, Int)]) => { 
       out.collect((key, window.getStart, window.getEnd, in.map(_._2).sum)) 
     }) 

,其對在會話窗口的一個關鍵的appearences。

所以基本上如果你不需要窗口的元信息,我會堅持如果他們足夠的話,我會堅持到fold \ reduce \ aggregate。比考慮適用某種預先集合,如果這還不夠,請查看最通用的apply

如需更完整的示例,您可以查看here

0

就我而言,您可以將map/flatmap/keyBy函數調用應用於有狀態窗口數據val inputStream以更改數據。所以,如果你要創建

class DoSthWithYourStream {...}

,你需要定義你的方法和輸入數據的限制,那麼您可以創建另一個值:

val inputStreamChanged = inputStream .map(a => DoSthWithYourStream.Change2ColumnsIntoOne(a.change1st, a.change2nd), a) .flatMap(new DoSthWithYourStream())

Examples extending Java Classed and applying Scala classes into the stream using map/flapmap/key etc

如果你想使用CEP,那麼我認爲最好的選擇是利用CEP pattern API

val pattern = Pattern.begin("start").where(_.getId == 42) .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0) .followedBy("end").where(_.getName == "end")

val patternStream = CEP.pattern(inputStream, pattern) val result: DataStream[Alert] = patternStream.select(createAlert(_))

+0

問題是我想處理整個分區,map/flatMap函數調用在DataStream的每個元素上應用轉換。 –

0

事實證明,它需要一點魔法斯卡拉。什麼到目前爲止,我這樣做是:

val test: DataStream[Long] = inputStream.apply(processPartition(_,_,_,_)) 

    def processPartition(key: String, window: TimeWindow, 
         batch: Iterable[EventStream], 
         out: Collector[Long]): Unit = {..} 

從我的實驗processPartition方法對整批即「鍵分區」(批次將只包含具有相同鍵的元素)應用的功能。我從Java API中獲取了此方法的參數。如果有人能夠詳細闡述應用函數和它的工作原理,這將是有用的。