2016-12-15 55 views
3

我想讀取一個csv文件並使用apache beam數據流將其寫入BigQuery。爲了做到這一點,我需要以字典的形式將數據呈現給BigQuery。我如何使用apache beam來轉換數據以執行此操作?如何將csv轉換爲apache beam數據流中的字典

我的輸入csv文件有兩列,我想在BigQuery中創建一個後續的兩列表。我知道如何在BigQuery中創建數據,這是直接的,我不知道如何將csv轉換爲字典。下面的代碼是不正確的,但應該告訴我想要做什麼。

# Standard imports 
import apache_beam as beam 
# Create a pipeline executing on a direct runner (local, non-cloud). 
p = beam.Pipeline('DirectPipelineRunner') 
# Create a PCollection with names and write it to a file. 
(p 
| 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv')) 
# How do you do this?? 
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v}) 
| 'save' >> beam.Write(
    beam.io.BigQuerySink(
    output_table, 
    schema='month:INTEGER, tornado_count:INTEGER', 
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, 
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))) 
p.run() 

回答

8

這個想法是有一個返回解析CSV行的源。您可以通過繼承FileBasedSource類來包含CSV解析來實現此目的。特別是,read_records功能會是這個樣子:

class MyCsvFileSource(apache_beam.io.filebasedsource.FileBasedSource): 
    def read_records(self, file_name, range_tracker): 
    self._file = self.open_file(file_name) 

    reader = csv.reader(self._file) 

    for rec in reader: 
     yield rec 

我最近寫了一CsvFileSource爲Apache梁。你可以看看the Github repository。您可以使用pip install beam_utilsfrom beam_utils.sources import CsvFileSource來使用它。 CsvFileSource還包括設置自定義分隔符,跳過文件頭和/或輸出字典而不是列表的選項。

+1

非常感謝巴勃羅,這個作品非常好!這裏是一個代碼片段,以防人們在尋找完整性(p |'read solar data'>> beam.Read(CsvFileSource('./ sensor1_121116.csv')) |'save'>> beam.Write(beam .io.TextFileSink('./ greetings_solar'))) – user1753640

+1

我正在嘗試將結果寫入BigQuery,但沒有運氣,表格被創建但沒有數據。你能告訴發生了什麼事嗎?這裏是一個片段(p |'read solar data'>> beam.Read(CsvFileSource('./ sensor1_121116.csv')) |'save'>> beam.Write( beam.io.BigQuerySink( output_table , 模式= 'lumosity:INTEGER,時間:INTEGER', create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE))) – user1753640

+0

@ user1753640:我有同樣的問題,不得不在將數據存儲到GBQ之前,使用匹配模式的字典。 – vdolez

0

作爲Pablo職位的補充,我想分享一下我對自己的樣本所做的一些改變。 (+1你!)

更改: reader = csv.reader(self._file)reader = csv.DictReader(self._file)

csv.DictReader使用CSV文件作爲字典鍵的第一行。其他行用於使用它的值填充每行的字典。它會根據列順序自動將正確的值放到正確的鍵上。

一個小細節是Dict中的每個值都以字符串形式存儲。如果您使用eg,這可能會與您的BigQuery架構發生衝突。 INTEGER對於某些字段。所以你之後需要注意正確的鑄造。