2017-07-18 72 views
1

我正嘗試構建數據流程來幫助將數據存儲到Google雲端存儲中來存檔數據。我有一個包含client_id和一些元數據的PubSub事件數據流。這個過程應該歸檔所有傳入的事件,所以這需要成爲一個流媒體管道。基於元素值的使用數據流寫入Google雲端存儲

我希望能夠通過將每個接收到的事件放入類似gs://archive/client_id/eventdata.json的存儲桶中來處理歸檔事件。有可能在dataflow/apache beam中做,特別是能夠爲PCollection中的每個事件分配不同的文件名?

編輯: 所以我的代碼目前的樣子:

public static class PerWindowFiles extends FileBasedSink.FilenamePolicy { 

private String customerId; 

public PerWindowFiles(String customerId) { 
    this.customerId = customerId; 
} 

@Override 
public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) { 
    String filename = bucket+"/"+customerId; 
    return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE); 
} 

@Override 
public ResourceId unwindowedFilename(
    ResourceId outputDirectory, Context context, String extension) { 
    throw new UnsupportedOperationException("Unsupported."); 
} 
} 


public static void main(String[] args) throws IOException { 
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args) 
    .withValidation() 
    .as(DataflowPipelineOptions.class); 
options.setRunner(DataflowRunner.class); 
options.setStreaming(true); 
Pipeline p = Pipeline.create(options); 

PCollection<Event> set = p.apply(PubsubIO.readStrings() 
            .fromTopic("topic")) 
    .apply(new ConvertToEvent())); 

PCollection<KV<String, Event>> events = labelEvents(set); 
PCollection<KV<String, EventGroup>> sessions = groupEvents(events); 

String customers = System.getProperty("CUSTOMERS"); 
JSONArray custList = new JSONArray(customers); 
for (Object cust : custList) { 
    if (cust instanceof String) { 
    String customerId = (String) cust; 
    PCollection<KV<String, EventGroup>> custCol = sessions.apply(new FilterByCustomer(customerId)); 
      stringifyEvents(custCol) 
       .apply(TextIO.write() 
               .to("gs://archive/") 
               .withFilenamePolicy(new PerWindowFiles(customerId)) 
               .withWindowedWrites() 
               .withNumShards(3)); 
    } else { 
    LOG.info("Failed to create TextIO: customerId was not String"); 
    } 
} 

p.run() 
    .waitUntilFinish(); 
} 

這段代碼是醜陋的,因爲我需要重新部署每一個新客戶發生在爲了能夠保存其數據的時間。我寧願能夠將客戶數據動態分配到適當的存儲桶。

+0

請提供您嘗試的代碼示例以及您在軟件開發中正確面對的困難。 – Fabien

+0

這是我編輯的工作代碼,它依賴所有客戶端的JSON數組作爲要部署的maven命令的一部分傳入。這顯然是不理想的代碼。 –

回答

2

「動態目標」 - 根據正在寫入的元素選擇文件名 - 將成爲Beam 2.1.0中的一項新功能,該功能尚未發佈。

+0

有點令人失望,但謝謝你的回答。 Beam 2.1.0發佈有預期的時間表嗎? –

+0

我們目前正在投票發佈候選人。如果沒有阻塞問題,這將只是幾天。 –

相關問題