HADOOP-10047. Add a direct-buffer based apis for compression. Contributed by Gopal V.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1543456 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2013-11-19 15:38:27 +00:00
parent 9ff520b7d6
commit b96cc8fdbf
6 changed files with 507 additions and 2 deletions

View File

@ -387,6 +387,9 @@ Release 2.3.0 - UNRELEASED
HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn) HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
HADOOP-10047. Add a direct-buffer based apis for compression. (Gopal V via
acmurthy)
BUG FIXES BUG FIXES
HADOOP-9964. Fix deadlocks in TestHttpServer by synchronize HADOOP-9964. Fix deadlocks in TestHttpServer by synchronize

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface DirectCompressor extends Compressor {
/**
* Example usage
* <pre> {@code
* private void compress(DirectCompressor comp, ByteBufferProducer in, ByteBufferConsumer out) throws IOException {
* ByteBuffer outBB = ByteBuffer.allocateDirect(64*1024);
* outBB.clear();
* // returns inBB.remaining() &gt; 0 || inBB == null
* // if you do a inBB.put(), remember to do a inBB.flip()
* ByteBuffer inBB = in.get();
* while(!comp.finished()) {
* comp.compress(outBB, inBB);
* if(outBB.remaining() == 0) {
* // flush when the buffer only when it is full
* outBB.flip();
* // has to consume the buffer, because it is reused
* out.put(outBB);
* outBB.clear();
* }
* if(inBB != null &amp;&amp; inBB.remaining() == 0) {
* inBB = in.get();
* if(inBB == null) {
* // EOF
* comp.finish();
* }
* }
* }
*
* if(outBB.position() &gt; 0) {
* outBB.flip();
* out.put(outBB);
* outBB.clear();
* }
* }
* } </pre>
* @param dst Destination {@link ByteBuffer} for storing the results into. Requires dst.remaining() to be > 0
* @param src Source {@link ByteBuffer} for reading from. This can be null or src.remaining() > 0
* @return bytes stored into dst
* @throws IOException if compression fails
*/
public int compress(ByteBuffer dst, ByteBuffer src) throws IOException;
}

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface DirectDecompressor extends Decompressor {
/**
* Example usage
*
* <pre>{@code
* private void decompress(DirectDecompressor decomp, ByteBufferProducer in, ByteBufferConsumer out) throws IOException {
* ByteBuffer outBB = ByteBuffer.allocate(64*1024);
* outBB.clear();
* // returns inBB.remaining() &gt; 0 || inBB == null
* // if you do a inBB.put(), remember to do a inBB.flip()
* ByteBuffer inBB = in.get();
* if(inBB == null) {
* // no data at all?
* }
* while(!decomp.finished()) {
* decomp.decompress(outBB, inBB);
* if(outBB.remaining() == 0) {
* // flush when the buffer is full
* outBB.flip();
* // has to consume the buffer, because it is reused
* out.put(outBB);
* outBB.clear();
* }
* if(inBB != null &amp;&amp; inBB.remaining() == 0) {
* // inBB = null for EOF
* inBB = in.get();
* }
* }
*
* if(outBB.position() &gt; 0) {
* outBB.flip();
* out.put(outBB);
* outBB.clear();
* }
* }
* }</pre>
* @param dst Destination {@link ByteBuffer} for storing the results into. Requires dst.remaining() to be > 0
* @param src Source {@link ByteBuffer} for reading from. This can be null or src.remaining() > 0
* @return bytes stored into dst (dst.postion += more)
* @throws IOException if compression fails
*/
public int decompress(ByteBuffer dst, ByteBuffer src) throws IOException;
}

View File

@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.DirectCompressor;
import org.apache.hadoop.util.NativeCodeLoader; import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -35,7 +36,7 @@ import org.apache.commons.logging.LogFactory;
* http://www.zlib.net/ * http://www.zlib.net/
* *
*/ */
public class ZlibCompressor implements Compressor { public class ZlibCompressor implements Compressor,DirectCompressor {
private static final Log LOG = LogFactory.getLog(ZlibCompressor.class); private static final Log LOG = LogFactory.getLog(ZlibCompressor.class);
@ -420,6 +421,7 @@ public class ZlibCompressor implements Compressor {
compressedDirectBuf.limit(directBufferSize); compressedDirectBuf.limit(directBufferSize);
compressedDirectBuf.position(directBufferSize); compressedDirectBuf.position(directBufferSize);
userBufOff = userBufLen = 0; userBufOff = userBufLen = 0;
userBuf = null;
} }
@Override @Override
@ -435,6 +437,110 @@ public class ZlibCompressor implements Compressor {
throw new NullPointerException(); throw new NullPointerException();
} }
private int put(ByteBuffer dst, ByteBuffer src) {
// this will lop off data from src[pos:limit] into dst[pos:limit]
int l1 = src.remaining();
int l2 = dst.remaining();
int pos1 = src.position();
int pos2 = dst.position();
int len = Math.min(l1, l2);
if (len == 0) {
return 0;
}
ByteBuffer slice = src.slice();
slice.limit(len);
dst.put(slice);
src.position(pos1 + len);
return len;
}
public int compress(ByteBuffer dst, ByteBuffer src) throws IOException {
assert dst.remaining() > 0 : "dst.remaining() == 0";
int n = 0;
/* fast path for clean state and direct buffers */
/* TODO: reset should free userBuf? */
if((src != null && src.isDirect()) && dst.isDirect() && userBuf == null) {
/*
* TODO: fix these assumptions in inflateDirect(), eventually by allowing
* it to read position()/limit() directly
*/
boolean cleanDst = (dst.position() == 0 && dst.remaining() == dst.capacity() && dst.capacity() >= directBufferSize);
boolean cleanState = (keepUncompressedBuf == false && uncompressedDirectBufLen == 0 && compressedDirectBuf.remaining() == 0);
/* use the buffers directly */
if(cleanDst && cleanState) {
Buffer originalCompressed = compressedDirectBuf;
Buffer originalUncompressed = uncompressedDirectBuf;
int originalBufferSize = directBufferSize;
uncompressedDirectBuf = src;
uncompressedDirectBufOff = src.position();
uncompressedDirectBufLen = src.remaining();
compressedDirectBuf = dst;
directBufferSize = dst.remaining();
// Compress data
n = deflateBytesDirect();
// we move dst.position() forward, not limit()
// unlike the local buffer case, which moves it when we put() into the dst
dst.position(n);
if(uncompressedDirectBufLen > 0) {
src.position(uncompressedDirectBufOff);
} else {
src.position(src.limit());
}
compressedDirectBuf = originalCompressed;
uncompressedDirectBuf = originalUncompressed;
uncompressedDirectBufOff = 0;
uncompressedDirectBufLen = 0;
directBufferSize = originalBufferSize;
return n;
}
}
// Check if there is compressed data
if (compressedDirectBuf.remaining() > 0) {
n = put(dst, (ByteBuffer) compressedDirectBuf);
}
if (dst.remaining() == 0) {
return n;
} else {
needsInput();
// if we have drained userBuf, read from src (ideally, do not mix buffer
// modes, but sometimes you can)
if (userBufLen == 0 && src != null && src.remaining() > 0) {
put((ByteBuffer) uncompressedDirectBuf, src);
uncompressedDirectBufLen = uncompressedDirectBuf.position();
}
// Re-initialize the zlib's output direct buffer
compressedDirectBuf.rewind();
compressedDirectBuf.limit(directBufferSize);
// Compress data
int more = deflateBytesDirect();
compressedDirectBuf.limit(more);
// Check if zlib consumed all input buffer
// set keepUncompressedBuf properly
if (uncompressedDirectBufLen <= 0) { // zlib consumed all input buffer
keepUncompressedBuf = false;
uncompressedDirectBuf.clear();
uncompressedDirectBufOff = 0;
uncompressedDirectBufLen = 0;
} else { // zlib did not consume all input buffer
keepUncompressedBuf = true;
}
// fill the dst buffer from compressedDirectBuf
int fill = put(dst, ((ByteBuffer) compressedDirectBuf));
return n + fill;
}
}
private native static void initIDs(); private native static void initIDs();
private native static long init(int level, int strategy, int windowBits); private native static long init(int level, int strategy, int windowBits);
private native static void setDictionary(long strm, byte[] b, int off, private native static void setDictionary(long strm, byte[] b, int off,

View File

@ -23,6 +23,7 @@ import java.nio.Buffer;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DirectDecompressor;
import org.apache.hadoop.util.NativeCodeLoader; import org.apache.hadoop.util.NativeCodeLoader;
/** /**
@ -31,7 +32,7 @@ import org.apache.hadoop.util.NativeCodeLoader;
* http://www.zlib.net/ * http://www.zlib.net/
* *
*/ */
public class ZlibDecompressor implements Decompressor { public class ZlibDecompressor implements Decompressor,DirectDecompressor {
private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024; private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
// HACK - Use this as a global lock in the JNI layer // HACK - Use this as a global lock in the JNI layer
@ -280,6 +281,7 @@ public class ZlibDecompressor implements Decompressor {
uncompressedDirectBuf.limit(directBufferSize); uncompressedDirectBuf.limit(directBufferSize);
uncompressedDirectBuf.position(directBufferSize); uncompressedDirectBuf.position(directBufferSize);
userBufOff = userBufLen = 0; userBufOff = userBufLen = 0;
userBuf = null;
} }
@Override @Override
@ -300,6 +302,108 @@ public class ZlibDecompressor implements Decompressor {
throw new NullPointerException(); throw new NullPointerException();
} }
private int put(ByteBuffer dst, ByteBuffer src) {
// this will lop off data from src[pos:limit] into dst[pos:limit], using the
// min() of both remaining()
int l1 = src.remaining();
int l2 = dst.remaining();
int pos1 = src.position();
int pos2 = dst.position();
int len = Math.min(l1, l2);
if (len == 0) {
return 0;
}
ByteBuffer slice = src.slice();
slice.limit(len);
dst.put(slice);
src.position(pos1 + len);
return len;
}
public int decompress(ByteBuffer dst, ByteBuffer src) throws IOException {
assert dst.remaining() > 0 : "dst.remaining == 0";
int n = 0;
/* fast path for clean state and direct buffers */
if((src != null && src.isDirect()) && dst.isDirect() && userBuf == null) {
/*
* TODO: fix these assumptions in inflateDirect(), eventually by allowing
* it to read position()/limit() directly
*/
boolean cleanDst = (dst.position() == 0 && dst.remaining() == dst.capacity() && dst.remaining() >= directBufferSize);
boolean cleanState = (compressedDirectBufLen == 0 && uncompressedDirectBuf.remaining() == 0);
/* use the buffers directly */
if(cleanDst && cleanState) {
Buffer originalCompressed = compressedDirectBuf;
Buffer originalUncompressed = uncompressedDirectBuf;
int originalBufferSize = directBufferSize;
compressedDirectBuf = src;
compressedDirectBufOff = src.position();
compressedDirectBufLen = src.remaining();
uncompressedDirectBuf = dst;
directBufferSize = dst.remaining();
// Compress data
n = inflateBytesDirect();
dst.position(n);
if(compressedDirectBufLen > 0) {
src.position(compressedDirectBufOff);
} else {
src.position(src.limit());
}
compressedDirectBuf = originalCompressed;
uncompressedDirectBuf = originalUncompressed;
compressedDirectBufOff = 0;
compressedDirectBufLen = 0;
directBufferSize = originalBufferSize;
return n;
}
}
// Check if there is compressed data
if (uncompressedDirectBuf.remaining() > 0) {
n = put(dst, (ByteBuffer) uncompressedDirectBuf);
}
if (dst.remaining() == 0) {
return n;
} else {
if (needsInput()) {
// this does not update buffers if we have no userBuf
if (userBufLen <= 0) {
compressedDirectBufOff = 0;
compressedDirectBufLen = 0;
compressedDirectBuf.rewind().limit(directBufferSize);
}
if (src != null) {
assert src.remaining() > 0 : "src.remaining() == 0";
}
}
// if we have drained userBuf, read from src (ideally, do not mix buffer
// modes, but sometimes you can)
if (userBufLen == 0 && src != null && src.remaining() > 0) {
compressedDirectBufLen += put(((ByteBuffer) compressedDirectBuf), src);
}
// Re-initialize the zlib's output direct buffer
uncompressedDirectBuf.rewind();
uncompressedDirectBuf.limit(directBufferSize);
// Compress data
int more = inflateBytesDirect();
uncompressedDirectBuf.limit(more);
// Get atmost 'len' bytes
int fill = put(dst, ((ByteBuffer) uncompressedDirectBuf));
return n + fill;
}
}
private native static void initIDs(); private native static void initIDs();
private native static long init(int windowBits); private native static long init(int windowBits);
private native static void setDictionary(long strm, byte[] b, int off, private native static void setDictionary(long strm, byte[] b, int off,

View File

@ -19,8 +19,13 @@ package org.apache.hadoop.io.compress.zlib;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.junit.Assume.*; import static org.junit.Assume.*;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Console;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Random; import java.util.Random;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -33,8 +38,12 @@ import org.apache.hadoop.io.compress.DecompressorStream;
import org.apache.hadoop.io.compress.CompressDecompressTester.CompressionTestStrategy; import org.apache.hadoop.io.compress.CompressDecompressTester.CompressionTestStrategy;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel; import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy; import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
import org.apache.log4j.ConsoleAppender;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import sun.util.logging.resources.logging;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
public class TestZlibCompressorDecompressor { public class TestZlibCompressorDecompressor {
@ -150,6 +159,149 @@ public class TestZlibCompressorDecompressor {
} }
} }
private void compressDecompressLoop(int rawDataSize, int inSize, int outSize)
throws IOException {
byte[] rawData = null;
rawData = generate(rawDataSize);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ByteBuffer inBuf = ByteBuffer.allocateDirect(inSize);
ByteBuffer outBuf = ByteBuffer.allocateDirect(outSize);
ZlibCompressor compressor = new ZlibCompressor();
ZlibDecompressor decompressor = new ZlibDecompressor();
outBuf.clear();
/* compression loop */
int off = 0;
int len = rawDataSize;
int min = Math.min(inBuf.remaining(), len);
if (min > 0) {
inBuf.put(rawData, off, min);
}
inBuf.flip();
len -= min;
off += min;
while (!compressor.finished()) {
compressor.compress(outBuf, inBuf);
if (outBuf.remaining() == 0) {
// flush when the buffer is full
outBuf.flip();
while (outBuf.remaining() > 0) {
baos.write(outBuf.get());
}
outBuf.clear();
}
if (inBuf != null && inBuf.remaining() == 0) {
inBuf.clear();
if (len > 0) {
min = Math.min(inBuf.remaining(), len);
inBuf.put(rawData, off, min);
inBuf.flip();
len -= min;
off += min;
} else {
inBuf = null;
compressor.finish();
}
}
}
outBuf.flip();
if (outBuf.remaining() > 0) {
while (outBuf.remaining() > 0) {
baos.write(outBuf.get());
}
outBuf.clear();
}
compressor.end();
byte[] compressed = baos.toByteArray();
ByteBuffer expected = ByteBuffer.wrap(rawData);
outBuf.clear();
inBuf = ByteBuffer.allocateDirect(inSize);
inBuf.clear();
// zlib always has header
if (compressed.length != 0) {
off = 0;
len = compressed.length;
min = Math.min(inBuf.remaining(), len);
inBuf.put(compressed, off, min);
inBuf.flip();
len -= min;
off += min;
while (!decompressor.finished()) {
decompressor.decompress(outBuf, inBuf);
if (outBuf.remaining() == 0) {
outBuf.flip();
while (outBuf.remaining() > 0) {
assertEquals(expected.get(), outBuf.get());
}
outBuf.clear();
}
if (inBuf != null && inBuf.remaining() == 0) {
inBuf.clear();
if (len > 0) {
min = Math.min(inBuf.remaining(), len);
inBuf.put(compressed, off, min);
inBuf.flip();
len -= min;
off += min;
}
}
}
}
outBuf.flip();
if (outBuf.remaining() > 0) {
while (outBuf.remaining() > 0) {
assertEquals(expected.get(), outBuf.get());
}
outBuf.clear();
}
assertEquals(0, expected.remaining());
}
@Test
public void testZlibDirectCompressDecompress() {
int[] size = { 4, 16, 4 * 1024, 64 * 1024, 128 * 1024, 256 * 1024,
1024 * 1024 };
try {
// 0-2 bytes results in sizeof(outBuf) > sizeof(inBuf)
compressDecompressLoop(0, 4096, 4096);
compressDecompressLoop(0, 1, 1);
compressDecompressLoop(1, 1, 2);
compressDecompressLoop(1, 2, 1);
compressDecompressLoop(2, 3, 2);
for (int i = 0; i < size.length; i++) {
compressDecompressLoop(size[i], 4096, 4096);
compressDecompressLoop(size[i], 1, 1);
compressDecompressLoop(size[i], 1, 2);
compressDecompressLoop(size[i], 2, 1);
compressDecompressLoop(size[i], 3, 2);
compressDecompressLoop(size[i], size[i], 4096);
compressDecompressLoop(size[i], size[i] - 1, 4096);
compressDecompressLoop(size[i], size[i] + 1, 4096);
compressDecompressLoop(size[i], 4096, size[i]);
compressDecompressLoop(size[i], 4096, size[i] - 1);
compressDecompressLoop(size[i], 4096, size[i] + 1);
compressDecompressLoop(size[i], size[i] - 1, size[i] - 1);
compressDecompressLoop(size[i], size[i] / 2, 4096);
compressDecompressLoop(size[i], size[i] / 2 - 1, 4096);
compressDecompressLoop(size[i], size[i] / 2 + 1, 4096);
compressDecompressLoop(size[i], 4096, size[i] / 2);
compressDecompressLoop(size[i], 4096, size[i] / 2 - 1);
compressDecompressLoop(size[i], 4096, size[i] / 2 + 1);
compressDecompressLoop(size[i], size[i] / 2 - 1, size[i] / 2 - 1);
}
} catch (IOException ex) {
fail("testZlibDirectCompressDecompress ex !!!" + ex);
}
}
@Test @Test
public void testZlibCompressorDecompressorSetDictionary() { public void testZlibCompressorDecompressorSetDictionary() {
Configuration conf = new Configuration(); Configuration conf = new Configuration();