2017-04-08 121 views
2

我的Java程序實現了一個服務器,該服務器應該從客戶端通過websockets獲取一個使用gzip壓縮的非常大的文件,並檢查文件內容中的某些字節模式。Java按順序解壓縮GZIP流

客戶端發送嵌入專有協議內的文件塊,以便在客戶端收到消息後解析消息並提取gzip文件內容。

我無法在程序存儲器中保存整個文件,所以我試圖解壓每個塊,處理數據並繼續到下一個塊。

我用下面的代碼:

public static String gzipDecompress(byte[] compressed) throws IOException { 
    String uncompressed; 
    try (
     ByteArrayInputStream bis = new ByteArrayInputStream(compressed); 
     GZIPInputStream gis = new GZIPInputStream(bis); 
     Reader reader = new InputStreamReader(gis); 
     Writer writer = new StringWriter() 
    ) { 

     char[] buffer = new char[10240]; 
     for (int length = 0; (length = reader.read(buffer)) > 0;) { 
     writer.write(buffer, 0, length); 
     } 
     uncompressed = writer.toString(); 
    } 

    return uncompressed; 
    } 

但調用與第一壓縮塊的功能時,我發現了以下異常:

java.io.EOFException: Unexpected end of ZLIB input stream 
    at java.util.zip.InflaterInputStream.fill(InflaterInputStream.java:240) 
    at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:158) 
    at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117) 
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) 
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) 
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) 
    at java.io.InputStreamReader.read(InputStreamReader.java:184) 
    at java.io.Reader.read(Reader.java:140) 

重要的是要提的是我很重要不會跳過任何塊並嘗試按順序解壓塊。

我錯過了什麼?

+1

目前尚不清楚這些數據是從哪裏開始的。您應該創建一個讀取* all *數據的流,並將其包裝在GZipInputStream中。它不需要在內存中擁有所有數據*,但它應該是單個流。 –

回答

2

問題是你用這些塊手動玩。

正確的方法是獲得一些InputStream,用GZIPInputStream包裝它,然後讀取數據。

InputStream is = // obtain the original gzip stream 

    GZIPInputStream gis = new GZIPInputStream(is); 
    Reader reader = new InputStreamReader(gis); 

    //... proceed reading and so on 

GZIPInputStream作品流的方式,所以如果你一次只能從reader問10KB,整體內存佔用會很低,無論最初的GZIP文件的大小。這個問題是更新

更新後,您的情況可能的解決方案是寫一個InputStream實現,流正在由客戶端協議處理程序向它提出的塊字節。

這裏是一個原型:

public class ProtocolDataInputStream extends InputStream { 
    private BlockingQueue<byte[]> nextChunks = new ArrayBlockingQueue<byte[]>(100); 
    private byte[] currentChunk = null; 
    private int currentChunkOffset = 0; 
    private boolean noMoreChunks = false; 

    @Override 
    public synchronized int read() throws IOException { 
     boolean takeNextChunk = currentChunk == null || currentChunkOffset >= currentChunk.length; 
     if (takeNextChunk) { 
      if (noMoreChunks) { 
       // stream is exhausted 
       return -1; 
      } else { 
       currentChunk = nextChunks.take(); 
       currentChunkOffset = 0; 
      } 
     } 
     return currentChunk[currentChunkOffset++]; 
    } 

    @Override 
    public synchronized int available() throws IOException { 
     if (currentChunk == null) { 
      return 0; 
     } else { 
      return currentChunk.length - currentChunkOffset; 
     } 
    } 

    public synchronized void addChunk(byte[] chunk, boolean chunkIsLast) { 
     nextChunks.add(chunk); 
     if (chunkIsLast) { 
      noMoreChunks = true; 
     } 
    } 
} 

您的客戶端協議處理程序使用addChunk(),而你的解壓碼翻出此流的數據增加了字節塊(通過Reader)。

請注意,此代碼有一些問題:

  1. 正在使用的隊列的大小有限。如果太頻繁地呼叫addChunk(),則可能會填充隊列,這將阻止addChunk()。這可能是合意的或不合適的。
  2. 只有read()方法用於說明目的。爲了性能,最好以相同的方式實施read(byte[])
  3. 在讀者(解壓縮程序)和寫入程序(協議處理程序調用addChunk())是不同線程的假設下使用保守同步。
  4. InterruptedException未在take()上處理,以避免太多細節。

如果你的解壓縮和addChunk()在同一個線程(在同一迴路)執行,那麼你可以嘗試使用InputStream.available()方法使用InputStreamReader.ready()拉着一個Reader拉動時,時。

+1

無法使用ByteArrayInputStream或其他InputStream將字節數組作爲InputStream傳遞給GZIPInputStream?在我的情況下,我無法真正使用從服務器獲取數據的原始InputSteam。 – Eldad

+0

爲什麼不能使用原始的'InputStream'?用我知道的字節來提供'GZIPInputStream'的唯一安全方法是首先將所有字節讀入內存,這不是您想要的大文件。 –

+0

我添加了詳細信息以更好地描述情況,我得到嵌入專有協議內的文件塊,以便我的InputStream獲取完整的協議消息,解析它,然後從中提取文件塊,然後才能解壓縮塊,I不要控制客戶端,也不知道下一個包含下一個文件塊的消息何時到達。感謝和抱歉的描述不好。 – Eldad

0

來自gzipped流的任意字節序列不是有效的獨立gzip數據。不管怎樣,你必須連接所有的字節塊。

最簡單的方法是爲了積累他們都用一個簡單的管道:

import java.io.PipedOutputStream; 
import java.io.IOException; 
import java.util.zip.GZIPInputStream; 

public class ChunkInflater { 
    private final PipedOutputStream pipe; 

    private final InputStream stream; 

    public ChunkInflater() 
    throws IOException { 
     pipe = new PipedOutputStream(); 
     stream = new GZIPInputStream(new PipedInputStream(pipe)); 
    } 

    public InputStream getInputStream() { 
     return stream; 
    } 

    public void addChunk(byte[] compressedChunk) 
    throws IOException { 
     pipe.write(compressedChunk); 
    } 
} 

現在你有,你可以在你想要的任何單位讀取的InputStream。例如:

ChunkInflater inflater = new ChunkInflater(); 

Callable<Void> chunkReader = new Callable<Void>() { 
    @Override 
    public Void call() 
    throws IOException { 
     byte[] chunk; 
     while ((chunk = readChunkFromSource()) != null) { 
      inflater.addChunk(chunk); 
     } 

     return null; 
    } 
}; 
ExecutorService executor = Executors.newSingleThreadExecutor(); 
executor.submit(chunkReader); 
executor.shutdown(); 

Reader reader = new InputStreamReader(inflater.getInputStream()); 
// read text here