apache-beam

    0熱度

    1回答

    我正在嘗試將我的管道響應寫入Google存儲,但獲取已安裝在服務器上的模塊導入錯誤。 代碼: from __future__ import print_function, absolute_import import apache_beam as beam from apache_beam.io import ReadFromText from apache_beam.io import W

    0熱度

    1回答

    我有我要在Apache梁管道與數據流轉輪內使用本地Python包。 我試圖按照文檔中提供的說明:https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/(部分本地或非PyPI將依賴),但沒有成功。 我的包具有以下結構: my_common ├── __init__.py └── shared ├─

    2熱度

    1回答

    我正在構建一個將在Google Cloud Dataflow中運行的Apache Beam(v2.0)管道。預期流程如下: 來自Pub/Sub的事件流(無界數據源)。它們是簡單的JSON對象,具有sessionId屬性。 使用自定義的DoFn事件到KV<String, String>,其中他們的關鍵是sessionId並且該值是整個JSON對象。 使用會話窗口的窗口事件(開發時間間隔爲2秒,生產時

    0熱度

    1回答

    我在存儲CSV文件,我想讀它,並將其寫入BigQuery資料表。這是我的CSV文件,其中第一行是標題: GroupName,Groupcode,GroupOwner,GroupCategoryID System Administrators,sysadmin,13456,100 Independence High Teachers,HS Teachers,,101 John Glenn Mi

    9熱度

    3回答

    我明白在F#功能組合物的基礎,如,例如,描述here。 也許我失去了一些東西,但。該>>和<<運營商似乎已經與假設定義的每個函數只需要一個參數: > (>>);; val it : (('a -> 'b) -> ('b -> 'c) -> 'a -> 'c) = <fun:[email protected]> > (<<);; val it : (('a -> 'b) -> ('c -> '

    0熱度

    1回答

    我試圖修改Apache Beam的MinimalWordCount python示例以從BigQuery表中讀取。我做了以下修改,我似乎有查詢工作,但例如。 原來的例子在這裏: with beam.Pipeline(options=pipeline_options) as p: # Read the text file[pattern] into a PCollection.

    1熱度

    1回答

    我正在嘗試創建一個非常簡單的Beam管道,它需要PubSub消息並將其寫入BigQuery。消息以字符串形式出現,我需要將其轉換爲TableRow以將其寫入BigQuery。我,對於我的生活,找不到一個簡單的方法來做到這一點。我的表格在這一點上只是一個列。任何建議?

    4熱度

    3回答

    在分佈式處理環境中,通常使用「part-000」等「part」文件名,是否可以編寫某種擴展名來重命名個別輸出文件名(例如per窗口文件名)Apache Beam? 要做到這一點,人們可能必須能夠爲窗口指定名稱或根據窗口內容推斷文件名。我想知道這種方法是否可行。 至於溶液是否應該被流式傳輸或批次,流模式的例子是優選的

    0熱度

    1回答

    我正面臨着一個奇怪的問題,那就是我在帶有Apache Beam的流BigQuery表上實現了一個小的delta作業。 我正在將數據流式傳輸到BigQuery表,並且每運行一小時我都會將任何新記錄從該流式表複製到協調錶中。增量構建在我在流表上引入的CreateDatetime列的頂部。一旦記錄被加載到流表中,它將獲得當前的UTC時間戳。因此,三角洲自然會取得所有具有比上一次更新的CreateDate

    0熱度

    1回答

    有什麼方法可以知道程序中BigQueryIO.write()操作是否已成功完成? 它返回WriteResult,但在其中找不到任何相關的方法。或者在那裏?