2016-04-23 60 views
0

TL; DR:我的Spark應用正在從Kafka發送的郵件總數中收取0.1%的郵件。我的主要嫌疑人:對於每個批處理間隔(例如,1秒)新的JVM被實例化。我試圖使用延遲加載的.map()轉換來提取數據。驅動程序&執行程序代碼是否相互詳盡可能有什麼可以做的嗎?Spark從Kafka讀取時丟失了99.9%的郵件

龍版採用的細節:

我的事件流程如下:一個Java類生產樣品(以.json字符串)數據使用卡夫卡的kafka-run-class.sh腳本&運行。這些消息是從kafka收集的,spark是使用java directstream從中讀取的。爲了簡潔起見,我們假設我的數據生產者發送的json消息的值可以是1或者0 & Spark應用程序的目的是區分1的& 0。數據生成器還將計數值附加到發送的消息中。

問題:在試驗中,我從數據發生器發送10000個信息。我是一個用於此輸入數據的kibana儀表板,它顯示9600信息(±0.1%,但始終如一)

Q1。剩餘的400信息在哪裏丟失?

現在,spark(在1個線程上運行1秒的批處理間隔&)讀取這些消息&其輸出I'v在另一個可視化文件中輸入到相同的kibana。

它一致地讀取10個(或有時20個)信息。

如果它讀取10封郵件,那些具有計數值1-10 &如果它讀取20條短信息,計數值是1-10 &〜3000-3010

Q2。爲什麼Spark僅獲得10(或最大20)msgs?

我更改了Spark應用程序中的設置「auto.offset.reset」,「最小」,但這並沒有什麼幫助。它只從計數1-10讀取10個信息。

Q3。需要做些什麼才能使它從kafka主題開始讀取?

1件事我能想到的是這裏所奪的運動是我在.MAP功能攝入封郵件:

JavaDStream<String> lines = stream.map(new Function<Tuple2<String, String>, String>() { 
    public String call(Tuple2<String, String> tuple2) { 
    my_fn(tuple2._2().toString()); 
    return tuple2._2(); 
} 

有人可以闡明窗口中的一些光,減少功能&如何使其與火花看到總消息的0.1%?

注:我使用logstash實例遷移從火花>卡夫卡,kafka-我需要操縱數據生成器腳本json的OBJ數據>彈性

。我正在使用maven &使用JSON依賴關係&它內置罰款。但在嘗試使用kafka-run-class.sh運行該類時,它拋出了json對象的classNotFoundException。

Q4。我如何使用kafka-run-class方法運行着色的jar,或者它需要作爲獨立的java程序運行,在這種情況下,它會以與使用kafka的開箱即用腳本運行時相同的速率發出msgs因爲我認爲它會考慮並行排隊以保持背壓。

使用這個kafka腳本我能夠在我的機器上滲出1.4Mpps。

編輯:關於卡夫卡火花分區&代碼邏輯更多信息

圖例: TOPIC_1是從生產者的數據(腳本) - >卡夫卡 topic_2是從火花發射的數據>卡夫卡

 
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic_1 
      Topic:topic_1 PartitionCount:1 ReplicationFactor:1 Configs: 
      Topic: topic_1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0 

curl 'localhost:9200/_cat/indices?v' 
health status index   pri rep docs.count docs.deleted store.size pri.store.size 
yellow open topic_1  5 1 11002386   0 702.1mb  702.1mb 
yellow open topic_2  5 1  6307   0 786.4kb  786.4kb 
yellow open .kibana   1 1   9   0  47kb  47kb 

火花,我正在與1個執行程序運行& 1輪驅動以在1個線程上4個核與1秒間隔批次4GB 駕駛員&執行存儲器在具有10M +封郵件試驗,TOPIC_1接收阿爾莫st正確的數字(9600/10000 Q1以上)與此同時topic_2可能會得到僅僅〜6k的消息 關於分區,所有內容都在同一臺機器上的獨立模式下,32GB內存。我的意思是,數據製作者卡夫卡,ELK,火花。

Data_gen:簡單的java kafka生成器代碼,以1:1的比率發送值爲0或1的json字符串。 Spark_app:主要qn中的代碼。 my_fn()得到string信息,將其轉換成JSON &認爲如果該值是0或1

+1

這個問題有有關的東西我們並不需要太多的細節,並缺少簡潔,reproduceable測試用例這說明你的問題。我可以給你的一個答案是Q3--從Topic的開頭讀取,你必須使用具有'fromOffsets'參數的'createDirectStream'版本之一,然後你必須使用該參數設置偏移量參數。您需要確切地知道您的分區使用該參數的樣子 - 或者您最好準備從Kafka中獲取分區/偏移量信息。對於測試,您可以在分區和偏移量中進行硬編碼。 –

+0

謝謝@DavidGriffin。我會嘗試你對Q3的建議。但Q2對我來說更是一個阻礙。你能告訴你所有的信息是你在尋找什麼?太多的細節從來都不是問題;) –

+0

你知道你的經紀人佈局嗎?什麼分區坐在哪裏?你有多少執行者在運行?實際的代碼是什麼樣的?理想情況下,可以開始使用案例:我們可以遵循的配方來重現您所看到的內容。 –

回答

0
map() transformations are lazy so they don’t like to work until asked for. 
I was expecting messages in a map() transformation. 
put this expectation in rdd.take() which in non-lazy inside foreachRDD() 
It worked.