0

我試圖修改Apache Beam的MinimalWordCount python示例以從BigQuery表中讀取。我做了以下修改,我似乎有查詢工作,但例如。修改MinimalWordCount示例以從BigQuery中讀取

原來的例子在這裏:

with beam.Pipeline(options=pipeline_options) as p: 

    # Read the text file[pattern] into a PCollection. 
    lines = p | ReadFromText(known_args.input) 

    # Count the occurrences of each word. 
    counts = (
     lines 
     | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) 
         .with_output_types(unicode)) 
     | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) 
     | 'GroupAndSum' >> beam.CombinePerKey(sum)) 

    # Format the counts into a PCollection of strings. 
    output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c)) 

    # Write the output using a "Write" transform that has side effects. 
    # pylint: disable=expression-not-assigned 
    output | WriteToText(known_args.output) 

不是ReadFromText我想這個調整從BigQuery資料表中的列來讀取。要做到這一點,我已經取代lines = p | ReadFromText(known_args.input)用下面的代碼:

query = 'SELECT text_column FROM `bigquery.table.goes.here` ' 
lines = p | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True)) 

當我重新運行的管道,我得到的錯誤:「WARNING:root:A task failed with exception. expected string or buffer [while running 'Split']

我認識到,「分割」操作期待字符串,它顯然沒有得到一個字符串。我如何修改'ReadFromBigQuery',以便它傳遞一個字符串/緩衝區?我是否需要提供表格模式或將'ReadFromBigQuery'的結果轉換爲字符串緩衝區?

回答

1

這是因爲BigQuerySource返回字典(dict)的PCollection,其中字典中的每個鍵代表一列。對於你的情況做最簡單的事情將是beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True)這樣經過短短應用beam.Map

lines = (p 
|"ReadFromBigQuery" >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True)) 
| "Extract text column" >> beam.Map(lambda row: row.get("text_column")) 
     ) 

如果遇到問題,列名,嘗試將其更改爲u"text_column"

或者你可以修改你斯普利特變換提取列的值有:

'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x.get("text_column"))) 
         .with_output_types(unicode)) 
+0

謝謝!我實際上只是嘗試了以下工作,以及'Split'>>(beam.FlatMap(lambda x:re.findall(r'[A-Za-z'] +',x [「text_column」]) ).with_output_types(unicode))'這是簡單的語法問題,還是在操作方式上有任何真正的區別? – reese0106

+0

這不是語法,它是這些源的不同行爲 - 一個輸出原始文本,另一個輸出字典。你可以有資源返回任何東西。 –