
Cloud Dataflow, PubSub & Bigquery (TableRowJsonCoder) Issues

I am using Cloud Dataflow, PubSub & BigQuery to read JSON Pub/Sub messages, convert the JSON into table rows with TableRowJsonCoder, and then write them to BigQuery.

My problem is consistency: the code below only works some of the time, and no errors are thrown. I am sure I am publishing messages to the Pub/Sub topic correctly, and I am also sure that Dataflow is reading every message. I tested this with the gcloud command line tool:

gcloud beta pubsub subscriptions pull --auto-ack SUBSCRIPTION-NAME 

There I have two subscriptions to the topic, one read by Dataflow and one that I read in the terminal. The code also successfully formats the JSON data into table format and writes it to the dataset and table I designated, when it feels like it :(
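
For reference, a single test message looks roughly like this (an illustrative payload that simply mirrors the table schema below; the beta gcloud syntax of the time takes the message as a positional argument):

gcloud beta pubsub topics publish pub-sub-topic \
    '{"id": "123", "ip": "10.0.0.1", "installation_id": "inst-1", "user_id": "user-1", "device_type": "phone", "language": "en", "application_id": "app-1", "timestamp": "2017-01-23 09:48:10"}'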

My suspicion is that I don't really understand what is going on: if I send 50 messages, Dataflow appears to read only about half of the elements. That leads to my first question: what counts as an element? Is it a certain number of bytes, or a message? How do I resolve this? I am reading the data with TableRowJsonCoder.

Then a similar problem appears again: of X elements, only a small fraction make it successfully through the GroupByKey. My description of the problem would be more in-depth if I could troubleshoot it further. Note that the "id" field is always different, so I don't think this is related to duplication, but I could be wrong.

Even as I write this, the number of elements added has risen to 41 and the output to BigQuery has risen to 12. Am I just not waiting long enough? Is my test data too small (always fewer than 100 messages)? Even if it eventually saves all of my rows, taking over an hour seems far too long.

Dataflow console

The successfully inserted data

/* 
* Copyright (C) 2015 Google Inc. 
* 
* Licensed under the Apache License, Version 2.0 (the "License"); you may not 
* use this file except in compliance with the License. You may obtain a copy of 
* the License at 
* 
* http://www.apache.org/licenses/LICENSE-2.0 
* 
* Unless required by applicable law or agreed to in writing, software 
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
* License for the specific language governing permissions and limitations under 
* the License. 
*/ 

package com.example; 

import com.google.api.services.bigquery.model.TableFieldSchema; 
import com.google.api.services.bigquery.model.TableReference; 
import com.google.api.services.bigquery.model.TableRow; 
import com.google.api.services.bigquery.model.TableSchema; 
import com.google.cloud.dataflow.sdk.Pipeline; 
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder; 
import com.google.cloud.dataflow.sdk.io.BigQueryIO; 
import com.google.cloud.dataflow.sdk.io.PubsubIO; 
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; 
import com.google.cloud.dataflow.sdk.options.PipelineOptions; 
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; 
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; 
import com.google.cloud.dataflow.sdk.transforms.windowing.Window; 
import com.google.cloud.dataflow.sdk.values.PCollection; 
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; 

import java.util.ArrayList; 
import java.util.List; 

import org.joda.time.Duration; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

/** 
* A starter example for writing Google Cloud Dataflow programs. 
* 
 * <p>The pipeline reads JSON messages from Cloud Pub/Sub, decodes them as 
 * TableRows and streams them into BigQuery. 
* 
* <p>To run this starter example locally using DirectPipelineRunner, just 
* execute it without any additional parameters from your favorite development 
* environment. 
* 
* <p>To run this starter example using managed resource in Google Cloud 
* Platform, you should specify the following command-line options: 
* --project=<YOUR_PROJECT_ID> 
* --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE> 
* --runner=BlockingDataflowPipelineRunner 
*/ 
public class StarterPipeline { 

    private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class); 

    static final int WINDOW_SIZE = 1; // Default window duration in minutes 

    private final static String PROJECT_ID = "dataflow-project"; 
    private final static String PUBSUB_TOPIC = "projects/dataflow-project/topics/pub-sub-topic"; 
    private final static String DATASET_ID = "test_dataset"; 
    private final static String TABLE_ID = "test_table_version_one"; 


    private static TableSchema getSchema() { 
     List<TableFieldSchema> fields = new ArrayList<>(); 
     fields.add(new TableFieldSchema().setName("id").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("ip").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("installation_id").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("user_id").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("device_type").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("language").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("application_id").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("timestamp").setType("TIMESTAMP")); 
     TableSchema schema = new TableSchema().setFields(fields); 
     return schema; 
    } 

    private static TableReference getTableReference() { 
     TableReference tableRef = new TableReference(); 
     tableRef.setProjectId(PROJECT_ID); 
     tableRef.setDatasetId(DATASET_ID); 
     tableRef.setTableId(TABLE_ID); 
     return tableRef; 
    } 

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
        dataflowOptions.setStreaming(true);
        Pipeline pipeline = Pipeline.create(dataflowOptions);
        LOG.info("Reading from PubSub.");
        PCollection<TableRow> input = pipeline
            .apply(PubsubIO.Read.topic(PUBSUB_TOPIC).withCoder(TableRowJsonCoder.of()))
            .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))));
        input
            .apply(BigQueryIO.Write.to(getTableReference()).withSchema(getSchema()));

        pipeline.run();
    }
} 

I would also be interested in specifying the timestamps and record ids using the "timestamp" and "id" fields.
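
This is roughly what I am imagining, a minimal sketch using PubsubIO's attribute labels, assuming the publisher sets "timestamp" and "id" as Pub/Sub message attributes (the attribute names here are illustrative):

PCollection<TableRow> input = pipeline
    .apply(PubsubIO.Read.topic(PUBSUB_TOPIC)
        .withCoder(TableRowJsonCoder.of())
        // Take the element timestamp and deduplication id from Pub/Sub message
        // attributes named "timestamp" and "id" (these must be set as message
        // attributes by the publisher, not just as fields inside the JSON payload).
        .timestampLabel("timestamp")
        .idLabel("id"))
    .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))));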


This should indeed be much faster. I have seen this before when the network setup in a project was misconfigured. Could you provide the job_id of your job so I can investigate further? For the timestamp/id question, see https://cloud.google.com/dataflow/model/pubsub-io#timestamps-and-record-ids – danielm


@danielm 2017-01-23_09_48_10-1670593411236141809. Please note that the project ids above are not the correct ones. –


After leaving the pipeline running overnight, 63 elements have been added by the Pub/Sub read and 17 rows have been produced. The bottleneck is at the GroupByKey, and reading from Pub/Sub takes a very long time. –

Answer


The problem is the network configuration of your GCE VMs. Dataflow requires the VMs to be able to communicate with each other over TCP, and your firewall rules do not allow that. Adding a firewall rule that allows TCP connections between the VMs will fix the problem.
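
For example, something along these lines (a sketch only: the rule name is arbitrary, and the default network and the dataflow tags are assumptions; check which network and instance tags your Dataflow workers actually use):

gcloud compute firewall-rules create allow-dataflow-internal \
    --network default \
    --allow tcp \
    --source-tags dataflow \
    --target-tags dataflow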

The reason some data slowly makes it through the pipeline is that occasionally you get lucky and the data only needs to be processed on a single machine. Pub/Sub will eventually time out and retry the messages, so they all get through in the end.