diff --git a/CHANGES.txt b/CHANGES.txt index 9fe67e3da7e..55d96b23b0b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -212,6 +212,9 @@ Release 0.22.0 - Unreleased HADOOP-6884. Add LOG.isDebugEnabled() guard for each LOG.debug(..). (Erik Steffl via szetszwo) + HADOOP-6683. ZlibCompressor does not fully utilize the buffer. + (Kang Xiao via eli) + BUG FIXES HADOOP-6638. try to relogin in a case of failed RPC connection (expired diff --git a/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java b/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java index 4cb10e6da62..8839bc98fa0 100644 --- a/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java +++ b/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java @@ -53,6 +53,7 @@ public class ZlibCompressor implements Compressor { private int userBufOff = 0, userBufLen = 0; private Buffer uncompressedDirectBuf = null; private int uncompressedDirectBufOff = 0, uncompressedDirectBufLen = 0; + private boolean keepUncompressedBuf = false; private Buffer compressedDirectBuf = null; private boolean finish, finished; @@ -269,6 +270,7 @@ public synchronized void setInput(byte[] b, int off, int len) { this.userBuf = b; this.userBufOff = off; this.userBufLen = len; + uncompressedDirectBufOff = 0; setInputFromSavedData(); // Reinitialize zlib's output direct buffer @@ -276,21 +278,13 @@ public synchronized void setInput(byte[] b, int off, int len) { compressedDirectBuf.position(directBufferSize); } + //copy enough data from userBuf to uncompressedDirectBuf synchronized void setInputFromSavedData() { - uncompressedDirectBufOff = 0; - uncompressedDirectBufLen = userBufLen; - if (uncompressedDirectBufLen > directBufferSize) { - uncompressedDirectBufLen = directBufferSize; - } - - // Reinitialize zlib's input direct buffer - uncompressedDirectBuf.rewind(); - ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff, - uncompressedDirectBufLen); - - // Note how much data is being fed to zlib - userBufOff += uncompressedDirectBufLen; - userBufLen -= uncompressedDirectBufLen; + int len = Math.min(userBufLen, uncompressedDirectBuf.remaining()); + ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff, len); + userBufLen -= len; + userBufOff += len; + uncompressedDirectBufLen = uncompressedDirectBuf.position(); } public synchronized void setDictionary(byte[] b, int off, int len) { @@ -310,12 +304,21 @@ public synchronized boolean needsInput() { } // Check if zlib has consumed all input - if (uncompressedDirectBufLen <= 0) { + // compress should be invoked if keepUncompressedBuf true + if (keepUncompressedBuf && uncompressedDirectBufLen > 0) + return false; + + if (uncompressedDirectBuf.remaining() > 0) { // Check if we have consumed all user-input if (userBufLen <= 0) { return true; } else { + // copy enough data from userBuf to uncompressedDirectBuf setInputFromSavedData(); + if (uncompressedDirectBuf.remaining() > 0) // uncompressedDirectBuf is not full + return true; + else + return false; } } @@ -359,6 +362,17 @@ public synchronized int compress(byte[] b, int off, int len) n = deflateBytesDirect(); compressedDirectBuf.limit(n); + // Check if zlib consumed all input buffer + // set keepUncompressedBuf properly + if (uncompressedDirectBufLen <= 0) { // zlib consumed all input buffer + keepUncompressedBuf = false; + uncompressedDirectBuf.clear(); + uncompressedDirectBufOff = 0; + uncompressedDirectBufLen = 0; + } else { // zlib did not consume all input buffer + keepUncompressedBuf = true; + } + // Get atmost 'len' bytes n = Math.min(n, len); ((ByteBuffer)compressedDirectBuf).get(b, off, n); @@ -393,6 +407,7 @@ public synchronized void reset() { finished = false; uncompressedDirectBuf.rewind(); uncompressedDirectBufOff = uncompressedDirectBufLen = 0; + keepUncompressedBuf = false; compressedDirectBuf.limit(directBufferSize); compressedDirectBuf.position(directBufferSize); userBufOff = userBufLen = 0;