From 7e1e4bf50fa83083e762fc267b5215d606a64c3e Mon Sep 17 00:00:00 2001
From: Thomas White
Date: Mon, 20 Jun 2011 16:32:27 +0000
Subject: [PATCH] HADOOP-7206. Integrate Snappy compression. Contributed by
 T Jake Luciani.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1137690 13f79535-47bb-0310-9956-ffa450edef68
---
 common/CHANGES.txt                               |   2 +
 common/ivy.xml                                   |   4 +
 common/ivy/hadoop-common-template.xml            |   5 +
 common/ivy/libraries.properties                  |   1 +
 common/src/java/core-default.xml                 |   2 +-
 .../hadoop/io/compress/SnappyCodec.java          | 150 +++++++++++++++
 .../io/compress/snappy/SnappyCompressor.java     | 177 ++++++++++++++++++
 .../compress/snappy/SnappyDecompressor.java      | 171 +++++++++++++++++
 .../apache/hadoop/io/compress/TestCodec.java     |   6 +
 9 files changed, 517 insertions(+), 1 deletion(-)
 create mode 100644 common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java
 create mode 100644 common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
 create mode 100644 common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java

diff --git a/common/CHANGES.txt b/common/CHANGES.txt
index ef429796cb1..9f612da0b70 100644
--- a/common/CHANGES.txt
+++ b/common/CHANGES.txt
@@ -47,6 +47,8 @@ Trunk (unreleased changes)
     HADOOP-7379. Add the ability to serialize and deserialize protocol buffers
     in ObjectWritable. (todd)
 
+    HADOOP-7206. Integrate Snappy compression. (T Jake Luciani via tomwhite)
+
   IMPROVEMENTS
 
     HADOOP-7042. Updates to test-patch.sh to include failed test names and
diff --git a/common/ivy.xml b/common/ivy.xml
index c7d369943c1..72801ff17e5 100644
--- a/common/ivy.xml
+++ b/common/ivy.xml
@@ -327,5 +327,9 @@
       name="protobuf-java"
       rev="${protobuf.version}"
       conf="common->default"/>
+    <dependency org="org.xerial.snappy"
+      name="snappy-java"
+      rev="${snappy-java.version}"
+      conf="common->default"/>
   </dependencies>
 </ivy-module>
diff --git a/common/ivy/hadoop-common-template.xml b/common/ivy/hadoop-common-template.xml
index fcb31ff0f1e..4e43af537e4 100644
--- a/common/ivy/hadoop-common-template.xml
+++ b/common/ivy/hadoop-common-template.xml
@@ -155,5 +155,10 @@
       <artifactId>protobuf-java</artifactId>
       <version>2.4.0a</version>
     </dependency>
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+      <version>1.0.3-rc2</version>
+    </dependency>
   </dependencies>
 </project>
diff --git a/common/ivy/libraries.properties b/common/ivy/libraries.properties
index 0b312635b8e..0c4c3cdcaba 100644
--- a/common/ivy/libraries.properties
+++ b/common/ivy/libraries.properties
@@ -74,6 +74,7 @@ rats-lib.version=0.6
 servlet.version=4.0.6
 servlet-api-2.5.version=6.1.14
 servlet-api.version=2.5
+snappy-java.version=1.0.3-rc2
 
 slf4j-api.version=1.5.11
 slf4j-log4j12.version=1.5.11
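The new dependency is snappy-java, the JNI wrapper around the native Snappy
library, which the codec classes below build on. For orientation, a minimal
sketch of the library's one-shot array API; this block is illustrative and not
part of the patch, and it assumes snappy-java 1.0.3-rc2 on the classpath with
a loadable native library:

import org.xerial.snappy.Snappy;

public class SnappyLibrarySmokeTest {
  public static void main(String[] args) throws Exception {
    // Repetitive input compresses well; the payload is illustrative.
    byte[] input = "snappy snappy snappy snappy".getBytes("UTF-8");

    byte[] compressed = Snappy.compress(input);      // one-shot compress
    byte[] restored = Snappy.uncompress(compressed); // one-shot uncompress

    System.out.println(compressed.length + " / " + restored.length);
  }
}

The Hadoop codec below avoids these array copies by using the ByteBuffer
overloads of Snappy.compress and Snappy.uncompress against direct buffers.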
diff --git a/common/src/java/core-default.xml b/common/src/java/core-default.xml
index 3529211ed92..74b17179b71 100644
--- a/common/src/java/core-default.xml
+++ b/common/src/java/core-default.xml
@@ -174,7 +174,7 @@
 <property>
   <name>io.compression.codecs</name>
-  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec</value>
+  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.DeflateCodec</value>
   <description>A list of the compression codec classes that can be used
   for compression/decompression.</description>
 </property>
 
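With SnappyCodec registered in io.compression.codecs, clients can resolve it
by file extension through CompressionCodecFactory. A short sketch, not part of
the patch; the file name is illustrative and assumes the patched
core-default.xml is on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecLookup {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);

    // Matches on SnappyCodec's default extension, ".snappy".
    CompressionCodec codec = factory.getCodec(new Path("part-00000.snappy"));
    System.out.println(codec == null ? "no codec" : codec.getClass().getName());
  }
}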
diff --git a/common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java b/common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java
new file mode 100644
index 00000000000..e005a39e451
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/io/compress/SnappyCodec.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
+import org.xerial.snappy.Snappy;
+import org.xerial.snappy.SnappyError;
+
+public class SnappyCodec implements Configurable, CompressionCodec {
+  private static final Log logger = LogFactory.getLog(SnappyCodec.class
+      .getName());
+  private static boolean nativeSnappyLoaded = false;
+  private Configuration conf;
+
+  public static final String SNAPPY_BUFFER_SIZE_KEY = "io.compression.codec.snappy.buffersize";
+  public static final int DEFAULT_SNAPPY_BUFFER_SIZE = 256 * 1024;
+
+  public SnappyCodec() {
+
+  }
+
+  public SnappyCodec(Configuration conf) {
+    setConf(conf);
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  static {
+    try {
+      if (Snappy.getNativeLibraryVersion() != null) {
+        logger
+            .info("Successfully loaded & initialized native-snappy library [snappy-java rev "
+                + Snappy.getNativeLibraryVersion() + "]");
+
+        nativeSnappyLoaded = true;
+      } else {
+        logger.info("Failed to load native-snappy library");
+      }
+
+    } catch (SnappyError e) {
+      logger.error("Native Snappy load error: ", e);
+    }
+  }
+
+  public static boolean isNativeSnappyLoaded(Configuration conf) {
+    return nativeSnappyLoaded;
+  }
+
+  public CompressionOutputStream createOutputStream(OutputStream out)
+      throws IOException {
+    return createOutputStream(out, createCompressor());
+  }
+
+  public CompressionOutputStream createOutputStream(OutputStream out,
+      Compressor compressor) throws IOException {
+
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native-snappy library not available");
+    }
+
+    int bufferSize = conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
+        DEFAULT_SNAPPY_BUFFER_SIZE);
+
+    int compressionOverhead = Snappy.maxCompressedLength(bufferSize) - bufferSize;
+
+    return new BlockCompressorStream(out, compressor, bufferSize,
+        compressionOverhead);
+  }
+
+  public Class<? extends Compressor> getCompressorType() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native-snappy library not available");
+    }
+
+    return SnappyCompressor.class;
+  }
+
+  public Compressor createCompressor() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native-snappy library not available");
+    }
+
+    return new SnappyCompressor(conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
+        DEFAULT_SNAPPY_BUFFER_SIZE));
+  }
+
+  public CompressionInputStream createInputStream(InputStream in)
+      throws IOException {
+    return createInputStream(in, createDecompressor());
+  }
+
+  public CompressionInputStream createInputStream(InputStream in,
+      Decompressor decompressor) throws IOException {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native-snappy library not available");
+    }
+    return new BlockDecompressorStream(in, decompressor, conf.getInt(
+        SNAPPY_BUFFER_SIZE_KEY, DEFAULT_SNAPPY_BUFFER_SIZE));
+  }
+
+  public Class<? extends Decompressor> getDecompressorType() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native-snappy library not available");
+    }
+    return SnappyDecompressor.class;
+  }
+
+  public Decompressor createDecompressor() {
+    if (!isNativeSnappyLoaded(conf)) {
+      throw new RuntimeException("native-snappy library not available");
+    }
+
+    return new SnappyDecompressor(conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
+        DEFAULT_SNAPPY_BUFFER_SIZE));
+  }
+
+  public String getDefaultExtension() {
+    return ".snappy";
+  }
+}
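A minimal round trip through the codec's stream API, assuming the native
library loaded (otherwise every factory method above throws). The class name
and payload are illustrative, not part of the patch:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.SnappyCodec;

public class SnappyCodecRoundTrip {
  public static void main(String[] args) throws IOException {
    SnappyCodec codec = new SnappyCodec(new Configuration());

    // Compress into an in-memory sink.
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(sink);
    out.write("payload payload payload payload".getBytes("UTF-8"));
    out.close(); // flushes and finishes the block stream

    // Decompress it back.
    CompressionInputStream in = codec.createInputStream(
        new ByteArrayInputStream(sink.toByteArray()));
    ByteArrayOutputStream restored = new ByteArrayOutputStream();
    IOUtils.copyBytes(in, restored, 4096, true);

    System.out.println(restored.toString("UTF-8"));
  }
}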
diff --git a/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java b/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
new file mode 100644
index 00000000000..2c1968c0e79
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress.snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.Compressor;
+import org.xerial.snappy.Snappy;
+import org.xerial.snappy.SnappyException;
+
+public class SnappyCompressor implements Compressor {
+  private static final Log logger = LogFactory.getLog(SnappyCompressor.class
+      .getName());
+
+  private boolean finish, finished;
+  private ByteBuffer outBuf;
+  private ByteBuffer compressedBuf;
+
+  private long bytesRead = 0L;
+  private long bytesWritten = 0L;
+
+  public SnappyCompressor(int bufferSize) {
+    outBuf = ByteBuffer.allocateDirect(bufferSize);
+    compressedBuf = ByteBuffer.allocateDirect(Snappy
+        .maxCompressedLength(bufferSize));
+
+    reset();
+  }
+
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+    finished = false;
+
+    outBuf.put(b, off, len);
+
+    bytesRead += len;
+  }
+
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    // do nothing
+  }
+
+  public synchronized boolean needsInput() {
+    // needs input if compressed data was consumed
+    if (compressedBuf.position() > 0
+        && compressedBuf.limit() > compressedBuf.position())
+      return false;
+
+    return true;
+  }
+
+  public synchronized void finish() {
+    finish = true;
+  }
+
+  public synchronized boolean finished() {
+    // Check if all compressed data has been consumed
+    return (finish && finished);
+  }
+
+  public synchronized int compress(byte[] b, int off, int len)
+      throws IOException {
+
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    if (finished || outBuf.position() == 0) {
+      finished = true;
+      return 0;
+    }
+
+    // Only need to do this once
+    if (compressedBuf.position() == 0) {
+      try {
+        outBuf.limit(outBuf.position());
+        outBuf.rewind();
+
+        int lim = Snappy.compress(outBuf, compressedBuf);
+
+        compressedBuf.limit(lim);
+        compressedBuf.rewind();
+      } catch (SnappyException e) {
+        throw new IOException(e);
+      }
+    }
+
+    int n = (compressedBuf.limit() - compressedBuf.position()) > len ? len
+        : (compressedBuf.limit() - compressedBuf.position());
+
+    if (n == 0) {
+      finished = true;
+      return 0;
+    }
+
+    compressedBuf.get(b, off, n);
+
+    bytesWritten += n;
+
+    // Set 'finished' if snappy has consumed all user-data
+    if (compressedBuf.position() == compressedBuf.limit()) {
+      finished = true;
+
+      outBuf.limit(outBuf.capacity());
+      outBuf.rewind();
+
+      compressedBuf.limit(compressedBuf.capacity());
+      compressedBuf.rewind();
+
+    }
+
+    return n;
+  }
+
+  public synchronized void reset() {
+    finish = false;
+    finished = false;
+
+    outBuf.limit(outBuf.capacity());
+    outBuf.rewind();
+
+    compressedBuf.limit(compressedBuf.capacity());
+    compressedBuf.rewind();
+
+    bytesRead = bytesWritten = 0L;
+  }
+
+  public synchronized void reinit(Configuration conf) {
+    reset();
+  }
+
+  /**
+   * Return number of bytes given to this compressor since last reset.
+   */
+  public synchronized long getBytesRead() {
+    return bytesRead;
+  }
+
+  /**
+   * Return number of bytes consumed by callers of compress since last reset.
+   */
+  public synchronized long getBytesWritten() {
+    return bytesWritten;
+  }
+
+  public synchronized void end() {
+  }
+
+}
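BlockCompressorStream normally drives this class, but the Compressor contract
is visible directly: feed input, call finish(), then drain compress() until
finished() reports true. A sketch, not part of the patch; the sizes are
illustrative, and the input is assumed to fit in the internal buffer:

import java.io.IOException;

import org.apache.hadoop.io.compress.snappy.SnappyCompressor;

public class CompressorContract {
  public static void main(String[] args) throws IOException {
    byte[] input = new byte[8 * 1024]; // all zeros: maximally compressible

    SnappyCompressor compressor = new SnappyCompressor(64 * 1024);
    compressor.setInput(input, 0, input.length);
    compressor.finish(); // no further input will be supplied

    byte[] out = new byte[64 * 1024];
    int total = 0;
    while (!compressor.finished()) {
      total += compressor.compress(out, total, out.length - total);
    }
    System.out.println(input.length + " bytes -> " + total + " bytes");
  }
}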
diff --git a/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java b/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
new file mode 100644
index 00000000000..1c53e3e87e2
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress.snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.compress.Decompressor;
+import org.xerial.snappy.Snappy;
+import org.xerial.snappy.SnappyException;
+
+public class SnappyDecompressor implements Decompressor {
+
+  private static final Log logger = LogFactory.getLog(SnappyDecompressor.class
+      .getName());
+
+  private boolean finished;
+  private ByteBuffer outBuf;
+  private ByteBuffer uncompressedBuf;
+
+  private long bytesRead = 0L;
+  private long bytesWritten = 0L;
+
+  public SnappyDecompressor(int bufferSize) {
+    outBuf = ByteBuffer.allocateDirect(bufferSize);
+    uncompressedBuf = ByteBuffer.allocateDirect(bufferSize);
+
+    reset();
+  }
+
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    finished = false;
+
+    outBuf.put(b, off, len);
+
+    bytesRead += len;
+  }
+
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    // do nothing
+  }
+
+  public synchronized boolean needsInput() {
+    // needs input if the uncompressed data was consumed
+    if (uncompressedBuf.position() > 0
+        && uncompressedBuf.limit() > uncompressedBuf.position())
+      return false;
+
+    return true;
+  }
+
+  public synchronized boolean needsDictionary() {
+    return false;
+  }
+
+  public synchronized boolean finished() {
+    return finished;
+  }
+
+  public synchronized int decompress(byte[] b, int off, int len)
+      throws IOException {
+
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    // nothing to decompress
+    if ((outBuf.position() == 0 && uncompressedBuf.position() == 0) || finished) {
+      reset();
+      finished = true;
+
+      return 0;
+    }
+
+    // only needs to do this once per input
+    if (uncompressedBuf.position() == 0) {
+      try {
+        outBuf.limit(outBuf.position());
+        outBuf.rewind();
+
+        int neededLen = Snappy.uncompressedLength(outBuf);
+        outBuf.rewind();
+
+        if (neededLen > uncompressedBuf.capacity())
+          uncompressedBuf = ByteBuffer.allocateDirect(neededLen);
+
+        int lim = Snappy.uncompress(outBuf, uncompressedBuf);
+
+        uncompressedBuf.limit(lim);
+        uncompressedBuf.rewind();
+      } catch (SnappyException e) {
+        throw new IOException(e);
+      }
+    }
+
+    int n = (uncompressedBuf.limit() - uncompressedBuf.position()) > len ? len
+        : (uncompressedBuf.limit() - uncompressedBuf.position());
+
+    if (n == 0) {
+      reset();
+      finished = true;
+      return 0;
+    }
+
+    uncompressedBuf.get(b, off, n);
+
+    bytesWritten += n;
+
+    // Set 'finished' if snappy has consumed all user-data
+    if (uncompressedBuf.position() == uncompressedBuf.limit()) {
+      reset();
+      finished = true;
+    }
+
+    return n;
+  }
+
+  public synchronized int getRemaining() {
+    // Never use this function in BlockDecompressorStream.
+    return 0;
+  }
+
+  public synchronized void reset() {
+    finished = false;
+
+    uncompressedBuf.limit(uncompressedBuf.capacity());
+    uncompressedBuf.rewind();
+
+    outBuf.limit(outBuf.capacity());
+    outBuf.rewind();
+
+    bytesRead = bytesWritten = 0L;
+  }
+
+  public synchronized void end() {
+    // do nothing
+  }
+
+  protected void finalize() {
+    end();
+  }
+
+}
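Both directions read the same io.compression.codec.snappy.buffersize key that
SnappyCodec defines, defaulting to 256 KB. A sketch of overriding it, not part
of the patch; the 512 KB value is illustrative (larger blocks may compress
better at the cost of direct-buffer memory):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.SnappyCodec;

public class BufferSizeOverride {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt(SnappyCodec.SNAPPY_BUFFER_SIZE_KEY, 512 * 1024);

    // Compressors and decompressors created from this codec now use 512 KB blocks.
    SnappyCodec codec = new SnappyCodec(conf);
    System.out.println(codec.getConf().getInt(
        SnappyCodec.SNAPPY_BUFFER_SIZE_KEY,
        SnappyCodec.DEFAULT_SNAPPY_BUFFER_SIZE));
  }
}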
diff --git a/common/src/test/core/org/apache/hadoop/io/compress/TestCodec.java b/common/src/test/core/org/apache/hadoop/io/compress/TestCodec.java
index 1929336abba..53a1cede0bb 100644
--- a/common/src/test/core/org/apache/hadoop/io/compress/TestCodec.java
+++ b/common/src/test/core/org/apache/hadoop/io/compress/TestCodec.java
@@ -102,6 +102,12 @@ public class TestCodec {
     codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.DeflateCodec");
     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.DeflateCodec");
   }
+
+  @Test
+  public void testSnappyCodec() throws IOException {
+    codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.SnappyCodec");
+    codecTest(conf, seed, count, "org.apache.hadoop.io.compress.SnappyCodec");
+  }
 
   @Test
   public void testGzipCodecWithParam() throws IOException {
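The codecTest helper is not shown in this hunk; presumably it round-trips
seeded random records through the named codec. A standalone sketch of that
style of check (an assumption about what such a test does, not the actual
helper; the class name and sizes are illustrative):

import static org.junit.Assert.assertArrayEquals;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.junit.Test;

public class SnappyRoundTripTest {
  @Test
  public void roundTripsRandomData() throws Exception {
    byte[] data = new byte[128 * 1024];
    new Random(42).nextBytes(data); // fixed seed keeps the test reproducible

    SnappyCodec codec = new SnappyCodec(new Configuration());

    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(sink);
    out.write(data);
    out.close(); // close() finishes the compressed stream

    CompressionInputStream in = codec.createInputStream(
        new ByteArrayInputStream(sink.toByteArray()));
    ByteArrayOutputStream restored = new ByteArrayOutputStream();
    byte[] buf = new byte[4096];
    for (int n; (n = in.read(buf)) != -1; ) {
      restored.write(buf, 0, n);
    }
    assertArrayEquals(data, restored.toByteArray());
  }
}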