2016-08-23

I have some tar files on HDFS. My goal is to extract these files and store the extracted files back on HDFS. While extracting the tar files I get this exception: IOException: input buffer is closed.

For example:

This is my input directory structure (on HDFS):

Path : /data/160823 --> 
-------- 
| 160823 | 
-------- 
    | 
    | --- 00 
     |----- xyz.tar 
     |----- xyz2.tar 

    | --- 01 
     |----- xyz3.tar 
     |----- abc2.tar 

    | --- 02 
     |----- abc3.tar 
     |----- abc4.tar 

    . 
    . 
    . 
    --- 23 
     |----- pqr.tar 
     |----- pqr2.tar 

The expected output is:

-------- 
| Output | 
-------- 
    | 
    |----- xyz.gz 
    |----- xyz2.gz 

My code extracts the tar files and stores the extracted files to a path on HDFS.

I am able to extract the first .tar file and store its output on HDFS, but as soon as the code starts reading the next .tar file I get this exception:

java.io.IOException: input buffer is closed 
    at org.apache.commons.compress.archivers.tar.TarBuffer.readRecord(TarBuffer.java:190) 
    at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getRecord(TarArchiveInputStream.java:302) 
    at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getNextTarEntry(TarArchiveInputStream.java:230) 
    at com.lsr.TarMapper.call(TarMapper.java:53) 
    at com.lsr.TarMapper.call(TarMapper.java:1) 
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129) 
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

Here is my code snippet:

import java.util.ArrayList; 
import java.util.List; 
import java.io.File; 
import java.io.FileOutputStream; 
import java.io.OutputStream; 
import java.net.URI; 
import org.apache.commons.compress.archivers.tar.TarArchiveEntry; 
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileStatus; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IOUtils; 
import org.apache.spark.api.java.function.FlatMapFunction; 
import com.utils.FileWrapper; 

public class TarMapper implements FlatMapFunction<String, String>{ 

    public Iterable<String> call(String arg0) throws Exception { 
     System.out.println("Arg0 : "+arg0); 
     List<String> untarFile = new ArrayList<String>(); 
     FileSystem fileSystem = LTar.fs; 
     FSDataInputStream fsin = null; 
     TarArchiveInputStream tarin = null; 
     OutputStream outstr = null; 
     TarArchiveEntry tarentry = null; 
     FSDataOutputStream fsDataOutputStream = null; 
     Path outputPath = null; 
     try{ 
      fileSystem = FileSystem.get(LTar.conf); 
      fsin = fileSystem.open(new Path(arg0)); 
      tarin = new TarArchiveInputStream(fsin); 
      tarentry = tarin.getNextTarEntry(); 
      while (tarentry != null) { 
       if (!tarentry.isDirectory()) { 
        System.out.println("TAR ENTRY : "+tarentry); 
        outputPath = new Path("/data/tar/"+tarentry.getName().substring(2)); 
        fsDataOutputStream = fileSystem.create(outputPath); 
        System.out.println("Name : "+tarentry.getName()+"Other : "); 
        IOUtils.copyBytes(tarin, fsDataOutputStream, LTar.conf); 
       } 
       tarentry = tarin.getNextTarEntry(); 
      } 
     }catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      if (tarin != null) { 
       tarin.close(); 
      } 
      if (fsin != null) { 
       fsin.close(); 
      } 
      if (fileSystem != null) { 
       fileSystem.close(); 
      } 
      if(outstr !=null){ 
       outstr.close(); 
      } 
      if(fsDataOutputStream != null){ 
       fsDataOutputStream.close(); 
      } 
     } 
     return untarFile; 
    } 
} 

Please suggest how to resolve this problem.

Answer

You are calling an overload of copyBytes() that closes the input stream when the copy finishes.

Use one of the other overloads.
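The failure mode can be reproduced without Hadoop at all. The sketch below uses a hypothetical copyBytes() helper that mimics the behaviour of Hadoop's IOUtils.copyBytes overloads (the boolean decides whether the input stream is closed after the copy), plus a stream wrapper that, like the tar reader, refuses reads once closed. Copying two "entries" from one shared stream with close=true fails on the second copy; with close=false both succeed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class CopyBytesDemo {

    // Wrapper that, like the tar input stream's internal buffer, refuses
    // further reads after close() -- producing "input buffer is closed".
    static class StrictStream extends FilterInputStream {
        private boolean closed = false;
        StrictStream(InputStream in) { super(in); }
        @Override public int read(byte[] b, int off, int len) throws IOException {
            if (closed) throw new IOException("input buffer is closed");
            return super.read(b, off, len);
        }
        @Override public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    // Simplified stand-in for Hadoop's IOUtils.copyBytes: the boolean
    // controls whether the input stream is closed after the copy, just as
    // the (in, out, conf) overload implicitly closes and the
    // (in, out, conf, false) overload does not.
    static void copyBytes(InputStream in, OutputStream out, boolean close) throws IOException {
        byte[] buf = new byte[4096];
        for (int n; (n = in.read(buf, 0, buf.length)) != -1; ) {
            out.write(buf, 0, n);
        }
        if (close) {
            in.close();
        }
    }

    // Copy twice from one shared stream, as one TarArchiveInputStream
    // feeds the copy of several tar entries in a loop.
    static String tryTwoCopies(boolean close) {
        StrictStream in = new StrictStream(new ByteArrayInputStream("entry1".getBytes()));
        try {
            copyBytes(in, new ByteArrayOutputStream(), close); // first entry
            copyBytes(in, new ByteArrayOutputStream(), close); // second entry
            return "ok";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryTwoCopies(true));   // close after copy: second read fails
        System.out.println(tryTwoCopies(false));  // stream left open: both copies work
    }
}
```

In the original loop this means replacing IOUtils.copyBytes(tarin, fsDataOutputStream, LTar.conf) with the four-argument overload passing false, so the shared TarArchiveInputStream stays open for the remaining entries (the per-entry output stream should then be closed explicitly after each copy).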


It works! Thanks @EJP. Going through the IOUtils documentation, I found the copyBytes() overload with a boolean parameter, with which I can extract all the .tar files. This is the signature: copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)