2017-06-15 97 views
0

我試圖使用Spark Streaming從Twitter讀取流數據，但是遇到了
下面的問題。（Spark Streaming Twitter createStream問題）

import org.apache.spark.streaming.twitter._ 
import twitter4j.auth._ 
import twitter4j.conf._ 
import org.apache.spark.streaming.{Seconds,StreamingContext} 
import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext._ 
// Assumes `sc` is the SparkContext provided by spark-shell (the error below comes from <console>).
val ssc = new StreamingContext(sc, Seconds(10))
// Build the twitter4j OAuth configuration; fill in your own credentials.
// (The original line fused `new ConfigurationBuilder` and `cb.setDebugEnabled(...)` together.)
val cb = new ConfigurationBuilder
cb.setDebugEnabled(true).setOAuthConsumerKey("").setOAuthConsumerSecret("").setOAuthAccessToken("").setOAuthAccessTokenSecret("")
val auth = new OAuthAuthorization(cb.build)
// The Scala overload of createStream takes Option[Authorization], not a bare Authorization,
// so the authorization must be wrapped in Some(...).
val tweets = TwitterUtils.createStream(ssc, Some(auth))

錯誤信息：

val tweets = TwitterUtils.createStream(ssc,auth) 
<console>:49: error: overloaded method value createStream with alternatives: 
    (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,twitterAuth: twitter4j.auth.Authorization)org.apache.spark.streaming.api.java.JavaReceiverInputDStream[twitter4j.Status] <and> 
    (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,filters: Array[String])org.apache.spark.streaming.api.java.JavaReceiverInputDStream[twitter4j.Status] <and> 
    (ssc: org.apache.spark.streaming.StreamingContext,twitterAuth: Option[twitter4j.auth.Authorization],filters: Seq[String],storageLevel: org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.dstream.ReceiverInputDStream[twitter4j.Status] 
cannot be applied to (org.apache.spark.streaming.StreamingContext, twitter4j.auth.OAuthAuthorization) 
     val tweets = TwitterUtils.createStream(ssc,auth) 
+0

可以隨時從這裏吸取一些靈感:https://github.com/stefanobaghino/spark-twitter-stream-example – stefanobaghino

+0

感謝 @stefanobaghino –

回答

1

問題中調用的這個方法有如下簽名：

/** Scala overload of `TwitterUtils.createStream` (quoted signature).
  *
  * Per the compiler's overload listing, it returns
  * `ReceiverInputDStream[twitter4j.Status]`.
  *
  * @param ssc          the StreamingContext (required, no default)
  * @param twitterAuth  required, no default; note the type is
  *                     `Option[Authorization]`, so a bare `Authorization`
  *                     such as `OAuthAuthorization` must be wrapped in `Some(...)`
  * @param filters      optional, defaults to `Nil`
  * @param storageLevel optional, defaults to `StorageLevel.MEMORY_AND_DISK_SER_2`
  */
def createStream(
    ssc: StreamingContext, 
    twitterAuth: Option[Authorization], 
    filters: Seq[String] = Nil, 
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 
) 

我們可以看到，`ssc: StreamingContext` 和 `twitterAuth: Option[Authorization]` 是必需參數；另外兩個參數有默認值，是可選的。

在你的代碼中，傳入的 `twitterAuth` 類型不正確：該參數的類型是 `Option[Authorization]`，而你傳入的是一個 `OAuthAuthorization`。因此，調用應該寫成這樣：

val tweets = TwitterUtils.createStream(ssc, Some(auth)) 
0
import org.apache.spark._ 
import org.apache.spark.SparkContext._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.twitter._ 
import org.apache.spark.streaming.StreamingContext._ 


object TwitterStream {

  /** Sets the root log4j logger to ERROR so Spark's log output does not
    * drown out the printed results.
    */
  def setupLogging(): Unit = {
    import org.apache.log4j.{Level, Logger}
    val rootLogger = Logger.getRootLogger()
    rootLogger.setLevel(Level.ERROR)
  }

  /** Configures Twitter service credentials from a text file.
    *
    * Every line holding exactly two space-separated fields `<key> <value>` is
    * published as the JVM system property `twitter4j.oauth.<key>`; lines with
    * any other number of fields are ignored.
    *
    * @param path location of the credentials file (defaults to the original
    *             hard-coded absolute path)
    */
  def setupTwitter(path: String = "/Users/sampy/twitter.txt"): Unit = {
    import scala.io.Source

    val source = Source.fromFile(path)
    // Release the file handle even if reading throws.
    try {
      for (line <- source.getLines()) {
        val fields = line.split(" ")
        if (fields.length == 2) {
          System.setProperty("twitter4j.oauth." + fields(0), fields(1))
        }
      }
    } finally {
      source.close()
    }
  }

  /** Entry point: streams English tweets, counts hashtags over a sliding
    * window, and prints/saves the counts sorted by count, descending.
    */
  def main(args: Array[String]): Unit = {

    setupTwitter()

    val ssc = new StreamingContext("local[*]", "PopularHashtags", Seconds(5))

    // Kept after context creation: Spark may (re)initialise log4j while the
    // context starts up, which could undo an earlier level change.
    setupLogging()

    // No explicit Authorization is passed; presumably twitter4j then builds one
    // from the twitter4j.oauth.* system properties set above — confirm.
    val tweets = TwitterUtils.createStream(ssc, None)
    val engTweets = tweets.filter(status => status.getLang() == "en")

    val statuses = engTweets.map(status => status.getText)

    // Split on any whitespace run so hashtags followed by newlines/tabs are isolated.
    val tweetwords = statuses.flatMap(tweetText => tweetText.split("\\s+"))

    val hashtags = tweetwords.filter(word => word.startsWith("#"))

    val hashtagKeyValues = hashtags.map(hashtag => (hashtag, 1))

    // reduceByKeyAndWindow(func, windowDuration, slideDuration): count over the
    // last 20s, recomputed every 5s. The original passed a 5s window with a 20s
    // slide, which silently skipped 15s of every 20s of tweets.
    val hashtagCounts =
      hashtagKeyValues.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(20), Seconds(5))

    val sortedResults = hashtagCounts.transform(rdd => rdd.sortBy(_._2, ascending = false))
    sortedResults.saveAsTextFiles("/Users/sampy/tweetsTwitter", "txt")

    sortedResults.print()

    ssc.checkpoint("/Users/sampy/checkpointTwitter")
    ssc.start()
    ssc.awaitTermination()
  }
}