OutOfMemoryError when using TextIO.Read (I'm new to Cloud Dataflow)
I use DataflowPipelineRunner to read a CSV file and write the result to BigQuery. It works fine when the CSV file is small (only 20 records, less than 1 MB), but I get an OOM error when the file becomes large (more than 10 million records, about 616.42 MB).
Here is the error message:
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at com.google.cloud.dataflow.sdk.util.StreamUtils.getBytes(StreamUtils.java:63)
    at co.coder.MyCoder.decode(MyCoder.java:54)
    at co.coder.MyCoder.decode(MyCoder.java:1)
    at com.google.cloud.dataflow.sdk.io.TextIO$TextSource$TextBasedReader.decodeCurrentElement(TextIO.java:1065)
    at com.google.cloud.dataflow.sdk.io.TextIO$TextSource$TextBasedReader.readNextRecord(TextIO.java:1052)
    at com.google.cloud.dataflow.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:536)
    at com.google.cloud.dataflow.sdk.io.OffsetBasedSource$OffsetBasedReader.advance(OffsetBasedSource.java:287)
    at com.google.cloud.dataflow.sdk.runners.worker.WorkerCustomSources$BoundedReaderIterator.advance(WorkerCustomSources.java:541)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation$SynchronizedReaderIterator.advance(ReadOperation.java:425)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:217)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:182)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:69)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:284)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:220)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:170)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:192)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:172)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:159)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
From the error message, the error occurs at [MyCoder.java:54]. MyCoder is my subclass of CustomCoder; it re-encodes the CSV file from Shift_JIS to UTF-8:
53:@Override
54:public String decode(InputStream inStream, Context context) throws CoderException, IOException {
55:    if (context.isWholeStream) {
56:        byte[] bytes = StreamUtils.getBytes(inStream);
57:        return new String(bytes, Charset.forName("Shift_JIS"));
58:    } else {
59:        try {
60:            return readString(new DataInputStream(inStream));
61:        } catch (EOFException | UTFDataFormatException exn) {
62:            // These exceptions correspond to decoding problems, so change
63:            // what kind of exception they're branded as.
64:            throw new CoderException(exn);
65:        }
66:    }
67:}
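To make the whole-stream branch above concrete, here is a minimal, self-contained sketch of what it does (plain Java, no Dataflow dependency; the class name and the `readAllBytes` helper are mine, standing in for `StreamUtils.getBytes`). Note that it buffers every byte of the stream into one array before building a single String, so the heap needed is at least the size of whatever the reader hands in as one element:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;

public class WholeStreamDecodeSketch {

    // Stand-in for StreamUtils.getBytes: copies the ENTIRE stream into
    // an in-memory byte array, growing the buffer as it goes. If one
    // "element" is very large, this array (plus the String built from
    // it) is what consumes the heap.
    static byte[] readAllBytes(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[8192];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        return out.toByteArray();
    }

    // Equivalent of the isWholeStream branch of MyCoder.decode:
    // buffer all bytes, then decode them as Shift_JIS.
    static String decode(InputStream in) throws IOException {
        byte[] bytes = readAllBytes(in);
        return new String(bytes, Charset.forName("Shift_JIS"));
    }

    public static void main(String[] args) throws IOException {
        byte[] sjis = "テスト,123".getBytes("Shift_JIS");
        System.out.println(decode(new ByteArrayInputStream(sjis)));
        // prints: テスト,123
    }
}
```

This is only meant to illustrate the memory behaviour of the branch, not to replace the SDK's implementation.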
And also, here is how I run the DataflowPipelineRunner:
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectId);
options.setStagingLocation(stagingFolderPathInGCS);
options.setWorkerMachineType("n1-highmem-4");
options.setMaxNumWorkers(5);
Pipeline p = Pipeline.create(options);
// read csv from gcs
PCollection<String> lines = p.apply(TextIO.Read.named("csv input")
.from("gs://" + bucketName + "/original/" + fileName).withCoder(MyCoder.of()));
lines.apply(TextIO.Write.named("csv output").to("gs://" + bucketName + "/encoded/" + fileName)
.withCoder(StringUtf8Coder.of()).withoutSharding().withHeader("test Header"));
p.run();
Since Dataflow is a scalable big-data cloud service, I'm confused by this OOM error. Can anyone explain why the OutOfMemoryError happens and how to fix it?
Thanks a lot!
You should try to read your file line by line with TextIO and process it that way. Right now a single String seems to be created from the big 600 MB file, and that is where you get the exception. – ZeusNet
Hi ZeusNet, thanks for your reply. I set the GCE machine type to "n1-highmem-4", which means the VM should have 13 GB of memory. Even if the problem is not reading line by line, a 600 MB string doesn't seem close to the memory limit... – xialin
Nothing in your pipeline definition should cause an OOM. Using `withoutSharding()` will severely limit performance, since the write cannot be parallelized, but that shouldn't cause a crash. What does `readString()` do in your coder? –