我是Apache Spark的新手,需要同時在我的Spark羣集上運行多個長時間運行的進程(作業)。通常,這些單獨的流程(每個流程都是自己的工作)都需要相互溝通。暫時,我正在考慮使用Kafka成爲這些流程之間的經紀人。因此,高級別工作對業務通信會是什麼樣子:使用Kafka在長時間運行的Spark作業之間進行通信
- 招聘#1做了一些工作,併發布消息到卡夫卡的話題
- 招聘#2被設置爲流媒體接收器(使用
StreamingContext
),以同樣的卡夫卡的話題,並儘快將消息發佈到的話題,#2任務消耗它 - 招聘#2現在可以做一些工作的基礎上,該消息也消耗
從我可以告訴,流上下文阻止在Spark驅動程序節點上運行的偵聽器。這意味着,一旦我開始流的消費,像這樣:
def createKafkaStream(ssc: StreamingContext,
kafkaTopics: String, brokers: String): DStream[(String,
String)] = {
// some configs here
KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, props, topicsSet)
}
def consumerHandler(): StreamingContext = {
val ssc = new StreamingContext(sc, Seconds(10))
createKafkaStream(ssc, "someTopic", "my-kafka-ip:9092").foreachRDD(rdd => {
rdd.collect().foreach { msg =>
// Now do some work as soon as we receive a messsage from the topic
}
})
ssc
}
StreamingContext.getActive.foreach {
_.stop(stopSparkContext = false)
}
val ssc = StreamingContext.getActiveOrCreate(consumerHandler)
ssc.start()
ssc.awaitTermination()
...現在有2個含義:
- 司機正在攔截和監聽工作從卡夫卡到消費;和
- 工作時(消息)被接收時,它們被髮送到任何可用的工人節點實際上在
所以首先被執行,如果任何我上面說的是不正確或有誤導之嫌,請開始糾正我!假設我或多或少是正確的,那麼我只是想知道是否有一個更具可擴展性或高性能的方式來完成這一點,根據我的標準。再次,我有兩個長期運行的作業(作業#1和作業#2)在我的Spark節點上運行,其中一個作業需要能夠「將作業發送給另一個作業」。有任何想法嗎?
順便說一句 - 在foreachRDD中使用'rdd.collect'會導致他們將整個數據集發回給驅動程序。你絕對不希望這樣。 –
感謝@Yuval(+1),最好/更有效的方式來獲得訪問消費的個人消息?這不是我的意圖,我只是新API,所以請隨時更新我的代碼! – smeeb
你可以使用'rdd.foreach'。 –