我是一名初學者,嘗試使用帶有一些過濾器關鍵字的Scala使用Spark流發送推文。在流式傳輸之後,是否有可能僅過濾沒有地理位置的推文爲空?我正嘗試在ElasticSearch中保存推文。因此,在將推訊地圖保存到ElasticSearch之前,我可以使用地理定位信息過濾那些地圖,然後保存它們嗎?我使用json4s.JSONDSL和tweet中的字段創建JSON。這是示例代碼Spark流 - 用地理位置信息流後過濾推文
VAL流= TwitterUtils.createStream(SSC,無,過濾器) VAL tweetMap = stream.map(狀態=> { VAL tweetMap =
("location" -> Option(status.getGeoLocation).map(geo => { s"${geo.getLatitude},${geo.getLongitude}" })) ~
("UserLang" -> status.getUser.getLang) ~
("UserLocation" -> Option(status.getUser.getLocation)) ~
("UserName" -> status.getUser.getName) ~
("Text" -> status.getText) ~
("TextLength" -> status.getText.length) ~
//Tokenized the tweet message and then filtered only words starting with #
("HashTags" -> status.getText.split(" ").filter(_.startsWith("#")).mkString(" ")) ~
("PlaceCountry" -> Option(status.getPlace).map (pl => {s"${pl.getCountry}"}))
tweetMap.map(S =>列表( 「資料Tweet提取的」))。打印
// Each batch is saved to Elasticsearch
tweetMap.foreachRDD { tweets => EsSpark.saveToEs(tweets, "sparksender/tweets")) }
//之前這個步驟是有一種方法篩選出具有 「位置」 爲空鳴叫?
我參考了github的代碼: https://github.com/luvgupta008/ScreamingTwitter/blob/master/src/main/scala/com/spark/streaming/TwitterTransmitter.scala
我試過了,但是在編譯「value getGeoLocation不是scala.collection.immutable.Map [String,Any]的成員時出現錯誤」 –