Merge -c 1543456 from trunk to branch-2 to fix HADOOP-10047. Add a direct-buffer based apis for compression. Contributed by Gopal V.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1543457 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
86fd96080d
commit
9ff2130eeb
|
@ -97,6 +97,9 @@ Release 2.3.0 - UNRELEASED
|
||||||
|
|
||||||
HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
|
HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
|
||||||
|
|
||||||
|
HADOOP-10047. Add a direct-buffer based apis for compression. (Gopal V via
|
||||||
|
acmurthy)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
HADOOP-9964. Fix deadlocks in TestHttpServer by synchronize
|
HADOOP-9964. Fix deadlocks in TestHttpServer by synchronize
|
||||||
|
|
|
@ -0,0 +1,69 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.io.compress;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public interface DirectCompressor extends Compressor {
|
||||||
|
/**
|
||||||
|
* Example usage
|
||||||
|
* <pre> {@code
|
||||||
|
* private void compress(DirectCompressor comp, ByteBufferProducer in, ByteBufferConsumer out) throws IOException {
|
||||||
|
* ByteBuffer outBB = ByteBuffer.allocateDirect(64*1024);
|
||||||
|
* outBB.clear();
|
||||||
|
* // returns inBB.remaining() > 0 || inBB == null
|
||||||
|
* // if you do a inBB.put(), remember to do a inBB.flip()
|
||||||
|
* ByteBuffer inBB = in.get();
|
||||||
|
* while(!comp.finished()) {
|
||||||
|
* comp.compress(outBB, inBB);
|
||||||
|
* if(outBB.remaining() == 0) {
|
||||||
|
* // flush when the buffer only when it is full
|
||||||
|
* outBB.flip();
|
||||||
|
* // has to consume the buffer, because it is reused
|
||||||
|
* out.put(outBB);
|
||||||
|
* outBB.clear();
|
||||||
|
* }
|
||||||
|
* if(inBB != null && inBB.remaining() == 0) {
|
||||||
|
* inBB = in.get();
|
||||||
|
* if(inBB == null) {
|
||||||
|
* // EOF
|
||||||
|
* comp.finish();
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
*
|
||||||
|
* if(outBB.position() > 0) {
|
||||||
|
* outBB.flip();
|
||||||
|
* out.put(outBB);
|
||||||
|
* outBB.clear();
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
* } </pre>
|
||||||
|
* @param dst Destination {@link ByteBuffer} for storing the results into. Requires dst.remaining() to be > 0
|
||||||
|
* @param src Source {@link ByteBuffer} for reading from. This can be null or src.remaining() > 0
|
||||||
|
* @return bytes stored into dst
|
||||||
|
* @throws IOException if compression fails
|
||||||
|
*/
|
||||||
|
public int compress(ByteBuffer dst, ByteBuffer src) throws IOException;
|
||||||
|
}
|
|
@ -0,0 +1,71 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.io.compress;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public interface DirectDecompressor extends Decompressor {
|
||||||
|
/**
|
||||||
|
* Example usage
|
||||||
|
*
|
||||||
|
* <pre>{@code
|
||||||
|
* private void decompress(DirectDecompressor decomp, ByteBufferProducer in, ByteBufferConsumer out) throws IOException {
|
||||||
|
* ByteBuffer outBB = ByteBuffer.allocate(64*1024);
|
||||||
|
* outBB.clear();
|
||||||
|
* // returns inBB.remaining() > 0 || inBB == null
|
||||||
|
* // if you do a inBB.put(), remember to do a inBB.flip()
|
||||||
|
* ByteBuffer inBB = in.get();
|
||||||
|
* if(inBB == null) {
|
||||||
|
* // no data at all?
|
||||||
|
* }
|
||||||
|
* while(!decomp.finished()) {
|
||||||
|
* decomp.decompress(outBB, inBB);
|
||||||
|
* if(outBB.remaining() == 0) {
|
||||||
|
* // flush when the buffer is full
|
||||||
|
* outBB.flip();
|
||||||
|
* // has to consume the buffer, because it is reused
|
||||||
|
* out.put(outBB);
|
||||||
|
* outBB.clear();
|
||||||
|
* }
|
||||||
|
* if(inBB != null && inBB.remaining() == 0) {
|
||||||
|
* // inBB = null for EOF
|
||||||
|
* inBB = in.get();
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
*
|
||||||
|
* if(outBB.position() > 0) {
|
||||||
|
* outBB.flip();
|
||||||
|
* out.put(outBB);
|
||||||
|
* outBB.clear();
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
* }</pre>
|
||||||
|
* @param dst Destination {@link ByteBuffer} for storing the results into. Requires dst.remaining() to be > 0
|
||||||
|
* @param src Source {@link ByteBuffer} for reading from. This can be null or src.remaining() > 0
|
||||||
|
* @return bytes stored into dst (dst.postion += more)
|
||||||
|
* @throws IOException if compression fails
|
||||||
|
*/
|
||||||
|
public int decompress(ByteBuffer dst, ByteBuffer src) throws IOException;
|
||||||
|
}
|
|
@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.compress.Compressor;
|
import org.apache.hadoop.io.compress.Compressor;
|
||||||
|
import org.apache.hadoop.io.compress.DirectCompressor;
|
||||||
import org.apache.hadoop.util.NativeCodeLoader;
|
import org.apache.hadoop.util.NativeCodeLoader;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -35,7 +36,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
* http://www.zlib.net/
|
* http://www.zlib.net/
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public class ZlibCompressor implements Compressor {
|
public class ZlibCompressor implements Compressor,DirectCompressor {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(ZlibCompressor.class);
|
private static final Log LOG = LogFactory.getLog(ZlibCompressor.class);
|
||||||
|
|
||||||
|
@ -420,6 +421,7 @@ public class ZlibCompressor implements Compressor {
|
||||||
compressedDirectBuf.limit(directBufferSize);
|
compressedDirectBuf.limit(directBufferSize);
|
||||||
compressedDirectBuf.position(directBufferSize);
|
compressedDirectBuf.position(directBufferSize);
|
||||||
userBufOff = userBufLen = 0;
|
userBufOff = userBufLen = 0;
|
||||||
|
userBuf = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -435,6 +437,110 @@ public class ZlibCompressor implements Compressor {
|
||||||
throw new NullPointerException();
|
throw new NullPointerException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private int put(ByteBuffer dst, ByteBuffer src) {
|
||||||
|
// this will lop off data from src[pos:limit] into dst[pos:limit]
|
||||||
|
int l1 = src.remaining();
|
||||||
|
int l2 = dst.remaining();
|
||||||
|
int pos1 = src.position();
|
||||||
|
int pos2 = dst.position();
|
||||||
|
int len = Math.min(l1, l2);
|
||||||
|
|
||||||
|
if (len == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
ByteBuffer slice = src.slice();
|
||||||
|
slice.limit(len);
|
||||||
|
dst.put(slice);
|
||||||
|
src.position(pos1 + len);
|
||||||
|
return len;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int compress(ByteBuffer dst, ByteBuffer src) throws IOException {
|
||||||
|
assert dst.remaining() > 0 : "dst.remaining() == 0";
|
||||||
|
int n = 0;
|
||||||
|
|
||||||
|
/* fast path for clean state and direct buffers */
|
||||||
|
/* TODO: reset should free userBuf? */
|
||||||
|
if((src != null && src.isDirect()) && dst.isDirect() && userBuf == null) {
|
||||||
|
/*
|
||||||
|
* TODO: fix these assumptions in inflateDirect(), eventually by allowing
|
||||||
|
* it to read position()/limit() directly
|
||||||
|
*/
|
||||||
|
boolean cleanDst = (dst.position() == 0 && dst.remaining() == dst.capacity() && dst.capacity() >= directBufferSize);
|
||||||
|
boolean cleanState = (keepUncompressedBuf == false && uncompressedDirectBufLen == 0 && compressedDirectBuf.remaining() == 0);
|
||||||
|
/* use the buffers directly */
|
||||||
|
if(cleanDst && cleanState) {
|
||||||
|
Buffer originalCompressed = compressedDirectBuf;
|
||||||
|
Buffer originalUncompressed = uncompressedDirectBuf;
|
||||||
|
int originalBufferSize = directBufferSize;
|
||||||
|
uncompressedDirectBuf = src;
|
||||||
|
uncompressedDirectBufOff = src.position();
|
||||||
|
uncompressedDirectBufLen = src.remaining();
|
||||||
|
compressedDirectBuf = dst;
|
||||||
|
directBufferSize = dst.remaining();
|
||||||
|
// Compress data
|
||||||
|
n = deflateBytesDirect();
|
||||||
|
// we move dst.position() forward, not limit()
|
||||||
|
// unlike the local buffer case, which moves it when we put() into the dst
|
||||||
|
dst.position(n);
|
||||||
|
if(uncompressedDirectBufLen > 0) {
|
||||||
|
src.position(uncompressedDirectBufOff);
|
||||||
|
} else {
|
||||||
|
src.position(src.limit());
|
||||||
|
}
|
||||||
|
compressedDirectBuf = originalCompressed;
|
||||||
|
uncompressedDirectBuf = originalUncompressed;
|
||||||
|
uncompressedDirectBufOff = 0;
|
||||||
|
uncompressedDirectBufLen = 0;
|
||||||
|
directBufferSize = originalBufferSize;
|
||||||
|
return n;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if there is compressed data
|
||||||
|
if (compressedDirectBuf.remaining() > 0) {
|
||||||
|
n = put(dst, (ByteBuffer) compressedDirectBuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dst.remaining() == 0) {
|
||||||
|
return n;
|
||||||
|
} else {
|
||||||
|
needsInput();
|
||||||
|
|
||||||
|
// if we have drained userBuf, read from src (ideally, do not mix buffer
|
||||||
|
// modes, but sometimes you can)
|
||||||
|
if (userBufLen == 0 && src != null && src.remaining() > 0) {
|
||||||
|
put((ByteBuffer) uncompressedDirectBuf, src);
|
||||||
|
uncompressedDirectBufLen = uncompressedDirectBuf.position();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Re-initialize the zlib's output direct buffer
|
||||||
|
compressedDirectBuf.rewind();
|
||||||
|
compressedDirectBuf.limit(directBufferSize);
|
||||||
|
|
||||||
|
// Compress data
|
||||||
|
int more = deflateBytesDirect();
|
||||||
|
|
||||||
|
compressedDirectBuf.limit(more);
|
||||||
|
|
||||||
|
// Check if zlib consumed all input buffer
|
||||||
|
// set keepUncompressedBuf properly
|
||||||
|
if (uncompressedDirectBufLen <= 0) { // zlib consumed all input buffer
|
||||||
|
keepUncompressedBuf = false;
|
||||||
|
uncompressedDirectBuf.clear();
|
||||||
|
uncompressedDirectBufOff = 0;
|
||||||
|
uncompressedDirectBufLen = 0;
|
||||||
|
} else { // zlib did not consume all input buffer
|
||||||
|
keepUncompressedBuf = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// fill the dst buffer from compressedDirectBuf
|
||||||
|
int fill = put(dst, ((ByteBuffer) compressedDirectBuf));
|
||||||
|
return n + fill;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private native static void initIDs();
|
private native static void initIDs();
|
||||||
private native static long init(int level, int strategy, int windowBits);
|
private native static long init(int level, int strategy, int windowBits);
|
||||||
private native static void setDictionary(long strm, byte[] b, int off,
|
private native static void setDictionary(long strm, byte[] b, int off,
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.nio.Buffer;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
import org.apache.hadoop.io.compress.Decompressor;
|
import org.apache.hadoop.io.compress.Decompressor;
|
||||||
|
import org.apache.hadoop.io.compress.DirectDecompressor;
|
||||||
import org.apache.hadoop.util.NativeCodeLoader;
|
import org.apache.hadoop.util.NativeCodeLoader;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -31,7 +32,7 @@ import org.apache.hadoop.util.NativeCodeLoader;
|
||||||
* http://www.zlib.net/
|
* http://www.zlib.net/
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public class ZlibDecompressor implements Decompressor {
|
public class ZlibDecompressor implements Decompressor,DirectDecompressor {
|
||||||
private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
|
private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
|
||||||
|
|
||||||
// HACK - Use this as a global lock in the JNI layer
|
// HACK - Use this as a global lock in the JNI layer
|
||||||
|
@ -280,6 +281,7 @@ public class ZlibDecompressor implements Decompressor {
|
||||||
uncompressedDirectBuf.limit(directBufferSize);
|
uncompressedDirectBuf.limit(directBufferSize);
|
||||||
uncompressedDirectBuf.position(directBufferSize);
|
uncompressedDirectBuf.position(directBufferSize);
|
||||||
userBufOff = userBufLen = 0;
|
userBufOff = userBufLen = 0;
|
||||||
|
userBuf = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -299,6 +301,108 @@ public class ZlibDecompressor implements Decompressor {
|
||||||
if (stream == 0)
|
if (stream == 0)
|
||||||
throw new NullPointerException();
|
throw new NullPointerException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private int put(ByteBuffer dst, ByteBuffer src) {
|
||||||
|
// this will lop off data from src[pos:limit] into dst[pos:limit], using the
|
||||||
|
// min() of both remaining()
|
||||||
|
int l1 = src.remaining();
|
||||||
|
int l2 = dst.remaining();
|
||||||
|
int pos1 = src.position();
|
||||||
|
int pos2 = dst.position();
|
||||||
|
int len = Math.min(l1, l2);
|
||||||
|
|
||||||
|
if (len == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
ByteBuffer slice = src.slice();
|
||||||
|
slice.limit(len);
|
||||||
|
dst.put(slice);
|
||||||
|
src.position(pos1 + len);
|
||||||
|
return len;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int decompress(ByteBuffer dst, ByteBuffer src) throws IOException {
|
||||||
|
assert dst.remaining() > 0 : "dst.remaining == 0";
|
||||||
|
int n = 0;
|
||||||
|
|
||||||
|
/* fast path for clean state and direct buffers */
|
||||||
|
if((src != null && src.isDirect()) && dst.isDirect() && userBuf == null) {
|
||||||
|
/*
|
||||||
|
* TODO: fix these assumptions in inflateDirect(), eventually by allowing
|
||||||
|
* it to read position()/limit() directly
|
||||||
|
*/
|
||||||
|
boolean cleanDst = (dst.position() == 0 && dst.remaining() == dst.capacity() && dst.remaining() >= directBufferSize);
|
||||||
|
boolean cleanState = (compressedDirectBufLen == 0 && uncompressedDirectBuf.remaining() == 0);
|
||||||
|
/* use the buffers directly */
|
||||||
|
if(cleanDst && cleanState) {
|
||||||
|
Buffer originalCompressed = compressedDirectBuf;
|
||||||
|
Buffer originalUncompressed = uncompressedDirectBuf;
|
||||||
|
int originalBufferSize = directBufferSize;
|
||||||
|
compressedDirectBuf = src;
|
||||||
|
compressedDirectBufOff = src.position();
|
||||||
|
compressedDirectBufLen = src.remaining();
|
||||||
|
uncompressedDirectBuf = dst;
|
||||||
|
directBufferSize = dst.remaining();
|
||||||
|
// Compress data
|
||||||
|
n = inflateBytesDirect();
|
||||||
|
dst.position(n);
|
||||||
|
if(compressedDirectBufLen > 0) {
|
||||||
|
src.position(compressedDirectBufOff);
|
||||||
|
} else {
|
||||||
|
src.position(src.limit());
|
||||||
|
}
|
||||||
|
compressedDirectBuf = originalCompressed;
|
||||||
|
uncompressedDirectBuf = originalUncompressed;
|
||||||
|
compressedDirectBufOff = 0;
|
||||||
|
compressedDirectBufLen = 0;
|
||||||
|
directBufferSize = originalBufferSize;
|
||||||
|
return n;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if there is compressed data
|
||||||
|
if (uncompressedDirectBuf.remaining() > 0) {
|
||||||
|
n = put(dst, (ByteBuffer) uncompressedDirectBuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dst.remaining() == 0) {
|
||||||
|
return n;
|
||||||
|
} else {
|
||||||
|
if (needsInput()) {
|
||||||
|
// this does not update buffers if we have no userBuf
|
||||||
|
if (userBufLen <= 0) {
|
||||||
|
compressedDirectBufOff = 0;
|
||||||
|
compressedDirectBufLen = 0;
|
||||||
|
compressedDirectBuf.rewind().limit(directBufferSize);
|
||||||
|
}
|
||||||
|
if (src != null) {
|
||||||
|
assert src.remaining() > 0 : "src.remaining() == 0";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// if we have drained userBuf, read from src (ideally, do not mix buffer
|
||||||
|
// modes, but sometimes you can)
|
||||||
|
if (userBufLen == 0 && src != null && src.remaining() > 0) {
|
||||||
|
compressedDirectBufLen += put(((ByteBuffer) compressedDirectBuf), src);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Re-initialize the zlib's output direct buffer
|
||||||
|
uncompressedDirectBuf.rewind();
|
||||||
|
uncompressedDirectBuf.limit(directBufferSize);
|
||||||
|
|
||||||
|
// Compress data
|
||||||
|
int more = inflateBytesDirect();
|
||||||
|
|
||||||
|
uncompressedDirectBuf.limit(more);
|
||||||
|
|
||||||
|
// Get atmost 'len' bytes
|
||||||
|
int fill = put(dst, ((ByteBuffer) uncompressedDirectBuf));
|
||||||
|
return n + fill;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private native static void initIDs();
|
private native static void initIDs();
|
||||||
private native static long init(int windowBits);
|
private native static long init(int windowBits);
|
||||||
|
|
|
@ -19,8 +19,13 @@ package org.apache.hadoop.io.compress.zlib;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
import static org.junit.Assume.*;
|
import static org.junit.Assume.*;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.Console;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -33,8 +38,12 @@ import org.apache.hadoop.io.compress.DecompressorStream;
|
||||||
import org.apache.hadoop.io.compress.CompressDecompressTester.CompressionTestStrategy;
|
import org.apache.hadoop.io.compress.CompressDecompressTester.CompressionTestStrategy;
|
||||||
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
|
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
|
||||||
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
|
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
|
||||||
|
import org.apache.log4j.ConsoleAppender;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import sun.util.logging.resources.logging;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
|
||||||
public class TestZlibCompressorDecompressor {
|
public class TestZlibCompressorDecompressor {
|
||||||
|
@ -150,6 +159,149 @@ public class TestZlibCompressorDecompressor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void compressDecompressLoop(int rawDataSize, int inSize, int outSize)
|
||||||
|
throws IOException {
|
||||||
|
byte[] rawData = null;
|
||||||
|
rawData = generate(rawDataSize);
|
||||||
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
|
ByteBuffer inBuf = ByteBuffer.allocateDirect(inSize);
|
||||||
|
ByteBuffer outBuf = ByteBuffer.allocateDirect(outSize);
|
||||||
|
ZlibCompressor compressor = new ZlibCompressor();
|
||||||
|
ZlibDecompressor decompressor = new ZlibDecompressor();
|
||||||
|
outBuf.clear();
|
||||||
|
/* compression loop */
|
||||||
|
int off = 0;
|
||||||
|
int len = rawDataSize;
|
||||||
|
int min = Math.min(inBuf.remaining(), len);
|
||||||
|
if (min > 0) {
|
||||||
|
inBuf.put(rawData, off, min);
|
||||||
|
}
|
||||||
|
inBuf.flip();
|
||||||
|
len -= min;
|
||||||
|
off += min;
|
||||||
|
while (!compressor.finished()) {
|
||||||
|
compressor.compress(outBuf, inBuf);
|
||||||
|
if (outBuf.remaining() == 0) {
|
||||||
|
// flush when the buffer is full
|
||||||
|
outBuf.flip();
|
||||||
|
while (outBuf.remaining() > 0) {
|
||||||
|
baos.write(outBuf.get());
|
||||||
|
}
|
||||||
|
outBuf.clear();
|
||||||
|
}
|
||||||
|
if (inBuf != null && inBuf.remaining() == 0) {
|
||||||
|
inBuf.clear();
|
||||||
|
if (len > 0) {
|
||||||
|
min = Math.min(inBuf.remaining(), len);
|
||||||
|
inBuf.put(rawData, off, min);
|
||||||
|
inBuf.flip();
|
||||||
|
len -= min;
|
||||||
|
off += min;
|
||||||
|
} else {
|
||||||
|
inBuf = null;
|
||||||
|
compressor.finish();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
outBuf.flip();
|
||||||
|
if (outBuf.remaining() > 0) {
|
||||||
|
while (outBuf.remaining() > 0) {
|
||||||
|
baos.write(outBuf.get());
|
||||||
|
}
|
||||||
|
outBuf.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
compressor.end();
|
||||||
|
|
||||||
|
byte[] compressed = baos.toByteArray();
|
||||||
|
ByteBuffer expected = ByteBuffer.wrap(rawData);
|
||||||
|
outBuf.clear();
|
||||||
|
inBuf = ByteBuffer.allocateDirect(inSize);
|
||||||
|
inBuf.clear();
|
||||||
|
|
||||||
|
// zlib always has header
|
||||||
|
if (compressed.length != 0) {
|
||||||
|
off = 0;
|
||||||
|
len = compressed.length;
|
||||||
|
min = Math.min(inBuf.remaining(), len);
|
||||||
|
inBuf.put(compressed, off, min);
|
||||||
|
inBuf.flip();
|
||||||
|
len -= min;
|
||||||
|
off += min;
|
||||||
|
while (!decompressor.finished()) {
|
||||||
|
decompressor.decompress(outBuf, inBuf);
|
||||||
|
if (outBuf.remaining() == 0) {
|
||||||
|
outBuf.flip();
|
||||||
|
while (outBuf.remaining() > 0) {
|
||||||
|
assertEquals(expected.get(), outBuf.get());
|
||||||
|
}
|
||||||
|
outBuf.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (inBuf != null && inBuf.remaining() == 0) {
|
||||||
|
inBuf.clear();
|
||||||
|
if (len > 0) {
|
||||||
|
min = Math.min(inBuf.remaining(), len);
|
||||||
|
inBuf.put(compressed, off, min);
|
||||||
|
inBuf.flip();
|
||||||
|
len -= min;
|
||||||
|
off += min;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
outBuf.flip();
|
||||||
|
if (outBuf.remaining() > 0) {
|
||||||
|
while (outBuf.remaining() > 0) {
|
||||||
|
assertEquals(expected.get(), outBuf.get());
|
||||||
|
}
|
||||||
|
outBuf.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(0, expected.remaining());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testZlibDirectCompressDecompress() {
|
||||||
|
int[] size = { 4, 16, 4 * 1024, 64 * 1024, 128 * 1024, 256 * 1024,
|
||||||
|
1024 * 1024 };
|
||||||
|
try {
|
||||||
|
// 0-2 bytes results in sizeof(outBuf) > sizeof(inBuf)
|
||||||
|
compressDecompressLoop(0, 4096, 4096);
|
||||||
|
compressDecompressLoop(0, 1, 1);
|
||||||
|
compressDecompressLoop(1, 1, 2);
|
||||||
|
compressDecompressLoop(1, 2, 1);
|
||||||
|
compressDecompressLoop(2, 3, 2);
|
||||||
|
|
||||||
|
for (int i = 0; i < size.length; i++) {
|
||||||
|
compressDecompressLoop(size[i], 4096, 4096);
|
||||||
|
compressDecompressLoop(size[i], 1, 1);
|
||||||
|
compressDecompressLoop(size[i], 1, 2);
|
||||||
|
compressDecompressLoop(size[i], 2, 1);
|
||||||
|
compressDecompressLoop(size[i], 3, 2);
|
||||||
|
compressDecompressLoop(size[i], size[i], 4096);
|
||||||
|
compressDecompressLoop(size[i], size[i] - 1, 4096);
|
||||||
|
compressDecompressLoop(size[i], size[i] + 1, 4096);
|
||||||
|
compressDecompressLoop(size[i], 4096, size[i]);
|
||||||
|
compressDecompressLoop(size[i], 4096, size[i] - 1);
|
||||||
|
compressDecompressLoop(size[i], 4096, size[i] + 1);
|
||||||
|
compressDecompressLoop(size[i], size[i] - 1, size[i] - 1);
|
||||||
|
|
||||||
|
compressDecompressLoop(size[i], size[i] / 2, 4096);
|
||||||
|
compressDecompressLoop(size[i], size[i] / 2 - 1, 4096);
|
||||||
|
compressDecompressLoop(size[i], size[i] / 2 + 1, 4096);
|
||||||
|
compressDecompressLoop(size[i], 4096, size[i] / 2);
|
||||||
|
compressDecompressLoop(size[i], 4096, size[i] / 2 - 1);
|
||||||
|
compressDecompressLoop(size[i], 4096, size[i] / 2 + 1);
|
||||||
|
compressDecompressLoop(size[i], size[i] / 2 - 1, size[i] / 2 - 1);
|
||||||
|
}
|
||||||
|
} catch (IOException ex) {
|
||||||
|
fail("testZlibDirectCompressDecompress ex !!!" + ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testZlibCompressorDecompressorSetDictionary() {
|
public void testZlibCompressorDecompressorSetDictionary() {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
|
Loading…
Reference in New Issue