2017-10-07 674 views
0

我試圖使用Apache Spark來傳輸twitter數據,我想將流數據保存爲csv文件,但我不能 我該如何修復我的代碼才能使它csvSpark-Scala:另存爲csv文件(RDD)

我使用RDD。

這是我的主要代碼:

val ssc = new StreamingContext(conf, Seconds(3600)) 
val stream = TwitterUtils.createStream(ssc, None, filters) 

val tweets = stream.map(t => { 
    Map(
    // This is for tweet 
    "text" -> t.getText, 
    "retweet_count" -> t.getRetweetCount, 
    "favorited" -> t.isFavorited, 
    "truncated" -> t.isTruncated, 
    "id_str" -> t.getId, 
    "in_reply_to_screen_name" -> t.getInReplyToScreenName, 
    "source" -> t.getSource, 
    "retweeted" -> t.isRetweetedByMe, 
    "created_at" -> t.getCreatedAt, 
    "in_reply_to_status_id_str" -> t.getInReplyToStatusId, 
    "in_reply_to_user_id_str" -> t.getInReplyToUserId, 

    // This is for tweet's user 
    "listed_count" -> t.getUser.getListedCount, 
    "verified" -> t.getUser.isVerified, 
    "location" -> t.getUser.getLocation, 
    "user_id_str" -> t.getUser.getId, 
    "description" -> t.getUser.getDescription, 
    "geo_enabled" -> t.getUser.isGeoEnabled, 
    "user_created_at" -> t.getUser.getCreatedAt, 
    "statuses_count" -> t.getUser.getStatusesCount, 
    "followers_count" -> t.getUser.getFollowersCount, 
    "favorites_count" -> t.getUser.getFavouritesCount, 
    "protected" -> t.getUser.isProtected, 
    "user_url" -> t.getUser.getURL, 
    "name" -> t.getUser.getName, 
    "time_zone" -> t.getUser.getTimeZone, 
    "user_lang" -> t.getUser.getLang, 
    "utc_offset" -> t.getUser.getUtcOffset, 
    "friends_count" -> t.getUser.getFriendsCount, 
    "screen_name" -> t.getUser.getScreenName 
) 
}) 

tweets.repartition(1).saveAsTextFiles("~/streaming/tweets") 
+0

解決一個數據幀鳴叫轉換爲數據幀你有一個RDD或數據集/幀。如果是後者,則有一種採用CSV作爲格式選項的寫入方法 –

+0

@ cricket_007這是一個RDD –

+0

@ user8371915不,我正在使用RDD而不是DF –

回答

2

你需要這是RDD [地圖[字符串,字符串]]鳴叫轉換成數據幀保存爲CSV。原因很簡單RDD沒有模式。而csv格式具有特定的架構。所以你必須將RDD轉換爲具有模式的數據框。

有幾種方法可以做到這一點。一種方法可能是使用案例類,而不是將數據放入地圖中。

case class(text:String, retweetCount:Int ...) 

現在使用適當的參數來實例化案例類,而不是Map(...)。

最後用火花隱式轉換

import spark.implicits._ 
tweets.toDF.write.csv(...) // saves as CSV 

或者你可以在地圖轉換爲使用給定here

+0

這些更改是在tweet地圖內還是之後? –

+0

我試圖使用該方法將地圖轉換爲數據框它工作,但如何分配推特信息(getText,getRetweetCount,...)? –

+0

它終於有效,謝謝你 –