HADOOP-6683. ZlibCompressor does not fully utilize the buffer. Contributed by Kang Xiao

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1037901 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Eli Collins 2010-11-22 21:51:57 +00:00
parent 82c5214ab1
commit 8bd9dd0f33
2 changed files with 33 additions and 15 deletions

View File

@ -212,6 +212,9 @@ Release 0.22.0 - Unreleased
HADOOP-6884. Add LOG.isDebugEnabled() guard for each LOG.debug(..).
(Erik Steffl via szetszwo)
HADOOP-6683. ZlibCompressor does not fully utilize the buffer.
(Kang Xiao via eli)
BUG FIXES
HADOOP-6638. try to relogin in a case of failed RPC connection (expired

View File

@ -53,6 +53,7 @@ public class ZlibCompressor implements Compressor {
private int userBufOff = 0, userBufLen = 0;
private Buffer uncompressedDirectBuf = null;
private int uncompressedDirectBufOff = 0, uncompressedDirectBufLen = 0;
private boolean keepUncompressedBuf = false;
private Buffer compressedDirectBuf = null;
private boolean finish, finished;
@ -269,6 +270,7 @@ public synchronized void setInput(byte[] b, int off, int len) {
this.userBuf = b;
this.userBufOff = off;
this.userBufLen = len;
uncompressedDirectBufOff = 0;
setInputFromSavedData();
// Reinitialize zlib's output direct buffer
@ -276,21 +278,13 @@ public synchronized void setInput(byte[] b, int off, int len) {
compressedDirectBuf.position(directBufferSize);
}
// Copies pending caller input from userBuf into uncompressedDirectBuf and
// records the amount staged for zlib in uncompressedDirectBufLen.
//
// NOTE(review): this text comes from a diff view that does not show +/-
// markers, so the body below appears to be the removed and the added
// implementation printed back to back: the statements up to and including
// "userBufLen -= uncompressedDirectBufLen" look like the old body, and the
// five statements after it look like the replacement. Confirm against the
// repository before treating this as one executable method.
synchronized void setInputFromSavedData() {
// Start from the beginning of the direct buffer and stage at most
// directBufferSize bytes of the remaining user input.
uncompressedDirectBufOff = 0;
uncompressedDirectBufLen = userBufLen;
if (uncompressedDirectBufLen > directBufferSize) {
uncompressedDirectBufLen = directBufferSize;
}
// Reinitialize zlib's input direct buffer
// (rewind() moves the position back to 0, so the put below writes from
// the start of the buffer).
uncompressedDirectBuf.rewind();
((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff,
uncompressedDirectBufLen);
// Note how much data is being fed to zlib
userBufOff += uncompressedDirectBufLen;
userBufLen -= uncompressedDirectBufLen;
// Copy as many pending user bytes as fit in the space still left in the
// direct buffer; the put appends at the buffer's current position rather
// than rewinding first.
int len = Math.min(userBufLen, uncompressedDirectBuf.remaining());
((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff, len);
// Advance past the bytes just copied.
userBufLen -= len;
userBufOff += len;
// The staged length is the buffer position after the put, i.e. everything
// written into the direct buffer so far.
uncompressedDirectBufLen = uncompressedDirectBuf.position();
}
public synchronized void setDictionary(byte[] b, int off, int len) {
@ -310,12 +304,21 @@ public synchronized boolean needsInput() {
}
// Check if zlib has consumed all input
if (uncompressedDirectBufLen <= 0) {
// compress should be invoked if keepUncompressedBuf true
if (keepUncompressedBuf && uncompressedDirectBufLen > 0)
return false;
if (uncompressedDirectBuf.remaining() > 0) {
// Check if we have consumed all user-input
if (userBufLen <= 0) {
return true;
} else {
// copy enough data from userBuf to uncompressedDirectBuf
setInputFromSavedData();
if (uncompressedDirectBuf.remaining() > 0) // uncompressedDirectBuf is not full
return true;
else
return false;
}
}
@ -359,6 +362,17 @@ public synchronized int compress(byte[] b, int off, int len)
n = deflateBytesDirect();
compressedDirectBuf.limit(n);
// Check if zlib consumed all input buffer
// set keepUncompressedBuf properly
if (uncompressedDirectBufLen <= 0) { // zlib consumed all input buffer
keepUncompressedBuf = false;
uncompressedDirectBuf.clear();
uncompressedDirectBufOff = 0;
uncompressedDirectBufLen = 0;
} else { // zlib did not consume all input buffer
keepUncompressedBuf = true;
}
// Get at most 'len' bytes
n = Math.min(n, len);
((ByteBuffer)compressedDirectBuf).get(b, off, n);
@ -393,6 +407,7 @@ public synchronized void reset() {
finished = false;
uncompressedDirectBuf.rewind();
uncompressedDirectBufOff = uncompressedDirectBufLen = 0;
keepUncompressedBuf = false;
compressedDirectBuf.limit(directBufferSize);
compressedDirectBuf.position(directBufferSize);
userBufOff = userBufLen = 0;