2017-10-16 468 views
1

使用氣流將CSV文件傳輸到kafka主題的最佳方法是什麼?使用airflow將文件傳輸到kafka

寫一個自定義的運算符用於氣流?

+0

你是真的將這些文件加載​​到文件中,還是將它們加入到文件中?氣流確實支持配料/微配料,但對於流媒體來說,我的經驗表明它不是太好,基本上就像_nano_-batching。我對遠程主機上的CSV文件進行了大量輪詢,並將它們作爲批次拉入BigQuery中。 – Mike

+0

我逐行處理它們並將每行發送到kafka。 – bsd

回答

1

可能最好使用PythonOperator來逐行處理文件。我有一個用於輪詢和SFTP服務器文件的用例,當我找到一些時,我逐行處理它們,並將結果寫成JSON。我不喜歡的東西解析日期爲YYYY-MM-DD格式等這樣的事情可能爲你工作:

def csv_file_to_kafka(**context): 

    f = '/path/to/downloaded/csv_file.csv' 
    csvfile = open(f, 'r') 
    reader = csv.DictReader(csvfile) 

    for row in reader: 
     """ 
     Send the row to Kafka 
     """ 
    return 

csv_file_to_kafka = PythonOperator(
    task_id='csv_file_to_kafka', 
    python_callable=csv_file_to_kafka, 
    dag=dag 
) 

現在,它真的取決於你將如何得到要下載的文件。在我的情況下,我使用SSHHookGoogleCloudStorageHook從SFTP服務器獲取文件,然後將這些文件的名稱傳遞給解析和清理csv文件的任務。我通過SFTP拉低文件,並把它們放入谷歌雲存儲做到這一點:

""" 
HOOKS: Connections to external systems 
""" 
def sftp_connection(): 
    """ 
    Returns an SFTP connection created using the SSHHook 
    """ 
    ssh_hook = SSHHook(ssh_conn_id='sftp_connection') 
    ssh_client = ssh_hook.get_conn() 
    return ssh_client.open_sftp() 
def gcs_connection(): 
    """ 
    Returns an GCP connection created using the GoogleCloudStorageHook 
    """ 
    return GoogleCloudStorageHook(google_cloud_storage_conn_id='my_gcs_connection') 

""" 
PYTHON CALLABLES: Called by PythonOperators 
""" 
def get_files(**context): 
    """ 
    Looks at all files on the FTP server and returns a list files. 
    """ 
    sftp_client = sftp_connection() 
    all_files = sftp_client.listdir('/path/to/files/') 
    files = [] 

    for f in all_files: 
     files.append(f) 

    return files 

def save_files(**context): 
    """ 
    Looks to see if a file already exists in GCS. If not, the file is downloaed 
    from SFTP server and uploaded to GCS. A list of 
    """ 
    files = context['task_instance'].xcom_pull(task_ids='get_files') 

    sftp_client = sftp_connection() 
    gcs = gcs_connection() 
    new_files = [] 
    new_outcomes_files = [] 
    new_si_files = [] 

    new_files = process_sftp_files(files, gcs, sftp_client) 

    return new_files 

def csv_file_to_kafka(**context): 
    """ 
    Untested sample parse csv files and send to kafka 
    """ 
    files = context['task_instance'].xcom_pull(task_ids='save_files') 
    for f in new_files: 
     csvfile = open(f, 'r') 
     reader = csv.DictReader(csvfile) 

     for row in reader: 
      """ 
      Send the row to Kafka 
      """ 
    return 

get_files = PythonOperator(
    task_id='get_files', 
    python_callable=get_files, 
    dag=dag 
) 
save_files = PythonOperator(
    task_id='save_files', 
    python_callable=save_files, 
    dag=dag 
) 
csv_file_to_kafka = PythonOperator(
    task_id='csv_file_to_kafka', 
    python_callable=csv_file_to_kafka, 
    dag=dag 
) 

我知道我可以做到這一切在一個大蟒蛇可調用的,這就是我現在如何重構代碼,以便在可調用。因此,它輪詢SFTP服務器,提取最新的文件,並根據我的規則解析它們,所有這些都在一個Python函數中。我聽說使用XCom並不理想,據推測,Airflow任務不應該彼此交流太多。

根據您的使用情況,您甚至可能想要探索Apache Nifi之類的東西,我現在也正在研究它。