因此,我正在學習通過Apache Spark從ElasticSearch獲取數據。 假設我已連接到具有「用戶」索引的ElasticSearch。Spark:使用ElasticSearch索引進行優化連接
sqlContext = SQLContext(sc)
usersES=sqlContext.read.format('org.elasticsearch.spark.sql').option('es.nodes','mynode').load('users/user')
解釋(usersES)顯示我:
==實際規劃==
掃描ElasticsearchRelation(MAP(es.nodes - > MYNODE, es.resource - > 用戶/用戶),org.apache.spark.sql.SQLContext @ 6c78e806,無)[關於#145,#活動146,bdate#147, UID#148]
當我使用過濾器:
usersES.filter(usersES.uid==1566324).explain()
==實際規劃==過濾器(UID#203L = 1566324) + - 掃描ElasticsearchRelation(MAP(es.nodes - > MYNODE,es.resource - > 用戶/用戶),org.apache.spark.sql.SQLContext @ 6c78e806,None)[about#145,activities#146,bdate#147,uid#148] PushedFilters:[EqualTo(uid,1566324)]
如您所見,Spark優雅地將過濾器推送到ElasticSearch,使索引搜索更快速舒適。
但是,當我嘗試將usersES與其他數據框加入時,我始終得到相同的問題: Spark掃描整個ElasticSearch索引,而不是推送任何我給它的過濾器。 例如:
a = sc.parallelize([1566324,1566329]).map(Row('id')).toDF()
a.join(usersES, usersES.uid==a.id).explain()
所示:
SortMergeJoin [ID#210L],[UID#203L]: - 排序[ID#210L ASC],假,0: + - TungstenExchange散列分區(id#210L,200),無:+ - ConvertToUnsafe:+ - 掃描ExistingRDD [id#210L] + - Sort [uid#203L ASC],false,0 + - TungstenExchange hashpartitioning(uid#203L,200),無 + - ConvertToUnsafe + - Scan ElasticsearchRelation(Map(es.nodes - > mynode,es.resource - > user S /用戶),org.apache.spark.sql.SQLContext @ 6c78e806,無)[約145#,#活動146,bdate#147,#UID 148]
請告訴我,是可能的在Elasticsearch內部推入過濾器內部的連接?
謝謝!似乎這是唯一的方法,雖然不符合我的需要(在IN的df到py中有數千條記錄) –
比唯一的方法是使用spark來加入而不是使用In子句,或者甚至可以使用廣播變量。 – eliasah
你能否請至少接受解決問題的答案? – eliasah