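Google Dataflow: PCollection<String> to PCollection<TableRow> for BigQuery insertion

I am fairly new to Google Cloud Platform, and I am trying Google Dataflow for the first time for my graduate course project. What I want to do is write an automated load job that loads files from a specific bucket on my Cloud Storage and inserts the data from them into a BigQuery table.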

I can get the data as a PCollection<String>, but to insert it into BigQuery I apparently need to convert it to a PCollection<TableRow>. So far I have not found a solid answer on how to do this.

Here is my code:

public static void main(String[] args) { 
    //Defining the schema of the BigQuery table 
    List<TableFieldSchema> fields = new ArrayList<>(); 
    fields.add(new TableFieldSchema().setName("Datetime").setType("TIMESTAMP")); 
    fields.add(new TableFieldSchema().setName("Consumption").setType("FLOAT")); 
    fields.add(new TableFieldSchema().setName("MeterID").setType("STRING")); 
    TableSchema schema = new TableSchema().setFields(fields); 

    //Creating the pipeline 
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create(); 
    Pipeline p = Pipeline.create(options); 

    //Getting the data from cloud storage 
    PCollection<String> lines = p.apply(TextIO.Read.named("ReadCSVFromCloudStorage").from("gs://mybucket/myfolder/certainCSVfile.csv")); 

    //Probably need to do some transform here ... 

    //Inserting data into BigQuery 
    lines.apply(BigQueryIO.Write 
      .named("WriteToBigQuery") 
      .to("projectID:datasetID:tableID") 
      .withSchema(schema) 
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) 
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); 
} 

I am probably just forgetting something basic, so I hope you can help me with this...

You should refer to https://cloud.google.com/dataflow/model/par-do - it shows how to use a ParDo to transform a PCollection.

Answer

BigQueryIO.Write operates on a PCollection<TableRow>, as outlined in Writing to BigQuery. You need to apply a transform that converts your PCollection<String> into a PCollection<TableRow>. For an example, take a look at StringToRowConverter:

static class StringToRowConverter extends DoFn<String, TableRow> {
    /**
     * In this example, put the whole string into a single BigQuery field.
     */
    @Override
    public void processElement(ProcessContext c) {
        c.output(new TableRow().set("string_field", c.element()));
    }
    ...
}
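
The converter above writes the whole line into one field, whereas the schema in the question has three columns. As a rough sketch of the per-line parsing that could sit inside processElement, the helper below splits one CSV line into those three fields. It uses only the Java standard library so it can be run without the Dataflow SDK. The class name CsvLineParser, the column order Datetime,Consumption,MeterID, the absence of a header row and of quoted commas, and the sample line are all assumptions made for illustration, not details taken from the question's data.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of the Dataflow SDK): splits one CSV line
 * into the three fields of the question's schema.
 * Assumes the column order Datetime,Consumption,MeterID, no header row,
 * and no quoted commas inside values.
 */
final class CsvLineParser {

    private CsvLineParser() {}

    static Map<String, Object> parse(String line) {
        // A limit of -1 keeps trailing empty columns, so the count check
        // also catches lines that end with a comma.
        String[] parts = line.split(",", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                "Expected 3 columns but got " + parts.length + ": " + line);
        }
        Map<String, Object> row = new LinkedHashMap<>();
        // The timestamp is passed through as text; whether BigQuery accepts
        // the file's timestamp format is an assumption to verify.
        row.put("Datetime", parts[0].trim());
        row.put("Consumption", Double.parseDouble(parts[1].trim()));
        row.put("MeterID", parts[2].trim());
        return row;
    }

    public static void main(String[] args) {
        // Made-up sample line, for illustration only.
        System.out.println(parse("2015-06-01 00:15:00,0.42,MTR-001"));
    }
}
```

Inside the DoFn, each entry of the returned map could be copied into a TableRow with set(name, value), in the same way the answer sets "string_field". The DoFn would then presumably be applied with lines.apply(ParDo.of(...)) before the BigQueryIO.Write step. Separately, BigQueryIO expects a table spec of the form project:dataset.table, so the "projectID:datasetID:tableID" string in the question may also need a dot before the table ID.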