2017-10-12 151 views
0

我試圖使用Spark Scala代碼流式傳輸twitter數據。我能夠獲取數據並創建數據框並查看它。但是,當嘗試提取status.getPlace.getCountry()時,我得到顯示java.lang.NullPointerException。使用Spark的Twitter流式傳輸

星火版本:2.0.0, 斯卡拉版本:

2.11.8試圖用if條件,檢查值等,但不成功。

代碼:

val spark = SparkSession.builder().appName("Twitter Spark Example").getOrCreate() 
val ssc = new StreamingContext(spark.sparkContext,Seconds(5)) 

val filters:Seq[String] = Seq("hadoop") 
val cb = new ConfigurationBuilder() 
     .setOAuthConsumerKey("******") 
     .setOAuthConsumerSecret("******") 
     .setOAuthAccessToken("********") 
     .setOAuthAccessTokenSecret("******").build() 

val twitter_auth = new TwitterFactory(cb) 
val a = new OAuthAuthorization(cb) 
val atwitter:Option[twitter4j.auth.Authorization] = Some(twitter_auth.getInstance(a).getAuthorization()) 

val tweetsdstream = TwitterUtils.createStream(ssc, atwitter, filters, StorageLevel.MEMORY_AND_DISK_SER_2) 
val data = tweetsdstream.map {status => 
     val places = status.getPlace 
     val id = status.getUser.getId 
     val date = status.getUser.getCreatedAt.toString() 
     val user = status.getUser.getName() 
     val place = places.getCountry() 

     (id,date,user,place) 
     } 
data.foreachRDD{rdd => 
     import spark.implicits._ 
     rdd.toDF("id","date","user","place").show() 
    } 

ssc.start() 
ssc.awaitTermination() 

是否有來自Twitter的訪問位置信息有任何限制? 任何建議都會有幫助。

感謝

+3

實際上大部分時間'getPlace'和'getCountry'都包含null值,您可以嘗試使用geoLocation而不是 –

回答

0

您可以使用Option處理null S:

val data = tweetsdstream.map { 
    status => 
    val place = Option(status.getPlace).map(_.getCountry).orNull 
    val id = status.getUser.getId 
    val user = status.getUser.getName 
    val date = status.getUser.getCreatedAt.toString 
    (id, date, user, place) 
} 

這樣一來,你就能夠想象的所有微博,無論他們是否有一個國家或沒有(和它在國家未定義的情況下將爲空)。

Option對於處理可能丟失的數據非常有用,可以將其用於其他可能的空字段。

+0

,您的解決方案適用於我。非常感謝。 –