2017-05-30 82 views

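Airflow does not satisfy task dependencies

I have a simple Airflow workflow made up of two tasks. One downloads a CSV file containing stock data. The other extracts the maximum share price and writes it to another file.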

If I run the first task and then the second, everything works fine. But if I execute only `airflow run stocks_d get_max_share`, it fails to satisfy the dependencies.

import csv 
from datetime import datetime 
from datetime import timedelta 
from airflow import DAG 
from airflow.operators.python_operator import PythonOperator 
import requests 


def get_stock_data(): 
    url = "https://app.quotemedia.com/quotetools/getHistoryDownload.csv?&webmasterId=501&startDay=02&startMonth=02&startYear=2002&endDay=02&endMonth=07&endYear=2009&isRanged=false&symbol=APL" 
    try:
        r = requests.get(url)
    except requests.RequestException:
        # Re-raise so that Airflow marks the task as failed.
        raise
    else:
        with open('/tmp/stocks/airflow_stock_data.txt', 'w') as f:
            f.write(r.text)

def get_max_share(): 
    with open('/tmp/stocks/airflow_stock_data.txt', 'r') as f:
        stock_reader = csv.reader(f)
        next(stock_reader, None)  # skip the header row
        stock_data = [row for row in stock_reader if row]

    # Compare prices as numbers; comparing the raw strings would rank '9.5' above '10.2'.
    max_row = max(stock_data, key=lambda row: float(row[2]))

    with open('/tmp/stocks/max_stock', 'w') as f:
        # Column 0 is the date, column 2 the price being maximized.
        stock_entry = max_row[0] + ' -> ' + max_row[2]
        f.write(stock_entry)


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 5, 30),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# catchup is a DAG-level argument, so it is passed to the DAG rather than through default_args.
dag = DAG('stocks_d', default_args=default_args, schedule_interval=timedelta(minutes=5), catchup=False)


task_get_stocks = PythonOperator(task_id='get_stocks', python_callable=get_stock_data, dag=dag) 
task_get_max_share = PythonOperator(task_id='get_max_share', python_callable=get_max_share, dag=dag) 

task_get_max_share.set_upstream(task_get_stocks) 

Any idea why this happens?

Answers


$ airflow run stocks_d get_max_share

The command above runs only the get_max_share task. It does not run the upstream task first.

If you want to check a run of the whole DAG, try the following command instead:

$ airflow trigger_dag stocks_d
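
To make the difference concrete, here is a minimal plain-Python sketch of the idea. It is not Airflow's own code: `UPSTREAM` and `run_order` are hypothetical names invented for illustration, and the table simply mirrors the `set_upstream` call from the question. Running a single task touches only that task, whereas a full DAG run has to walk the upstream chain first.

```python
# Hypothetical stand-in for the DAG's dependency table: task_id -> upstream task_ids.
# It mirrors task_get_max_share.set_upstream(task_get_stocks) from the question.
UPSTREAM = {
    "get_stocks": [],
    "get_max_share": ["get_stocks"],
}


def run_order(task_id, upstream=UPSTREAM):
    """Return task_id preceded by all of its upstream tasks, each listed once."""
    order = []

    def visit(tid):
        if tid in order:
            return
        # Visit every upstream task before the task itself.
        for parent in upstream[tid]:
            visit(parent)
        order.append(tid)

    visit(task_id)
    return order


# Running one task in isolation involves only that task.
single_task = ["get_max_share"]
# A full run resolves the upstream tasks first.
full_run = run_order("get_max_share")

print(single_task)
print(full_run)
```

In this sketch the single-task case never produces the input file that get_max_share reads, which matches the behaviour described in the question.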


Nothing happens. No files are created. – bsd