2016-10-01 126 views
0

我在Spark 2.0中擁有約5000萬個字典的rdd。它們非常小,只佔用大約12Gb的內存(按照Spark Web UI中的存儲選項卡)。我已經完成了所有在此RDD上處理的處理,現在我想從Spark中將其取出,因爲我現在需要將此數據提供給另一個系統。從Spark中獲取數據 - Python

我對此無從得知,需要一些幫助。理想情況下,我想要做的是將每個分區發送到驅動程序,並通過另一個python模塊將數據本地轉儲出去。這將需要最少的附加編碼。

我希望這樣的事情會的工作:

for x in processed_data.toDF().toLocalIterator(): 
    index.add(x) 

,但沒有喜悅,我得到了這個方便的堆棧跟蹤:

<ipython-input-20-b347e9bd2075> in <module>() 
----> 1 for x in processed_data.toDF().toLocalIterator(): 
     2  index.add(x) 

/apps/spark2/python/pyspark/rdd.py in _load_from_socket(port, serializer) 
    140  try: 
    141   rf = sock.makefile("rb", 65536) 
--> 142   for item in serializer.load_stream(rf): 
    143    yield item 
    144  finally: 

/apps/spark2/python/pyspark/serializers.py in load_stream(self, stream) 
    137   while True: 
    138    try: 
--> 139     yield self._read_with_length(stream) 
    140    except EOFError: 
    141     return 

/apps/spark2/python/pyspark/serializers.py in _read_with_length(self, stream) 
    154 
    155  def _read_with_length(self, stream): 
--> 156   length = read_int(stream) 
    157   if length == SpecialLengths.END_OF_DATA_SECTION: 
    158    raise EOFError 

/apps/spark2/python/pyspark/serializers.py in read_int(stream) 
    541 
    542 def read_int(stream): 
--> 543  length = stream.read(4) 
    544  if not length: 
    545   raise EOFError 

/usr/lib/python3.4/socket.py in readinto(self, b) 
    372   while True: 
    373    try: 
--> 374     return self._sock.recv_into(b) 
    375    except timeout: 
    376     self._timeout_occurred = True 

timeout: timed out 

我檢查所有的日誌文件,我不知道它可能是什麼。我甚至試圖重新分區rdd,所以我有更小的分區,仍然沒有運氣。

由於我的司機大約有內存40GB分配,然後我試圖收集它,然後我開始這些一堆:

ExecutorLostFailure (executor 3 exited caused by one of the running 
tasks) Reason: Remote RPC client disassociated. Likely due to 
containers exceeding thresholds, or network issues. Check driver logs 
for WARN messages. 

我查了日誌,甚至沒有看到任何問題。 ,即使遠程執行完畢被寫入DF出HDFS的唯一的事:

processed_data.toDF().write.json() 

然而問題是,那麼我剛剛得到的數據轉儲沒有適當的JSON的語法,就像每個對象後逗號.. ..

我在這裏錯過了什麼嗎?這真是令人沮喪,因爲我用一小部分數據嘗試了這一點,並且toLocalIterator工作得很好。

在此先感謝

+0

這很可能只是一個症狀而不是核心問題。另外,爲什麼你在收集之前轉換爲'DataFrame'。它根本沒有意義。 – zero323

+0

我不認爲rdd在數據框上有寫入方法。解決其中一些問題的好方法是什麼? – browskie

+0

它提供了許多寫入方法,但它仍不能解釋在不執行寫入操作時轉換的原因。 Re O'DataFrame.write.json' - 每行提供一個有效的JSON文檔。讀它不應該有問題。 – zero323

回答

0

我明白,這是一個已知的bug:https://issues.apache.org/jira/browse/SPARK-18281

應固定在版本2.0.3和2.1.1(均未尚未公佈,和2.1似乎有bug仍然)。

與此同時,如果內存不是問題,則將toLocalIterator替換爲collect應該可行。