2013-10-30 63 views
2

我是Spark新手,請指導。如何使用scala運行Spark流的twitter流行標籤?

有很多與Spark使用Scala相關的示例。

您可以從https://github.com/apache/incubator-spark/tree/master/examples/src/main/scala/org/apache/spark/streaming/examples中查看。

我想運行TwitterPopularTags.scala。

我無法爲此示例設置twitter登錄詳細信息。

http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html#linking-with-spark-streaming

我成功地運行網絡計數的例子。

但是,當我執行
./run-example org.apache.spark.streaming.examples.TwitterPopularTags local[2] 那麼它會告訴我認證失敗的問題...

我TwitterPopularTags.scala初始化字符串情形類似

System.setProperty("twitter4j.oauth.consumerKey", "####"); 
System.setProperty("twitter4j.oauth.consumerSecret", "##"); 
System.setProperty("twitter4j.oauth.accessToken", "##"); 
System.setProperty("twitter4j.oauth.accessTokenSecret", "##"); 

請指導之前設置Twitter的登錄信息。

+0

http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html#linking-with-spark-streaming 我成功運行網絡計數示例。 但是,當我執行./run-example org.apache.spark.streaming.examples.TwitterPopularTags本地[2]然後它會顯示我身份驗證失敗問題... 我設置了t –

+0

您可以請編輯您的問題,包括您收到的特定錯誤消息或異常?另外,AMP Camp 3大數據迷你課程有一個流式Twitter示例,可能會有所幫助(並且包含有關生成相應API令牌的說明):http://ampcamp.berkeley.edu/big-data-mini-course/realtime - 處理與 - 火花streaming.html –

回答

1

在運行Twitter示例之前,將文件「twitter4j.properties」放入Spark根目錄(例如spark-0.8.0-incubating)。

twitter4j.properties:

oauth.consumerKey=*** 
oauth.consumerSecret=*** 
oauth.accessToken=*** 
oauth.accessTokenSecret=*** 

爲我工作在Mac上使用Scala的例子。

1

我無法打開github鏈接https://github.com/apache/incubator-spark/tree/master/examples/src/main/scala/org/apache/spark/streaming/examples

但是,您可以使用下面的代碼爲我工作。

import org.apache.spark.streaming.{ Seconds, StreamingContext } 
import org.apache.spark.SparkContext._ 
import org.apache.spark.streaming.twitter._ 
import org.apache.spark.SparkConf 
import org.apache.spark.streaming._ 
import org.apache.spark.{ SparkContext, SparkConf } 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming.flume._ 

/** 
* A Spark Streaming application that receives tweets on certain 
* keywords from twitter datasource and find the popular hashtags 
* 
* Arguments: <comsumerKey> <consumerSecret> <accessToken> <accessTokenSecret> <keyword_1> ... <keyword_n> 
* <comsumerKey>  - Twitter consumer key 
* <consumerSecret>  - Twitter consumer secret 
* <accessToken>  - Twitter access token 
* <accessTokenSecret> - Twitter access token secret 
* <keyword_1>   - The keyword to filter tweets 
* <keyword_n>   - Any number of keywords to filter tweets 
* 
* More discussion at stdatalabs.blogspot.com 
* 
* @author Sachin Thirumala 
*/ 

object SparkPopularHashTags { 
    val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Streaming - PopularHashTags") 
    val sc = new SparkContext(conf) 

    def main(args: Array[String]) { 

    sc.setLogLevel("WARN") 

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4) 
    val filters = args.takeRight(args.length - 4) 

    // Set the system properties so that Twitter4j library used by twitter stream 
    // can use them to generat OAuth credentials 
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey) 
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret) 
    System.setProperty("twitter4j.oauth.accessToken", accessToken) 
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret) 

    // Set the Spark StreamingContext to create a DStream for every 5 seconds 
    val ssc = new StreamingContext(sc, Seconds(5)) 
    // Pass the filter keywords as arguements 

    // val stream = FlumeUtils.createStream(ssc, args(0), args(1).toInt) 
    val stream = TwitterUtils.createStream(ssc, None, filters) 

    // Split the stream on space and extract hashtags 
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) 

    // Get the top hashtags over the previous 60 sec window 
    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)) 
     .map { case (topic, count) => (count, topic) } 
     .transform(_.sortByKey(false)) 

    // Get the top hashtags over the previous 10 sec window 
    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10)) 
     .map { case (topic, count) => (count, topic) } 
     .transform(_.sortByKey(false)) 

    // print tweets in the currect DStream 
    stream.print() 

    // Print popular hashtags 
    topCounts60.foreachRDD(rdd => { 
     val topList = rdd.take(10) 
     println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) 
     topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) } 
    }) 
    topCounts10.foreachRDD(rdd => { 
     val topList = rdd.take(10) 
     println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count())) 
     topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) } 
    }) 

    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

說明:
setMaster("local[4]") - 確保爲1個線程用於收集流入流束和另一個線程用於處理它掌握與至少2個線程設置爲本地模式。

我們指望的流行主題標籤與下面的代碼:

val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)) 
     .map { case (topic, count) => (count, topic) } 
     .transform(_.sortByKey(false)) 

上面的代碼做了主題標籤的字數超過作爲reduceByKeyAndWindow指定的上一個60/10秒,並以降序排序。

reduceByKeyAndWindow用於我們必須對先前流間隔中累積的數據應用轉換的情況。

通過將四個嘰嘰喳喳OAuth憑證作爲參數執行代碼: enter image description here

您應該看到在每一個10/60秒間隔的流行主題標籤。 enter image description here

您可以通過集成的火花流和風暴在以下鏈接水槽和卡夫卡檢查類似的項目:

星火流:

星火的流部分1:實時的Twitter情感分析 http://stdatalabs.blogspot.in/2016/09/spark-streaming-part-1-real-time.html

Spark流媒體第2部分:使用Flume實時分析twitter情緒 http://stdatalabs.blogspot.in/2016/09/spark-streaming-part-2-real-time_10.html

星火的流部分3:使用卡夫卡 http://stdatalabs.blogspot.in/2016/09/spark-streaming-part-3-real-time.html

數據保證在星火卡夫卡集成流媒體實時的Twitter情感分析 http://stdatalabs.blogspot.in/2016/10/data-guarantees-in-spark-streaming-with.html

風暴:

實時流處理使用Apache風暴 - 第1部分 http://stdatalabs.blogspot.in/2016/09/realtime-stream-processing-using-apache.html

使用Apache Storm和Kafka實時流處理 - 第2部分 http://stdatalabs.blogspot.in/2016/10/real-time-stream-processing-using.html