0
哪一個更好的方式在apache梁中繼Pcollection之後進行左連接?左加入apache梁
pcoll1 = [('key1', [[('a', 1)],[('b', 2)], [('c', 3)], [('d', 4)],[('e', 5)], [('f', 6)]]), ('key2',[[('a', 12)],[('b', 21)], [('c', 13)]]), ('key3',[[('a', 21)],[('b', 23)], [('c', 31)]])]
pcoll2 = [('key1', [[('x', 10)]]), ('key2', [[('x', 20)]])]
預期outpus是
[('a', 1), ('x', 10)]
[('b', 2), ('x', 10)]
[('c', 3), ('x', 10)]
[('d', 4), ('x', 10)]
[('e', 5), ('x', 10)]
[('f', 6), ('x', 10)]
[('a', 12), ('x', 20)]
[('b', 21), ('x', 20)]
[('c', 13), ('x', 20)]
[('a', 21)]
[('b', 23)]
[('c', 31)]
我已經實現使用CoGroupByKey()和帕爾()左木匠。有沒有其他方法可以在beam Python SDK中實現左連接器?
left_joined = (
{'left': pcoll1, 'right': pcoll2}
| 'LeftJoiner: Combine' >> beam.CoGroupByKey()
| 'LeftJoiner: ExtractValues' >> beam.Values()
| 'LeftJoiner: JoinValues' >> beam.ParDo(LeftJoinerFn())
)
class LeftJoinerFn(beam.DoFn):
def __init__(self):
super(LeftJoinerFn, self).__init__()
def process(self, row, **kwargs):
left = row['left']
right = row['right']
if left and right:
for each in left:
yield each + right[0]
elif left:
for each in left:
yield each
請進一步描述,其中如何我可以加入左側Pcollection與側面輸入權利Pcollection使用相同的密鑰。 – prabeesh