2017-04-12 141 views
0

我是一名初學者,嘗試使用帶有一些過濾器關鍵字的Scala使用Spark流發送推文。在流式傳輸之後,是否有可能僅過濾沒有地理位置的推文爲空?我正嘗試在ElasticSearch中保存推文。因此,在將推訊地圖保存到ElasticSearch之前,我可以使用地理定位信息過濾那些地圖,然後保存它們嗎?我使用json4s.JSONDSL和tweet中的字段創建JSON。這是示例代碼Spark流 - 用地理位置信息流後過濾推文

VAL流= TwitterUtils.createStream(SSC,無,過濾器) VAL tweetMap = stream.map(狀態=> { VAL tweetMap =

 ("location" -> Option(status.getGeoLocation).map(geo => { s"${geo.getLatitude},${geo.getLongitude}" })) ~ 
     ("UserLang" -> status.getUser.getLang) ~ 
     ("UserLocation" -> Option(status.getUser.getLocation)) ~ 
     ("UserName" -> status.getUser.getName) ~ 
     ("Text" -> status.getText) ~ 
     ("TextLength" -> status.getText.length) ~ 
     //Tokenized the tweet message and then filtered only words starting with # 
     ("HashTags" -> status.getText.split(" ").filter(_.startsWith("#")).mkString(" ")) ~ 
     ("PlaceCountry" -> Option(status.getPlace).map (pl => {s"${pl.getCountry}"})) 

tweetMap.map(S =>列表( 「資料Tweet提取的」))。打印

// Each batch is saved to Elasticsearch 
tweetMap.foreachRDD { tweets => EsSpark.saveToEs(tweets, "sparksender/tweets")) } 

//之前這個步驟是有一種方法篩選出具有 「位置」 爲空鳴叫?

我參考了github的代碼: https://github.com/luvgupta008/ScreamingTwitter/blob/master/src/main/scala/com/spark/streaming/TwitterTransmitter.scala

回答

0

查看RDD上的filter方法。採用謂詞函數(a: A) => Boolean。如果返回值爲true,則將該元素添加到列表中。如果它是假的,該元素將不會被添加到列表中。

tweetMap.filter(
    status => Option(status.getGeoLocation) match { 
    case Some(_) => true 
    case None => false 
    }) 
+0

我試過了,但是在編譯「value getGeoLocation不是scala.collection.immutable.Map [String,Any]的成員時出現錯誤」 –