
OutOfMemoryError when using TextIO.Read (I'm new to Cloud Dataflow)

I'm using DataflowPipelineRunner to read a CSV file and write the result to BigQuery. It works well when the CSV file is small (only 20 records, less than 1MB), but I get an OOM error when the file becomes huge (over 10 million records, about 616.42 MB).

Here is the error message:

java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at com.google.cloud.dataflow.sdk.util.StreamUtils.getBytes(StreamUtils.java:63)
    at co.coder.MyCoder.decode(MyCoder.java:54)
    at co.coder.MyCoder.decode(MyCoder.java:1)
    at com.google.cloud.dataflow.sdk.io.TextIO$TextSource$TextBasedReader.decodeCurrentElement(TextIO.java:1065)
    at com.google.cloud.dataflow.sdk.io.TextIO$TextSource$TextBasedReader.readNextRecord(TextIO.java:1052)
    at com.google.cloud.dataflow.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:536)
    at com.google.cloud.dataflow.sdk.io.OffsetBasedSource$OffsetBasedReader.advance(OffsetBasedSource.java:287)
    at com.google.cloud.dataflow.sdk.runners.worker.WorkerCustomSources$BoundedReaderIterator.advance(WorkerCustomSources.java:541)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation$SynchronizedReaderIterator.advance(ReadOperation.java:425)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:217)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:182)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:69)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:284)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:220)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:170)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:192)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:172)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:159)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

According to the error message, the error occurs at [MyCoder.java:54]. MyCoder is my subclass of CustomCoder, which transcodes the CSV file from Shift_JIS to UTF-8:

53:@Override 
54:public String decode(InputStream inStream, Context context) throws CoderException, IOException { 
55: if (context.isWholeStream) { 
56:  byte[] bytes = StreamUtils.getBytes(inStream); 
57:  return new String(bytes, Charset.forName("Shift_JIS")); 
58: } else { 
59:  try { 
60:   return readString(new DataInputStream(inStream)); 
61:  } catch (EOFException | UTFDataFormatException exn) { 
62:   // These exceptions correspond to decoding problems, so change 
63:   // what kind of exception they're branded as. 
64:   throw new CoderException(exn); 
65:  } 
66: } 
67:} 
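
For context, a complete coder along these lines would look roughly like the sketch below. Only decode() is shown in the question, so the class skeleton, the encode() side, and the length-prefixed handling that stands in for the unshown readString() helper are assumptions, modelled on the Dataflow 1.x SDK's StringUtf8Coder:

package co.coder;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UTFDataFormatException;
import java.nio.charset.Charset;

import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.CustomCoder;
import com.google.cloud.dataflow.sdk.util.StreamUtils;

// Sketch of a coder that turns Shift_JIS bytes into Java Strings.
public class MyCoder extends CustomCoder<String> {

  public static MyCoder of() {
    return new MyCoder();
  }

  @Override
  public void encode(String value, OutputStream outStream, Context context)
      throws CoderException, IOException {
    byte[] bytes = value.getBytes(Charset.forName("UTF-8"));
    if (context.isWholeStream) {
      // The value is the entire stream, so no length prefix is needed.
      outStream.write(bytes);
    } else {
      // Length-prefixed encoding when several values share one stream.
      DataOutputStream out = new DataOutputStream(outStream);
      out.writeInt(bytes.length);
      out.write(bytes);
    }
  }

  @Override
  public String decode(InputStream inStream, Context context)
      throws CoderException, IOException {
    if (context.isWholeStream) {
      // Reads the rest of the stream into memory in one go -- this is the
      // StreamUtils.getBytes call that appears in the OOM stack trace above.
      byte[] bytes = StreamUtils.getBytes(inStream);
      return new String(bytes, Charset.forName("Shift_JIS"));
    } else {
      try {
        // Reads one length-prefixed value (this stands in for the readString()
        // helper from the question, whose implementation is not shown).
        DataInputStream in = new DataInputStream(inStream);
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return new String(bytes, Charset.forName("Shift_JIS"));
      } catch (EOFException | UTFDataFormatException exn) {
        // These exceptions correspond to decoding problems, so rebrand them.
        throw new CoderException(exn);
      }
    }
  }
}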

Also, here is how I ran the job with DataflowPipelineRunner:

DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectId);
options.setStagingLocation(stagingFolderPathInGCS);
options.setWorkerMachineType("n1-highmem-4");
options.setMaxNumWorkers(5);

Pipeline p = Pipeline.create(options);

// read csv from gcs
PCollection<String> lines = p.apply(TextIO.Read.named("csv input")
    .from("gs://" + bucketName + "/original/" + fileName)
    .withCoder(MyCoder.of()));

// write the re-encoded lines back to gcs
lines.apply(TextIO.Write.named("csv output")
    .to("gs://" + bucketName + "/encoded/" + fileName)
    .withCoder(StringUtf8Coder.of())
    .withoutSharding()
    .withHeader("test Header"));

p.run();

Since Dataflow is a scalable big-data cloud service, I'm a little confused by this OOM error. Can anyone explain why the OutOfMemoryError happened and how to fix it?

Thanks a lot!

You should try to read your file line by line and process it that way. At the moment a single string is being created for the big 600MB file, and that is where you get the exception. – ZeusNet

Hi ZeusNet, thanks for your reply. I set the GCE machine type to "n1-highmem-4", which means the VM should have 13GB of memory; even if the file is not read line by line, a 600MB string doesn't seem like it would reach the memory limit... – xialin

Nothing in your pipeline definition would cause an OOM. Using 'withoutSharding()' will severely limit performance, since the write cannot be parallelized, but that would not cause a crash. What does 'readString()' call in your coder? –

Answer

I can't claim to fully understand it, but I solved the problem as follows:

but I get an OOM error when the file becomes huge (over 10 million records, about 616.42 MB).

That was because I had made the test data by simply copying the small file (only 20 records, less than 1MB); in other words, those 10 million records contained only 20 distinct keys. So I switched to different test data with many keys (without so much duplicated data).

Also, I followed Kenn Knowles' suggestion and let Dataflow manage the job automatically, for example by removing the following two lines (the resulting setup is sketched right after them):

withoutSharding() 
options.setWorkerMachineType("n1-highmem-4"); 
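
For clarity, this is roughly what the job setup from the question looks like after those two removals, i.e. the same pipeline but with the service left to choose the worker machine type and the output sharding:

DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectId);
options.setStagingLocation(stagingFolderPathInGCS);
options.setMaxNumWorkers(5);

Pipeline p = Pipeline.create(options);

// read csv from gcs
PCollection<String> lines = p.apply(TextIO.Read.named("csv input")
    .from("gs://" + bucketName + "/original/" + fileName)
    .withCoder(MyCoder.of()));

// write the re-encoded lines; output sharding is now left to the service
lines.apply(TextIO.Write.named("csv output")
    .to("gs://" + bucketName + "/encoded/" + fileName)
    .withCoder(StringUtf8Coder.of())
    .withHeader("test Header"));

p.run();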

Finally the Dataflow job worked well (the machine type was automatically set to n1-standard-1)!

Further information about Dataflow's dynamic work rebalancing can be found here: https://cloud.google.com/dataflow/service/dataflow-service-desc#Autotuning