svn merge -c 1453608 FIXES: HADOOP-8462. Native-code implementation of bzip2 codec. Contributed by Govind Kamat

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1453610 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2013-03-06 23:16:33 +00:00
parent d6481d4a60
commit 0cdaefae04
13 changed files with 1422 additions and 72 deletions

View File

@ -1074,6 +1074,9 @@ Release 0.23.7 - UNRELEASED
OPTIMIZATIONS
HADOOP-8462. Native-code implementation of bzip2 codec. (Govind Kamat via
jlowe)
BUG FIXES
HADOOP-9302. HDFS docs not linked from top level (Andy Isaacson via

View File

@ -452,6 +452,7 @@
<activeByDefault>false</activeByDefault>
</activation>
<properties>
<require.bzip2>false</require.bzip2>
<snappy.prefix></snappy.prefix>
<snappy.lib></snappy.lib>
<snappy.include></snappy.include>
@ -473,6 +474,8 @@
<javahClassNames>
<javahClassName>org.apache.hadoop.io.compress.zlib.ZlibCompressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.zlib.ZlibDecompressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.bzip2.Bzip2Compressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.bzip2.Bzip2Decompressor</javahClassName>
<javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsMapping</javahClassName>
<javahClassName>org.apache.hadoop.io.nativeio.NativeIO</javahClassName>
<javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName>
@ -498,7 +501,7 @@
<configuration>
<target>
<exec executable="cmake" dir="${project.build.directory}/native" failonerror="true">
<arg line="${basedir}/src/ -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model} -DREQUIRE_SNAPPY=${require.snappy} -DCUSTOM_SNAPPY_PREFIX=${snappy.prefix} -DCUSTOM_SNAPPY_LIB=${snappy.lib} -DCUSTOM_SNAPPY_INCLUDE=${snappy.include}"/>
<arg line="${basedir}/src/ -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model} -DREQUIRE_BZIP2=${require.bzip2} -DREQUIRE_SNAPPY=${require.snappy} -DCUSTOM_SNAPPY_PREFIX=${snappy.prefix} -DCUSTOM_SNAPPY_LIB=${snappy.lib} -DCUSTOM_SNAPPY_INCLUDE=${snappy.include}"/>
</exec>
<exec executable="make" dir="${project.build.directory}/native" failonerror="true">
<arg line="VERBOSE=1"/>

View File

@ -97,6 +97,23 @@ set(T main/native/src/test/org/apache/hadoop)
GET_FILENAME_COMPONENT(HADOOP_ZLIB_LIBRARY ${ZLIB_LIBRARIES} NAME)
SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES CMAKE_FIND_LIBRARY_SUFFIXES)
set_find_shared_library_version("1")
find_package(BZip2 QUIET)
if (BZIP2_INCLUDE_DIR AND BZIP2_LIBRARIES)
GET_FILENAME_COMPONENT(HADOOP_BZIP2_LIBRARY ${BZIP2_LIBRARIES} NAME)
set(BZIP2_SOURCE_FILES
"${D}/io/compress/bzip2/Bzip2Compressor.c"
"${D}/io/compress/bzip2/Bzip2Decompressor.c")
else (BZIP2_INCLUDE_DIR AND BZIP2_LIBRARIES)
set(BZIP2_SOURCE_FILES "")
set(BZIP2_INCLUDE_DIR "")
IF(REQUIRE_BZIP2)
MESSAGE(FATAL_ERROR "Required bzip2 library and/or header files could not be found.")
ENDIF(REQUIRE_BZIP2)
endif (BZIP2_INCLUDE_DIR AND BZIP2_LIBRARIES)
SET(CMAKE_FIND_LIBRARY_SUFFIXES STORED_CMAKE_FIND_LIBRARY_SUFFIXES)
INCLUDE(CheckFunctionExists)
INCLUDE(CheckCSourceCompiles)
INCLUDE(CheckLibraryExists)
@ -136,6 +153,7 @@ include_directories(
${CMAKE_BINARY_DIR}
${JNI_INCLUDE_DIRS}
${ZLIB_INCLUDE_DIRS}
${BZIP2_INCLUDE_DIR}
${SNAPPY_INCLUDE_DIR}
${D}/util
)
@ -155,6 +173,7 @@ add_dual_library(hadoop
${SNAPPY_SOURCE_FILES}
${D}/io/compress/zlib/ZlibCompressor.c
${D}/io/compress/zlib/ZlibDecompressor.c
${BZIP2_SOURCE_FILES}
${D}/io/nativeio/NativeIO.c
${D}/io/nativeio/errno_enum.c
${D}/io/nativeio/file_descriptor.c

View File

@ -19,6 +19,7 @@
#define CONFIG_H
#cmakedefine HADOOP_ZLIB_LIBRARY "@HADOOP_ZLIB_LIBRARY@"
#cmakedefine HADOOP_BZIP2_LIBRARY "@HADOOP_BZIP2_LIBRARY@"
#cmakedefine HADOOP_SNAPPY_LIBRARY "@HADOOP_SNAPPY_LIBRARY@"
#cmakedefine HAVE_SYNC_FILE_RANGE
#cmakedefine HAVE_POSIX_FADVISE

View File

@ -23,108 +23,156 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.compress.bzip2.BZip2Constants;
import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor;
import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor;
import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
/**
* This class provides CompressionOutputStream and CompressionInputStream for
* compression and decompression. Currently we dont have an implementation of
* the Compressor and Decompressor interfaces, so those methods of
* CompressionCodec which have a Compressor or Decompressor type argument, throw
* UnsupportedOperationException.
* This class provides output and input streams for bzip2 compression
* and decompression. It uses the native bzip2 library on the system
* if possible, else it uses a pure-Java implementation of the bzip2
* algorithm. The configuration parameter
* io.compression.codec.bzip2.library can be used to control this
* behavior.
*
* In the pure-Java mode, the Compressor and Decompressor interfaces
* are not implemented. Therefore, in that mode, those methods of
* CompressionCodec which have a Compressor or Decompressor type
* argument, throw UnsupportedOperationException.
*
* Currently, support for splittability is available only in the
* pure-Java mode; therefore, if a SplitCompressionInputStream is
* requested, the pure-Java implementation is used, regardless of the
* setting of the configuration parameter mentioned above.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class BZip2Codec implements SplittableCompressionCodec {
public class BZip2Codec implements Configurable, SplittableCompressionCodec {
private static final String HEADER = "BZ";
private static final int HEADER_LEN = HEADER.length();
private static final String SUB_HEADER = "h9";
private static final int SUB_HEADER_LEN = SUB_HEADER.length();
private Configuration conf;
/**
* Creates a new instance of BZip2Codec
* Set the configuration to be used by this object.
*
* @param conf the configuration object.
*/
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
/**
* Return the configuration used by this object.
*
* @return the configuration object used by this objec.
*/
@Override
public Configuration getConf() {
return conf;
}
/**
* Creates a new instance of BZip2Codec.
*/
public BZip2Codec() { }
/**
* Creates CompressionOutputStream for BZip2
*
* @param out
* The output Stream
* @return The BZip2 CompressionOutputStream
* @throws java.io.IOException
* Throws IO exception
*/
* Create a {@link CompressionOutputStream} that will write to the given
* {@link OutputStream}.
*
* @param out the location for the final output stream
* @return a stream the user can write uncompressed data to, to have it
* compressed
* @throws IOException
*/
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
return new BZip2CompressionOutputStream(out);
return createOutputStream(out, createCompressor());
}
/**
* Creates a compressor using given OutputStream.
* Create a {@link CompressionOutputStream} that will write to the given
* {@link OutputStream} with the given {@link Compressor}.
*
* @return CompressionOutputStream
@throws java.io.IOException
* @param out the location for the final output stream
* @param compressor compressor to use
* @return a stream the user can write uncompressed data to, to have it
* compressed
* @throws IOException
*/
@Override
public CompressionOutputStream createOutputStream(OutputStream out,
Compressor compressor) throws IOException {
return createOutputStream(out);
return Bzip2Factory.isNativeBzip2Loaded(conf) ?
new CompressorStream(out, compressor,
conf.getInt("io.file.buffer.size", 4*1024)) :
new BZip2CompressionOutputStream(out);
}
/**
* This functionality is currently not supported.
*
* @return BZip2DummyCompressor.class
*/
* Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
*
* @return the type of compressor needed by this codec.
*/
@Override
public Class<? extends org.apache.hadoop.io.compress.Compressor> getCompressorType() {
return BZip2DummyCompressor.class;
public Class<? extends Compressor> getCompressorType() {
return Bzip2Factory.getBzip2CompressorType(conf);
}
/**
* This functionality is currently not supported.
*
* @return Compressor
*/
* Create a new {@link Compressor} for use by this {@link CompressionCodec}.
*
* @return a new compressor for use by this codec
*/
@Override
public Compressor createCompressor() {
return new BZip2DummyCompressor();
return Bzip2Factory.getBzip2Compressor(conf);
}
/**
* Creates CompressionInputStream to be used to read off uncompressed data.
*
* @param in
* The InputStream
* @return Returns CompressionInputStream for BZip2
* @throws java.io.IOException
* Throws IOException
*/
* Create a {@link CompressionInputStream} that will read from the given
* input stream and return a stream for uncompressed data.
*
* @param in the stream to read compressed bytes from
* @return a stream to read uncompressed bytes from
* @throws IOException
*/
@Override
public CompressionInputStream createInputStream(InputStream in)
throws IOException {
return new BZip2CompressionInputStream(in);
return createInputStream(in, createDecompressor());
}
/**
* This functionality is currently not supported.
*
* @return CompressionInputStream
*/
* Create a {@link CompressionInputStream} that will read from the given
* {@link InputStream} with the given {@link Decompressor}, and return a
* stream for uncompressed data.
*
* @param in the stream to read compressed bytes from
* @param decompressor decompressor to use
* @return a stream to read uncompressed bytes from
* @throws IOException
*/
@Override
public CompressionInputStream createInputStream(InputStream in,
Decompressor decompressor) throws IOException {
return createInputStream(in);
return Bzip2Factory.isNativeBzip2Loaded(conf) ?
new DecompressorStream(in, decompressor,
conf.getInt("io.file.buffer.size", 4*1024)) :
new BZip2CompressionInputStream(in);
}
/**
@ -139,7 +187,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
*
* @return CompressionInputStream for BZip2 aligned at block boundaries
*/
@Override
public SplitCompressionInputStream createInputStream(InputStream seekableIn,
Decompressor decompressor, long start, long end, READ_MODE readMode)
throws IOException {
@ -184,23 +231,23 @@ public class BZip2Codec implements SplittableCompressionCodec {
}
/**
* This functionality is currently not supported.
*
* @return BZip2DummyDecompressor.class
*/
* Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
*
* @return the type of decompressor needed by this codec.
*/
@Override
public Class<? extends org.apache.hadoop.io.compress.Decompressor> getDecompressorType() {
return BZip2DummyDecompressor.class;
public Class<? extends Decompressor> getDecompressorType() {
return Bzip2Factory.getBzip2DecompressorType(conf);
}
/**
* This functionality is currently not supported.
*
* @return Decompressor
*/
* Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
*
* @return a new decompressor for use by this codec
*/
@Override
public Decompressor createDecompressor() {
return new BZip2DummyDecompressor();
return Bzip2Factory.getBzip2Decompressor(conf);
}
/**
@ -236,7 +283,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
}
}
@Override
public void finish() throws IOException {
if (needsReset) {
// In the case that nothing is written to this stream, we still need to
@ -256,14 +302,12 @@ public class BZip2Codec implements SplittableCompressionCodec {
}
}
@Override
public void resetState() throws IOException {
// Cannot write to out at this point because out might not be ready
// yet, as in SequenceFile.Writer implementation.
needsReset = true;
}
@Override
public void write(int b) throws IOException {
if (needsReset) {
internalReset();
@ -271,7 +315,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
this.output.write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
if (needsReset) {
internalReset();
@ -279,7 +322,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
this.output.write(b, off, len);
}
@Override
public void close() throws IOException {
if (needsReset) {
// In the case that nothing is written to this stream, we still need to
@ -397,7 +439,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
}// end of method
@Override
public void close() throws IOException {
if (!needsReset) {
input.close();
@ -433,7 +474,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
*
*/
@Override
public int read(byte[] b, int off, int len) throws IOException {
if (needsReset) {
internalReset();
@ -457,7 +497,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
}
@Override
public int read() throws IOException {
byte b[] = new byte[1];
int result = this.read(b, 0, 1);
@ -472,7 +511,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
}
}
@Override
public void resetState() throws IOException {
// Cannot read from bufferedIn at this point because bufferedIn
// might not be ready
@ -480,7 +518,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
needsReset = true;
}
@Override
public long getPos() {
return this.compressedStreamPosition;
}

View File

@ -0,0 +1,301 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress.bzip2;
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A {@link Compressor} based on the popular
* bzip2 compression algorithm.
* http://www.bzip2.org/
*
*/
public class Bzip2Compressor implements Compressor {
private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
// The default values for the block size and work factor are the same
// those in Julian Seward's original bzip2 implementation.
static final int DEFAULT_BLOCK_SIZE = 9;
static final int DEFAULT_WORK_FACTOR = 30;
private static final Log LOG = LogFactory.getLog(Bzip2Compressor.class);
// HACK - Use this as a global lock in the JNI layer.
private static Class<Bzip2Compressor> clazz = Bzip2Compressor.class;
private long stream;
private int blockSize;
private int workFactor;
private int directBufferSize;
private byte[] userBuf = null;
private int userBufOff = 0, userBufLen = 0;
private Buffer uncompressedDirectBuf = null;
private int uncompressedDirectBufOff = 0, uncompressedDirectBufLen = 0;
private boolean keepUncompressedBuf = false;
private Buffer compressedDirectBuf = null;
private boolean finish, finished;
/**
* Creates a new compressor with a default values for the
* compression block size and work factor. Compressed data will be
* generated in bzip2 format.
*/
public Bzip2Compressor() {
this(DEFAULT_BLOCK_SIZE, DEFAULT_WORK_FACTOR, DEFAULT_DIRECT_BUFFER_SIZE);
}
/**
* Creates a new compressor, taking settings from the configuration.
*/
public Bzip2Compressor(Configuration conf) {
this(Bzip2Factory.getBlockSize(conf),
Bzip2Factory.getWorkFactor(conf),
DEFAULT_DIRECT_BUFFER_SIZE);
}
/**
* Creates a new compressor using the specified block size.
* Compressed data will be generated in bzip2 format.
*
* @param blockSize The block size to be used for compression. This is
* an integer from 1 through 9, which is multiplied by 100,000 to
* obtain the actual block size in bytes.
* @param workFactor This parameter is a threshold that determines when a
* fallback algorithm is used for pathological data. It ranges from
* 0 to 250.
* @param directBufferSize Size of the direct buffer to be used.
*/
public Bzip2Compressor(int blockSize, int workFactor,
int directBufferSize) {
this.blockSize = blockSize;
this.workFactor = workFactor;
this.directBufferSize = directBufferSize;
stream = init(blockSize, workFactor);
uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
compressedDirectBuf.position(directBufferSize);
}
/**
* Prepare the compressor to be used in a new stream with settings defined in
* the given Configuration. It will reset the compressor's block size and
* and work factor.
*
* @param conf Configuration storing new settings
*/
@Override
public synchronized void reinit(Configuration conf) {
reset();
end(stream);
if (conf == null) {
stream = init(blockSize, workFactor);
return;
}
blockSize = Bzip2Factory.getBlockSize(conf);
workFactor = Bzip2Factory.getWorkFactor(conf);
stream = init(blockSize, workFactor);
if(LOG.isDebugEnabled()) {
LOG.debug("Reinit compressor with new compression configuration");
}
}
@Override
public synchronized void setInput(byte[] b, int off, int len) {
if (b == null) {
throw new NullPointerException();
}
if (off < 0 || len < 0 || off > b.length - len) {
throw new ArrayIndexOutOfBoundsException();
}
this.userBuf = b;
this.userBufOff = off;
this.userBufLen = len;
uncompressedDirectBufOff = 0;
setInputFromSavedData();
// Reinitialize bzip2's output direct buffer.
compressedDirectBuf.limit(directBufferSize);
compressedDirectBuf.position(directBufferSize);
}
// Copy enough data from userBuf to uncompressedDirectBuf.
synchronized void setInputFromSavedData() {
int len = Math.min(userBufLen, uncompressedDirectBuf.remaining());
((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff, len);
userBufLen -= len;
userBufOff += len;
uncompressedDirectBufLen = uncompressedDirectBuf.position();
}
@Override
public synchronized void setDictionary(byte[] b, int off, int len) {
throw new UnsupportedOperationException();
}
@Override
public synchronized boolean needsInput() {
// Compressed data still available?
if (compressedDirectBuf.remaining() > 0) {
return false;
}
// Uncompressed data available in either the direct buffer or user buffer?
if (keepUncompressedBuf && uncompressedDirectBufLen > 0)
return false;
if (uncompressedDirectBuf.remaining() > 0) {
// Check if we have consumed all data in the user buffer.
if (userBufLen <= 0) {
return true;
} else {
// Copy enough data from userBuf to uncompressedDirectBuf.
setInputFromSavedData();
return uncompressedDirectBuf.remaining() > 0;
}
}
return false;
}
@Override
public synchronized void finish() {
finish = true;
}
@Override
public synchronized boolean finished() {
// Check if bzip2 says it has finished and
// all compressed data has been consumed.
return (finished && compressedDirectBuf.remaining() == 0);
}
@Override
public synchronized int compress(byte[] b, int off, int len)
throws IOException {
if (b == null) {
throw new NullPointerException();
}
if (off < 0 || len < 0 || off > b.length - len) {
throw new ArrayIndexOutOfBoundsException();
}
// Check if there is compressed data.
int n = compressedDirectBuf.remaining();
if (n > 0) {
n = Math.min(n, len);
((ByteBuffer)compressedDirectBuf).get(b, off, n);
return n;
}
// Re-initialize bzip2's output direct buffer.
compressedDirectBuf.rewind();
compressedDirectBuf.limit(directBufferSize);
// Compress the data.
n = deflateBytesDirect();
compressedDirectBuf.limit(n);
// Check if bzip2 has consumed the entire input buffer.
// Set keepUncompressedBuf properly.
if (uncompressedDirectBufLen <= 0) { // bzip2 consumed all input
keepUncompressedBuf = false;
uncompressedDirectBuf.clear();
uncompressedDirectBufOff = 0;
uncompressedDirectBufLen = 0;
} else {
keepUncompressedBuf = true;
}
// Get at most 'len' bytes.
n = Math.min(n, len);
((ByteBuffer)compressedDirectBuf).get(b, off, n);
return n;
}
/**
* Returns the total number of compressed bytes output so far.
*
* @return the total (non-negative) number of compressed bytes output so far
*/
@Override
public synchronized long getBytesWritten() {
checkStream();
return getBytesWritten(stream);
}
/**
* Returns the total number of uncompressed bytes input so far.</p>
*
* @return the total (non-negative) number of uncompressed bytes input so far
*/
@Override
public synchronized long getBytesRead() {
checkStream();
return getBytesRead(stream);
}
@Override
public synchronized void reset() {
checkStream();
end(stream);
stream = init(blockSize, workFactor);
finish = false;
finished = false;
uncompressedDirectBuf.rewind();
uncompressedDirectBufOff = uncompressedDirectBufLen = 0;
keepUncompressedBuf = false;
compressedDirectBuf.limit(directBufferSize);
compressedDirectBuf.position(directBufferSize);
userBufOff = userBufLen = 0;
}
@Override
public synchronized void end() {
if (stream != 0) {
end(stream);
stream = 0;
}
}
static void initSymbols(String libname) {
initIDs(libname);
}
private void checkStream() {
if (stream == 0)
throw new NullPointerException();
}
private native static void initIDs(String libname);
private native static long init(int blockSize, int workFactor);
private native int deflateBytesDirect();
private native static long getBytesRead(long strm);
private native static long getBytesWritten(long strm);
private native static void end(long strm);
}

View File

@ -0,0 +1,250 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress.bzip2;
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A {@link Decompressor} based on the popular
* bzip2 compression algorithm.
* http://www.bzip2.org/
*
*/
public class Bzip2Decompressor implements Decompressor {
private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
private static final Log LOG = LogFactory.getLog(Bzip2Decompressor.class);
// HACK - Use this as a global lock in the JNI layer.
private static Class<Bzip2Decompressor> clazz = Bzip2Decompressor.class;
private long stream;
private boolean conserveMemory;
private int directBufferSize;
private Buffer compressedDirectBuf = null;
private int compressedDirectBufOff, compressedDirectBufLen;
private Buffer uncompressedDirectBuf = null;
private byte[] userBuf = null;
private int userBufOff = 0, userBufLen = 0;
private boolean finished;
/**
* Creates a new decompressor.
*/
public Bzip2Decompressor(boolean conserveMemory, int directBufferSize) {
this.conserveMemory = conserveMemory;
this.directBufferSize = directBufferSize;
compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
uncompressedDirectBuf.position(directBufferSize);
stream = init(conserveMemory ? 1 : 0);
}
public Bzip2Decompressor() {
this(false, DEFAULT_DIRECT_BUFFER_SIZE);
}
@Override
public synchronized void setInput(byte[] b, int off, int len) {
if (b == null) {
throw new NullPointerException();
}
if (off < 0 || len < 0 || off > b.length - len) {
throw new ArrayIndexOutOfBoundsException();
}
this.userBuf = b;
this.userBufOff = off;
this.userBufLen = len;
setInputFromSavedData();
// Reinitialize bzip2's output direct buffer.
uncompressedDirectBuf.limit(directBufferSize);
uncompressedDirectBuf.position(directBufferSize);
}
synchronized void setInputFromSavedData() {
compressedDirectBufOff = 0;
compressedDirectBufLen = userBufLen;
if (compressedDirectBufLen > directBufferSize) {
compressedDirectBufLen = directBufferSize;
}
// Reinitialize bzip2's input direct buffer.
compressedDirectBuf.rewind();
((ByteBuffer)compressedDirectBuf).put(userBuf, userBufOff,
compressedDirectBufLen);
// Note how much data is being fed to bzip2.
userBufOff += compressedDirectBufLen;
userBufLen -= compressedDirectBufLen;
}
@Override
public synchronized void setDictionary(byte[] b, int off, int len) {
throw new UnsupportedOperationException();
}
@Override
public synchronized boolean needsInput() {
// Consume remaining compressed data?
if (uncompressedDirectBuf.remaining() > 0) {
return false;
}
// Check if bzip2 has consumed all input.
if (compressedDirectBufLen <= 0) {
// Check if we have consumed all user-input.
if (userBufLen <= 0) {
return true;
} else {
setInputFromSavedData();
}
}
return false;
}
@Override
public synchronized boolean needsDictionary() {
return false;
}
@Override
public synchronized boolean finished() {
// Check if bzip2 says it has finished and
// all compressed data has been consumed.
return (finished && uncompressedDirectBuf.remaining() == 0);
}
@Override
public synchronized int decompress(byte[] b, int off, int len)
throws IOException {
if (b == null) {
throw new NullPointerException();
}
if (off < 0 || len < 0 || off > b.length - len) {
throw new ArrayIndexOutOfBoundsException();
}
// Check if there is uncompressed data.
int n = uncompressedDirectBuf.remaining();
if (n > 0) {
n = Math.min(n, len);
((ByteBuffer)uncompressedDirectBuf).get(b, off, n);
return n;
}
// Re-initialize bzip2's output direct buffer.
uncompressedDirectBuf.rewind();
uncompressedDirectBuf.limit(directBufferSize);
// Decompress the data.
n = finished ? 0 : inflateBytesDirect();
uncompressedDirectBuf.limit(n);
// Get at most 'len' bytes.
n = Math.min(n, len);
((ByteBuffer)uncompressedDirectBuf).get(b, off, n);
return n;
}
/**
* Returns the total number of uncompressed bytes output so far.
*
* @return the total (non-negative) number of uncompressed bytes output so far
*/
public synchronized long getBytesWritten() {
checkStream();
return getBytesWritten(stream);
}
/**
* Returns the total number of compressed bytes input so far.</p>
*
* @return the total (non-negative) number of compressed bytes input so far
*/
public synchronized long getBytesRead() {
checkStream();
return getBytesRead(stream);
}
/**
* Returns the number of bytes remaining in the input buffers; normally
* called when finished() is true to determine amount of post-gzip-stream
* data.</p>
*
* @return the total (non-negative) number of unprocessed bytes in input
*/
@Override
public synchronized int getRemaining() {
checkStream();
return userBufLen + getRemaining(stream); // userBuf + compressedDirectBuf
}
/**
* Resets everything including the input buffers (user and direct).</p>
*/
@Override
public synchronized void reset() {
checkStream();
end(stream);
stream = init(conserveMemory ? 1 : 0);
finished = false;
compressedDirectBufOff = compressedDirectBufLen = 0;
uncompressedDirectBuf.limit(directBufferSize);
uncompressedDirectBuf.position(directBufferSize);
userBufOff = userBufLen = 0;
}
@Override
public synchronized void end() {
if (stream != 0) {
end(stream);
stream = 0;
}
}
static void initSymbols(String libname) {
initIDs(libname);
}
private void checkStream() {
if (stream == 0)
throw new NullPointerException();
}
private native static void initIDs(String libname);
private native static long init(int conserveMemory);
private native int inflateBytesDirect();
private native static long getBytesRead(long strm);
private native static long getBytesWritten(long strm);
private native static int getRemaining(long strm);
private native static void end(long strm);
}

View File

@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress.bzip2;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.bzip2.Bzip2Compressor;
import org.apache.hadoop.io.compress.bzip2.Bzip2Decompressor;
import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor;
import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor;
/**
* A collection of factories to create the right
* bzip2 compressor/decompressor instances.
*
*/
public class Bzip2Factory {
private static final Log LOG = LogFactory.getLog(Bzip2Factory.class);
private static String bzip2LibraryName = "";
private static boolean nativeBzip2Loaded;
/**
* Check if native-bzip2 code is loaded & initialized correctly and
* can be loaded for this job.
*
* @param conf configuration
* @return <code>true</code> if native-bzip2 is loaded & initialized
* and can be loaded for this job, else <code>false</code>
*/
public static boolean isNativeBzip2Loaded(Configuration conf) {
String libname = conf.get("io.compression.codec.bzip2.library",
"system-native");
if (!bzip2LibraryName.equals(libname)) {
nativeBzip2Loaded = false;
bzip2LibraryName = libname;
if (libname.equals("java-builtin")) {
LOG.info("Using pure-Java version of bzip2 library");
} else if (conf.getBoolean(
CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY,
CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_DEFAULT) &&
NativeCodeLoader.isNativeCodeLoaded()) {
try {
// Initialize the native library.
Bzip2Compressor.initSymbols(libname);
Bzip2Decompressor.initSymbols(libname);
nativeBzip2Loaded = true;
LOG.info("Successfully loaded & initialized native-bzip2 library " +
libname);
} catch (Throwable t) {
LOG.warn("Failed to load/initialize native-bzip2 library " +
libname + ", will use pure-Java version");
}
}
}
return nativeBzip2Loaded;
}
/**
* Return the appropriate type of the bzip2 compressor.
*
* @param conf configuration
* @return the appropriate type of the bzip2 compressor.
*/
public static Class<? extends Compressor>
getBzip2CompressorType(Configuration conf) {
return isNativeBzip2Loaded(conf) ?
Bzip2Compressor.class : BZip2DummyCompressor.class;
}
/**
* Return the appropriate implementation of the bzip2 compressor.
*
* @param conf configuration
* @return the appropriate implementation of the bzip2 compressor.
*/
public static Compressor getBzip2Compressor(Configuration conf) {
return isNativeBzip2Loaded(conf)?
new Bzip2Compressor(conf) : new BZip2DummyCompressor();
}
/**
* Return the appropriate type of the bzip2 decompressor.
*
* @param conf configuration
* @return the appropriate type of the bzip2 decompressor.
*/
public static Class<? extends Decompressor>
getBzip2DecompressorType(Configuration conf) {
return isNativeBzip2Loaded(conf) ?
Bzip2Decompressor.class : BZip2DummyDecompressor.class;
}
/**
* Return the appropriate implementation of the bzip2 decompressor.
*
* @param conf configuration
* @return the appropriate implementation of the bzip2 decompressor.
*/
public static Decompressor getBzip2Decompressor(Configuration conf) {
return isNativeBzip2Loaded(conf) ?
new Bzip2Decompressor() : new BZip2DummyDecompressor();
}
public static void setBlockSize(Configuration conf, int blockSize) {
conf.setInt("bzip2.compress.blocksize", blockSize);
}
public static int getBlockSize(Configuration conf) {
return conf.getInt("bzip2.compress.blocksize",
Bzip2Compressor.DEFAULT_BLOCK_SIZE);
}
public static void setWorkFactor(Configuration conf, int workFactor) {
conf.setInt("bzip2.compress.workfactor", workFactor);
}
public static int getWorkFactor(Configuration conf) {
return conf.getInt("bzip2.compress.workfactor",
Bzip2Compressor.DEFAULT_WORK_FACTOR);
}
}

View File

@ -0,0 +1,245 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <config.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <dlfcn.h>
#include "org_apache_hadoop_io_compress_bzip2.h"
#include "org_apache_hadoop_io_compress_bzip2_Bzip2Compressor.h"
static jfieldID Bzip2Compressor_clazz;
static jfieldID Bzip2Compressor_stream;
static jfieldID Bzip2Compressor_uncompressedDirectBuf;
static jfieldID Bzip2Compressor_uncompressedDirectBufOff;
static jfieldID Bzip2Compressor_uncompressedDirectBufLen;
static jfieldID Bzip2Compressor_compressedDirectBuf;
static jfieldID Bzip2Compressor_directBufferSize;
static jfieldID Bzip2Compressor_finish;
static jfieldID Bzip2Compressor_finished;
static int (*dlsym_BZ2_bzCompressInit)(bz_stream*, int, int, int);
static int (*dlsym_BZ2_bzCompress)(bz_stream*, int);
static int (*dlsym_BZ2_bzCompressEnd)(bz_stream*);
JNIEXPORT void JNICALL
Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_initIDs(
JNIEnv *env, jclass class, jstring libname)
{
const char* bzlib_name = (*env)->GetStringUTFChars(env, libname, NULL);
if (strcmp(bzlib_name, "system-native") == 0)
bzlib_name = HADOOP_BZIP2_LIBRARY;
// Load the native library.
void *libbz2 = dlopen(bzlib_name, RTLD_LAZY | RTLD_GLOBAL);
if (!libbz2) {
THROW(env, "java/lang/UnsatisfiedLinkError",
"Cannot load bzip2 native library");
return;
}
// Locate the requisite symbols from libbz2.so.
dlerror(); // Clear any existing error.
LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzCompressInit, env, libbz2,
"BZ2_bzCompressInit");
LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzCompress, env, libbz2,
"BZ2_bzCompress");
LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzCompressEnd, env, libbz2,
"BZ2_bzCompressEnd");
// Initialize the requisite fieldIds.
Bzip2Compressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz",
"Ljava/lang/Class;");
Bzip2Compressor_stream = (*env)->GetFieldID(env, class, "stream", "J");
Bzip2Compressor_finish = (*env)->GetFieldID(env, class, "finish", "Z");
Bzip2Compressor_finished = (*env)->GetFieldID(env, class, "finished", "Z");
Bzip2Compressor_uncompressedDirectBuf = (*env)->GetFieldID(env, class,
"uncompressedDirectBuf",
"Ljava/nio/Buffer;");
Bzip2Compressor_uncompressedDirectBufOff = (*env)->GetFieldID(env, class,
"uncompressedDirectBufOff",
"I");
Bzip2Compressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, class,
"uncompressedDirectBufLen",
"I");
Bzip2Compressor_compressedDirectBuf = (*env)->GetFieldID(env, class,
"compressedDirectBuf",
"Ljava/nio/Buffer;");
Bzip2Compressor_directBufferSize = (*env)->GetFieldID(env, class,
"directBufferSize", "I");
}
JNIEXPORT jlong JNICALL
Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_init(
JNIEnv *env, jclass class, jint blockSize, jint workFactor)
{
// Create a bz_stream.
bz_stream *stream = malloc(sizeof(bz_stream));
if (!stream) {
THROW(env, "java/lang/OutOfMemoryError", NULL);
return (jlong)0;
}
memset((void*)stream, 0, sizeof(bz_stream));
// Initialize stream.
int rv = (*dlsym_BZ2_bzCompressInit)(stream, blockSize, 0, workFactor);
if (rv != BZ_OK) {
// Contingency - Report error by throwing appropriate exceptions.
free(stream);
stream = NULL;
switch (rv) {
case BZ_MEM_ERROR:
{
THROW(env, "java/lang/OutOfMemoryError", NULL);
}
break;
case BZ_PARAM_ERROR:
{
THROW(env,
"java/lang/IllegalArgumentException",
NULL);
}
break;
default:
{
THROW(env, "java/lang/InternalError", NULL);
}
break;
}
}
return JLONG(stream);
}
JNIEXPORT jint JNICALL
Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_deflateBytesDirect(
JNIEnv *env, jobject this)
{
// Get members of Bzip2Compressor.
bz_stream *stream = BZSTREAM((*env)->GetLongField(env, this,
Bzip2Compressor_stream));
if (!stream) {
THROW(env, "java/lang/NullPointerException", NULL);
return (jint)0;
}
jobject clazz = (*env)->GetStaticObjectField(env, this,
Bzip2Compressor_clazz);
jobject uncompressed_direct_buf = (*env)->GetObjectField(env, this,
Bzip2Compressor_uncompressedDirectBuf);
jint uncompressed_direct_buf_off = (*env)->GetIntField(env, this,
Bzip2Compressor_uncompressedDirectBufOff);
jint uncompressed_direct_buf_len = (*env)->GetIntField(env, this,
Bzip2Compressor_uncompressedDirectBufLen);
jobject compressed_direct_buf = (*env)->GetObjectField(env, this,
Bzip2Compressor_compressedDirectBuf);
jint compressed_direct_buf_len = (*env)->GetIntField(env, this,
Bzip2Compressor_directBufferSize);
jboolean finish = (*env)->GetBooleanField(env, this,
Bzip2Compressor_finish);
// Get the input and output direct buffers.
LOCK_CLASS(env, clazz, "Bzip2Compressor");
char* uncompressed_bytes = (*env)->GetDirectBufferAddress(env,
uncompressed_direct_buf);
char* compressed_bytes = (*env)->GetDirectBufferAddress(env,
compressed_direct_buf);
UNLOCK_CLASS(env, clazz, "Bzip2Compressor");
if (!uncompressed_bytes || !compressed_bytes) {
return (jint)0;
}
// Re-calibrate the bz_stream.
stream->next_in = uncompressed_bytes + uncompressed_direct_buf_off;
stream->avail_in = uncompressed_direct_buf_len;
stream->next_out = compressed_bytes;
stream->avail_out = compressed_direct_buf_len;
// Compress.
int rv = dlsym_BZ2_bzCompress(stream, finish ? BZ_FINISH : BZ_RUN);
jint no_compressed_bytes = 0;
switch (rv) {
// Contingency? - Report error by throwing appropriate exceptions.
case BZ_STREAM_END:
{
(*env)->SetBooleanField(env, this,
Bzip2Compressor_finished,
JNI_TRUE);
} // cascade
case BZ_RUN_OK:
case BZ_FINISH_OK:
{
uncompressed_direct_buf_off +=
uncompressed_direct_buf_len - stream->avail_in;
(*env)->SetIntField(env, this,
Bzip2Compressor_uncompressedDirectBufOff,
uncompressed_direct_buf_off);
(*env)->SetIntField(env, this,
Bzip2Compressor_uncompressedDirectBufLen,
stream->avail_in);
no_compressed_bytes =
compressed_direct_buf_len - stream->avail_out;
}
break;
default:
{
THROW(env, "java/lang/InternalError", NULL);
}
break;
}
return no_compressed_bytes;
}
JNIEXPORT jlong JNICALL
Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_getBytesRead(
JNIEnv *env, jclass class, jlong stream)
{
const bz_stream* strm = BZSTREAM(stream);
return ((jlong)strm->total_in_hi32 << 32) | strm->total_in_lo32;
}
JNIEXPORT jlong JNICALL
Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_getBytesWritten(
JNIEnv *env, jclass class, jlong stream)
{
const bz_stream* strm = BZSTREAM(stream);
return ((jlong)strm->total_out_hi32 << 32) | strm->total_out_lo32;
}
JNIEXPORT void JNICALL
Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_end(
JNIEnv *env, jclass class, jlong stream)
{
if (dlsym_BZ2_bzCompressEnd(BZSTREAM(stream)) != BZ_OK) {
THROW(env, "java/lang/InternalError", NULL);
} else {
free(BZSTREAM(stream));
}
}
/**
* vim: sw=2: ts=2: et:
*/

View File

@ -0,0 +1,248 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <config.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <dlfcn.h>
#include "org_apache_hadoop_io_compress_bzip2.h"
#include "org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor.h"
static jfieldID Bzip2Decompressor_clazz;
static jfieldID Bzip2Decompressor_stream;
static jfieldID Bzip2Decompressor_compressedDirectBuf;
static jfieldID Bzip2Decompressor_compressedDirectBufOff;
static jfieldID Bzip2Decompressor_compressedDirectBufLen;
static jfieldID Bzip2Decompressor_uncompressedDirectBuf;
static jfieldID Bzip2Decompressor_directBufferSize;
static jfieldID Bzip2Decompressor_finished;
static int (*dlsym_BZ2_bzDecompressInit)(bz_stream*, int, int);
static int (*dlsym_BZ2_bzDecompress)(bz_stream*);
static int (*dlsym_BZ2_bzDecompressEnd)(bz_stream*);
JNIEXPORT void JNICALL
Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_initIDs(
JNIEnv *env, jclass class, jstring libname)
{
const char* bzlib_name = (*env)->GetStringUTFChars(env, libname, NULL);
if (strcmp(bzlib_name, "system-native") == 0)
bzlib_name = HADOOP_BZIP2_LIBRARY;
// Load the native library.
void *libbz2 = dlopen(bzlib_name, RTLD_LAZY | RTLD_GLOBAL);
if (!libbz2) {
THROW(env, "java/lang/UnsatisfiedLinkError",
"Cannot load bzip2 native library");
return;
}
// Locate the requisite symbols from libbz2.so.
dlerror(); // Clear any existing error.
LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzDecompressInit, env, libbz2,
"BZ2_bzDecompressInit");
LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzDecompress, env, libbz2,
"BZ2_bzDecompress");
LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzDecompressEnd, env, libbz2,
"BZ2_bzDecompressEnd");
// Initialize the requisite fieldIds.
Bzip2Decompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz",
"Ljava/lang/Class;");
Bzip2Decompressor_stream = (*env)->GetFieldID(env, class, "stream", "J");
Bzip2Decompressor_finished = (*env)->GetFieldID(env, class,
"finished", "Z");
Bzip2Decompressor_compressedDirectBuf = (*env)->GetFieldID(env, class,
"compressedDirectBuf",
"Ljava/nio/Buffer;");
Bzip2Decompressor_compressedDirectBufOff = (*env)->GetFieldID(env, class,
"compressedDirectBufOff", "I");
Bzip2Decompressor_compressedDirectBufLen = (*env)->GetFieldID(env, class,
"compressedDirectBufLen", "I");
Bzip2Decompressor_uncompressedDirectBuf = (*env)->GetFieldID(env, class,
"uncompressedDirectBuf",
"Ljava/nio/Buffer;");
Bzip2Decompressor_directBufferSize = (*env)->GetFieldID(env, class,
"directBufferSize", "I");
}
JNIEXPORT jlong JNICALL
Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_init(
JNIEnv *env, jclass cls, jint conserveMemory)
{
bz_stream *stream = malloc(sizeof(bz_stream));
if (stream == 0) {
THROW(env, "java/lang/OutOfMemoryError", NULL);
return (jlong)0;
}
memset((void*)stream, 0, sizeof(bz_stream));
int rv = dlsym_BZ2_bzDecompressInit(stream, 0, conserveMemory);
if (rv != BZ_OK) {
// Contingency - Report error by throwing appropriate exceptions.
free(stream);
stream = NULL;
switch (rv) {
case BZ_MEM_ERROR:
{
THROW(env, "java/lang/OutOfMemoryError", NULL);
}
break;
default:
{
THROW(env, "java/lang/InternalError", NULL);
}
break;
}
}
return JLONG(stream);
}
JNIEXPORT jint JNICALL
Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_inflateBytesDirect(
JNIEnv *env, jobject this)
{
// Get members of Bzip2Decompressor.
bz_stream *stream = BZSTREAM((*env)->GetLongField(env, this,
Bzip2Decompressor_stream));
if (!stream) {
THROW(env, "java/lang/NullPointerException", NULL);
return (jint)0;
}
jobject clazz = (*env)->GetStaticObjectField(env, this,
Bzip2Decompressor_clazz);
jarray compressed_direct_buf = (jarray)(*env)->GetObjectField(env,
this, Bzip2Decompressor_compressedDirectBuf);
jint compressed_direct_buf_off = (*env)->GetIntField(env, this,
Bzip2Decompressor_compressedDirectBufOff);
jint compressed_direct_buf_len = (*env)->GetIntField(env, this,
Bzip2Decompressor_compressedDirectBufLen);
jarray uncompressed_direct_buf = (jarray)(*env)->GetObjectField(env,
this, Bzip2Decompressor_uncompressedDirectBuf);
jint uncompressed_direct_buf_len = (*env)->GetIntField(env, this,
Bzip2Decompressor_directBufferSize);
// Get the input and output direct buffers.
LOCK_CLASS(env, clazz, "Bzip2Decompressor");
char* compressed_bytes = (*env)->GetDirectBufferAddress(env,
compressed_direct_buf);
char* uncompressed_bytes = (*env)->GetDirectBufferAddress(env,
uncompressed_direct_buf);
UNLOCK_CLASS(env, clazz, "Bzip2Decompressor");
if (!compressed_bytes || !uncompressed_bytes) {
return (jint)0;
}
// Re-calibrate the bz_stream.
stream->next_in = compressed_bytes + compressed_direct_buf_off;
stream->avail_in = compressed_direct_buf_len;
stream->next_out = uncompressed_bytes;
stream->avail_out = uncompressed_direct_buf_len;
// Decompress.
int rv = dlsym_BZ2_bzDecompress(stream);
// Contingency? - Report error by throwing appropriate exceptions.
int no_decompressed_bytes = 0;
switch (rv) {
case BZ_STREAM_END:
{
(*env)->SetBooleanField(env, this,
Bzip2Decompressor_finished,
JNI_TRUE);
} // cascade down
case BZ_OK:
{
compressed_direct_buf_off +=
compressed_direct_buf_len - stream->avail_in;
(*env)->SetIntField(env, this,
Bzip2Decompressor_compressedDirectBufOff,
compressed_direct_buf_off);
(*env)->SetIntField(env, this,
Bzip2Decompressor_compressedDirectBufLen,
stream->avail_in);
no_decompressed_bytes =
uncompressed_direct_buf_len - stream->avail_out;
}
break;
case BZ_DATA_ERROR:
case BZ_DATA_ERROR_MAGIC:
{
THROW(env, "java/io/IOException", NULL);
}
break;
case BZ_MEM_ERROR:
{
THROW(env, "java/lang/OutOfMemoryError", NULL);
}
break;
default:
{
THROW(env, "java/lang/InternalError", NULL);
}
break;
}
return no_decompressed_bytes;
}
JNIEXPORT jlong JNICALL
Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_getBytesRead(
JNIEnv *env, jclass cls, jlong stream)
{
const bz_stream* strm = BZSTREAM(stream);
return ((jlong)strm->total_in_hi32 << 32) | strm->total_in_lo32;
}
JNIEXPORT jlong JNICALL
Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_getBytesWritten(
JNIEnv *env, jclass cls, jlong stream)
{
const bz_stream* strm = BZSTREAM(stream);
return ((jlong)strm->total_out_hi32 << 32) | strm->total_out_lo32;
}
JNIEXPORT jint JNICALL
Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_getRemaining(
JNIEnv *env, jclass cls, jlong stream)
{
return (BZSTREAM(stream))->avail_in;
}
JNIEXPORT void JNICALL
Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_end(
JNIEnv *env, jclass cls, jlong stream)
{
if (dlsym_BZ2_bzDecompressEnd(BZSTREAM(stream)) != BZ_OK) {
THROW(env, "java/lang/InternalError", 0);
} else {
free(BZSTREAM(stream));
}
}
/**
* vim: sw=2: ts=2: et:
*/

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if !defined ORG_APACHE_HADOOP_IO_COMPRESS_BZIP2_BZIP2_H
#define ORG_APACHE_HADOOP_IO_COMPRESS_BZIP2_BZIP2_H
#include <config.h>
#include <stddef.h>
#include <bzlib.h>
#include <dlfcn.h>
#include <jni.h>
#include "org_apache_hadoop.h"
#define HADOOP_BZIP2_LIBRARY "libbz2.so.1"
/* A helper macro to convert the java 'stream-handle' to a bz_stream pointer. */
#define BZSTREAM(stream) ((bz_stream*)((ptrdiff_t)(stream)))
/* A helper macro to convert the bz_stream pointer to the java 'stream-handle'. */
#define JLONG(stream) ((jlong)((ptrdiff_t)(stream)))
#endif //ORG_APACHE_HADOOP_IO_COMPRESS_BZIP2_BZIP2_H

View File

@ -309,6 +309,20 @@
are discovered using a Java ServiceLoader.</description>
</property>
<property>
<name>io.compression.codec.bzip2.library</name>
<value>system-native</value>
<description>The native-code library to be used for compression and
decompression by the bzip2 codec. This library could be specified
either by by name or the full pathname. In the former case, the
library is located by the dynamic linker, usually searching the
directories specified in the environment variable LD_LIBRARY_PATH.
The value of "system-native" indicates that the default system
library should be used. To indicate that the algorithm should
operate entirely in Java, specify "java-builtin".</description>
</property>
<property>
<name>io.serializations</name>
<value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization</value>

View File

@ -59,6 +59,7 @@ import org.apache.hadoop.io.compress.zlib.ZlibCompressor;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.ReflectionUtils;
@ -92,12 +93,33 @@ public class TestCodec {
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec");
}
@Test
@Test(timeout=20000)
public void testBZip2Codec() throws IOException {
Configuration conf = new Configuration();
conf.set("io.compression.codec.bzip2.library", "java-builtin");
codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.BZip2Codec");
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");
}
@Test(timeout=20000)
public void testBZip2NativeCodec() throws IOException {
Configuration conf = new Configuration();
conf.set("io.compression.codec.bzip2.library", "system-native");
if (NativeCodeLoader.isNativeCodeLoaded()) {
if (Bzip2Factory.isNativeBzip2Loaded(conf)) {
codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.BZip2Codec");
codecTest(conf, seed, count,
"org.apache.hadoop.io.compress.BZip2Codec");
conf.set("io.compression.codec.bzip2.library", "java-builtin");
codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.BZip2Codec");
codecTest(conf, seed, count,
"org.apache.hadoop.io.compress.BZip2Codec");
} else {
LOG.warn("Native hadoop library available but native bzip2 is not");
}
}
}
@Test
public void testSnappyCodec() throws IOException {
if (SnappyCodec.isNativeCodeLoaded()) {
@ -446,14 +468,37 @@ public class TestCodec {
sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.DefaultCodec", 1000000);
}
@Test
@Test(timeout=20000)
public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundException,
InstantiationException, IllegalAccessException {
Configuration conf = new Configuration();
conf.set("io.compression.codec.bzip2.library", "java-builtin");
sequenceFileCodecTest(conf, 0, "org.apache.hadoop.io.compress.BZip2Codec", 100);
sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.BZip2Codec", 100);
sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.BZip2Codec", 1000000);
}
@Test(timeout=20000)
public void testSequenceFileBZip2NativeCodec() throws IOException,
ClassNotFoundException, InstantiationException,
IllegalAccessException {
Configuration conf = new Configuration();
conf.set("io.compression.codec.bzip2.library", "system-native");
if (NativeCodeLoader.isNativeCodeLoaded()) {
if (Bzip2Factory.isNativeBzip2Loaded(conf)) {
sequenceFileCodecTest(conf, 0,
"org.apache.hadoop.io.compress.BZip2Codec", 100);
sequenceFileCodecTest(conf, 100,
"org.apache.hadoop.io.compress.BZip2Codec", 100);
sequenceFileCodecTest(conf, 200000,
"org.apache.hadoop.io.compress.BZip2Codec",
1000000);
} else {
LOG.warn("Native hadoop library available but native bzip2 is not");
}
}
}
@Test
public void testSequenceFileDeflateCodec() throws IOException, ClassNotFoundException,
InstantiationException, IllegalAccessException {