
I am currently trying to run a Dataflow job (Apache Beam Python SDK) that imports a >100 GB file of tweets into BigQuery, but I keep running into the error: Error: Message: Too many sources provided: 15285. Limit is 10000.

The job takes tweets (JSON), extracts five relevant fields, transforms/cleans them with a few transforms, and then writes the values to BigQuery, where they will be used for further processing.

Cloud Dataflow to BigQuery - too many sources looks similar, but there the error seems to be caused by having many different input files, whereas I have a single input file, so it does not appear to apply. Also, the solutions mentioned there are fairly cryptic and I am not sure whether/how I could apply them to my problem.

My guess is that BigQuery persists temporary files for every row (or something like that) before writing them out, and that this is what "too many sources" refers to?

How can I fix this?

[Edit]

Code:

import argparse 
import json 
import logging 

import apache_beam as beam 

class JsonCoder(object):
    """A JSON coder interpreting each line as a JSON string."""

    def encode(self, x):
        return json.dumps(x)

    def decode(self, x):
        return json.loads(x)

def filter_by_nonempty_county(record):
    if 'county_fips' in record and record['county_fips'] is not None:
        yield record

def run(argv=None): 

    parser = argparse.ArgumentParser() 
    parser.add_argument('--input', 
         default='...', 
         help=('Input twitter json file specified as: ' 
          'gs://path/to/tweets.json')) 
    parser.add_argument(
     '--output', 
     required=True, 
     help= 
     ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE ' 
     'or DATASET.TABLE.')) 

    known_args, pipeline_args = parser.parse_known_args(argv) 



    p = beam.Pipeline(argv=pipeline_args) 

    # read text file 

    #Read all tweets from given source file 
    read_tweets = "Read Tweet File" >> beam.io.ReadFromText(known_args.input, coder=JsonCoder()) 

    #Extract the relevant fields of the source file 
    extract_fields = "Project relevant fields" >> beam.Map(lambda row: {'text': row['text'], 
                    'user_id': row['user']['id'], 
                    'location': row['user']['location'] if 'location' in row['user'] else None, 
                    'geo':row['geo'] if 'geo' in row else None, 
                    'tweet_id': row['id'], 
                    'time': row['created_at']}) 


    #check what type of geo-location the user has 
    has_geo_location_or_not = "partition by has geo or not" >> beam.Partition(lambda element, partitions: 0 if element['geo'] is None else 1, 2) 


    check_county_not_empty = lambda element, partitions: 1 if 'county_fips' in element and element['county_fips'] is not None else 0 

    #tweet has coordinates partition or not 
    coordinate_partition = (p 
      | read_tweets 
      | extract_fields 
      | beam.ParDo(TimeConversion()) 
      | has_geo_location_or_not) 


    #lookup by coordinates 
    geo_lookup = (coordinate_partition[1] | "geo coordinates mapping" >> beam.ParDo(BeamGeoLocator()) 
          | "filter successful geo coords" >> beam.Partition(check_county_not_empty, 2)) 

    #lookup by profile 
    profile_lookup = ((coordinate_partition[0], geo_lookup[0]) 
         | "join streams" >> beam.Flatten() 
         | "Lookup from profile location" >> beam.ParDo(ComputeLocationFromProfile()) 
        ) 


    bigquery_output = "write output to BigQuery" >> beam.io.Write(
     beam.io.BigQuerySink(known_args.output, 
        schema='text:STRING, user_id:INTEGER, county_fips:STRING, tweet_id:INTEGER, time:TIMESTAMP, county_source:STRING', 
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, 
       write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) 

    #file_output = "write output" >> beam.io.WriteToText(known_args.output, coder=JsonCoder()) 


    output = ((profile_lookup, geo_lookup[1]) | "merge streams" >> beam.Flatten() 
       | "Filter entries without location" >> beam.FlatMap(filter_by_nonempty_county) 
       | "project relevant fields" >> beam.Map(lambda row: {'text': row['text'], 
                    'user_id': row['user_id'], 
                    'county_fips': row['county_fips'], 
                    'tweet_id': row['tweet_id'], 
                    'time': row['time'], 
                    'county_source': row['county_source']}) 
       | bigquery_output) 

    result = p.run() 
    result.wait_until_finish() 

if __name__ == '__main__': 
    logging.getLogger().setLevel(logging.DEBUG) 
    run() 

It is a bit involved, so it would probably take too much effort to do it directly in BigQuery. The code reads the tweet JSON, splits the PCollection by whether or not a tweet is geotagged, and if it is not, tries to look the location up via the profile location; it then maps to the locations relevant to our GIS analysis and writes the result to BigQuery.

Could you share your code? Also, do you have to use Beam for the transformation, i.e. could you instead load the file into GCS, point BigQuery at it, and do the transformation in BigQuery afterwards (if you prefer that)? –

I have added the code in an edit – Zenon

Which version of the SDK are you using? –

Answer

The number of files corresponds to the number of shards the elements were processed in.

One trick to reduce this is to assign the elements keys from a small, bounded set of random keys and group them by those keys before writing them out; after the GroupByKey, the writes are performed from at most that many bundles, so far fewer output files are produced.

For example, you could use the following DoFn and PTransform in your pipeline:

import random

class _RoundRobinKeyFn(beam.DoFn):
    def __init__(self, count):
        self.count = count

    def start_bundle(self):
        # Start each bundle at a random offset so elements spread evenly across all keys.
        self.counter = random.randint(0, self.count - 1)

    def process(self, element):
        self.counter += 1
        if self.counter >= self.count:
            self.counter -= self.count
        yield self.counter, element

class LimitBundles(beam.PTransform):
    def __init__(self, count):
        self.count = count

    def expand(self, input):
        # Key each element round-robin onto `count` keys, group, then drop the keys again.
        return (input
                | beam.ParDo(_RoundRobinKeyFn(self.count))
                | beam.GroupByKey()
                | beam.FlatMap(lambda kv: kv[1]))

You would then just use this right before the bigquery_output:

output = (# ... 
     | LimitBundles(10000) 
     | bigquery_output) 

(Note that I just typed this in without testing it, so there may be a few Python typos.)
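
Not part of the original answer, but a quick local sanity check can help before submitting the Dataflow job. The sketch below is hypothetical: it assumes Beam's testing utilities (TestPipeline, assert_that, equal_to from apache_beam.testing) are available in the installed SDK version and uses made-up integer elements instead of tweets; it only verifies that LimitBundles regroups elements without dropping or duplicating any.

# Hypothetical local test of LimitBundles on the DirectRunner.
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

def test_limit_bundles_preserves_elements():
    data = list(range(1000))  # stand-in for the tweet dictionaries
    with TestPipeline() as p:
        limited = (p
                   | beam.Create(data)
                   | LimitBundles(10))
        # Every input element should still be present, just regrouped onto at most 10 keys.
        assert_that(limited, equal_to(data))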
