I have some tar files on HDFS. My goal is to extract these files and store the extracted files back on HDFS. While extracting the tar files I get this exception: java.io.IOException: input buffer is closed.
For example, this is the directory structure of my input on HDFS:
Path : /data/160823

 ----------
 | 160823 |
 ----------
     |
     |--- 00
     |     |----- xyz.tar
     |     |----- xyz2.tar
     |--- 01
     |     |----- xyz3.tar
     |     |----- abc2.tar
     |--- 02
     |     |----- abc3.tar
     |     |----- abc4.tar
     .
     .
     .
     |--- 23
           |----- pqr.tar
           |----- pqr2.tar
The expected output is:
 ----------
 | Output |
 ----------
     |
     |----- xyz.gz
     |----- xyz2.gz
My code extracts the tar files and stores the extracted files to a path on HDFS.
I am able to extract the first .tar file and store its output on HDFS, but after it moves on to the next .tar file I get this exception:
java.io.IOException: input buffer is closed
at org.apache.commons.compress.archivers.tar.TarBuffer.readRecord(TarBuffer.java:190)
at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getRecord(TarArchiveInputStream.java:302)
at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getNextTarEntry(TarArchiveInputStream.java:230)
at com.lsr.TarMapper.call(TarMapper.java:53)
at com.lsr.TarMapper.call(TarMapper.java:1)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Here is my code snippet:
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.spark.api.java.function.FlatMapFunction;

public class TarMapper implements FlatMapFunction<String, String> {

    public Iterable<String> call(String arg0) throws Exception {
        System.out.println("Arg0 : " + arg0);
        List<String> untarFile = new ArrayList<String>();
        FileSystem fileSystem = null;
        FSDataInputStream fsin = null;
        TarArchiveInputStream tarin = null;
        TarArchiveEntry tarentry = null;
        FSDataOutputStream fsDataOutputStream = null;
        Path outputPath = null;
        try {
            fileSystem = FileSystem.get(LTar.conf);
            fsin = fileSystem.open(new Path(arg0));
            tarin = new TarArchiveInputStream(fsin);
            tarentry = tarin.getNextTarEntry();
            while (tarentry != null) {
                if (!tarentry.isDirectory()) {
                    System.out.println("TAR ENTRY : " + tarentry);
                    // Strip the leading "./" from the entry name.
                    outputPath = new Path("/data/tar/" + tarentry.getName().substring(2));
                    fsDataOutputStream = fileSystem.create(outputPath);
                    System.out.println("Name : " + tarentry.getName());
                    // Copy the current entry to HDFS. NOTE: this three-argument
                    // overload of copyBytes() closes BOTH streams once the copy
                    // finishes, including tarin.
                    IOUtils.copyBytes(tarin, fsDataOutputStream, LTar.conf);
                }
                tarentry = tarin.getNextTarEntry();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (tarin != null) {
                tarin.close();
            }
            if (fsin != null) {
                fsin.close();
            }
            if (fileSystem != null) {
                fileSystem.close();
            }
            if (fsDataOutputStream != null) {
                fsDataOutputStream.close();
            }
        }
        return untarFile;
    }
}
Please suggest how I can resolve this issue.
It works! Thanks to @EJP: going through the documentation of the IOUtils package, I found another copyBytes() overload with a boolean parameter, with which I was able to extract all the .tar files. This is the signature: copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
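For anyone hitting the same problem, here is a minimal sketch of the corrected loop body, assuming the surrounding TarMapper.call() setup from the question (fileSystem, tarin, and the external LTar.conf object already initialized):

// Corrected copy step: pass close = false so copyBytes() leaves the
// streams alone, then close only the per-entry output stream by hand.
TarArchiveEntry tarentry = tarin.getNextTarEntry();
while (tarentry != null) {
    if (!tarentry.isDirectory()) {
        Path outputPath = new Path("/data/tar/" + tarentry.getName().substring(2));
        FSDataOutputStream out = fileSystem.create(outputPath);
        // close = false keeps tarin open, so the next getNextTarEntry()
        // call still has a live input buffer to read from.
        IOUtils.copyBytes(tarin, out, LTar.conf, false);
        out.close();
    }
    tarentry = tarin.getNextTarEntry();
}

With close set to false the tar stream survives across entries, and each HDFS output file is finalized as soon as its entry has been written.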