mysql
  • python-3.x
  • airflow
  • 2017-10-10 153 views 5 likes 
    5
    def mysql_operator_test(): 
        DEFAULT_DATE = datetime(2017, 10, 9) 
        t = MySqlOperator(
         task_id='basic_mysql', 
         sql="SELECT count(*) from table 1 where id>100;", 
         mysql_conn_id='mysql_default', 
         dag=dag) 
        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=False) 
    
    run_this = PythonOperator(
        task_id='getRecoReq', 
        python_callable=mysql_operator_test, 
        # xcom_push=True, 
        dag=dag) 
    
    task2 = PythonOperator(
        task_id= 'mysql_select', 
        provide_context=True, 
        python_callable = blah, 
        templates_dict = {'requests': "{{ ti.xcom_pull(task_ids='getReq') }}" }, 
        dag=dag) 
    
    run_this.set_downstream(task2) 
    

    我想捕捉使用xcoms的MySqlOperator返回的計數。有人可以請相同的指導?如何使用氣流xcoms與MySqlOperator

    回答

    1

    你很近!然而,你問這個問題的方式是一種反模式。您不想在Airflow中的任務間共享數據。另外,您不想使用像mysql_operator_test那樣的操作員。這很誘人,我在開始時做了同樣的事情。

    我嘗試了一些與此類似的東西,但使用SFTP連接。我最終完成了PythonOperator中的所有內容並使用了底層鉤子。

    我建議你在python_callable的內部使用MySQLHook。事情是這樣的:

    def count_mysql_and_then_use_the_count(): 
        """ 
        Returns an SFTP connection created using the SSHHook 
        """ 
        mysql_hook = MySQLHook(...) 
        cur = conn.cursor() 
        cur.execute("""SELECT count(*) from table 1 where id>100""") 
        for count in cur: 
         # Do something with the count... 
    

    我不知道這是否會工作,爲信息,但這個想法是使用鉤你的Python裏面調用,我不使用MySQLHook常常但我這樣做是與SSHHook,它一直很好。

    相關問題