2017-06-19 95 views
2

我有三個運營商的一個簡單的DAG。第一個是PythonOperator與我們自己的功能,另外兩個是標準的運營商從airflow.contribFileToGoogleCloudStorageOperatorGoogleCloudStorageToBigQueryOperator要準確)。他們按順序工作。我們的自定義任務會生成許多文件,通常在2到5之間,具體取決於參數。所有這些文件都必須由後續任務單獨處理。這意味着我需要幾個下游分支機構,但在運行DAG之前有多少個分支機構是不可知的。如何巢氣流DAG動態?

你會如何解決這個問題?

UPDATE:

使用jhnclvr在他another reply提到的出發點BranchPythonOperator,我創建了一個運營商,將跳過或繼續執行分支,視情況而定。這種方法是可行的,只是因爲儘可能多的分支已知且足夠小。

操作:

class SkipOperator(PythonOperator): 
    def execute(self, context): 
     boolean = super(SkipOperator, self).execute(context) 
     session = settings.Session() 
     for task in context['task'].downstream_list: 
      if boolean is False: 
       ti = TaskInstance(
        task, execution_date=context['ti'].execution_date) 
       ti.state = State.SKIPPED 
       ti.start_date = datetime.now() 
       ti.end_date = datetime.now() 
       session.merge(ti) 
     session.commit() 
     session.close() 

用法:

def check(i, templates_dict=None, **kwargs): 
    return len(templates_dict["data_list"].split(",")) > i 

dag = DAG(
    dag_name, 
    default_args=default_args, 
    schedule_interval=None 
) 

load = CustomOperator(
    task_id="load_op", 
    bash_command=' '.join([ 
     './command.sh' 
     '--data-list {{ dag_run.conf["data_list"]|join(",") }}' 
    ]), 
    dag=dag 
) 

for i in range(0, 5): 
    condition = SkipOperator(
     task_id=f"{dag_name}_condition_{i}", 
     python_callable=partial(check, i), 
     provide_context=True, 
     templates_dict={"data_list": '{{ dag_run.conf["data_list"]|join(",") }}'}, 
     dag=dag 
    ) 
    gs_filename = 'prefix_{{ dag_run.conf["data_list"][%d] }}.json' % i 

    load_to_gcs = CustomFileToGoogleCloudStorageOperator(
     task_id=f"{dag_name}_to_gs_{i}", 
     src='/tmp/{{ run_id }}_%d.{{ dag_run.conf["file_extension"] }}' % i, 
     bucket=gs_bucket, 
     dst=gs_filename, 
     mime_type='application/json', 
     google_cloud_storage_conn_id=connection_id, 
     dag=dag 
    ) 
    load_to_bq = GoogleCloudStorageToBigQueryOperator(
     task_id=f"{dag_name}_to_bq_{i}", 
     bucket=gs_bucket, 
     source_objects=[gs_filename, ], 
     source_format='NEWLINE_DELIMITED_JSON', 
     destination_project_dataset_table='myproject.temp_{{ dag_run.conf["data_list"][%d] }}' % i, 
     bigquery_conn_id=connection_id, 
     schema_fields={}, 
     google_cloud_storage_conn_id=connection_id, 
     write_disposition='WRITE_TRUNCATE', 
     dag=dag 
    ) 

    condition.set_upstream(load) 
    load_to_gcs.set_upstream(condition) 
    load_to_bq.set_upstream(load_to_gcs) 

回答

2

See a similar (but different) question here

基本上,你不能任務時,它的運行添加到DAG。您需要提前知道要添加多少任務。

你可以使用一個單一的運營商處理N個文件。

或者,如果您有另一個單獨的DAG是處理一個文件,你可以觸發DAG N次,通過在conf文件的名稱。

See here for an example of the TriggerDagRunOperator.

See here for the DAG that would be triggered.

And lastly see this post from which the above examples are from.