2017-09-11 222 views
1

我想觸發simplehttpoperator,像這樣: 氣流trigger_dag test_trigger --conf '{ 「名」: 「東西」}'如何將參數從pythonoperator任務傳遞到airflow中的simplehttpoperator任務dag?

然後我用pythonoperator python_callable使用kwargs [ 'dag_run' 接受的參數] .conf,我想將['dag_run']。conf傳遞給simplehttpoperator,我該怎麼做?任何人都可以幫忙

cc_ = {} 


def run_this_func(ds, **kwargs): 
    cc_ = kwargs['dag_run'].conf 
    logging.info(cc_) 
    return cc_ 

run_this = PythonOperator(
    task_id='run_this', 
    provide_context=True, 
    python_callable=run_this_func, 
    dag=dag) 

http_task = SimpleHttpOperator(
    task_id='http_task', 
    http_conn_id='test_http', 
    method='POST', 
    endpoint='/api/v1/function', 
    data=cc_, 
    headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*"}, 
    response_check=lambda response: True if "10000" in response.content else False, 
    dag=dag) 

http_task.set_upstream(run_this) 

回答

0

因爲,你可能要檢查的XCOM任務之間的通信,https://airflow.incubator.apache.org/concepts.html#xcoms

***** UPDATE *****
(感謝丹尼爾更詳細) 下面是一些代碼你可以試試看,在你SimpleHttpOperator您通過XCOM得到返回值:

http_task = SimpleHttpOperator(
    task_id='http_task', 
    http_conn_id='test_http', 
    method='POST', 
    endpoint='/api/v1/function', 
    data=json.loads("{{ task_instance.xcom_pull(task_ids='run_this', key='return_value') }}"), 
    headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*"}, 
    response_check=lambda response: True if "10000" in response.content else False, 
    dag=dag) 
+0

但如何在simplehttpoperator中使用XCOM?你可以給任何案件代碼? – pyfroggogogo

+0

@pyfroggogogo,我更新了一些代碼示例,嘗試這個工作 – Chengzhi

+1

模板必須作爲字符串傳遞。您可以使用'data = json.loads('{{... | tojson}}')'在渲染後將其恢復爲字典類型。 –

0

感謝@Chengzhi和@Daniel。 最後我在Jinja2/filter.py中寫了一個自定義過濾器'tojson',因爲在airflow中默認的Jinja2版本是2.8.1,而Jinja2在2.9版之前不包含名爲'tojson'的內建過濾器。

def do_tojson(value): 
    value = json.JSONEncoder().encode(value) 
    return value 

在dag文件中,代碼如下。有用。

def run_this_func(ds, **kwargs): 
    cc_ = kwargs['dag_run'].conf 
    return cc_ 

run_this = PythonOperator(
    task_id='run_this', 
    provide_context=True, 
    python_callable=run_this_func, 
    dag=dag) 

http_task = SimpleHttpOperator(
    task_id='http_task', 
    http_conn_id='test_http', 
    method='POST', 
    endpoint='/api/v1/task', 
    data="{{ task_instance.xcom_pull(task_ids='run_this') |tojson}}", 
    headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*", 
      "Content-Type": "application/json"}, 
    response_check=lambda response: True if "10000" in response.content else False, 
    dag=dag) 

http_task.set_upstream(run_this) 
相關問題