Merge -c 1543542 from trunk to branch-2 to fix HADOOP-10047. Add a direct-buffer based apis for compression. Contributed by Gopal V.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1543543 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2013-11-19 19:30:04 +00:00
parent 71f9f49590
commit 258775c9fd
11 changed files with 408 additions and 4 deletions

View File

@ -97,6 +97,9 @@ Release 2.3.0 - UNRELEASED
HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
HADOOP-10047. Add a direct-buffer based apis for compression. (Gopal V
via acmurthy)
BUG FIXES
HADOOP-9964. Fix deadlocks in TestHttpServer by synchronize

View File

@ -28,11 +28,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DefaultCodec implements Configurable, CompressionCodec {
public class DefaultCodec implements Configurable, CompressionCodec, DirectDecompressionCodec {
private static final Log LOG = LogFactory.getLog(DefaultCodec.class);
Configuration conf;
@ -103,6 +104,15 @@ public class DefaultCodec implements Configurable, CompressionCodec {
return ZlibFactory.getZlibDecompressor(conf);
}
/**
* {@inheritDoc}
*/
@Override
public DirectDecompressor createDirectDecompressor() {
return ZlibFactory.getZlibDirectDecompressor(conf);
}
@Override
public String getDefaultExtension() {
return ".deflate";

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This class encapsulates a codec which can decompress direct bytebuffers.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface DirectDecompressionCodec extends CompressionCodec {
/**
* Create a new {@link DirectDecompressor} for use by this {@link DirectDecompressionCodec}.
*
* @return a new direct decompressor for use by this codec
*/
DirectDecompressor createDirectDecompressor();
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Specification of a direct ByteBuffer 'de-compressor'.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface DirectDecompressor {
/*
* This exposes a direct interface for record decompression with direct byte
* buffers.
*
* The decompress() function need not always consume the buffers provided,
* it will need to be called multiple times to decompress an entire buffer
* and the object will hold the compression context internally.
*
* Codecs such as {@link SnappyCodec} may or may not support partial
* decompression of buffers and will need enough space in the destination
* buffer to decompress an entire block.
*
* The operation is modelled around dst.put(src);
*
* The end result will move src.position() by the bytes-read and
* dst.position() by the bytes-written. It should not modify the src.limit()
* or dst.limit() to maintain consistency of operation between codecs.
*
* @param src Source direct {@link ByteBuffer} for reading from. Requires src
* != null and src.remaining() > 0
*
* @param dst Destination direct {@link ByteBuffer} for storing the results
* into. Requires dst != null and dst.remaining() to be > 0
*
* @throws IOException if compression fails
*/
public void decompress(ByteBuffer src, ByteBuffer dst) throws IOException;
}

View File

@ -25,6 +25,8 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.zlib.*;
import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor;
import static org.apache.hadoop.util.PlatformName.IBM_JAVA;
/**
@ -163,6 +165,13 @@ public class GzipCodec extends DefaultCodec {
? GzipZlibDecompressor.class
: BuiltInGzipDecompressor.class;
}
@Override
public DirectDecompressor createDirectDecompressor() {
return ZlibFactory.isNativeZlibLoaded(conf)
? new ZlibDecompressor.ZlibDirectDecompressor(
ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB, 0) : null;
}
@Override
public String getDefaultExtension() {

View File

@ -26,13 +26,14 @@ import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.util.NativeCodeLoader;
/**
* This class creates snappy compressors/decompressors.
*/
public class SnappyCodec implements Configurable, CompressionCodec {
public class SnappyCodec implements Configurable, CompressionCodec, DirectDecompressionCodec {
Configuration conf;
/**
@ -203,6 +204,14 @@ public class SnappyCodec implements Configurable, CompressionCodec {
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
return new SnappyDecompressor(bufferSize);
}
/**
* {@inheritDoc}
*/
@Override
public DirectDecompressor createDirectDecompressor() {
return isNativeCodeLoaded() ? new SnappyDirectDecompressor() : null;
}
/**
* Get the default filename extension for this kind of compression.

View File

@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DirectDecompressor;
import org.apache.hadoop.util.NativeCodeLoader;
/**
@ -282,4 +283,75 @@ public class SnappyDecompressor implements Decompressor {
private native static void initIDs();
private native int decompressBytesDirect();
int decompressDirect(ByteBuffer src, ByteBuffer dst) throws IOException {
assert (this instanceof SnappyDirectDecompressor);
ByteBuffer presliced = dst;
if (dst.position() > 0) {
presliced = dst;
dst = dst.slice();
}
Buffer originalCompressed = compressedDirectBuf;
Buffer originalUncompressed = uncompressedDirectBuf;
int originalBufferSize = directBufferSize;
compressedDirectBuf = src.slice();
compressedDirectBufLen = src.remaining();
uncompressedDirectBuf = dst;
directBufferSize = dst.remaining();
int n = 0;
try {
n = decompressBytesDirect();
presliced.position(presliced.position() + n);
// SNAPPY always consumes the whole buffer or throws an exception
src.position(src.limit());
finished = true;
} finally {
compressedDirectBuf = originalCompressed;
uncompressedDirectBuf = originalUncompressed;
compressedDirectBufLen = 0;
directBufferSize = originalBufferSize;
}
return n;
}
public static class SnappyDirectDecompressor extends SnappyDecompressor implements
DirectDecompressor {
@Override
public boolean finished() {
return (endOfInput && super.finished());
}
@Override
public void reset() {
super.reset();
endOfInput = true;
}
private boolean endOfInput;
@Override
public synchronized void decompress(ByteBuffer src, ByteBuffer dst)
throws IOException {
assert dst.isDirect() : "dst.isDirect()";
assert src.isDirect() : "src.isDirect()";
assert dst.remaining() > 0 : "dst.remaining() > 0";
this.decompressDirect(src, dst);
endOfInput = !src.hasRemaining();
}
@Override
public synchronized void setDictionary(byte[] b, int off, int len) {
throw new UnsupportedOperationException(
"byte[] arrays are not supported for DirectDecompressor");
}
@Override
public synchronized int decompress(byte[] b, int off, int len) {
throw new UnsupportedOperationException(
"byte[] arrays are not supported for DirectDecompressor");
}
}
}

View File

@ -23,6 +23,7 @@ import java.nio.Buffer;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DirectDecompressor;
import org.apache.hadoop.util.NativeCodeLoader;
/**
@ -106,7 +107,7 @@ public class ZlibDecompressor implements Decompressor {
*/
public ZlibDecompressor(CompressionHeader header, int directBufferSize) {
this.header = header;
this.directBufferSize = directBufferSize;
this.directBufferSize = directBufferSize;
compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
uncompressedDirectBuf.position(directBufferSize);
@ -310,4 +311,86 @@ public class ZlibDecompressor implements Decompressor {
private native static int getRemaining(long strm);
private native static void reset(long strm);
private native static void end(long strm);
int inflateDirect(ByteBuffer src, ByteBuffer dst) throws IOException {
assert (this instanceof ZlibDirectDecompressor);
ByteBuffer presliced = dst;
if (dst.position() > 0) {
presliced = dst;
dst = dst.slice();
}
Buffer originalCompressed = compressedDirectBuf;
Buffer originalUncompressed = uncompressedDirectBuf;
int originalBufferSize = directBufferSize;
compressedDirectBuf = src;
compressedDirectBufOff = src.position();
compressedDirectBufLen = src.remaining();
uncompressedDirectBuf = dst;
directBufferSize = dst.remaining();
int n = 0;
try {
n = inflateBytesDirect();
presliced.position(presliced.position() + n);
if (compressedDirectBufLen > 0) {
src.position(compressedDirectBufOff);
} else {
src.position(src.limit());
}
} finally {
compressedDirectBuf = originalCompressed;
uncompressedDirectBuf = originalUncompressed;
compressedDirectBufOff = 0;
compressedDirectBufLen = 0;
directBufferSize = originalBufferSize;
}
return n;
}
public static class ZlibDirectDecompressor
extends ZlibDecompressor implements DirectDecompressor {
public ZlibDirectDecompressor() {
super(CompressionHeader.DEFAULT_HEADER, 0);
}
public ZlibDirectDecompressor(CompressionHeader header, int directBufferSize) {
super(header, directBufferSize);
}
@Override
public boolean finished() {
return (endOfInput && super.finished());
}
@Override
public void reset() {
super.reset();
endOfInput = true;
}
private boolean endOfInput;
@Override
public synchronized void decompress(ByteBuffer src, ByteBuffer dst)
throws IOException {
assert dst.isDirect() : "dst.isDirect()";
assert src.isDirect() : "src.isDirect()";
assert dst.remaining() > 0 : "dst.remaining() > 0";
this.inflateDirect(src, dst);
endOfInput = !src.hasRemaining();
}
@Override
public synchronized void setDictionary(byte[] b, int off, int len) {
throw new UnsupportedOperationException(
"byte[] arrays are not supported for DirectDecompressor");
}
@Override
public synchronized int decompress(byte[] b, int off, int len) {
throw new UnsupportedOperationException(
"byte[] arrays are not supported for DirectDecompressor");
}
}
}

View File

@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DirectDecompressor;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
import org.apache.hadoop.util.NativeCodeLoader;
@ -116,6 +117,17 @@ public class ZlibFactory {
return (isNativeZlibLoaded(conf)) ?
new ZlibDecompressor() : new BuiltInZlibInflater();
}
/**
* Return the appropriate implementation of the zlib direct decompressor.
*
* @param conf configuration
* @return the appropriate implementation of the zlib decompressor.
*/
public static DirectDecompressor getZlibDirectDecompressor(Configuration conf) {
return (isNativeZlibLoaded(conf)) ?
new ZlibDecompressor.ZlibDirectDecompressor() : null;
}
public static void setCompressionStrategy(Configuration conf,
CompressionStrategy strategy) {

View File

@ -29,6 +29,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.hadoop.io.DataInputBuffer;
@ -38,6 +39,7 @@ import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -147,7 +149,7 @@ public class TestSnappyCompressorDecompressor {
fail("testSnappyCompressorCompressAIOBException ex error !!!");
}
}
@Test
public void testSnappyDecompressorCompressAIOBException() {
try {
@ -275,6 +277,56 @@ public class TestSnappyCompressorDecompressor {
fail("testSnappyBlockCompression ex error !!!");
}
}
private void compressDecompressLoop(int rawDataSize) throws IOException {
byte[] rawData = BytesGenerator.get(rawDataSize);
byte[] compressedResult = new byte[rawDataSize+20];
int directBufferSize = Math.max(rawDataSize*2, 64*1024);
SnappyCompressor compressor = new SnappyCompressor(directBufferSize);
compressor.setInput(rawData, 0, rawDataSize);
int compressedSize = compressor.compress(compressedResult, 0, compressedResult.length);
SnappyDirectDecompressor decompressor = new SnappyDirectDecompressor();
ByteBuffer inBuf = ByteBuffer.allocateDirect(compressedSize);
ByteBuffer outBuf = ByteBuffer.allocateDirect(rawDataSize);
inBuf.put(compressedResult, 0, compressedSize);
inBuf.flip();
ByteBuffer expected = ByteBuffer.wrap(rawData);
outBuf.clear();
while(!decompressor.finished()) {
decompressor.decompress(inBuf, outBuf);
if (outBuf.remaining() == 0) {
outBuf.flip();
while (outBuf.remaining() > 0) {
assertEquals(expected.get(), outBuf.get());
}
outBuf.clear();
}
}
outBuf.flip();
while (outBuf.remaining() > 0) {
assertEquals(expected.get(), outBuf.get());
}
outBuf.clear();
assertEquals(0, expected.remaining());
}
@Test
public void testSnappyDirectBlockCompression() {
int[] size = { 4 * 1024, 64 * 1024, 128 * 1024, 1024 * 1024 };
assumeTrue(SnappyCodec.isNativeCodeLoaded());
try {
for (int i = 0; i < size.length; i++) {
compressDecompressLoop(size[i]);
}
} catch (IOException ex) {
fail("testSnappyDirectBlockCompression ex !!!" + ex);
}
}
@Test
public void testSnappyCompressorDecopressorLogicWithCompressionStreams() {

View File

@ -19,9 +19,13 @@ package org.apache.hadoop.io.compress.zlib;
import static org.junit.Assert.*;
import static org.junit.Assume.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.zip.DeflaterOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -33,6 +37,8 @@ import org.apache.hadoop.io.compress.DecompressorStream;
import org.apache.hadoop.io.compress.CompressDecompressTester.CompressionTestStrategy;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor;
import org.apache.hadoop.util.NativeCodeLoader;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableSet;
@ -150,6 +156,60 @@ public class TestZlibCompressorDecompressor {
}
}
private void compressDecompressLoop(int rawDataSize) throws IOException {
byte[] rawData = null;
rawData = generate(rawDataSize);
ByteArrayOutputStream baos = new ByteArrayOutputStream(rawDataSize+12);
DeflaterOutputStream dos = new DeflaterOutputStream(baos);
dos.write(rawData);
dos.flush();
dos.close();
byte[] compressedResult = baos.toByteArray();
int compressedSize = compressedResult.length;
ZlibDirectDecompressor decompressor = new ZlibDirectDecompressor();
ByteBuffer inBuf = ByteBuffer.allocateDirect(compressedSize);
ByteBuffer outBuf = ByteBuffer.allocateDirect(rawDataSize);
inBuf.put(compressedResult, 0, compressedSize);
inBuf.flip();
ByteBuffer expected = ByteBuffer.wrap(rawData);
outBuf.clear();
while(!decompressor.finished()) {
decompressor.decompress(inBuf, outBuf);
if (outBuf.remaining() == 0) {
outBuf.flip();
while (outBuf.remaining() > 0) {
assertEquals(expected.get(), outBuf.get());
}
outBuf.clear();
}
}
outBuf.flip();
while (outBuf.remaining() > 0) {
assertEquals(expected.get(), outBuf.get());
}
outBuf.clear();
assertEquals(0, expected.remaining());
}
@Test
public void testZlibDirectCompressDecompress() {
int[] size = { 1, 4, 16, 4 * 1024, 64 * 1024, 128 * 1024, 1024 * 1024 };
assumeTrue(NativeCodeLoader.isNativeCodeLoaded());
try {
for (int i = 0; i < size.length; i++) {
compressDecompressLoop(size[i]);
}
} catch (IOException ex) {
fail("testZlibDirectCompressDecompress ex !!!" + ex);
}
}
@Test
public void testZlibCompressorDecompressorSetDictionary() {
Configuration conf = new Configuration();