1

我使用pyspark與Kafka接收器來處理推文流。我的應用程序的其中一個步驟包括致電Google Natural Language API以獲取每條推文的情緒分數。但是,我看到API每次處理的推文都會接到幾個電話(我在Google雲端控制檯中看到了電話號碼)。另外,如果我打印tweetIDs(映射函數內),我會得到相同的ID 3或4次。在我的應用程序結束時,推文被髮送到卡夫卡的另一個主題,我得到了正確的推文數(沒有重複的ID),所以原則上一切正常,但我不知道如何避免調用Google API每次推文不止一次。火花流媒體重複網絡電話

這是否與Spark或Kafka中的一些配置參數有關?

這裏是我的控制檯輸出的一個例子:

TIME 21:53:36: Google Response for tweet 801181843500466177 DONE! 
TIME 21:53:36: Google Response for tweet 801181854766399489 DONE! 
TIME 21:53:36: Google Response for tweet 801181844808966144 DONE! 
TIME 21:53:37: Google Response for tweet 801181854372012032 DONE! 
TIME 21:53:37: Google Response for tweet 801181843500466177 DONE! 
TIME 21:53:37: Google Response for tweet 801181854766399489 DONE! 
TIME 21:53:37: Google Response for tweet 801181844808966144 DONE! 
TIME 21:53:37: Google Response for tweet 801181854372012032 DONE! 

但在卡夫卡接收我只得到4周處理的tweet(這是接受,因爲他們只有4個獨特的鳴叫正確的事)。

執行此代碼是:

def sendToKafka(rdd,topic,address): 
    publish_producer = KafkaProducer(bootstrap_servers=address,\ 
          value_serializer=lambda v: json.dumps(v).encode('utf-8')) 
    records = rdd.collect() 
    msg_dict = defaultdict(list) 
    for rec in records: 
     msg_dict["results"].append(rec) 
    publish_producer.send(resultTopic,msg_dict) 
    publish_producer.close() 


kafka_stream = KafkaUtils.createStream(ssc, zookeeperAddress, "spark-consumer-"+myTopic, {myTopic: 1}) 

dstream_tweets=kafka_stream.map(lambda kafka_rec: get_json(kafka_rec[1]))\ 
       .map(lambda post: add_normalized_text(post))\ 
       .map(lambda post: tagKeywords(post,tokenizer,desired_keywords))\ 
       .filter(lambda post: post["keywords"] == True)\ 
       .map(lambda post: googleNLP.complementTweetFeatures(post,job_id)) 

dstream_tweets.foreachRDD(lambda rdd: sendToKafka(rdd,resultTopic,PRODUCER_ADDRESS)) 
+0

你已經做了什麼?你可以把你的代碼粘貼到這個問題上嗎? –

+0

我用代碼更新了問題。 googleNLP.complementTweetFeatures()向Google API發出一個請求並返回響應。 –

回答

1

我已經找到了解決這個!我剛剛與緩存DSTREAM:

dstream_tweets.cache() 

的多個網絡調用的發生是因爲星火重新計算該DSTREAM內RDDS在我的腳本執行後操作之前。當我緩存()DStream時,只需要計算一次;並且由於它被保存在內存中,以後的函數可以在不重新計算的情況下訪問這些信息(在這種情況下,需要重新計算再次調用API,因此值得花費更多的內存使用量)。