2016-11-30 94 views
-2

我需要將一個rdd轉換爲兩行,並將一個rdd轉換爲一行。例如:從多行生成一行到一個RDD

rdd1=a 
    b 

我需要:

rdd2=(a,b) 

我怎樣才能做到在pyspark這一步呢? 這個問題可能是愚蠢的,但我是新的火花。 「UPDATE」 這是執行rdd2和rdd3之間的直角座標,從rdd1開始。像:

rdd3:(k,l) 
    (c,g) 
    (f,x) 

我想這樣的輸出:

rddOut:[(a,b),(k,l)] 
     [(a,b),(c,g)] 
     [(a,b),(f,x)] 

在此先感謝

回答

-1

你能解釋一下多一點您的需要?由於您失去了所有的並行性,因此使用單行RDD並不是一個好主意。

如果要按鍵收集數據,可以將RDD轉換爲RDD(鍵和值)。然後,您可以執行reduceByKey,以便通過簡單地將reduce函數作爲列表級聯來將列表中的所有內容收集到列表中。

+0

問題是我必須執行一個測量距離比較兩個相同的rdds(在兩者之間執行笛卡爾),但結果太大了,因爲我正在處理大型數據集。所以這個想法是採取rdd1的第一行,與所有rdd2(與rdd1相同)執行笛卡爾,然後發出輸出文件。刪除rdd1的第一行,先取出新的,用rdd2執行笛卡爾並生成第二個文件等。 –

-1

如果我對你的問題的理解是正確的,那麼使用flatMap這將得到你需要的輸出。

0

更新我的anwser:

initRDD = sc.parallelize(list('aeiou')).map(lambda x: (x, ord(x))).collect() 

ssc = StreamingContext(sc, batchDuration=3) 

lines = ssc.socketTextStream('localhost', 9999) 
items = lines.flatMap(lambda x: x.split()) 
counts = items.countByValue().map(lambda x: ([x] + initRDD)) 

它看起來像廣播,而不是直角。

+0

不正確,我更新了我的問題 –