2017-10-10 237 views
0

我正在學習Airflow並有一個簡單的問題。下面是我的DAG稱爲dog_retriever如何訪問來自Airflow的響應SimpleHttpOperator GET請求

import airflow 
from airflow import DAG 
from airflow.operators.http_operator import SimpleHttpOperator 
from airflow.operators.sensors import HttpSensor 
from datetime import datetime, timedelta 
import json 



default_args = { 
    'owner': 'Loftium', 
    'depends_on_past': False, 
    'start_date': datetime(2017, 10, 9), 
    'email': '[email protected]', 
    'email_on_failure': False, 
    'email_on_retry': False, 
    'retries': 3, 
    'retry_delay': timedelta(minutes=3), 
} 

dag = DAG('dog_retriever', 
    schedule_interval='@once', 
    default_args=default_args) 

t1 = SimpleHttpOperator(
    task_id='get_labrador', 
    method='GET', 
    http_conn_id='http_default', 
    endpoint='api/breed/labrador/images', 
    headers={"Content-Type": "application/json"}, 
    dag=dag) 

t2 = SimpleHttpOperator(
    task_id='get_breeds', 
    method='GET', 
    http_conn_id='http_default', 
    endpoint='api/breeds/list', 
    headers={"Content-Type": "application/json"}, 
    dag=dag) 

t2.set_upstream(t1) 

至於測試出氣流的手段,我只是在這個非常簡單的http://dog.ceo API做兩個GET請求一些端點。我們的目標是學習如何處理通過Airflow獲取的一些數據

執行正在執行 - 我的代碼成功地調用了任務t1和t2中的enpoints,我可以看到它們以正確的順序記錄在Airflow UI中基於我寫的set_upstream規則。

我無法弄清楚如何訪問這2個任務的json響應。這似乎很簡單,但我無法弄清楚。在SimpleHtttpOperator中,我看到了response_check的一個參數,但沒有簡單地打印,存儲或查看json響應。

謝謝。

回答

2

所以,因爲這是SimpleHttpOperator,實際的json被推送到XCOM,你可以從那裏獲得它。下面是代碼的行動路線:https://github.com/apache/incubator-airflow/blob/master/airflow/operators/http_operator.py#L87

你需要做的是設置xcom_push=True,使你的第一個T1將是以下幾點:

t1 = SimpleHttpOperator(
    task_id='get_labrador', 
    method='GET', 
    http_conn_id='http_default', 
    endpoint='api/breed/labrador/images', 
    headers={"Content-Type": "application/json"}, 
    xcom_push=True, 
    dag=dag) 

你應該能夠找到所有JSON與return value在XCOM中,XCOM的更多細節可以在以下網址找到:https://airflow.incubator.apache.org/concepts.html#xcoms

+0

謝謝@Chengzhi,這個作品。雖然我認爲我從現在開始簡單地使用PythonOperator。 –