2016-11-22 108 views
1

我們目前有一個Python Apache Beam管道工作並能夠在本地運行。我們現在正在使管道運行在Google Cloud Dataflow上,並且完全自動化,但是Dataflow/Apache Beam的流水線監控存在限制。Python Apache Beam Pipeline狀態API調用

目前,Cloud Dataflow有兩種方法可以通過它們的UI界面或命令行中的gcloud來監控您的管道狀態。這兩種解決方案都不適用於完全自動化的解決方案,我們可以考慮無損文件處理。

看着阿帕奇Beam的github上,他們有一個文件,internal/apiclient.py,顯示有用於找工作的狀態的功能,get_job

我們發現get_job的一個實例是runners/dataflow_runner.py

最終目標是使用此API來獲取我們自動觸發運行的一個或多個作業的狀態,以確保它們最終都通過管道成功處理。

任何人都可以向我們解釋在運行我們的管道(p.run())之後如何使用此API?我們不明白response = runner.dataflow_client.get_job(job_id)runner來自哪裏。

如果有人可以提供更大的理解我們如何在設置/運行我們的管道時訪問此API調用,那將是非常好的!

回答

1

我最終只是擺弄了代碼,發現瞭如何獲得工作細節。我們的下一步是看看是否有辦法獲得所有工作的清單。

# start the pipeline process 
pipeline     = p.run() 
# get the job_id for the current pipeline and store it somewhere 
job_id     = pipeline.job_id() 
# setup a job_version variable (either batch or streaming) 
job_version    = dataflow_runner.DataflowPipelineRunner.BATCH_ENVIRONMENT_MAJOR_VERSION 
# setup "runner" which is just a dictionary, I call it local 
local     = {} 
# create a dataflow_client 
local['dataflow_client'] = apiclient.DataflowApplicationClient(pipeline_options, job_version) 
# get the job details from the dataflow_client 
print local['dataflow_client'].get_job(job_id)