2017-08-04

Left join in Apache Beam

What is the better way to left join the following PCollections in Apache Beam?

pcoll1 = [('key1', [[('a', 1)],[('b', 2)], [('c', 3)], [('d', 4)],[('e', 5)], [('f', 6)]]), ('key2',[[('a', 12)],[('b', 21)], [('c', 13)]]), ('key3',[[('a', 21)],[('b', 23)], [('c', 31)]])] 
pcoll2 = [('key1', [[('x', 10)]]), ('key2', [[('x', 20)]])] 

The expected output is:

[('a', 1), ('x', 10)] 
[('b', 2), ('x', 10)] 
[('c', 3), ('x', 10)] 
[('d', 4), ('x', 10)] 
[('e', 5), ('x', 10)] 
[('f', 6), ('x', 10)] 
[('a', 12), ('x', 20)] 
[('b', 21), ('x', 20)] 
[('c', 13), ('x', 20)] 
[('a', 21)] 
[('b', 23)] 
[('c', 31)] 

I have implemented the left join using CoGroupByKey() and ParDo(). Is there any other way to implement a left join in the Beam Python SDK?

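import apache_beam as beam


class LeftJoinerFn(beam.DoFn):
    """Joins the grouped 'left' and 'right' values that CoGroupByKey emits for one key."""

    def process(self, row):
        left = list(row['left'])
        right = list(row['right'])

        if left and right:
            # Key present on both sides: append the first right value to each left value.
            for each in left:
                yield each + right[0]
        elif left:
            # Key present only on the left: emit the left values unchanged.
            for each in left:
                yield each
        # Key present only on the right: emit nothing (left-join semantics).


# The DoFn must be defined before it is used; pcoll1 and pcoll2 are PCollections of (key, value) pairs.
left_joined = (
    {'left': pcoll1, 'right': pcoll2}
    | 'LeftJoiner: Combine' >> beam.CoGroupByKey()
    | 'LeftJoiner: ExtractValues' >> beam.Values()
    | 'LeftJoiner: JoinValues' >> beam.ParDo(LeftJoinerFn())
)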
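For reference, here is a minimal end-to-end sketch of the same CoGroupByKey approach that can be run locally. It assumes each input element is a single (key, row) pair, which is what the expected output implies (the lists shown for pcoll1 above are then what CoGroupByKey groups per key). The names `left_join_grouped`, `run_pipeline`, `LEFT_DATA` and `RIGHT_DATA` are hypothetical. Unlike the `right[0]` version, this variant pairs each left row with every right row for the key, which gives the same output when a key has at most one right value. Beam is imported inside `run_pipeline` only so the join helper can be used without Beam installed.

```python
def left_join_grouped(grouped):
    """Left-join the 'left' and 'right' lists that CoGroupByKey produces for one key.

    Each left row is paired with every right row (cross product); a left row
    with no right match is emitted unchanged; right-only keys emit nothing.
    """
    left = list(grouped['left'])
    right = list(grouped['right'])
    for left_row in left:
        if right:
            for right_row in right:
                yield left_row + right_row
        else:
            yield left_row


def run_pipeline(left_data, right_data):
    # Imported lazily (a choice of this sketch) so left_join_grouped works without Beam.
    import apache_beam as beam

    with beam.Pipeline() as p:
        left = p | 'CreateLeft' >> beam.Create(left_data)
        right = p | 'CreateRight' >> beam.Create(right_data)
        (
            {'left': left, 'right': right}
            | 'CoGroup' >> beam.CoGroupByKey()
            | 'Values' >> beam.Values()
            | 'LeftJoin' >> beam.FlatMap(left_join_grouped)
            | 'Print' >> beam.Map(print)
        )


# Sample data from the question, flattened to one (key, row) element per row.
LEFT_DATA = [
    ('key1', [('a', 1)]), ('key1', [('b', 2)]), ('key1', [('c', 3)]),
    ('key1', [('d', 4)]), ('key1', [('e', 5)]), ('key1', [('f', 6)]),
    ('key2', [('a', 12)]), ('key2', [('b', 21)]), ('key2', [('c', 13)]),
    ('key3', [('a', 21)]), ('key3', [('b', 23)]), ('key3', [('c', 31)]),
]
RIGHT_DATA = [('key1', [('x', 10)]), ('key2', [('x', 20)])]
```

Calling `run_pipeline(LEFT_DATA, RIGHT_DATA)` with the DirectRunner should print the twelve expected rows, though not necessarily in the order listed above.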

Answer


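If the second collection is always smaller, an alternative approach would be to use side inputs. This would involve making the right collection a side input that is broadcast to all the workers, and then writing a ParDo that processes elements from the left collection and looks up the matching values in the right collection.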
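A minimal sketch of the side-input approach, assuming the right PCollection has at most one element per key (so `beam.pvalue.AsDict` can turn it into a plain dict) and that each left element is a single (key, row) pair. The names `join_with_side_input`, `run_side_input_pipeline` and `right_by_key` are hypothetical. As with the answer's caveat, the whole right side is materialized on every worker, so it only suits a right collection small enough to fit in memory.

```python
def join_with_side_input(element, right_by_key):
    """Left-join one (key, left_row) element against a dict of right rows.

    right_by_key maps each key to one right row, which is what AsDict yields
    when the right PCollection holds at most one element per key.
    """
    key, left_row = element
    right_row = right_by_key.get(key)
    if right_row is not None:
        # Matching key on the right side: append its row.
        return left_row + right_row
    # No match: keep the left row on its own (left-join semantics).
    return left_row


def run_side_input_pipeline(left_data, right_data):
    # Imported lazily (a choice of this sketch) so the join helper works without Beam.
    import apache_beam as beam
    from apache_beam.pvalue import AsDict

    with beam.Pipeline() as p:
        left = p | 'CreateLeft' >> beam.Create(left_data)
        right = p | 'CreateRight' >> beam.Create(right_data)
        (
            left
            # The right PCollection is passed as a dict-shaped side input.
            | 'SideInputJoin' >> beam.Map(join_with_side_input, right_by_key=AsDict(right))
            | 'Print' >> beam.Map(print)
        )
```

For example, `run_side_input_pipeline([('key1', [('a', 1)]), ('key3', [('a', 21)])], [('key1', [('x', 10)])])` should print `[('a', 1), ('x', 10)]` and `[('a', 21)]` in some order. Because no shuffle of the left side is needed, this avoids the CoGroupByKey step entirely.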


Could you please describe in more detail how I can join the left PCollection with the right PCollection, supplied as a side input, using the same key? – prabeesh