2017-07-16 32 views
1

讓我們來看看我能否在這個問題上解釋我自己。在AIRFLOW中訪問修改的氣流變量作爲S3傳感器的自定義參數

請參閱我的S3文件,我從格式錯誤的客戶那裏收到。例如,日期顯示爲小破折號,例如「2017_07_10」。

由於我想訪問它們以便能夠下載它們,首先我有一個任務是氣流中的S3傳感器。它看起來像這樣:

xxx = S3KeySensor(
    task_id='task_name', 
    bucket_key=BUCKET_KEY, 
    wildcard_match=True, 
    params={'yesterday_ds_formatted': ????}, 
    provide_context=True, 
    bucket_name=BUCKET_NAME, 
    s3_conn_id=S3_CONN_ID, 
    timeout=18 * 60 * 60, 
    poke_interval=120, 
    dag=dag) 

在氣流控制檯變量部分我有與模板變量bucket_key {{yesterday_ds_formatted}}。

例如:'folder1/folder2/folder3/blablablablabla - {{params.yesterday_ds_formatted}} *。csv

我需要修改該模板變量或其他東西,以便抓取{{yesterday_ds}}並用「_」替換「 - 」 」。

我該怎麼做那些傢伙?我無法設法使其工作... 我已經嘗試調用自定義python函數時,設置參數,但然後我不能訪問「DS」,甚至沒有與kwargs。好像我不能事先訪問模板變量。

謝謝!

回答

0

如果我理解這個權利,您想使用jinja模板作爲bucket_key參數,但S3KeySensor不支持。

一個簡單的方法來做到這將是從S3KeySensor繼承自定義傳感器,像這樣:

TemplatedS3KeySensor(S3KeySensor): 
    template_fields = ('bucket_key',) 
相關問題