2

我是Apache Spark的新手,需要同時在我的Spark羣集上運行多個長時間運行的進程(作業)。通常,這些單獨的流程(每個流程都是自己的工作)都需要相互溝通。暫時,我正在考慮使用Kafka成爲這些流程之間的經紀人。因此,高級別工作對業務通信會是什麼樣子:使用Kafka在長時間運行的Spark作業之間進行通信

  1. 招聘#1做了一些工作,併發布消息到卡夫卡的話題
  2. 招聘#2被設置爲流媒體接收器(使用StreamingContext),以同樣的卡夫卡的話題,並儘快將消息發佈到的話題,#2任務消耗它
  3. 招聘#2現在可以做一些工作的基礎上,該消息也消耗

從我可以告訴,流上下文阻止在Spark驅動程序節點上運行的偵聽器。這意味着,一旦我開始流的消費,像這樣:

def createKafkaStream(ssc: StreamingContext, 
     kafkaTopics: String, brokers: String): DStream[(String, 
     String)] = { 
    // some configs here 
    KafkaUtils.createDirectStream[String, String, StringDecoder, 
     StringDecoder](ssc, props, topicsSet) 
} 

def consumerHandler(): StreamingContext = { 
    val ssc = new StreamingContext(sc, Seconds(10)) 

    createKafkaStream(ssc, "someTopic", "my-kafka-ip:9092").foreachRDD(rdd => { 
     rdd.collect().foreach { msg => 
      // Now do some work as soon as we receive a messsage from the topic 
     } 
    }) 

    ssc 
} 

StreamingContext.getActive.foreach { 
    _.stop(stopSparkContext = false) 
} 

val ssc = StreamingContext.getActiveOrCreate(consumerHandler) 
ssc.start() 
ssc.awaitTermination() 

...現在有2個含義:

  1. 司機正在攔截和監聽工作從卡夫卡到消費;和
  2. 工作時(消息)被接收時,它們被髮送到任何可用的工人節點實際上在

所以首先被執行,如果任何我上面說的是不正確或有誤導之嫌,請開始糾正我!假設我或多或少是正確的,那麼我只是想知道是否有一個更具可擴展性或高性能的方式來完成這一點,根據我的標準。再次,我有兩個長期運行的作業(作業#1和作業#2)在我的Spark節點上運行,其中一個作業需要能夠「將作業發送給另一個作業」。有任何想法嗎?

+1

順便說一句 - 在foreachRDD中使用'rdd.collect'會導致他們將整個數據集發回給驅動程序。你絕對不希望這樣。 –

+0

感謝@Yuval(+1),最好/更有效的方式來獲得訪問消費的個人消息?這不是我的意圖,我只是新API,所以請隨時更新我的​​代碼! – smeeb

+1

你可以使用'rdd.foreach'。 –

回答

2

從我所知道的情況來看,流上下文阻塞了偵聽器, 在Spark Driver節點上運行。

A StreamingContext(單數)不是阻塞監聽器。您的工作是爲您的流式作業創建執行圖。

當您從Kafka開始閱讀時,您指定要每隔10秒獲取新記錄。從現在起發生的情況取決於您使用Kafka的哪個Kafka抽象,通過KafkaUtils.createStream的Receiver方法或通過KafkaUtils.createDirectStream的無接收方法。

在這兩種方法中,一般都會從卡夫卡消費數據,然後分派給每個Spark工作人員,並行處理並行

然後我只是不知道是否有做到這一點

這種方法更具擴展性的高性能或方式 是高度可擴展性。使用無接收方法時,每個Kafka分區都映射到給定RDD中的Spark分區。您可以通過增加Kafka中的分區數量,或通過重新分區Spark內的數據(使用DStream.repartition)來增加並行性。我建議測試此設置以確定它是否適合您的性能要求。

+0

感謝@Yuval(+1),如果您不介意的話,可以爲您提供幾個後續問題! **(1)**您是否可以首先確認,爲了在Spark上針對Kafka主題設置「競爭消費者」,我需要每個羣集消費1個消費者?接收器和無接收器配置都是如此嗎? **(2)**什麼時候使用接收器與無接收器方法的一般準則是什麼? – smeeb

+0

**(3)**當你說「*當你從卡夫卡開始閱讀時,你指定你想每10秒取一個新記錄......」,這是在哪裏配置的?它可以設置爲10秒以外的其他值嗎?最後,**(4)**當使用接收方法時,什麼是卡夫卡分區?再次感謝! – smeeb

+1

@smeeb 1)「競爭消費者」*的含義是什麼? 2)我通常建議使用直接流媒體方法,它是在Spark 1.3.0中引入的,並且具有許多優點。我建議閱讀它。 3)在這裏配置:'val ssc = new StreamingContext(sc,Seconds(10))''。 4)在基於接收機的方法中,沒有分區映射。如果你想同時從Kafka讀取,你必須連接多個消費者,這意味着你必須執行多個'KafkaUtils.createStream'調用並將它們聯合起來。 –