我使用pyspark在亞馬遜的EMR羣集中運行我的代碼。然後,順便我把它的工作是執行以下步驟:
1)將在羣集創建這個引導作用(創建本地主機elasticsearch服務器):
s3://awssupportdatasvcs.com/bootstrap-actions/elasticsearch/elasticsearch_install.4.0.0.rb
2)我運行這些命令來填充在elasticsearch數據庫的一些數據:
curl -XPUT "http://localhost:9200/movies/movie/1" -d' {
"title": "The Godfather",
"director": "Francis Ford Coppola",
"year": 1972
}'
您還可以,如果你想運行其他捲曲的命令,如:
curl -XGET http://localhost:9200/_search?pretty=true&q={'matchAll':{''}}
3)我inited pyspark使用以下參數:
pyspark --driver-memory 5G --executor-memory 10G --executor-cores 2 --jars=elasticsearch-hadoop-5.5.1.jar
我已經下載了elasticsearch Python客戶端之前
4)我運行下面的代碼:
from pyspark import SparkConf
from pyspark.sql import SQLContext
q ="""{
"query": {
"match_all": {}
}
}"""
es_read_conf = {
"es.nodes" : "localhost",
"es.port" : "9200",
"es.resource" : "movies/movie",
"es.query" : q
}
es_rdd = sc.newAPIHadoopRDD(
inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_read_conf)
sqlContext.createDataFrame(es_rdd).collect()
後來我終於得到了命令的成功結果。
這就是我現在正在做的,我希望有一種方法可以直接獲取過濾的DataFrame –
我不確定使用ES-Hadoop Spark連接器的最新API是否可行。 –
有沒有辦法使用這個API將數據框寫入elasticsearch? –