2017-11-17 156 views
0

將固定數量的字符串(用於測試的800,000個1KB)放入PubSub主題並在以下版本中運行以下Apache Beam(2.1.0)作業數據流,正好一次保留語義,如預期。當從PubSub讀取數據流作業並寫入Google Cloud Storage時數據丟失

import org.apache.beam.sdk.Pipeline; 
import org.apache.beam.sdk.io.TextIO; 
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; 
import org.apache.beam.sdk.options.PipelineOptionsFactory; 
import org.apache.beam.sdk.transforms.windowing.FixedWindows; 
import org.apache.beam.sdk.transforms.windowing.Window; 
import org.joda.time.Duration; 

public class PubSubToGsSimpleJob { 

    public static void main(String[] args) { 
     PubSubToGsPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() 
       .as(PubSubToGsPipelineOptions.class); 
     Pipeline p = Pipeline.create(options); 

     p.apply(PubsubIO.readStrings().fromSubscription(options.getInput())) 
       .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1)))) 
       .apply(TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput())); 
     p.run(); 
    } 

} 

PipelineOptions以下實施

import org.apache.beam.sdk.options.Description; 
import org.apache.beam.sdk.options.PipelineOptions; 

public interface PubSubToGsPipelineOptions extends PipelineOptions { 
    @Description("PubSub subscription") 
    String getInput(); 
    void setInput(String input); 

    @Description("Google Cloud Storage output path") 
    String getOutput(); 
    void setOutput(String output); 
} 

但是,如果同樣的工作運行時,所有的元素之前倒掉讀取(如數據流控制檯中顯示),並再次拉開序幕,輸出文件與發佈到PubSub主題中的原始數據集相比,記錄數少。這意味着排除和替換作業可能會導致數據丟失,這似乎很奇怪,因爲this google cloud blog post提到Drain and replace應該至少有一次語義。如何設計這條管道,以便在排乾和更換作業時至少實現一次語義(或者更好但恰好一次語義)?

回答

0

我的猜測是,一個窗口可能會部分寫入之前,排水和替換工作用其餘的窗口覆蓋它。您可以檢查這個日誌行in WriteFiles的排水作業和替換作業中的工人日誌。如果您使用Beam HEAD,它還會在最終目的地被覆蓋時進行記錄。

從概念上消耗的作業和替換作業是完全不同的管道。使用相同的輸出位置與其他兩個不相關的作業使用相同的輸出位置沒有區別。您可以嘗試的另一件事是爲第二個作業使用不同的輸出路徑,並驗證所有記錄都存在於兩個目錄中。

+0

這就是說,可能是TextIO可以/應該更好地處理這種情況(如果上面的猜測是正確的)。請確認更改輸出路徑是否保留所有記錄。這將確認Dataflow的保證。 –

+0

我找不到匹配日誌模式的行。配置替換作業以寫入單獨的輸出至少產生一次語義。有一些重複的記錄意味着一次只使用PubSub作爲源的語義不被維護。 該博客文章還提到'如果您的自定義源擔保恰好一次交付,並提供源側的緩衝,排水和更換可以提供恰好一次semantics.'因爲PubSub的消息可以用一個ACK響應之前進行緩衝,這是否建議PubSub只能提供一次語義? – JonSim

+0

感謝您確認工作。我將爲TextIO提交一個錯誤。我很驚訝你看到重複。在這種情況下,PubSub應該提供完全一次的語義。如果你可以提供更多信息(和job_id id,你可以),這將是非常有用的。我當然有興趣挖掘更多的重複來源。 –