
I am writing a Dataflow pipeline that processes videos from a Google Cloud Storage bucket. The pipeline downloads each work item to the local system and then re-uploads the results to a GCP bucket. My previous question covers the cryptic Google Cloud Dataflow message when downloading a file from GCP to the local system.

The pipeline works with the local DirectRunner; I am having trouble debugging it with the DataflowRunner.

The error reads:

File "run_clouddataflow.py", line 41, in process 
File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/blob.py", line 464, in download_to_file self._do_download(transport, file_obj, download_url, headers) 
File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/blob.py", line 418, in _do_download download.consume(transport) File "/usr/local/lib/python2.7/dist-packages/google/resumable_media/requests/download.py", line 101, in consume self._write_to_stream(result) 
File "/usr/local/lib/python2.7/dist-packages/google/resumable_media/requests/download.py", line 62, in _write_to_stream with response: AttributeError: __exit__ [while running 'Run DeepMeerkat'] 

The error occurs when executing blob.download_to_file(file_obj) inside:

storage_client = storage.Client()
bucket = storage_client.get_bucket(parsed.hostname)
blob = storage.Blob(parsed.path[1:], bucket)

#store local path
local_path = "/tmp/" + parsed.path.split("/")[-1]

print('local path: ' + local_path)
with open(local_path, 'wb') as file_obj:
    blob.download_to_file(file_obj)

print("Downloaded " + local_path)

My guess is that the workers are not allowed to write locally? Or perhaps there is no /tmp folder in the Dataflow container. Where should I be writing objects? It is hard to debug without access to the environment. Is there any way to access stdout from the workers for debugging (a serial console?)
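On the debugging question: messages written with the standard logging module inside a DoFn show up under the job's worker logs in the Cloud Console (Stackdriver Logging), which is the closest thing to worker stdout. A minimal sketch with a hypothetical DoFn, not specific to this pipeline:

import logging
import apache_beam as beam

class LoggingDoFn(beam.DoFn):  # hypothetical example DoFn
    def process(self, element):
        # These records appear in the job's worker logs in the console.
        logging.info('processing element: %s', element)
        yield element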

Edit #1

I have tried explicitly passing credentials:

try:
    credentials, project = google.auth.default()
except:
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = known_args.authtoken
    credentials, project = google.auth.default()
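To confirm which identity a worker actually resolves, the credentials returned by google.auth.default() can be inspected; a hedged check (service_account_email exists on service-account credentials but not on every credential type, hence the getattr fallback):

import google.auth

credentials, project = google.auth.default()
print(getattr(credentials, 'service_account_email', 'unknown'), project)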

as well as writing to the current working directory instead of /tmp/:

local_path = parsed.path.split("/")[-1]

print('local path: ' + local_path)
with open(local_path, 'wb') as file_obj:
    blob.download_to_file(file_obj)

I still get the same cryptic error from the GCP blob download.
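Note that the traceback is raised inside the download itself rather than by open(), which suggests the path choice may not be the cause. Still, a worker-safe way to pick a scratch location is the tempfile module; a small sketch:

import os
import tempfile

# mkdtemp creates a writable scratch directory wherever the worker permits
scratch_dir = tempfile.mkdtemp()
local_path = os.path.join(scratch_dir, parsed.path.split("/")[-1])
with open(local_path, 'wb') as file_obj:
    blob.download_to_file(file_obj)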

The full pipeline script is below; setup.py is here.

import argparse
import csv
import json
import logging
import os

import apache_beam as beam
from urlparse import urlparse
from google.cloud import storage

##The namespaces inside of clouddataflow workers are not inherited,
##please see https://cloud.google.com/dataflow/faq#how-do-i-handle-nameerrors; better to write ugly import statements than to miss a namespace.

class PredictDoFn(beam.DoFn):
    def process(self, element):

        import csv
        from google.cloud import storage
        from DeepMeerkat import DeepMeerkat
        from urlparse import urlparse
        import os
        import google.auth

        DM = DeepMeerkat.DeepMeerkat()

        print(os.getcwd())
        print(element)

        #try adding credentials?
        #set credentials, inherit from worker
        credentials, project = google.auth.default()

        #download element locally
        parsed = urlparse(element[0])

        #parse gcp path
        storage_client = storage.Client(credentials=credentials)
        bucket = storage_client.get_bucket(parsed.hostname)
        blob = storage.Blob(parsed.path[1:], bucket)

        #store local path
        local_path = parsed.path.split("/")[-1]

        print('local path: ' + local_path)
        with open(local_path, 'wb') as file_obj:
            blob.download_to_file(file_obj)

        print("Downloaded " + local_path)

        #Assign input from DataFlow/manifest
        DM.process_args(video=local_path)
        DM.args.output = "Frames"

        #Run DeepMeerkat
        DM.run()

        #upload back to GCS
        found_frames = []
        for (root, dirs, files) in os.walk("Frames/"):
            for filename in files:
                if filename.upper().endswith(".JPG"):
                    found_frames.append(os.path.join(root, filename))

        for frame in found_frames:
            #create GCS path
            path = "DeepMeerkat/" + parsed.path.split("/")[-1] + "/" + frame.split("/")[-1]
            blob = storage.Blob(path, bucket)
            blob.upload_from_filename(frame)

def run():
    import argparse
    import os
    import apache_beam as beam
    import csv
    import logging
    import google.auth

    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input',
                        default="gs://api-project-773889352370-testing/DataFlow/manifest.csv",
                        help='Input file to process.')
    parser.add_argument('--authtoken',
                        default="/Users/Ben/Dropbox/Google/MeerkatReader-9fbf10d1e30c.json",
                        help='Path to service account credentials.')
    known_args, pipeline_args = parser.parse_known_args()

    #set credentials, inherit from worker
    try:
        credentials, project = google.auth.default()
    except:
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = known_args.authtoken
        credentials, project = google.auth.default()

    p = beam.Pipeline(argv=pipeline_args)

    vids = (p | 'Read input' >> beam.io.ReadFromText(known_args.input)
              | 'Parse input' >> beam.Map(lambda line: csv.reader([line]).next())
              | 'Run DeepMeerkat' >> beam.ParDo(PredictDoFn()))

    logging.getLogger().setLevel(logging.INFO)
    p.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
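Since the traceback points at google-resumable-media using the response as a context manager, one thing worth ruling out is a stale requests on the workers compared with my local environment. A hypothetical fragment of the setup.py shipped with the job (the version floor is an assumption on my part, not a confirmed fix):

# setup.py (fragment) -- hypothetical dependency pins for the Dataflow workers
from setuptools import setup, find_packages

setup(
    name='deepmeerkat-pipeline',  # assumed name, for illustration only
    version='0.1',
    packages=find_packages(),
    install_requires=[
        'google-cloud-storage',
        'requests>=2.18.0',  # assumption: Response gained context-manager support here
    ],
)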
You can view the logs in the Cloud Console UI. Is that enough? Also, you should be able to write to local disk. I will get back to you shortly. – Pablo

Thanks Pablo. I am checking out the new google.auth module; it may also be that the workers are not inheriting my credentials from Dataflow. I just added: try: credentials, project = google.auth.default() except: os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = known_args.authtoken; credentials, project = google.auth.default() – bw4sz

Added to the edit above. – bw4sz

Answer