1
A
回答
1
可能最好使用PythonOperator
來逐行處理文件。我有一個用於輪詢和SFTP服務器文件的用例,當我找到一些時,我逐行處理它們,並將結果寫成JSON。我不喜歡的東西解析日期爲YYYY-MM-DD格式等這樣的事情可能爲你工作:
def csv_file_to_kafka(**context):
f = '/path/to/downloaded/csv_file.csv'
csvfile = open(f, 'r')
reader = csv.DictReader(csvfile)
for row in reader:
"""
Send the row to Kafka
"""
return
csv_file_to_kafka = PythonOperator(
task_id='csv_file_to_kafka',
python_callable=csv_file_to_kafka,
dag=dag
)
現在,它真的取決於你將如何得到要下載的文件。在我的情況下,我使用SSHHook
和GoogleCloudStorageHook
從SFTP服務器獲取文件,然後將這些文件的名稱傳遞給解析和清理csv文件的任務。我通過SFTP拉低文件,並把它們放入谷歌雲存儲做到這一點:
"""
HOOKS: Connections to external systems
"""
def sftp_connection():
"""
Returns an SFTP connection created using the SSHHook
"""
ssh_hook = SSHHook(ssh_conn_id='sftp_connection')
ssh_client = ssh_hook.get_conn()
return ssh_client.open_sftp()
def gcs_connection():
"""
Returns an GCP connection created using the GoogleCloudStorageHook
"""
return GoogleCloudStorageHook(google_cloud_storage_conn_id='my_gcs_connection')
"""
PYTHON CALLABLES: Called by PythonOperators
"""
def get_files(**context):
"""
Looks at all files on the FTP server and returns a list files.
"""
sftp_client = sftp_connection()
all_files = sftp_client.listdir('/path/to/files/')
files = []
for f in all_files:
files.append(f)
return files
def save_files(**context):
"""
Looks to see if a file already exists in GCS. If not, the file is downloaed
from SFTP server and uploaded to GCS. A list of
"""
files = context['task_instance'].xcom_pull(task_ids='get_files')
sftp_client = sftp_connection()
gcs = gcs_connection()
new_files = []
new_outcomes_files = []
new_si_files = []
new_files = process_sftp_files(files, gcs, sftp_client)
return new_files
def csv_file_to_kafka(**context):
"""
Untested sample parse csv files and send to kafka
"""
files = context['task_instance'].xcom_pull(task_ids='save_files')
for f in new_files:
csvfile = open(f, 'r')
reader = csv.DictReader(csvfile)
for row in reader:
"""
Send the row to Kafka
"""
return
get_files = PythonOperator(
task_id='get_files',
python_callable=get_files,
dag=dag
)
save_files = PythonOperator(
task_id='save_files',
python_callable=save_files,
dag=dag
)
csv_file_to_kafka = PythonOperator(
task_id='csv_file_to_kafka',
python_callable=csv_file_to_kafka,
dag=dag
)
我知道我可以做到這一切在一個大蟒蛇可調用的,這就是我現在如何重構代碼,以便在可調用。因此,它輪詢SFTP服務器,提取最新的文件,並根據我的規則解析它們,所有這些都在一個Python函數中。我聽說使用XCom並不理想,據推測,Airflow任務不應該彼此交流太多。
根據您的使用情況,您甚至可能想要探索Apache Nifi之類的東西,我現在也正在研究它。
相關問題
- 1. 將kafka(kafka-python)轉儲到txt文件
- 2. 是否可以使用Kafka傳輸文件?
- 3. 將DBF文件傳輸到EXCEL到PHP
- 4. 使用按鈕將值傳輸到文本文件中php
- 5. 將日誌文件kafka移動到hadoop
- 6. 使用TCP傳輸文件
- 7. 使用FTP傳輸文件
- 8. 使用FTP傳輸文件
- 9. 使用c#傳輸文件
- 10. 使用python傳輸文件
- 11. 使用WCF傳輸文件
- 12. 將大量大文件傳輸到s3
- 13. 將CSV文件從iPhone傳輸到MacBook
- 14. 自動將csv文件傳輸到MySQL
- 15. 將回聲傳送到輸出文件
- 16. 將文件傳輸到Windows服務
- 17. 將文件傳輸到Tomcat位置
- 18. 如何將.frm文件傳輸到表?
- 19. 將文件傳輸到消息代理
- 20. 將100個Excel文件傳輸到MySQL
- 21. 如何使用java將文件從url傳輸到ftp?
- 22. 使用SNMP將文件從代理傳輸到管理器?
- 23. 如何使用ubuntu將文件傳輸到安卓android 14.04
- 24. 使用pscp將文件從windows傳輸到linux包裝盒
- 25. 如何使用SSIS包將文件傳輸到SFTP路徑?
- 26. 使用gstreamer將本地mpeg-ts文件流式傳輸到udp
- 27. 如何使用ssh將文件從Windows傳輸到Linux?
- 28. 使用Windows機器將.ipa文件傳輸到iPad中
- 29. 我可以使用Apache NIFI將文件傳輸到HDFS系統。
- 30. 使用WCF將2-10MB文件傳輸到非.Net客戶端
你是真的將這些文件加載到文件中,還是將它們加入到文件中?氣流確實支持配料/微配料,但對於流媒體來說,我的經驗表明它不是太好,基本上就像_nano_-batching。我對遠程主機上的CSV文件進行了大量輪詢,並將它們作爲批次拉入BigQuery中。 – Mike
我逐行處理它們並將每行發送到kafka。 – bsd