2016-07-29 69 views
1

首先,我理解持久數據結構與關於RDD的..更新的理念和永恆更新字典的PySpark RDD是我能想到的:)如何基於某些條件

我的問題是隻有一個字:

給定字典(或Row對象)的RDD,我如何循環/映射並在該RDD上應用某些轉換登錄並接收應用了這些轉換的新RDD。例如:

給出一個包含有一個RDD詞典:

fbb = sc.parallelize(
    [{'amount_gbp': -43.33, 
     'balance_gbp': 57.08, 
     'type': 'GED', 
     'id': 961690979, 
     'settled_jrnl_cr_datetime': u'(null)', 
     'virtual_cash_balance': 0, 
     'virtual_debt_balance': 0}, 
    {'amount_gbp': 17.08, 
     'balance_gbp': 40.0, 
     'type': 'OIP', 
     'id': 962182953, 
     'settled_jrnl_cr_datetime': u'(null)', 
     'virtual_cash_balance': 0, 
     'virtual_debt_balance': 0}]) 

我試圖以應用功能:

def update_virtual_cash_balance(x): 
    x.update({'virtual_cash_balance': x['amount_gbp'] + x['balance_gbp']}) if x['type'] == 'GED' else x 

    fbb.map(lambda x: update_virtual_cash_balance(x)).collect() 

和預期:

[{'amount_gbp': -43.33, 
    'balance_gbp': 57.08, 
    'type': 'GED', 
    'id': 961690979, 
    'settled_jrnl_cr_datetime': u'(null)', 
    'virtual_cash_balance': 13.75, 
    'virtual_debt_balance': 0}, 
{'amount_gbp': 17.08, 
    'balance_gbp': 40.0, 
    'type': 'OIP', 
    'id': 962182953, 
    'settled_jrnl_cr_datetime': u'(null)', 
    'virtual_cash_balance': 0, 
    'virtual_debt_balance': 0}] 

不過的了:

Out[411]: [None, None] 

任何幫助我誤解的東西都會很棒。

回答

1
  • update_virtual_cash_balance不返回任何東西,所以你得到None
  • update方法不返回任何東西,所以你會得到None即使update_virtual_cash_balance返回值
  • 你不應該在的地方修改數據。 RDD是不可變的,變異的數據可能會產生不良影響。

嘗試:

def update_virtual_cash_balance(x): 
    if x['type'] == 'GED': 
     z = x.copy() # shallow copy should be enough here 
     z.update({'virtual_cash_balance': x['amount_gbp'] + x['balance_gbp']} 
     return z 
    return x 
+1

那不是我誰downvoted的幫助@LostInOverflow謝謝 – adebesin