diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java index 11fcf60e76e..9bd861da9e8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java @@ -41,27 +41,54 @@ import org.apache.hadoop.io.compress.zlib.ZlibFactory; @InterfaceStability.Evolving public class GzipCodec extends DefaultCodec { /** - * A bridge that wraps around a DeflaterOutputStream to make it + * A bridge that wraps around a DeflaterOutputStream to make it * a CompressionOutputStream. */ @InterfaceStability.Evolving protected static class GzipOutputStream extends CompressorStream { private static class ResetableGZIPOutputStream extends GZIPOutputStream { + /** + * Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for + * details. + */ + private static final byte[] GZIP_HEADER = new byte[] { + 0x1f, (byte) 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; + + private boolean reset = false; public ResetableGZIPOutputStream(OutputStream out) throws IOException { super(out); } - public void resetState() throws IOException { - def.reset(); + public synchronized void resetState() throws IOException { + reset = true; } + + @Override + public synchronized void write(byte[] buf, int off, int len) + throws IOException { + if (reset) { + def.reset(); + crc.reset(); + out.write(GZIP_HEADER); + reset = false; + } + super.write(buf, off, len); + } + + @Override + public synchronized void close() throws IOException { + reset = false; + super.close(); + } + } public GzipOutputStream(OutputStream out) throws IOException { super(new ResetableGZIPOutputStream(out)); } - + /** * Allow children types to put a different type in here. * @param out the Deflater stream to use @@ -69,7 +96,7 @@ public class GzipCodec extends DefaultCodec { protected GzipOutputStream(CompressorStream out) { super(out); } - + @Override public void close() throws IOException { out.close(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestGzipCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestGzipCodec.java new file mode 100644 index 00000000000..c8c1a4786e0 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestGzipCodec.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.compress; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Random; +import java.util.zip.GZIPInputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Verify resettable compressor. + */ +public class TestGzipCodec { + + private static final Logger LOG = + LoggerFactory.getLogger(TestGzipCodec.class); + + private static final String DATA1 = "Dogs don't know it's not bacon!\n"; + private static final String DATA2 = "It's baconnnn!!\n"; + private GzipCodec codec = new GzipCodec(); + + @Before + public void setUp() { + codec.setConf(new Configuration(false)); + } + + // Test simple compression. + @Test + public void testSingleCompress() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + CompressionOutputStream cmpOut = codec.createOutputStream(baos); + cmpOut.write(DATA1.getBytes(StandardCharsets.UTF_8)); + cmpOut.finish(); + cmpOut.close(); + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + GZIPInputStream cmpIn = new GZIPInputStream(bais); + byte[] buf = new byte[1024]; + int len = cmpIn.read(buf); + String result = new String(buf, 0, len, StandardCharsets.UTF_8); + assertEquals("Input must match output", DATA1, result); + } + + // Test multi-member gzip file created via finish(), resetState(). + @Test + public void testResetCompress() throws IOException { + DataOutputBuffer dob = new DataOutputBuffer(); + CompressionOutputStream cmpOut = codec.createOutputStream(dob); + cmpOut.write(DATA1.getBytes(StandardCharsets.UTF_8)); + cmpOut.finish(); + cmpOut.resetState(); + cmpOut.write(DATA2.getBytes(StandardCharsets.UTF_8)); + cmpOut.finish(); + cmpOut.close(); + dob.close(); + + DataInputBuffer dib = new DataInputBuffer(); + dib.reset(dob.getData(), 0, dob.getLength()); + CompressionInputStream cmpIn = codec.createInputStream(dib); + byte[] buf = new byte[1024]; + StringBuilder result = new StringBuilder(); + int len = 0; + while (true) { + len = cmpIn.read(buf); + if (len < 0) { + break; + } + result.append(new String(buf, 0, len, StandardCharsets.UTF_8)); + } + assertEquals("Output must match input", DATA1 + DATA2, result.toString()); + } + + // ensure all necessary methods are overwritten + @Test + public void testWriteOverride() throws IOException { + Random r = new Random(); + long seed = r.nextLong(); + LOG.info("seed: " + seed); + r.setSeed(seed); + byte[] buf = new byte[128]; + r.nextBytes(buf); + DataOutputBuffer dob = new DataOutputBuffer(); + CompressionOutputStream cmpOut = codec.createOutputStream(dob); + cmpOut.write(buf); + int i = r.nextInt(128 - 10); + int l = r.nextInt(128 - i); + cmpOut.write(buf, i, l); + cmpOut.write((byte)(r.nextInt() & 0xFF)); + cmpOut.close(); + + r.setSeed(seed); + DataInputBuffer dib = new DataInputBuffer(); + dib.reset(dob.getData(), 0, dob.getLength()); + CompressionInputStream cmpIn = codec.createInputStream(dib); + byte[] vbuf = new byte[128]; + assertEquals(128, cmpIn.read(vbuf)); + assertArrayEquals(buf, vbuf); + r.nextBytes(vbuf); + int vi = r.nextInt(128 - 10); + int vl = r.nextInt(128 - vi); + assertEquals(vl, cmpIn.read(vbuf, 0, vl)); + assertArrayEquals(Arrays.copyOfRange(buf, i, i + l), + Arrays.copyOf(vbuf, vl)); + assertEquals(r.nextInt() & 0xFF, cmpIn.read()); + assertEquals(-1, cmpIn.read()); + } + + // don't write a new header if no data are written after reset + @Test + public void testIdempotentResetState() throws IOException { + DataOutputBuffer dob = new DataOutputBuffer(); + CompressionOutputStream cmpOut = codec.createOutputStream(dob); + cmpOut.write(DATA1.getBytes(StandardCharsets.UTF_8)); + cmpOut.finish(); + cmpOut.finish(); + cmpOut.finish(); + cmpOut.resetState(); + cmpOut.resetState(); + cmpOut.finish(); + cmpOut.resetState(); + cmpOut.close(); + dob.close(); + + DataInputBuffer dib = new DataInputBuffer(); + dib.reset(dob.getData(), 0, dob.getLength()); + CompressionInputStream cmpIn = codec.createInputStream(dib); + byte[] buf = new byte[1024]; + StringBuilder result = new StringBuilder(); + int len = 0; + while (true) { + len = cmpIn.read(buf); + if (len < 0) { + break; + } + result.append(new String(buf, 0, len, StandardCharsets.UTF_8)); + } + assertEquals("Output must match input", DATA1, result.toString()); + } +}