我想在Flink中的每個節點上共享一個HashMap
,並允許節點更新該HashMap。我有這樣的代碼至今:如何將HashMap附加到Flink中的配置對象?
object ParallelStreams {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//Is there a way to attach a HashMap to this config variable?
val config = new Configuration()
config.setClass("HashMap", Class[CustomGlobal])
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
class CustomGlobal extends ExecutionConfig.GlobalJobParameters {
override def toMap: util.Map[String, String] = {
new HashMap[String, String]()
}
}
class MyCoMap extends RichCoMapFunction[String, String, String] {
var users: HashMap[String, String] = null
//How do I get access the HashMap I attach to the global config here?
override def open(parameters: Configuration): Unit = {
super.open(parameters)
val globalParams = getRuntimeContext.getExecutionConfig.getGlobalJobParameters
val globalConf = globalParams[Configuration]
val hashMap = globalConf.getClass
}
//Other functions to override here
}
}
我在想,如果你可以將自定義對象在這裏val config = new Configuration()
創建config
變量? (請參閱上面代碼中的註釋)。
我注意到你只能附加原始值。我創建了一個自定義類,它擴展了ExecutionConfig.GlobalJobParameters
,並通過執行config.setClass("HashMap", Class[CustomGlobal])
附加了該類,但是我不確定是否應該這樣做?
另一種方法是使用側面輸入。請參閱https://stackoverflow.com/a/45219889/3026310瞭解一些指針。 –