2017-04-13 645 views
2

我有幾個關於flink並行性的問題。這是我的設置:Apache Flink:如何並行執行但保持消息順序?

我有1個主節點和2個從屬。在flink,我創建了3個卡夫卡消費者,每個卡夫卡消費者從不同的主題消費。
由於元素的順序對我來說很重要,因此每個主題只有一個分區,並且我使用flink設置來使用事件時間。

然後我跑在每個數據流的下面管道(在僞代碼):

source 
.map(deserialize) 
.window 
.apply 
.map(serialize) 
.writeTo(sink) 

到現在爲止,我開始了我弗林克程序與假設,這將允許我使用這兩個參數-p 2我的節點。結果並不是我所希望的,因爲我的輸出順序有時會混亂。

通過弗林克文檔閱讀,並試圖更好地理解它,可能有人請確認我下面的「學習收穫「?

1)傳遞-p 2僅配置任務並行,即並行實例的最大數目後任務(如map(deserialize))將被分割成若想保持整個管道的順序,我必須使用-p 1

2.)這對我來說似乎是矛盾的/混亂的:即使並行性設置爲1,不同的任務仍然可以同時運行(同時),因此我的3個管道也將在pa中運行如果我通過-p 1拉勒爾。

而作爲一個跟進的問題:有沒有辦法找出哪些任務被映射到任務插槽,這樣我可以證實並行執行自己?

我將不勝感激!

更新

Here-p 2弗林克的執行計劃。

回答

1

已經提出這樣的問題就在這裏了Apache Flink user email list後就是答案:

1)的-p選項定義每項工作任務並行。如果選擇的並行性高於1並且數據被重新分配(例如,通過rebalance()或keyBy())順序不能保證。

2.)使用-p設置爲1個1任務時隙,即1個CPU核心,被使用。因此可能有多個線程同時在一個內核上運行,但不是並行運行。

至於我的要求:爲了並行運行多個管道,仍然保持我可以運行,而不是運行在同一弗林克工作中的所有管線多弗林克作業的順序。

0

我會盡力回答我所知道的。

1)是的,CLI客戶端可以使用-p指定parallelism參數。您可以說這是並行實例的最大數量。但是,我沒有看到並行和命令之間的聯繫?據我所知,訂單是由Flink管理的,事件中提供的時間戳或他自己的攝取時間戳。如果您想維護與不同數據源的順序,對我來說看起來很複雜,或者您可能會將這些不同的數據源合併爲一個。

2)你的3條管線可以並行運行,如果您有並行設置爲3。我覺得這裏在不同的時隙並行手段。

跟進問題),您可以檢查哪些任務映射在http://localhost:8081在JobManager的web前端,其任務插槽。

+0

我上傳了我的flink程序的執行計劃,在那裏你可以看到在最後一個地圖之前有一個重新平衡。根據[這些](https://ci.apache.org/projects/flink/flink-docs-release-1.3/concepts/programming-model.html#parallel-dataflows)flink docs命令不保證與rebalance( )。我假設寫給卡夫卡的兩個子任務之間存在競爭條件,並且弄亂了我的輸出順序。因此,我認爲並行性> 1會弄亂我的結果。 – BenScape

0

請在下面找到使用側輸出和插槽組進行本地擴展的示例。

package org.example 

/* 
* Licensed to the Apache Software Foundation (ASF) under one 
* or more contributor license agreements. See the NOTICE file 
* distributed with this work for additional information 
* regarding copyright ownership. The ASF licenses this file 
* to you under the Apache License, Version 2.0 (the 
* "License"); you may not use this file except in compliance 
* with the License. You may obtain a copy of the License at 
* 
*  http://www.apache.org/licenses/LICENSE-2.0 
* 
* Unless required by applicable law or agreed to in writing, software 
* distributed under the License is distributed on an "AS IS" BASIS, 
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and 
* limitations under the License. 
*/ 

import org.apache.flink.streaming.api.functions.ProcessFunction 
import org.apache.flink.streaming.api.scala._ 
import org.apache.flink.util.Collector 

/** 
    * This example shows an implementation of WordCount with data from a text socket. 
    * To run the example make sure that the service providing the text data is already up and running. 
    * 
    * To start an example socket text stream on your local machine run netcat from a command line, 
    * where the parameter specifies the port number: 
    * 
    * {{{ 
    * nc -lk 9999 
    * }}} 
    * 
    * Usage: 
    * {{{ 
    * SocketTextStreamWordCount <hostname> <port> <output path> 
    * }}} 
    * 
    * This example shows how to: 
    * 
    * - use StreamExecutionEnvironment.socketTextStream 
    * - write a simple Flink Streaming program in scala. 
    * - write and use user-defined functions. 
    */ 
object SocketTextStreamWordCount { 

    def main(args: Array[String]) { 
    if (args.length != 2) { 
     System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>") 
     return 
    } 

    val hostName = args(0) 
    val port = args(1).toInt 
    val outputTag1 = OutputTag[String]("side-1") 
    val outputTag2 = OutputTag[String]("side-2") 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    env.getConfig.enableObjectReuse() 

    //Create streams for names and ages by mapping the inputs to the corresponding objects 
    val text = env.socketTextStream(hostName, port).slotSharingGroup("processElement") 
    val counts = text.flatMap { 
     _.toLowerCase.split("\\W+") filter { 
     _.nonEmpty 
     } 
    } 
     .process(new ProcessFunction[String, String] { 
     override def processElement(
            value: String, 
            ctx: ProcessFunction[String, String]#Context, 
            out: Collector[String]): Unit = { 
      if (value.head <= 'm') ctx.output(outputTag1, String.valueOf(value)) 
      else ctx.output(outputTag2, String.valueOf(value)) 
     } 
     }) 

    val sideOutputStream1: DataStream[String] = counts.getSideOutput(outputTag1) 
    val sideOutputStream2: DataStream[String] = counts.getSideOutput(outputTag2) 

    val output1 = sideOutputStream1.map { 
     (_, 1) 
    }.slotSharingGroup("map1") 
     .keyBy(0) 
     .sum(1) 

    val output2 = sideOutputStream2.map { 
     (_, 1) 
    }.slotSharingGroup("map2") 
     .keyBy(0) 
     .sum(1) 

    output1.print() 
    output2.print() 

    env.execute("Scala SocketTextStreamWordCount Example") 
    } 

}