1
首先,我理解持久數據結構與關於RDD的..更新的理念和永恆更新字典的PySpark RDD是我能想到的:)如何基於某些條件
我的問題是隻有一個字:
給定字典(或Row對象)的RDD,我如何循環/映射並在該RDD上應用某些轉換登錄並接收應用了這些轉換的新RDD。例如:
給出一個包含有一個RDD詞典:
fbb = sc.parallelize(
[{'amount_gbp': -43.33,
'balance_gbp': 57.08,
'type': 'GED',
'id': 961690979,
'settled_jrnl_cr_datetime': u'(null)',
'virtual_cash_balance': 0,
'virtual_debt_balance': 0},
{'amount_gbp': 17.08,
'balance_gbp': 40.0,
'type': 'OIP',
'id': 962182953,
'settled_jrnl_cr_datetime': u'(null)',
'virtual_cash_balance': 0,
'virtual_debt_balance': 0}])
我試圖以應用功能:
def update_virtual_cash_balance(x):
x.update({'virtual_cash_balance': x['amount_gbp'] + x['balance_gbp']}) if x['type'] == 'GED' else x
fbb.map(lambda x: update_virtual_cash_balance(x)).collect()
和預期:
[{'amount_gbp': -43.33,
'balance_gbp': 57.08,
'type': 'GED',
'id': 961690979,
'settled_jrnl_cr_datetime': u'(null)',
'virtual_cash_balance': 13.75,
'virtual_debt_balance': 0},
{'amount_gbp': 17.08,
'balance_gbp': 40.0,
'type': 'OIP',
'id': 962182953,
'settled_jrnl_cr_datetime': u'(null)',
'virtual_cash_balance': 0,
'virtual_debt_balance': 0}]
不過的了:
Out[411]: [None, None]
任何幫助我誤解的東西都會很棒。
那不是我誰downvoted的幫助@LostInOverflow謝謝 – adebesin