HADOOP-6835. Add support for concatenated gzip input. Contributed by Greg Roelofs

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@961532 13f79535-47bb-0310-9956-ffa450edef68
Christopher Douglas committed 2010-07-07 23:22:28 +00:00
parent 4b34109a72
commit 2a248dfc32
10 changed files with 814 additions and 161 deletions

File: CHANGES.txt

@@ -57,6 +57,9 @@ Trunk (unreleased changes)
     HADOOP-6756. Documentation for common configuration keys.
     (Erik Steffl via shv)
 
+    HADOOP-6835. Add support for concatenated gzip input. (Greg Roelofs via
+    cdouglas)
+
   OPTIMIZATIONS
 
   BUG FIXES
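
For context: "concatenated gzip" means several complete gzip members stored back to back (the result of `cat a.gz b.gz`), which RFC 1952 requires a decompressor to decode as one continuous stream. A minimal sketch of producing such input in Java (mirrors what the new tests below do):

import java.io.ByteArrayOutputStream;
import java.util.zip.GZIPOutputStream;

public class ConcatGzipExample {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (String part : new String[] { "hello ", "world" }) {
      GZIPOutputStream gz = new GZIPOutputStream(out); // one gzip "member"
      gz.write(part.getBytes("UTF-8"));
      gz.close(); // writes the member trailer; close() is a no-op on the
                  // underlying ByteArrayOutputStream, so the loop can go on
    }
    // 'out' now holds two concatenated gzip members; before this patch,
    // Hadoop's gzip input streams stopped after the first one.
  }
}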

File: BlockDecompressorStream.java

@@ -86,7 +86,9 @@ public class BlockDecompressorStream extends DecompressorStream {
       }
     }
 
     if (decompressor.needsInput()) {
-      getCompressedData();
+      int m = getCompressedData();
+      // Send the read data to the decompressor
+      decompressor.setInput(buffer, 0, m);
     }
   }
@@ -96,10 +98,10 @@ public class BlockDecompressorStream extends DecompressorStream {
     return n;
   }
 
-  protected void getCompressedData() throws IOException {
+  protected int getCompressedData() throws IOException {
     checkStream();
 
-    // Get the size of the compressed chunk
+    // Get the size of the compressed chunk (always non-negative)
     int len = rawReadInt();
 
     // Read len bytes from underlying stream
@@ -110,13 +112,12 @@ public class BlockDecompressorStream extends DecompressorStream {
     while (n < len) {
       int count = in.read(buffer, off + n, len - n);
       if (count < 0) {
-        throw new EOFException();
+        throw new EOFException("Unexpected end of block in input stream");
       }
       n += count;
     }
 
-    // Send the read data to the decompressor
-    decompressor.setInput(buffer, 0, len);
+    return len;
   }
 
   public void resetState() throws IOException {

File: Decompressor.java

@@ -34,8 +34,13 @@ import org.apache.hadoop.classification.InterfaceStability;
 public interface Decompressor {
   /**
    * Sets input data for decompression.
-   * This should be called whenever #needsInput() returns
+   * This should be called if and only if {@link #needsInput()} returns
    * <code>true</code> indicating that more input data is required.
+   * (Both native and non-native versions of various Decompressors require
+   * that the data passed in via <code>b[]</code> remain unmodified until
+   * the caller is explicitly notified--via {@link #needsInput()}--that the
+   * buffer may be safely modified.  With this requirement, an extra
+   * buffer-copy can be avoided.)
    *
    * @param b Input data
    * @param off Start offset
@@ -45,10 +50,12 @@ public interface Decompressor {
 
   /**
    * Returns true if the input data buffer is empty and
-   * #setInput() should be called to provide more input.
+   * {@link #setInput(byte[], int, int)} should be called to
+   * provide more input.
    *
    * @return <code>true</code> if the input data buffer is empty and
-   * #setInput() should be called in order to provide more input.
+   * {@link #setInput(byte[], int, int)} should be called in
+   * order to provide more input.
    */
   public boolean needsInput();
 
@@ -69,9 +76,9 @@ public interface Decompressor {
   public boolean needsDictionary();
 
   /**
-   * Returns true if the end of the compressed
+   * Returns true if the end of the decompressed
    * data output stream has been reached.
-   * @return <code>true</code> if the end of the compressed
+   * @return <code>true</code> if the end of the decompressed
    * data output stream has been reached.
    */
   public boolean finished();
@@ -79,8 +86,8 @@ public interface Decompressor {
   /**
    * Fills specified buffer with uncompressed data. Returns actual number
    * of bytes of uncompressed data. A return value of 0 indicates that
-   * #needsInput() should be called in order to determine if more input
-   * data is required.
+   * {@link #needsInput()} should be called in order to determine if more
+   * input data is required.
    *
    * @param b Buffer for the compressed data
    * @param off Start offset of the data
@@ -91,7 +98,15 @@ public interface Decompressor {
   public int decompress(byte[] b, int off, int len) throws IOException;
 
   /**
-   * Resets decompressor so that a new set of input data can be processed.
+   * Returns the number of bytes remaining in the compressed-data buffer;
+   * typically called after the decompressor has finished decompressing
+   * the current gzip stream (a.k.a. "member").
+   */
+  public int getRemaining();
+
+  /**
+   * Resets decompressor and input and output buffers so that a new set of
+   * input data can be processed.
    */
   public void reset();
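
A hedged sketch of the calling protocol the revised interface implies; the class and field names here are hypothetical stand-ins for what a stream wrapper would own (the real implementation is DecompressorStream, below):

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.io.compress.Decompressor;

public class ConcatAwareReader {
  private final InputStream in;
  private final Decompressor decompressor;
  private final byte[] buffer = new byte[4096];

  public ConcatAwareReader(InputStream in, Decompressor decompressor) {
    this.in = in;
    this.decompressor = decompressor;
  }

  /** Returns decompressed bytes in b, or -1 at true end of input. */
  public int read(byte[] b, int off, int len) throws IOException {
    int n;
    while ((n = decompressor.decompress(b, off, len)) == 0) {
      if (decompressor.finished()) {
        // end of one gzip member: any buffered input left over?
        if (decompressor.getRemaining() == 0) {
          int m = in.read(buffer, 0, buffer.length);
          if (m == -1) {
            return -1;                  // member ended at end of file
          }
          decompressor.reset();         // start of the next member
          decompressor.setInput(buffer, 0, m);
        } else {
          // leftover bytes already read belong to the next member; a real
          // implementation resends them (see DecompressorStream below)
          decompressor.reset();
        }
      } else if (decompressor.needsInput()) {
        int m = in.read(buffer, 0, buffer.length);
        if (m == -1) {
          throw new EOFException("Unexpected end of input stream");
        }
        decompressor.setInput(buffer, 0, m);
      }
    }
    return n;
  }
}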

File: DecompressorStream.java

@@ -33,8 +33,11 @@ public class DecompressorStream extends CompressionInputStream {
   protected byte[] buffer;
   protected boolean eof = false;
   protected boolean closed = false;
+  private int lastBytesSent = 0;
 
-  public DecompressorStream(InputStream in, Decompressor decompressor, int bufferSize) throws IOException {
+  public DecompressorStream(InputStream in, Decompressor decompressor,
+                            int bufferSize)
+  throws IOException {
     super(in);
 
     if (in == null || decompressor == null) {
@@ -47,7 +50,8 @@ public class DecompressorStream extends CompressionInputStream {
     buffer = new byte[bufferSize];
   }
 
-  public DecompressorStream(InputStream in, Decompressor decompressor) throws IOException {
+  public DecompressorStream(InputStream in, Decompressor decompressor)
+  throws IOException {
     this(in, decompressor, 512);
   }
 
@@ -83,27 +87,74 @@ public class DecompressorStream extends CompressionInputStream {
     int n = 0;
     while ((n = decompressor.decompress(b, off, len)) == 0) {
-      if (decompressor.finished() || decompressor.needsDictionary()) {
+      if (decompressor.needsDictionary()) {
         eof = true;
         return -1;
       }
-      if (decompressor.needsInput()) {
-        getCompressedData();
+
+      if (decompressor.finished()) {
+        // First see if there was any leftover buffered input from previous
+        // stream; if not, attempt to refill buffer.  If refill -> EOF, we're
+        // all done; else reset, fix up input buffer, and get ready for next
+        // concatenated substream/"member".
+        int nRemaining = decompressor.getRemaining();
+        if (nRemaining == 0) {
+          int m = getCompressedData();
+          if (m == -1) {
+            // apparently the previous end-of-stream was also end-of-file:
+            // return success, as if we had never called getCompressedData()
+            eof = true;
+            return -1;
+          }
+          decompressor.reset();
+          decompressor.setInput(buffer, 0, m);
+          lastBytesSent = m;
+        } else {
+          // looks like it's a concatenated stream:  reset low-level zlib (or
+          // other engine) and buffers, then "resend" remaining input data
+          decompressor.reset();
+          int leftoverOffset = lastBytesSent - nRemaining;
+          assert (leftoverOffset >= 0);
+          // this recopies userBuf -> direct buffer if using native libraries:
+          decompressor.setInput(buffer, leftoverOffset, nRemaining);
+          // NOTE:  this is the one place we do NOT want to save the number
+          // of bytes sent (nRemaining here) into lastBytesSent:  since we
+          // are resending what we've already sent before, offset is nonzero
+          // in general (only way it could be zero is if it already equals
+          // nRemaining), which would then screw up the offset calculation
+          // _next_ time around.  IOW, getRemaining() is in terms of the
+          // original, zero-offset bufferload, so lastBytesSent must be as
+          // well.  Cheesy ASCII art:
+          //
+          //          <------------ m, lastBytesSent ----------->
+          //          +===============================================+
+          // buffer:  |1111111111|22222222222222222|333333333333|     |
+          //          +===============================================+
+          //     #1:  <-- off -->|<-------- nRemaining --------->
+          //     #2:  <----------- off ----------->|<-- nRem. -->
+          //     #3:  (final substream:  nRemaining == 0; eof = true)
+          //
+          // If lastBytesSent is anything other than m, as shown, then "off"
+          // will be calculated incorrectly.
+        }
+      } else if (decompressor.needsInput()) {
+        int m = getCompressedData();
+        if (m == -1) {
+          throw new EOFException("Unexpected end of input stream");
+        }
+        decompressor.setInput(buffer, 0, m);
+        lastBytesSent = m;
       }
     }
+
     return n;
   }
 
-  protected void getCompressedData() throws IOException {
+  protected int getCompressedData() throws IOException {
     checkStream();
-    int n = in.read(buffer, 0, buffer.length);
-    if (n == -1) {
-      throw new EOFException("Unexpected end of input stream");
-    }
-    decompressor.setInput(buffer, 0, n);
+    // note that the _caller_ is now required to call setInput() or throw
+    return in.read(buffer, 0, buffer.length);
  }
 
   protected void checkStream() throws IOException {
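
To make the ASCII art above concrete, a small worked example of the leftover-input bookkeeping (all numbers hypothetical):

// Suppose getCompressedData() last returned m = 100, so lastBytesSent = 100.
// The first gzip member ends mid-buffer and getRemaining() reports 37:
int lastBytesSent = 100;
int nRemaining = 37;
int leftoverOffset = lastBytesSent - nRemaining;   // = 63
// Member #2 starts at buffer[63]; setInput(buffer, 63, 37) resends it.
// lastBytesSent stays 100, so if member #2 also ends inside this same
// bufferload, the next offset is again computed against the original,
// zero-offset load, as the NOTE above requires.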

File: GzipCodec.java

@@ -92,57 +92,6 @@ public class GzipCodec extends DefaultCodec {
     }
   }
 
-  @InterfaceStability.Evolving
-  protected static class GzipInputStream extends DecompressorStream {
-
-    private static class ResetableGZIPInputStream extends GZIPInputStream {
-      public ResetableGZIPInputStream(InputStream in) throws IOException {
-        super(in);
-      }
-
-      public void resetState() throws IOException {
-        inf.reset();
-      }
-    }
-
-    public GzipInputStream(InputStream in) throws IOException {
-      super(new ResetableGZIPInputStream(in));
-    }
-
-    /**
-     * Allow subclasses to directly set the inflater stream.
-     * @throws IOException
-     */
-    protected GzipInputStream(DecompressorStream in) throws IOException {
-      super(in);
-    }
-
-    public int available() throws IOException {
-      return in.available();
-    }
-
-    public void close() throws IOException {
-      in.close();
-    }
-
-    public int read() throws IOException {
-      return in.read();
-    }
-
-    public int read(byte[] data, int offset, int len) throws IOException {
-      return in.read(data, offset, len);
-    }
-
-    public long skip(long offset) throws IOException {
-      return in.skip(offset);
-    }
-
-    public void resetState() throws IOException {
-      ((ResetableGZIPInputStream) in).resetState();
-    }
-  }
-
   public CompressionOutputStream createOutputStream(OutputStream out)
     throws IOException {
     return (ZlibFactory.isNativeZlibLoaded(conf)) ?
@@ -159,7 +108,6 @@ public class GzipCodec extends DefaultCodec {
                conf.getInt("io.file.buffer.size",
                            4*1024)) :
            createOutputStream(out);
-
   }
 
   public Compressor createCompressor() {
@@ -176,33 +124,29 @@ public class GzipCodec extends DefaultCodec {
   public CompressionInputStream createInputStream(InputStream in)
   throws IOException {
-    return (ZlibFactory.isNativeZlibLoaded(conf)) ?
-      new DecompressorStream(in, createDecompressor(),
-                             conf.getInt("io.file.buffer.size",
-                                         4*1024)) :
-      new GzipInputStream(in);
+    return createInputStream(in, null);
   }
 
   public CompressionInputStream createInputStream(InputStream in,
                                                   Decompressor decompressor)
   throws IOException {
-    return (decompressor != null) ?
-      new DecompressorStream(in, decompressor,
-                             conf.getInt("io.file.buffer.size",
-                                         4*1024)) :
-      createInputStream(in);
+    if (decompressor == null) {
+      decompressor = createDecompressor();  // always succeeds (or throws)
+    }
+    return new DecompressorStream(in, decompressor,
+                                  conf.getInt("io.file.buffer.size", 4*1024));
   }
 
   public Decompressor createDecompressor() {
     return (ZlibFactory.isNativeZlibLoaded(conf))
       ? new GzipZlibDecompressor()
-      : null;
+      : new BuiltInGzipDecompressor();
   }
 
   public Class<? extends Decompressor> getDecompressorType() {
     return ZlibFactory.isNativeZlibLoaded(conf)
       ? GzipZlibDecompressor.class
-      : null;
+      : BuiltInGzipDecompressor.class;
   }
 
   public String getDefaultExtension() {
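
With createDecompressor() now always returning a Decompressor, the ordinary codec-factory path works with or without native zlib. A hedged usage sketch (the file name is hypothetical; without native libs this now goes through BuiltInGzipDecompressor):

import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class ReadGzipFile {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path p = new Path(args[0]);                    // e.g. part-00000.gz
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    CompressionCodec codec = factory.getCodec(p);  // GzipCodec for *.gz
    InputStream in = codec.createInputStream(fs.open(p));
    try {
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      in.close();
    }
  }
}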

File: BZip2DummyDecompressor.java

@@ -52,6 +52,11 @@ public class BZip2DummyDecompressor implements Decompressor {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public int getRemaining() {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public void reset() {
     // do nothing

File: BuiltInGzipDecompressor.java (new file)

@@ -0,0 +1,570 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress.zlib;
import java.io.IOException;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.io.compress.Decompressor;
/**
* A {@link Decompressor} based on the popular gzip compressed file format.
* http://www.gzip.org/
*
*/
public class BuiltInGzipDecompressor implements Decompressor {
private static final int GZIP_MAGIC_ID = 0x8b1f; // if read as LE short int
private static final int GZIP_DEFLATE_METHOD = 8;
private static final int GZIP_FLAGBIT_HEADER_CRC = 0x02;
private static final int GZIP_FLAGBIT_EXTRA_FIELD = 0x04;
private static final int GZIP_FLAGBIT_FILENAME = 0x08;
private static final int GZIP_FLAGBIT_COMMENT = 0x10;
private static final int GZIP_FLAGBITS_RESERVED = 0xe0;
// 'true' (nowrap) => Inflater will handle raw deflate stream only
private Inflater inflater = new Inflater(true);
private byte[] userBuf = null;
private int userBufOff = 0;
private int userBufLen = 0;
private byte[] localBuf = new byte[256];
private int localBufOff = 0;
private int headerBytesRead = 0;
private int trailerBytesRead = 0;
private int numExtraFieldBytesRemaining = -1;
private PureJavaCrc32 crc = new PureJavaCrc32();
private boolean hasExtraField = false;
private boolean hasFilename = false;
private boolean hasComment = false;
private boolean hasHeaderCRC = false;
private GzipStateLabel state;
/**
* The current state of the gzip decoder, external to the Inflater context.
* (Technically, the private variables localBuf through hasHeaderCRC are
* also part of the state, so this enum is merely the label for it.)
*/
private static enum GzipStateLabel {
/**
* Immediately prior to or (strictly) within the 10-byte basic gzip header.
*/
HEADER_BASIC,
/**
* Immediately prior to or within the optional "extra field."
*/
HEADER_EXTRA_FIELD,
/**
* Immediately prior to or within the optional filename field.
*/
HEADER_FILENAME,
/**
* Immediately prior to or within the optional comment field.
*/
HEADER_COMMENT,
/**
* Immediately prior to or within the optional 2-byte header CRC value.
*/
HEADER_CRC,
/**
* Immediately prior to or within the main compressed (deflate) data stream.
*/
DEFLATE_STREAM,
/**
* Immediately prior to or (strictly) within the 4-byte uncompressed CRC.
*/
TRAILER_CRC,
/**
* Immediately prior to or (strictly) within the 4-byte uncompressed size.
*/
TRAILER_SIZE,
/**
* Immediately after the trailer (and potentially prior to the next gzip
* member/substream header), without reset() having been called.
*/
FINISHED;
}
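// -------------------------------------------------------------------------
// Illustrative aside (not part of the commit): the fixed 10-byte header
// consumed in the HEADER_BASIC state, per RFC 1952. The bytes below form a
// minimal valid header with no optional fields set:
//
//   byte[] basicHeader = {
//     0x1f, (byte) 0x8b,  // ID1, ID2: 0x8b1f when read as a LE short
//     8,                  // CM: deflate
//     0,                  // FLG: no FEXTRA/FNAME/FCOMMENT/FHCRC
//     0, 0, 0, 0,         // MTIME: unset
//     0,                  // XFL: no extra flags
//     (byte) 0xff         // OS: unknown
//   };
// -------------------------------------------------------------------------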
/**
* Creates a new (pure Java) gzip decompressor.
*/
public BuiltInGzipDecompressor() {
state = GzipStateLabel.HEADER_BASIC;
crc.reset();
// FIXME? Inflater docs say: 'it is also necessary to provide an extra
// "dummy" byte as input. This is required by the ZLIB native
// library in order to support certain optimizations.' However,
// this does not appear to be true, and in any case, it's not
// entirely clear where the byte should go or what its value
// should be. Perhaps it suffices to have some deflated bytes
// in the first buffer load? (But how else would one do it?)
}
/** {@inheritDoc} */
public synchronized boolean needsInput() {
if (state == GzipStateLabel.DEFLATE_STREAM) { // most common case
return inflater.needsInput();
}
// see userBufLen comment at top of decompress(); currently no need to
// verify userBufLen <= 0
return (state != GzipStateLabel.FINISHED);
}
/** {@inheritDoc} */
/*
* In our case, the input data includes both gzip header/trailer bytes (which
* we handle in executeState()) and deflate-stream bytes (which we hand off
* to Inflater).
*
* NOTE: This code assumes the data passed in via b[] remains unmodified
* until _we_ signal that it's safe to modify it (via needsInput()).
* The alternative would require an additional buffer-copy even for
* the bulk deflate stream, which is a performance hit we don't want
* to absorb. (Decompressor now documents this requirement.)
*/
public synchronized void setInput(byte[] b, int off, int len) {
if (b == null) {
throw new NullPointerException();
}
if (off < 0 || len < 0 || off > b.length - len) {
throw new ArrayIndexOutOfBoundsException();
}
userBuf = b;
userBufOff = off;
userBufLen = len; // note: might be zero
}
/**
* Decompress the data (gzip header, deflate stream, gzip trailer) in the
* provided buffer.
*
* @return the number of decompressed bytes placed into b
*/
/* From the caller's perspective, this is where the state machine lives.
* The code is written such that we never return from decompress() with
* data remaining in userBuf unless we're in FINISHED state and there was
* data beyond the current gzip member (e.g., we're within a concatenated
* gzip stream). If this ever changes, {@link #needsInput()} will also
* need to be modified (i.e., uncomment the userBufLen condition).
*
* The actual deflate-stream processing (decompression) is handled by
* Java's Inflater class. Unlike the gzip header/trailer code (execute*
* methods below), the deflate stream is never copied; Inflater operates
* directly on the user's buffer.
*/
public synchronized int decompress(byte[] b, int off, int len)
throws IOException {
int numAvailBytes = 0;
if (state != GzipStateLabel.DEFLATE_STREAM) {
executeHeaderState();
if (userBufLen <= 0) {
return numAvailBytes;
}
}
// "executeDeflateStreamState()"
if (state == GzipStateLabel.DEFLATE_STREAM) {
// hand off user data (or what's left of it) to Inflater--but note that
// Inflater may not have consumed all of previous bufferload (e.g., if
// data highly compressed or output buffer very small), in which case
// userBufLen will be zero
if (userBufLen > 0) {
inflater.setInput(userBuf, userBufOff, userBufLen);
userBufOff += userBufLen;
userBufLen = 0;
}
// now decompress it into b[]
try {
numAvailBytes = inflater.inflate(b, off, len);
} catch (DataFormatException dfe) {
throw new IOException(dfe.getMessage());
}
crc.update(b, off, numAvailBytes); // CRC-32 is on _uncompressed_ data
if (inflater.finished()) {
state = GzipStateLabel.TRAILER_CRC;
int bytesRemaining = inflater.getRemaining();
assert (bytesRemaining >= 0) :
"logic error: Inflater finished; byte-count is inconsistent";
// could save a copy of userBufLen at call to inflater.setInput() and
// verify that bytesRemaining <= origUserBufLen, but would have to
// be a (class) member variable...seems excessive for a sanity check
userBufOff -= bytesRemaining;
userBufLen = bytesRemaining; // or "+=", but guaranteed 0 coming in
} else {
return numAvailBytes; // minor optimization
}
}
executeTrailerState();
return numAvailBytes;
}
/**
* Parse the gzip header (assuming we're in the appropriate state).
* In order to deal with degenerate cases (e.g., user buffer is one byte
* long), we copy (some) header bytes to another buffer. (Filename,
* comment, and extra-field bytes are simply skipped.)</p>
*
* See http://www.ietf.org/rfc/rfc1952.txt for the gzip spec. Note that
* no version of gzip to date (at least through 1.4.0, 2010-01-20) supports
* the FHCRC header-CRC16 flagbit; instead, the implementation treats it
* as a multi-file continuation flag (which it also doesn't support). :-(
* Sun's JDK v6 (1.6) supports the header CRC, however, and so do we.
*/
private void executeHeaderState() throws IOException {
// this can happen because DecompressorStream's decompress() is written
// to call decompress() first, setInput() second:
if (userBufLen <= 0) {
return;
}
// "basic"/required header: somewhere in first 10 bytes
if (state == GzipStateLabel.HEADER_BASIC) {
int n = Math.min(userBufLen, 10-localBufOff); // (or 10-headerBytesRead)
checkAndCopyBytesToLocal(n); // modifies userBufLen, etc.
if (localBufOff >= 10) { // should be strictly ==
processBasicHeader(); // sig, compression method, flagbits
localBufOff = 0; // no further need for basic header
state = GzipStateLabel.HEADER_EXTRA_FIELD;
}
}
if (userBufLen <= 0) {
return;
}
// optional header stuff (extra field, filename, comment, header CRC)
if (state == GzipStateLabel.HEADER_EXTRA_FIELD) {
if (hasExtraField) {
// 2 substates: waiting for 2 bytes => get numExtraFieldBytesRemaining,
// or already have 2 bytes & waiting to finish skipping specified length
if (numExtraFieldBytesRemaining < 0) {
int n = Math.min(userBufLen, 2-localBufOff);
checkAndCopyBytesToLocal(n);
if (localBufOff >= 2) {
numExtraFieldBytesRemaining = readUShortLE(localBuf, 0);
localBufOff = 0;
}
}
if (numExtraFieldBytesRemaining > 0 && userBufLen > 0) {
int n = Math.min(userBufLen, numExtraFieldBytesRemaining);
checkAndSkipBytes(n); // modifies userBufLen, etc.
numExtraFieldBytesRemaining -= n;
}
if (numExtraFieldBytesRemaining == 0) {
state = GzipStateLabel.HEADER_FILENAME;
}
} else {
state = GzipStateLabel.HEADER_FILENAME;
}
}
if (userBufLen <= 0) {
return;
}
if (state == GzipStateLabel.HEADER_FILENAME) {
if (hasFilename) {
boolean doneWithFilename = checkAndSkipBytesUntilNull();
if (!doneWithFilename) {
return; // exit early: used up entire buffer without hitting NULL
}
}
state = GzipStateLabel.HEADER_COMMENT;
}
if (userBufLen <= 0) {
return;
}
if (state == GzipStateLabel.HEADER_COMMENT) {
if (hasComment) {
boolean doneWithComment = checkAndSkipBytesUntilNull();
if (!doneWithComment) {
return; // exit early: used up entire buffer
}
}
state = GzipStateLabel.HEADER_CRC;
}
if (userBufLen <= 0) {
return;
}
if (state == GzipStateLabel.HEADER_CRC) {
if (hasHeaderCRC) {
assert (localBufOff < 2);
int n = Math.min(userBufLen, 2-localBufOff);
copyBytesToLocal(n);
if (localBufOff >= 2) {
long headerCRC = readUShortLE(localBuf, 0);
if (headerCRC != (crc.getValue() & 0xffff)) {
throw new IOException("gzip header CRC failure");
}
localBufOff = 0;
crc.reset();
state = GzipStateLabel.DEFLATE_STREAM;
}
} else {
crc.reset(); // will reuse for CRC-32 of uncompressed data
state = GzipStateLabel.DEFLATE_STREAM; // switching to Inflater now
}
}
}
/**
* Parse the gzip trailer (assuming we're in the appropriate state).
* In order to deal with degenerate cases (e.g., user buffer is one byte
* long), we copy trailer bytes (all 8 of 'em) to a local buffer.</p>
*
* See http://www.ietf.org/rfc/rfc1952.txt for the gzip spec.
*/
private void executeTrailerState() throws IOException {
if (userBufLen <= 0) {
return;
}
// verify that the CRC-32 of the decompressed stream matches the value
// stored in the gzip trailer
if (state == GzipStateLabel.TRAILER_CRC) {
// localBuf was empty before we handed off to Inflater, so we handle this
// exactly like header fields
assert (localBufOff < 4); // initially 0, but may need multiple calls
int n = Math.min(userBufLen, 4-localBufOff);
copyBytesToLocal(n);
if (localBufOff >= 4) {
long streamCRC = readUIntLE(localBuf, 0);
if (streamCRC != crc.getValue()) {
throw new IOException("gzip stream CRC failure");
}
localBufOff = 0;
crc.reset();
state = GzipStateLabel.TRAILER_SIZE;
}
}
if (userBufLen <= 0) {
return;
}
// verify that the mod-2^32 decompressed stream size matches the value
// stored in the gzip trailer
if (state == GzipStateLabel.TRAILER_SIZE) {
assert (localBufOff < 4); // initially 0, but may need multiple calls
int n = Math.min(userBufLen, 4-localBufOff);
copyBytesToLocal(n); // modifies userBufLen, etc.
if (localBufOff >= 4) { // should be strictly ==
long inputSize = readUIntLE(localBuf, 0);
if (inputSize != (inflater.getBytesWritten() & 0xffffffff)) {
throw new IOException(
"stored gzip size doesn't match decompressed size");
}
localBufOff = 0;
state = GzipStateLabel.FINISHED;
}
}
if (state == GzipStateLabel.FINISHED) {
return;
}
}
/**
* Returns the total number of compressed bytes input so far, including
* gzip header/trailer bytes.</p>
*
* @return the total (non-negative) number of compressed bytes read so far
*/
public synchronized long getBytesRead() {
return headerBytesRead + inflater.getBytesRead() + trailerBytesRead;
}
/**
* Returns the number of bytes remaining in the input buffer; normally
* called when finished() is true to determine amount of post-gzip-stream
* data. Note that, other than the finished state with concatenated data
* after the end of the current gzip stream, this will never return a
* non-zero value unless called after {@link #setInput(byte[] b, int off,
* int len)} and before {@link #decompress(byte[] b, int off, int len)}.
* (That is, after {@link #decompress(byte[] b, int off, int len)} it
* always returns zero, except in finished state with concatenated data.)</p>
*
* @return the total (non-negative) number of unprocessed bytes in input
*/
public synchronized int getRemaining() {
return userBufLen;
}
/** {@inheritDoc} */
public synchronized boolean needsDictionary() {
return inflater.needsDictionary();
}
/** {@inheritDoc} */
public synchronized void setDictionary(byte[] b, int off, int len) {
inflater.setDictionary(b, off, len);
}
/**
* Returns true if the end of the gzip substream (single "member") has been
* reached.</p>
*/
public synchronized boolean finished() {
return (state == GzipStateLabel.FINISHED);
}
/**
* Resets everything, including the input buffer, regardless of whether the
* current gzip substream is finished.</p>
*/
public synchronized void reset() {
// could optionally emit INFO message if state != GzipStateLabel.FINISHED
inflater.reset();
state = GzipStateLabel.HEADER_BASIC;
crc.reset();
userBufOff = userBufLen = 0;
localBufOff = 0;
headerBytesRead = 0;
trailerBytesRead = 0;
numExtraFieldBytesRemaining = -1;
hasExtraField = false;
hasFilename = false;
hasComment = false;
hasHeaderCRC = false;
}
/** {@inheritDoc} */
public synchronized void end() {
inflater.end();
}
/**
* Check ID bytes (throw if necessary), compression method (throw if not 8),
* and flag bits (set hasExtraField, hasFilename, hasComment, hasHeaderCRC).
* Ignore MTIME, XFL, OS. Caller must ensure we have at least 10 bytes (at
* the start of localBuf).</p>
*/
/*
* Flag bits (remainder are reserved and must be zero):
* bit 0 FTEXT
* bit 1 FHCRC (never implemented in gzip, at least through version
* 1.4.0; instead interpreted as "continuation of multi-
* part gzip file," which is unsupported through 1.4.0)
* bit 2 FEXTRA
* bit 3 FNAME
* bit 4 FCOMMENT
* [bit 5 encrypted]
*/
private void processBasicHeader() throws IOException {
if (readUShortLE(localBuf, 0) != GZIP_MAGIC_ID) {
throw new IOException("not a gzip file");
}
if (readUByte(localBuf, 2) != GZIP_DEFLATE_METHOD) {
throw new IOException("gzip data not compressed with deflate method");
}
int flg = readUByte(localBuf, 3);
if ((flg & GZIP_FLAGBITS_RESERVED) != 0) {
throw new IOException("unknown gzip format (reserved flagbits set)");
}
hasExtraField = ((flg & GZIP_FLAGBIT_EXTRA_FIELD) != 0);
hasFilename = ((flg & GZIP_FLAGBIT_FILENAME) != 0);
hasComment = ((flg & GZIP_FLAGBIT_COMMENT) != 0);
hasHeaderCRC = ((flg & GZIP_FLAGBIT_HEADER_CRC) != 0);
}
private void checkAndCopyBytesToLocal(int len) {
System.arraycopy(userBuf, userBufOff, localBuf, localBufOff, len);
localBufOff += len;
// alternatively, could call checkAndSkipBytes(len) for rest...
crc.update(userBuf, userBufOff, len);
userBufOff += len;
userBufLen -= len;
headerBytesRead += len;
}
private void checkAndSkipBytes(int len) {
crc.update(userBuf, userBufOff, len);
userBufOff += len;
userBufLen -= len;
headerBytesRead += len;
}
// returns true if saw NULL, false if ran out of buffer first; called _only_
// during gzip-header processing (not trailer)
// (caller can check before/after state of userBufLen to compute num bytes)
private boolean checkAndSkipBytesUntilNull() {
boolean hitNull = false;
if (userBufLen > 0) {
do {
hitNull = (userBuf[userBufOff] == 0);
crc.update(userBuf[userBufOff]);
++userBufOff;
--userBufLen;
++headerBytesRead;
} while (userBufLen > 0 && !hitNull);
}
return hitNull;
}
// this one doesn't update the CRC and does support trailer processing but
// otherwise is same as its "checkAnd" sibling
private void copyBytesToLocal(int len) {
System.arraycopy(userBuf, userBufOff, localBuf, localBufOff, len);
localBufOff += len;
userBufOff += len;
userBufLen -= len;
if (state == GzipStateLabel.TRAILER_CRC ||
state == GzipStateLabel.TRAILER_SIZE) {
trailerBytesRead += len;
} else {
headerBytesRead += len;
}
}
private int readUByte(byte[] b, int off) {
return ((int)b[off] & 0xff);
}
// caller is responsible for not overrunning buffer
private int readUShortLE(byte[] b, int off) {
return ((((b[off+1] & 0xff) << 8) |
((b[off] & 0xff) )) & 0xffff);
}
// caller is responsible for not overrunning buffer
private long readUIntLE(byte[] b, int off) {
return ((((long)(b[off+3] & 0xff) << 24) |
((long)(b[off+2] & 0xff) << 16) |
((long)(b[off+1] & 0xff) << 8) |
((long)(b[off] & 0xff) )) & 0xffffffff);
}
}
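
As a quick sanity check of the little-endian helpers above (a sketch, not part of the commit): the gzip magic bytes 0x1f 0x8b, read the way readUShortLE reads them, produce exactly GZIP_MAGIC_ID:

byte[] magic = { 0x1f, (byte) 0x8b };
int id = ((magic[1] & 0xff) << 8) | (magic[0] & 0xff);
assert id == 0x8b1f;  // == GZIP_MAGIC_ID, "if read as LE short int"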

File: ZlibDecompressor.java

@@ -166,7 +166,7 @@ public class ZlibDecompressor implements Decompressor {
   }
 
   public synchronized boolean needsInput() {
-    // Consume remanining compressed data?
+    // Consume remaining compressed data?
     if (uncompressedDirectBuf.remaining() > 0) {
       return false;
     }
@@ -189,7 +189,7 @@ public class ZlibDecompressor implements Decompressor {
   }
 
   public synchronized boolean finished() {
-    // Check if 'zlib' says its 'finished' and
+    // Check if 'zlib' says it's 'finished' and
     // all compressed data has been consumed
     return (finished && uncompressedDirectBuf.remaining() == 0);
   }
@@ -221,7 +221,7 @@ public class ZlibDecompressor implements Decompressor {
       n = inflateBytesDirect();
       uncompressedDirectBuf.limit(n);
 
-      // Get atmost 'len' bytes
+      // Get at most 'len' bytes
       n = Math.min(n, len);
       ((ByteBuffer)uncompressedDirectBuf).get(b, off, n);
@@ -229,9 +229,9 @@ public class ZlibDecompressor implements Decompressor {
   }
 
   /**
-   * Returns the total number of compressed bytes output so far.
+   * Returns the total number of uncompressed bytes output so far.
    *
-   * @return the total (non-negative) number of compressed bytes output so far
+   * @return the total (non-negative) number of uncompressed bytes output so far
    */
   public synchronized long getBytesWritten() {
     checkStream();
@@ -239,15 +239,30 @@ public class ZlibDecompressor implements Decompressor {
   }
 
   /**
-   * Returns the total number of uncompressed bytes input so far.</p>
+   * Returns the total number of compressed bytes input so far.</p>
    *
-   * @return the total (non-negative) number of uncompressed bytes input so far
+   * @return the total (non-negative) number of compressed bytes input so far
    */
   public synchronized long getBytesRead() {
     checkStream();
     return getBytesRead(stream);
   }
 
+  /**
+   * Returns the number of bytes remaining in the input buffers; normally
+   * called when finished() is true to determine amount of post-gzip-stream
+   * data.</p>
+   *
+   * @return the total (non-negative) number of unprocessed bytes in input
+   */
+  public synchronized int getRemaining() {
+    checkStream();
+    return userBufLen + getRemaining(stream); // userBuf + compressedDirectBuf
+  }
+
+  /**
+   * Resets everything including the input buffers (user and direct).</p>
+   */
   public synchronized void reset() {
     checkStream();
     reset(stream);
@@ -282,6 +297,7 @@ public class ZlibDecompressor implements Decompressor {
   private native int inflateBytesDirect();
   private native static long getBytesRead(long strm);
   private native static long getBytesWritten(long strm);
+  private native static int getRemaining(long strm);
   private native static void reset(long strm);
   private native static void end(long strm);
 }

File: ZlibDecompressor.c

@@ -291,6 +291,13 @@ Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_getBytesWritten(
     return (ZSTREAM(stream))->total_out;
 }
 
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_getRemaining(
+	JNIEnv *env, jclass cls, jlong stream
+	) {
+    return (ZSTREAM(stream))->avail_in;
+}
+
 JNIEXPORT void JNICALL
 Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_reset(
 	JNIEnv *env, jclass cls, jlong stream

File: TestCodec.java

@@ -39,6 +39,7 @@ import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -53,6 +54,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.CompressorStream;
+import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
 import org.apache.hadoop.io.compress.zlib.BuiltInZlibDeflater;
 import org.apache.hadoop.io.compress.zlib.BuiltInZlibInflater;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
@@ -70,8 +72,7 @@ import static org.junit.Assert.*;
 public class TestCodec {
 
-  private static final Log LOG=
-    LogFactory.getLog(TestCodec.class);
+  private static final Log LOG= LogFactory.getLog(TestCodec.class);
 
   private Configuration conf = new Configuration();
   private int count = 10000;
@@ -277,7 +278,7 @@ public class TestCodec {
   @Test
   public void testCodecPoolGzipReuse() throws Exception {
     Configuration conf = new Configuration();
-    conf.setBoolean("hadoop.native.lib", true);
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
     if (!ZlibFactory.isNativeZlibLoaded(conf)) {
       LOG.warn("testCodecPoolGzipReuse skipped: native libs not loaded");
       return;
@@ -362,7 +363,7 @@ public class TestCodec {
   @Test
   public void testCodecInitWithCompressionLevel() throws Exception {
     Configuration conf = new Configuration();
-    conf.setBoolean("io.native.lib.available", true);
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
     if (ZlibFactory.isNativeZlibLoaded(conf)) {
       LOG.info("testCodecInitWithCompressionLevel with native");
       codecTestWithNOCompression(conf,
@@ -374,7 +375,7 @@ public class TestCodec {
                  + ": native libs not loaded");
     }
     conf = new Configuration();
-    conf.setBoolean("io.native.lib.available", false);
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, false);
     codecTestWithNOCompression( conf,
                          "org.apache.hadoop.io.compress.DefaultCodec");
   }
@@ -382,14 +383,14 @@ public class TestCodec {
   @Test
   public void testCodecPoolCompressorReinit() throws Exception {
     Configuration conf = new Configuration();
-    conf.setBoolean("hadoop.native.lib", true);
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
     if (ZlibFactory.isNativeZlibLoaded(conf)) {
       GzipCodec gzc = ReflectionUtils.newInstance(GzipCodec.class, conf);
       gzipReinitTest(conf, gzc);
     } else {
       LOG.warn("testCodecPoolCompressorReinit skipped: native libs not loaded");
     }
-    conf.setBoolean("hadoop.native.lib", false);
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, false);
     DefaultCodec dfc = ReflectionUtils.newInstance(DefaultCodec.class, conf);
     gzipReinitTest(conf, dfc);
   }
@@ -490,49 +491,89 @@ public class TestCodec {
   }
 
   @Test
-  public void testCodecPoolAndGzipDecompressor() {
-    // BuiltInZlibInflater should not be used as the GzipCodec decompressor.
-    // Assert that this is the case.
-
-    // Don't use native libs for this test.
-    Configuration conf = new Configuration();
-    conf.setBoolean("hadoop.native.lib", false);
-    assertFalse("ZlibFactory is using native libs against request",
-        ZlibFactory.isNativeZlibLoaded(conf));
-
-    // This should give us a BuiltInZlibInflater.
-    Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
-    assertNotNull("zlibDecompressor is null!", zlibDecompressor);
-    assertTrue("ZlibFactory returned unexpected inflator",
-        zlibDecompressor instanceof BuiltInZlibInflater);
-
-    // Asking for a decompressor directly from GzipCodec should return null;
-    // its createOutputStream() just wraps the existing stream in a
-    // java.util.zip.GZIPOutputStream.
-    CompressionCodecFactory ccf = new CompressionCodecFactory(conf);
-    CompressionCodec codec = ccf.getCodec(new Path("foo.gz"));
-    assertTrue("Codec for .gz file is not GzipCodec", codec instanceof GzipCodec);
-    Decompressor codecDecompressor = codec.createDecompressor();
-    if (null != codecDecompressor) {
-      fail("Got non-null codecDecompressor: " + codecDecompressor);
-    }
-
-    // Asking the CodecPool for a decompressor for GzipCodec
-    // should return null as well.
-    Decompressor poolDecompressor = CodecPool.getDecompressor(codec);
-    if (null != poolDecompressor) {
-      fail("Got non-null poolDecompressor: " + poolDecompressor);
-    }
-
-    // If we then ensure that the pool is populated...
-    CodecPool.returnDecompressor(zlibDecompressor);
-
-    // Asking the pool another time should still not bind this to GzipCodec.
-    poolDecompressor = CodecPool.getDecompressor(codec);
-    if (null != poolDecompressor) {
-      fail("Second time, got non-null poolDecompressor: "
-          + poolDecompressor);
-    }
+  public void testGzipCompatibility() throws IOException {
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+    LOG.info("seed: " + seed);
+
+    DataOutputBuffer dflbuf = new DataOutputBuffer();
+    GZIPOutputStream gzout = new GZIPOutputStream(dflbuf);
+    byte[] b = new byte[r.nextInt(128 * 1024 + 1)];
+    r.nextBytes(b);
+    gzout.write(b);
+    gzout.close();
+
+    DataInputBuffer gzbuf = new DataInputBuffer();
+    gzbuf.reset(dflbuf.getData(), dflbuf.getLength());
+
+    // Don't use native libs for this test.
+    Configuration conf = new Configuration();
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, false);
+    CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
+    Decompressor decom = codec.createDecompressor();
+    assertNotNull(decom);
+    assertEquals(BuiltInGzipDecompressor.class, decom.getClass());
+    InputStream gzin = codec.createInputStream(gzbuf, decom);
+
+    dflbuf.reset();
+    IOUtils.copyBytes(gzin, dflbuf, 4096);
+    final byte[] dflchk = Arrays.copyOf(dflbuf.getData(), dflbuf.getLength());
+    assertArrayEquals(b, dflchk);
+  }
+
+  void GzipConcatTest(Configuration conf,
+      Class<? extends Decompressor> decomClass) throws IOException {
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+    LOG.info(decomClass + " seed: " + seed);
+
+    final int CONCAT = r.nextInt(4) + 3;
+    final int BUFLEN = 128 * 1024;
+    DataOutputBuffer dflbuf = new DataOutputBuffer();
+    DataOutputBuffer chkbuf = new DataOutputBuffer();
+    byte[] b = new byte[BUFLEN];
+    for (int i = 0; i < CONCAT; ++i) {
+      GZIPOutputStream gzout = new GZIPOutputStream(dflbuf);
+      r.nextBytes(b);
+      int len = r.nextInt(BUFLEN);
+      int off = r.nextInt(BUFLEN - len);
+      chkbuf.write(b, off, len);
+      gzout.write(b, off, len);
+      gzout.close();
+    }
+    final byte[] chk = Arrays.copyOf(chkbuf.getData(), chkbuf.getLength());
+
+    CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
+    Decompressor decom = codec.createDecompressor();
+    assertNotNull(decom);
+    assertEquals(decomClass, decom.getClass());
+    DataInputBuffer gzbuf = new DataInputBuffer();
+    gzbuf.reset(dflbuf.getData(), dflbuf.getLength());
+    InputStream gzin = codec.createInputStream(gzbuf, decom);
+
+    dflbuf.reset();
+    IOUtils.copyBytes(gzin, dflbuf, 4096);
+    final byte[] dflchk = Arrays.copyOf(dflbuf.getData(), dflbuf.getLength());
+    assertArrayEquals(chk, dflchk);
+  }
+
+  @Test
+  public void testBuiltInGzipConcat() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, false);
+    GzipConcatTest(conf, BuiltInGzipDecompressor.class);
+  }
+
+  @Test
+  public void testNativeGzipConcat() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
+    if (!ZlibFactory.isNativeZlibLoaded(conf)) {
+      LOG.warn("skipped: native libs not loaded");
+      return;
+    }
+    GzipConcatTest(conf, GzipCodec.GzipZlibDecompressor.class);
   }
 
   @Test
@@ -542,7 +583,7 @@ public class TestCodec {
 
     // Don't use native libs for this test.
     Configuration conf = new Configuration();
-    conf.setBoolean("hadoop.native.lib", false);
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, false);
     assertFalse("ZlibFactory is using native libs against request",
         ZlibFactory.isNativeZlibLoaded(conf));
@@ -595,7 +636,7 @@ public class TestCodec {
 
     // Don't use native libs for this test.
     Configuration conf = new Configuration();
-    conf.setBoolean("hadoop.native.lib", false);
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, false);
    assertFalse("ZlibFactory is using native libs against request",
        ZlibFactory.isNativeZlibLoaded(conf));