diff --git a/common/CHANGES.txt b/common/CHANGES.txt
index b5a1a2be81c..86275563fe3 100644
--- a/common/CHANGES.txt
+++ b/common/CHANGES.txt
@@ -47,6 +47,9 @@ Trunk (unreleased changes)
     HADOOP-7379. Add the ability to serialize and deserialize protocol buffers
     in ObjectWritable. (todd)

+    HADOOP-7206. Support Snappy compression. (Issei Yoshida and
+    Alejandro Abdelnur via eli)
+
   IMPROVEMENTS

     HADOOP-7042. Updates to test-patch.sh to include failed test names and
diff --git a/common/build.xml b/common/build.xml
index b5d70e0aed0..9dc0ad720ed 100644
--- a/common/build.xml
+++ b/common/build.xml
@@ -187,6 +187,9 @@
+
+
+
@@ -210,6 +213,14 @@
+
+
+
+
+
+
+
+
@@ -401,12 +412,13 @@
-
-
-
-
-
-
+
+
+
+
+
+
+
@@ -416,6 +428,7 @@
+
@@ -429,7 +442,17 @@
-
+
+
+
+
+
+
+        value="${build.native}/lib:${lib.dir}/native/${build.platform}:${snappy.lib}"/>
+
@@ -879,7 +903,6 @@
-
@@ -1112,6 +1135,8 @@
+
+
@@ -1213,6 +1238,8 @@
+
+
diff --git a/common/src/java/core-default.xml b/common/src/java/core-default.xml
index 3529211ed92..ccf39897948 100644
--- a/common/src/java/core-default.xml
+++ b/common/src/java/core-default.xml
@@ -1,4 +1,6 @@
+
+
-
@@ -174,7 +175,7 @@
 <property>
   <name>io.compression.codecs</name>
-  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec</value>
+  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec</value>
   <description>A list of the compression codec classes that can be used
   for compression/decompression.</description>
 </property>
diff --git a/common/src/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/common/src/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index c5ca2471f56..faea3d1946e 100644
--- a/common/src/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/common/src/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -85,5 +85,13 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
    */
   public static final String NET_TOPOLOGY_CONFIGURED_NODE_MAPPING_KEY =
     "net.topology.configured.node.mapping";
+
+  /** Internal buffer size for Snappy compressor/decompressors */
+  public static final String IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY =
+      "io.compression.codec.snappy.buffersize";
+
+  /** Default value for IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY */
+  public static final int IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT =
+      256 * 1024;
 }
diff --git a/common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java b/common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java
new file mode 100644
index 00000000000..718e7003c1e
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.snappy.LoadSnappy;
+import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+
+/**
+ * This class creates snappy compressors/decompressors.
+ */
+public class SnappyCodec implements Configurable, CompressionCodec {
+
+  static {
+    LoadSnappy.isLoaded();
+  }
+
+  Configuration conf;
+
+  /**
+   * Set the configuration to be used by this object.
+   *
+   * @param conf the configuration object.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Return the configuration used by this object.
+   *
+   * @return the configuration object used by this object.
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Are the native snappy libraries loaded & initialized?
+   *
+   * @param conf configuration
+   * @return true if loaded & initialized, otherwise false
+   */
+  public static boolean isNativeSnappyLoaded(Configuration conf) {
+    return LoadSnappy.isLoaded() && conf.getBoolean(
+        CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY,
+        CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_DEFAULT);
+  }
+
+  /**
+   * Create a {@link CompressionOutputStream} that will write to the given
+   * {@link OutputStream}.
+   *
+   * @param out the location for the final output stream
+   * @return a stream the user can write uncompressed data to have it compressed
+   * @throws IOException
+   */
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out)
+      throws IOException {
+    return createOutputStream(out, createCompressor());
+  }
+
+  /**
+   * Create a {@link CompressionOutputStream} that will write to the given
+   * {@link OutputStream} with the given {@link Compressor}.
+   *
+   * @param out        the location for the final output stream
+   * @param compressor compressor to use
+   * @return a stream the user can write uncompressed data to have it compressed
+   * @throws IOException
+   */
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out,
+      Compressor compressor)
+      throws IOException {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native snappy library not available");
+    }
+    int bufferSize = conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
+
+    int compressionOverhead = (bufferSize / 6) + 32;
+
+    return new BlockCompressorStream(out, compressor, bufferSize,
+        compressionOverhead);
+  }
+
+  /**
+   * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
+   *
+   * @return the type of compressor needed by this codec.
+   */
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native snappy library not available");
+    }
+
+    return SnappyCompressor.class;
+  }
+
+  /**
+   * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
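+   * <p>
+   * The size of the compressor's internal direct buffer is taken from
+   * <code>io.compression.codec.snappy.buffersize</code> (256 KB by default).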
+   *
+   * @return a new compressor for use by this codec
+   */
+  @Override
+  public Compressor createCompressor() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native snappy library not available");
+    }
+    int bufferSize = conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
+    return new SnappyCompressor(bufferSize);
+  }
+
+  /**
+   * Create a {@link CompressionInputStream} that will read from the given
+   * input stream.
+   *
+   * @param in the stream to read compressed bytes from
+   * @return a stream to read uncompressed bytes from
+   * @throws IOException
+   */
+  @Override
+  public CompressionInputStream createInputStream(InputStream in)
+      throws IOException {
+    return createInputStream(in, createDecompressor());
+  }
+
+  /**
+   * Create a {@link CompressionInputStream} that will read from the given
+   * {@link InputStream} with the given {@link Decompressor}.
+   *
+   * @param in           the stream to read compressed bytes from
+   * @param decompressor decompressor to use
+   * @return a stream to read uncompressed bytes from
+   * @throws IOException
+   */
+  @Override
+  public CompressionInputStream createInputStream(InputStream in,
+      Decompressor decompressor)
+      throws IOException {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native snappy library not available");
+    }
+
+    return new BlockDecompressorStream(in, decompressor, conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
+  }
+
+  /**
+   * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
+   *
+   * @return the type of decompressor needed by this codec.
+   */
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native snappy library not available");
+    }
+
+    return SnappyDecompressor.class;
+  }
+
+  /**
+   * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
+   *
+   * @return a new decompressor for use by this codec
+   */
+  @Override
+  public Decompressor createDecompressor() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native snappy library not available");
+    }
+    int bufferSize = conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
+    return new SnappyDecompressor(bufferSize);
+  }
+
+  /**
+   * Get the default filename extension for this kind of compression.
+   *
+   * @return <code>.snappy</code>.
+   */
+  @Override
+  public String getDefaultExtension() {
+    return ".snappy";
+  }
+}
diff --git a/common/src/java/org/apache/hadoop/io/compress/snappy/LoadSnappy.java b/common/src/java/org/apache/hadoop/io/compress/snappy/LoadSnappy.java
new file mode 100644
index 00000000000..05dc984afac
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/io/compress/snappy/LoadSnappy.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress.snappy;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.NativeCodeLoader;
+
+/**
+ * Determines if Snappy native library is available and loads it if available.
+ */
+public class LoadSnappy {
+  private static final Log LOG = LogFactory.getLog(LoadSnappy.class.getName());
+
+  private static boolean AVAILABLE = false;
+  private static boolean LOADED = false;
+
+  static {
+    try {
+      System.loadLibrary("snappy");
+      LOG.info("Snappy native library is available");
+      AVAILABLE = true;
+    } catch (UnsatisfiedLinkError ex) {
+      //NOP
+    }
+    boolean hadoopNativeAvailable = NativeCodeLoader.isNativeCodeLoaded();
+    LOADED = AVAILABLE && hadoopNativeAvailable;
+    if (LOADED) {
+      LOG.info("Snappy native library loaded");
+    } else {
+      LOG.warn("Snappy native library not loaded");
+    }
+  }
+
+  /**
+   * Returns whether the Snappy native library is available.
+   *
+   * @return <code>true</code> if the Snappy native library is available,
+   *         <code>false</code> if not.
+   */
+  public static boolean isAvailable() {
+    return AVAILABLE;
+  }
+
+  /**
+   * Returns whether the Snappy native library is loaded.
+   *
+   * @return <code>true</code> if the Snappy native library is loaded,
+   *         <code>false</code> if not.
+   */
+  public static boolean isLoaded() {
+    return LOADED;
+  }
+
+}
diff --git a/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java b/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
new file mode 100644
index 00000000000..ba778e0e0bd
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.snappy;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.Compressor;
+
+/**
+ * A {@link Compressor} based on the snappy compression algorithm.
+ * http://code.google.com/p/snappy/
+ */
+public class SnappyCompressor implements Compressor {
+  private static final Log LOG =
+      LogFactory.getLog(SnappyCompressor.class.getName());
+  private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
+
+  // HACK - Use this as a global lock in the JNI layer
+  @SuppressWarnings({"unchecked", "unused"})
+  private static Class clazz = SnappyCompressor.class;
+
+  private int directBufferSize;
+  private Buffer compressedDirectBuf = null;
+  private int uncompressedDirectBufLen;
+  private Buffer uncompressedDirectBuf = null;
+  private byte[] userBuf = null;
+  private int userBufOff = 0, userBufLen = 0;
+  private boolean finish, finished;
+
+  private long bytesRead = 0L;
+  private long bytesWritten = 0L;
+
+
+  static {
+    if (LoadSnappy.isLoaded()) {
+      // Initialize the native library
+      try {
+        initIDs();
+      } catch (Throwable t) {
+        // Ignore failure to load/initialize snappy
+        LOG.warn(t.toString());
+      }
+    } else {
+      LOG.error("Cannot load " + SnappyCompressor.class.getName() +
+          " without snappy library!");
+    }
+  }
+
+  /**
+   * Creates a new compressor.
+   *
+   * @param directBufferSize size of the direct buffer to be used.
+   */
+  public SnappyCompressor(int directBufferSize) {
+    this.directBufferSize = directBufferSize;
+
+    uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    compressedDirectBuf.position(directBufferSize);
+  }
+
+  /**
+   * Creates a new compressor with the default buffer size.
+   */
+  public SnappyCompressor() {
+    this(DEFAULT_DIRECT_BUFFER_SIZE);
+  }
+
+  /**
+   * Sets input data for compression.
+   * This should be called whenever #needsInput() returns
+   * <code>true</code> indicating that more input data is required.
+   *
+   * @param b   Input data
+   * @param off Start offset
+   * @param len Length
+   */
+  @Override
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+    finished = false;
+
+    if (len > uncompressedDirectBuf.remaining()) {
+      // save data; now !needsInput
+      this.userBuf = b;
+      this.userBufOff = off;
+      this.userBufLen = len;
+    } else {
+      ((ByteBuffer) uncompressedDirectBuf).put(b, off, len);
+      uncompressedDirectBufLen = uncompressedDirectBuf.position();
+    }
+
+    bytesRead += len;
+  }
+
+  /**
+   * If a write would exceed the capacity of the direct buffers, it is set
+   * aside to be loaded by this function while the compressed data are
+   * consumed.
+   */
+  synchronized void setInputFromSavedData() {
+    if (0 >= userBufLen) {
+      return;
+    }
+    finished = false;
+
+    uncompressedDirectBufLen = Math.min(userBufLen, directBufferSize);
+    ((ByteBuffer) uncompressedDirectBuf).put(userBuf, userBufOff,
+        uncompressedDirectBufLen);
+
+    // Note how much data is being fed to snappy
+    userBufOff += uncompressedDirectBufLen;
+    userBufLen -= uncompressedDirectBufLen;
+  }
+
+  /**
+   * Does nothing.
+   */
+  @Override
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    // do nothing
+  }
+
+  /**
+   * Returns <code>true</code> if the input data buffer is empty and
+   * #setInput() should be called to provide more input.
+   *
+   * @return <code>true</code> if the input data buffer is empty and
+   *         #setInput() should be called in order to provide more input.
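+   * <p>
+   * Specifically, this returns <code>false</code> while compressed output
+   * remains to be drained via compress(), while the internal uncompressed
+   * buffer has no free space, or while input saved by setInput() has not
+   * yet been consumed.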
+   */
+  @Override
+  public synchronized boolean needsInput() {
+    return !(compressedDirectBuf.remaining() > 0
+        || uncompressedDirectBuf.remaining() == 0 || userBufLen > 0);
+  }
+
+  /**
+   * When called, indicates that compression should end
+   * with the current contents of the input buffer.
+   */
+  @Override
+  public synchronized void finish() {
+    finish = true;
+  }
+
+  /**
+   * Returns <code>true</code> if the end of the compressed
+   * data output stream has been reached.
+   *
+   * @return <code>true</code> if the end of the compressed
+   *         data output stream has been reached.
+   */
+  @Override
+  public synchronized boolean finished() {
+    // Check if all uncompressed data has been consumed
+    return (finish && finished && compressedDirectBuf.remaining() == 0);
+  }
+
+  /**
+   * Fills specified buffer with compressed data. Returns actual number
+   * of bytes of compressed data. A return value of 0 indicates that
+   * needsInput() should be called in order to determine if more input
+   * data is required.
+   *
+   * @param b   Buffer for the compressed data
+   * @param off Start offset of the data
+   * @param len Size of the buffer
+   * @return The actual number of bytes of compressed data.
+   */
+  @Override
+  public synchronized int compress(byte[] b, int off, int len)
+      throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    // Check if there is compressed data
+    int n = compressedDirectBuf.remaining();
+    if (n > 0) {
+      n = Math.min(n, len);
+      ((ByteBuffer) compressedDirectBuf).get(b, off, n);
+      bytesWritten += n;
+      return n;
+    }
+
+    // Re-initialize snappy's output direct-buffer
+    compressedDirectBuf.clear();
+    compressedDirectBuf.limit(0);
+    if (0 == uncompressedDirectBuf.position()) {
+      // No compressed data, so we should have !needsInput or !finished
+      setInputFromSavedData();
+      if (0 == uncompressedDirectBuf.position()) {
+        // Called without data; write nothing
+        finished = true;
+        return 0;
+      }
+    }
+
+    // Compress data
+    n = compressBytesDirect();
+    compressedDirectBuf.limit(n);
+    uncompressedDirectBuf.clear(); // snappy consumes all buffer input
+
+    // Set 'finished' if snappy has consumed all user-data
+    if (0 == userBufLen) {
+      finished = true;
+    }
+
+    // Get at most 'len' bytes
+    n = Math.min(n, len);
+    bytesWritten += n;
+    ((ByteBuffer) compressedDirectBuf).get(b, off, n);
+
+    return n;
+  }
+
+  /**
+   * Resets compressor so that a new set of input data can be processed.
+   */
+  @Override
+  public synchronized void reset() {
+    finish = false;
+    finished = false;
+    uncompressedDirectBuf.clear();
+    uncompressedDirectBufLen = 0;
+    compressedDirectBuf.clear();
+    compressedDirectBuf.limit(0);
+    userBufOff = userBufLen = 0;
+    bytesRead = bytesWritten = 0L;
+  }
+
+  /**
+   * Prepare the compressor to be used in a new stream with settings defined
+   * in the given Configuration.
+   *
+   * @param conf Configuration from which new settings are fetched
+   */
+  @Override
+  public synchronized void reinit(Configuration conf) {
+    reset();
+  }
+
+  /**
+   * Return number of bytes given to this compressor since last reset.
+   */
+  @Override
+  public synchronized long getBytesRead() {
+    return bytesRead;
+  }
+
+  /**
+   * Return number of bytes consumed by callers of compress since last reset.
+   */
+  @Override
+  public synchronized long getBytesWritten() {
+    return bytesWritten;
+  }
+
+  /**
+   * Closes the compressor and discards any unprocessed input.
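+   * <p>
+   * This implementation holds no per-instance native resources, so there
+   * is nothing to release here.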
+   */
+  @Override
+  public synchronized void end() {
+  }
+
+  private native static void initIDs();
+
+  private native int compressBytesDirect();
+}
diff --git a/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java b/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
new file mode 100644
index 00000000000..baf864094e0
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.snappy;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * A {@link Decompressor} based on the snappy compression algorithm.
+ * http://code.google.com/p/snappy/
+ */
+public class SnappyDecompressor implements Decompressor {
+  private static final Log LOG =
+      LogFactory.getLog(SnappyDecompressor.class.getName());
+  private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
+
+  // HACK - Use this as a global lock in the JNI layer
+  @SuppressWarnings({"unchecked", "unused"})
+  private static Class clazz = SnappyDecompressor.class;
+
+  private int directBufferSize;
+  private Buffer compressedDirectBuf = null;
+  private int compressedDirectBufLen;
+  private Buffer uncompressedDirectBuf = null;
+  private byte[] userBuf = null;
+  private int userBufOff = 0, userBufLen = 0;
+  private boolean finished;
+
+  static {
+    if (LoadSnappy.isLoaded()) {
+      // Initialize the native library
+      try {
+        initIDs();
+      } catch (Throwable t) {
+        // Ignore failure to load/initialize snappy
+        LOG.warn(t.toString());
+      }
+    } else {
+      LOG.error("Cannot load " + SnappyDecompressor.class.getName() +
+          " without snappy library!");
+    }
+  }
+
+  /**
+   * Creates a new decompressor.
+   *
+   * @param directBufferSize size of the direct buffer to be used.
+   */
+  public SnappyDecompressor(int directBufferSize) {
+    this.directBufferSize = directBufferSize;
+
+    compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+  }
+
+  /**
+   * Creates a new decompressor with the default buffer size.
+   */
+  public SnappyDecompressor() {
+    this(DEFAULT_DIRECT_BUFFER_SIZE);
+  }
+
+  /**
+   * Sets input data for decompression.
+   * This should be called if and only if {@link #needsInput()} returns
+   * <code>true</code> indicating that more input data is required.
+   * (Both native and non-native versions of various Decompressors require
+   * that the data passed in via <code>b[]</code> remain unmodified until
+   * the caller is explicitly notified--via {@link #needsInput()}--that the
+   * buffer may be safely modified. With this requirement, an extra
+   * buffer-copy can be avoided.)
+   *
+   * @param b   Input data
+   * @param off Start offset
+   * @param len Length
+   */
+  @Override
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    this.userBuf = b;
+    this.userBufOff = off;
+    this.userBufLen = len;
+
+    setInputFromSavedData();
+
+    // Reinitialize snappy's output direct-buffer
+    uncompressedDirectBuf.limit(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+  }
+
+  /**
+   * If a write would exceed the capacity of the direct buffers, it is set
+   * aside to be loaded by this function while the compressed data are
+   * consumed.
+   */
+  synchronized void setInputFromSavedData() {
+    compressedDirectBufLen = Math.min(userBufLen, directBufferSize);
+
+    // Reinitialize snappy's input direct buffer
+    compressedDirectBuf.rewind();
+    ((ByteBuffer) compressedDirectBuf).put(userBuf, userBufOff,
+        compressedDirectBufLen);
+
+    // Note how much data is being fed to snappy
+    userBufOff += compressedDirectBufLen;
+    userBufLen -= compressedDirectBufLen;
+  }
+
+  /**
+   * Does nothing.
+   */
+  @Override
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    // do nothing
+  }
+
+  /**
+   * Returns <code>true</code> if the input data buffer is empty and
+   * {@link #setInput(byte[], int, int)} should be called to
+   * provide more input.
+   *
+   * @return <code>true</code> if the input data buffer is empty and
+   *         {@link #setInput(byte[], int, int)} should be called in
+   *         order to provide more input.
+   */
+  @Override
+  public synchronized boolean needsInput() {
+    // Consume remaining compressed data?
+    if (uncompressedDirectBuf.remaining() > 0) {
+      return false;
+    }
+
+    // Check if snappy has consumed all input
+    if (compressedDirectBufLen <= 0) {
+      // Check if we have consumed all user-input
+      if (userBufLen <= 0) {
+        return true;
+      } else {
+        setInputFromSavedData();
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * Returns <code>false</code>.
+   *
+   * @return <code>false</code>.
+   */
+  @Override
+  public synchronized boolean needsDictionary() {
+    return false;
+  }
+
+  /**
+   * Returns <code>true</code> if the end of the decompressed
+   * data output stream has been reached.
+   *
+   * @return <code>true</code> if the end of the decompressed
+   *         data output stream has been reached.
+   */
+  @Override
+  public synchronized boolean finished() {
+    return (finished && uncompressedDirectBuf.remaining() == 0);
+  }
+
+  /**
+   * Fills specified buffer with uncompressed data. Returns actual number
+   * of bytes of uncompressed data. A return value of 0 indicates that
+   * {@link #needsInput()} should be called in order to determine if more
+   * input data is required.
+   *
+   * @param b   Buffer for the uncompressed data
+   * @param off Start offset of the data
+   * @param len Size of the buffer
+   * @return The actual number of bytes of uncompressed data.
+   * @throws IOException
+   */
+  @Override
+  public synchronized int decompress(byte[] b, int off, int len)
+      throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    int n = 0;
+
+    // Check if there is uncompressed data
+    n = uncompressedDirectBuf.remaining();
+    if (n > 0) {
+      n = Math.min(n, len);
+      ((ByteBuffer) uncompressedDirectBuf).get(b, off, n);
+      return n;
+    }
+    if (compressedDirectBufLen > 0) {
+      // Re-initialize snappy's output direct buffer
+      uncompressedDirectBuf.rewind();
+      uncompressedDirectBuf.limit(directBufferSize);
+
+      // Decompress data
+      n = decompressBytesDirect();
+      uncompressedDirectBuf.limit(n);
+
+      if (userBufLen <= 0) {
+        finished = true;
+      }
+
+      // Get at most 'len' bytes
+      n = Math.min(n, len);
+      ((ByteBuffer) uncompressedDirectBuf).get(b, off, n);
+    }
+
+    return n;
+  }
+
+  /**
+   * Returns <code>0</code>.
+   *
+   * @return <code>0</code>.
+   */
+  @Override
+  public synchronized int getRemaining() {
+    // Never use this function in BlockDecompressorStream.
+    return 0;
+  }
+
+  /**
+   * Resets decompressor and input and output buffers so that a new set of
+   * input data can be processed.
+   */
+  @Override
+  public synchronized void reset() {
+    finished = false;
+    compressedDirectBufLen = 0;
+    uncompressedDirectBuf.limit(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+    userBufOff = userBufLen = 0;
+  }
+
+  /**
+   * Closes the decompressor and discards any unprocessed input.
+   */
+  @Override
+  public synchronized void end() {
+    // do nothing
+  }
+
+  private native static void initIDs();
+
+  private native int decompressBytesDirect();
+}
diff --git a/common/src/native/Makefile.am b/common/src/native/Makefile.am
index 6475359fa71..286be3159b1 100644
--- a/common/src/native/Makefile.am
+++ b/common/src/native/Makefile.am
@@ -34,6 +34,7 @@ export PLATFORM = $(shell echo $$OS_NAME | tr [A-Z] [a-z])
 ACLOCAL_AMFLAGS = -I m4
 AM_CPPFLAGS = @JNI_CPPFLAGS@ -I$(HADOOP_NATIVE_SRCDIR)/src \
               -Isrc/org/apache/hadoop/io/compress/zlib \
+              -Isrc/org/apache/hadoop/io/compress/snappy \
               -Isrc/org/apache/hadoop/security \
               -Isrc/org/apache/hadoop/io/nativeio/
 AM_LDFLAGS = @JNI_LDFLAGS@
@@ -46,6 +47,8 @@ endif
 lib_LTLIBRARIES = libhadoop.la
 libhadoop_la_SOURCES = src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c \
                        src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c \
+                       src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c \
+                       src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c \
                        src/org/apache/hadoop/security/getGroup.c \
                        src/org/apache/hadoop/security/JniBasedUnixGroupsMapping.c \
                        src/org/apache/hadoop/security/JniBasedUnixGroupsNetgroupMapping.c \
diff --git a/common/src/native/configure.ac b/common/src/native/configure.ac
index a1ce73c15c2..80f7f81c8d2 100644
--- a/common/src/native/configure.ac
+++ b/common/src/native/configure.ac
@@ -88,6 +88,9 @@ AC_SUBST([JNI_CPPFLAGS])
 dnl Check for zlib headers
 AC_CHECK_HEADERS([zlib.h zconf.h], AC_COMPUTE_NEEDED_DSO(z,HADOOP_ZLIB_LIBRARY), AC_MSG_ERROR(Zlib headers were not found... native-hadoop library needs zlib to build. Please install the requisite zlib development package.))

+dnl Check for snappy headers
+AC_CHECK_HEADERS([snappy-c.h], AC_COMPUTE_NEEDED_DSO(snappy,HADOOP_SNAPPY_LIBRARY), AC_MSG_WARN(Snappy headers were not found... building without snappy.))
+
 dnl Check for headers needed by the native Group resolution implementation
 AC_CHECK_HEADERS([fcntl.h stdlib.h string.h unistd.h], [], AC_MSG_ERROR(Some system headers not found... please ensure their presence on your platform.))
diff --git a/common/src/native/packageNativeHadoop.sh b/common/src/native/packageNativeHadoop.sh
index 46c69bae592..9ede8fd077e 100755
--- a/common/src/native/packageNativeHadoop.sh
+++ b/common/src/native/packageNativeHadoop.sh
@@ -62,4 +62,17 @@
   done
 fi

+if [ "${BUNDLE_SNAPPY_LIB}" = "true" ]
+then
+  if [ -d ${SNAPPY_LIB_DIR} ]
+  then
+    echo "Copying Snappy library from ${SNAPPY_LIB_DIR} to $DIST_LIB_DIR/"
+    cd ${SNAPPY_LIB_DIR}
+    $TAR . | (cd $DIST_LIB_DIR/; $UNTAR)
+  else
+    echo "Snappy lib directory ${SNAPPY_LIB_DIR} does not exist"
+    exit 1
+  fi
+fi
+
 #vim: ts=2: sw=2: et
diff --git a/common/src/native/src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c b/common/src/native/src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c
new file mode 100644
index 00000000000..13991c23f4f
--- /dev/null
+++ b/common/src/native/src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if defined HAVE_CONFIG_H
+  #include <config.h>
+#endif
+
+#if defined HADOOP_SNAPPY_LIBRARY
+
+#if defined HAVE_STDIO_H
+  #include <stdio.h>
+#else
+  #error 'stdio.h not found'
+#endif
+
+#if defined HAVE_STDLIB_H
+  #include <stdlib.h>
+#else
+  #error 'stdlib.h not found'
+#endif
+
+#if defined HAVE_STRING_H
+  #include <string.h>
+#else
+  #error 'string.h not found'
+#endif
+
+#if defined HAVE_DLFCN_H
+  #include <dlfcn.h>
+#else
+  #error 'dlfcn.h not found'
+#endif
+
+#include "org_apache_hadoop_io_compress_snappy.h"
+#include "org_apache_hadoop_io_compress_snappy_SnappyCompressor.h"
+
+static jfieldID SnappyCompressor_clazz;
+static jfieldID SnappyCompressor_uncompressedDirectBuf;
+static jfieldID SnappyCompressor_uncompressedDirectBufLen;
+static jfieldID SnappyCompressor_compressedDirectBuf;
+static jfieldID SnappyCompressor_directBufferSize;
+
+static snappy_status (*dlsym_snappy_compress)(const char*, size_t, char*, size_t*);
+
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyCompressor_initIDs
+(JNIEnv *env, jclass clazz){
+
+  // Load libsnappy.so
+  void *libsnappy = dlopen(HADOOP_SNAPPY_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
+  if (!libsnappy) {
+    char* msg = (char*)malloc(1000);
+    snprintf(msg, 1000, "%s (%s)!", "Cannot load " HADOOP_SNAPPY_LIBRARY, dlerror());
+    THROW(env, "java/lang/UnsatisfiedLinkError", msg);
+    return;
+  }
+
+  // Locate the requisite symbols from libsnappy.so
+  dlerror(); // Clear any existing error
+  LOAD_DYNAMIC_SYMBOL(dlsym_snappy_compress, env, libsnappy, "snappy_compress");
+
+  SnappyCompressor_clazz = (*env)->GetStaticFieldID(env, clazz, "clazz",
+                                                    "Ljava/lang/Class;");
+  SnappyCompressor_uncompressedDirectBuf = (*env)->GetFieldID(env, clazz,
+                                                              "uncompressedDirectBuf",
+                                                              "Ljava/nio/Buffer;");
+  SnappyCompressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, clazz,
+                                                                 "uncompressedDirectBufLen", "I");
+  SnappyCompressor_compressedDirectBuf = (*env)->GetFieldID(env, clazz,
+                                                            "compressedDirectBuf",
+                                                            "Ljava/nio/Buffer;");
+  SnappyCompressor_directBufferSize = (*env)->GetFieldID(env, clazz,
+                                                         "directBufferSize", "I");
+}
+
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyCompressor_compressBytesDirect
+(JNIEnv *env, jobject thisj){
+  // Get members of SnappyCompressor
+  jobject clazz = (*env)->GetStaticObjectField(env, thisj, SnappyCompressor_clazz);
+  jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, SnappyCompressor_uncompressedDirectBuf);
+  jint uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, SnappyCompressor_uncompressedDirectBufLen);
+  jobject compressed_direct_buf = (*env)->GetObjectField(env, thisj, SnappyCompressor_compressedDirectBuf);
+  jint compressed_direct_buf_len = (*env)->GetIntField(env, thisj, SnappyCompressor_directBufferSize);
+
+  // Get the input direct buffer
+  LOCK_CLASS(env, clazz, "SnappyCompressor");
+  const char* uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
+  UNLOCK_CLASS(env, clazz, "SnappyCompressor");
+
+  if (uncompressed_bytes == 0) {
+    return (jint)0;
+  }
+
+  // Get the output direct buffer
+  LOCK_CLASS(env, clazz, "SnappyCompressor");
+  char* compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
+  UNLOCK_CLASS(env, clazz, "SnappyCompressor");
+
+  if (compressed_bytes == 0) {
+    return (jint)0;
+  }
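+
+  // snappy_compress takes the output length by reference: on input it is the
+  // capacity of compressed_bytes, on return it holds the actual compressed
+  // size. Use a size_t local; passing the jint's address would corrupt the
+  // stack on LP64 platforms.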
+  size_t buf_len = (size_t) compressed_direct_buf_len;
+  snappy_status ret = dlsym_snappy_compress(uncompressed_bytes,
+      uncompressed_direct_buf_len, compressed_bytes, &buf_len);
+  if (ret != SNAPPY_OK){
+    THROW(env, "java/lang/InternalError", "Could not compress data. Buffer length is too small.");
+  }
+
+  (*env)->SetIntField(env, thisj, SnappyCompressor_uncompressedDirectBufLen, 0);
+
+  return (jint)buf_len;
+}
+
+#endif //define HADOOP_SNAPPY_LIBRARY
diff --git a/common/src/native/src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c b/common/src/native/src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c
new file mode 100644
index 00000000000..767c5f4b313
--- /dev/null
+++ b/common/src/native/src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#if defined HAVE_CONFIG_H
+  #include <config.h>
+#endif
+
+#if defined HADOOP_SNAPPY_LIBRARY
+
+#if defined HAVE_STDIO_H
+  #include <stdio.h>
+#else
+  #error 'stdio.h not found'
+#endif
+
+#if defined HAVE_STDLIB_H
+  #include <stdlib.h>
+#else
+  #error 'stdlib.h not found'
+#endif
+
+#if defined HAVE_STRING_H
+  #include <string.h>
+#else
+  #error 'string.h not found'
+#endif
+
+#if defined HAVE_DLFCN_H
+  #include <dlfcn.h>
+#else
+  #error 'dlfcn.h not found'
+#endif
+
+#include "org_apache_hadoop_io_compress_snappy.h"
+#include "org_apache_hadoop_io_compress_snappy_SnappyDecompressor.h"
+
+static jfieldID SnappyDecompressor_clazz;
+static jfieldID SnappyDecompressor_compressedDirectBuf;
+static jfieldID SnappyDecompressor_compressedDirectBufLen;
+static jfieldID SnappyDecompressor_uncompressedDirectBuf;
+static jfieldID SnappyDecompressor_directBufferSize;
+
+static snappy_status (*dlsym_snappy_uncompress)(const char*, size_t, char*, size_t*);
+
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyDecompressor_initIDs
+(JNIEnv *env, jclass clazz){
+
+  // Load libsnappy.so
+  void *libsnappy = dlopen(HADOOP_SNAPPY_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
+  if (!libsnappy) {
+    char* msg = (char*)malloc(1000);
+    snprintf(msg, 1000, "%s (%s)!", "Cannot load " HADOOP_SNAPPY_LIBRARY, dlerror());
+    THROW(env, "java/lang/UnsatisfiedLinkError", msg);
+    return;
+  }
+
+  // Locate the requisite symbols from libsnappy.so
+  dlerror(); // Clear any existing error
+  LOAD_DYNAMIC_SYMBOL(dlsym_snappy_uncompress, env, libsnappy, "snappy_uncompress");
+
+  SnappyDecompressor_clazz = (*env)->GetStaticFieldID(env, clazz, "clazz",
+                                                      "Ljava/lang/Class;");
+  SnappyDecompressor_compressedDirectBuf = (*env)->GetFieldID(env, clazz,
+                                                              "compressedDirectBuf",
+                                                              "Ljava/nio/Buffer;");
+  SnappyDecompressor_compressedDirectBufLen = (*env)->GetFieldID(env, clazz,
+                                                                 "compressedDirectBufLen", "I");
+  SnappyDecompressor_uncompressedDirectBuf = (*env)->GetFieldID(env, clazz,
+                                                                "uncompressedDirectBuf",
+                                                                "Ljava/nio/Buffer;");
+  SnappyDecompressor_directBufferSize = (*env)->GetFieldID(env, clazz,
+                                                           "directBufferSize", "I");
+}
+
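+// Decompresses the compressedDirectBufLen bytes in compressedDirectBuf into
+// uncompressedDirectBuf and returns the uncompressed length; the direct
+// buffer capacity is passed to snappy as the available output space.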
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyDecompressor_decompressBytesDirect
+(JNIEnv *env, jobject thisj){
+  // Get members of SnappyDecompressor
+  jobject clazz = (*env)->GetStaticObjectField(env, thisj, SnappyDecompressor_clazz);
+  jobject compressed_direct_buf = (*env)->GetObjectField(env, thisj, SnappyDecompressor_compressedDirectBuf);
+  jint compressed_direct_buf_len = (*env)->GetIntField(env, thisj, SnappyDecompressor_compressedDirectBufLen);
+  jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, SnappyDecompressor_uncompressedDirectBuf);
+  size_t uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, SnappyDecompressor_directBufferSize);
+
+  // Get the input direct buffer
+  LOCK_CLASS(env, clazz, "SnappyDecompressor");
+  const char* compressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
+  UNLOCK_CLASS(env, clazz, "SnappyDecompressor");
+
+  if (compressed_bytes == 0) {
+    return (jint)0;
+  }
+
+  // Get the output direct buffer
+  LOCK_CLASS(env, clazz, "SnappyDecompressor");
+  char* uncompressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
+  UNLOCK_CLASS(env, clazz, "SnappyDecompressor");
+
+  if (uncompressed_bytes == 0) {
+    return (jint)0;
+  }
+
+  snappy_status ret = dlsym_snappy_uncompress(compressed_bytes, compressed_direct_buf_len, uncompressed_bytes, &uncompressed_direct_buf_len);
+  if (ret == SNAPPY_BUFFER_TOO_SMALL){
+    THROW(env, "java/lang/InternalError", "Could not decompress data. Buffer length is too small.");
+  } else if (ret == SNAPPY_INVALID_INPUT){
+    THROW(env, "java/lang/InternalError", "Could not decompress data. Input is invalid.");
+  } else if (ret != SNAPPY_OK){
+    THROW(env, "java/lang/InternalError", "Could not decompress data.");
+  }
+
+  (*env)->SetIntField(env, thisj, SnappyDecompressor_compressedDirectBufLen, 0);
+
+  return (jint)uncompressed_direct_buf_len;
+}
+
+#endif //define HADOOP_SNAPPY_LIBRARY
diff --git a/common/src/native/src/org/apache/hadoop/io/compress/snappy/org_apache_hadoop_io_compress_snappy.h b/common/src/native/src/org/apache/hadoop/io/compress/snappy/org_apache_hadoop_io_compress_snappy.h
new file mode 100644
index 00000000000..815e0306736
--- /dev/null
+++ b/common/src/native/src/org/apache/hadoop/io/compress/snappy/org_apache_hadoop_io_compress_snappy.h
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#if !defined ORG_APACHE_HADOOP_IO_COMPRESS_SNAPPY_SNAPPY_H
+#define ORG_APACHE_HADOOP_IO_COMPRESS_SNAPPY_SNAPPY_H
+
+
+#if defined HAVE_CONFIG_H
+  #include <config.h>
+#endif
+
+#if defined HADOOP_SNAPPY_LIBRARY
+
+  #if defined HAVE_STDDEF_H
+    #include <stddef.h>
+  #else
+    #error 'stddef.h not found'
+  #endif
+
+  #if defined HAVE_SNAPPY_C_H
+    #include <snappy-c.h>
+  #else
+    #error 'Please install snappy-development packages for your platform.'
+  #endif
+
+  #if defined HAVE_DLFCN_H
+    #include <dlfcn.h>
+  #else
+    #error "dlfcn.h not found"
+  #endif
+
+  #if defined HAVE_JNI_H
+    #include <jni.h>
+  #else
+    #error 'jni.h not found'
+  #endif
+
+  #include "org_apache_hadoop.h"
+
+#endif //define HADOOP_SNAPPY_LIBRARY
+
+#endif //ORG_APACHE_HADOOP_IO_COMPRESS_SNAPPY_SNAPPY_H
diff --git a/common/src/test/core/org/apache/hadoop/io/compress/TestCodec.java b/common/src/test/core/org/apache/hadoop/io/compress/TestCodec.java
index 1929336abba..50bc1e10a53 100644
--- a/common/src/test/core/org/apache/hadoop/io/compress/TestCodec.java
+++ b/common/src/test/core/org/apache/hadoop/io/compress/TestCodec.java
@@ -40,7 +40,6 @@ import java.util.zip.GZIPOutputStream;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -52,8 +51,7 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.CompressorStream;
+import org.apache.hadoop.io.compress.snappy.LoadSnappy;
 import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
 import org.apache.hadoop.io.compress.zlib.BuiltInZlibDeflater;
 import org.apache.hadoop.io.compress.zlib.BuiltInZlibInflater;
@@ -68,6 +66,7 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

+import org.junit.Assert;
 import org.junit.Test;
 import static org.junit.Assert.*;

@@ -96,6 +95,19 @@ public class TestCodec {
     codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.BZip2Codec");
     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");
   }
+
+  @Test
+  public void testSnappyCodec() throws IOException {
+    if (LoadSnappy.isAvailable()) {
+      if (LoadSnappy.isLoaded()) {
+        codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.SnappyCodec");
+        codecTest(conf, seed, count, "org.apache.hadoop.io.compress.SnappyCodec");
+      } else {
+        Assert.fail("Snappy native available but Hadoop native not");
+      }
+    }
+  }

   @Test
   public void testDeflateCodec() throws IOException {
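
A stand-alone round-trip sketch for reviewers who want to exercise the new
codec end to end (not part of the patch; it assumes libsnappy and libhadoop
are on java.library.path, and uses only the class and key names introduced
above):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class SnappyRoundTrip {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Optional: tune the codec's internal buffer (defaults to 256 KB).
    conf.setInt("io.compression.codec.snappy.buffersize", 256 * 1024);
    SnappyCodec codec = ReflectionUtils.newInstance(SnappyCodec.class, conf);

    byte[] original = "hello, snappy!".getBytes("UTF-8");

    // Compress into an in-memory buffer.
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(compressed);
    out.write(original);
    out.close();

    // Decompress and verify the round trip.
    CompressionInputStream in = codec.createInputStream(
        new ByteArrayInputStream(compressed.toByteArray()));
    ByteArrayOutputStream restored = new ByteArrayOutputStream();
    byte[] buf = new byte[4096];
    int n;
    while ((n = in.read(buf)) > 0) {
      restored.write(buf, 0, n);
    }
    in.close();

    System.out.println(java.util.Arrays.equals(original,
        restored.toByteArray()) ? "round trip OK" : "round trip FAILED");
  }
}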