2016-08-04 90 views
3

因此,我正在學習通過Apache Spark從ElasticSearch獲取數據。 假設我已連接到具有「用戶」索引的ElasticSearch。Spark:使用ElasticSearch索引進行優化連接

sqlContext = SQLContext(sc) 
usersES=sqlContext.read.format('org.elasticsearch.spark.sql').option('es.nodes','mynode').load('users/user') 

解釋(usersES)顯示我:

==實際規劃==

掃描ElasticsearchRelation(MAP(es.nodes - > MYNODE, es.resource - > 用戶/用戶),org.apache.spark.sql.SQLContext @ 6c78e806,無)[關於#145,#活動146,bdate#147, UID#148]

當我使用過濾器:

usersES.filter(usersES.uid==1566324).explain() 

==實際規劃==過濾器(UID#203L = 1566324) + - 掃描ElasticsearchRelation(MAP(es.nodes - > MYNODE,es.resource - > 用戶/用戶),org.apache.spark.sql.SQLContext @ 6c78e806,None)[about#145,activities#146,bdate#147,uid#148] PushedFilters:[EqualTo(uid,1566324)]

如您所見,Spark優雅地將過濾器推送到ElasticSearch,使索引搜索更快速舒適。

但是,當我嘗試將usersES與其他數據框加入時,我始終得到相同的問題: Spark掃描整個ElasticSearch索引,而不是推送任何我給它的過濾器。 例如:

a = sc.parallelize([1566324,1566329]).map(Row('id')).toDF() 
a.join(usersES, usersES.uid==a.id).explain() 

所示:

SortMergeJoin [ID#210L],[UID#203L]: - 排序[ID#210L ASC],假,0: + - TungstenExchange散列分區(id#210L,200),無:+ - ConvertToUnsafe:+ - 掃描ExistingRDD [id#210L] + - Sort [uid#203L ASC],false,0 + - TungstenExchange hashpartitioning(uid#203L,200),無 + - ConvertToUnsafe + - Scan ElasticsearchRelation(Map(es.nodes - > mynode,es.resource - > user S /用戶),org.apache.spark.sql.SQLContext @ 6c78e806,無)[約145#,#活動146,bdate#147,#UID 148]

請告訴我,是可能的在Elasticsearch內部推入過濾器內部的連接?

回答

2

這是一個預期的行爲,是elaticsearch-hadoop連接器支持下推謂詞,但加入時不會推送。

這是因爲連接操作不知道密鑰在數據框中的分區方式。

默認情況下,此操作將散列兩個數據框的所有密鑰,將具有相同密鑰散列的所有元素通過網絡發送到同一臺計算機,然後將該元素與該計算機上的相同密鑰連接在一起。

這就是爲什麼你得到的執行計劃沒有被推下謂詞。

編輯:似乎自2.1版的IN條款像連接器支持。如果你的DataFrame a不大,你應該使用它。

+0

謝謝!似乎這是唯一的方法,雖然不符合我的需要(在IN的df到py中有數千條記錄) –

+0

比唯一的方法是使用spark來加入而不是使用In子句,或者甚至可以使用廣播變量。 – eliasah

+0

你能否請至少接受解決問題的答案? – eliasah