將固定數量的字符串(用於測試的800,000個1KB)放入PubSub主題並在以下版本中運行以下Apache Beam(2.1.0)作業數據流,正好一次保留語義,如預期。當從PubSub讀取數據流作業並寫入Google Cloud Storage時數據丟失
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
public class PubSubToGsSimpleJob {
public static void main(String[] args) {
PubSubToGsPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(PubSubToGsPipelineOptions.class);
Pipeline p = Pipeline.create(options);
p.apply(PubsubIO.readStrings().fromSubscription(options.getInput()))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput()));
p.run();
}
}
PipelineOptions
以下實施
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
public interface PubSubToGsPipelineOptions extends PipelineOptions {
@Description("PubSub subscription")
String getInput();
void setInput(String input);
@Description("Google Cloud Storage output path")
String getOutput();
void setOutput(String output);
}
但是,如果同樣的工作運行時,所有的元素之前倒掉讀取(如數據流控制檯中顯示),並再次拉開序幕,輸出文件與發佈到PubSub主題中的原始數據集相比,記錄數少。這意味着排除和替換作業可能會導致數據丟失,這似乎很奇怪,因爲this google cloud blog post提到Drain and replace
應該至少有一次語義。如何設計這條管道,以便在排乾和更換作業時至少實現一次語義(或者更好但恰好一次語義)?
這就是說,可能是TextIO可以/應該更好地處理這種情況(如果上面的猜測是正確的)。請確認更改輸出路徑是否保留所有記錄。這將確認Dataflow的保證。 –
我找不到匹配日誌模式的行。配置替換作業以寫入單獨的輸出至少產生一次語義。有一些重複的記錄意味着一次只使用PubSub作爲源的語義不被維護。 該博客文章還提到'如果您的自定義源擔保恰好一次交付,並提供源側的緩衝,排水和更換可以提供恰好一次semantics.'因爲PubSub的消息可以用一個ACK響應之前進行緩衝,這是否建議PubSub只能提供一次語義? – JonSim
感謝您確認工作。我將爲TextIO提交一個錯誤。我很驚訝你看到重複。在這種情況下,PubSub應該提供完全一次的語義。如果你可以提供更多信息(和job_id id,你可以),這將是非常有用的。我當然有興趣挖掘更多的重複來源。 –