2016-11-10 69 views

Union RDDs after a loop in PySpark

I'm using PySpark and I'm looking for a way to modify 4 RDDs contained in a list. When I display my list, I get something like this:


That is, running:

for r in repartionned_rdd:
    print(r.collect())

gives:

[(u'_guid_NCw7SuFnCh_mFW3SI3qTvBCbqXKD4mtsdJvWE7HNgNg=', (u'f', u'KSJakOd2|KtC9ZF9h'))] 
[(u'_guid_OCs2au-sKnxzPE0uRPDP4hg1vvhgpzRAAYjNWRQpKbw=', (u'f', u'KxrylzuA|KpSXJwH2')), (u'_guid_txH15ULaeUDBC4Z_NlEOj2xoYBFa-08imqIBLfYsKps=', (u'f', u'bda54c71-cd1e-4eb7-856c-ba2e6def30c8|6e189e07-807e-41a2-a60a-b07d894a2905')), (u'_guid_ehCT6NyD9l3q3NV9ZroaWVEo3bnDt4tvbU_fMBrEn1g=', (u'm', u'537D69B4-743A-45B9-BED1-A25AA5926F13|2bb3e466-edc5-4302-b102-3bddb1f8c490|aa4760de-104c-4dc3-94c3-336427f89723')), (u'_guid_9F4Ph5GztLN9IlWNgZWKPMCcT4N3Je6-93iM_130F-c=', (u'f', u'KOQqBzhU|KrDt5GC4')), (u'_guid_nPlE_f-zoOHNYiXJSGXWoVryc1U4Bnfxkow3P0mDUFY=', (u'f', u'Kh3tIZR1|Khs0tRsh|K3geBqb_|KBrVNcDX|Jg2uDy8M|529816a3-ee43-4423-961f-8aedaf25d58c')), (u'331d8410d4924e72b0f0585e888c85ce', (u'f', u'1F37807A-CBEA-4B78-85D7-5A97B37B539E'))] 
[(u'28b195c271f14a329235c262e7baecbf', (u'm', u'50c41480-a94e-4afa-a732-b6ed7a057239'))] 
[(u'c65ac2064bc14116a363125392dcc6f7', (u'f', u'77e4b9b3-83b4-4553-b274-7a16f553cf05')), (u'171f92200d634d62bdc6685bdb7a94e3', (u'f', u'bdf53cb6-695d-4dde-b0c1-d1a34ebea6f7|a09e4074-c22e-48a1-9976-ee2151b5888c|K1Umlb5M|639B02B4-24AD-4069-99A2-C68E8C8F7F06|KjE3wXIr')), (u'_guid_wQZIzeFxciX9CIHUPeWOF2euOIC0jiOsXVXN98_zCh8=', (u'f', u'F0992237-2598-4B13-AA8A-C37D436B901C|C80D1A89-DD84-4734-838F-128F99EBDD20|KthpuVu0')), (u'_guid_ufOcKO48drwr50yJN26NriX5MLYONwmALxWcmly7oqQ=', (u'f', u'KlY10YxX|KyCVx_km'))] 

My objective is to add a new "column" to each RDD in the list. This column will contain a unique index for each RDD. My code:

for i, rdd in enumerate(repartionned_rdd):
    new_rdd = rdd.map(lambda x: x + (float(i),))
    print(new_rdd.collect())

Which gives (as expected from the code!):

[(u'_guid_NCw7SuFnCh_mFW3SI3qTvBCbqXKD4mtsdJvWE7HNgNg=', 
(u'f', u'KSJakOd2|KtC9ZF9h'), 0.0)] 

[(u'_guid_OCs2au-sKnxzPE0uRPDP4hg1vvhgpzRAAYjNWRQpKbw=', 
(u'f', u'KxrylzuA|KpSXJwH2'), 1.0), 
(u'_guid_txH15ULaeUDBC4Z_NlEOj2xoYBFa-08imqIBLfYsKps=', 
(u'f', u'bda54c71-cd1e-4eb7-856c-ba2e6def30c8|6e189e07-807e-41a2-a60a-b07d894a2905'), 1.0), 
(u'_guid_ehCT6NyD9l3q3NV9ZroaWVEo3bnDt4tvbU_fMBrEn1g=', 
(u'm', u'537D69B4-743A-45B9-BED1-A25AA5926F13|2bb3e466-edc5-4302-b102-3bddb1f8c490|aa4760de-104c-4dc3-94c3-336427f89723'), 1.0), 
(u'_guid_9F4Ph5GztLN9IlWNgZWKPMCcT4N3Je6-93iM_130F-c=', 
(u'f', u'KOQqBzhU|KrDt5GC4'), 1.0), 
(u'_guid_nPlE_f-zoOHNYiXJSGXWoVryc1U4Bnfxkow3P0mDUFY=', 
(u'f', u'Kh3tIZR1|Khs0tRsh|K3geBqb_|KBrVNcDX|Jg2uDy8M|529816a3-ee43-4423-961f-8aedaf25d58c'), 1.0), 
(u'331d8410d4924e72b0f0585e888c85ce', 
(u'f', u'1F37807A-CBEA-4B78-85D7-5A97B37B539E'), 1.0)] 

[(u'28b195c271f14a329235c262e7baecbf', 
(u'm', u'50c41480-a94e-4afa-a732-b6ed7a057239'), 2.0)] 

[(u'c65ac2064bc14116a363125392dcc6f7', 
(u'f', u'77e4b9b3-83b4-4553-b274-7a16f553cf05'), 3.0), 
(u'171f92200d634d62bdc6685bdb7a94e3', 
(u'f', u'bdf53cb6-695d-4dde-b0c1-d1a34ebea6f7|a09e4074-c22e-48a1-9976-ee2151b5888c|K1Umlb5M|639B02B4-24AD-4069-99A2-C68E8C8F7F06|KjE3wXIr'), 3.0), 
(u'_guid_wQZIzeFxciX9CIHUPeWOF2euOIC0jiOsXVXN98_zCh8=', 
(u'f', u'F0992237-2598-4B13-AA8A-C37D436B901C|C80D1A89-DD84-4734-838F-128F99EBDD20|KthpuVu0'), 3.0), 
(u'_guid_ufOcKO48drwr50yJN26NriX5MLYONwmALxWcmly7oqQ=', 
(u'f', u'KlY10YxX|KyCVx_km'), 3.0)] 

So each row in my new_rdd contains a new column, which is the index of that specific RDD.

My objective now is simply to put all these new RDDs into a single RDD. I tried this:

all_rdds_list = []
for i, rdd in enumerate(repartionned_rdd):
    new_rdd = rdd.map(lambda x: x + (float(i),))
    all_rdds_list.append(new_rdd)

But when I try to display my RDDs, every row gets the same index:

for x in all_rdds_list:
    print(x.collect())

Result:

[(u'_guid_NCw7SuFnCh_mFW3SI3qTvBCbqXKD4mtsdJvWE7HNgNg=', 
(u'f', u'KSJakOd2|KtC9ZF9h'), 3.0)] 

[(u'_guid_OCs2au-sKnxzPE0uRPDP4hg1vvhgpzRAAYjNWRQpKbw=', 
(u'f', u'KxrylzuA|KpSXJwH2'), 3.0), 
(u'_guid_txH15ULaeUDBC4Z_NlEOj2xoYBFa-08imqIBLfYsKps=', 
(u'f', u'bda54c71-cd1e-4eb7-856c-ba2e6def30c8|6e189e07-807e-41a2-a60a-b07d894a2905'), 3.0), 
(u'_guid_ehCT6NyD9l3q3NV9ZroaWVEo3bnDt4tvbU_fMBrEn1g=', 
(u'm', u'537D69B4-743A-45B9-BED1-A25AA5926F13|2bb3e466-edc5-4302-b102-3bddb1f8c490|aa4760de-104c-4dc3-94c3-336427f89723'), 3.0), 
(u'_guid_9F4Ph5GztLN9IlWNgZWKPMCcT4N3Je6-93iM_130F-c=', 
(u'f', u'KOQqBzhU|KrDt5GC4'), 3.0), 
(u'_guid_nPlE_f-zoOHNYiXJSGXWoVryc1U4Bnfxkow3P0mDUFY=', 
(u'f', u'Kh3tIZR1|Khs0tRsh|K3geBqb_|KBrVNcDX|Jg2uDy8M|529816a3-ee43-4423-961f-8aedaf25d58c'), 3.0), 
(u'331d8410d4924e72b0f0585e888c85ce', 
(u'f', u'1F37807A-CBEA-4B78-85D7-5A97B37B539E'), 3.0)] 

[(u'28b195c271f14a329235c262e7baecbf', 
(u'm', u'50c41480-a94e-4afa-a732-b6ed7a057239'), 3.0)] 

[(u'c65ac2064bc14116a363125392dcc6f7', 
(u'f', u'77e4b9b3-83b4-4553-b274-7a16f553cf05'), 3.0), 
(u'171f92200d634d62bdc6685bdb7a94e3', 
(u'f', u'bdf53cb6-695d-4dde-b0c1-d1a34ebea6f7|a09e4074-c22e-48a1-9976-ee2151b5888c|K1Umlb5M|639B02B4-24AD-4069-99A2-C68E8C8F7F06|KjE3wXIr'), 3.0), 
(u'_guid_wQZIzeFxciX9CIHUPeWOF2euOIC0jiOsXVXN98_zCh8=', 
(u'f', u'F0992237-2598-4B13-AA8A-C37D436B901C|C80D1A89-DD84-4734-838F-128F99EBDD20|KthpuVu0'), 3.0), 
(u'_guid_ufOcKO48drwr50yJN26NriX5MLYONwmALxWcmly7oqQ=', 
(u'f', u'KlY10YxX|KyCVx_km'), 3.0)] 

Any help? Thanks!

Answers


The best way to do this:

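def get_population_id(repartionned_rdd):
    # sc is the active SparkContext
    FullRDD = sc.emptyRDD()
    for i, rdd in enumerate(repartionned_rdd):
        # i=i binds the current index into the lambda, so it does not depend
        # on when Spark actually evaluates the map
        FullRDD = FullRDD.union(rdd.map(lambda x, i=i: x + (float(i),)))
    return FullRDD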

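There are two problems with your approach. First, you are using a variable that changes before the map is evaluated. A map call is a transformation, so it only runs when you apply an action (such as collect). That is why you see the correct extra column when you collect inside the enumerate loop, but in the later example every map picks up the last value of i.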
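The late-binding effect can be reproduced without Spark. The sketch below is only an analogy, not PySpark code: Python 3's built-in `map()` is lazy in the same way `rdd.map()` is, and the `parts` data is made-up sample input. The usual fix, binding the loop variable through a default argument, applies unchanged to a PySpark lambda (`rdd.map(lambda x, i=i: x + (float(i),))`).

```python
# Made-up sample data standing in for the list of RDDs.
parts = [[("a",)], [("b",), ("c",)], [("d",)]]

# Buggy: each lambda looks up `i` when it is called, not when it is defined.
lazy_buggy = []
for i, part in enumerate(parts):
    lazy_buggy.append(map(lambda x: x + (float(i),), part))  # lazy, like rdd.map
# The maps are consumed only after the loop, so every lambda sees the final i.
buggy = [list(m) for m in lazy_buggy]

# Fixed: the default argument captures the value of `i` at definition time.
lazy_fixed = []
for i, part in enumerate(parts):
    lazy_fixed.append(map(lambda x, i=i: x + (float(i),), part))
fixed = [list(m) for m in lazy_fixed]

print(buggy)  # every record tagged 2.0
print(fixed)  # records tagged 0.0, 1.0, 2.0 by part
```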

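The second problem is that if you are trying to combine the RDDs into one, you should use the union function instead of collecting them in a list. You can replace your earlier list with the union below.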

full_rdd = None
for i, rdd in enumerate(repartionned_rdd):
    new_rdd = rdd.map(lambda x: x + (float(i),))
    if full_rdd is None:
        full_rdd = new_rdd
    else:
        full_rdd = sc.union([full_rdd, new_rdd])
    # This will force the lazy evaluation to execute now before `i` changes
    full_rdd.count()
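Both fixes together amount to "tag each part with its index, then union everything into one collection". Here is a plain-Python analogy of that pattern, not Spark code: `itertools.chain` stands in for `sc.union`, `make_tagger` is a hypothetical helper name, and `parts` is invented sample data. A factory function gives each closure its own copy of the index, so no `count()` is needed to force evaluation inside the loop. In PySpark the same idea would read `sc.union([rdd.map(make_tagger(i)) for i, rdd in enumerate(repartionned_rdd)])`.

```python
from itertools import chain


def make_tagger(index):
    """Hypothetical helper: return a function that appends `index` (as a float)
    to a record tuple. Each call creates a fresh closure with its own index."""
    def tag(record):
        return record + (float(index),)
    return tag


# Made-up sample data standing in for the list of RDDs.
parts = [[("a",)], [("b",), ("c",)], [("d",)]]

# Lazily tag each part, then concatenate all parts into one sequence
# (chain plays the role sc.union plays for RDDs).
combined = chain.from_iterable(
    map(make_tagger(i), part) for i, part in enumerate(parts)
)
result = list(combined)
print(result)
```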

@Pyrce: Thank you, but when I apply your code I get this error message: **TypeError: union() takes exactly 2 arguments (3 given)** – DataAddicted


@DataAddicted oops, I forgot to wrap the RDD arguments in a list. It should be correct now. – Pyrce