我給了一個通話一段時間回來與Spark和彈性搜索(約0.9天),我最近更新了一些對當今的例子(閱讀1.1)。我已發佈slides和example code。希望有所幫助!
我也複製了相關部分(從我自己的GitHub庫)的位置:
import org.elasticsearch.spark.sql._
...
val tweetsAsCS =
createSchemaRDD(tweetRDD.map(SharedIndex.prepareTweetsCaseClass))
tweetsAsCS.saveToEs(esResource)
請注意,我們沒有指定任何ES節點。這將默認嘗試保存到本地主機上的羣集。如果我們要使用不同的集羣,我們可以添加:
// if we want to have a different es cluster we can add
import org.elasticsearch.hadoop.cfg.ConfigurationOptions
val config = new SparkConf()
config.set(ConfigurationOptions.ES_NODES, node) // set the node for discovery
// other config settings
val sc = new SparkContext(config)
這樣做的第一部分(索引一些數據)。
從Spark查詢ES也變得更簡單了,雖然只有當您的數據類型受連接器映射支持時(我遇到的主要問題不是地理位置,但它足夠容易擴展映射器如果你碰到這個)。
val query = "{\"query\": {\"filtered\" : {\"query\" : {\"match_all\" : {}},\"filter\" : { \"geo_distance\" : { \"distance\" : \""+ dist + "km\", \"location\" : { \"lat\" : "+ lat +", \"lon\" : "+ lon +" }}}}}}"
val tweets = sqlCtx.esRDD(esResource, query)
esRDD函數通常不在SQLContext上,但是我們上面導入的隱式轉換使它可用。 tweets現在是一個SchemaRDD,我們可以根據需要更新它,並保存結果,就像我們在本例的第一部分中所做的那樣。
希望這會有所幫助!
謝謝你,我會看看!然而,作爲SO的評論員,我不得不問你是否可以在這裏提供代碼示例。不鼓勵鏈接,因爲它們可能會死亡,這違背了面向FAQ的網站的精神。 – 2014-11-06 16:09:48
當然讓我把它複製:) – Holden 2014-11-06 16:23:00