HADOOP-17887. Remove the wrapper class GzipOutputStream (#3377)

This commit is contained in:
Liang-Chi Hsieh 2021-09-08 21:23:25 -07:00 committed by GitHub
parent bddc9bf63c
commit e708836641
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 43 additions and 102 deletions

View File

@ -24,7 +24,6 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
@ -41,101 +40,10 @@ import org.apache.hadoop.io.compress.zlib.ZlibFactory;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class GzipCodec extends DefaultCodec { public class GzipCodec extends DefaultCodec {
/**
* A bridge that wraps around a DeflaterOutputStream to make it
* a CompressionOutputStream.
*/
@InterfaceStability.Evolving
protected static class GzipOutputStream extends CompressorStream {
private static class ResetableGZIPOutputStream extends GZIPOutputStream {
/**
* Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for
* details.
*/
private static final byte[] GZIP_HEADER = new byte[] {
0x1f, (byte) 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 };
private boolean reset = false;
public ResetableGZIPOutputStream(OutputStream out) throws IOException {
super(out);
}
public synchronized void resetState() throws IOException {
reset = true;
}
@Override
public synchronized void write(byte[] buf, int off, int len)
throws IOException {
if (reset) {
def.reset();
crc.reset();
out.write(GZIP_HEADER);
reset = false;
}
super.write(buf, off, len);
}
@Override
public synchronized void close() throws IOException {
reset = false;
super.close();
}
}
public GzipOutputStream(OutputStream out) throws IOException {
super(new ResetableGZIPOutputStream(out));
}
/**
* Allow children types to put a different type in here.
* @param out the Deflater stream to use
*/
protected GzipOutputStream(CompressorStream out) {
super(out);
}
@Override
public void close() throws IOException {
out.close();
}
@Override
public void flush() throws IOException {
out.flush();
}
@Override
public void write(int b) throws IOException {
out.write(b);
}
@Override
public void write(byte[] data, int offset, int length)
throws IOException {
out.write(data, offset, length);
}
@Override
public void finish() throws IOException {
((ResetableGZIPOutputStream) out).finish();
}
@Override
public void resetState() throws IOException {
((ResetableGZIPOutputStream) out).resetState();
}
}
@Override @Override
public CompressionOutputStream createOutputStream(OutputStream out) public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException { throws IOException {
if (!ZlibFactory.isNativeZlibLoaded(conf)) {
return new GzipOutputStream(out);
}
return CompressionCodec.Util. return CompressionCodec.Util.
createOutputStreamWithCodecPool(this, conf, out); createOutputStreamWithCodecPool(this, conf, out);
} }

View File

@ -56,7 +56,6 @@ public class BuiltInGzipCompressor implements Compressor {
private int numExtraBytesWritten = 0; private int numExtraBytesWritten = 0;
private int currentBufLen = 0;
private int accuBufLen = 0; private int accuBufLen = 0;
private final Checksum crc = DataChecksum.newCrc32(); private final Checksum crc = DataChecksum.newCrc32();
@ -86,10 +85,6 @@ public class BuiltInGzipCompressor implements Compressor {
int compressedBytesWritten = 0; int compressedBytesWritten = 0;
if (currentBufLen <= 0) {
return compressedBytesWritten;
}
// If we are not within uncompressed data yet, output the header. // If we are not within uncompressed data yet, output the header.
if (state == BuiltInGzipDecompressor.GzipStateLabel.HEADER_BASIC) { if (state == BuiltInGzipDecompressor.GzipStateLabel.HEADER_BASIC) {
int outputHeaderSize = writeHeader(b, off, len); int outputHeaderSize = writeHeader(b, off, len);
@ -166,7 +161,6 @@ public class BuiltInGzipCompressor implements Compressor {
public void reinit(Configuration conf) { public void reinit(Configuration conf) {
init(conf); init(conf);
numExtraBytesWritten = 0; numExtraBytesWritten = 0;
currentBufLen = 0;
headerOff = 0; headerOff = 0;
trailerOff = 0; trailerOff = 0;
crc.reset(); crc.reset();
@ -178,7 +172,6 @@ public class BuiltInGzipCompressor implements Compressor {
deflater.reset(); deflater.reset();
state = BuiltInGzipDecompressor.GzipStateLabel.HEADER_BASIC; state = BuiltInGzipDecompressor.GzipStateLabel.HEADER_BASIC;
numExtraBytesWritten = 0; numExtraBytesWritten = 0;
currentBufLen = 0;
headerOff = 0; headerOff = 0;
trailerOff = 0; trailerOff = 0;
crc.reset(); crc.reset();
@ -201,8 +194,7 @@ public class BuiltInGzipCompressor implements Compressor {
deflater.setInput(b, off, len); deflater.setInput(b, off, len);
crc.update(b, off, len); // CRC-32 is on uncompressed data crc.update(b, off, len); // CRC-32 is on uncompressed data
currentBufLen = len; accuBufLen += len;
accuBufLen += currentBufLen;
} }
private int writeHeader(byte[] b, int off, int len) { private int writeHeader(byte[] b, int off, int len) {
@ -251,7 +243,6 @@ public class BuiltInGzipCompressor implements Compressor {
if (trailerOff == gzipTrailerLen) { if (trailerOff == gzipTrailerLen) {
state = BuiltInGzipDecompressor.GzipStateLabel.FINISHED; state = BuiltInGzipDecompressor.GzipStateLabel.FINISHED;
currentBufLen = 0;
headerOff = 0; headerOff = 0;
trailerOff = 0; trailerOff = 0;
} }

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.io.compress; package org.apache.hadoop.io.compress;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@ -1051,4 +1052,45 @@ public class TestCodec {
} }
} }
} }
@Test(timeout=20000)
public void testGzipCompressorWithEmptyInput() throws IOException {
// don't use native libs
ZlibFactory.setNativeZlibLoaded(false);
Configuration conf = new Configuration();
CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
Compressor compressor = codec.createCompressor();
assertThat(compressor).withFailMessage("should be BuiltInGzipCompressor")
.isInstanceOf(BuiltInGzipCompressor.class);
byte[] b = new byte[0];
compressor.setInput(b, 0, b.length);
compressor.finish();
byte[] output = new byte[100];
int outputOff = 0;
while (!compressor.finished()) {
byte[] buf = new byte[100];
int compressed = compressor.compress(buf, 0, buf.length);
System.arraycopy(buf, 0, output, outputOff, compressed);
outputOff += compressed;
}
DataInputBuffer gzbuf = new DataInputBuffer();
gzbuf.reset(output, outputOff);
Decompressor decom = codec.createDecompressor();
assertThat(decom).as("decompressor should not be null").isNotNull();
assertThat(decom).withFailMessage("should be BuiltInGzipDecompressor")
.isInstanceOf(BuiltInGzipDecompressor.class);
try (InputStream gzin = codec.createInputStream(gzbuf, decom);
DataOutputBuffer dflbuf = new DataOutputBuffer()) {
dflbuf.reset();
IOUtils.copyBytes(gzin, dflbuf, 4096);
final byte[] dflchk = Arrays.copyOf(dflbuf.getData(), dflbuf.getLength());
assertThat(b).as("check decompressed output").isEqualTo(dflchk);
}
}
} }