diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFDecoder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFDecoder.java
index 7adfb39fa74..99f7954dbc2 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFDecoder.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFDecoder.java
@@ -109,7 +109,10 @@ public class LZFDecoder {
     }
 
     /**
-     * Main decode from a stream. Decompressed bytes are placed in the outputBuffer, inputBuffer is a "scratch-area".
+     * Main decode from a stream. Decompressed bytes are placed in the outputBuffer, inputBuffer
+     * is a "scratch-area".
+     * <p>
+     * If no data is available in the stream, -1 is returned to signal a clean end of input.
      *
      * @param is An input stream of LZF compressed bytes
      * @param inputBuffer A byte array used as a scratch area.
@@ -119,18 +122,19 @@ public class LZFDecoder {
     public static int decompressChunk(final InputStream is, final byte[] inputBuffer, final byte[] outputBuffer)
             throws IOException {
         int bytesInOutput;
-        int headerLength = is.read(inputBuffer, 0, HEADER_BYTES);
-        if (headerLength != HEADER_BYTES) {
-            return -1;
+        /* note: we do NOT read more than 5 bytes because otherwise we might need to shuffle
+         * bytes for the output buffer (could perhaps optimize in future?)
+         */
+        int bytesRead = readHeader(is, inputBuffer);
+        if ((bytesRead < HEADER_BYTES)
+                || inputBuffer[0] != LZFChunk.BYTE_Z || inputBuffer[1] != LZFChunk.BYTE_V) {
+            if (bytesRead == 0) { // probably fine, clean EOF
+                return -1;
+            }
+            throw new IOException("Corrupt input data, block did not start with 2 byte signature ('ZV') followed by type byte, 2-byte length");
         }
-        int inPtr = 0;
-        if (inputBuffer[inPtr] != LZFChunk.BYTE_Z || inputBuffer[inPtr + 1] != LZFChunk.BYTE_V) {
-            throw new IOException("Corrupt input data, block did not start with 'ZV' signature bytes");
-        }
-        inPtr += 2;
-        int type = inputBuffer[inPtr++];
-        int compLen = uint16(inputBuffer, inPtr);
-        inPtr += 2;
+        int type = inputBuffer[2];
+        int compLen = uint16(inputBuffer, 3);
         if (type == LZFChunk.BLOCK_TYPE_NON_COMPRESSED) { // uncompressed
             readFully(is, false, outputBuffer, 0, compLen);
             bytesInOutput = compLen;
@@ -200,6 +204,38 @@ public class LZFDecoder {
         return ((data[ptr] & 0xFF) << 8) + (data[ptr + 1] & 0xFF);
     }
 
+    /**
+     * Helper method to forcibly load header bytes that must be read before
+     * chunk can be handled.
+     */
+    protected static int readHeader(final InputStream is, final byte[] inputBuffer)
+            throws IOException {
+        // Ok: simple case first, where we just get all data we need
+        int needed = HEADER_BYTES;
+        int count = is.read(inputBuffer, 0, needed);
+
+        if (count == needed) {
+            return count;
+        }
+        if (count <= 0) {
+            return 0;
+        }
+
+        // if not, a source that trickles data (network etc); must loop
+        int offset = count;
+        needed -= count;
+
+        do {
+            count = is.read(inputBuffer, offset, needed);
+            if (count <= 0) {
+                break;
+            }
+            offset += count;
+            needed -= count;
+        } while (needed > 0);
+        return offset;
+    }
+
     private final static void readFully(InputStream is, boolean compressed, byte[] outputBuffer,
                                         int offset, int len) throws IOException {
         int left = len;
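
Aside (not part of the patch): readHeader() loops because a plain InputStream.read(byte[], int, int)
may legally return fewer bytes than requested. A minimal sketch of the failure mode it guards
against, using a hypothetical OneByteInputStream helper that trickles one byte per call, as a
slow socket might:

    import java.io.ByteArrayInputStream;
    import java.io.FilterInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    class OneByteInputStream extends FilterInputStream {
        OneByteInputStream(InputStream in) { super(in); }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            // Serve at most one byte per call, regardless of 'len'
            return super.read(b, off, Math.min(1, len));
        }
    }

    class ReadHeaderDemo {
        public static void main(String[] args) throws IOException {
            byte[] header = {'Z', 'V', 0, 0, 5}; // 5-byte chunk header (values illustrative)
            InputStream in = new OneByteInputStream(new ByteArrayInputStream(header));
            byte[] buf = new byte[header.length];
            int offset = 0, needed = buf.length;
            // Same read-until-satisfied idiom as readHeader():
            while (needed > 0) {
                int count = in.read(buf, offset, needed);
                if (count <= 0) {
                    break; // real EOF before the header was complete
                }
                offset += count;
                needed -= count;
            }
            System.out.println("header bytes read: " + offset); // prints 5
        }
    }

Without the loop, the first short read (1 of 5 bytes here) would be indistinguishable from a
truncated or corrupt chunk header.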
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFInputStream.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFInputStream.java
index d8c9fcfa0f4..0d39e192a2f 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFInputStream.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFInputStream.java
@@ -11,6 +11,12 @@ public class LZFInputStream extends InputStream {
      */
     protected final InputStream inputStream;
 
+    /**
+     * Flag that indicates if we have already called 'inputStream.close()'
+     * (to avoid calling it multiple times)
+     */
+    protected boolean inputStreamClosed;
+
     /**
      * Flag that indicates whether we force full reads (reading of as many
      * bytes as requested), or 'optimal' reads (up to as many as available,
@@ -31,6 +37,12 @@ public class LZFInputStream extends InputStream {
     /* Length of the current uncompressed bytes buffer */
     private int bufferLength = 0;
 
+    /*
+    ///////////////////////////////////////////////////////////////////////
+    // Construction
+    ///////////////////////////////////////////////////////////////////////
+    */
+
     public LZFInputStream(final InputStream inputStream) throws IOException {
         this(inputStream, false);
     }
@@ -45,31 +57,56 @@ public class LZFInputStream extends InputStream {
         super();
         _recycler = BufferRecycler.instance();
         inputStream = in;
+        inputStreamClosed = false;
         cfgFullReads = fullReads;
 
         _inputBuffer = _recycler.allocInputBuffer(LZFChunk.MAX_CHUNK_LEN);
         _decodedBytes = _recycler.allocDecodeBuffer(LZFChunk.MAX_CHUNK_LEN);
     }
 
+    /*
+    ///////////////////////////////////////////////////////////////////////
+    // InputStream impl
+    ///////////////////////////////////////////////////////////////////////
+    */
+
+    /**
+     * Method is overridden to report number of bytes that can now be read
+     * from decoded data buffer, without reading bytes from the underlying
+     * stream.
+     * Never throws an exception; returns number of bytes available without
+     * further reads from underlying source; -1 if stream has been closed, or
+     * 0 if an actual read (and possible blocking) is needed to find out.
+     */
     @Override
-    public int read() throws IOException {
-        readyBuffer();
-        if (bufferPosition < bufferLength) {
-            return _decodedBytes[bufferPosition++] & 255;
+    public int available() {
+        // if closed, return -1;
+        if (inputStreamClosed) {
+            return -1;
         }
-        return -1;
+        int left = (bufferLength - bufferPosition);
+        return (left <= 0) ? 0 : left;
     }
 
+    @Override
+    public int read() throws IOException {
+        if (!readyBuffer()) {
+            return -1;
+        }
+        return _decodedBytes[bufferPosition++] & 255;
+    }
+
+    @Override
     public int read(final byte[] buffer) throws IOException {
         return read(buffer, 0, buffer.length);
     }
 
+    @Override
     public int read(final byte[] buffer, int offset, int length) throws IOException {
         if (length < 1) {
             return 0;
         }
-        readyBuffer();
-        if (bufferLength < 0) {
+        if (!readyBuffer()) {
             return -1;
         }
         // First let's read however much data we happen to have...
@@ -84,8 +121,7 @@ public class LZFInputStream extends InputStream {
         int totalRead = chunkLength;
         do {
             offset += chunkLength;
-            readyBuffer();
-            if (bufferLength == -1) {
+            if (!readyBuffer()) {
                 break;
             }
             chunkLength = Math.min(bufferLength - bufferPosition, (length - totalRead));
@@ -97,6 +133,7 @@ public class LZFInputStream extends InputStream {
         return totalRead;
     }
 
+    @Override
     public void close() throws IOException {
         bufferPosition = bufferLength = 0;
         byte[] buf = _inputBuffer;
@@ -109,19 +146,53 @@ public class LZFInputStream extends InputStream {
             _decodedBytes = null;
             _recycler.releaseDecodeBuffer(buf);
         }
-        inputStream.close();
-
+        if (!inputStreamClosed) {
+            inputStreamClosed = true;
+            inputStream.close();
+        }
     }
 
+    /*
+    ///////////////////////////////////////////////////////////////////////
+    // Additional public accessors
+    ///////////////////////////////////////////////////////////////////////
+    */
+
+    /**
+     * Method that can be used to find underlying {@link InputStream} that
+     * we read from to get LZF encoded data to decode.
+     * Will never return null; although underlying stream may be closed
+     * (if this stream has been closed).
+     *
+     * @since 0.8
+     */
+    public InputStream getUnderlyingInputStream() {
+        return inputStream;
+    }
+
+    /*
+    ///////////////////////////////////////////////////////////////////////
+    // Internal methods
+    ///////////////////////////////////////////////////////////////////////
+    */
+
     /**
      * Fill the uncompressed bytes buffer by reading the underlying inputStream.
      *
      * @throws IOException
      */
-    private final void readyBuffer() throws IOException {
-        if (bufferPosition >= bufferLength) {
-            bufferLength = LZFDecoder.decompressChunk(inputStream, _inputBuffer, _decodedBytes);
-            bufferPosition = 0;
+    protected boolean readyBuffer() throws IOException {
+        if (bufferPosition < bufferLength) {
+            return true;
         }
+        if (inputStreamClosed) {
+            return false;
+        }
+        bufferLength = LZFDecoder.decompressChunk(inputStream, _inputBuffer, _decodedBytes);
+        if (bufferLength < 0) {
+            return false;
+        }
+        bufferPosition = 0;
+        return (bufferPosition < bufferLength);
    }
 }
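
Aside (not part of the patch): a sketch of the new available() contract. Returning -1 (and
never throwing) once the stream is closed is this class's own documented convention; stock
java.io streams typically return 0 or throw after close(). The round trip below is illustrative:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.elasticsearch.common.compress.lzf.LZFInputStream;
    import org.elasticsearch.common.compress.lzf.LZFOutputStream;

    class AvailableDemo {
        public static void main(String[] args) throws IOException {
            // Produce a small, valid LZF payload first
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            LZFOutputStream out = new LZFOutputStream(bytes);
            out.write("hello, lzf".getBytes("UTF-8")); // 10 bytes, a single chunk
            out.close();

            LZFInputStream in = new LZFInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            System.out.println(in.available()); // 0: nothing decoded yet; a read must hit the source
            in.read();                          // forces one chunk through readyBuffer()
            System.out.println(in.available()); // 9: remainder of the decoded chunk
            in.close();
            System.out.println(in.available()); // -1: stream closed
        }
    }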
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFOutputStream.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFOutputStream.java
index a22e7b1105d..6d75b301983 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFOutputStream.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFOutputStream.java
@@ -17,13 +17,32 @@ public class LZFOutputStream extends OutputStream {
     protected byte[] _outputBuffer;
     protected int _position = 0;
 
+    /**
+     * Flag that indicates if we have already called '_outputStream.close()'
+     * (to avoid calling it multiple times)
+     */
+    protected boolean _outputStreamClosed;
+
+    /*
+    ///////////////////////////////////////////////////////////////////////
+    // Construction
+    ///////////////////////////////////////////////////////////////////////
+    */
+
     public LZFOutputStream(final OutputStream outputStream) {
         _recycler = BufferRecycler.instance();
         _encoder = new ChunkEncoder(OUTPUT_BUFFER_SIZE, _recycler);
         _outputStream = outputStream;
         _outputBuffer = _recycler.allocOutputBuffer(OUTPUT_BUFFER_SIZE);
+        _outputStreamClosed = false;
     }
 
+    /*
+    ///////////////////////////////////////////////////////////////////////
+    // OutputStream impl
+    ///////////////////////////////////////////////////////////////////////
+    */
+
     @Override
     public void write(final int singleByte) throws IOException {
         if (_position >= _outputBuffer.length) {
@@ -75,19 +94,46 @@ public class LZFOutputStream extends OutputStream {
     @Override
     public void close() throws IOException {
         flush();
-        _outputStream.close();
         _encoder.close();
         byte[] buf = _outputBuffer;
         if (buf != null) {
             _outputBuffer = null;
             _recycler.releaseOutputBuffer(buf);
         }
+        if (!_outputStreamClosed) {
+            _outputStreamClosed = true;
+            _outputStream.close();
+        }
     }
 
+    /*
+    ///////////////////////////////////////////////////////////////////////
+    // Additional public accessors
+    ///////////////////////////////////////////////////////////////////////
+    */
+
+    /**
+     * Method that can be used to find underlying {@link OutputStream} that
+     * we write LZF encoded data into, after compressing it.
+     * Will never return null; although underlying stream may be closed
+     * (if this stream has been closed).
+     *
+     * @since 0.8
+     */
+    public OutputStream getUnderlyingOutputStream() {
+        return _outputStream;
+    }
+
+    /*
+    ///////////////////////////////////////////////////////////////////////
+    // Internal methods
+    ///////////////////////////////////////////////////////////////////////
+    */
+
     /**
      * Compress and write the current block to the OutputStream
      */
-    private void writeCompressedBlock() throws IOException {
+    protected void writeCompressedBlock() throws IOException {
        int left = _position;
         _position = 0;
         int offset = 0;
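
Aside (not part of the patch): one plausible use of the new accessor is reaching the raw
stream after the compressed data has been flushed, e.g. to append bytes outside any LZF
chunk. The trailer byte below is purely illustrative:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    import org.elasticsearch.common.compress.lzf.LZFOutputStream;

    class UnderlyingStreamDemo {
        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            LZFOutputStream out = new LZFOutputStream(sink);
            out.write(new byte[]{1, 2, 3});
            out.flush(); // force the pending block out as a complete LZF chunk

            // Append an uncompressed marker directly after the chunk
            OutputStream raw = out.getUnderlyingOutputStream();
            raw.write(0xFF);
            raw.flush();
            System.out.println("bytes in sink: " + sink.size());
        }
    }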
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamInput.java
index 92aede9394f..676accd1141 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamInput.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamInput.java
@@ -37,6 +37,12 @@ public class LZFStreamInput extends StreamInput {
      */
     protected StreamInput inputStream;
 
+    /**
+     * Flag that indicates if we have already called 'inputStream.close()'
+     * (to avoid calling it multiple times)
+     */
+    protected boolean inputStreamClosed;
+
     /**
      * Flag that indicates whether we force full reads (reading of as many
      * bytes as requested), or 'optimal' reads (up to as many as available,
@@ -69,25 +75,44 @@ public class LZFStreamInput extends StreamInput {
             _recycler = BufferRecycler.instance();
         }
         inputStream = in;
+        inputStreamClosed = false;
 
         _inputBuffer = _recycler.allocInputBuffer(LZFChunk.MAX_CHUNK_LEN);
         _decodedBytes = _recycler.allocDecodeBuffer(LZFChunk.MAX_CHUNK_LEN);
     }
 
-    @Override public int read() throws IOException {
-        readyBuffer();
-        if (bufferPosition < bufferLength) {
-            return _decodedBytes[bufferPosition++] & 255;
+    /**
+     * Method is overridden to report number of bytes that can now be read
+     * from decoded data buffer, without reading bytes from the underlying
+     * stream.
+     * Never throws an exception; returns number of bytes available without
+     * further reads from underlying source; -1 if stream has been closed, or
+     * 0 if an actual read (and possible blocking) is needed to find out.
+     */
+    @Override
+    public int available() {
+        // if closed, return -1;
+        if (inputStreamClosed) {
+            return -1;
         }
-        return -1;
+        int left = (bufferLength - bufferPosition);
+        return (left <= 0) ? 0 : left;
     }
 
-    @Override public int read(byte[] buffer, int offset, int length) throws IOException {
+    @Override
+    public int read() throws IOException {
+        if (!readyBuffer()) {
+            return -1;
+        }
+        return _decodedBytes[bufferPosition++] & 255;
+    }
+
+    @Override
+    public int read(final byte[] buffer, int offset, int length) throws IOException {
         if (length < 1) {
             return 0;
         }
-        readyBuffer();
-        if (bufferLength < 0) {
+        if (!readyBuffer()) {
             return -1;
         }
         // First let's read however much data we happen to have...
@@ -102,8 +127,7 @@ public class LZFStreamInput extends StreamInput {
         int totalRead = chunkLength;
         do {
             offset += chunkLength;
-            readyBuffer();
-            if (bufferLength == -1) {
+            if (!readyBuffer()) {
                 break;
             }
             chunkLength = Math.min(bufferLength - bufferPosition, (length - totalRead));
@@ -116,11 +140,10 @@ public class LZFStreamInput extends StreamInput {
     }
 
     @Override public byte readByte() throws IOException {
-        readyBuffer();
-        if (bufferPosition < bufferLength) {
-            return _decodedBytes[bufferPosition++];
+        if (!readyBuffer()) {
+            throw new EOFException();
         }
-        throw new EOFException();
+        return _decodedBytes[bufferPosition++];
     }
 
     @Override public void readBytes(byte[] b, int offset, int len) throws IOException {
@@ -165,18 +188,35 @@ public class LZFStreamInput extends StreamInput {
             _decodedBytes = null;
             _recycler.releaseDecodeBuffer(buf);
         }
-        inputStream.close();
+        if (!inputStreamClosed) {
+            inputStreamClosed = true;
+            inputStream.close();
+        }
     }
 
+    /*
+    ///////////////////////////////////////////////////////////////////////
+    // Internal methods
+    ///////////////////////////////////////////////////////////////////////
+    */
+
     /**
      * Fill the uncompressed bytes buffer by reading the underlying inputStream.
      *
-     * @throws java.io.IOException
+     * @throws IOException
      */
-    private void readyBuffer() throws IOException {
-        if (bufferPosition >= bufferLength) {
-            bufferLength = LZFDecoder.decompressChunk(inputStream, _inputBuffer, _decodedBytes);
-            bufferPosition = 0;
+    protected boolean readyBuffer() throws IOException {
+        if (bufferPosition < bufferLength) {
+            return true;
         }
+        if (inputStreamClosed) {
+            return false;
+        }
+        bufferLength = LZFDecoder.decompressChunk(inputStream, _inputBuffer, _decodedBytes);
+        if (bufferLength < 0) {
+            return false;
+        }
+        bufferPosition = 0;
+        return (bufferPosition < bufferLength);
     }
 }
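
Aside (not part of the patch): a round trip that exercises the reworked EOF handling. A clean
end of stream still yields -1 from read(), while a truncated chunk header now surfaces as an
IOException from LZFDecoder instead of a silent -1:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.Arrays;

    import org.elasticsearch.common.compress.lzf.LZFInputStream;
    import org.elasticsearch.common.compress.lzf.LZFOutputStream;

    class RoundTripDemo {
        public static void main(String[] args) throws IOException {
            byte[] original = new byte[256 * 1024]; // spans several LZF chunks
            for (int i = 0; i < original.length; i++) {
                original[i] = (byte) (i % 64);
            }
            ByteArrayOutputStream compressed = new ByteArrayOutputStream();
            LZFOutputStream out = new LZFOutputStream(compressed);
            out.write(original);
            out.close();

            LZFInputStream in = new LZFInputStream(new ByteArrayInputStream(compressed.toByteArray()));
            ByteArrayOutputStream restored = new ByteArrayOutputStream();
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) { // -1 only at a clean chunk boundary
                restored.write(buf, 0, n);
            }
            in.close();
            System.out.println(Arrays.equals(original, restored.toByteArray())); // true
        }
    }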