有人已經提到過streaming data solution,但如果您嘗試移動大塊日誌數據而不是建立連續流,則可能需要採用異步加載作業的路徑。
的GCS library的作用類似於谷歌應用程序引擎一起使用時,可存儲在雲端存儲分區的導入文件最Python文件庫:
import cloudstorage as gcs
filePath = "/CloudStorageBucket/dir/dir/logs.json"
with gcs.open(filePath, "w") as f:
f.write(SomeLogData)
f.close()
您可以指示大查詢裝載CSV或換行的列表在雲存儲,-delimited JSON文件由通過API創建負荷工作:(注意:您需要use oauth 2)
from apiclient.discovery import build
service = build("bigquery", "v2", http = oAuthedHttp)
job = {
"configuration": {
"load": {
"sourceUris": ["gs://CloudStorageBucket/dir/dir/logs.json"],
"schema": {
"files" : [
{"name": "Column1",
"type": "STRING"},
...
]
},
"destinationTable": {
"projectId": "Example-BigQuery-ProjectId",
"datasetId": "LogsDataset",
"tableId": "LogsTable"
},
"sourceFormat" : "NEWLINE_DELIMITED_JSON"
"createDisposition": "CREATE_IF_NEEDED"
}
}
}
response = service.jobs().insert(
projectId = "Example-BigQuery-ProjectId",
body = job
).execute()
你可以閱讀更多有關如何創建Big Query load jobs如果你想設置OT她的屬性喜歡在CSV文件中編寫處置或跳過行。您還可以看到如何加載數據的other good examples,包括命令行提示。
編輯:
要回答你的更具體的問題:
這是可行的解決方案?
是的。我們使用延期任務將我們的Google App Engine日誌導出到雲存儲並導入到BigQuery。有些人使用map reduce jobs,但如果你不需要洗牌或減少,這可能是矯枉過正的。
日誌數據的結構變化經常會導致錯誤 當插入到BigQuery中。我們將如何在python腳本中處理它?
它不應該是一個問題,除非您在消息到達大查詢之前解析消息。更好的設計是將消息,時間戳,級別等移植到Big Query,然後在那裏使用查詢進行消化。
Incase,我們必須在特定的時間段內重新運行日誌數據。我們如何做到這一點?需要編寫python腳本?
流式處理數據不會給你備份,除非你自己在BigQuery中設置它們。使用我上面概述的方法會自動爲您提供Google雲端存儲的備份,這是首選。
知道BigQuery是一個OLAP數據庫,不是事務性的,所以每次添加更多日誌數據時最好重建表,而不是嘗試插入新數據。這是違反直覺的,但是BigQuery是爲此而設計的,因爲它一次是can import 10,000 files/1TB。使用工作分配的分頁,理論上可以很快地導入數十萬條記錄。如果您不關心備份日誌,則數據流將是理想的。
你能給我更多的細節嗎?我是Google雲解決方案的新成員。 – user3769827 2014-09-05 16:46:09
如果您將原始數據流入bigquery,那麼您始終可以在BigQuery上處理該數據以轉換並加載到其他表中。它解決了如何重新運行比寫腳本更容易的IMO數據的部分問題。 – koma 2014-09-05 19:09:40