diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
index c0de5fc07e8..b64bbe95a36 100644
--- a/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
+++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
@@ -119,7 +119,7 @@ public final class Compression {
       @Override
       DefaultCodec getCodec(Configuration conf) {
         if (codec == null) {
-          codec = new GzipCodec();
+          codec = new ReusableStreamGzipCodec();
           codec.setConf(new Configuration(conf));
         }
 
@@ -213,7 +213,6 @@ public final class Compression {
     public OutputStream createCompressionStream(
         OutputStream downStream, Compressor compressor, int downStreamBufferSize)
         throws IOException {
-      CompressionCodec codec = getCodec(conf);
       OutputStream bos1 = null;
       if (downStreamBufferSize > 0) {
         bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
@@ -221,15 +220,25 @@ public final class Compression {
       else {
         bos1 = downStream;
       }
-      ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
       CompressionOutputStream cos =
-          codec.createOutputStream(bos1, compressor);
+          createPlainCompressionStream(bos1, compressor);
       BufferedOutputStream bos2 =
           new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
               DATA_OBUF_SIZE);
       return bos2;
     }
 
+    /**
+     * Creates a compression stream without any additional wrapping into
+     * buffering streams.
+     */
+    CompressionOutputStream createPlainCompressionStream(
+        OutputStream downStream, Compressor compressor) throws IOException {
+      CompressionCodec codec = getCodec(conf);
+      ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
+      return codec.createOutputStream(downStream, compressor);
+    }
+
     public Compressor getCompressor() {
       CompressionCodec codec = getCodec(conf);
       if (codec != null) {
diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 256554d2bf2..ef4cb9de121 100644
--- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -28,7 +28,6 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -44,6 +43,7 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
 
@@ -547,6 +547,12 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
     /** Compressor, which is also reused between consecutive blocks. */
     private Compressor compressor;
 
+    /** Compression output stream */
+    private CompressionOutputStream compressionStream;
+
+    /** Underlying stream to write compressed bytes to */
+    private ByteArrayOutputStream compressedByteStream;
+
     /**
      * Current block type. Set in {@link #startWriting(BlockType)}. Could be
      * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
@@ -602,9 +608,19 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
           ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
 
       baosInMemory = new ByteArrayOutputStream();
-      if (compressAlgo != NONE)
+      if (compressAlgo != NONE) {
         compressor = compressionAlgorithm.getCompressor();
-
+        compressedByteStream = new ByteArrayOutputStream();
+        try {
+          compressionStream =
+              compressionAlgorithm.createPlainCompressionStream(
+                  compressedByteStream, compressor);
+        } catch (IOException e) {
+          throw new RuntimeException("Could not create compression stream " +
+              "for algorithm " + compressionAlgorithm, e);
+        }
+      }
+
       prevOffsetByType = new long[BlockType.values().length];
       for (int i = 0; i < prevOffsetByType.length; ++i)
         prevOffsetByType[i] = -1;
@@ -697,19 +713,18 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
     private void doCompression() throws IOException {
       // do the compression
       if (compressAlgo != NONE) {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        baos.write(DUMMY_HEADER);
+        compressedByteStream.reset();
+        compressedByteStream.write(DUMMY_HEADER);
 
-        // compress the data
-        OutputStream compressingOutputStream =
-            compressAlgo.createCompressionStream(baos, compressor, 0);
-        compressingOutputStream.write(uncompressedBytesWithHeader, HEADER_SIZE,
+        compressionStream.resetState();
+
+        compressionStream.write(uncompressedBytesWithHeader, HEADER_SIZE,
             uncompressedBytesWithHeader.length - HEADER_SIZE);
 
-        // finish compression stream
-        compressingOutputStream.flush();
+        compressionStream.flush();
+        compressionStream.finish();
 
-        onDiskBytesWithHeader = baos.toByteArray();
+        onDiskBytesWithHeader = compressedByteStream.toByteArray();
         putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
             uncompressedBytesWithHeader.length);
       } else {
diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java
new file mode 100644
index 00000000000..2b1d48b5a60
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.CompressorStream;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+
+/**
+ * Fixes an inefficiency in Hadoop's Gzip codec, allowing to reuse compression
+ * streams.
+ */
+public class ReusableStreamGzipCodec extends GzipCodec {
+
+  private static final Log LOG = LogFactory.getLog(Compression.class);
+
+  /**
+   * A bridge that wraps around a DeflaterOutputStream to make it a
+   * CompressionOutputStream.
+   */
+  protected static class ReusableGzipOutputStream extends CompressorStream {
+
+    private static final int GZIP_HEADER_LENGTH = 10;
+
+    /**
+     * Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for
+     * details.
+     */
+    private static final byte[] GZIP_HEADER;
+
+    static {
+      // Capture the fixed ten-byte header hard-coded in GZIPOutputStream.
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      byte[] header = null;
+      GZIPOutputStream gzipStream = null;
+      try {
+        gzipStream = new GZIPOutputStream(baos);
+        gzipStream.finish();
+        header = Arrays.copyOfRange(baos.toByteArray(), 0, GZIP_HEADER_LENGTH);
+      } catch (IOException e) {
+        throw new RuntimeException("Could not create gzip stream", e);
+      } finally {
+        if (gzipStream != null) {
+          try {
+            gzipStream.close();
+          } catch (IOException e) {
+            LOG.error(e);
+          }
+        }
+      }
+      GZIP_HEADER = header;
+    }
+
+    private static class ResetableGZIPOutputStream extends GZIPOutputStream {
+      public ResetableGZIPOutputStream(OutputStream out) throws IOException {
+        super(out);
+      }
+
+      public void resetState() throws IOException {
+        def.reset();
+        crc.reset();
+        out.write(GZIP_HEADER);
+      }
+    }
+
+    public ReusableGzipOutputStream(OutputStream out) throws IOException {
+      super(new ResetableGZIPOutputStream(out));
+    }
+
+    @Override
+    public void close() throws IOException {
+      out.close();
+    }
+
+    @Override
+    public void flush() throws IOException {
+      out.flush();
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      out.write(b);
+    }
+
+    @Override
+    public void write(byte[] data, int offset, int length) throws IOException {
+      out.write(data, offset, length);
+    }
+
+    @Override
+    public void finish() throws IOException {
+      ((GZIPOutputStream) out).finish();
+    }
+
+    @Override
+    public void resetState() throws IOException {
+      ((ResetableGZIPOutputStream) out).resetState();
+    }
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out)
+      throws IOException {
+    if (ZlibFactory.isNativeZlibLoaded(getConf())) {
+      return super.createOutputStream(out);
+    }
+    return new ReusableGzipOutputStream(out);
+  }
+
+}
diff --git a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index e60e6174a56..e8b7df07591 100644
--- a/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
 
 import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*;
@@ -75,9 +76,6 @@ public class TestHFileBlock {
   static final Compression.Algorithm[] COMPRESSION_ALGORITHMS =
       { NONE, GZ };
 
-  // In case we need to temporarily switch some test cases to just test gzip.
-  static final Compression.Algorithm[] GZIP_ONLY = { GZ };
-
   private static final int NUM_TEST_BLOCKS = 1000;
   private static final int NUM_READER_THREADS = 26;
 
@@ -206,14 +204,16 @@ public class TestHFileBlock {
     return headerAndData;
   }
 
-  public String createTestBlockStr(Compression.Algorithm algo)
-      throws IOException {
+  public String createTestBlockStr(Compression.Algorithm algo,
+      int correctLength) throws IOException {
     byte[] testV2Block = createTestV2Block(algo);
     int osOffset = HFileBlock.HEADER_SIZE + 9;
-    if (osOffset < testV2Block.length) {
+    if (testV2Block.length == correctLength) {
       // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
       // variations across operating systems.
       // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
+      // We only make this change when the compressed block length matches.
+      // Otherwise, there are obviously other inconsistencies.
       testV2Block[osOffset] = 3;
     }
     return Bytes.toStringBinary(testV2Block);
@@ -226,7 +226,7 @@ public class TestHFileBlock {
 
   @Test
   public void testGzipCompression() throws IOException {
-    assertEquals(
+    final String correctTestBlockStr =
         "DATABLK*\\x00\\x00\\x00:\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
         + "\\xFF\\xFF\\xFF\\xFF"
         // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
@@ -240,8 +240,10 @@
         + "\\x03"
         + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
         + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
-        + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\s\\xA0\\x0F\\x00\\x00",
-        createTestBlockStr(GZ));
+        + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\s\\xA0\\x0F\\x00\\x00";
+    final int correctGzipBlockLength = 82;
+    assertEquals(correctTestBlockStr, createTestBlockStr(GZ,
+        correctGzipBlockLength));
   }
 
   @Test
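
Note (not part of the patch): the sketch below illustrates the stream-reuse pattern that the new HFileBlock.Writer code follows, mirroring the reset/resetState/write/flush/finish sequence in doCompression() above. The class name ReuseSketch, the loop, and the sample data are illustrative assumptions; only ReusableStreamGzipCodec and the standard Hadoop CompressionOutputStream API come from the patch, and the pure-Java (non-native-zlib) code path is assumed.

// Illustrative sketch only: reusing a single gzip compression stream across
// many "blocks", instead of creating a new stream per block.
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.hfile.ReusableStreamGzipCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;

public class ReuseSketch {
  public static void main(String[] args) throws IOException {
    ReusableStreamGzipCodec codec = new ReusableStreamGzipCodec();
    codec.setConf(new Configuration());

    // Created once, reused for every block written afterwards.
    ByteArrayOutputStream compressedByteStream = new ByteArrayOutputStream();
    CompressionOutputStream compressionStream =
        codec.createOutputStream(compressedByteStream);

    for (int block = 0; block < 3; ++block) {
      byte[] uncompressed = ("block #" + block).getBytes();

      compressedByteStream.reset();    // drop the previous block's output
      compressionStream.resetState();  // reset deflater/CRC, rewrite gzip header
      compressionStream.write(uncompressed, 0, uncompressed.length);
      compressionStream.flush();
      compressionStream.finish();      // write the gzip trailer

      byte[] onDiskBytes = compressedByteStream.toByteArray();
      System.out.println("block " + block + ": " + onDiskBytes.length + " bytes");
    }
  }
}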