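Spark-Scala: save as CSV file (RDD)
I am trying to stream Twitter data with Apache Spark and want to save the streamed data as a CSV file, but I have not been able to. How can I fix my code so that it writes CSV?
I am using an RDD.
This is my main code: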
val ssc = new StreamingContext(conf, Seconds(3600))
val stream = TwitterUtils.createStream(ssc, None, filters)
val tweets = stream.map(t => {
  Map(
    // This is for tweet
    "text" -> t.getText,
    "retweet_count" -> t.getRetweetCount,
    "favorited" -> t.isFavorited,
    "truncated" -> t.isTruncated,
    "id_str" -> t.getId,
    "in_reply_to_screen_name" -> t.getInReplyToScreenName,
    "source" -> t.getSource,
    "retweeted" -> t.isRetweetedByMe,
    "created_at" -> t.getCreatedAt,
    "in_reply_to_status_id_str" -> t.getInReplyToStatusId,
    "in_reply_to_user_id_str" -> t.getInReplyToUserId,
    // This is for tweet's user
    "listed_count" -> t.getUser.getListedCount,
    "verified" -> t.getUser.isVerified,
    "location" -> t.getUser.getLocation,
    "user_id_str" -> t.getUser.getId,
    "description" -> t.getUser.getDescription,
    "geo_enabled" -> t.getUser.isGeoEnabled,
    "user_created_at" -> t.getUser.getCreatedAt,
    "statuses_count" -> t.getUser.getStatusesCount,
    "followers_count" -> t.getUser.getFollowersCount,
    "favorites_count" -> t.getUser.getFavouritesCount,
    "protected" -> t.getUser.isProtected,
    "user_url" -> t.getUser.getURL,
    "name" -> t.getUser.getName,
    "time_zone" -> t.getUser.getTimeZone,
    "user_lang" -> t.getUser.getLang,
    "utc_offset" -> t.getUser.getUtcOffset,
    "friends_count" -> t.getUser.getFriendsCount,
    "screen_name" -> t.getUser.getScreenName
  )
})
tweets.repartition(1).saveAsTextFiles("~/streaming/tweets")
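The code above writes the `toString` of each `Map`, not CSV. One way to stay with RDDs is to turn every record into a CSV line before saving. The following is only a sketch: the `TweetCsv` object, its `escape` and `toCsvLine` helpers, and the shortened column list are made up for illustration and are not part of Spark or Twitter4J. The columns are listed explicitly because a Scala `Map` does not guarantee iteration order, and `null` values (for example a missing `in_reply_to_screen_name`) become empty fields.

```scala
object TweetCsv {
  // Hypothetical, shortened column order. A Scala Map does not guarantee
  // iteration order, so the columns to export are listed explicitly.
  val columns: Seq[String] = Seq("id_str", "screen_name", "text", "retweet_count")

  // Quote one value using the usual CSV convention: null becomes an empty
  // field, and a value containing a comma, a double quote, or a line break
  // is wrapped in double quotes with inner quotes doubled.
  def escape(value: Any): String = {
    val s = if (value == null) "" else value.toString
    if (s.exists(c => c == ',' || c == '"' || c == '\n' || c == '\r'))
      "\"" + s.replace("\"", "\"\"") + "\""
    else s
  }

  // Render one record as a CSV line; keys missing from the record
  // produce empty fields.
  def toCsvLine(record: Map[String, Any], cols: Seq[String] = columns): String =
    cols.map(c => escape(record.getOrElse(c, null))).mkString(",")
}
```

With that in place, the last line of the question could become something like `tweets.map(r => TweetCsv.toCsvLine(r)).repartition(1).saveAsTextFiles("streaming/tweets", "csv")`. Note that `saveAsTextFiles` still creates one output directory per batch, each containing part files, and that tweets with embedded line breaks will span several physical lines even though they are quoted.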
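Convert the tweets to a DataFrame. Do you have an RDD or a Dataset/DataFrame? If it is the latter, there is a write method that accepts CSV as a format option. –
@cricket_007 It is an RDD. –
@user8371915 No, I am using an RDD, not a DataFrame. –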
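For reference, the DataFrame route mentioned in the comments could look roughly like the sketch below. It assumes Spark 2.x or later (where `DataFrameWriter.csv` is available) with `spark-sql` and `spark-streaming` on the classpath. The `TweetRow` case class, the `TweetCsvWriter` object, and the `outputDir` parameter are hypothetical names introduced only for this example; the stream would first have to be mapped to `TweetRow` instead of a `Map`.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical row type holding a few of the tweet fields from the question.
case class TweetRow(id: Long, screenName: String, text: String, retweetCount: Int)

object TweetCsvWriter {
  // Convert each non-empty micro-batch RDD to a DataFrame and write it as CSV.
  // Every batch goes to its own directory, named after the batch time.
  def saveAsCsv(rows: DStream[TweetRow], outputDir: String): Unit =
    rows.foreachRDD { (rdd, time) =>
      if (!rdd.isEmpty()) {
        // Reuse (or lazily create) a SparkSession on top of the existing context.
        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
        import spark.implicits._
        rdd.toDF()
          .coalesce(1)                 // one part file per batch
          .write
          .mode(SaveMode.Overwrite)
          .option("header", "true")    // write column names as the first line
          .csv(s"$outputDir/batch-${time.milliseconds}")
      }
    }
}
```

The CSV writer takes care of quoting and escaping, which is the main advantage over building the lines by hand on the RDD.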