2015-11-03 81 views
4

我想使用Airflow執行一個簡單的任務python。氣流 - Python文件不在同一個DAG文件夾中

from __future__ import print_function 
from airflow.operators.python_operator import PythonOperator 
from airflow.models import DAG 
from datetime import datetime, timedelta 


from pprint import pprint 

seven_days_ago = datetime.combine(datetime.today() - timedelta(7), 
            datetime.min.time()) 

args = { 
    'owner': 'airflow', 
    'start_date': seven_days_ago, 
} 

dag = DAG(dag_id='python_test', default_args=args) 


def print_context(ds, **kwargs): 
    pprint(kwargs) 
    print(ds) 
    return 'Whatever you return gets printed in the logs' 

run_this = PythonOperator(
    task_id='print', 
    provide_context=True, 
    python_callable=print_context, 
    dag=dag) 

,如果我嘗試,例如:

airflow test python_test print 2015-01-01

它的工作原理!

現在我想把我的def print_context(ds, **kwargs)函數放在其他python文件中。所以,我創建了一個名爲antoher文件:simple_test.py和變化:

run_this = PythonOperator(
    task_id='print', 
    provide_context=True, 
    python_callable=simple_test.print_context, 
    dag=dag) 

現在我再次嘗試運行:

airflow test python_test print 2015-01-01

和OK!它仍然工作!

但是,如果我創建一個模塊,例如,工作模塊與文件SimplePython.py,進口(from worker import SimplePython),並嘗試:

airflow test python_test print 2015-01-01

它給人的消息:

ImportError: No module named worker

的問題:

  1. 是否可以在DAG定義中導入模塊?
  2. Airflow + Celery如何將所有必需的Python源文件分佈在工作節點上?

回答

0

對於你的第一個問題,這是可能的。

我猜你應該建立在同一目錄下,命名爲__init__.pySimplePython.py一個空文件(這是worker目錄你的情況)。通過這樣做worker目錄將被視爲一個python模塊。

然後在您的DAG定義中,嘗試from worker.SimplePython import print_context

在你的情況下,我想如果你爲氣流寫一個插件會更好,因爲你可能想升級氣流核心項目而不刪除你的定製功能。

0

對於第二個問題:Airflow + Celery如何將所有必需的Python源文件分佈在工作節點上?

來自文檔:工作人員需要訪問其DAGS_FOLDER,並且您需要按照自己的意思同步文件系統。一個常見的設置是將您的DAGS_FOLDER存儲在Git存儲庫中,並使用Chef,Puppet,Ansible或用於在您的環境中配置計算機的任何機器跨計算機進行同步。如果所有的箱子都有一個共同的掛載點,讓您的管道文件共享應該工作以及

http://pythonhosted.org/airflow/installation.html?highlight=chef

4

您可以打包DAG的依賴性按:

https://pythonhosted.org/airflow/concepts.html?highlight=zip#packaged-dags

To allow this you can create a zip file that contains the dag(s) in the root of the zip file and have the extra modules unpacked in directories. For instance you can create a zip file that looks like this:

my_dag1.py 
my_dag2.py 
package1/__init__.py 
package1/functions.py 

Airflow will scan the zip file and try to load my_dag1.py and my_dag2.py. It will not go into subdirectories as these are considered to be potential packages.

當使用CeleryExe cutor,你需要手動同步DAG目錄,氣流不照顧,對你:

https://pythonhosted.org/airflow/configuration.html#scaling-out-with-celery

The worker needs to have access to its DAGS_FOLDER, and you need to synchronize the filesystems by your own means

0

雖然打包的DAG到所涵蓋的文檔拉鍊是唯一支持的解決方案我已經看到,您還可以執行dags文件夾內的模塊導入。如果您使用其他工具(如puppet & git)自動同步dags文件夾,這非常有用。

我不是從這個問題您的目錄結構清晰,所以這裏是根據典型的Python項目結構的例子DAG的文件夾:

└── airflow/dags # root airflow dags folder where all dags live 
    └── my_dags # git repo project root 
     ├── my_dags # python src root (usually named same as project) 
     │   ├── my_test_globals.py # file I want to import 
     │   ├── dag_in_package.py 
     │ └── dags 
     │  └── dag_in_subpackage.py 
     ├── README.md # also setup.py, LICENSE, etc here 
     └── dag_in_project_root.py 

我已經離開了(需要[1])__init__.py文件。請注意三個示例dag的位置。你幾乎肯定會只使用這些地方中的一個來滿足你的所有需求。爲了舉例,我將它們都包括在內,因爲這對進口應該沒有關係。爲了從其中任何一個導入my_test_globals

from my_dags.my_dags import my_test_globals 

我認爲,這意味着氣流負荷的DAG文件夾作爲一個Python包的每個子目錄。在我的情況下,這是額外的中間項目根目錄阻礙了典型的包內絕對導入。因此,我們可以重組該氣流項目是這樣的:

└── airflow/dags # root airflow dags folder where all dags live 
    └── my_dags # git repo project root & python src root 
     ├── my_test_globals.py # file I want to import 
     ├── dag_in_package.py 
     ├── dags 
     │ └── dag_in_subpackage.py 
     ├── README.md # also setup.py, LICENSE, etc here 
     └── dag_in_project_root.py 

使進口看起來我們希望他們:

from my_dags import my_test_globals 
相關問題