
Airflow newbie here. I'm trying to run a SQL query and store the result in a BigQuery table, but I get: AssertionError: INTERNAL: No default project is specified.

The full error is below; I don't know where to set default_project_id.

Please help.

Error:

Traceback (most recent call last): 
    File "/usr/local/bin/airflow", line 28, in <module> 
    args.func(args) 
    File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 585, in test 
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True) 
    File "/usr/local/lib/python2.7/dist-packages/airflow/utils/db.py", line 53, in wrapper 
    result = func(*args, **kwargs) 
    File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run 
    result = task_copy.execute(context=context) 
    File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/bigquery_operator.py", line 82, in execute 
    self.allow_large_results, self.udf_config, self.use_legacy_sql) 
    File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/bigquery_hook.py", line 228, in run_query 
    default_project_id=self.project_id) 
    File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/bigquery_hook.py", line 917, in _split_tablename 
    assert default_project_id is not None, "INTERNAL: No default project is specified" 
AssertionError: INTERNAL: No default project is specified 

Code:

sql_bigquery = BigQueryOperator(
    task_id='sql_bigquery',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    bql='''
    #standardSQL
    SELECT ID, Name, Group, Mark, RATIO_TO_REPORT(Mark) OVER(PARTITION BY Group) AS percent FROM `tensile-site-168620.temp.marks`
    ''',
    destination_dataset_table='temp.percentage',
    dag=dag
)

Answers


EDIT: I finally fixed this by simply adding the bigquery_conn_id='bigquery' parameter to the BigQueryOperator task, after first running the code below in a separate Python script.
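
For reference, a minimal sketch of what the task from the question looks like with only that parameter added (it assumes the same dag object and query as in the question; the connection-creation snippet it relies on is shown further below):

from airflow.contrib.operators.bigquery_operator import BigQueryOperator

sql_bigquery = BigQueryOperator(
    task_id='sql_bigquery',
    bigquery_conn_id='bigquery',  # must match the conn_id of the GCP connection
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    bql='''
    #standardSQL
    SELECT ID, Name, Group, Mark, RATIO_TO_REPORT(Mark) OVER(PARTITION BY Group) AS percent FROM `tensile-site-168620.temp.marks`
    ''',
    destination_dataset_table='temp.percentage',
    dag=dag  # the DAG object defined elsewhere in the same file
)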

Apparently you need to specify your project ID under Admin -> Connections in the Airflow UI. You have to provide it as a JSON object, e.g. "project": "".

Personally, I couldn't get the webserver working on GCP, so that route wasn't feasible for me. Here is a programmatic solution instead:

from airflow.models import Connection
from airflow.settings import Session

session = Session()
# Define a google_cloud_platform connection whose extras carry the project ID.
gcp_conn = Connection(
    conn_id='bigquery',
    conn_type='google_cloud_platform',
    extra='{"extra__google_cloud_platform__project":"<YOUR PROJECT HERE>"}')
# Only insert it if a connection with this conn_id does not already exist.
if not session.query(Connection).filter(
        Connection.conn_id == gcp_conn.conn_id).first():
    session.add(gcp_conn)
    session.commit()
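
As noted above, this snippet was run once as a standalone Python script rather than inside the DAG file; once it commits, a connection with conn_id='bigquery' exists and can be referenced from the operator's bigquery_conn_id parameter.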

But I don't know exactly where this code is supposed to live. These suggestions come from a similar question here.


You can also create the connection via the CLI so that it can be automated: http://airflow.readthedocs.io/en/v1-9-stable/cli.html#connections
