2017-05-13 29 views
6

我有一個PySpark應用程序必須詳細說明5GB的壓縮數據(字符串)。我正在使用一個帶有12個內核(24個線程)和72Gb RAM的小型服務器。我的PySpark程序僅包含2個地圖操作,由3個非常大的正則表達式(每個3gb已編譯)和pickle加載。 Spark工作在獨立模式下,工人和主人在同一臺機器上。環境有多少火花能做到?

我的問題是:是否爲每個執行器核心引發複製每個變量?因爲它使用所有可用的內存,然後使用大量的交換空間。或者它可能會加載RAM中的所有分區? RDD包含大約1000萬字符串,必須由3個正則表達式搜索。 RDD計算大約1000個分區。我在完成這項任務時遇到了麻煩,因爲在幾分鐘後內存已滿並且使用交換空間的啓動開始變得非常緩慢。 我注意到沒有正則表達式的情況是一樣的。

這是我的代碼,它會刪除Twitter的鳴叫的所有無用的領域和掃描鳴叫的文本和說明特定的詞:

import json 
import re 
import twitter_util as twu 
import pickle 

from pyspark import SparkContext 
sc = SparkContext() 

prefix = '/home/lucadiliello' 

source = prefix + '/data/tweets' 
dest = prefix + '/data/complete_tweets' 

#Regex's path 
companies_names_regex = prefix + '/data/comp_names_regex' 
companies_names_dict = prefix + '/data/comp_names_dict' 
companies_names_dict_to_legal = prefix + '/data/comp_names_dict_to_legal' 

#Loading the regex's 
comp_regex = pickle.load(open(companies_names_regex)) 
comp_dict = pickle.load(open(companies_names_dict)) 
comp_dict_legal = pickle.load(open(companies_names_dict_to_legal)) 

#Loading the RDD from textfile 
tx = sc.textFile(source).map(lambda a: json.loads(a)) 


def get_device(input_text): 
    output_text = re.sub('<[^>]*>', '', input_text) 
    return output_text 

def filter_data(a): 
    res = {} 
    try: 
     res['mentions'] = a['entities']['user_mentions'] 
     res['hashtags'] = a['entities']['hashtags'] 
     res['created_at'] = a['created_at'] 
     res['id'] = a['id'] 

     res['lang'] = a['lang'] 
     if 'place' in a and a['place'] is not None:  
      res['place'] = {} 
      res['place']['country_code'] = a['place']['country_code'] 
      res['place']['place_type'] = a['place']['place_type'] 
      res['place']['name'] = a['place']['name'] 
      res['place']['full_name'] = a['place']['full_name'] 

     res['source'] = get_device(a['source']) 
     res['text'] = a['text'] 
     res['timestamp_ms'] = a['timestamp_ms'] 

     res['user'] = {} 
     res['user']['created_at'] = a['user']['created_at'] 
     res['user']['description'] = a['user']['description'] 
     res['user']['followers_count'] = a['user']['followers_count'] 
     res['user']['friends_count'] = a['user']['friends_count'] 
     res['user']['screen_name'] = a['user']['screen_name'] 
     res['user']['lang'] = a['user']['lang'] 
     res['user']['name'] = a['user']['name'] 
     res['user']['location'] = a['user']['location'] 
     res['user']['statuses_count'] = a['user']['statuses_count'] 
     res['user']['verified'] = a['user']['verified'] 
     res['user']['url'] = a['user']['url'] 
    except KeyError: 
     return [] 

    return [res] 


results = tx.flatMap(filter_data) 


def setting_tweet(tweet): 

    text = tweet['text'] if tweet['text'] is not None else '' 
    descr = tweet['user']['description'] if tweet['user']['description'] is not None else '' 
    del tweet['text'] 
    del tweet['user']['description'] 

    tweet['text'] = {} 
    tweet['user']['description'] = {} 
    del tweet['mentions'] 

    #tweet 
    tweet['text']['original_text'] = text 
    tweet['text']['mentions'] = twu.find_retweet(text) 
    tweet['text']['links'] = [] 
    for j in twu.find_links(text): 
     tmp = {} 
     try: 
      tmp['host'] = twu.get_host(j) 
      tmp['link'] = j 
      tweet['text']['links'].append(tmp) 
     except ValueError: 
      pass 

    tweet['text']['companies'] = [] 
    for x in comp_regex.findall(text.lower()): 
     tmp = {} 
     tmp['id'] = comp_dict[x.lower()] 
     tmp['name'] = x 
     tmp['legalName'] = comp_dict_legal[x.lower()] 
     tweet['text']['companies'].append(tmp) 

    # descr 
    tweet['user']['description']['original_text'] = descr 
    tweet['user']['description']['mentions'] = twu.find_retweet(descr) 
    tweet['user']['description']['links'] = [] 
    for j in twu.find_links(descr): 
     tmp = {} 
     try: 
      tmp['host'] = twu.get_host(j) 
      tmp['link'] = j 
      tweet['user']['description']['links'].append(tmp) 
     except ValueError: 
      pass 

    tweet['user']['description']['companies'] = [] 
    for x in comp_regex.findall(descr.lower()): 
     tmp = {} 
     tmp['id'] = comp_dict[x.lower()] 
     tmp['name'] = x 
     tmp['legalName'] = comp_dict_legal[x.lower()] 
     tweet['user']['description']['companies'].append(tmp) 

    return tweet 


res = results.map(setting_tweet) 

res.map(lambda a: json.dumps(a)).saveAsTextFile(dest, compressionCodecClass="org.apache.hadoop.io.compress.BZip2Codec") 

UPDATE 約1小時後,內存(72GB )完全滿並且交換(72gb)。在我的情況下使用廣播不是一個解決方案。

UPDATE 2 如果不加載3個變量和pickle,使用高達10GB的內存而不是144GB的內存就不會出現問題! (72GB RAM + 72Gb交換)

+1

代碼會很好,但如果沒有它,你就會回答你的問題--Spark使用與你分配給Python工作者的許多線程(核心)一樣多的本地變量副本。有一些解決方法,但通常相當複雜。 – zero323

+0

鑑於代碼,您應該爲驅動程序副本添加+1,在驅動程序上爲pickle版本添加+1,爲每個執行程序JVM添加+1(或多或少)。您可以通過使用廣播或直接從執行者加載數據來稍微改善這一點。 – zero323

+0

對於每個執行程序進程,在內存中使用相同的正則表達式實例沒有竅門嗎?如果不是我認爲我會減少執行者的數量..... –

回答

1

我的問題是:是否爲每個執行器核心引發複製每個變量?

是的!

每個(本地)變量的副本數等於分配給Python工作者的線程數。


至於你的問題,嘗試加載comp_regexcomp_dictcomp_dict_legal不使用pickle