From e70883664153a21eb0ab63b8922277dae391c3bc Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 8 Sep 2021 21:23:25 -0700 Subject: [PATCH] HADOOP-17887. Remove the wrapper class GzipOutputStream (#3377) --- .../apache/hadoop/io/compress/GzipCodec.java | 92 ------------------- .../compress/zlib/BuiltInGzipCompressor.java | 11 +-- .../apache/hadoop/io/compress/TestCodec.java | 42 +++++++++ 3 files changed, 43 insertions(+), 102 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java index 1f4c02c261b..9136fa9b927 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java @@ -24,7 +24,6 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.zip.GZIPOutputStream; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -41,101 +40,10 @@ import org.apache.hadoop.io.compress.zlib.ZlibFactory; @InterfaceAudience.Public @InterfaceStability.Evolving public class GzipCodec extends DefaultCodec { - /** - * A bridge that wraps around a DeflaterOutputStream to make it - * a CompressionOutputStream. - */ - @InterfaceStability.Evolving - protected static class GzipOutputStream extends CompressorStream { - - private static class ResetableGZIPOutputStream extends GZIPOutputStream { - /** - * Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for - * details. - */ - private static final byte[] GZIP_HEADER = new byte[] { - 0x1f, (byte) 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; - - private boolean reset = false; - - public ResetableGZIPOutputStream(OutputStream out) throws IOException { - super(out); - } - - public synchronized void resetState() throws IOException { - reset = true; - } - - @Override - public synchronized void write(byte[] buf, int off, int len) - throws IOException { - if (reset) { - def.reset(); - crc.reset(); - out.write(GZIP_HEADER); - reset = false; - } - super.write(buf, off, len); - } - - @Override - public synchronized void close() throws IOException { - reset = false; - super.close(); - } - - } - - public GzipOutputStream(OutputStream out) throws IOException { - super(new ResetableGZIPOutputStream(out)); - } - - /** - * Allow children types to put a different type in here. - * @param out the Deflater stream to use - */ - protected GzipOutputStream(CompressorStream out) { - super(out); - } - - @Override - public void close() throws IOException { - out.close(); - } - - @Override - public void flush() throws IOException { - out.flush(); - } - - @Override - public void write(int b) throws IOException { - out.write(b); - } - - @Override - public void write(byte[] data, int offset, int length) - throws IOException { - out.write(data, offset, length); - } - - @Override - public void finish() throws IOException { - ((ResetableGZIPOutputStream) out).finish(); - } - - @Override - public void resetState() throws IOException { - ((ResetableGZIPOutputStream) out).resetState(); - } - } @Override public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { - if (!ZlibFactory.isNativeZlibLoaded(conf)) { - return new GzipOutputStream(out); - } return CompressionCodec.Util. createOutputStreamWithCodecPool(this, conf, out); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java index 61f7c12541e..fcb431dce86 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java @@ -56,7 +56,6 @@ public class BuiltInGzipCompressor implements Compressor { private int numExtraBytesWritten = 0; - private int currentBufLen = 0; private int accuBufLen = 0; private final Checksum crc = DataChecksum.newCrc32(); @@ -86,10 +85,6 @@ public class BuiltInGzipCompressor implements Compressor { int compressedBytesWritten = 0; - if (currentBufLen <= 0) { - return compressedBytesWritten; - } - // If we are not within uncompressed data yet, output the header. if (state == BuiltInGzipDecompressor.GzipStateLabel.HEADER_BASIC) { int outputHeaderSize = writeHeader(b, off, len); @@ -166,7 +161,6 @@ public class BuiltInGzipCompressor implements Compressor { public void reinit(Configuration conf) { init(conf); numExtraBytesWritten = 0; - currentBufLen = 0; headerOff = 0; trailerOff = 0; crc.reset(); @@ -178,7 +172,6 @@ public class BuiltInGzipCompressor implements Compressor { deflater.reset(); state = BuiltInGzipDecompressor.GzipStateLabel.HEADER_BASIC; numExtraBytesWritten = 0; - currentBufLen = 0; headerOff = 0; trailerOff = 0; crc.reset(); @@ -201,8 +194,7 @@ public class BuiltInGzipCompressor implements Compressor { deflater.setInput(b, off, len); crc.update(b, off, len); // CRC-32 is on uncompressed data - currentBufLen = len; - accuBufLen += currentBufLen; + accuBufLen += len; } private int writeHeader(byte[] b, int off, int len) { @@ -251,7 +243,6 @@ public class BuiltInGzipCompressor implements Compressor { if (trailerOff == gzipTrailerLen) { state = BuiltInGzipDecompressor.GzipStateLabel.FINISHED; - currentBufLen = 0; headerOff = 0; trailerOff = 0; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java index e3afefd1b73..26867eed91a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.io.compress; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -1051,4 +1052,45 @@ public class TestCodec { } } } + + @Test(timeout=20000) + public void testGzipCompressorWithEmptyInput() throws IOException { + // don't use native libs + ZlibFactory.setNativeZlibLoaded(false); + Configuration conf = new Configuration(); + CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf); + + Compressor compressor = codec.createCompressor(); + assertThat(compressor).withFailMessage("should be BuiltInGzipCompressor") + .isInstanceOf(BuiltInGzipCompressor.class); + + byte[] b = new byte[0]; + compressor.setInput(b, 0, b.length); + compressor.finish(); + + byte[] output = new byte[100]; + int outputOff = 0; + + while (!compressor.finished()) { + byte[] buf = new byte[100]; + int compressed = compressor.compress(buf, 0, buf.length); + System.arraycopy(buf, 0, output, outputOff, compressed); + outputOff += compressed; + } + + DataInputBuffer gzbuf = new DataInputBuffer(); + gzbuf.reset(output, outputOff); + + Decompressor decom = codec.createDecompressor(); + assertThat(decom).as("decompressor should not be null").isNotNull(); + assertThat(decom).withFailMessage("should be BuiltInGzipDecompressor") + .isInstanceOf(BuiltInGzipDecompressor.class); + try (InputStream gzin = codec.createInputStream(gzbuf, decom); + DataOutputBuffer dflbuf = new DataOutputBuffer()) { + dflbuf.reset(); + IOUtils.copyBytes(gzin, dflbuf, 4096); + final byte[] dflchk = Arrays.copyOf(dflbuf.getData(), dflbuf.getLength()); + assertThat(b).as("check decompressed output").isEqualTo(dflchk); + } + } }