2017-03-15

Replacing a table-loading function with a Luigi task

I have a Python function that loads data from two other tables into a SQL Server table.

def load_table(date1,date2): 
    strDate1 = date1.strftime('%m/%d/%Y') 
    strDate2 = date2.strftime('%m/%d/%Y') 
    stmt = "insert into Agent_Queue (ID) select distinct Send_Location_ID from Pretty_Txns where Send_Date >= '%s' and Send_Date <= '%s' and Send_Location_ID is not null union select distinct Pay_Location_ID from Pretty_Txns where Pay_Date >= '%s' and Pay_Date <= '%s' and Pay_Location_ID is not null" % (strDate1,strDate2,strDate1,strDate2) 
    cnx1= connection string 
    self.curs=cnx1.cursor() 
    self.curs.execute(stmt) 
    self.curs.commit() 

I want to convert this function into a Luigi task.

Following the docs, I tried this:

class Datetask(luigi.Task):
    def output(self):
        return luigi.DateParameter()

class loading(luigi.Task):
    def requires(self):
        return {'date1': DateTask(dt.date(2016,10,30)), 'date2': DateTask(dt.date(2016,11,29))}

    def run(self):
        date1 = dict['date1']
        date2 = dict['date2']
        strDate1 = date1.strftime('%m/%d/%Y')
        strDate2 = date2.strftime('%m/%d/%Y')
        stmt = "insert into Agent_Queue (ID) select distinct Send_Location_ID from Pretty_Txns where Send_Date >= '%s' and Send_Date <= '%s' and Send_Location_ID is not null union select distinct Pay_Location_ID from Pretty_Txns where Pay_Date >= '%s' and Pay_Date <= '%s' and Pay_Location_ID is not null" % (strDate1,strDate2,strDate1,strDate2)
        curs=cnx1.cursor()
        curs.execute(stmt)
        curs.commit()
        curs.close()

When I try to run it, I get this error:

python -m luigi --module load_task loading --local-scheduler 
    DEBUG: Checking if loading() is complete 
/usr/local/lib/python2.7/dist-packages/luigi/worker.py:305: UserWarning: Task loading() without outputs has no custom complete() method 
    is_complete = task.complete() 
WARNING: Will not run loading() or any dependencies due to error in deps() method: 
Traceback (most recent call last): 
    File "/usr/local/lib/python2.7/dist-packages/luigi/worker.py", line 697, in _add 
    deps = task.deps() 
    File "/usr/local/lib/python2.7/dist-packages/luigi/task.py", line 572, in deps 
    return flatten(self._requires()) 
    File "/usr/local/lib/python2.7/dist-packages/luigi/task.py", line 544, in _requires 
    return flatten(self.requires()) # base impl 
    File "load_task.py", line 19, in requires 
    return {'date1': DateTask(dt.date(2016,10,30)), 'date2': DateTask(dt.date(2016,11,29))} 
NameError: global name 'DateTask' is not defined 

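I do define DateTask, so the error confuses me.

Also, do all tasks need to have all three of requires(), run(), and output()?

And is it always necessary to write the output to a file? I am brand new to Luigi, so any input would be appreciated.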

You have a typo in your task class definition: it should be 'DateTask', not 'Datetask' (the T must be uppercase!) – matagus

Besides that, the output method of DateTask must return an instance of a 'luigi.Target' subclass, not a parameter. – matagus

Ohh, the typo was silly. I read about using luigi.Target to write to a local text file. Is there any other way to access date1 and date2?
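
The NameError in the traceback comes down to Python names being case-sensitive, as the first comment points out. A minimal sketch of the same failure without Luigi, where the plain class `Datetask` is only a stand-in for the task in the question:

```python
class Datetask(object):
    """Stand-in for the task class as it is spelled in the question."""


try:
    # Different capitalisation, so Python looks up a different, undefined name.
    DateTask
    error_message = None
except NameError as exc:
    error_message = str(exc)

# The message names the identifier that could not be found.
print(error_message)
```

Renaming the class to `DateTask`, or the references to `Datetask`, makes the lookup succeed.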

Answer

I think something like this would work better:

import luigi

class LoadTask(luigi.Task):
    # The two dates become task parameters instead of separate tasks
    date1 = luigi.DateParameter()
    date2 = luigi.DateParameter()

    def requires(self):
        # First task in the chain: no upstream dependencies
        return None

    def output(self):
        # Marker file whose existence tells Luigi the task has already run
        return luigi.LocalTarget("{0}-{1}.txt".format(self.date1, self.date2))

    def run(self):
        strDate1 = self.date1.strftime('%m/%d/%Y')
        strDate2 = self.date2.strftime('%m/%d/%Y')
        stmt = "insert into Agent_Queue (ID) select distinct Send_Location_ID from Pretty_Txns where Send_Date >= '%s' and Send_Date <= '%s' and Send_Location_ID is not null union select distinct Pay_Location_ID from Pretty_Txns where Pay_Date >= '%s' and Pay_Date <= '%s' and Pay_Location_ID is not null" % (strDate1,strDate2,strDate1,strDate2)
        # cnx1 is the database connection from the question; it has to be created before this point
        curs = cnx1.cursor()
        curs.execute(stmt)
        curs.commit()
        curs.close()
        # Write the marker file last, so it only exists once the insert has succeeded
        with self.output().open('w') as out_file:
            out_file.write("{0} {1}\n".format(strDate1, strDate2))

Invoke it with:

luigi --module load_task LoadTask --date1 2017-01-01 --date2 2017-01-02 --local-scheduler 
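
The insert statement in this thread is assembled with % string formatting. As a hedged alternative, here is a minimal sketch of the same statement with bound parameters. It uses the standard-library sqlite3 module purely as a stand-in for SQL Server, so the `?` placeholder style, the ISO date strings, and the sample rows are assumptions; the placeholder syntax in particular depends on the driver actually in use.

```python
import datetime as dt
import sqlite3


def load_table(conn, date1, date2):
    """Insert the distinct location IDs seen between date1 and date2 (inclusive)."""
    stmt = (
        "insert into Agent_Queue (ID) "
        "select distinct Send_Location_ID from Pretty_Txns "
        "where Send_Date >= ? and Send_Date <= ? and Send_Location_ID is not null "
        "union "
        "select distinct Pay_Location_ID from Pretty_Txns "
        "where Pay_Date >= ? and Pay_Date <= ? and Pay_Location_ID is not null"
    )
    # Dates are stored as ISO strings in this sketch, so string comparison orders them correctly.
    d1, d2 = date1.isoformat(), date2.isoformat()
    cur = conn.cursor()
    cur.execute(stmt, (d1, d2, d1, d2))  # the driver binds the values, no string formatting
    conn.commit()
    cur.close()


# Throwaway in-memory database with made-up rows, only to exercise the statement.
conn = sqlite3.connect(":memory:")
conn.execute(
    "create table Pretty_Txns "
    "(Send_Location_ID integer, Send_Date text, Pay_Location_ID integer, Pay_Date text)"
)
conn.execute("create table Agent_Queue (ID integer)")
conn.executemany(
    "insert into Pretty_Txns values (?, ?, ?, ?)",
    [
        (1, "2016-11-01", 2, "2016-11-02"),
        (1, "2016-11-05", None, None),
        (3, "2016-12-15", 4, "2016-12-16"),  # outside the date range
    ],
)

load_table(conn, dt.date(2016, 10, 30), dt.date(2016, 11, 29))
ids = sorted(row[0] for row in conn.execute("select ID from Agent_Queue"))
print(ids)  # -> [1, 2]
```

Inside LoadTask.run() the same idea would apply to whatever connection object cnx1 is, provided its driver supports parameter binding.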

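Also, do all tasks need all three of requires(), run(), and output()?

Yes, although some task types, such as luigi.WrapperTask, do not need output(), and requires() can return None when the task is the first one in the chain.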

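Also, is it necessary to always write the output to a file?

No. For example, the SQLAlchemy contrib module defines a Target subclass that you can use to keep the target in a database. http://luigi.readthedocs.io/en/stable/api/luigi.contrib.sqla.html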