2014-11-04 83 views
1

This guy的更新例如有一個非常小的例子,表明如何整合ElasticSearch和火花,當所有的ES生態系統大約爲0.9版本。現在,它不再起作用了(用Google搜索它似乎不是一件簡單的事情)。有人可以給出一個小的,自包含的Scala示例:自洽和使用星火超過ElasticSearch

  1. 在spark中打開一個文件(在上面的例子中,它是/var/log/syslog);
  2. 用它做點什麼;
  3. 發送結果到ES;
  4. 開放導致回的火花。

...與ElasticSearch 1.3.4工程和星火1.1.0。

回答

3

我給了一個通話一段時間回來與Spark和彈性搜索(約0.9天),我最近更新了一些對當今的例子(閱讀1.1)。我已發佈slidesexample code。希望有所幫助!

我也複製了相關部分(從我自己的GitHub庫)的位置:

import org.elasticsearch.spark.sql._ 
... 
val tweetsAsCS = 
    createSchemaRDD(tweetRDD.map(SharedIndex.prepareTweetsCaseClass)) 
tweetsAsCS.saveToEs(esResource) 

請注意,我們沒有指定任何ES節點。這將默認嘗試保存到本地主機上的羣集。如果我們要使用不同的集羣,我們可以添加:

// if we want to have a different es cluster we can add 
import org.elasticsearch.hadoop.cfg.ConfigurationOptions 
val config = new SparkConf() 
config.set(ConfigurationOptions.ES_NODES, node) // set the node for discovery 
// other config settings 
val sc = new SparkContext(config) 

這樣做的第一部分(索引一些數據)。

從Spark查詢ES也變得更簡單了,雖然只有當您的數據類型受連接器映射支持時(我遇到的主要問題不是地理位置,但它足夠容易擴展映射器如果你碰到這個)。

val query = "{\"query\": {\"filtered\" : {\"query\" : {\"match_all\" : {}},\"filter\" : { \"geo_distance\" : { \"distance\" : \""+ dist + "km\", \"location\" : { \"lat\" : "+ lat +", \"lon\" : "+ lon +" }}}}}}" 
val tweets = sqlCtx.esRDD(esResource, query) 

esRDD函數通常不在SQLContext上,但是我們上面導入的隱式轉換使它可用。 tweets現在是一個SchemaRDD,我們可以根據需要更新它,並保存結果,就像我們在本例的第一部分中所做的那樣。

希望這會有所幫助!

+0

謝謝你,我會看看!然而,作爲SO的評論員,我不得不問你是否可以在這裏提供代碼示例。不鼓勵鏈接,因爲它們可能會死亡,這違背了面向FAQ的網站的精神。 – 2014-11-06 16:09:48

+1

當然讓我把它複製:) – Holden 2014-11-06 16:23:00