2016-07-02 355 views
5

Elasticsaerch的文檔僅涵蓋將完整索引加載到Spark。如何使用Pyspark和Dataframe查詢Elasticsearch索引

from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type") 
df.printSchema() 

如何執行查詢以從Elasticsearch索引中返回數據並使用pyspark將其作爲DataFrame加載到Spark?

回答

4

以下是我該怎麼做。

一般環境設置和命令:

export SPARK_HOME=/home/ezerkar/spark-1.6.0-bin-hadoop2.6 
export PYSPARK_DRIVER_PYTHON=ipython2 

./spark-1.6.0-bin-hadoop2.6/bin/pyspark --driver-class-path=/home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar 

代碼:

from pyspark import SparkConf 
from pyspark.sql import SQLContext 

conf = SparkConf().setAppName("ESTest") 
sc = SparkContext(conf=conf) 
sqlContext = SQLContext(sc) 

q ="""{ 
    "query": { 
    "filtered": { 
     "filter": { 
     "exists": { 
      "field": "label" 
     } 
     }, 
     "query": { 
     "match_all": {} 
     } 
    } 
    } 
}""" 

es_read_conf = { 
    "es.nodes" : "localhost", 
    "es.port" : "9200", 
    "es.resource" : "titanic/passenger", 
    "es.query" : q 
} 

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf) 

sqlContext.createDataFrame(es_rdd).collect() 

你也可以定義數據幀列。有關更多信息,請參閱Here

希望它有幫助!

+0

這就是我現在正在做的,我希望有一種方法可以直接獲取過濾的DataFrame –

+1

我不確定使用ES-Hadoop Spark連接器的最新API是否可行。 –

+1

有沒有辦法使用這個API將數據框寫入elasticsearch? –

0

我使用pyspark在亞馬遜的EMR羣集中運行我的代碼。然後,順便我把它的工作是執行以下步驟:

1)將在羣集創建這個引導作用(創建本地主機elasticsearch服務器):

s3://awssupportdatasvcs.com/bootstrap-actions/elasticsearch/elasticsearch_install.4.0.0.rb 

2)我運行這些命令來填充在elasticsearch數據庫的一些數據:

curl -XPUT "http://localhost:9200/movies/movie/1" -d' { 
    "title": "The Godfather", 
    "director": "Francis Ford Coppola", 
    "year": 1972 
    }' 

您還可以,如果你想運行其他捲曲的命令,如:

curl -XGET http://localhost:9200/_search?pretty=true&q={'matchAll':{''}} 

3)我inited pyspark使用以下參數:

pyspark --driver-memory 5G --executor-memory 10G --executor-cores 2 --jars=elasticsearch-hadoop-5.5.1.jar 

我已經下載了elasticsearch Python客戶端之前

4)我運行下面的代碼:

from pyspark import SparkConf 
from pyspark.sql import SQLContext 

q ="""{ 
    "query": { 
    "match_all": {} 
    } 
}""" 

es_read_conf = { 
    "es.nodes" : "localhost", 
    "es.port" : "9200", 
    "es.resource" : "movies/movie", 
    "es.query" : q 
} 

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf) 

sqlContext.createDataFrame(es_rdd).collect() 

後來我終於得到了命令的成功結果。

0

我遇到類似的問題,要將地理過濾數據導入PySpark DataFrame。我在Spark版本2.1.1和ES版本5.2中使用elasticsearch-spark-20_2.11-5.2.2.jar。我能夠通過指定我的查詢作爲一個選項加載數據到數據幀在創建數據框

我的地理查詢

q ="""{ 
    "query": { 
     "bool" : { 
      "must" : { 
       "match_all" : {} 
      }, 
      "filter" : { 
       "geo_distance" : { 
        "distance" : "100km", 
        "location" : { 
         "lat" : 35.825, 
         "lon" : -87.99 
        } 
       } 
      } 
     } 
    } 
}""" 

我用下面的命令將數據加載到數據幀

spark_df = spark.read.format("es").option("es.query", q).load("index_name") 
相關問題