2016-11-11 63 views
3

我試圖在版本2.4.0上的彈性雲上針對Elastic Search上的彈性搜索編寫一對rdd。 我正在使用elasticsearch-spark_2.10-2.4.0插件來寫入ES。 這裏是我用寫信給ES代碼:從火花寫入rdd彈性搜索失敗

def predict_imgs(r): 
    import json 
    out_d = {} 
    out_d["pid"] = r["pid"] 
    out_d["other_stuff"] = r["other_stuff"] 

    return (r["pid"], json.dumps(out_d)) 

res2 = res1.map(predict_imgs) 

es_write_conf = { 
"es.nodes" : image_es, 
#"es.port" : "9243", 
"es.resource" : "index/type", 
"es.nodes.wan.only":"True", 
"es.write.operation":"upsert", 
"es.mapping.id":"product_id", 
"es.nodes.discovery" : "false", 
"es.net.http.auth.user": "username", 
"es.net.http.auth.pass": "pass", 
"es.input.json": "true", 
"es.http.timeout":"1m", 
"es.scroll.size":"10", 
"es.batch.size.bytes":"1mb", 
"es.http.retries":"1", 
"es.batch.size.entries":"5", 
"es.batch.write.refresh":"False", 
"es.batch.write.retry.count":"1", 
"es.batch.write.retry.wait":"10s"} 

res2.saveAsNewAPIHadoopFile(
path='-', 
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
keyClass="org.apache.hadoop.io.NullWritable", 
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
conf=es_write_conf) 

我得到的錯誤如下:

Py4JJavaError: An error occurred while calling  z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 744 in stage 26.0 failed 4 times, most recent failure: Lost task 744.3 in stage 26.0 (TID 2841, 10.181.252.29): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) 

有趣的部分是當我做了就第一個幾本作品在RDD2的元素,然後建立一個新RDD了出來,並把它寫入ES,它完美的作品:

x = sc.parallelize([res2.take(1)]) 
x.saveAsNewAPIHadoopFile(
path='-', 
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
keyClass="org.apache.hadoop.io.NullWritable", 
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
conf=es_write_conf) 

我使用的彈性雲(彈性搜索的雲服務)和Databricks(雲Apache Spark的冒險) 難道ES不能跟上Spark寫入ES的過程嗎? 我將彈性雲的大小從2GB RAM增加到8GB RAM。

有沒有推薦配置的es_write_conf我上面用過?任何其他confs,你可以想到的? 更新到ES 5.0有幫助嗎?

任何幫助表示讚賞。這幾天一直在掙扎着。謝謝。

回答

2

它看起來像pyspark計算的問題,而不是必需的elasticsearch保存過程。確保您的RDDS都OK方式:

  1. 表演count()上RDD1集(以 「物化」 的結果)
  2. 上RDD2執行count()

如果計數正常,嘗試用緩存的結果保存到ES前:

res2.cache() 
res2.count() # to fill the cache 
res2.saveAsNewAPIHadoopFile(... 

它的問題仍然存在,嘗試看看死者遺囑執行人標準錯誤和標準輸出(你可以找到他們處於S執行人標籤parkUI)。

我也注意到在es_write_conf的非常小的批量大小,嘗試增加到500或1000以獲得更好的性能。