From 0cdaefae04f325d7b50cfc2fea58938800023cf6 Mon Sep 17 00:00:00 2001
From: Jason Darrell Lowe
Date: Wed, 6 Mar 2013 23:16:33 +0000
Subject: [PATCH] svn merge -c 1453608 FIXES: HADOOP-8462. Native-code
implementation of bzip2 codec. Contributed by Govind Kamat
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1453610 13f79535-47bb-0310-9956-ffa450edef68
---
.../hadoop-common/CHANGES.txt | 3 +
hadoop-common-project/hadoop-common/pom.xml | 5 +-
.../hadoop-common/src/CMakeLists.txt | 19 ++
.../hadoop-common/src/config.h.cmake | 1 +
.../apache/hadoop/io/compress/BZip2Codec.java | 175 ++++++----
.../io/compress/bzip2/Bzip2Compressor.java | 301 ++++++++++++++++++
.../io/compress/bzip2/Bzip2Decompressor.java | 250 +++++++++++++++
.../io/compress/bzip2/Bzip2Factory.java | 145 +++++++++
.../io/compress/bzip2/Bzip2Compressor.c | 245 ++++++++++++++
.../io/compress/bzip2/Bzip2Decompressor.c | 248 +++++++++++++++
.../org_apache_hadoop_io_compress_bzip2.h | 39 +++
.../src/main/resources/core-default.xml | 14 +
.../apache/hadoop/io/compress/TestCodec.java | 49 ++-
13 files changed, 1422 insertions(+), 72 deletions(-)
create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.java
create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.java
create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Factory.java
create mode 100644 hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.c
create mode 100644 hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.c
create mode 100644 hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/org_apache_hadoop_io_compress_bzip2.h
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index ebaa84149f7..f730b638e54 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -1074,6 +1074,9 @@ Release 0.23.7 - UNRELEASED
OPTIMIZATIONS
+ HADOOP-8462. Native-code implementation of bzip2 codec. (Govind Kamat via
+ jlowe)
+
BUG FIXES
HADOOP-9302. HDFS docs not linked from top level (Andy Isaacson via
diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index 7762b17a099..216583e3d9f 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -452,6 +452,7 @@
false
+ false
@@ -473,6 +474,8 @@
org.apache.hadoop.io.compress.zlib.ZlibCompressor
org.apache.hadoop.io.compress.zlib.ZlibDecompressor
+ org.apache.hadoop.io.compress.bzip2.Bzip2Compressor
+ org.apache.hadoop.io.compress.bzip2.Bzip2Decompressor
org.apache.hadoop.security.JniBasedUnixGroupsMapping
org.apache.hadoop.io.nativeio.NativeIO
org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping
@@ -498,7 +501,7 @@
-
+
diff --git a/hadoop-common-project/hadoop-common/src/CMakeLists.txt b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
index 051337e2e2d..7449610b9ae 100644
--- a/hadoop-common-project/hadoop-common/src/CMakeLists.txt
+++ b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
@@ -97,6 +97,23 @@ set(T main/native/src/test/org/apache/hadoop)
GET_FILENAME_COMPONENT(HADOOP_ZLIB_LIBRARY ${ZLIB_LIBRARIES} NAME)
+SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES CMAKE_FIND_LIBRARY_SUFFIXES)
+set_find_shared_library_version("1")
+find_package(BZip2 QUIET)
+if (BZIP2_INCLUDE_DIR AND BZIP2_LIBRARIES)
+ GET_FILENAME_COMPONENT(HADOOP_BZIP2_LIBRARY ${BZIP2_LIBRARIES} NAME)
+ set(BZIP2_SOURCE_FILES
+ "${D}/io/compress/bzip2/Bzip2Compressor.c"
+ "${D}/io/compress/bzip2/Bzip2Decompressor.c")
+else (BZIP2_INCLUDE_DIR AND BZIP2_LIBRARIES)
+ set(BZIP2_SOURCE_FILES "")
+ set(BZIP2_INCLUDE_DIR "")
+ IF(REQUIRE_BZIP2)
+ MESSAGE(FATAL_ERROR "Required bzip2 library and/or header files could not be found.")
+ ENDIF(REQUIRE_BZIP2)
+endif (BZIP2_INCLUDE_DIR AND BZIP2_LIBRARIES)
+SET(CMAKE_FIND_LIBRARY_SUFFIXES STORED_CMAKE_FIND_LIBRARY_SUFFIXES)
+
INCLUDE(CheckFunctionExists)
INCLUDE(CheckCSourceCompiles)
INCLUDE(CheckLibraryExists)
@@ -136,6 +153,7 @@ include_directories(
${CMAKE_BINARY_DIR}
${JNI_INCLUDE_DIRS}
${ZLIB_INCLUDE_DIRS}
+ ${BZIP2_INCLUDE_DIR}
${SNAPPY_INCLUDE_DIR}
${D}/util
)
@@ -155,6 +173,7 @@ add_dual_library(hadoop
${SNAPPY_SOURCE_FILES}
${D}/io/compress/zlib/ZlibCompressor.c
${D}/io/compress/zlib/ZlibDecompressor.c
+ ${BZIP2_SOURCE_FILES}
${D}/io/nativeio/NativeIO.c
${D}/io/nativeio/errno_enum.c
${D}/io/nativeio/file_descriptor.c
diff --git a/hadoop-common-project/hadoop-common/src/config.h.cmake b/hadoop-common-project/hadoop-common/src/config.h.cmake
index e720d306570..020017c02fa 100644
--- a/hadoop-common-project/hadoop-common/src/config.h.cmake
+++ b/hadoop-common-project/hadoop-common/src/config.h.cmake
@@ -19,6 +19,7 @@
#define CONFIG_H
#cmakedefine HADOOP_ZLIB_LIBRARY "@HADOOP_ZLIB_LIBRARY@"
+#cmakedefine HADOOP_BZIP2_LIBRARY "@HADOOP_BZIP2_LIBRARY@"
#cmakedefine HADOOP_SNAPPY_LIBRARY "@HADOOP_SNAPPY_LIBRARY@"
#cmakedefine HAVE_SYNC_FILE_RANGE
#cmakedefine HAVE_POSIX_FADVISE
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
index 35f7cb43ea0..42e96cfdc50 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
@@ -23,108 +23,156 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.compress.bzip2.BZip2Constants;
-import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor;
-import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor;
import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
+import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
/**
- * This class provides CompressionOutputStream and CompressionInputStream for
- * compression and decompression. Currently we dont have an implementation of
- * the Compressor and Decompressor interfaces, so those methods of
- * CompressionCodec which have a Compressor or Decompressor type argument, throw
- * UnsupportedOperationException.
+ * This class provides output and input streams for bzip2 compression
+ * and decompression. It uses the native bzip2 library on the system
+ * if possible, else it uses a pure-Java implementation of the bzip2
+ * algorithm. The configuration parameter
+ * io.compression.codec.bzip2.library can be used to control this
+ * behavior.
+ *
+ * In the pure-Java mode, the Compressor and Decompressor interfaces
+ * are not implemented. Therefore, in that mode, those methods of
+ * CompressionCodec which have a Compressor or Decompressor type
+ * argument, throw UnsupportedOperationException.
+ *
+ * Currently, support for splittability is available only in the
+ * pure-Java mode; therefore, if a SplitCompressionInputStream is
+ * requested, the pure-Java implementation is used, regardless of the
+ * setting of the configuration parameter mentioned above.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public class BZip2Codec implements SplittableCompressionCodec {
+public class BZip2Codec implements Configurable, SplittableCompressionCodec {
private static final String HEADER = "BZ";
private static final int HEADER_LEN = HEADER.length();
private static final String SUB_HEADER = "h9";
private static final int SUB_HEADER_LEN = SUB_HEADER.length();
+ private Configuration conf;
+
/**
- * Creates a new instance of BZip2Codec
+ * Set the configuration to be used by this object.
+ *
+ * @param conf the configuration object.
+ */
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Return the configuration used by this object.
+ *
+ * @return the configuration object used by this objec.
+ */
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Creates a new instance of BZip2Codec.
*/
public BZip2Codec() { }
/**
- * Creates CompressionOutputStream for BZip2
- *
- * @param out
- * The output Stream
- * @return The BZip2 CompressionOutputStream
- * @throws java.io.IOException
- * Throws IO exception
- */
+ * Create a {@link CompressionOutputStream} that will write to the given
+ * {@link OutputStream}.
+ *
+ * @param out the location for the final output stream
+ * @return a stream the user can write uncompressed data to, to have it
+ * compressed
+ * @throws IOException
+ */
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
- return new BZip2CompressionOutputStream(out);
+ return createOutputStream(out, createCompressor());
}
/**
- * Creates a compressor using given OutputStream.
+ * Create a {@link CompressionOutputStream} that will write to the given
+ * {@link OutputStream} with the given {@link Compressor}.
*
- * @return CompressionOutputStream
- @throws java.io.IOException
+ * @param out the location for the final output stream
+ * @param compressor compressor to use
+ * @return a stream the user can write uncompressed data to, to have it
+ * compressed
+ * @throws IOException
*/
@Override
public CompressionOutputStream createOutputStream(OutputStream out,
Compressor compressor) throws IOException {
- return createOutputStream(out);
+ return Bzip2Factory.isNativeBzip2Loaded(conf) ?
+ new CompressorStream(out, compressor,
+ conf.getInt("io.file.buffer.size", 4*1024)) :
+ new BZip2CompressionOutputStream(out);
}
/**
- * This functionality is currently not supported.
- *
- * @return BZip2DummyCompressor.class
- */
+ * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
+ *
+ * @return the type of compressor needed by this codec.
+ */
@Override
- public Class extends org.apache.hadoop.io.compress.Compressor> getCompressorType() {
- return BZip2DummyCompressor.class;
+ public Class extends Compressor> getCompressorType() {
+ return Bzip2Factory.getBzip2CompressorType(conf);
}
/**
- * This functionality is currently not supported.
- *
- * @return Compressor
- */
+ * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
+ *
+ * @return a new compressor for use by this codec
+ */
@Override
public Compressor createCompressor() {
- return new BZip2DummyCompressor();
+ return Bzip2Factory.getBzip2Compressor(conf);
}
/**
- * Creates CompressionInputStream to be used to read off uncompressed data.
- *
- * @param in
- * The InputStream
- * @return Returns CompressionInputStream for BZip2
- * @throws java.io.IOException
- * Throws IOException
- */
+ * Create a {@link CompressionInputStream} that will read from the given
+ * input stream and return a stream for uncompressed data.
+ *
+ * @param in the stream to read compressed bytes from
+ * @return a stream to read uncompressed bytes from
+ * @throws IOException
+ */
@Override
public CompressionInputStream createInputStream(InputStream in)
throws IOException {
- return new BZip2CompressionInputStream(in);
+ return createInputStream(in, createDecompressor());
}
/**
- * This functionality is currently not supported.
- *
- * @return CompressionInputStream
- */
+ * Create a {@link CompressionInputStream} that will read from the given
+ * {@link InputStream} with the given {@link Decompressor}, and return a
+ * stream for uncompressed data.
+ *
+ * @param in the stream to read compressed bytes from
+ * @param decompressor decompressor to use
+ * @return a stream to read uncompressed bytes from
+ * @throws IOException
+ */
@Override
public CompressionInputStream createInputStream(InputStream in,
Decompressor decompressor) throws IOException {
- return createInputStream(in);
+ return Bzip2Factory.isNativeBzip2Loaded(conf) ?
+ new DecompressorStream(in, decompressor,
+ conf.getInt("io.file.buffer.size", 4*1024)) :
+ new BZip2CompressionInputStream(in);
}
/**
@@ -139,7 +187,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
*
* @return CompressionInputStream for BZip2 aligned at block boundaries
*/
- @Override
public SplitCompressionInputStream createInputStream(InputStream seekableIn,
Decompressor decompressor, long start, long end, READ_MODE readMode)
throws IOException {
@@ -184,23 +231,23 @@ public class BZip2Codec implements SplittableCompressionCodec {
}
/**
- * This functionality is currently not supported.
- *
- * @return BZip2DummyDecompressor.class
- */
+ * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
+ *
+ * @return the type of decompressor needed by this codec.
+ */
@Override
- public Class extends org.apache.hadoop.io.compress.Decompressor> getDecompressorType() {
- return BZip2DummyDecompressor.class;
+ public Class extends Decompressor> getDecompressorType() {
+ return Bzip2Factory.getBzip2DecompressorType(conf);
}
/**
- * This functionality is currently not supported.
- *
- * @return Decompressor
- */
+ * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
+ *
+ * @return a new decompressor for use by this codec
+ */
@Override
public Decompressor createDecompressor() {
- return new BZip2DummyDecompressor();
+ return Bzip2Factory.getBzip2Decompressor(conf);
}
/**
@@ -236,7 +283,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
}
}
- @Override
public void finish() throws IOException {
if (needsReset) {
// In the case that nothing is written to this stream, we still need to
@@ -256,14 +302,12 @@ public class BZip2Codec implements SplittableCompressionCodec {
}
}
- @Override
public void resetState() throws IOException {
// Cannot write to out at this point because out might not be ready
// yet, as in SequenceFile.Writer implementation.
needsReset = true;
}
- @Override
public void write(int b) throws IOException {
if (needsReset) {
internalReset();
@@ -271,7 +315,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
this.output.write(b);
}
- @Override
public void write(byte[] b, int off, int len) throws IOException {
if (needsReset) {
internalReset();
@@ -279,7 +322,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
this.output.write(b, off, len);
}
- @Override
public void close() throws IOException {
if (needsReset) {
// In the case that nothing is written to this stream, we still need to
@@ -397,7 +439,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
}// end of method
- @Override
public void close() throws IOException {
if (!needsReset) {
input.close();
@@ -433,7 +474,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
*
*/
- @Override
public int read(byte[] b, int off, int len) throws IOException {
if (needsReset) {
internalReset();
@@ -457,7 +497,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
}
- @Override
public int read() throws IOException {
byte b[] = new byte[1];
int result = this.read(b, 0, 1);
@@ -472,7 +511,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
}
}
- @Override
public void resetState() throws IOException {
// Cannot read from bufferedIn at this point because bufferedIn
// might not be ready
@@ -480,7 +518,6 @@ public class BZip2Codec implements SplittableCompressionCodec {
needsReset = true;
}
- @Override
public long getPos() {
return this.compressedStreamPosition;
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.java
new file mode 100644
index 00000000000..6f502476131
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.bzip2;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.Compressor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A {@link Compressor} based on the popular
+ * bzip2 compression algorithm.
+ * http://www.bzip2.org/
+ *
+ */
+public class Bzip2Compressor implements Compressor {
+ private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
+
+ // The default values for the block size and work factor are the same
+ // those in Julian Seward's original bzip2 implementation.
+ static final int DEFAULT_BLOCK_SIZE = 9;
+ static final int DEFAULT_WORK_FACTOR = 30;
+
+ private static final Log LOG = LogFactory.getLog(Bzip2Compressor.class);
+
+ // HACK - Use this as a global lock in the JNI layer.
+ private static Class clazz = Bzip2Compressor.class;
+
+ private long stream;
+ private int blockSize;
+ private int workFactor;
+ private int directBufferSize;
+ private byte[] userBuf = null;
+ private int userBufOff = 0, userBufLen = 0;
+ private Buffer uncompressedDirectBuf = null;
+ private int uncompressedDirectBufOff = 0, uncompressedDirectBufLen = 0;
+ private boolean keepUncompressedBuf = false;
+ private Buffer compressedDirectBuf = null;
+ private boolean finish, finished;
+
+ /**
+ * Creates a new compressor with a default values for the
+ * compression block size and work factor. Compressed data will be
+ * generated in bzip2 format.
+ */
+ public Bzip2Compressor() {
+ this(DEFAULT_BLOCK_SIZE, DEFAULT_WORK_FACTOR, DEFAULT_DIRECT_BUFFER_SIZE);
+ }
+
+ /**
+ * Creates a new compressor, taking settings from the configuration.
+ */
+ public Bzip2Compressor(Configuration conf) {
+ this(Bzip2Factory.getBlockSize(conf),
+ Bzip2Factory.getWorkFactor(conf),
+ DEFAULT_DIRECT_BUFFER_SIZE);
+ }
+
+ /**
+ * Creates a new compressor using the specified block size.
+ * Compressed data will be generated in bzip2 format.
+ *
+ * @param blockSize The block size to be used for compression. This is
+ * an integer from 1 through 9, which is multiplied by 100,000 to
+ * obtain the actual block size in bytes.
+ * @param workFactor This parameter is a threshold that determines when a
+ * fallback algorithm is used for pathological data. It ranges from
+ * 0 to 250.
+ * @param directBufferSize Size of the direct buffer to be used.
+ */
+ public Bzip2Compressor(int blockSize, int workFactor,
+ int directBufferSize) {
+ this.blockSize = blockSize;
+ this.workFactor = workFactor;
+ this.directBufferSize = directBufferSize;
+ stream = init(blockSize, workFactor);
+ uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+ compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+ compressedDirectBuf.position(directBufferSize);
+ }
+
+ /**
+ * Prepare the compressor to be used in a new stream with settings defined in
+ * the given Configuration. It will reset the compressor's block size and
+ * and work factor.
+ *
+ * @param conf Configuration storing new settings
+ */
+ @Override
+ public synchronized void reinit(Configuration conf) {
+ reset();
+ end(stream);
+ if (conf == null) {
+ stream = init(blockSize, workFactor);
+ return;
+ }
+ blockSize = Bzip2Factory.getBlockSize(conf);
+ workFactor = Bzip2Factory.getWorkFactor(conf);
+ stream = init(blockSize, workFactor);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Reinit compressor with new compression configuration");
+ }
+ }
+
+ @Override
+ public synchronized void setInput(byte[] b, int off, int len) {
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+
+ this.userBuf = b;
+ this.userBufOff = off;
+ this.userBufLen = len;
+ uncompressedDirectBufOff = 0;
+ setInputFromSavedData();
+
+ // Reinitialize bzip2's output direct buffer.
+ compressedDirectBuf.limit(directBufferSize);
+ compressedDirectBuf.position(directBufferSize);
+ }
+
+ // Copy enough data from userBuf to uncompressedDirectBuf.
+ synchronized void setInputFromSavedData() {
+ int len = Math.min(userBufLen, uncompressedDirectBuf.remaining());
+ ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff, len);
+ userBufLen -= len;
+ userBufOff += len;
+ uncompressedDirectBufLen = uncompressedDirectBuf.position();
+ }
+
+ @Override
+ public synchronized void setDictionary(byte[] b, int off, int len) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public synchronized boolean needsInput() {
+ // Compressed data still available?
+ if (compressedDirectBuf.remaining() > 0) {
+ return false;
+ }
+
+ // Uncompressed data available in either the direct buffer or user buffer?
+ if (keepUncompressedBuf && uncompressedDirectBufLen > 0)
+ return false;
+
+ if (uncompressedDirectBuf.remaining() > 0) {
+ // Check if we have consumed all data in the user buffer.
+ if (userBufLen <= 0) {
+ return true;
+ } else {
+ // Copy enough data from userBuf to uncompressedDirectBuf.
+ setInputFromSavedData();
+ return uncompressedDirectBuf.remaining() > 0;
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public synchronized void finish() {
+ finish = true;
+ }
+
+ @Override
+ public synchronized boolean finished() {
+ // Check if bzip2 says it has finished and
+ // all compressed data has been consumed.
+ return (finished && compressedDirectBuf.remaining() == 0);
+ }
+
+ @Override
+ public synchronized int compress(byte[] b, int off, int len)
+ throws IOException {
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+
+ // Check if there is compressed data.
+ int n = compressedDirectBuf.remaining();
+ if (n > 0) {
+ n = Math.min(n, len);
+ ((ByteBuffer)compressedDirectBuf).get(b, off, n);
+ return n;
+ }
+
+ // Re-initialize bzip2's output direct buffer.
+ compressedDirectBuf.rewind();
+ compressedDirectBuf.limit(directBufferSize);
+
+ // Compress the data.
+ n = deflateBytesDirect();
+ compressedDirectBuf.limit(n);
+
+ // Check if bzip2 has consumed the entire input buffer.
+ // Set keepUncompressedBuf properly.
+ if (uncompressedDirectBufLen <= 0) { // bzip2 consumed all input
+ keepUncompressedBuf = false;
+ uncompressedDirectBuf.clear();
+ uncompressedDirectBufOff = 0;
+ uncompressedDirectBufLen = 0;
+ } else {
+ keepUncompressedBuf = true;
+ }
+
+ // Get at most 'len' bytes.
+ n = Math.min(n, len);
+ ((ByteBuffer)compressedDirectBuf).get(b, off, n);
+
+ return n;
+ }
+
+ /**
+ * Returns the total number of compressed bytes output so far.
+ *
+ * @return the total (non-negative) number of compressed bytes output so far
+ */
+ @Override
+ public synchronized long getBytesWritten() {
+ checkStream();
+ return getBytesWritten(stream);
+ }
+
+ /**
+ * Returns the total number of uncompressed bytes input so far.
+ *
+ * @return the total (non-negative) number of uncompressed bytes input so far
+ */
+ @Override
+ public synchronized long getBytesRead() {
+ checkStream();
+ return getBytesRead(stream);
+ }
+
+ @Override
+ public synchronized void reset() {
+ checkStream();
+ end(stream);
+ stream = init(blockSize, workFactor);
+ finish = false;
+ finished = false;
+ uncompressedDirectBuf.rewind();
+ uncompressedDirectBufOff = uncompressedDirectBufLen = 0;
+ keepUncompressedBuf = false;
+ compressedDirectBuf.limit(directBufferSize);
+ compressedDirectBuf.position(directBufferSize);
+ userBufOff = userBufLen = 0;
+ }
+
+ @Override
+ public synchronized void end() {
+ if (stream != 0) {
+ end(stream);
+ stream = 0;
+ }
+ }
+
+ static void initSymbols(String libname) {
+ initIDs(libname);
+ }
+
+ private void checkStream() {
+ if (stream == 0)
+ throw new NullPointerException();
+ }
+
+ private native static void initIDs(String libname);
+ private native static long init(int blockSize, int workFactor);
+ private native int deflateBytesDirect();
+ private native static long getBytesRead(long strm);
+ private native static long getBytesWritten(long strm);
+ private native static void end(long strm);
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.java
new file mode 100644
index 00000000000..672090209db
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.bzip2;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.compress.Decompressor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A {@link Decompressor} based on the popular
+ * bzip2 compression algorithm.
+ * http://www.bzip2.org/
+ *
+ */
+public class Bzip2Decompressor implements Decompressor {
+ private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
+
+ private static final Log LOG = LogFactory.getLog(Bzip2Decompressor.class);
+
+ // HACK - Use this as a global lock in the JNI layer.
+ private static Class clazz = Bzip2Decompressor.class;
+
+ private long stream;
+ private boolean conserveMemory;
+ private int directBufferSize;
+ private Buffer compressedDirectBuf = null;
+ private int compressedDirectBufOff, compressedDirectBufLen;
+ private Buffer uncompressedDirectBuf = null;
+ private byte[] userBuf = null;
+ private int userBufOff = 0, userBufLen = 0;
+ private boolean finished;
+
+ /**
+ * Creates a new decompressor.
+ */
+ public Bzip2Decompressor(boolean conserveMemory, int directBufferSize) {
+ this.conserveMemory = conserveMemory;
+ this.directBufferSize = directBufferSize;
+ compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+ uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+ uncompressedDirectBuf.position(directBufferSize);
+
+ stream = init(conserveMemory ? 1 : 0);
+ }
+
+ public Bzip2Decompressor() {
+ this(false, DEFAULT_DIRECT_BUFFER_SIZE);
+ }
+
+ @Override
+ public synchronized void setInput(byte[] b, int off, int len) {
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+
+ this.userBuf = b;
+ this.userBufOff = off;
+ this.userBufLen = len;
+
+ setInputFromSavedData();
+
+ // Reinitialize bzip2's output direct buffer.
+ uncompressedDirectBuf.limit(directBufferSize);
+ uncompressedDirectBuf.position(directBufferSize);
+ }
+
+ synchronized void setInputFromSavedData() {
+ compressedDirectBufOff = 0;
+ compressedDirectBufLen = userBufLen;
+ if (compressedDirectBufLen > directBufferSize) {
+ compressedDirectBufLen = directBufferSize;
+ }
+
+ // Reinitialize bzip2's input direct buffer.
+ compressedDirectBuf.rewind();
+ ((ByteBuffer)compressedDirectBuf).put(userBuf, userBufOff,
+ compressedDirectBufLen);
+
+ // Note how much data is being fed to bzip2.
+ userBufOff += compressedDirectBufLen;
+ userBufLen -= compressedDirectBufLen;
+ }
+
+ @Override
+ public synchronized void setDictionary(byte[] b, int off, int len) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public synchronized boolean needsInput() {
+ // Consume remaining compressed data?
+ if (uncompressedDirectBuf.remaining() > 0) {
+ return false;
+ }
+
+ // Check if bzip2 has consumed all input.
+ if (compressedDirectBufLen <= 0) {
+ // Check if we have consumed all user-input.
+ if (userBufLen <= 0) {
+ return true;
+ } else {
+ setInputFromSavedData();
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public synchronized boolean needsDictionary() {
+ return false;
+ }
+
+ @Override
+ public synchronized boolean finished() {
+ // Check if bzip2 says it has finished and
+ // all compressed data has been consumed.
+ return (finished && uncompressedDirectBuf.remaining() == 0);
+ }
+
+ @Override
+ public synchronized int decompress(byte[] b, int off, int len)
+ throws IOException {
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+
+ // Check if there is uncompressed data.
+ int n = uncompressedDirectBuf.remaining();
+ if (n > 0) {
+ n = Math.min(n, len);
+ ((ByteBuffer)uncompressedDirectBuf).get(b, off, n);
+ return n;
+ }
+
+ // Re-initialize bzip2's output direct buffer.
+ uncompressedDirectBuf.rewind();
+ uncompressedDirectBuf.limit(directBufferSize);
+
+ // Decompress the data.
+ n = finished ? 0 : inflateBytesDirect();
+ uncompressedDirectBuf.limit(n);
+
+ // Get at most 'len' bytes.
+ n = Math.min(n, len);
+ ((ByteBuffer)uncompressedDirectBuf).get(b, off, n);
+
+ return n;
+ }
+
+ /**
+ * Returns the total number of uncompressed bytes output so far.
+ *
+ * @return the total (non-negative) number of uncompressed bytes output so far
+ */
+ public synchronized long getBytesWritten() {
+ checkStream();
+ return getBytesWritten(stream);
+ }
+
+ /**
+ * Returns the total number of compressed bytes input so far.
+ *
+ * @return the total (non-negative) number of compressed bytes input so far
+ */
+ public synchronized long getBytesRead() {
+ checkStream();
+ return getBytesRead(stream);
+ }
+
+ /**
+ * Returns the number of bytes remaining in the input buffers; normally
+ * called when finished() is true to determine amount of post-gzip-stream
+ * data.
+ *
+ * @return the total (non-negative) number of unprocessed bytes in input
+ */
+ @Override
+ public synchronized int getRemaining() {
+ checkStream();
+ return userBufLen + getRemaining(stream); // userBuf + compressedDirectBuf
+ }
+
+ /**
+ * Resets everything including the input buffers (user and direct).
+ */
+ @Override
+ public synchronized void reset() {
+ checkStream();
+ end(stream);
+ stream = init(conserveMemory ? 1 : 0);
+ finished = false;
+ compressedDirectBufOff = compressedDirectBufLen = 0;
+ uncompressedDirectBuf.limit(directBufferSize);
+ uncompressedDirectBuf.position(directBufferSize);
+ userBufOff = userBufLen = 0;
+ }
+
+ @Override
+ public synchronized void end() {
+ if (stream != 0) {
+ end(stream);
+ stream = 0;
+ }
+ }
+
+ static void initSymbols(String libname) {
+ initIDs(libname);
+ }
+
+ private void checkStream() {
+ if (stream == 0)
+ throw new NullPointerException();
+ }
+
+ private native static void initIDs(String libname);
+ private native static long init(int conserveMemory);
+ private native int inflateBytesDirect();
+ private native static long getBytesRead(long strm);
+ private native static long getBytesWritten(long strm);
+ private native static int getRemaining(long strm);
+ private native static void end(long strm);
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Factory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Factory.java
new file mode 100644
index 00000000000..80dc4e93bad
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Factory.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.bzip2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.util.NativeCodeLoader;
+
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.bzip2.Bzip2Compressor;
+import org.apache.hadoop.io.compress.bzip2.Bzip2Decompressor;
+import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor;
+import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor;
+
+/**
+ * A collection of factories to create the right
+ * bzip2 compressor/decompressor instances.
+ *
+ */
+public class Bzip2Factory {
+ private static final Log LOG = LogFactory.getLog(Bzip2Factory.class);
+
+ private static String bzip2LibraryName = "";
+ private static boolean nativeBzip2Loaded;
+
+ /**
+ * Check if native-bzip2 code is loaded & initialized correctly and
+ * can be loaded for this job.
+ *
+ * @param conf configuration
+ * @return true
if native-bzip2 is loaded & initialized
+ * and can be loaded for this job, else false
+ */
+ public static boolean isNativeBzip2Loaded(Configuration conf) {
+ String libname = conf.get("io.compression.codec.bzip2.library",
+ "system-native");
+ if (!bzip2LibraryName.equals(libname)) {
+ nativeBzip2Loaded = false;
+ bzip2LibraryName = libname;
+ if (libname.equals("java-builtin")) {
+ LOG.info("Using pure-Java version of bzip2 library");
+ } else if (conf.getBoolean(
+ CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY,
+ CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_DEFAULT) &&
+ NativeCodeLoader.isNativeCodeLoaded()) {
+ try {
+ // Initialize the native library.
+ Bzip2Compressor.initSymbols(libname);
+ Bzip2Decompressor.initSymbols(libname);
+ nativeBzip2Loaded = true;
+ LOG.info("Successfully loaded & initialized native-bzip2 library " +
+ libname);
+ } catch (Throwable t) {
+ LOG.warn("Failed to load/initialize native-bzip2 library " +
+ libname + ", will use pure-Java version");
+ }
+ }
+ }
+ return nativeBzip2Loaded;
+ }
+
+ /**
+ * Return the appropriate type of the bzip2 compressor.
+ *
+ * @param conf configuration
+ * @return the appropriate type of the bzip2 compressor.
+ */
+ public static Class extends Compressor>
+ getBzip2CompressorType(Configuration conf) {
+ return isNativeBzip2Loaded(conf) ?
+ Bzip2Compressor.class : BZip2DummyCompressor.class;
+ }
+
+ /**
+ * Return the appropriate implementation of the bzip2 compressor.
+ *
+ * @param conf configuration
+ * @return the appropriate implementation of the bzip2 compressor.
+ */
+ public static Compressor getBzip2Compressor(Configuration conf) {
+ return isNativeBzip2Loaded(conf)?
+ new Bzip2Compressor(conf) : new BZip2DummyCompressor();
+ }
+
+ /**
+ * Return the appropriate type of the bzip2 decompressor.
+ *
+ * @param conf configuration
+ * @return the appropriate type of the bzip2 decompressor.
+ */
+ public static Class extends Decompressor>
+ getBzip2DecompressorType(Configuration conf) {
+ return isNativeBzip2Loaded(conf) ?
+ Bzip2Decompressor.class : BZip2DummyDecompressor.class;
+ }
+
+ /**
+ * Return the appropriate implementation of the bzip2 decompressor.
+ *
+ * @param conf configuration
+ * @return the appropriate implementation of the bzip2 decompressor.
+ */
+ public static Decompressor getBzip2Decompressor(Configuration conf) {
+ return isNativeBzip2Loaded(conf) ?
+ new Bzip2Decompressor() : new BZip2DummyDecompressor();
+ }
+
+ public static void setBlockSize(Configuration conf, int blockSize) {
+ conf.setInt("bzip2.compress.blocksize", blockSize);
+ }
+
+ public static int getBlockSize(Configuration conf) {
+ return conf.getInt("bzip2.compress.blocksize",
+ Bzip2Compressor.DEFAULT_BLOCK_SIZE);
+ }
+
+ public static void setWorkFactor(Configuration conf, int workFactor) {
+ conf.setInt("bzip2.compress.workfactor", workFactor);
+ }
+
+ public static int getWorkFactor(Configuration conf) {
+ return conf.getInt("bzip2.compress.workfactor",
+ Bzip2Compressor.DEFAULT_WORK_FACTOR);
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.c
new file mode 100644
index 00000000000..8d0b005ab85
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.c
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+#include
+#include
+#include
+
+#include "org_apache_hadoop_io_compress_bzip2.h"
+#include "org_apache_hadoop_io_compress_bzip2_Bzip2Compressor.h"
+
+static jfieldID Bzip2Compressor_clazz;
+static jfieldID Bzip2Compressor_stream;
+static jfieldID Bzip2Compressor_uncompressedDirectBuf;
+static jfieldID Bzip2Compressor_uncompressedDirectBufOff;
+static jfieldID Bzip2Compressor_uncompressedDirectBufLen;
+static jfieldID Bzip2Compressor_compressedDirectBuf;
+static jfieldID Bzip2Compressor_directBufferSize;
+static jfieldID Bzip2Compressor_finish;
+static jfieldID Bzip2Compressor_finished;
+
+static int (*dlsym_BZ2_bzCompressInit)(bz_stream*, int, int, int);
+static int (*dlsym_BZ2_bzCompress)(bz_stream*, int);
+static int (*dlsym_BZ2_bzCompressEnd)(bz_stream*);
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_initIDs(
+ JNIEnv *env, jclass class, jstring libname)
+{
+ const char* bzlib_name = (*env)->GetStringUTFChars(env, libname, NULL);
+ if (strcmp(bzlib_name, "system-native") == 0)
+ bzlib_name = HADOOP_BZIP2_LIBRARY;
+ // Load the native library.
+ void *libbz2 = dlopen(bzlib_name, RTLD_LAZY | RTLD_GLOBAL);
+ if (!libbz2) {
+ THROW(env, "java/lang/UnsatisfiedLinkError",
+ "Cannot load bzip2 native library");
+ return;
+ }
+
+ // Locate the requisite symbols from libbz2.so.
+ dlerror(); // Clear any existing error.
+ LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzCompressInit, env, libbz2,
+ "BZ2_bzCompressInit");
+ LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzCompress, env, libbz2,
+ "BZ2_bzCompress");
+ LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzCompressEnd, env, libbz2,
+ "BZ2_bzCompressEnd");
+
+ // Initialize the requisite fieldIds.
+ Bzip2Compressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz",
+ "Ljava/lang/Class;");
+ Bzip2Compressor_stream = (*env)->GetFieldID(env, class, "stream", "J");
+ Bzip2Compressor_finish = (*env)->GetFieldID(env, class, "finish", "Z");
+ Bzip2Compressor_finished = (*env)->GetFieldID(env, class, "finished", "Z");
+ Bzip2Compressor_uncompressedDirectBuf = (*env)->GetFieldID(env, class,
+ "uncompressedDirectBuf",
+ "Ljava/nio/Buffer;");
+ Bzip2Compressor_uncompressedDirectBufOff = (*env)->GetFieldID(env, class,
+ "uncompressedDirectBufOff",
+ "I");
+ Bzip2Compressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, class,
+ "uncompressedDirectBufLen",
+ "I");
+ Bzip2Compressor_compressedDirectBuf = (*env)->GetFieldID(env, class,
+ "compressedDirectBuf",
+ "Ljava/nio/Buffer;");
+ Bzip2Compressor_directBufferSize = (*env)->GetFieldID(env, class,
+ "directBufferSize", "I");
+}
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_init(
+ JNIEnv *env, jclass class, jint blockSize, jint workFactor)
+{
+ // Create a bz_stream.
+ bz_stream *stream = malloc(sizeof(bz_stream));
+ if (!stream) {
+ THROW(env, "java/lang/OutOfMemoryError", NULL);
+ return (jlong)0;
+ }
+ memset((void*)stream, 0, sizeof(bz_stream));
+
+ // Initialize stream.
+ int rv = (*dlsym_BZ2_bzCompressInit)(stream, blockSize, 0, workFactor);
+ if (rv != BZ_OK) {
+ // Contingency - Report error by throwing appropriate exceptions.
+ free(stream);
+ stream = NULL;
+
+ switch (rv) {
+ case BZ_MEM_ERROR:
+ {
+ THROW(env, "java/lang/OutOfMemoryError", NULL);
+ }
+ break;
+ case BZ_PARAM_ERROR:
+ {
+ THROW(env,
+ "java/lang/IllegalArgumentException",
+ NULL);
+ }
+ break;
+ default:
+ {
+ THROW(env, "java/lang/InternalError", NULL);
+ }
+ break;
+ }
+ }
+
+ return JLONG(stream);
+}
+
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_deflateBytesDirect(
+ JNIEnv *env, jobject this)
+{
+ // Get members of Bzip2Compressor.
+ bz_stream *stream = BZSTREAM((*env)->GetLongField(env, this,
+ Bzip2Compressor_stream));
+ if (!stream) {
+ THROW(env, "java/lang/NullPointerException", NULL);
+ return (jint)0;
+ }
+
+ jobject clazz = (*env)->GetStaticObjectField(env, this,
+ Bzip2Compressor_clazz);
+ jobject uncompressed_direct_buf = (*env)->GetObjectField(env, this,
+ Bzip2Compressor_uncompressedDirectBuf);
+ jint uncompressed_direct_buf_off = (*env)->GetIntField(env, this,
+ Bzip2Compressor_uncompressedDirectBufOff);
+ jint uncompressed_direct_buf_len = (*env)->GetIntField(env, this,
+ Bzip2Compressor_uncompressedDirectBufLen);
+
+ jobject compressed_direct_buf = (*env)->GetObjectField(env, this,
+ Bzip2Compressor_compressedDirectBuf);
+ jint compressed_direct_buf_len = (*env)->GetIntField(env, this,
+ Bzip2Compressor_directBufferSize);
+
+ jboolean finish = (*env)->GetBooleanField(env, this,
+ Bzip2Compressor_finish);
+
+ // Get the input and output direct buffers.
+ LOCK_CLASS(env, clazz, "Bzip2Compressor");
+ char* uncompressed_bytes = (*env)->GetDirectBufferAddress(env,
+ uncompressed_direct_buf);
+ char* compressed_bytes = (*env)->GetDirectBufferAddress(env,
+ compressed_direct_buf);
+ UNLOCK_CLASS(env, clazz, "Bzip2Compressor");
+
+ if (!uncompressed_bytes || !compressed_bytes) {
+ return (jint)0;
+ }
+
+ // Re-calibrate the bz_stream.
+ stream->next_in = uncompressed_bytes + uncompressed_direct_buf_off;
+ stream->avail_in = uncompressed_direct_buf_len;
+ stream->next_out = compressed_bytes;
+ stream->avail_out = compressed_direct_buf_len;
+
+ // Compress.
+ int rv = dlsym_BZ2_bzCompress(stream, finish ? BZ_FINISH : BZ_RUN);
+
+ jint no_compressed_bytes = 0;
+ switch (rv) {
+ // Contingency? - Report error by throwing appropriate exceptions.
+ case BZ_STREAM_END:
+ {
+ (*env)->SetBooleanField(env, this,
+ Bzip2Compressor_finished,
+ JNI_TRUE);
+ } // cascade
+ case BZ_RUN_OK:
+ case BZ_FINISH_OK:
+ {
+ uncompressed_direct_buf_off +=
+ uncompressed_direct_buf_len - stream->avail_in;
+ (*env)->SetIntField(env, this,
+ Bzip2Compressor_uncompressedDirectBufOff,
+ uncompressed_direct_buf_off);
+ (*env)->SetIntField(env, this,
+ Bzip2Compressor_uncompressedDirectBufLen,
+ stream->avail_in);
+ no_compressed_bytes =
+ compressed_direct_buf_len - stream->avail_out;
+ }
+ break;
+ default:
+ {
+ THROW(env, "java/lang/InternalError", NULL);
+ }
+ break;
+ }
+
+ return no_compressed_bytes;
+}
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_getBytesRead(
+ JNIEnv *env, jclass class, jlong stream)
+{
+ const bz_stream* strm = BZSTREAM(stream);
+ return ((jlong)strm->total_in_hi32 << 32) | strm->total_in_lo32;
+}
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_getBytesWritten(
+ JNIEnv *env, jclass class, jlong stream)
+{
+ const bz_stream* strm = BZSTREAM(stream);
+ return ((jlong)strm->total_out_hi32 << 32) | strm->total_out_lo32;
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_end(
+ JNIEnv *env, jclass class, jlong stream)
+{
+ if (dlsym_BZ2_bzCompressEnd(BZSTREAM(stream)) != BZ_OK) {
+ THROW(env, "java/lang/InternalError", NULL);
+ } else {
+ free(BZSTREAM(stream));
+ }
+}
+
+/**
+ * vim: sw=2: ts=2: et:
+ */
+
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.c
new file mode 100644
index 00000000000..b6c52135246
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.c
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+#include
+#include
+#include
+
+#include "org_apache_hadoop_io_compress_bzip2.h"
+#include "org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor.h"
+
+static jfieldID Bzip2Decompressor_clazz;
+static jfieldID Bzip2Decompressor_stream;
+static jfieldID Bzip2Decompressor_compressedDirectBuf;
+static jfieldID Bzip2Decompressor_compressedDirectBufOff;
+static jfieldID Bzip2Decompressor_compressedDirectBufLen;
+static jfieldID Bzip2Decompressor_uncompressedDirectBuf;
+static jfieldID Bzip2Decompressor_directBufferSize;
+static jfieldID Bzip2Decompressor_finished;
+
+static int (*dlsym_BZ2_bzDecompressInit)(bz_stream*, int, int);
+static int (*dlsym_BZ2_bzDecompress)(bz_stream*);
+static int (*dlsym_BZ2_bzDecompressEnd)(bz_stream*);
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_initIDs(
+ JNIEnv *env, jclass class, jstring libname)
+{
+ const char* bzlib_name = (*env)->GetStringUTFChars(env, libname, NULL);
+ if (strcmp(bzlib_name, "system-native") == 0)
+ bzlib_name = HADOOP_BZIP2_LIBRARY;
+ // Load the native library.
+ void *libbz2 = dlopen(bzlib_name, RTLD_LAZY | RTLD_GLOBAL);
+ if (!libbz2) {
+ THROW(env, "java/lang/UnsatisfiedLinkError",
+ "Cannot load bzip2 native library");
+ return;
+ }
+
+ // Locate the requisite symbols from libbz2.so.
+ dlerror(); // Clear any existing error.
+ LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzDecompressInit, env, libbz2,
+ "BZ2_bzDecompressInit");
+ LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzDecompress, env, libbz2,
+ "BZ2_bzDecompress");
+ LOAD_DYNAMIC_SYMBOL(dlsym_BZ2_bzDecompressEnd, env, libbz2,
+ "BZ2_bzDecompressEnd");
+
+ // Initialize the requisite fieldIds.
+ Bzip2Decompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz",
+ "Ljava/lang/Class;");
+ Bzip2Decompressor_stream = (*env)->GetFieldID(env, class, "stream", "J");
+ Bzip2Decompressor_finished = (*env)->GetFieldID(env, class,
+ "finished", "Z");
+ Bzip2Decompressor_compressedDirectBuf = (*env)->GetFieldID(env, class,
+ "compressedDirectBuf",
+ "Ljava/nio/Buffer;");
+ Bzip2Decompressor_compressedDirectBufOff = (*env)->GetFieldID(env, class,
+ "compressedDirectBufOff", "I");
+ Bzip2Decompressor_compressedDirectBufLen = (*env)->GetFieldID(env, class,
+ "compressedDirectBufLen", "I");
+ Bzip2Decompressor_uncompressedDirectBuf = (*env)->GetFieldID(env, class,
+ "uncompressedDirectBuf",
+ "Ljava/nio/Buffer;");
+ Bzip2Decompressor_directBufferSize = (*env)->GetFieldID(env, class,
+ "directBufferSize", "I");
+}
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_init(
+ JNIEnv *env, jclass cls, jint conserveMemory)
+{
+ bz_stream *stream = malloc(sizeof(bz_stream));
+ if (stream == 0) {
+ THROW(env, "java/lang/OutOfMemoryError", NULL);
+ return (jlong)0;
+ }
+ memset((void*)stream, 0, sizeof(bz_stream));
+
+ int rv = dlsym_BZ2_bzDecompressInit(stream, 0, conserveMemory);
+
+ if (rv != BZ_OK) {
+ // Contingency - Report error by throwing appropriate exceptions.
+ free(stream);
+ stream = NULL;
+
+ switch (rv) {
+ case BZ_MEM_ERROR:
+ {
+ THROW(env, "java/lang/OutOfMemoryError", NULL);
+ }
+ break;
+ default:
+ {
+ THROW(env, "java/lang/InternalError", NULL);
+ }
+ break;
+ }
+ }
+
+ return JLONG(stream);
+}
+
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_inflateBytesDirect(
+ JNIEnv *env, jobject this)
+{
+ // Get members of Bzip2Decompressor.
+ bz_stream *stream = BZSTREAM((*env)->GetLongField(env, this,
+ Bzip2Decompressor_stream));
+ if (!stream) {
+ THROW(env, "java/lang/NullPointerException", NULL);
+ return (jint)0;
+ }
+
+ jobject clazz = (*env)->GetStaticObjectField(env, this,
+ Bzip2Decompressor_clazz);
+ jarray compressed_direct_buf = (jarray)(*env)->GetObjectField(env,
+ this, Bzip2Decompressor_compressedDirectBuf);
+ jint compressed_direct_buf_off = (*env)->GetIntField(env, this,
+ Bzip2Decompressor_compressedDirectBufOff);
+ jint compressed_direct_buf_len = (*env)->GetIntField(env, this,
+ Bzip2Decompressor_compressedDirectBufLen);
+
+ jarray uncompressed_direct_buf = (jarray)(*env)->GetObjectField(env,
+ this, Bzip2Decompressor_uncompressedDirectBuf);
+ jint uncompressed_direct_buf_len = (*env)->GetIntField(env, this,
+ Bzip2Decompressor_directBufferSize);
+
+ // Get the input and output direct buffers.
+ LOCK_CLASS(env, clazz, "Bzip2Decompressor");
+ char* compressed_bytes = (*env)->GetDirectBufferAddress(env,
+ compressed_direct_buf);
+ char* uncompressed_bytes = (*env)->GetDirectBufferAddress(env,
+ uncompressed_direct_buf);
+ UNLOCK_CLASS(env, clazz, "Bzip2Decompressor");
+
+ if (!compressed_bytes || !uncompressed_bytes) {
+ return (jint)0;
+ }
+
+ // Re-calibrate the bz_stream.
+ stream->next_in = compressed_bytes + compressed_direct_buf_off;
+ stream->avail_in = compressed_direct_buf_len;
+ stream->next_out = uncompressed_bytes;
+ stream->avail_out = uncompressed_direct_buf_len;
+
+ // Decompress.
+ int rv = dlsym_BZ2_bzDecompress(stream);
+
+ // Contingency? - Report error by throwing appropriate exceptions.
+ int no_decompressed_bytes = 0;
+ switch (rv) {
+ case BZ_STREAM_END:
+ {
+ (*env)->SetBooleanField(env, this,
+ Bzip2Decompressor_finished,
+ JNI_TRUE);
+ } // cascade down
+ case BZ_OK:
+ {
+ compressed_direct_buf_off +=
+ compressed_direct_buf_len - stream->avail_in;
+ (*env)->SetIntField(env, this,
+ Bzip2Decompressor_compressedDirectBufOff,
+ compressed_direct_buf_off);
+ (*env)->SetIntField(env, this,
+ Bzip2Decompressor_compressedDirectBufLen,
+ stream->avail_in);
+ no_decompressed_bytes =
+ uncompressed_direct_buf_len - stream->avail_out;
+ }
+ break;
+ case BZ_DATA_ERROR:
+ case BZ_DATA_ERROR_MAGIC:
+ {
+ THROW(env, "java/io/IOException", NULL);
+ }
+ break;
+ case BZ_MEM_ERROR:
+ {
+ THROW(env, "java/lang/OutOfMemoryError", NULL);
+ }
+ break;
+ default:
+ {
+ THROW(env, "java/lang/InternalError", NULL);
+ }
+ break;
+ }
+
+ return no_decompressed_bytes;
+}
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_getBytesRead(
+ JNIEnv *env, jclass cls, jlong stream)
+{
+ const bz_stream* strm = BZSTREAM(stream);
+ return ((jlong)strm->total_in_hi32 << 32) | strm->total_in_lo32;
+}
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_getBytesWritten(
+ JNIEnv *env, jclass cls, jlong stream)
+{
+ const bz_stream* strm = BZSTREAM(stream);
+ return ((jlong)strm->total_out_hi32 << 32) | strm->total_out_lo32;
+}
+
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_getRemaining(
+ JNIEnv *env, jclass cls, jlong stream)
+{
+ return (BZSTREAM(stream))->avail_in;
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_end(
+ JNIEnv *env, jclass cls, jlong stream)
+{
+ if (dlsym_BZ2_bzDecompressEnd(BZSTREAM(stream)) != BZ_OK) {
+ THROW(env, "java/lang/InternalError", 0);
+ } else {
+ free(BZSTREAM(stream));
+ }
+}
+
+/**
+ * vim: sw=2: ts=2: et:
+ */
+
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/org_apache_hadoop_io_compress_bzip2.h b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/org_apache_hadoop_io_compress_bzip2.h
new file mode 100644
index 00000000000..fa525bdedb9
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/org_apache_hadoop_io_compress_bzip2.h
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if !defined ORG_APACHE_HADOOP_IO_COMPRESS_BZIP2_BZIP2_H
+#define ORG_APACHE_HADOOP_IO_COMPRESS_BZIP2_BZIP2_H
+
+#include
+#include
+#include
+#include
+#include
+
+#include "org_apache_hadoop.h"
+
+#define HADOOP_BZIP2_LIBRARY "libbz2.so.1"
+
+
+/* A helper macro to convert the java 'stream-handle' to a bz_stream pointer. */
+#define BZSTREAM(stream) ((bz_stream*)((ptrdiff_t)(stream)))
+
+/* A helper macro to convert the bz_stream pointer to the java 'stream-handle'. */
+#define JLONG(stream) ((jlong)((ptrdiff_t)(stream)))
+
+#endif //ORG_APACHE_HADOOP_IO_COMPRESS_BZIP2_BZIP2_H
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index f090f011e02..db57799753b 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -309,6 +309,20 @@
are discovered using a Java ServiceLoader.
+
+ io.compression.codec.bzip2.library
+ system-native
+ The native-code library to be used for compression and
+ decompression by the bzip2 codec. This library could be specified
+ either by by name or the full pathname. In the former case, the
+ library is located by the dynamic linker, usually searching the
+ directories specified in the environment variable LD_LIBRARY_PATH.
+
+ The value of "system-native" indicates that the default system
+ library should be used. To indicate that the algorithm should
+ operate entirely in Java, specify "java-builtin".
+
+
io.serializations
org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
index a5c6c617283..f29e51b4ff0 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.io.compress.zlib.ZlibCompressor;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.ReflectionUtils;
@@ -92,12 +93,33 @@ public class TestCodec {
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec");
}
- @Test
+ @Test(timeout=20000)
public void testBZip2Codec() throws IOException {
+ Configuration conf = new Configuration();
+ conf.set("io.compression.codec.bzip2.library", "java-builtin");
codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.BZip2Codec");
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");
}
+ @Test(timeout=20000)
+ public void testBZip2NativeCodec() throws IOException {
+ Configuration conf = new Configuration();
+ conf.set("io.compression.codec.bzip2.library", "system-native");
+ if (NativeCodeLoader.isNativeCodeLoaded()) {
+ if (Bzip2Factory.isNativeBzip2Loaded(conf)) {
+ codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.BZip2Codec");
+ codecTest(conf, seed, count,
+ "org.apache.hadoop.io.compress.BZip2Codec");
+ conf.set("io.compression.codec.bzip2.library", "java-builtin");
+ codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.BZip2Codec");
+ codecTest(conf, seed, count,
+ "org.apache.hadoop.io.compress.BZip2Codec");
+ } else {
+ LOG.warn("Native hadoop library available but native bzip2 is not");
+ }
+ }
+ }
+
@Test
public void testSnappyCodec() throws IOException {
if (SnappyCodec.isNativeCodeLoaded()) {
@@ -446,14 +468,37 @@ public class TestCodec {
sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.DefaultCodec", 1000000);
}
- @Test
+ @Test(timeout=20000)
public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundException,
InstantiationException, IllegalAccessException {
+ Configuration conf = new Configuration();
+ conf.set("io.compression.codec.bzip2.library", "java-builtin");
sequenceFileCodecTest(conf, 0, "org.apache.hadoop.io.compress.BZip2Codec", 100);
sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.BZip2Codec", 100);
sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.BZip2Codec", 1000000);
}
+ @Test(timeout=20000)
+ public void testSequenceFileBZip2NativeCodec() throws IOException,
+ ClassNotFoundException, InstantiationException,
+ IllegalAccessException {
+ Configuration conf = new Configuration();
+ conf.set("io.compression.codec.bzip2.library", "system-native");
+ if (NativeCodeLoader.isNativeCodeLoaded()) {
+ if (Bzip2Factory.isNativeBzip2Loaded(conf)) {
+ sequenceFileCodecTest(conf, 0,
+ "org.apache.hadoop.io.compress.BZip2Codec", 100);
+ sequenceFileCodecTest(conf, 100,
+ "org.apache.hadoop.io.compress.BZip2Codec", 100);
+ sequenceFileCodecTest(conf, 200000,
+ "org.apache.hadoop.io.compress.BZip2Codec",
+ 1000000);
+ } else {
+ LOG.warn("Native hadoop library available but native bzip2 is not");
+ }
+ }
+ }
+
@Test
public void testSequenceFileDeflateCodec() throws IOException, ClassNotFoundException,
InstantiationException, IllegalAccessException {