2017-09-15 46 views
0

我是新來氣流。在我的ETL管道公司目前我們正在使用Crontab和自定義調度(內部開發)。現在我們正在計劃實施Apache的氣流爲我們所有的數據管道 - 爲此,在探索無法爲每個任務實例/ Dag找到unique_id的功能時。我搜索了大多數以宏和模板結尾的解決方案。但是它們都沒有爲任務提供uniqueID。但我可以看到每個任務的用戶界面中增量uniqueID。是否有任何方法可以輕鬆地訪問我的python方法內的變量。主要用例是我需要將這些ID作爲參數傳遞給Python/ruby​​/Pentaho被稱爲腳本/方法的作業。獲取unique_id爲apache氣流任務

對於實例

我的shell腳本 'test.sh' 需要兩個參數之一run_id和其他被collection_id。目前,我們正在創造從集中式數據庫這個獨特的run_id並將它傳遞給作業。如果它已經存在於空氣背景下,我們要使用

from airflow.operators.bash_operator import BashOperator 
from datetime import date, datetime, timedelta 
from airflow import DAG 

shell_command = "/data2/test.sh -r run_id -c collection_id" 


putfiles_s3 = BashOperator(
       task_id='putfiles_s3', 
       bash_command=shell_command, 
       dag=dag) 

尋找一個獨特的run_id(無論是達格水平/任務級別)執行此Dag時(預定/手動)

注意:這是一個示例任務。將有多個從屬任務到此Dag。從氣流UI enter image description here

感謝 Anoop [R

+0

包括你的代碼 –

+0

你看過UUID嗎? https://stackoverflow.com/questions/534839/how-to-create-a-guid-uuid-in-python#534851 –

+0

@MicahElliott感謝您的建議。我們可以生成像這樣的隨機id或shell隨機命令。我正在尋找一些由airflow本身生成的id,就像job_id一樣。附上Airflow UI的屏幕截圖供參考。 –

回答

1

{{ ti.job_id }} 。附接JOB_ID截圖是你想要什麼:

from datetime import datetime, timedelta 
from airflow.operators.bash_operator import BashOperator 
from airflow import DAG 


dag = DAG(
    "job_id", 
    start_date=datetime(2018, 1, 1), 
) 

with dag: 
    BashOperator(
     task_id='unique_id', 
     bash_command="echo {{ ti.job_id }}", 
    ) 

這將是在運行時有效。從該執行日誌是這樣的:

[2018-01-03 10:28:37,523] {bash_operator.py:80} INFO - Temporary script location: /tmp/airflowtmpcj0omuts//tmp/airflowtmpcj0omuts/unique_iddq7kw0yj 
[2018-01-03 10:28:37,524] {bash_operator.py:88} INFO - Running command: echo 4 
[2018-01-03 10:28:37,621] {bash_operator.py:97} INFO - Output: 
[2018-01-03 10:28:37,648] {bash_operator.py:101} INFO - 4 

請注意,這隻會是有效的在運行,所以「繪製的模板」視圖在WebUI將顯示無,而不是數量。

+0

{{ti.job_id}}我可以與任何運算符一起使用,並且可以作爲參數傳遞給Python方法嗎?如果您不介意可以向我展示一個將此值傳遞給python方法的示例 Thanks Ash Berlin-泰勒。 –

+0

我得到的溶液用於使同一對蟒方法 DEF test_failure(** kwargs): 打印的實例變量從上下文訪問' TI = kwargs [ '鈦'] 打印ti.job_id @Ash是有任何文件通過「task_instance」給出了所有可用的值。這個URL沒有解釋太多有關「ti」的信息。https://pythonhosted.org/airflow/code.html#macros –

+0

現在不行,不。 「ti」是一個TaskInstance https://pythonhosted.org/airflow/code.html#airflow.models.TaskInstance,但沒有記錄該對象的任何屬性,所以我轉到代碼。 –