2016-12-30 565 views
1

我是Spark新手。Spark將rdd字段值替換爲另一個值

我可以用看在我elasticsearch數據庫中的第一RDD的內容:

print(es_rdd.first()) 
>>>(u'1', {u'name': u'john'}) 

我也可以用得到我的DSTREAM所需的值:

kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list":brokers}) 
name=kvs.map(lambda x: x[1]) 
name.pprint() 
>>>>robert 

我打算更換rdd「name」:「john」with「robert」,然後在彈性搜索中使用saveAsNewAPIHadoopFile()插入新的rdd

我該怎麼做? 有沒有辦法將「robert」映射到一個新的rdd中?喜歡的東西..

new_rdd=es_rdd.map(lambda item: {item[0]:name}) 

感謝

回答

2

我們可以根據索引列表與另一個RDD代替RDD的一部分。例如,將(RDD)中的元素從1,2,3,4替換爲2,3,4,4。

a = sc.parallelize([1,2,3,4]) 
repVals = sc.parallelize([2,3,4]) 
idx = sc.parallelize([0,1,2]) . # idx has the same number of values with repVals 

a = a.zipWithIndex() 
ref = idx.zip(repVals).collectAsMap() # create a dictionary of format {idex:repValue} 

anew = a.map(lambda x:ref[x[1]] if x[1] in ref else x[0]) 
anew.collect() 

結果表明[2,3,4,4-]

相關問題