HADOOP-7206. Support Snappy compression. Contributed by Issei Yoshida and Alejandro Abdelnur

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1139476 13f79535-47bb-0310-9956-ffa450edef68
Eli Collins 2011-06-25 01:02:41 +00:00
parent 6894edebd9
commit 8014dfa1db
15 changed files with 1268 additions and 14 deletions

View File

@@ -47,6 +47,9 @@ Trunk (unreleased changes)
HADOOP-7379. Add the ability to serialize and deserialize protocol buffers
in ObjectWritable. (todd)
HADOOP-7206. Support Snappy compression. (Issei Yoshida and
Alejandro Abdelnur via eli)
IMPROVEMENTS
HADOOP-7042. Updates to test-patch.sh to include failed test names and

View File

@@ -187,6 +187,9 @@
<property name="build.dir.eclipse-test-classes" value="${build.dir.eclipse}/classes-test"/>
<property name="build.dir.eclipse-test-generated-classes" value="${build.dir.eclipse}/classes-test-generated"/>
<!-- Use environment -->
<property environment="env" />
<!-- check if clover reports should be generated -->
<condition property="clover.enabled">
<and>
@@ -210,6 +213,14 @@
<property name="package.buildroot" value="/tmp/hadoop_package_build_${user.name}"/>
<property name="package.build.dir" value="/tmp/hadoop_package_build_${user.name}/BUILD"/>
<!-- Indicate if the Snappy native library should be bundled with Hadoop or not -->
<property name="bundle.snappy" value="false"/>
<!-- Snappy native library location -->
<property name="snappy.prefix" value="/usr/local"/>
<property name="snappy.lib" value="${snappy.prefix}/lib"/>
<property name="snappy.include" value="${snappy.prefix}/include"/>
<!-- the normal classpath -->
<path id="classpath">
<pathelement location="${build.classes}"/>
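These defaults can be overridden on the Ant command line; for a Snappy install under a non-standard prefix (the path here is illustrative), passing -Dsnappy.prefix=/opt/snappy -Dbundle.snappy=true points the native build at that install and bundles libsnappy into the distribution.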
@@ -401,12 +412,13 @@
<target name="create-native-makefile" depends="check-native-makefile" if="need.native.makefile">
<antcall target="create-native-configure"/>
<mkdir dir="${build.native}"/>
<exec dir="${build.native}" executable="sh" failonerror="true">
<env key="OS_NAME" value="${os.name}"/>
<env key="OS_ARCH" value="${os.arch}"/>
<env key="JVM_DATA_MODEL" value="${sun.arch.data.model}"/>
<env key="HADOOP_NATIVE_SRCDIR" value="${native.src.dir}"/>
<arg line="${native.src.dir}/configure"/>
<arg line="${native.src.dir}/configure CPPFLAGS=-I${snappy.include} LDFLAGS=-L${snappy.lib}"/>
</exec>
</target>
@@ -416,6 +428,7 @@
<mkdir dir="${build.native}/lib"/>
<mkdir dir="${build.native}/src/org/apache/hadoop/io/compress/zlib"/>
<mkdir dir="${build.native}/src/org/apache/hadoop/io/compress/snappy"/>
<mkdir dir="${build.native}/src/org/apache/hadoop/io/nativeio"/>
<mkdir dir="${build.native}/src/org/apache/hadoop/security"/>
@@ -429,6 +442,16 @@
<class name="org.apache.hadoop.io.compress.zlib.ZlibDecompressor" />
</javah>
<javah
classpath="${build.classes}"
destdir="${build.native}/src/org/apache/hadoop/io/compress/snappy"
force="yes"
verbose="yes"
>
<class name="org.apache.hadoop.io.compress.snappy.SnappyCompressor"/>
<class name="org.apache.hadoop.io.compress.snappy.SnappyDecompressor"/>
</javah>
<javah
classpath="${build.classes}"
destdir="${build.native}/src/org/apache/hadoop/security"
@@ -756,9 +779,10 @@
<sysproperty key="java.security.krb5.conf" value="@{test.krb5.conf.filename}"/>
<sysproperty key="hadoop.policy.file" value="hadoop-policy.xml" />
<sysproperty key="java.library.path"
value="${build.native}/lib:${lib.dir}/native/${build.platform}"/>
<sysproperty key="java.library.path"
value="${build.native}/lib:${lib.dir}/native/${build.platform}:${snappy.lib}"/>
<sysproperty key="java.security.egd" value="file:///dev/urandom" />
<sysproperty key="install.c++.examples" value="${install.c++.examples}"/>
<!-- set io.compression.codec.lzo.class in the child jvm only if it is set -->
<syspropertyset dynamic="no">
<propertyref name="io.compression.codec.lzo.class"/>
@@ -879,7 +903,6 @@
<property name="findbugs.home" value=""/>
<target name="findbugs" depends="check-for-findbugs, jar" if="findbugs.present" description="Run findbugs if present">
<property environment="env"/>
<property name="findbugs.out.dir" value="${test.build.dir}/findbugs"/>
<property name="findbugs.exclude.file" value="${test.src.dir}/findbugsExcludeFile.xml"/>
<property name="findbugs.report.htmlfile" value="${findbugs.out.dir}/hadoop-findbugs-report.html"/>
@@ -1112,6 +1135,8 @@
<env key="BASE_NATIVE_LIB_DIR" value="${lib.dir}/native"/>
<env key="BUILD_NATIVE_DIR" value="${build.dir}/native"/>
<env key="DIST_LIB_DIR" value="${dist.dir}/lib/native"/>
<env key="BUNDLE_SNAPPY_LIB" value="${bundle.snappy}"/>
<env key="SNAPPY_LIB_DIR" value="${snappy.prefix}/lib"/>
<arg line="${native.src.dir}/packageNativeHadoop.sh"/>
</exec>
@@ -1213,6 +1238,8 @@
<env key="BASE_NATIVE_LIB_DIR" value="${lib.dir}/native"/>
<env key="BUILD_NATIVE_DIR" value="${build.dir}/native"/>
<env key="DIST_LIB_DIR" value="${dist.dir}/lib"/>
<env key="BUNDLE_SNAPPY_LIB" value="${bundle.snappy}"/>
<env key="SNAPPY_LIB_DIR" value="${snappy.prefix}/lib"/>
<arg line="${native.src.dir}/packageNativeHadoop.sh"/>
</exec>

View File

@@ -1,4 +1,6 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@@ -15,7 +17,6 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Do not modify this file directly. Instead, copy entries that you -->
<!-- wish to modify from this file into core-site.xml and change them -->
@@ -174,7 +175,7 @@
<property>
<name>io.compression.codecs</name>
<value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec</value>
<value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec</value>
<description>A list of the compression codec classes that can be used
for compression/decompression.</description>
</property>
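With SnappyCodec appended to io.compression.codecs, CompressionCodecFactory now resolves .snappy files by extension. A minimal lookup sketch (the class and file names are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class SnappyLookup {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    // ".snappy" maps to SnappyCodec via its getDefaultExtension()
    CompressionCodec codec = factory.getCodec(new Path("logs/events.snappy"));
    System.out.println(codec == null ? "no codec" : codec.getClass().getName());
  }
}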

View File

@@ -85,5 +85,13 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
*/
public static final String NET_TOPOLOGY_CONFIGURED_NODE_MAPPING_KEY =
"net.topology.configured.node.mapping";
/** Internal buffer size for Snappy compressors/decompressors */
public static final String IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY =
"io.compression.codec.snappy.buffersize";
/** Default value for IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY */
public static final int IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT =
256 * 1024;
}
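The new key can be tuned before a codec is created; a sketch (the 64 KB value is arbitrary):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;

Configuration conf = new Configuration();
// Shrink the Snappy direct buffers from the 256 KB default to 64 KB.
conf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
    64 * 1024);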

View File

@@ -0,0 +1,220 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.snappy.LoadSnappy;
import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
import org.apache.hadoop.fs.CommonConfigurationKeys;
/**
* This class creates snappy compressors/decompressors.
*/
public class SnappyCodec implements Configurable, CompressionCodec {
static {
LoadSnappy.isLoaded();
}
Configuration conf;
/**
* Set the configuration to be used by this object.
*
* @param conf the configuration object.
*/
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
/**
* Return the configuration used by this object.
*
* @return the configuration object used by this object.
*/
@Override
public Configuration getConf() {
return conf;
}
/**
* Are the native snappy libraries loaded & initialized?
*
* @param conf configuration
* @return true if loaded & initialized, otherwise false
*/
public static boolean isNativeSnappyLoaded(Configuration conf) {
return LoadSnappy.isLoaded() && conf.getBoolean(
CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY,
CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_DEFAULT);
}
/**
* Create a {@link CompressionOutputStream} that will write to the given
* {@link OutputStream}.
*
* @param out the location for the final output stream
* @return a stream the user can write uncompressed data to have it compressed
* @throws IOException
*/
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
return createOutputStream(out, createCompressor());
}
/**
* Create a {@link CompressionOutputStream} that will write to the given
* {@link OutputStream} with the given {@link Compressor}.
*
* @param out the location for the final output stream
* @param compressor compressor to use
* @return a stream the user can write uncompressed data to have it compressed
* @throws IOException
*/
@Override
public CompressionOutputStream createOutputStream(OutputStream out,
Compressor compressor)
throws IOException {
if (!isNativeSnappyLoaded(conf)) {
throw new RuntimeException("native snappy library not available");
}
int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
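// Mirrors snappy's worst-case bound, snappy_max_compressed_length(n) =
// 32 + n + n/6, so a full buffer always fits in one compressed block.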
int compressionOverhead = (bufferSize / 6) + 32;
return new BlockCompressorStream(out, compressor, bufferSize,
compressionOverhead);
}
/**
* Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
*
* @return the type of compressor needed by this codec.
*/
@Override
public Class<? extends Compressor> getCompressorType() {
if (!isNativeSnappyLoaded(conf)) {
throw new RuntimeException("native snappy library not available");
}
return SnappyCompressor.class;
}
/**
* Create a new {@link Compressor} for use by this {@link CompressionCodec}.
*
* @return a new compressor for use by this codec
*/
@Override
public Compressor createCompressor() {
if (!isNativeSnappyLoaded(conf)) {
throw new RuntimeException("native snappy library not available");
}
int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
return new SnappyCompressor(bufferSize);
}
/**
* Create a {@link CompressionInputStream} that will read from the given
* input stream.
*
* @param in the stream to read compressed bytes from
* @return a stream to read uncompressed bytes from
* @throws IOException
*/
@Override
public CompressionInputStream createInputStream(InputStream in)
throws IOException {
return createInputStream(in, createDecompressor());
}
/**
* Create a {@link CompressionInputStream} that will read from the given
* {@link InputStream} with the given {@link Decompressor}.
*
* @param in the stream to read compressed bytes from
* @param decompressor decompressor to use
* @return a stream to read uncompressed bytes from
* @throws IOException
*/
@Override
public CompressionInputStream createInputStream(InputStream in,
Decompressor decompressor)
throws IOException {
if (!isNativeSnappyLoaded(conf)) {
throw new RuntimeException("native snappy library not available");
}
return new BlockDecompressorStream(in, decompressor, conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
}
/**
* Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
*
* @return the type of decompressor needed by this codec.
*/
@Override
public Class<? extends Decompressor> getDecompressorType() {
if (!isNativeSnappyLoaded(conf)) {
throw new RuntimeException("native snappy library not available");
}
return SnappyDecompressor.class;
}
/**
* Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
*
* @return a new decompressor for use by this codec
*/
@Override
public Decompressor createDecompressor() {
if (!isNativeSnappyLoaded(conf)) {
throw new RuntimeException("native snappy library not available");
}
int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
return new SnappyDecompressor(bufferSize);
}
/**
* Get the default filename extension for this kind of compression.
*
* @return <code>.snappy</code>.
*/
@Override
public String getDefaultExtension() {
return ".snappy";
}
}
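For reference, a minimal round-trip through the codec might look like the sketch below (the class name and payload are illustrative); it assumes libsnappy and libhadoop are on java.library.path, otherwise createOutputStream throws the RuntimeException above.

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.SnappyCodec;

public class SnappyRoundTrip {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    SnappyCodec codec = new SnappyCodec();
    codec.setConf(conf);

    // Compress into an in-memory buffer.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(baos);
    out.write("hello snappy".getBytes("UTF-8"));
    out.finish();
    out.close();

    // Decompress and print the original text.
    CompressionInputStream in =
        codec.createInputStream(new ByteArrayInputStream(baos.toByteArray()));
    BufferedReader reader =
        new BufferedReader(new InputStreamReader(in, "UTF-8"));
    System.out.println(reader.readLine()); // hello snappy
  }
}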

View File

@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress.snappy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.NativeCodeLoader;
/**
* Determines if Snappy native library is available and loads it if available.
*/
public class LoadSnappy {
private static final Log LOG = LogFactory.getLog(LoadSnappy.class.getName());
private static boolean AVAILABLE = false;
private static boolean LOADED = false;
static {
try {
System.loadLibrary("snappy");
LOG.info("Snappy native library is available");
AVAILABLE = true;
} catch (UnsatisfiedLinkError ex) {
//NOP
}
boolean hadoopNativeAvailable = NativeCodeLoader.isNativeCodeLoaded();
LOADED = AVAILABLE && hadoopNativeAvailable;
if (LOADED) {
LOG.info("Snappy native library loaded");
} else {
LOG.warn("Snappy native library not loaded");
}
}
/**
* Returns if Snappy native library is available.
*
* @return <code>true</code> if Snappy native library is available,
* <code>false</code> if not.
*/
public static boolean isAvailable() {
return AVAILABLE;
}
/**
* Returns if Snappy native library is loaded.
*
* @return <code>true</code> if Snappy native library is loaded,
* <code>false</code> if not.
*/
public static boolean isLoaded() {
return LOADED;
}
}
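Callers can use these probes to degrade gracefully when the native library is missing; a hedged sketch (the fallback codec choice is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.SnappyCodec;

Configuration conf = new Configuration();
String codecClass = SnappyCodec.isNativeSnappyLoaded(conf)
    ? "org.apache.hadoop.io.compress.SnappyCodec"
    : "org.apache.hadoop.io.compress.DefaultCodec"; // pure-Java fallback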

View File

@@ -0,0 +1,298 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress.snappy;
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor;
/**
* A {@link Compressor} based on the snappy compression algorithm.
* http://code.google.com/p/snappy/
*/
public class SnappyCompressor implements Compressor {
private static final Log LOG =
LogFactory.getLog(SnappyCompressor.class.getName());
private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
// HACK - Use this as a global lock in the JNI layer
@SuppressWarnings({"unchecked", "unused"})
private static Class clazz = SnappyCompressor.class;
private int directBufferSize;
private Buffer compressedDirectBuf = null;
private int uncompressedDirectBufLen;
private Buffer uncompressedDirectBuf = null;
private byte[] userBuf = null;
private int userBufOff = 0, userBufLen = 0;
private boolean finish, finished;
private long bytesRead = 0L;
private long bytesWritten = 0L;
static {
if (LoadSnappy.isLoaded()) {
// Initialize the native library
try {
initIDs();
} catch (Throwable t) {
// Ignore failure to load/initialize snappy
LOG.warn(t.toString());
}
} else {
LOG.error("Cannot load " + SnappyCompressor.class.getName() +
" without snappy library!");
}
}
/**
* Creates a new compressor.
*
* @param directBufferSize size of the direct buffer to be used.
*/
public SnappyCompressor(int directBufferSize) {
this.directBufferSize = directBufferSize;
uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
compressedDirectBuf.position(directBufferSize);
}
/**
* Creates a new compressor with the default buffer size.
*/
public SnappyCompressor() {
this(DEFAULT_DIRECT_BUFFER_SIZE);
}
/**
* Sets input data for compression.
* This should be called whenever #needsInput() returns
* <code>true</code> indicating that more input data is required.
*
* @param b Input data
* @param off Start offset
* @param len Length
*/
@Override
public synchronized void setInput(byte[] b, int off, int len) {
if (b == null) {
throw new NullPointerException();
}
if (off < 0 || len < 0 || off > b.length - len) {
throw new ArrayIndexOutOfBoundsException();
}
finished = false;
if (len > uncompressedDirectBuf.remaining()) {
// save data; now !needsInput
this.userBuf = b;
this.userBufOff = off;
this.userBufLen = len;
} else {
((ByteBuffer) uncompressedDirectBuf).put(b, off, len);
uncompressedDirectBufLen = uncompressedDirectBuf.position();
}
bytesRead += len;
}
/**
* If a write would exceed the capacity of the direct buffers, it is set
* aside to be loaded by this function while the compressed data are
* consumed.
*/
synchronized void setInputFromSavedData() {
if (0 >= userBufLen) {
return;
}
finished = false;
uncompressedDirectBufLen = Math.min(userBufLen, directBufferSize);
((ByteBuffer) uncompressedDirectBuf).put(userBuf, userBufOff,
uncompressedDirectBufLen);
// Note how much data is being fed to snappy
userBufOff += uncompressedDirectBufLen;
userBufLen -= uncompressedDirectBufLen;
}
/**
* Does nothing.
*/
@Override
public synchronized void setDictionary(byte[] b, int off, int len) {
// do nothing
}
/**
* Returns true if the input data buffer is empty and
* #setInput() should be called to provide more input.
*
* @return <code>true</code> if the input data buffer is empty and
* #setInput() should be called in order to provide more input.
*/
@Override
public synchronized boolean needsInput() {
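// Input is needed only when no compressed output is pending, the
// uncompressed direct buffer still has room, and no saved user data
// remains; hence the negated disjunction below.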
return !(compressedDirectBuf.remaining() > 0
|| uncompressedDirectBuf.remaining() == 0 || userBufLen > 0);
}
/**
* When called, indicates that compression should end
* with the current contents of the input buffer.
*/
@Override
public synchronized void finish() {
finish = true;
}
/**
* Returns true if the end of the compressed
* data output stream has been reached.
*
* @return <code>true</code> if the end of the compressed
* data output stream has been reached.
*/
@Override
public synchronized boolean finished() {
// Check if all uncompressed data has been consumed
return (finish && finished && compressedDirectBuf.remaining() == 0);
}
/**
* Fills specified buffer with compressed data. Returns actual number
* of bytes of compressed data. A return value of 0 indicates that
* needsInput() should be called in order to determine if more input
* data is required.
*
* @param b Buffer for the compressed data
* @param off Start offset of the data
* @param len Size of the buffer
* @return The actual number of bytes of compressed data.
*/
@Override
public synchronized int compress(byte[] b, int off, int len)
throws IOException {
if (b == null) {
throw new NullPointerException();
}
if (off < 0 || len < 0 || off > b.length - len) {
throw new ArrayIndexOutOfBoundsException();
}
// Check if there is compressed data
int n = compressedDirectBuf.remaining();
if (n > 0) {
n = Math.min(n, len);
((ByteBuffer) compressedDirectBuf).get(b, off, n);
bytesWritten += n;
return n;
}
// Re-initialize snappy's output direct buffer
compressedDirectBuf.clear();
compressedDirectBuf.limit(0);
if (0 == uncompressedDirectBuf.position()) {
// No compressed data, so we should have !needsInput or !finished
setInputFromSavedData();
if (0 == uncompressedDirectBuf.position()) {
// Called without data; write nothing
finished = true;
return 0;
}
}
// Compress data
n = compressBytesDirect();
compressedDirectBuf.limit(n);
uncompressedDirectBuf.clear(); // snappy consumes all buffer input
// Set 'finished' if snappy has consumed all user-data
if (0 == userBufLen) {
finished = true;
}
// Get at most 'len' bytes
n = Math.min(n, len);
bytesWritten += n;
((ByteBuffer) compressedDirectBuf).get(b, off, n);
return n;
}
/**
* Resets compressor so that a new set of input data can be processed.
*/
@Override
public synchronized void reset() {
finish = false;
finished = false;
uncompressedDirectBuf.clear();
uncompressedDirectBufLen = 0;
compressedDirectBuf.clear();
compressedDirectBuf.limit(0);
userBufOff = userBufLen = 0;
bytesRead = bytesWritten = 0L;
}
/**
* Prepare the compressor to be used in a new stream with settings defined in
* the given Configuration
*
* @param conf Configuration from which new setting are fetched
*/
@Override
public synchronized void reinit(Configuration conf) {
reset();
}
/**
* Return number of bytes given to this compressor since last reset.
*/
@Override
public synchronized long getBytesRead() {
return bytesRead;
}
/**
* Return number of bytes consumed by callers of compress since last reset.
*/
@Override
public synchronized long getBytesWritten() {
return bytesWritten;
}
/**
* Closes the compressor and discards any unprocessed input.
*/
@Override
public synchronized void end() {
}
private native static void initIDs();
private native int compressBytesDirect();
}
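A sketch of driving the raw compressor by hand (in normal use BlockCompressorStream does this); buffer sizes and the payload are illustrative:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.compress.snappy.SnappyCompressor;

public class RawCompressExample {
  public static void main(String[] args) throws IOException {
    SnappyCompressor compressor = new SnappyCompressor(64 * 1024);
    byte[] input = "example payload".getBytes("UTF-8");
    byte[] scratch = new byte[64 * 1024];
    ByteArrayOutputStream out = new ByteArrayOutputStream();

    compressor.setInput(input, 0, input.length);
    compressor.finish(); // no more input; compress what is buffered
    while (!compressor.finished()) {
      int n = compressor.compress(scratch, 0, scratch.length);
      out.write(scratch, 0, n);
    }
    System.out.println(input.length + " -> " + out.size() + " bytes");
  }
}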

View File

@@ -0,0 +1,280 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress.snappy;
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.compress.Decompressor;
/**
* A {@link Decompressor} based on the snappy compression algorithm.
* http://code.google.com/p/snappy/
*/
public class SnappyDecompressor implements Decompressor {
private static final Log LOG =
LogFactory.getLog(SnappyDecompressor.class.getName());
private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
// HACK - Use this as a global lock in the JNI layer
@SuppressWarnings({"unchecked", "unused"})
private static Class clazz = SnappyDecompressor.class;
private int directBufferSize;
private Buffer compressedDirectBuf = null;
private int compressedDirectBufLen;
private Buffer uncompressedDirectBuf = null;
private byte[] userBuf = null;
private int userBufOff = 0, userBufLen = 0;
private boolean finished;
static {
if (LoadSnappy.isLoaded()) {
// Initialize the native library
try {
initIDs();
} catch (Throwable t) {
// Ignore failure to load/initialize snappy
LOG.warn(t.toString());
}
} else {
LOG.error("Cannot load " + SnappyDecompressor.class.getName() +
" without snappy library!");
}
}
/**
* Creates a new decompressor.
*
* @param directBufferSize size of the direct buffer to be used.
*/
public SnappyDecompressor(int directBufferSize) {
this.directBufferSize = directBufferSize;
compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
uncompressedDirectBuf.position(directBufferSize);
}
/**
* Creates a new decompressor with the default buffer size.
*/
public SnappyDecompressor() {
this(DEFAULT_DIRECT_BUFFER_SIZE);
}
/**
* Sets input data for decompression.
* This should be called if and only if {@link #needsInput()} returns
* <code>true</code> indicating that more input data is required.
* (Both native and non-native versions of various Decompressors require
* that the data passed in via <code>b[]</code> remain unmodified until
* the caller is explicitly notified--via {@link #needsInput()}--that the
* buffer may be safely modified. With this requirement, an extra
* buffer-copy can be avoided.)
*
* @param b Input data
* @param off Start offset
* @param len Length
*/
@Override
public synchronized void setInput(byte[] b, int off, int len) {
if (b == null) {
throw new NullPointerException();
}
if (off < 0 || len < 0 || off > b.length - len) {
throw new ArrayIndexOutOfBoundsException();
}
this.userBuf = b;
this.userBufOff = off;
this.userBufLen = len;
setInputFromSavedData();
// Reinitialize snappy's output direct-buffer
uncompressedDirectBuf.limit(directBufferSize);
uncompressedDirectBuf.position(directBufferSize);
}
/**
* Loads as much of the saved user input as fits into snappy's input
* direct buffer; the remainder is fed in on later calls as the
* compressed data is consumed.
*/
synchronized void setInputFromSavedData() {
compressedDirectBufLen = Math.min(userBufLen, directBufferSize);
// Reinitialize snappy's input direct buffer
compressedDirectBuf.rewind();
((ByteBuffer) compressedDirectBuf).put(userBuf, userBufOff,
compressedDirectBufLen);
// Note how much data is being fed to snappy
userBufOff += compressedDirectBufLen;
userBufLen -= compressedDirectBufLen;
}
/**
* Does nothing.
*/
@Override
public synchronized void setDictionary(byte[] b, int off, int len) {
// do nothing
}
/**
* Returns true if the input data buffer is empty and
* {@link #setInput(byte[], int, int)} should be called to
* provide more input.
*
* @return <code>true</code> if the input data buffer is empty and
* {@link #setInput(byte[], int, int)} should be called in
* order to provide more input.
*/
@Override
public synchronized boolean needsInput() {
// Consume remaining compressed data?
if (uncompressedDirectBuf.remaining() > 0) {
return false;
}
// Check if snappy has consumed all input
if (compressedDirectBufLen <= 0) {
// Check if we have consumed all user-input
if (userBufLen <= 0) {
return true;
} else {
setInputFromSavedData();
}
}
return false;
}
/**
* Returns <code>false</code>.
*
* @return <code>false</code>.
*/
@Override
public synchronized boolean needsDictionary() {
return false;
}
/**
* Returns true if the end of the decompressed
* data output stream has been reached.
*
* @return <code>true</code> if the end of the decompressed
* data output stream has been reached.
*/
@Override
public synchronized boolean finished() {
return (finished && uncompressedDirectBuf.remaining() == 0);
}
/**
* Fills specified buffer with uncompressed data. Returns actual number
* of bytes of uncompressed data. A return value of 0 indicates that
* {@link #needsInput()} should be called in order to determine if more
* input data is required.
*
* @param b Buffer for the uncompressed data
* @param off Start offset of the data
* @param len Size of the buffer
* @return The actual number of bytes of uncompressed data.
* @throws IOException
*/
@Override
public synchronized int decompress(byte[] b, int off, int len)
throws IOException {
if (b == null) {
throw new NullPointerException();
}
if (off < 0 || len < 0 || off > b.length - len) {
throw new ArrayIndexOutOfBoundsException();
}
int n = 0;
// Check if there is uncompressed data
n = uncompressedDirectBuf.remaining();
if (n > 0) {
n = Math.min(n, len);
((ByteBuffer) uncompressedDirectBuf).get(b, off, n);
return n;
}
if (compressedDirectBufLen > 0) {
// Re-initialize snappy's output direct buffer
uncompressedDirectBuf.rewind();
uncompressedDirectBuf.limit(directBufferSize);
// Decompress data
n = decompressBytesDirect();
uncompressedDirectBuf.limit(n);
if (userBufLen <= 0) {
finished = true;
}
// Get at most 'len' bytes
n = Math.min(n, len);
((ByteBuffer) uncompressedDirectBuf).get(b, off, n);
}
return n;
}
/**
* Returns <code>0</code>.
*
* @return <code>0</code>.
*/
@Override
public synchronized int getRemaining() {
// Never use this function in BlockDecompressorStream.
return 0;
}
/**
* Resets decompressor and input and output buffers so that a new set of
* input data can be processed.
*/
@Override
public synchronized void reset() {
finished = false;
compressedDirectBufLen = 0;
uncompressedDirectBuf.limit(directBufferSize);
uncompressedDirectBuf.position(directBufferSize);
userBufOff = userBufLen = 0;
}
/**
* Closes the decompressor and discards any unprocessed input.
*/
@Override
public synchronized void end() {
// do nothing
}
private native static void initIDs();
private native int decompressBytesDirect();
}
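The matching read side, again driven by hand for illustration (in production BlockDecompressorStream feeds the decompressor one compressed block at a time):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;

public class RawDecompressExample {
  public static void main(String[] args) throws IOException {
    byte[] original = "example payload".getBytes("UTF-8");

    // Produce one raw snappy block with the compressor.
    SnappyCompressor compressor = new SnappyCompressor(64 * 1024);
    byte[] block = new byte[64 * 1024];
    compressor.setInput(original, 0, original.length);
    compressor.finish();
    int blockLen = compressor.compress(block, 0, block.length);

    // Feed the block to the decompressor and drain it.
    SnappyDecompressor decompressor = new SnappyDecompressor(64 * 1024);
    decompressor.setInput(block, 0, blockLen);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buf = new byte[64 * 1024];
    while (!decompressor.finished()) {
      int n = decompressor.decompress(buf, 0, buf.length);
      out.write(buf, 0, n);
    }
    System.out.println(out.toString("UTF-8")); // example payload
  }
}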

View File

@@ -34,6 +34,7 @@ export PLATFORM = $(shell echo $$OS_NAME | tr [A-Z] [a-z])
ACLOCAL_AMFLAGS = -I m4
AM_CPPFLAGS = @JNI_CPPFLAGS@ -I$(HADOOP_NATIVE_SRCDIR)/src \
-Isrc/org/apache/hadoop/io/compress/zlib \
-Isrc/org/apache/hadoop/io/compress/snappy \
-Isrc/org/apache/hadoop/security \
-Isrc/org/apache/hadoop/io/nativeio/
AM_LDFLAGS = @JNI_LDFLAGS@
@@ -46,6 +47,8 @@ endif
lib_LTLIBRARIES = libhadoop.la
libhadoop_la_SOURCES = src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c \
src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c \
src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c \
src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c \
src/org/apache/hadoop/security/getGroup.c \
src/org/apache/hadoop/security/JniBasedUnixGroupsMapping.c \
src/org/apache/hadoop/security/JniBasedUnixGroupsNetgroupMapping.c \

View File

@@ -88,6 +88,9 @@ AC_SUBST([JNI_CPPFLAGS])
dnl Check for zlib headers
AC_CHECK_HEADERS([zlib.h zconf.h], AC_COMPUTE_NEEDED_DSO(z,HADOOP_ZLIB_LIBRARY), AC_MSG_ERROR(Zlib headers were not found... native-hadoop library needs zlib to build. Please install the requisite zlib development package.))
dnl Check for snappy headers
AC_CHECK_HEADERS([snappy-c.h], AC_COMPUTE_NEEDED_DSO(snappy,HADOOP_SNAPPY_LIBRARY), AC_MSG_WARN(Snappy headers were not found... building without snappy.))
dnl Check for headers needed by the native Group resolution implementation
AC_CHECK_HEADERS([fcntl.h stdlib.h string.h unistd.h], [], AC_MSG_ERROR(Some system headers not found... please ensure their presence on your platform.))

View File

@@ -62,4 +62,17 @@ then
done
fi
if [ "${BUNDLE_SNAPPY_LIB}" = "true" ]
then
if [ -d "${SNAPPY_LIB_DIR}" ]
then
echo "Copying Snappy library from ${SNAPPY_LIB_DIR} to ${DIST_LIB_DIR}/"
cd "${SNAPPY_LIB_DIR}"
$TAR . | (cd "${DIST_LIB_DIR}/"; $UNTAR)
else
echo "Snappy lib directory ${SNAPPY_LIB_DIR} does not exist"
exit 1
fi
fi
#vim: ts=2: sw=2: et

View File

@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if defined HAVE_CONFIG_H
#include <config.h>
#endif
#if defined HADOOP_SNAPPY_LIBRARY
#if defined HAVE_STDIO_H
#include <stdio.h>
#else
#error 'stdio.h not found'
#endif
#if defined HAVE_STDLIB_H
#include <stdlib.h>
#else
#error 'stdlib.h not found'
#endif
#if defined HAVE_STRING_H
#include <string.h>
#else
#error 'string.h not found'
#endif
#if defined HAVE_DLFCN_H
#include <dlfcn.h>
#else
#error 'dlfcn.h not found'
#endif
#include "org_apache_hadoop_io_compress_snappy.h"
#include "org_apache_hadoop_io_compress_snappy_SnappyCompressor.h"
static jfieldID SnappyCompressor_clazz;
static jfieldID SnappyCompressor_uncompressedDirectBuf;
static jfieldID SnappyCompressor_uncompressedDirectBufLen;
static jfieldID SnappyCompressor_compressedDirectBuf;
static jfieldID SnappyCompressor_directBufferSize;
static snappy_status (*dlsym_snappy_compress)(const char*, size_t, char*, size_t*);
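// libsnappy is bound at runtime via dlopen/dlsym rather than at link time,
// so libhadoop.so still loads on machines without Snappy installed.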
JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyCompressor_initIDs
(JNIEnv *env, jclass clazz){
// Load libsnappy.so
void *libsnappy = dlopen(HADOOP_SNAPPY_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
if (!libsnappy) {
char* msg = (char*)malloc(1000);
snprintf(msg, 1000, "%s (%s)!", "Cannot load " HADOOP_SNAPPY_LIBRARY, dlerror());
THROW(env, "java/lang/UnsatisfiedLinkError", msg);
return;
}
// Locate the requisite symbols from libsnappy.so
dlerror(); // Clear any existing error
LOAD_DYNAMIC_SYMBOL(dlsym_snappy_compress, env, libsnappy, "snappy_compress");
SnappyCompressor_clazz = (*env)->GetStaticFieldID(env, clazz, "clazz",
"Ljava/lang/Class;");
SnappyCompressor_uncompressedDirectBuf = (*env)->GetFieldID(env, clazz,
"uncompressedDirectBuf",
"Ljava/nio/Buffer;");
SnappyCompressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, clazz,
"uncompressedDirectBufLen", "I");
SnappyCompressor_compressedDirectBuf = (*env)->GetFieldID(env, clazz,
"compressedDirectBuf",
"Ljava/nio/Buffer;");
SnappyCompressor_directBufferSize = (*env)->GetFieldID(env, clazz,
"directBufferSize", "I");
}
JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyCompressor_compressBytesDirect
(JNIEnv *env, jobject thisj){
// Get members of SnappyCompressor
jobject clazz = (*env)->GetStaticObjectField(env, thisj, SnappyCompressor_clazz);
jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, SnappyCompressor_uncompressedDirectBuf);
jint uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, SnappyCompressor_uncompressedDirectBufLen);
jobject compressed_direct_buf = (*env)->GetObjectField(env, thisj, SnappyCompressor_compressedDirectBuf);
jint compressed_direct_buf_len = (*env)->GetIntField(env, thisj, SnappyCompressor_directBufferSize);
// Get the input direct buffer
LOCK_CLASS(env, clazz, "SnappyCompressor");
const char* uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
UNLOCK_CLASS(env, clazz, "SnappyCompressor");
if (uncompressed_bytes == 0) {
return (jint)0;
}
// Get the output direct buffer
LOCK_CLASS(env, clazz, "SnappyCompressor");
char* compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
UNLOCK_CLASS(env, clazz, "SnappyCompressor");
if (compressed_bytes == 0) {
return (jint)0;
}
// snappy_compress expects a size_t* for the output length, so use a
// size_t local rather than passing the address of a jint.
size_t buf_len = (size_t)compressed_direct_buf_len;
snappy_status ret = dlsym_snappy_compress(uncompressed_bytes, uncompressed_direct_buf_len, compressed_bytes, &buf_len);
if (ret != SNAPPY_OK){
THROW(env, "java/lang/InternalError", "Could not compress data. Buffer length is too small.");
return (jint)0;
}
(*env)->SetIntField(env, thisj, SnappyCompressor_uncompressedDirectBufLen, 0);
return (jint)buf_len;
}
#endif // defined HADOOP_SNAPPY_LIBRARY

View File

@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if defined HAVE_CONFIG_H
#include <config.h>
#endif
#if defined HADOOP_SNAPPY_LIBRARY
#if defined HAVE_STDIO_H
#include <stdio.h>
#else
#error 'stdio.h not found'
#endif
#if defined HAVE_STDLIB_H
#include <stdlib.h>
#else
#error 'stdlib.h not found'
#endif
#if defined HAVE_STRING_H
#include <string.h>
#else
#error 'string.h not found'
#endif
#if defined HAVE_DLFCN_H
#include <dlfcn.h>
#else
#error 'dlfcn.h not found'
#endif
#include "org_apache_hadoop_io_compress_snappy.h"
#include "org_apache_hadoop_io_compress_snappy_SnappyDecompressor.h"
static jfieldID SnappyDecompressor_clazz;
static jfieldID SnappyDecompressor_compressedDirectBuf;
static jfieldID SnappyDecompressor_compressedDirectBufLen;
static jfieldID SnappyDecompressor_uncompressedDirectBuf;
static jfieldID SnappyDecompressor_directBufferSize;
static snappy_status (*dlsym_snappy_uncompress)(const char*, size_t, char*, size_t*);
JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyDecompressor_initIDs
(JNIEnv *env, jclass clazz){
// Load libsnappy.so
void *libsnappy = dlopen(HADOOP_SNAPPY_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
if (!libsnappy) {
char* msg = (char*)malloc(1000);
snprintf(msg, 1000, "%s (%s)!", "Cannot load " HADOOP_SNAPPY_LIBRARY, dlerror());
THROW(env, "java/lang/UnsatisfiedLinkError", msg);
return;
}
// Locate the requisite symbols from libsnappy.so
dlerror(); // Clear any existing error
LOAD_DYNAMIC_SYMBOL(dlsym_snappy_uncompress, env, libsnappy, "snappy_uncompress");
SnappyDecompressor_clazz = (*env)->GetStaticFieldID(env, clazz, "clazz",
"Ljava/lang/Class;");
SnappyDecompressor_compressedDirectBuf = (*env)->GetFieldID(env,clazz,
"compressedDirectBuf",
"Ljava/nio/Buffer;");
SnappyDecompressor_compressedDirectBufLen = (*env)->GetFieldID(env,clazz,
"compressedDirectBufLen", "I");
SnappyDecompressor_uncompressedDirectBuf = (*env)->GetFieldID(env,clazz,
"uncompressedDirectBuf",
"Ljava/nio/Buffer;");
SnappyDecompressor_directBufferSize = (*env)->GetFieldID(env, clazz,
"directBufferSize", "I");
}
JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyDecompressor_decompressBytesDirect
(JNIEnv *env, jobject thisj){
// Get members of SnappyDecompressor
jobject clazz = (*env)->GetStaticObjectField(env,thisj, SnappyDecompressor_clazz);
jobject compressed_direct_buf = (*env)->GetObjectField(env,thisj, SnappyDecompressor_compressedDirectBuf);
jint compressed_direct_buf_len = (*env)->GetIntField(env,thisj, SnappyDecompressor_compressedDirectBufLen);
jobject uncompressed_direct_buf = (*env)->GetObjectField(env,thisj, SnappyDecompressor_uncompressedDirectBuf);
size_t uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, SnappyDecompressor_directBufferSize);
// Get the input direct buffer
LOCK_CLASS(env, clazz, "SnappyDecompressor");
const char* compressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
UNLOCK_CLASS(env, clazz, "SnappyDecompressor");
if (compressed_bytes == 0) {
return (jint)0;
}
// Get the output direct buffer
LOCK_CLASS(env, clazz, "SnappyDecompressor");
char* uncompressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
UNLOCK_CLASS(env, clazz, "SnappyDecompressor");
if (uncompressed_bytes == 0) {
return (jint)0;
}
snappy_status ret = dlsym_snappy_uncompress(compressed_bytes, compressed_direct_buf_len, uncompressed_bytes, &uncompressed_direct_buf_len);
if (ret == SNAPPY_BUFFER_TOO_SMALL){
THROW(env, "java/lang/InternalError", "Could not decompress data. Buffer length is too small.");
return (jint)0;
} else if (ret == SNAPPY_INVALID_INPUT){
THROW(env, "java/lang/InternalError", "Could not decompress data. Input is invalid.");
return (jint)0;
} else if (ret != SNAPPY_OK){
THROW(env, "java/lang/InternalError", "Could not decompress data.");
return (jint)0;
}
(*env)->SetIntField(env, thisj, SnappyDecompressor_compressedDirectBufLen, 0);
return (jint)uncompressed_direct_buf_len;
}
#endif // defined HADOOP_SNAPPY_LIBRARY

View File

@@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if !defined ORG_APACHE_HADOOP_IO_COMPRESS_SNAPPY_SNAPPY_H
#define ORG_APACHE_HADOOP_IO_COMPRESS_SNAPPY_SNAPPY_H
#if defined HAVE_CONFIG_H
#include <config.h>
#endif
#if defined HADOOP_SNAPPY_LIBRARY
#if defined HAVE_STDDEF_H
#include <stddef.h>
#else
#error 'stddef.h not found'
#endif
#if defined HAVE_SNAPPY_C_H
#include <snappy-c.h>
#else
#error 'Please install snappy-development packages for your platform.'
#endif
#if defined HAVE_DLFCN_H
#include <dlfcn.h>
#else
#error "dlfcn.h not found"
#endif
#if defined HAVE_JNI_H
#include <jni.h>
#else
#error 'jni.h not found'
#endif
#include "org_apache_hadoop.h"
#endif // defined HADOOP_SNAPPY_LIBRARY
#endif //ORG_APACHE_HADOOP_IO_COMPRESS_SNAPPY_SNAPPY_H

View File

@@ -40,7 +40,6 @@ import java.util.zip.GZIPOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -52,8 +51,7 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.CompressorStream;
import org.apache.hadoop.io.compress.snappy.LoadSnappy;
import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
import org.apache.hadoop.io.compress.zlib.BuiltInZlibDeflater;
import org.apache.hadoop.io.compress.zlib.BuiltInZlibInflater;
@@ -68,6 +66,7 @@ import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -97,6 +96,19 @@ public class TestCodec {
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");
}
@Test
public void testSnappyCodec() throws IOException {
if (LoadSnappy.isAvailable()) {
if (LoadSnappy.isLoaded()) {
codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.SnappyCodec");
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.SnappyCodec");
} else {
Assert.fail("Snappy native available but Hadoop native not");
}
}
}
@Test
public void testDeflateCodec() throws IOException {
codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.DeflateCodec");