2017-06-18 186 views
-2

我轉換了現有的代碼,該代碼在python下面粘貼在pyspark中。與純python替代品相比,Pyspark代碼的性能不夠

Python代碼:

import json 
import csv 


def main(): 
    # create a simple JSON array 
    with open('paytm_tweets_data_1495614657.json') as str: 

     tweetsList = [] 
     # change the JSON string into a JSON object 
     jsonObject = json.load(str) 

     #print(jsonObject) 

     # # print the keys and values 
     for i in range(len(jsonObject)): 
      tweetsList.insert(i,jsonObject[i]["text"]) 

     #print(tweetsList) 
    displaySentiment(tweetsList) 



def displaySentiment(tweetsList): 
    aDict = {} 

    from sentiment import sentiment_score 

    for i in range(len(tweetsList)): 
     aDict[tweetsList[i]] = sentiment_score(tweetsList[i]) 
    print (aDict) 


    with open('PaytmtweetSentiment.csv', 'w') as csv_file: 
     writer = csv.DictWriter(csv_file, fieldnames = ["Tweets", "Sentiment Value"]) 
     writer.writeheader() 
     writer = csv.writer(csv_file) 
     for key, value in aDict.items(): 
      writer.writerow([key, value]) 


if __name__ == '__main__': 
    main() 

轉換Pyspark代碼:

import json 
import csv 
import os 
from pyspark import SparkContext, SparkConf 
from pyspark.python.pyspark.shell import spark 

os.environ['PYSPARK_PYTHON'] = "/usr/local/bin/python3" 


def main(): 
    path = "/Users/i322865/DeepInsights/bitbucket-code/ai-engine/twitter-sentiment-analysis/flipkart_tweets_data_1495601666.json" 
    peopleDF = spark.read.json(path).rdd 
    df = peopleDF.map(lambda row: row['text']) 
    displaySentiment(df.collect()) 



def displaySentiment(tweetsList): 
    from sentiment import sentiment_score 

    aDict = sentiment_score(tweetsList) 

    # 
    with open('paytmtweetSentiment.csv', 'w') as csv_file: 
     writer = csv.DictWriter(csv_file, fieldnames = ["Tweets", "Sentiment Value"]) 
     writer.writeheader() 
     writer = csv.writer(csv_file) 
     for i in range(len(tweetsList)): 
      writer.writerow([tweetsList[i], aDict[i]]) 
      print([tweetsList[i], aDict[i]]) 


if __name__ == '__main__': 
    conf = SparkConf().setAppName("Test").setMaster("local") 
    sc = SparkContext.getOrCreate(conf=conf) 
    main() 

我跑了兩個節目,但沒有看到任何顯著的性能提升。我錯過了什麼?請你能提出一些想法嗎?

另外,我是否也應該使用'減少'?我目前只使用'地圖'。

+0

這種類型的問題並不適合該網站,但事實上,這仍然是一個不好的代碼,說實話,當然沒有進攻! Pyspark不是一種編程語言。另一方面,Python也是如此。 – eliasah

+0

@eliasah對不起,修改了問題。感謝您的快速反饋。 – coders

+1

調用'df.collect()'兩次當然性能較差。調用它在所有渲染Spark大多無用 –

回答

2

如果你想在平行過程PySpark東西,不要collect()回Python列表

def calc_sentiment(tweetsDf): # You should pass a dataframe 
    from sentiment import sentiment_score 

    # Add a new column over the Tweets for the sentiment 
    return tweetsDf.withColumn('sentiment_score', sentiment_score(tweetsDf.text)) 

顯然,sentiment_score需求變化以及雙方接受並返回一個PySpark Column

然後,你會有這樣的事情

def main(): 
    path = "..../twitter-sentiment-analysis/flipkart_tweets_data_1495601666.json" 
    twitterDf = spark.read.json(path) 

    # Don't call collect, only sample the Dataframe 
    sentimentDf = calc_sentiment(twitterDf) 
    sentimentDf.show(5) 

    # TODO: Write sentimentDf to a CSV 
    sentimentDf.write.csv(....) 
+0

感謝您的建議。只是一個附加問題,而不是.json文件,如果我傳遞流數據,是否需要在上面建議的代碼中進行任何更改 – coders

+0

只要您可以將流傳輸到Dataframe對象中,那麼不是我知道 –

2

除了其他人指出的收集問題,y我們的PySpark實現可能會比較慢,因爲Spark不適用於您當前的用例。

Spark的基本原理旨在加速對非常大的分佈式數據集(多臺機器)的操作,而不是本地並行化。爲了實現這一點,它使用架空結構和流程。

對於單個/小型數據集,這種開銷很容易占主導地位,並且會降低您的解決方案速度。 This article討論了使用Hadoop,它非常類似。您可能試過multiprocessing

如果您確定Spark適合您,發佈一個詳細說明您的Spark設置,您如何衡量您的性能以及數據集的新問題可能會有幫助。

0

我認爲你沒有看到任何加速是完全合理的。你首先創建一個RDD(所以你分發數據)然後你收集它們來運行你的第二個函數,這是分析函數。實際上,通過將所有數據收集到繼續應用displaySentiment()函數的驅動程序機器,可以摧毀你的第一個功能。所以你實際上做的是在只有一臺機器的驅動器機器上運行程序。因此沒有加速。