2017-07-20 110 views
1

我想在Flink中的每個節點上共享一個HashMap,並允許節點更新該HashMap。我有這樣的代碼至今:如何將HashMap附加到Flink中的配置對象?

object ParallelStreams { 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    //Is there a way to attach a HashMap to this config variable? 
    val config = new Configuration() 
    config.setClass("HashMap", Class[CustomGlobal]) 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

    class CustomGlobal extends ExecutionConfig.GlobalJobParameters { 
     override def toMap: util.Map[String, String] = { 
     new HashMap[String, String]() 
     } 
    } 

    class MyCoMap extends RichCoMapFunction[String, String, String] { 
     var users: HashMap[String, String] = null 
     //How do I get access the HashMap I attach to the global config here? 
     override def open(parameters: Configuration): Unit = { 
     super.open(parameters) 
     val globalParams = getRuntimeContext.getExecutionConfig.getGlobalJobParameters 
     val globalConf = globalParams[Configuration] 
     val hashMap = globalConf.getClass 

     } 
     //Other functions to override here 
    } 
} 

我在想,如果你可以將自定義對象在這裏val config = new Configuration()創建config變量? (請參閱上面代碼中的註釋)。

我注意到你只能附加原始值。我創建了一個自定義類,它擴展了ExecutionConfig.GlobalJobParameters,並通過執行config.setClass("HashMap", Class[CustomGlobal])附加了該類,但是我不確定是否應該這樣做?

回答

1

向運算符分配參數的常用方法是將它們作爲函數類中的常規成員變量。在計劃構建過程中創建和分配的函數對象被序列化併發送給所有工作人員。所以你不必通過配置傳遞參數。

這將如下所示

class MyMapper(map: HashMap) extends MapFunction[String, String] { 
// class definition 
} 


val inStream: DataStream[String] = ??? 

val myHashMap: HashMap = ??? 
val myMapper: MyMapper = new MyMapper(myHashMap) 
val mappedStream: DataStream[String] = inStream.map(myMapper) 

myMapper對象序列化(使用Java序列化)和運執行。所以map的類型必須實現Java Serializable接口。

編輯:我錯過了你希望地圖可以從所有並行任務中更新的部分。這對Flink來說是不可能的。您必須完全複製地圖並全部更新(通過廣播)或使用外部系統(鍵值存儲)。

+0

另一種方法是使用側面輸入。請參閱https://stackoverflow.com/a/45219889/3026310瞭解一些指針。 –

相關問題