From aebd27afbda5b6d4651348055e5ce2d7db3bb793 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Tue, 19 Jun 2012 04:07:11 +0200 Subject: [PATCH] abstract compression abstract the LZF compression into a compress package allowing for different implementation in the future --- pom.xml | 12 + .../elasticsearch/common/CacheRecycler.java | 2 - .../compress/CompressedStreamInput.java | 174 +++++++++++ .../compress/CompressedStreamOutput.java | 134 +++++++++ .../common/compress/CompressedString.java | 17 +- .../common/compress/Compressor.java | 51 ++++ .../common/compress/CompressorFactory.java | 107 +++++++ .../common/compress/lzf/BufferRecycler.java | 163 ----------- .../common/compress/lzf/ChunkDecoder.java | 228 --------------- .../common/compress/lzf/ChunkEncoder.java | 269 ----------------- .../common/compress/lzf/LZF.java | 92 ------ .../common/compress/lzf/LZFChunk.java | 125 -------- .../lzf/LZFCompressedStreamInput.java | 74 +++++ .../lzf/LZFCompressedStreamOutput.java | 63 ++++ .../common/compress/lzf/LZFCompressor.java | 91 ++++++ .../common/compress/lzf/LZFDecoder.java | 55 ---- .../common/compress/lzf/LZFEncoder.java | 96 ------ .../compress/lzf/impl/UnsafeChunkDecoder.java | 251 ---------------- .../lzf/impl/VanillaChunkDecoder.java | 274 ------------------ .../common/compress/lzf/package-info.java | 27 -- .../lzf/util/ChunkDecoderFactory.java | 75 ----- .../common/io/stream/CachedStreamInput.java | 20 +- .../common/io/stream/CachedStreamOutput.java | 33 +-- .../common/io/stream/LZFStreamInput.java | 236 --------------- .../common/io/stream/LZFStreamOutput.java | 173 ----------- .../common/xcontent/XContentHelper.java | 30 +- .../zen/ping/multicast/MulticastZenPing.java | 4 +- .../publish/PublishClusterStateAction.java | 15 +- .../gateway/blobstore/BlobStoreGateway.java | 15 +- .../state/shards/LocalGatewayShardsState.java | 12 +- .../elasticsearch/index/get/GetResult.java | 15 +- .../index/mapper/core/BinaryFieldMapper.java | 26 +- .../mapper/internal/SourceFieldMapper.java | 47 +-- .../action/support/RestXContentBuilder.java | 20 +- .../search/internal/InternalSearchHit.java | 15 +- .../transport/local/LocalTransport.java | 6 +- .../local/LocalTransportChannel.java | 5 +- .../netty/MessageChannelHandler.java | 16 +- .../transport/support/TransportStreams.java | 19 +- .../engine/robin/SimpleRobinEngineTests.java | 3 +- .../source/CompressSourceMappingTests.java | 12 +- .../source/DefaultSourceMappingTests.java | 12 +- 42 files changed, 871 insertions(+), 2243 deletions(-) create mode 100644 src/main/java/org/elasticsearch/common/compress/CompressedStreamInput.java create mode 100644 src/main/java/org/elasticsearch/common/compress/CompressedStreamOutput.java create mode 100644 src/main/java/org/elasticsearch/common/compress/Compressor.java create mode 100644 src/main/java/org/elasticsearch/common/compress/CompressorFactory.java delete mode 100644 src/main/java/org/elasticsearch/common/compress/lzf/BufferRecycler.java delete mode 100755 src/main/java/org/elasticsearch/common/compress/lzf/ChunkDecoder.java delete mode 100644 src/main/java/org/elasticsearch/common/compress/lzf/ChunkEncoder.java delete mode 100644 src/main/java/org/elasticsearch/common/compress/lzf/LZF.java delete mode 100644 src/main/java/org/elasticsearch/common/compress/lzf/LZFChunk.java create mode 100644 src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedStreamInput.java create mode 100644 src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedStreamOutput.java create mode 100644 
src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressor.java delete mode 100644 src/main/java/org/elasticsearch/common/compress/lzf/LZFDecoder.java delete mode 100644 src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java delete mode 100755 src/main/java/org/elasticsearch/common/compress/lzf/impl/UnsafeChunkDecoder.java delete mode 100755 src/main/java/org/elasticsearch/common/compress/lzf/impl/VanillaChunkDecoder.java delete mode 100644 src/main/java/org/elasticsearch/common/compress/lzf/package-info.java delete mode 100755 src/main/java/org/elasticsearch/common/compress/lzf/util/ChunkDecoderFactory.java delete mode 100644 src/main/java/org/elasticsearch/common/io/stream/LZFStreamInput.java delete mode 100644 src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java diff --git a/pom.xml b/pom.xml index f51f8ece333..2eb76e748c2 100644 --- a/pom.xml +++ b/pom.xml @@ -143,6 +143,13 @@ 3.5.0.Final compile + + + com.ning + compress-lzf + 0.9.5 + compile + @@ -307,6 +314,7 @@ org.yaml:snakeyaml joda-time:joda-time io.netty:netty + com.ning:compress-lzf @@ -346,6 +354,10 @@ org.jboss.netty org.elasticsearch.common.netty + + com.ning.compress + org.elasticsearch.common.compress + diff --git a/src/main/java/org/elasticsearch/common/CacheRecycler.java b/src/main/java/org/elasticsearch/common/CacheRecycler.java index fd712e922f9..b7afa95365b 100644 --- a/src/main/java/org/elasticsearch/common/CacheRecycler.java +++ b/src/main/java/org/elasticsearch/common/CacheRecycler.java @@ -21,7 +21,6 @@ package org.elasticsearch.common; import gnu.trove.map.hash.*; import jsr166y.LinkedTransferQueue; -import org.elasticsearch.common.compress.lzf.BufferRecycler; import org.elasticsearch.common.trove.ExtTDoubleObjectHashMap; import org.elasticsearch.common.trove.ExtTHashMap; import org.elasticsearch.common.trove.ExtTLongObjectHashMap; @@ -33,7 +32,6 @@ import java.util.Queue; public class CacheRecycler { public static void clear() { - BufferRecycler.clean(); hashMap.clear(); doubleObjectHashMap.clear(); longObjectHashMap.clear(); diff --git a/src/main/java/org/elasticsearch/common/compress/CompressedStreamInput.java b/src/main/java/org/elasticsearch/common/compress/CompressedStreamInput.java new file mode 100644 index 00000000000..4acd6cf358a --- /dev/null +++ b/src/main/java/org/elasticsearch/common/compress/CompressedStreamInput.java @@ -0,0 +1,174 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.common.compress; + +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +/** + */ +public abstract class CompressedStreamInput extends StreamInput { + + private final StreamInput in; + + private boolean closed; + + protected byte[] uncompressed; + private int position = 0; + private int valid = 0; + + public CompressedStreamInput(StreamInput in) throws IOException { + this.in = in; + readHeader(in); + } + + /** + * Expert!, resets to buffer start, without the need to decompress it again. + */ + public void resetToBufferStart() { + this.position = 0; + } + + /** + * Method is overridden to report number of bytes that can now be read + * from decoded data buffer, without reading bytes from the underlying + * stream. + * Never throws an exception; returns number of bytes available without + * further reads from underlying source; -1 if stream has been closed, or + * 0 if an actual read (and possible blocking) is needed to find out. + */ + @Override + public int available() throws IOException { + // if closed, return -1; + if (closed) { + return -1; + } + int left = (valid - position); + return (left <= 0) ? 0 : left; + } + + @Override + public int read() throws IOException { + if (!readyBuffer()) { + return -1; + } + return uncompressed[position++] & 255; + } + + @Override + public byte readByte() throws IOException { + if (!readyBuffer()) { + throw new EOFException(); + } + return uncompressed[position++]; + } + + @Override + public int read(byte[] buffer, int offset, int length) throws IOException { + return read(buffer, offset, length, false); + } + + public int read(byte[] buffer, int offset, int length, boolean fullRead) throws IOException { + if (length < 1) { + return 0; + } + if (!readyBuffer()) { + return -1; + } + // First let's read however much data we happen to have... + int chunkLength = Math.min(valid - position, length); + System.arraycopy(uncompressed, position, buffer, offset, chunkLength); + position += chunkLength; + + if (chunkLength == length || !fullRead) { + return chunkLength; + } + // Need more data, then + int totalRead = chunkLength; + do { + offset += chunkLength; + if (!readyBuffer()) { + break; + } + chunkLength = Math.min(valid - position, (length - totalRead)); + System.arraycopy(uncompressed, position, buffer, offset, chunkLength); + position += chunkLength; + totalRead += chunkLength; + } while (totalRead < length); + + return totalRead; + } + + @Override + public void readBytes(byte[] b, int offset, int len) throws IOException { + int result = read(b, offset, len, true /* we want to have full reads, thats the contract... */); + if (result < len) { + throw new EOFException(); + } + } + + @Override + public void reset() throws IOException { + this.position = 0; + this.valid = 0; + in.reset(); + } + + @Override + public void close() throws IOException { + position = valid = 0; + if (!closed) { + closed = true; + doClose(); + in.close(); + } + } + + protected abstract void doClose() throws IOException; + + /** + * Fill the uncompressed bytes buffer by reading the underlying inputStream. 
+ */ + protected boolean readyBuffer() throws IOException { + if (position < valid) { + return true; + } + if (closed) { + return false; + } + valid = uncompress(in, uncompressed); + if (valid < 0) { + return false; + } + position = 0; + return (position < valid); + } + + protected abstract void readHeader(StreamInput in) throws IOException; + + /** + * Uncompress the data into the out array, returning the size uncompressed + */ + protected abstract int uncompress(InputStream in, byte[] out) throws IOException; + +} diff --git a/src/main/java/org/elasticsearch/common/compress/CompressedStreamOutput.java b/src/main/java/org/elasticsearch/common/compress/CompressedStreamOutput.java new file mode 100644 index 00000000000..eccb224c112 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/compress/CompressedStreamOutput.java @@ -0,0 +1,134 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.compress; + +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + */ +public abstract class CompressedStreamOutput extends StreamOutput { + + private final StreamOutput out; + + protected byte[] uncompressed; + private int position = 0; + + private boolean closed; + + public CompressedStreamOutput(StreamOutput out) throws IOException { + this.out = out; + writeHeader(out); + } + + @Override + public void write(int b) throws IOException { + if (position >= uncompressed.length) { + flushBuffer(); + } + uncompressed[position++] = (byte) b; + } + + @Override + public void writeByte(byte b) throws IOException { + if (position >= uncompressed.length) { + flushBuffer(); + } + uncompressed[position++] = b; + } + + @Override + public void writeBytes(byte[] input, int offset, int length) throws IOException { + // ES, check if length is 0, and don't write in this case + if (length == 0) { + return; + } + final int BUFFER_LEN = uncompressed.length; + + // simple case first: buffering only (for trivially short writes) + int free = BUFFER_LEN - position; + if (free >= length) { + System.arraycopy(input, offset, uncompressed, position, length); + position += length; + return; + } + // fill partial input as much as possible and flush + if (position > 0) { + System.arraycopy(input, offset, uncompressed, position, free); + position += free; + flushBuffer(); + offset += free; + length -= free; + } + + // then write intermediate full block, if any, without copying: + while (length >= BUFFER_LEN) { + compress(input, offset, BUFFER_LEN, out); + offset += BUFFER_LEN; + length -= BUFFER_LEN; + } + + // and finally, copy leftovers in input, if any + if (length > 0) { + System.arraycopy(input, offset, uncompressed, 0, length); + } + position = length; + } + + @Override + public void flush() throws 
IOException { + flushBuffer(); + out.flush(); + } + + @Override + public void close() throws IOException { + if (!closed) { + flushBuffer(); + closed = true; + doClose(); + out.close(); + } + } + + protected abstract void doClose() throws IOException; + + @Override + public void reset() throws IOException { + position = 0; + out.reset(); + } + + private void flushBuffer() throws IOException { + if (position > 0) { + compress(uncompressed, 0, position, out); + position = 0; + } + } + + protected abstract void writeHeader(StreamOutput out) throws IOException; + + /** + * Compresses the data into the output + */ + protected abstract void compress(byte[] data, int offset, int len, StreamOutput out) throws IOException; + +} diff --git a/src/main/java/org/elasticsearch/common/compress/CompressedString.java b/src/main/java/org/elasticsearch/common/compress/CompressedString.java index 0fd564cd09b..57b24c7e5f0 100644 --- a/src/main/java/org/elasticsearch/common/compress/CompressedString.java +++ b/src/main/java/org/elasticsearch/common/compress/CompressedString.java @@ -21,9 +21,6 @@ package org.elasticsearch.common.compress; import org.apache.lucene.util.UnicodeUtil; import org.elasticsearch.common.Unicode; -import org.elasticsearch.common.compress.lzf.LZF; -import org.elasticsearch.common.compress.lzf.LZFDecoder; -import org.elasticsearch.common.compress.lzf.LZFEncoder; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -58,16 +55,19 @@ public class CompressedString implements Streamable { * @throws IOException */ public CompressedString(byte[] data, int offset, int length) throws IOException { - if (LZF.isCompressed(data, offset, length)) { + Compressor compressor = CompressorFactory.compressor(data, offset, length); + if (compressor != null) { + // already compressed... this.bytes = Arrays.copyOfRange(data, offset, offset + length); } else { - this.bytes = LZFEncoder.encode(data, offset, length); + // default to LZF + this.bytes = CompressorFactory.defaultCompressor().compress(data, offset, length); } } public CompressedString(String str) throws IOException { UnicodeUtil.UTF8Result result = Unicode.unsafeFromStringAsUtf8(str); - this.bytes = LZFEncoder.encode(result.result, result.length); + this.bytes = CompressorFactory.defaultCompressor().compress(result.result, 0, result.length); } public byte[] compressed() { @@ -75,11 +75,12 @@ public class CompressedString implements Streamable { } public byte[] uncompressed() throws IOException { - return LZFDecoder.decode(bytes); + Compressor compressor = CompressorFactory.compressor(bytes); + return compressor.uncompress(bytes, 0, bytes.length); } public String string() throws IOException { - return Unicode.fromBytes(LZFDecoder.decode(bytes)); + return Unicode.fromBytes(uncompressed()); } public static CompressedString readCompressedString(StreamInput in) throws IOException { diff --git a/src/main/java/org/elasticsearch/common/compress/Compressor.java b/src/main/java/org/elasticsearch/common/compress/Compressor.java new file mode 100644 index 00000000000..ebd13d79233 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/compress/Compressor.java @@ -0,0 +1,51 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.compress; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.jboss.netty.buffer.ChannelBuffer; + +import java.io.IOException; + +/** + */ +public interface Compressor { + + String type(); + + boolean isCompressed(byte[] data, int offset, int length); + + boolean isCompressed(ChannelBuffer buffer); + + /** + * Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(byte[], int, int)}. + */ + byte[] uncompress(byte[] data, int offset, int length) throws IOException; + + /** + * Compresses the provided data, data can be detected as compressed using {@link #isCompressed(byte[], int, int)}. + */ + byte[] compress(byte[] data, int offset, int length) throws IOException; + + CompressedStreamInput streamInput(StreamInput in) throws IOException; + + CompressedStreamOutput streamOutput(StreamOutput out) throws IOException; +} diff --git a/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java b/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java new file mode 100644 index 00000000000..cb8195b5e68 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java @@ -0,0 +1,107 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.common.compress; + +import com.google.common.collect.ImmutableMap; +import org.elasticsearch.common.BytesHolder; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.compress.lzf.LZFCompressor; +import org.jboss.netty.buffer.ChannelBuffer; + +import java.io.IOException; + +/** + */ +public class CompressorFactory { + + private static final LZFCompressor LZF = new LZFCompressor(); + + private static final Compressor[] compressors; + private static final ImmutableMap compressorsByType; + + static { + compressors = new Compressor[1]; + compressors[0] = LZF; + + MapBuilder compressorsByTypeX = MapBuilder.newMapBuilder(); + for (Compressor compressor : compressors) { + compressorsByTypeX.put(compressor.type(), compressor); + } + compressorsByType = compressorsByTypeX.immutableMap(); + } + + public static Compressor defaultCompressor() { + return LZF; + } + + public static boolean isCompressed(byte[] data) { + return compressor(data, 0, data.length) != null; + } + + public static boolean isCompressed(byte[] data, int offset, int length) { + return compressor(data, offset, length) != null; + } + + @Nullable + public static Compressor compressor(BytesHolder bytes) { + return compressor(bytes.bytes(), bytes.offset(), bytes.length()); + } + + @Nullable + public static Compressor compressor(byte[] data) { + return compressor(data, 0, data.length); + } + + @Nullable + public static Compressor compressor(byte[] data, int offset, int length) { + for (Compressor compressor : compressors) { + if (compressor.isCompressed(data, offset, length)) { + return compressor; + } + } + return null; + } + + @Nullable + public static Compressor compressor(ChannelBuffer buffer) { + for (Compressor compressor : compressors) { + if (compressor.isCompressed(buffer)) { + return compressor; + } + } + return null; + } + + public static Compressor compressor(String type) { + return compressorsByType.get(type); + } + + /** + * Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(byte[], int, int)}. + */ + public static BytesHolder uncompressIfNeeded(BytesHolder bytes) throws IOException { + Compressor compressor = compressor(bytes); + if (compressor != null) { + return new BytesHolder(compressor.uncompress(bytes.bytes(), bytes.offset(), bytes.length())); + } + return bytes; + } +} diff --git a/src/main/java/org/elasticsearch/common/compress/lzf/BufferRecycler.java b/src/main/java/org/elasticsearch/common/compress/lzf/BufferRecycler.java deleted file mode 100644 index 23c42ad4940..00000000000 --- a/src/main/java/org/elasticsearch/common/compress/lzf/BufferRecycler.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.compress.lzf; - -import java.lang.ref.SoftReference; - -/** - * Simple helper class to encapsulate details of basic buffer - * recycling scheme, which helps a lot (as per profiling) for - * smaller encoding cases. - * - * @author tatu - */ -public class BufferRecycler { - private final static int MIN_ENCODING_BUFFER = 4000; - - private final static int MIN_OUTPUT_BUFFER = 8000; - - /** - * This ThreadLocal contains a {@link java.lang.ref.SoftReference} - * to a {@link BufferRecycler} used to provide a low-cost - * buffer recycling for buffers we need for encoding, decoding. - */ - final protected static ThreadLocal> _recyclerRef - = new ThreadLocal>(); - - - private byte[] _inputBuffer; - private byte[] _outputBuffer; - - private byte[] _decodingBuffer; - private byte[] _encodingBuffer; - - private int[] _encodingHash; - - - /** - * Accessor to get thread-local recycler instance - */ - public static BufferRecycler instance() { - SoftReference ref = _recyclerRef.get(); - BufferRecycler br = (ref == null) ? null : ref.get(); - if (br == null) { - br = new BufferRecycler(); - _recyclerRef.set(new SoftReference(br)); - } - return br; - } - - public static void clean() { - _recyclerRef.remove(); - } - - /* - /////////////////////////////////////////////////////////////////////// - // Buffers for encoding (output) - /////////////////////////////////////////////////////////////////////// - */ - - public byte[] allocEncodingBuffer(int minSize) { - byte[] buf = _encodingBuffer; - if (buf == null || buf.length < minSize) { - buf = new byte[Math.max(minSize, MIN_ENCODING_BUFFER)]; - } else { - _encodingBuffer = null; - } - return buf; - } - - public void releaseEncodeBuffer(byte[] buffer) { - if (_encodingBuffer == null || buffer.length > _encodingBuffer.length) { - _encodingBuffer = buffer; - } - } - - public byte[] allocOutputBuffer(int minSize) { - byte[] buf = _outputBuffer; - if (buf == null || buf.length < minSize) { - buf = new byte[Math.max(minSize, MIN_OUTPUT_BUFFER)]; - } else { - _outputBuffer = null; - } - return buf; - } - - public void releaseOutputBuffer(byte[] buffer) { - if (_outputBuffer == null || (buffer != null && buffer.length > _outputBuffer.length)) { - _outputBuffer = buffer; - } - } - - public int[] allocEncodingHash(int suggestedSize) { - int[] buf = _encodingHash; - if (buf == null || buf.length < suggestedSize) { - buf = new int[suggestedSize]; - } else { - _encodingHash = null; - } - return buf; - } - - public void releaseEncodingHash(int[] buffer) { - if (_encodingHash == null || (buffer != null && buffer.length > _encodingHash.length)) { - _encodingHash = buffer; - } - } - - /* - /////////////////////////////////////////////////////////////////////// - // Buffers for decoding (input) - /////////////////////////////////////////////////////////////////////// - */ - - public byte[] allocInputBuffer(int minSize) { - byte[] buf = _inputBuffer; - if (buf == null || buf.length < minSize) { - buf = new byte[Math.max(minSize, MIN_OUTPUT_BUFFER)]; - } else { - _inputBuffer = null; - } - return buf; - } - - public void releaseInputBuffer(byte[] buffer) { - if (_inputBuffer == null || (buffer != null && buffer.length > _inputBuffer.length)) { - _inputBuffer = buffer; - } - } - - public byte[] allocDecodeBuffer(int size) { - byte[] buf = _decodingBuffer; - if (buf == null || buf.length < size) { - buf = new byte[size]; - } else { - 
_decodingBuffer = null; - } - return buf; - } - - public void releaseDecodeBuffer(byte[] buffer) { - if (_decodingBuffer == null || (buffer != null && buffer.length > _decodingBuffer.length)) { - _decodingBuffer = buffer; - } - } - -} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/common/compress/lzf/ChunkDecoder.java b/src/main/java/org/elasticsearch/common/compress/lzf/ChunkDecoder.java deleted file mode 100755 index 1b787361949..00000000000 --- a/src/main/java/org/elasticsearch/common/compress/lzf/ChunkDecoder.java +++ /dev/null @@ -1,228 +0,0 @@ -package org.elasticsearch.common.compress.lzf; - -import java.io.IOException; -import java.io.InputStream; - -/** - * Decoder that handles decoding of sequence of encoded LZF chunks, - * combining them into a single contiguous result byte array. - * - * @author Tatu Saloranta (tatu@ning.com) - * @since 0.9 - */ -public abstract class ChunkDecoder { - protected final static byte BYTE_NULL = 0; - protected final static int HEADER_BYTES = 5; - - public ChunkDecoder() { - } - - /* - /////////////////////////////////////////////////////////////////////// - // Public API - /////////////////////////////////////////////////////////////////////// - */ - - /** - * Method for decompressing a block of input data encoded in LZF - * block structure (compatible with lzf command line utility), - * and can consist of any number of blocks. - * Note that input MUST consists of a sequence of one or more complete - * chunks; partial chunks can not be handled. - */ - public final byte[] decode(final byte[] inputBuffer) throws IOException { - byte[] result = new byte[calculateUncompressedSize(inputBuffer, 0, inputBuffer.length)]; - decode(inputBuffer, 0, inputBuffer.length, result); - return result; - } - - /** - * Method for decompressing a block of input data encoded in LZF - * block structure (compatible with lzf command line utility), - * and can consist of any number of blocks. - * Note that input MUST consists of a sequence of one or more complete - * chunks; partial chunks can not be handled. - */ - public final byte[] decode(final byte[] inputBuffer, int inputPtr, int inputLen) throws IOException { - byte[] result = new byte[calculateUncompressedSize(inputBuffer, inputPtr, inputLen)]; - decode(inputBuffer, inputPtr, inputLen, result); - return result; - } - - /** - * Method for decompressing a block of input data encoded in LZF - * block structure (compatible with lzf command line utility), - * and can consist of any number of blocks. - * Note that input MUST consists of a sequence of one or more complete - * chunks; partial chunks can not be handled. - */ - public final int decode(final byte[] inputBuffer, final byte[] targetBuffer) throws IOException { - return decode(inputBuffer, 0, inputBuffer.length, targetBuffer); - } - - /** - * Method for decompressing a block of input data encoded in LZF - * block structure (compatible with lzf command line utility), - * and can consist of any number of blocks. - * Note that input MUST consists of a sequence of one or more complete - * chunks; partial chunks can not be handled. 
- */ - public int decode(final byte[] sourceBuffer, int inPtr, int inLength, - final byte[] targetBuffer) throws IOException { - byte[] result = targetBuffer; - int outPtr = 0; - int blockNr = 0; - - final int end = inPtr + inLength - 1; // -1 to offset possible end marker - - while (inPtr < end) { - // let's do basic sanity checks; no point in skimping with these checks - if (sourceBuffer[inPtr] != LZFChunk.BYTE_Z || sourceBuffer[inPtr + 1] != LZFChunk.BYTE_V) { - throw new IOException("Corrupt input data, block #" + blockNr + " (at offset " + inPtr + "): did not start with 'ZV' signature bytes"); - } - inPtr += 2; - int type = sourceBuffer[inPtr++]; - int len = uint16(sourceBuffer, inPtr); - inPtr += 2; - if (type == LZFChunk.BLOCK_TYPE_NON_COMPRESSED) { // uncompressed - System.arraycopy(sourceBuffer, inPtr, result, outPtr, len); - outPtr += len; - } else { // compressed - int uncompLen = uint16(sourceBuffer, inPtr); - inPtr += 2; - decodeChunk(sourceBuffer, inPtr, result, outPtr, outPtr + uncompLen); - outPtr += uncompLen; - } - inPtr += len; - ++blockNr; - } - return outPtr; - } - - /** - * Main decode from a stream. Decompressed bytes are placed in the outputBuffer, inputBuffer - * is a "scratch-area". - * - * @param is An input stream of LZF compressed bytes - * @param inputBuffer A byte array used as a scratch area. - * @param outputBuffer A byte array in which the result is returned - * @return The number of bytes placed in the outputBuffer. - */ - public abstract int decodeChunk(final InputStream is, final byte[] inputBuffer, final byte[] outputBuffer) - throws IOException; - - /** - * Main decode method for individual chunks. - */ - public abstract void decodeChunk(byte[] in, int inPos, byte[] out, int outPos, int outEnd) - throws IOException; - - /* - /////////////////////////////////////////////////////////////////////// - // Public static methods - /////////////////////////////////////////////////////////////////////// - */ - - /** - * Helper method that will calculate total uncompressed size, for sequence of - * one or more LZF blocks stored in given byte array. - * Will do basic sanity checking, so that this method can be called to - * verify against some types of corruption. - */ - public static int calculateUncompressedSize(byte[] data, int ptr, int length) throws IOException { - int uncompressedSize = 0; - int blockNr = 0; - final int end = ptr + length; - - while (ptr < end) { - // can use optional end marker - if (ptr == (data.length + 1) && data[ptr] == BYTE_NULL) { - ++ptr; // so that we'll be at end - break; - } - // simpler to handle bounds checks by catching exception here... - try { - if (data[ptr] != LZFChunk.BYTE_Z || data[ptr + 1] != LZFChunk.BYTE_V) { - throw new IOException("Corrupt input data, block #" + blockNr + " (at offset " + ptr + "): did not start with 'ZV' signature bytes"); - } - int type = (int) data[ptr + 2]; - int blockLen = uint16(data, ptr + 3); - if (type == LZFChunk.BLOCK_TYPE_NON_COMPRESSED) { // uncompressed - ptr += 5; - uncompressedSize += blockLen; - } else if (type == LZFChunk.BLOCK_TYPE_COMPRESSED) { // compressed - uncompressedSize += uint16(data, ptr + 5); - ptr += 7; - } else { // unknown... 
CRC-32 would be 2, but that's not implemented by cli tool - throw new IOException("Corrupt input data, block #" + blockNr + " (at offset " + ptr + "): unrecognized block type " + (type & 0xFF)); - } - ptr += blockLen; - } catch (ArrayIndexOutOfBoundsException e) { - throw new IOException("Corrupt input data, block #" + blockNr + " (at offset " + ptr + "): truncated block header"); - } - ++blockNr; - } - // one more sanity check: - if (ptr != end) { - throw new IOException("Corrupt input data: block #" + blockNr + " extends " + (data.length - ptr) + " beyond end of input"); - } - return uncompressedSize; - } - - /* - /////////////////////////////////////////////////////////////////////// - // Internal methods - /////////////////////////////////////////////////////////////////////// - */ - - protected final static int uint16(byte[] data, int ptr) { - return ((data[ptr] & 0xFF) << 8) + (data[ptr + 1] & 0xFF); - } - - /** - * Helper method to forcibly load header bytes that must be read before - * chunk can be handled. - */ - protected final static int readHeader(final InputStream is, final byte[] inputBuffer) - throws IOException { - // Ok: simple case first, where we just get all data we need - int needed = HEADER_BYTES; - int count = is.read(inputBuffer, 0, needed); - - if (count == needed) { - return count; - } - if (count <= 0) { - return 0; - } - - // if not, a source that trickles data (network etc); must loop - int offset = count; - needed -= count; - - do { - count = is.read(inputBuffer, offset, needed); - if (count <= 0) { - break; - } - offset += count; - needed -= count; - } while (needed > 0); - return offset; - } - - protected final static void readFully(InputStream is, boolean compressed, - byte[] outputBuffer, int offset, int len) throws IOException { - int left = len; - while (left > 0) { - int count = is.read(outputBuffer, offset, left); - if (count < 0) { // EOF not allowed here - throw new IOException("EOF in " + len + " byte (" - + (compressed ? "" : "un") + "compressed) block: could only read " - + (len - left) + " bytes"); - } - offset += count; - left -= count; - } - } -} diff --git a/src/main/java/org/elasticsearch/common/compress/lzf/ChunkEncoder.java b/src/main/java/org/elasticsearch/common/compress/lzf/ChunkEncoder.java deleted file mode 100644 index 4a08b2f275f..00000000000 --- a/src/main/java/org/elasticsearch/common/compress/lzf/ChunkEncoder.java +++ /dev/null @@ -1,269 +0,0 @@ -/* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this - * file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under - * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - * OF ANY KIND, either express or implied. See the License for the specific language - * governing permissions and limitations under the License. - */ - -package org.elasticsearch.common.compress.lzf; - -import java.io.IOException; -import java.io.OutputStream; - -/** - * Class that handles actual encoding of individual chunks. 
- * Resulting chunks can be compressed or non-compressed; compression - * is only used if it actually reduces chunk size (including overhead - * of additional header bytes) - * - * @author Tatu Saloranta (tatu@ning.com) - */ -public class ChunkEncoder { - // Beyond certain point we won't be able to compress; let's use 16 bytes as cut-off - private static final int MIN_BLOCK_TO_COMPRESS = 16; - - private static final int MIN_HASH_SIZE = 256; - - // Not much point in bigger tables, with 8k window - private static final int MAX_HASH_SIZE = 16384; - - private static final int MAX_OFF = 1 << 13; // 8k - private static final int MAX_REF = (1 << 8) + (1 << 3); // 264 - - // // Encoding tables etc - - private final BufferRecycler _recycler; - - /** - * Hash table contains lookup based on 3-byte sequence; key is hash - * of such triplet, value is offset in buffer. - */ - private int[] _hashTable; - - private final int _hashModulo; - - /** - * Buffer in which encoded content is stored during processing - */ - private byte[] _encodeBuffer; - - /** - * Small buffer passed to LZFChunk, needed for writing chunk header - */ - private byte[] _headerBuffer; - - /** - * @param totalLength Total encoded length; used for calculating size - * of hash table to use - */ - // ES: Added recycler as a parameter so we can control its caching - public ChunkEncoder(int totalLength, BufferRecycler recycler) { - int largestChunkLen = Math.max(totalLength, LZFChunk.MAX_CHUNK_LEN); - - int suggestedHashLen = calcHashLen(largestChunkLen); - _recycler = recycler; - _hashTable = _recycler.allocEncodingHash(suggestedHashLen); - _hashModulo = _hashTable.length - 1; - // Ok, then, what's the worst case output buffer length? - // length indicator for each 32 literals, so: - int bufferLen = largestChunkLen + ((largestChunkLen + 31) >> 5); - _encodeBuffer = _recycler.allocEncodingBuffer(bufferLen); - } - - /* - /////////////////////////////////////////////////////////////////////// - // Public API - /////////////////////////////////////////////////////////////////////// - */ - - /** - * Method to close once encoder is no longer in use. Note: after calling - * this method, further calls to {@link #encodeChunk} will fail - */ - public void close() { - byte[] buf = _encodeBuffer; - if (buf != null) { - _encodeBuffer = null; - _recycler.releaseEncodeBuffer(buf); - } - int[] ibuf = _hashTable; - if (ibuf != null) { - _hashTable = null; - _recycler.releaseEncodingHash(ibuf); - } - } - - /** - * Method for compressing (or not) individual chunks - */ - public LZFChunk encodeChunk(byte[] data, int offset, int len) { - if (len >= MIN_BLOCK_TO_COMPRESS) { - /* If we have non-trivial block, and can compress it by at least - * 2 bytes (since header is 2 bytes longer), let's compress: - */ - int compLen = tryCompress(data, offset, offset + len, _encodeBuffer, 0); - if (compLen < (len - 2)) { // nah; just return uncompressed - return LZFChunk.createCompressed(len, _encodeBuffer, 0, compLen); - } - } - // Otherwise leave uncompressed: - return LZFChunk.createNonCompressed(data, offset, len); - } - - /** - * Method for encoding individual chunk, writing it to given output stream. 
- */ - public void encodeAndWriteChunk(byte[] data, int offset, int len, OutputStream out) - throws IOException { - byte[] headerBuf = _headerBuffer; - if (headerBuf == null) { - _headerBuffer = headerBuf = new byte[LZFChunk.MAX_HEADER_LEN]; - } - if (len >= MIN_BLOCK_TO_COMPRESS) { - /* If we have non-trivial block, and can compress it by at least - * 2 bytes (since header is 2 bytes longer), let's compress: - */ - int compLen = tryCompress(data, offset, offset + len, _encodeBuffer, 0); - if (compLen < (len - 2)) { // nah; just return uncompressed - LZFChunk.writeCompressedHeader(len, compLen, out, headerBuf); - out.write(_encodeBuffer, 0, compLen); - return; - } - } - // Otherwise leave uncompressed: - LZFChunk.writeNonCompressedHeader(len, out, headerBuf); - out.write(data, offset, len); - } - - /* - /////////////////////////////////////////////////////////////////////// - // Internal methods - /////////////////////////////////////////////////////////////////////// - */ - - private static int calcHashLen(int chunkSize) { - // in general try get hash table size of 2x input size - chunkSize += chunkSize; - // but no larger than max size: - if (chunkSize >= MAX_HASH_SIZE) { - return MAX_HASH_SIZE; - } - // otherwise just need to round up to nearest 2x - int hashLen = MIN_HASH_SIZE; - while (hashLen < chunkSize) { - hashLen += hashLen; - } - return hashLen; - } - - private int first(byte[] in, int inPos) { - return (in[inPos] << 8) + (in[inPos + 1] & 255); - } - - /* - private static int next(int v, byte[] in, int inPos) { - return (v << 8) + (in[inPos + 2] & 255); - } -*/ - - private final int hash(int h) { - // or 184117; but this seems to give better hashing? - return ((h * 57321) >> 9) & _hashModulo; - // original lzf-c.c used this: - //return (((h ^ (h << 5)) >> (24 - HLOG) - h*5) & _hashModulo; - // but that didn't seem to provide better matches - } - - private int tryCompress(byte[] in, int inPos, int inEnd, byte[] out, int outPos) { - final int[] hashTable = _hashTable; - ++outPos; - int seen = first(in, 0); // past 4 bytes we have seen... (last one is LSB) - int literals = 0; - inEnd -= 4; - final int firstPos = inPos; // so that we won't have back references across block boundary - - while (inPos < inEnd) { - byte p2 = in[inPos + 2]; - // next - seen = (seen << 8) + (p2 & 255); - int off = hash(seen); - int ref = hashTable[off]; - hashTable[off] = inPos; - - // First expected common case: no back-ref (for whatever reason) - if (ref >= inPos // can't refer forward (i.e. 
leftovers) - || ref < firstPos // or to previous block - || (off = inPos - ref) > MAX_OFF - || in[ref + 2] != p2 // must match hash - || in[ref + 1] != (byte) (seen >> 8) - || in[ref] != (byte) (seen >> 16)) { - out[outPos++] = in[inPos++]; - literals++; - if (literals == LZFChunk.MAX_LITERAL) { - out[outPos - 33] = (byte) 31; // <= out[outPos - literals - 1] = MAX_LITERAL_MINUS_1; - literals = 0; - outPos++; - } - continue; - } - // match - int maxLen = inEnd - inPos + 2; - if (maxLen > MAX_REF) { - maxLen = MAX_REF; - } - if (literals == 0) { - outPos--; - } else { - out[outPos - literals - 1] = (byte) (literals - 1); - literals = 0; - } - int len = 3; - while (len < maxLen && in[ref + len] == in[inPos + len]) { - len++; - } - len -= 2; - --off; // was off by one earlier - if (len < 7) { - out[outPos++] = (byte) ((off >> 8) + (len << 5)); - } else { - out[outPos++] = (byte) ((off >> 8) + (7 << 5)); - out[outPos++] = (byte) (len - 7); - } - out[outPos++] = (byte) off; - outPos++; - inPos += len; - seen = first(in, inPos); - seen = (seen << 8) + (in[inPos + 2] & 255); - hashTable[hash(seen)] = inPos; - ++inPos; - seen = (seen << 8) + (in[inPos + 2] & 255); // hash = next(hash, in, inPos); - hashTable[hash(seen)] = inPos; - ++inPos; - } - // try offlining the tail - return handleTail(in, inPos, inEnd + 4, out, outPos, literals); - } - - private int handleTail(byte[] in, int inPos, int inEnd, byte[] out, int outPos, - int literals) { - while (inPos < inEnd) { - out[outPos++] = in[inPos++]; - literals++; - if (literals == LZFChunk.MAX_LITERAL) { - out[outPos - literals - 1] = (byte) (literals - 1); - literals = 0; - outPos++; - } - } - out[outPos - literals - 1] = (byte) (literals - 1); - if (literals == 0) { - outPos--; - } - return outPos; - } - -} diff --git a/src/main/java/org/elasticsearch/common/compress/lzf/LZF.java b/src/main/java/org/elasticsearch/common/compress/lzf/LZF.java deleted file mode 100644 index 55484fa3f6b..00000000000 --- a/src/main/java/org/elasticsearch/common/compress/lzf/LZF.java +++ /dev/null @@ -1,92 +0,0 @@ -/* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this - * file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under - * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - * OF ANY KIND, either express or implied. See the License for the specific language - * governing permissions and limitations under the License. - */ - -package org.elasticsearch.common.compress.lzf; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; - -/** - * Simple command-line utility that can be used for testing LZF - * compression, or as rudimentary command-line tool. 
- * Arguments are the same as used by the "standard" lzf command line tool - * - * @author tatu@ning.com - */ -public class LZF { - - public static boolean isCompressed(final byte[] buffer) { - return buffer.length >= 2 && buffer[0] == LZFChunk.BYTE_Z && buffer[1] == LZFChunk.BYTE_V; - } - - public static boolean isCompressed(final byte[] buffer, int offset, int length) { - return length >= 2 && buffer[offset] == LZFChunk.BYTE_Z && buffer[offset + 1] == LZFChunk.BYTE_V; - } - - public final static String SUFFIX = ".lzf"; - - void process(String[] args) throws IOException { - if (args.length == 2) { - String oper = args[0]; - boolean compress = "-c".equals(oper); - if (compress || "-d".equals(oper)) { - String filename = args[1]; - File src = new File(filename); - if (!src.exists()) { - System.err.println("File '" + filename + "' does not exist."); - System.exit(1); - } - if (!compress && !filename.endsWith(SUFFIX)) { - System.err.println("File '" + filename + "' does end with expected suffix ('" + SUFFIX + "', won't decompress."); - System.exit(1); - } - byte[] data = readData(src); - System.out.println("Read " + data.length + " bytes."); - byte[] result = compress ? LZFEncoder.encode(data) : LZFDecoder.decode(data); - System.out.println("Processed into " + result.length + " bytes."); - File resultFile = compress ? new File(filename + SUFFIX) : new File(filename.substring(0, filename.length() - SUFFIX.length())); - FileOutputStream out = new FileOutputStream(resultFile); - out.write(result); - out.close(); - System.out.println("Wrote in file '" + resultFile.getAbsolutePath() + "'."); - return; - } - } - System.err.println("Usage: java " + getClass().getName() + " -c/-d file"); - System.exit(1); - } - - private byte[] readData(File in) throws IOException { - int len = (int) in.length(); - byte[] result = new byte[len]; - int offset = 0; - FileInputStream fis = new FileInputStream(in); - - while (len > 0) { - int count = fis.read(result, offset, len); - if (count < 0) break; - len -= count; - offset += count; - } - fis.close(); - if (len > 0) { // should never occur... - throw new IOException("Could not read the whole file -- received EOF when there was " + len + " bytes left to read"); - } - return result; - } - - public static void main(String[] args) throws IOException { - new LZF().process(args); - } -} - diff --git a/src/main/java/org/elasticsearch/common/compress/lzf/LZFChunk.java b/src/main/java/org/elasticsearch/common/compress/lzf/LZFChunk.java deleted file mode 100644 index 13ee6b9c722..00000000000 --- a/src/main/java/org/elasticsearch/common/compress/lzf/LZFChunk.java +++ /dev/null @@ -1,125 +0,0 @@ -/* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this - * file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under - * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - * OF ANY KIND, either express or implied. See the License for the specific language - * governing permissions and limitations under the License. - */ - -package org.elasticsearch.common.compress.lzf; - -import java.io.IOException; -import java.io.OutputStream; - -/** - * Helper class used to store LZF encoded segments (compressed and non-compressed) - * that can be sequenced to produce LZF files/streams. 
- * - * @author tatu@ning.com - */ -public class LZFChunk { - /** - * Maximum length of literal run for LZF encoding. - */ - public static final int MAX_LITERAL = 1 << 5; // 32 - - // Chunk length is limited by 2-byte length indicator, to 64k - public static final int MAX_CHUNK_LEN = 0xFFFF; - - /** - * Header can be either 7 bytes (compressed) or 5 bytes (uncompressed) - * long - */ - public static final int MAX_HEADER_LEN = 7; - - public final static byte BYTE_Z = 'Z'; - public final static byte BYTE_V = 'V'; - - public final static int BLOCK_TYPE_NON_COMPRESSED = 0; - public final static int BLOCK_TYPE_COMPRESSED = 1; - - - protected final byte[] _data; - protected LZFChunk _next; - - private LZFChunk(byte[] data) { - _data = data; - } - - /** - * Factory method for constructing compressed chunk - */ - public static LZFChunk createCompressed(int origLen, byte[] encData, int encPtr, int encLen) { - byte[] result = new byte[encLen + 7]; - result[0] = BYTE_Z; - result[1] = BYTE_V; - result[2] = BLOCK_TYPE_COMPRESSED; - result[3] = (byte) (encLen >> 8); - result[4] = (byte) encLen; - result[5] = (byte) (origLen >> 8); - result[6] = (byte) origLen; - System.arraycopy(encData, encPtr, result, 7, encLen); - return new LZFChunk(result); - } - - public static void writeCompressedHeader(int origLen, int encLen, OutputStream out, byte[] headerBuffer) - throws IOException { - headerBuffer[0] = BYTE_Z; - headerBuffer[1] = BYTE_V; - headerBuffer[2] = BLOCK_TYPE_COMPRESSED; - headerBuffer[3] = (byte) (encLen >> 8); - headerBuffer[4] = (byte) encLen; - headerBuffer[5] = (byte) (origLen >> 8); - headerBuffer[6] = (byte) origLen; - out.write(headerBuffer, 0, 7); - } - - /** - * Factory method for constructing compressed chunk - */ - public static LZFChunk createNonCompressed(byte[] plainData, int ptr, int len) { - byte[] result = new byte[len + 5]; - result[0] = BYTE_Z; - result[1] = BYTE_V; - result[2] = BLOCK_TYPE_NON_COMPRESSED; - result[3] = (byte) (len >> 8); - result[4] = (byte) len; - System.arraycopy(plainData, ptr, result, 5, len); - return new LZFChunk(result); - } - - public static void writeNonCompressedHeader(int len, OutputStream out, byte[] headerBuffer) - throws IOException { - headerBuffer[0] = BYTE_Z; - headerBuffer[1] = BYTE_V; - headerBuffer[2] = BLOCK_TYPE_NON_COMPRESSED; - headerBuffer[3] = (byte) (len >> 8); - headerBuffer[4] = (byte) len; - out.write(headerBuffer, 0, 5); - } - - public void setNext(LZFChunk next) { - _next = next; - } - - public LZFChunk next() { - return _next; - } - - public int length() { - return _data.length; - } - - public byte[] getData() { - return _data; - } - - public int copyTo(byte[] dst, int ptr) { - int len = _data.length; - System.arraycopy(_data, 0, dst, ptr, len); - return ptr + len; - } -} diff --git a/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedStreamInput.java b/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedStreamInput.java new file mode 100644 index 00000000000..0334c321441 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedStreamInput.java @@ -0,0 +1,74 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.compress.lzf; + +import com.ning.compress.BufferRecycler; +import com.ning.compress.lzf.ChunkDecoder; +import com.ning.compress.lzf.LZFChunk; +import org.elasticsearch.common.compress.CompressedStreamInput; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; +import java.io.InputStream; + +/** + */ +public class LZFCompressedStreamInput extends CompressedStreamInput { + + private final BufferRecycler recycler; + + private final com.ning.compress.lzf.ChunkDecoder decoder; + + // scratch area buffer + private byte[] inputBuffer; + + public LZFCompressedStreamInput(StreamInput in, ChunkDecoder decoder) throws IOException { + super(in); + this.recycler = BufferRecycler.instance(); + this.decoder = decoder; + + this.uncompressed = recycler.allocDecodeBuffer(LZFChunk.MAX_CHUNK_LEN); + this.inputBuffer = recycler.allocInputBuffer(LZFChunk.MAX_CHUNK_LEN); + } + + @Override + public void readHeader(StreamInput in) throws IOException { + // nothing to do here, each chunk has a header + } + + @Override + public int uncompress(InputStream in, byte[] out) throws IOException { + return decoder.decodeChunk(in, inputBuffer, out); + } + + @Override + protected void doClose() throws IOException { + byte[] buf = inputBuffer; + if (buf != null) { + inputBuffer = null; + recycler.releaseInputBuffer(buf); + } + buf = uncompressed; + if (buf != null) { + uncompressed = null; + recycler.releaseDecodeBuffer(uncompressed); + } + } +} diff --git a/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedStreamOutput.java b/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedStreamOutput.java new file mode 100644 index 00000000000..a8847547e75 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedStreamOutput.java @@ -0,0 +1,63 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.common.compress.lzf; + +import com.ning.compress.BufferRecycler; +import com.ning.compress.lzf.ChunkEncoder; +import com.ning.compress.lzf.LZFChunk; +import org.elasticsearch.common.compress.CompressedStreamOutput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + */ +public class LZFCompressedStreamOutput extends CompressedStreamOutput { + + private final BufferRecycler recycler; + private final com.ning.compress.lzf.ChunkEncoder encoder; + + public LZFCompressedStreamOutput(StreamOutput out) throws IOException { + super(out); + this.recycler = BufferRecycler.instance(); + this.uncompressed = this.recycler.allocOutputBuffer(LZFChunk.MAX_CHUNK_LEN); + this.encoder = new ChunkEncoder(LZFChunk.MAX_CHUNK_LEN); + } + + @Override + public void writeHeader(StreamOutput out) throws IOException { + // nothing to do here, each chunk has a header of its own + } + + @Override + protected void compress(byte[] data, int offset, int len, StreamOutput out) throws IOException { + encoder.encodeAndWriteChunk(data, offset, len, out); + } + + @Override + protected void doClose() throws IOException { + byte[] buf = uncompressed; + if (buf != null) { + uncompressed = null; + recycler.releaseOutputBuffer(buf); + } + encoder.close(); + } +} diff --git a/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressor.java b/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressor.java new file mode 100644 index 00000000000..93dec24bf7f --- /dev/null +++ b/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressor.java @@ -0,0 +1,91 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.common.compress.lzf; + +import com.ning.compress.lzf.ChunkDecoder; +import com.ning.compress.lzf.LZFChunk; +import com.ning.compress.lzf.LZFEncoder; +import com.ning.compress.lzf.util.ChunkDecoderFactory; +import org.elasticsearch.common.compress.CompressedStreamInput; +import org.elasticsearch.common.compress.CompressedStreamOutput; +import org.elasticsearch.common.compress.Compressor; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.logging.Loggers; +import org.jboss.netty.buffer.ChannelBuffer; + +import java.io.IOException; + +/** + */ +public class LZFCompressor implements Compressor { + + public static final String TYPE = "lzf"; + + private ChunkDecoder decoder; + + public LZFCompressor() { + this.decoder = ChunkDecoderFactory.optimalInstance(); + Loggers.getLogger(LZFCompressor.class).debug("using [{}] decoder", this.decoder.getClass().getSimpleName()); + } + + @Override + public String type() { + return TYPE; + } + + @Override + public boolean isCompressed(byte[] data, int offset, int length) { + return length >= 3 && + data[offset] == LZFChunk.BYTE_Z && + data[offset + 1] == LZFChunk.BYTE_V && + (data[offset + 2] == LZFChunk.BLOCK_TYPE_COMPRESSED || data[offset + 2] == LZFChunk.BLOCK_TYPE_NON_COMPRESSED); + } + + @Override + public boolean isCompressed(ChannelBuffer buffer) { + int offset = buffer.readerIndex(); + return buffer.readableBytes() >= 3 && + buffer.getByte(offset) == LZFChunk.BYTE_Z && + buffer.getByte(offset + 1) == LZFChunk.BYTE_V && + (buffer.getByte(offset + 2) == LZFChunk.BLOCK_TYPE_COMPRESSED || buffer.getByte(offset + 2) == LZFChunk.BLOCK_TYPE_NON_COMPRESSED); + } + + @Override + public byte[] uncompress(byte[] data, int offset, int length) throws IOException { + return decoder.decode(data, offset, length); + } + + @Override + public byte[] compress(byte[] data, int offset, int length) throws IOException { + return LZFEncoder.encode(data, offset, length); + } + + @Override + public CompressedStreamInput streamInput(StreamInput in) throws IOException { + return new LZFCompressedStreamInput(in, decoder); + } + + @Override + public CompressedStreamOutput streamOutput(StreamOutput out) throws IOException { + return new LZFCompressedStreamOutput(out); + } + +} diff --git a/src/main/java/org/elasticsearch/common/compress/lzf/LZFDecoder.java b/src/main/java/org/elasticsearch/common/compress/lzf/LZFDecoder.java deleted file mode 100644 index 784137e053a..00000000000 --- a/src/main/java/org/elasticsearch/common/compress/lzf/LZFDecoder.java +++ /dev/null @@ -1,55 +0,0 @@ -/* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this - * file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under - * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - * OF ANY KIND, either express or implied. See the License for the specific language - * governing permissions and limitations under the License. - */ - -package org.elasticsearch.common.compress.lzf; - -import org.elasticsearch.common.compress.lzf.util.ChunkDecoderFactory; - -import java.io.IOException; - -/** - * Decoder that handles decoding of sequence of encoded LZF chunks, - * combining them into a single contiguous result byte array. 
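For orientation, a minimal round trip through the Compressor contract implemented above might look as follows. This is an illustrative sketch, not code from this change; rawBytes is a hypothetical payload, and the calls are assumed to run where IOException may propagate.

    Compressor lzf = new LZFCompressor();
    byte[] compressed = lzf.compress(rawBytes, 0, rawBytes.length);      // emits an LZF chunk sequence
    assert lzf.isCompressed(compressed, 0, compressed.length);           // recognised by the 'Z' 'V' chunk signature checked above
    byte[] restored = lzf.uncompress(compressed, 0, compressed.length);  // decodes back to the original bytes
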
- * As of version 0.9, this class has been mostly replaced by - * {@link ChunkDecoder}, although static methods are left here - * and may still be used. - * All static methods use {@link ChunkDecoderFactory#optimalInstance} - * to find actual {@link ChunkDecoder} instance to use. - * - * @author Tatu Saloranta (tatu@ning.com) - */ -public class LZFDecoder { - /* - /////////////////////////////////////////////////////////////////////// - // Old API - /////////////////////////////////////////////////////////////////////// - */ - - public static byte[] decode(final byte[] inputBuffer) throws IOException { - return decode(inputBuffer, 0, inputBuffer.length); - } - - public static byte[] decode(final byte[] inputBuffer, int offset, int length) throws IOException { - return ChunkDecoderFactory.optimalInstance().decode(inputBuffer, offset, length); - } - - public static int decode(final byte[] inputBuffer, final byte[] targetBuffer) throws IOException { - return decode(inputBuffer, 0, inputBuffer.length, targetBuffer); - } - - public static int decode(final byte[] sourceBuffer, int offset, int length, final byte[] targetBuffer) throws IOException { - return ChunkDecoderFactory.optimalInstance().decode(sourceBuffer, offset, length, targetBuffer); - } - - public static int calculateUncompressedSize(byte[] data, int offset, int length) throws IOException { - return ChunkDecoder.calculateUncompressedSize(data, offset, length); - } -} diff --git a/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java b/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java deleted file mode 100644 index ee619bf8b28..00000000000 --- a/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java +++ /dev/null @@ -1,96 +0,0 @@ -/* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this - * file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under - * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - * OF ANY KIND, either express or implied. See the License for the specific language - * governing permissions and limitations under the License. - */ - -package org.elasticsearch.common.compress.lzf; - -import java.io.IOException; - -/** - * Encoder that handles splitting of input into chunks to encode, - * calls {@link ChunkEncoder} to compress individual chunks and - * combines resulting chunks into contiguous output byte array. - * - * @author tatu@ning.com - */ -public class LZFEncoder { - // Static methods only, no point in instantiating - private LZFEncoder() { - } - - public static byte[] encode(byte[] data) throws IOException { - return encode(data, data.length); - } - - /** - * Method for compressing given input data using LZF encoding and - * block structure (compatible with lzf command line utility). - * Result consists of a sequence of chunks. - */ - public static byte[] encode(byte[] data, int length) throws IOException { - return encode(data, 0, length); - } - - /** - * Method for compressing given input data using LZF encoding and - * block structure (compatible with lzf command line utility). - * Result consists of a sequence of chunks. 
- * - * @since 0.8.1 - */ - public static byte[] encode(byte[] data, int offset, int length) throws IOException { - ChunkEncoder enc = new ChunkEncoder(length, BufferRecycler.instance()); - byte[] result = encode(enc, data, offset, length); - // important: may be able to reuse buffers - enc.close(); - return result; - } - - public static byte[] encode(ChunkEncoder enc, byte[] data, int length) - throws IOException { - return encode(enc, data, 0, length); - } - - /** - * @since 0.8.1 - */ - public static byte[] encode(ChunkEncoder enc, byte[] data, int offset, int length) - throws IOException { - int left = length; - int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left); - LZFChunk first = enc.encodeChunk(data, offset, chunkLen); - left -= chunkLen; - // shortcut: if it all fit in, no need to coalesce: - if (left < 1) { - return first.getData(); - } - // otherwise need to get other chunks: - int resultBytes = first.length(); - offset += chunkLen; - LZFChunk last = first; - - do { - chunkLen = Math.min(left, LZFChunk.MAX_CHUNK_LEN); - LZFChunk chunk = enc.encodeChunk(data, offset, chunkLen); - offset += chunkLen; - left -= chunkLen; - resultBytes += chunk.length(); - last.setNext(chunk); - last = chunk; - } while (left > 0); - // and then coalesce returns into single contiguous byte array - byte[] result = new byte[resultBytes]; - int ptr = 0; - for (; first != null; first = first.next()) { - ptr = first.copyTo(result, ptr); - } - return result; - } -} diff --git a/src/main/java/org/elasticsearch/common/compress/lzf/impl/UnsafeChunkDecoder.java b/src/main/java/org/elasticsearch/common/compress/lzf/impl/UnsafeChunkDecoder.java deleted file mode 100755 index 14738bf4674..00000000000 --- a/src/main/java/org/elasticsearch/common/compress/lzf/impl/UnsafeChunkDecoder.java +++ /dev/null @@ -1,251 +0,0 @@ -package org.elasticsearch.common.compress.lzf.impl; - -import org.elasticsearch.common.compress.lzf.ChunkDecoder; -import org.elasticsearch.common.compress.lzf.LZFChunk; -import sun.misc.Unsafe; - -import java.io.IOException; -import java.io.InputStream; -import java.lang.reflect.Field; - -/** - * Highly optimized {@link ChunkDecoder} implementation that uses - * Sun JDK's Unsafe class (which may be included by other JDK's as well; - * IBM's apparently does). - *

- * Credits for the idea go to Dain Sundstrom, who kindly suggested this use, - * and is all-around great source for optimization tips and tricks. - */ -@SuppressWarnings("restriction") -public class UnsafeChunkDecoder extends ChunkDecoder { - private static final Unsafe unsafe; - - static { - try { - Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); - theUnsafe.setAccessible(true); - unsafe = (Unsafe) theUnsafe.get(null); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private static final long BYTE_ARRAY_OFFSET = unsafe.arrayBaseOffset(byte[].class); -// private static final long SHORT_ARRAY_OFFSET = unsafe.arrayBaseOffset(short[].class); -// private static final long SHORT_ARRAY_STRIDE = unsafe.arrayIndexScale(short[].class); - - public UnsafeChunkDecoder() { - } - - @Override - public final int decodeChunk(final InputStream is, final byte[] inputBuffer, final byte[] outputBuffer) - throws IOException { - int bytesInOutput; - /* note: we do NOT read more than 5 bytes because otherwise might need to shuffle bytes - * for output buffer (could perhaps optimize in future?) - */ - int bytesRead = readHeader(is, inputBuffer); - if ((bytesRead < HEADER_BYTES) - || inputBuffer[0] != LZFChunk.BYTE_Z || inputBuffer[1] != LZFChunk.BYTE_V) { - if (bytesRead == 0) { // probably fine, clean EOF - return -1; - } - throw new IOException("Corrupt input data, block did not start with 2 byte signature ('ZV') followed by type byte, 2-byte length)"); - } - int type = inputBuffer[2]; - int compLen = uint16(inputBuffer, 3); - if (type == LZFChunk.BLOCK_TYPE_NON_COMPRESSED) { // uncompressed - readFully(is, false, outputBuffer, 0, compLen); - bytesInOutput = compLen; - } else { // compressed - readFully(is, true, inputBuffer, 0, 2 + compLen); // first 2 bytes are uncompressed length - int uncompLen = uint16(inputBuffer, 0); - decodeChunk(inputBuffer, 2, outputBuffer, 0, uncompLen); - bytesInOutput = uncompLen; - } - return bytesInOutput; - } - - @Override - public final void decodeChunk(byte[] in, int inPos, byte[] out, int outPos, int outEnd) - throws IOException { - main_loop: - do { - int ctrl = in[inPos++] & 255; - while (ctrl < LZFChunk.MAX_LITERAL) { // literal run(s) - copyUpTo32(in, inPos, out, outPos, ctrl); - ++ctrl; - inPos += ctrl; - outPos += ctrl; - if (outPos >= outEnd) { - break main_loop; - } - ctrl = in[inPos++] & 255; - } - // back reference - int len = ctrl >> 5; - ctrl = -((ctrl & 0x1f) << 8) - 1; - // short back reference? 2 bytes; run lengths of 2 - 8 bytes - if (len < 7) { - ctrl -= in[inPos++] & 255; - if (ctrl < -7) { // non-overlapping? 
can use efficient bulk copy - moveLong(out, outPos, outEnd, ctrl); - outPos += len + 2; - continue; - } - // otherwise, byte-by-byte - outPos = copyOverlappingShort(out, outPos, ctrl, len); - continue; - } - // long back reference: 3 bytes, length of up to 264 bytes - len = in[inPos++] & 255; - ctrl -= in[inPos++] & 255; - // First: ovelapping case can't use default handling, off line: - if ((ctrl + len) >= -9) { - outPos = copyOverlappingLong(out, outPos, ctrl, len); - continue; - } - // but non-overlapping is simple - len += 9; - if (len <= 32) { - copyUpTo32(out, outPos + ctrl, out, outPos, len - 1); - } else { - System.arraycopy(out, outPos + ctrl, out, outPos, len); - } - outPos += len; - } while (outPos < outEnd); - - // sanity check to guard against corrupt data: - if (outPos != outEnd) - throw new IOException("Corrupt data: overrun in decompress, input offset " + inPos + ", output offset " + outPos); - } - - /* - /////////////////////////////////////////////////////////////////////// - // Internal methods - /////////////////////////////////////////////////////////////////////// - */ - - private final int copyOverlappingShort(final byte[] out, int outPos, final int offset, int len) { - out[outPos] = out[outPos++ + offset]; - out[outPos] = out[outPos++ + offset]; - switch (len) { - case 6: - out[outPos] = out[outPos++ + offset]; - case 5: - out[outPos] = out[outPos++ + offset]; - case 4: - out[outPos] = out[outPos++ + offset]; - case 3: - out[outPos] = out[outPos++ + offset]; - case 2: - out[outPos] = out[outPos++ + offset]; - case 1: - out[outPos] = out[outPos++ + offset]; - } - return outPos; - } - - private final static int copyOverlappingLong(final byte[] out, int outPos, final int offset, int len) { - // otherwise manual copy: so first just copy 9 bytes we know are needed - out[outPos] = out[outPos++ + offset]; - out[outPos] = out[outPos++ + offset]; - out[outPos] = out[outPos++ + offset]; - out[outPos] = out[outPos++ + offset]; - out[outPos] = out[outPos++ + offset]; - out[outPos] = out[outPos++ + offset]; - out[outPos] = out[outPos++ + offset]; - out[outPos] = out[outPos++ + offset]; - out[outPos] = out[outPos++ + offset]; - - // then loop - // Odd: after extensive profiling, looks like magic number - // for unrolling is 4: with 8 performance is worse (even - // bit less than with no unrolling). - len += outPos; - final int end = len - 3; - while (outPos < end) { - out[outPos] = out[outPos++ + offset]; - out[outPos] = out[outPos++ + offset]; - out[outPos] = out[outPos++ + offset]; - out[outPos] = out[outPos++ + offset]; - } - switch (len - outPos) { - case 3: - out[outPos] = out[outPos++ + offset]; - case 2: - out[outPos] = out[outPos++ + offset]; - case 1: - out[outPos] = out[outPos++ + offset]; - } - return outPos; - } - - /* Note: 'delta' is negative (back ref); dataEnd is the first location AFTER - * end of expected uncompressed data (i.e. 
end marker) - */ - private final static void moveLong(byte[] data, int resultOffset, int dataEnd, int delta) { - if ((resultOffset + 8) < dataEnd) { - final long rawOffset = BYTE_ARRAY_OFFSET + resultOffset; - long value = unsafe.getLong(data, rawOffset + delta); - unsafe.putLong(data, rawOffset, value); - return; - } - System.arraycopy(data, resultOffset + delta, data, resultOffset, data.length - resultOffset); - } - - private final static void copyUpTo32(byte[] in, int inputIndex, byte[] out, int outputIndex, int lengthMinusOne) { - if ((outputIndex + 32) > out.length) { - System.arraycopy(in, inputIndex, out, outputIndex, lengthMinusOne + 1); - return; - } - long inPtr = BYTE_ARRAY_OFFSET + inputIndex; - long outPtr = BYTE_ARRAY_OFFSET + outputIndex; - - switch (lengthMinusOne >>> 3) { - case 3: { - long value = unsafe.getLong(in, inPtr); - unsafe.putLong(out, outPtr, value); - inPtr += 8; - outPtr += 8; - value = unsafe.getLong(in, inPtr); - unsafe.putLong(out, outPtr, value); - inPtr += 8; - outPtr += 8; - value = unsafe.getLong(in, inPtr); - unsafe.putLong(out, outPtr, value); - inPtr += 8; - outPtr += 8; - value = unsafe.getLong(in, inPtr); - unsafe.putLong(out, outPtr, value); - } - break; - case 2: { - long value = unsafe.getLong(in, inPtr); - unsafe.putLong(out, outPtr, value); - inPtr += 8; - outPtr += 8; - value = unsafe.getLong(in, inPtr); - unsafe.putLong(out, outPtr, value); - inPtr += 8; - outPtr += 8; - value = unsafe.getLong(in, inPtr); - unsafe.putLong(out, outPtr, value); - } - break; - case 1: { - long value = unsafe.getLong(in, inPtr); - unsafe.putLong(out, outPtr, value); - inPtr += 8; - outPtr += 8; - value = unsafe.getLong(in, inPtr); - unsafe.putLong(out, outPtr, value); - } - break; - case 0: { - long value = unsafe.getLong(in, inPtr); - unsafe.putLong(out, outPtr, value); - } - } - } -} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/common/compress/lzf/impl/VanillaChunkDecoder.java b/src/main/java/org/elasticsearch/common/compress/lzf/impl/VanillaChunkDecoder.java deleted file mode 100755 index d74034de5f5..00000000000 --- a/src/main/java/org/elasticsearch/common/compress/lzf/impl/VanillaChunkDecoder.java +++ /dev/null @@ -1,274 +0,0 @@ -package org.elasticsearch.common.compress.lzf.impl; - -import org.elasticsearch.common.compress.lzf.ChunkDecoder; -import org.elasticsearch.common.compress.lzf.LZFChunk; - -import java.io.IOException; -import java.io.InputStream; - -/** - * Safe {@link ChunkDecoder} implementation that can be used on any - * platform. - */ -public class VanillaChunkDecoder extends ChunkDecoder { - public VanillaChunkDecoder() { - } - - @Override - public final int decodeChunk(final InputStream is, final byte[] inputBuffer, final byte[] outputBuffer) - throws IOException { - int bytesInOutput; - /* note: we do NOT read more than 5 bytes because otherwise might need to shuffle bytes - * for output buffer (could perhaps optimize in future?) 
- */ - int bytesRead = readHeader(is, inputBuffer); - if ((bytesRead < HEADER_BYTES) - || inputBuffer[0] != LZFChunk.BYTE_Z || inputBuffer[1] != LZFChunk.BYTE_V) { - if (bytesRead == 0) { // probably fine, clean EOF - return -1; - } - throw new IOException("Corrupt input data, block did not start with 2 byte signature ('ZV') followed by type byte, 2-byte length)"); - } - int type = inputBuffer[2]; - int compLen = uint16(inputBuffer, 3); - if (type == LZFChunk.BLOCK_TYPE_NON_COMPRESSED) { // uncompressed - readFully(is, false, outputBuffer, 0, compLen); - bytesInOutput = compLen; - } else { // compressed - readFully(is, true, inputBuffer, 0, 2 + compLen); // first 2 bytes are uncompressed length - int uncompLen = uint16(inputBuffer, 0); - decodeChunk(inputBuffer, 2, outputBuffer, 0, uncompLen); - bytesInOutput = uncompLen; - } - return bytesInOutput; - } - - @Override - public final void decodeChunk(byte[] in, int inPos, byte[] out, int outPos, int outEnd) - throws IOException { - do { - int ctrl = in[inPos++] & 255; - if (ctrl < LZFChunk.MAX_LITERAL) { // literal run - switch (ctrl) { - case 31: - out[outPos++] = in[inPos++]; - case 30: - out[outPos++] = in[inPos++]; - case 29: - out[outPos++] = in[inPos++]; - case 28: - out[outPos++] = in[inPos++]; - case 27: - out[outPos++] = in[inPos++]; - case 26: - out[outPos++] = in[inPos++]; - case 25: - out[outPos++] = in[inPos++]; - case 24: - out[outPos++] = in[inPos++]; - case 23: - out[outPos++] = in[inPos++]; - case 22: - out[outPos++] = in[inPos++]; - case 21: - out[outPos++] = in[inPos++]; - case 20: - out[outPos++] = in[inPos++]; - case 19: - out[outPos++] = in[inPos++]; - case 18: - out[outPos++] = in[inPos++]; - case 17: - out[outPos++] = in[inPos++]; - case 16: - out[outPos++] = in[inPos++]; - case 15: - out[outPos++] = in[inPos++]; - case 14: - out[outPos++] = in[inPos++]; - case 13: - out[outPos++] = in[inPos++]; - case 12: - out[outPos++] = in[inPos++]; - case 11: - out[outPos++] = in[inPos++]; - case 10: - out[outPos++] = in[inPos++]; - case 9: - out[outPos++] = in[inPos++]; - case 8: - out[outPos++] = in[inPos++]; - case 7: - out[outPos++] = in[inPos++]; - case 6: - out[outPos++] = in[inPos++]; - case 5: - out[outPos++] = in[inPos++]; - case 4: - out[outPos++] = in[inPos++]; - case 3: - out[outPos++] = in[inPos++]; - case 2: - out[outPos++] = in[inPos++]; - case 1: - out[outPos++] = in[inPos++]; - case 0: - out[outPos++] = in[inPos++]; - } - continue; - } - // back reference - int len = ctrl >> 5; - ctrl = -((ctrl & 0x1f) << 8) - 1; - if (len < 7) { // 2 bytes; length of 3 - 8 bytes - ctrl -= in[inPos++] & 255; - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - switch (len) { - case 6: - out[outPos] = out[outPos++ + ctrl]; - case 5: - out[outPos] = out[outPos++ + ctrl]; - case 4: - out[outPos] = out[outPos++ + ctrl]; - case 3: - out[outPos] = out[outPos++ + ctrl]; - case 2: - out[outPos] = out[outPos++ + ctrl]; - case 1: - out[outPos] = out[outPos++ + ctrl]; - } - continue; - } - - // long version (3 bytes, length of up to 264 bytes) - len = in[inPos++] & 255; - ctrl -= in[inPos++] & 255; - - // First: if there is no overlap, can just use arraycopy: - if ((ctrl + len) < -9) { - len += 9; - if (len <= 32) { - copyUpTo32WithSwitch(out, outPos + ctrl, out, outPos, len - 1); - } else { - System.arraycopy(out, outPos + ctrl, out, outPos, len); - } - outPos += len; - continue; - } - - // otherwise manual copy: so first just copy 9 bytes we know are needed - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = 
out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - - // then loop - // Odd: after extensive profiling, looks like magic number - // for unrolling is 4: with 8 performance is worse (even - // bit less than with no unrolling). - len += outPos; - final int end = len - 3; - while (outPos < end) { - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - out[outPos] = out[outPos++ + ctrl]; - } - switch (len - outPos) { - case 3: - out[outPos] = out[outPos++ + ctrl]; - case 2: - out[outPos] = out[outPos++ + ctrl]; - case 1: - out[outPos] = out[outPos++ + ctrl]; - } - } while (outPos < outEnd); - - // sanity check to guard against corrupt data: - if (outPos != outEnd) - throw new IOException("Corrupt data: overrun in decompress, input offset " + inPos + ", output offset " + outPos); - } - - /* - /////////////////////////////////////////////////////////////////////// - // Internal methods - /////////////////////////////////////////////////////////////////////// - */ - - protected static final void copyUpTo32WithSwitch(byte[] in, int inPos, byte[] out, int outPos, - int lengthMinusOne) { - switch (lengthMinusOne) { - case 31: - out[outPos++] = in[inPos++]; - case 30: - out[outPos++] = in[inPos++]; - case 29: - out[outPos++] = in[inPos++]; - case 28: - out[outPos++] = in[inPos++]; - case 27: - out[outPos++] = in[inPos++]; - case 26: - out[outPos++] = in[inPos++]; - case 25: - out[outPos++] = in[inPos++]; - case 24: - out[outPos++] = in[inPos++]; - case 23: - out[outPos++] = in[inPos++]; - case 22: - out[outPos++] = in[inPos++]; - case 21: - out[outPos++] = in[inPos++]; - case 20: - out[outPos++] = in[inPos++]; - case 19: - out[outPos++] = in[inPos++]; - case 18: - out[outPos++] = in[inPos++]; - case 17: - out[outPos++] = in[inPos++]; - case 16: - out[outPos++] = in[inPos++]; - case 15: - out[outPos++] = in[inPos++]; - case 14: - out[outPos++] = in[inPos++]; - case 13: - out[outPos++] = in[inPos++]; - case 12: - out[outPos++] = in[inPos++]; - case 11: - out[outPos++] = in[inPos++]; - case 10: - out[outPos++] = in[inPos++]; - case 9: - out[outPos++] = in[inPos++]; - case 8: - out[outPos++] = in[inPos++]; - case 7: - out[outPos++] = in[inPos++]; - case 6: - out[outPos++] = in[inPos++]; - case 5: - out[outPos++] = in[inPos++]; - case 4: - out[outPos++] = in[inPos++]; - case 3: - out[outPos++] = in[inPos++]; - case 2: - out[outPos++] = in[inPos++]; - case 1: - out[outPos++] = in[inPos++]; - case 0: - out[outPos++] = in[inPos++]; - } - } - -} diff --git a/src/main/java/org/elasticsearch/common/compress/lzf/package-info.java b/src/main/java/org/elasticsearch/common/compress/lzf/package-info.java deleted file mode 100644 index f1904f2acf6..00000000000 --- a/src/main/java/org/elasticsearch/common/compress/lzf/package-info.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * Copy of LZF code from ning compress based on 0.7 version. - * - * Changes: - * - * 1. - */ -package org.elasticsearch.common.compress.lzf; \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/common/compress/lzf/util/ChunkDecoderFactory.java b/src/main/java/org/elasticsearch/common/compress/lzf/util/ChunkDecoderFactory.java deleted file mode 100755 index 09140e81682..00000000000 --- a/src/main/java/org/elasticsearch/common/compress/lzf/util/ChunkDecoderFactory.java +++ /dev/null @@ -1,75 +0,0 @@ -package org.elasticsearch.common.compress.lzf.util; - -import org.elasticsearch.common.Booleans; -import org.elasticsearch.common.compress.lzf.ChunkDecoder; -import org.elasticsearch.common.compress.lzf.impl.UnsafeChunkDecoder; -import org.elasticsearch.common.compress.lzf.impl.VanillaChunkDecoder; - -/** - * Simple helper class used for loading - * {@link ChunkDecoder} implementations, based on criteria - * such as "fastest available". - *

- * Yes, it looks butt-ugly, but does the job. Nonetheless, if anyone - * has lipstick for this pig, let me know. - * - * @since 0.9 - */ -public class ChunkDecoderFactory { - private final static ChunkDecoderFactory _instance; - - static { - Class impl = null; - try { - // first, try loading optimal one, which uses Sun JDK Unsafe... - impl = (Class) Class.forName(UnsafeChunkDecoder.class.getName()); - } catch (Throwable t) { - } - if (impl == null) { - impl = VanillaChunkDecoder.class; - } - // ES: Seems like: https://github.com/ning/compress/issues/13, is fixed, so enable by defualt, but only from 0.19 - if (!Booleans.parseBoolean(System.getProperty("compress.lzf.decoder.optimized"), true)) { - impl = VanillaChunkDecoder.class; - } - _instance = new ChunkDecoderFactory(impl); - } - - private final Class _implClass; - - @SuppressWarnings("unchecked") - private ChunkDecoderFactory(Class imp) { - _implClass = (Class) imp; - } - - /* - /////////////////////////////////////////////////////////////////////// - // Public API - /////////////////////////////////////////////////////////////////////// - */ - - /** - * Method to use for getting decompressor instance that uses the most optimal - * available methods for underlying data access. It should be safe to call - * this method as implementations are dynamically loaded; however, on some - * non-standard platforms it may be necessary to either directly load - * instances, or use {@link #safeInstance()}. - */ - public static ChunkDecoder optimalInstance() { - try { - return _instance._implClass.newInstance(); - } catch (Exception e) { - throw new IllegalStateException("Failed to load a ChunkDecoder instance (" + e.getClass().getName() + "): " - + e.getMessage(), e); - } - } - - /** - * Method that can be used to ensure that a "safe" decompressor instance is loaded. - * Safe here means that it should work on any and all Java platforms. - */ - public static ChunkDecoder safeInstance() { - // this will always succeed loading; no need to use dynamic class loading or instantiation - return new VanillaChunkDecoder(); - } -} diff --git a/src/main/java/org/elasticsearch/common/io/stream/CachedStreamInput.java b/src/main/java/org/elasticsearch/common/io/stream/CachedStreamInput.java index ef5d867608c..894c0036251 100644 --- a/src/main/java/org/elasticsearch/common/io/stream/CachedStreamInput.java +++ b/src/main/java/org/elasticsearch/common/io/stream/CachedStreamInput.java @@ -19,6 +19,8 @@ package org.elasticsearch.common.io.stream; +import org.elasticsearch.common.compress.Compressor; + import java.io.IOException; import java.lang.ref.SoftReference; @@ -30,11 +32,9 @@ public class CachedStreamInput { static class Entry { char[] chars = new char[80]; final HandlesStreamInput handles; - final LZFStreamInput lzf; - Entry(HandlesStreamInput handles, LZFStreamInput lzf) { + Entry(HandlesStreamInput handles) { this.handles = handles; - this.lzf = lzf; } } @@ -45,8 +45,7 @@ public class CachedStreamInput { Entry entry = ref == null ? 
null : ref.get(); if (entry == null) { HandlesStreamInput handles = new HandlesStreamInput(); - LZFStreamInput lzf = new LZFStreamInput(null, true); - entry = new Entry(handles, lzf); + entry = new Entry(handles); cache.set(new SoftReference(entry)); } return entry; @@ -56,10 +55,8 @@ public class CachedStreamInput { cache.remove(); } - public static LZFStreamInput cachedLzf(StreamInput in) throws IOException { - LZFStreamInput lzf = instance().lzf; - lzf.reset(in); - return lzf; + public static StreamInput compressed(Compressor compressor, StreamInput in) throws IOException { + return compressor.streamInput(in); } public static HandlesStreamInput cachedHandles(StreamInput in) { @@ -68,10 +65,9 @@ public class CachedStreamInput { return handles; } - public static HandlesStreamInput cachedHandlesLzf(StreamInput in) throws IOException { + public static HandlesStreamInput cachedHandlesCompressed(Compressor compressor, StreamInput in) throws IOException { Entry entry = instance(); - entry.lzf.reset(in); - entry.handles.reset(entry.lzf); + entry.handles.reset(compressor.streamInput(in)); return entry.handles; } diff --git a/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java b/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java index 65c5075110e..3436e2a9a48 100644 --- a/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java +++ b/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java @@ -20,6 +20,7 @@ package org.elasticsearch.common.io.stream; import jsr166y.LinkedTransferQueue; +import org.elasticsearch.common.compress.Compressor; import java.io.IOException; import java.lang.ref.SoftReference; @@ -40,22 +41,12 @@ public class CachedStreamOutput { public static class Entry { private final BytesStreamOutput bytes; private final HandlesStreamOutput handles; - private LZFStreamOutput lzf; Entry(BytesStreamOutput bytes, HandlesStreamOutput handles) { this.bytes = bytes; this.handles = handles; } - // lazily initialize LZF, so we won't allocate it if we don't do - // any compression - private LZFStreamOutput lzf() { - if (lzf == null) { - lzf = new LZFStreamOutput(bytes, true); - } - return lzf; - } - /** * Returns the underlying bytes without any resetting. 
*/ @@ -71,20 +62,20 @@ public class CachedStreamOutput { return bytes; } - public LZFStreamOutput cachedLZFBytes() throws IOException { - LZFStreamOutput lzf = lzf(); - lzf.reset(); - return lzf; - } - - public HandlesStreamOutput cachedHandlesLzfBytes() throws IOException { - LZFStreamOutput lzf = lzf(); - handles.reset(lzf); + public StreamOutput cachedHandles() throws IOException { + handles.reset(bytes); return handles; } - public HandlesStreamOutput cachedHandlesBytes() throws IOException { - handles.reset(bytes); + public StreamOutput cachedBytes(Compressor compressor) throws IOException { + bytes.reset(); + return compressor.streamOutput(bytes); + } + + public StreamOutput cachedHandles(Compressor compressor) throws IOException { + bytes.reset(); + StreamOutput compressed = compressor.streamOutput(bytes); + handles.reset(compressed); return handles; } } diff --git a/src/main/java/org/elasticsearch/common/io/stream/LZFStreamInput.java b/src/main/java/org/elasticsearch/common/io/stream/LZFStreamInput.java deleted file mode 100644 index be80df1a5e5..00000000000 --- a/src/main/java/org/elasticsearch/common/io/stream/LZFStreamInput.java +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io.stream; - -import org.elasticsearch.common.compress.lzf.BufferRecycler; -import org.elasticsearch.common.compress.lzf.ChunkDecoder; -import org.elasticsearch.common.compress.lzf.LZFChunk; -import org.elasticsearch.common.compress.lzf.util.ChunkDecoderFactory; - -import java.io.EOFException; -import java.io.IOException; - -/** - * - */ -public class LZFStreamInput extends StreamInput { - /** - * Underlying decoder in use. - */ - private final ChunkDecoder _decoder; - - /** - * Object that handles details of buffer recycling - */ - private final BufferRecycler _recycler; - - /** - * stream to be decompressed - */ - protected StreamInput inputStream; - - /** - * Flag that indicates if we have already called 'inputStream.close()' - * (to avoid calling it multiple times) - */ - protected boolean inputStreamClosed; - - /** - * Flag that indicates whether we force full reads (reading of as many - * bytes as requested), or 'optimal' reads (up to as many as available, - * but at least one). Default is false, meaning that 'optimal' read - * is used. - */ - protected boolean _cfgFullReads = true; // ES: ALWAYS TRUE since we need to throw EOF when doing readBytes - - /* the current buffer of compressed bytes (from which to decode) */ - private byte[] _inputBuffer; - - /* the buffer of uncompressed bytes from which content is read */ - private byte[] _decodedBytes; - - /* The current position (next char to output) in the uncompressed bytes buffer. 
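As a usage sketch for the reworked Entry API above (illustrative only; payload and the surrounding method are assumptions, and IOException handling is elided), a compressed write through a recycled buffer follows the pattern used later in this patch by BinaryFieldMapper and SourceFieldMapper:

    CachedStreamOutput.Entry entry = CachedStreamOutput.popEntry();
    try {
        StreamOutput out = entry.cachedBytes(CompressorFactory.defaultCompressor()); // compressing stream over the recycled bytes buffer
        out.writeBytes(payload, 0, payload.length);
        out.close();                                         // flushes the final chunk into entry.bytes()
        byte[] stored = entry.bytes().copiedByteArray();     // copy out before the entry is recycled
    } finally {
        CachedStreamOutput.pushEntry(entry);
    }
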
*/ - private int bufferPosition = 0; - - /* Length of the current uncompressed bytes buffer */ - private int bufferLength = 0; - - // ES: added to support never closing just resetting - private final boolean cached; - - public LZFStreamInput(StreamInput in, boolean cached) { - super(); - this.cached = cached; - if (cached) { - _recycler = new BufferRecycler(); - } else { - _recycler = BufferRecycler.instance(); - } - _decoder = ChunkDecoderFactory.optimalInstance(); - inputStream = in; - inputStreamClosed = false; - - _inputBuffer = _recycler.allocInputBuffer(LZFChunk.MAX_CHUNK_LEN); - _decodedBytes = _recycler.allocDecodeBuffer(LZFChunk.MAX_CHUNK_LEN); - } - - /** - * Method is overridden to report number of bytes that can now be read - * from decoded data buffer, without reading bytes from the underlying - * stream. - * Never throws an exception; returns number of bytes available without - * further reads from underlying source; -1 if stream has been closed, or - * 0 if an actual read (and possible blocking) is needed to find out. - */ - @Override - public int available() { - // if closed, return -1; - if (inputStreamClosed) { - return -1; - } - int left = (bufferLength - bufferPosition); - return (left <= 0) ? 0 : left; - } - - @Override - public int read() throws IOException { - if (!readyBuffer()) { - return -1; - } - return _decodedBytes[bufferPosition++] & 255; - } - - @Override - public int read(final byte[] buffer, int offset, int length) throws IOException { - if (length < 1) { - return 0; - } - if (!readyBuffer()) { - return -1; - } - // First let's read however much data we happen to have... - int chunkLength = Math.min(bufferLength - bufferPosition, length); - System.arraycopy(_decodedBytes, bufferPosition, buffer, offset, chunkLength); - bufferPosition += chunkLength; - - if (chunkLength == length || !_cfgFullReads) { - return chunkLength; - } - // Need more data, then - int totalRead = chunkLength; - do { - offset += chunkLength; - if (!readyBuffer()) { - break; - } - chunkLength = Math.min(bufferLength - bufferPosition, (length - totalRead)); - System.arraycopy(_decodedBytes, bufferPosition, buffer, offset, chunkLength); - bufferPosition += chunkLength; - totalRead += chunkLength; - } while (totalRead < length); - - return totalRead; - } - - @Override - public byte readByte() throws IOException { - if (!readyBuffer()) { - throw new EOFException(); - } - return _decodedBytes[bufferPosition++]; - } - - @Override - public void readBytes(byte[] b, int offset, int len) throws IOException { - int result = read(b, offset, len); - if (result < len) { - throw new EOFException(); - } - } - - @Override - public void reset() throws IOException { - this.bufferPosition = 0; - this.bufferLength = 0; - inputStream.reset(); - } - - public void reset(StreamInput in) throws IOException { - this.inputStream = in; - this.bufferPosition = 0; - this.bufferLength = 0; - } - - /** - * Expert!, resets to buffer start, without the need to decompress it again. 
- */ - public void resetToBufferStart() { - this.bufferPosition = 0; - } - - @Override - public void close() throws IOException { - if (cached) { - reset(); - return; - } - bufferPosition = bufferLength = 0; - byte[] buf = _inputBuffer; - if (buf != null) { - _inputBuffer = null; - _recycler.releaseInputBuffer(buf); - } - buf = _decodedBytes; - if (buf != null) { - _decodedBytes = null; - _recycler.releaseDecodeBuffer(buf); - } - if (!inputStreamClosed) { - inputStreamClosed = true; - inputStream.close(); - } - } - - /* - /////////////////////////////////////////////////////////////////////// - // Internal methods - /////////////////////////////////////////////////////////////////////// - */ - - /** - * Fill the uncompressed bytes buffer by reading the underlying inputStream. - * - * @throws IOException - */ - protected boolean readyBuffer() throws IOException { - if (bufferPosition < bufferLength) { - return true; - } - if (inputStreamClosed) { - return false; - } - bufferLength = _decoder.decodeChunk(inputStream, _inputBuffer, _decodedBytes); - if (bufferLength < 0) { - return false; - } - bufferPosition = 0; - return (bufferPosition < bufferLength); - } -} diff --git a/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java b/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java deleted file mode 100644 index 62de85ee63e..00000000000 --- a/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io.stream; - -import org.elasticsearch.common.compress.lzf.BufferRecycler; -import org.elasticsearch.common.compress.lzf.ChunkEncoder; -import org.elasticsearch.common.compress.lzf.LZFChunk; - -import java.io.IOException; - -/** - * - */ -public class LZFStreamOutput extends StreamOutput { - - private static int OUTPUT_BUFFER_SIZE = LZFChunk.MAX_CHUNK_LEN; - - private final ChunkEncoder _encoder; - private final BufferRecycler _recycler; - - protected StreamOutput _outputStream; - protected byte[] _outputBuffer; - protected int _position = 0; - - - /** - * Configuration setting that governs whether basic 'flush()' should - * first complete a block or not. - *

- * Default value is 'true' - * - * @since 0.8 - */ - protected boolean _cfgFinishBlockOnFlush = true; - - private final boolean neverClose; - - public LZFStreamOutput(StreamOutput out, boolean neverClose) { - this.neverClose = neverClose; - _recycler = neverClose ? new BufferRecycler() : BufferRecycler.instance(); - _encoder = new ChunkEncoder(OUTPUT_BUFFER_SIZE, _recycler); - _outputStream = out; - _outputBuffer = _recycler.allocOutputBuffer(OUTPUT_BUFFER_SIZE); - } - - @Override - public void write(final int singleByte) throws IOException { - if (_position >= _outputBuffer.length) { - writeCompressedBlock(); - } - _outputBuffer[_position++] = (byte) singleByte; - } - - @Override - public void writeByte(byte b) throws IOException { - if (_position >= _outputBuffer.length) { - writeCompressedBlock(); - } - _outputBuffer[_position++] = b; - } - - @Override - public void writeBytes(byte[] buffer, int offset, int length) throws IOException { - // ES, check if length is 0, and don't write in this case - if (length == 0) { - return; - } - final int BUFFER_LEN = _outputBuffer.length; - - // simple case first: buffering only (for trivially short writes) - int free = BUFFER_LEN - _position; - if (free >= length) { - System.arraycopy(buffer, offset, _outputBuffer, _position, length); - _position += length; - return; - } - // otherwise, copy whatever we can, flush - System.arraycopy(buffer, offset, _outputBuffer, _position, free); - offset += free; - length -= free; - _position += free; - writeCompressedBlock(); - - // then write intermediate full block, if any, without copying: - while (length >= BUFFER_LEN) { - _encoder.encodeAndWriteChunk(buffer, offset, BUFFER_LEN, _outputStream); - offset += BUFFER_LEN; - length -= BUFFER_LEN; - } - - // and finally, copy leftovers in buffer, if any - if (length > 0) { - System.arraycopy(buffer, offset, _outputBuffer, 0, length); - } - _position = length; - } - - @Override - public void flush() throws IOException { - if (_cfgFinishBlockOnFlush && _position > 0) { - writeCompressedBlock(); - } - _outputStream.flush(); - } - - @Override - public void close() throws IOException { - if (_position > 0) { - writeCompressedBlock(); - } - if (neverClose) { - // just reset here the LZF stream (not the underlying stream, since we might want to read from it) - _position = 0; - return; - } - _outputStream.flush(); - _encoder.close(); - byte[] buf = _outputBuffer; - if (buf != null) { - _outputBuffer = null; - _recycler.releaseOutputBuffer(buf); - } - _outputStream.close(); - } - - @Override - public void reset() throws IOException { - _position = 0; - _outputStream.reset(); - } - - public void reset(StreamOutput out) throws IOException { - this._outputStream = out; - reset(); - } - - public StreamOutput wrappedOut() { - return this._outputStream; - } - - /** - * Compress and write the current block to the OutputStream - */ - private void writeCompressedBlock() throws IOException { - int left = _position; - _position = 0; - int offset = 0; - - do { - int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left); - _encoder.encodeAndWriteChunk(_outputBuffer, offset, chunkLen, _outputStream); - offset += chunkLen; - left -= chunkLen; - } while (left > 0); - } -} diff --git a/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java b/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java index 3186ad370dd..12abf3fb5de 100644 --- a/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java +++ 
b/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java @@ -23,10 +23,10 @@ import com.google.common.base.Charsets; import com.google.common.collect.Maps; import org.elasticsearch.ElasticSearchParseException; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.compress.lzf.LZF; +import org.elasticsearch.common.compress.CompressedStreamInput; +import org.elasticsearch.common.compress.Compressor; +import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.stream.BytesStreamInput; -import org.elasticsearch.common.io.stream.CachedStreamInput; -import org.elasticsearch.common.io.stream.LZFStreamInput; import java.io.IOException; import java.util.ArrayList; @@ -40,12 +40,12 @@ import java.util.Map; public class XContentHelper { public static XContentParser createParser(byte[] data, int offset, int length) throws IOException { - if (LZF.isCompressed(data, offset, length)) { - BytesStreamInput siBytes = new BytesStreamInput(data, offset, length, false); - LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); - XContentType contentType = XContentFactory.xContentType(siLzf); - siLzf.resetToBufferStart(); - return XContentFactory.xContent(contentType).createParser(siLzf); + Compressor compressor = CompressorFactory.compressor(data, offset, length); + if (compressor != null) { + CompressedStreamInput compressedInput = compressor.streamInput(new BytesStreamInput(data, offset, length, false)); + XContentType contentType = XContentFactory.xContentType(compressedInput); + compressedInput.resetToBufferStart(); + return XContentFactory.xContent(contentType).createParser(compressedInput); } else { return XContentFactory.xContent(data, offset, length).createParser(data, offset, length); } @@ -59,12 +59,12 @@ public class XContentHelper { try { XContentParser parser; XContentType contentType; - if (LZF.isCompressed(data, offset, length)) { - BytesStreamInput siBytes = new BytesStreamInput(data, offset, length, false); - LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); - contentType = XContentFactory.xContentType(siLzf); - siLzf.resetToBufferStart(); - parser = XContentFactory.xContent(contentType).createParser(siLzf); + Compressor compressor = CompressorFactory.compressor(data, offset, length); + if (compressor != null) { + CompressedStreamInput compressedStreamInput = compressor.streamInput(new BytesStreamInput(data, offset, length, false)); + contentType = XContentFactory.xContentType(compressedStreamInput); + compressedStreamInput.resetToBufferStart(); + parser = XContentFactory.xContent(contentType).createParser(compressedStreamInput); } else { contentType = XContentFactory.xContentType(data, offset, length); parser = XContentFactory.xContent(contentType).createParser(data, offset, length); diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java index 49d7cbacd7b..bbd83e06e70 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java @@ -43,7 +43,6 @@ import org.elasticsearch.transport.*; import java.io.IOException; import java.net.*; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; @@ -269,12 +268,13 @@ 
public class MulticastZenPing extends AbstractLifecycleComponent implem synchronized (sendMutex) { CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); try { - HandlesStreamOutput out = cachedEntry.cachedHandlesBytes(); + StreamOutput out = cachedEntry.cachedHandles(); out.writeBytes(INTERNAL_HEADER); Version.writeVersion(Version.CURRENT, out); out.writeInt(id); clusterName.writeTo(out); nodesProvider.nodes().localNode().writeTo(out); + out.close(); datagramPacketSend.setData(cachedEntry.bytes().copiedByteArray()); multicastSocket.send(datagramPacketSend); if (logger.isTraceEnabled()) { diff --git a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 44e6ab565ff..9c817b52b50 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -23,6 +23,8 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.compress.Compressor; +import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.stream.*; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; @@ -67,9 +69,9 @@ public class PublishClusterStateAction extends AbstractComponent { CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); byte[] clusterStateInBytes; try { - HandlesStreamOutput stream = cachedEntry.cachedHandlesLzfBytes(); + StreamOutput stream = cachedEntry.cachedHandles(CompressorFactory.defaultCompressor()); ClusterState.Builder.writeTo(clusterState, stream); - stream.flush(); + stream.close(); clusterStateInBytes = cachedEntry.bytes().copiedByteArray(); } catch (Exception e) { logger.warn("failed to serialize cluster_state before publishing it to nodes", e); @@ -129,7 +131,14 @@ public class PublishClusterStateAction extends AbstractComponent { @Override public void messageReceived(PublishClusterStateRequest request, TransportChannel channel) throws Exception { - StreamInput in = CachedStreamInput.cachedHandlesLzf(new BytesStreamInput(request.clusterStateInBytes.bytes(), request.clusterStateInBytes.offset(), request.clusterStateInBytes.length(), false)); + Compressor compressor = CompressorFactory.compressor(request.clusterStateInBytes.bytes(), request.clusterStateInBytes.offset(), request.clusterStateInBytes.length()); + BytesStreamInput bytes = new BytesStreamInput(request.clusterStateInBytes.bytes(), request.clusterStateInBytes.offset(), request.clusterStateInBytes.length(), false); + StreamInput in; + if (compressor != null) { + in = CachedStreamInput.cachedHandlesCompressed(compressor, bytes); + } else { + in = CachedStreamInput.cachedHandles(bytes); + } ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); listener.onNewClusterState(clusterState); channel.sendResponse(VoidStreamable.INSTANCE); diff --git a/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java b/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java index 07edbca12e8..b3240d21fbc 100644 --- a/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java +++ 
b/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java @@ -26,8 +26,9 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.*; -import org.elasticsearch.common.compress.lzf.LZF; -import org.elasticsearch.common.io.stream.*; +import org.elasticsearch.common.compress.CompressorFactory; +import org.elasticsearch.common.io.stream.CachedStreamOutput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.*; @@ -147,7 +148,7 @@ public abstract class BlobStoreGateway extends SharedStorageGateway { try { StreamOutput streamOutput; if (compress) { - streamOutput = cachedEntry.cachedLZFBytes(); + streamOutput = cachedEntry.cachedBytes(CompressorFactory.defaultCompressor()); } else { streamOutput = cachedEntry.cachedBytes(); } @@ -206,13 +207,7 @@ public abstract class BlobStoreGateway extends SharedStorageGateway { private MetaData readMetaData(byte[] data) throws IOException { XContentParser parser = null; try { - if (LZF.isCompressed(data)) { - BytesStreamInput siBytes = new BytesStreamInput(data, false); - LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); - parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf); - } else { - parser = XContentFactory.xContent(XContentType.JSON).createParser(data); - } + parser = XContentHelper.createParser(data, 0, data.length); return MetaData.Builder.fromXContent(parser); } finally { if (parser != null) { diff --git a/src/main/java/org/elasticsearch/gateway/local/state/shards/LocalGatewayShardsState.java b/src/main/java/org/elasticsearch/gateway/local/state/shards/LocalGatewayShardsState.java index 22c7c6fb700..bdece0b85b9 100644 --- a/src/main/java/org/elasticsearch/gateway/local/state/shards/LocalGatewayShardsState.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/shards/LocalGatewayShardsState.java @@ -27,14 +27,10 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.*; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.compress.lzf.LZF; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.io.stream.BytesStreamInput; -import org.elasticsearch.common.io.stream.CachedStreamInput; import org.elasticsearch.common.io.stream.CachedStreamOutput; -import org.elasticsearch.common.io.stream.LZFStreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.*; @@ -402,13 +398,7 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste try { Map shardsState = Maps.newHashMap(); - if (LZF.isCompressed(data)) { - BytesStreamInput siBytes = new BytesStreamInput(data, false); - LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); - parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf); - } else { - parser = XContentFactory.xContent(XContentType.JSON).createParser(data); - } + parser = XContentHelper.createParser(data, 0, data.length); String currentFieldName = null; XContentParser.Token token = parser.nextToken(); diff --git 
a/src/main/java/org/elasticsearch/index/get/GetResult.java b/src/main/java/org/elasticsearch/index/get/GetResult.java index 4d999b9d070..f1ee0eea768 100644 --- a/src/main/java/org/elasticsearch/index/get/GetResult.java +++ b/src/main/java/org/elasticsearch/index/get/GetResult.java @@ -23,8 +23,7 @@ import com.google.common.collect.ImmutableMap; import org.elasticsearch.ElasticSearchParseException; import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.Unicode; -import org.elasticsearch.common.compress.lzf.LZF; -import org.elasticsearch.common.compress.lzf.LZFDecoder; +import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -168,14 +167,12 @@ public class GetResult implements Streamable, Iterable, ToXContent { * Returns bytes reference, also un compress the source if needed. */ public BytesHolder sourceRef() { - if (LZF.isCompressed(source.bytes(), source.offset(), source.length())) { - try { - this.source = new BytesHolder(LZFDecoder.decode(source.bytes(), source.offset(), source.length())); - } catch (IOException e) { - throw new ElasticSearchParseException("failed to decompress source", e); - } + try { + this.source = CompressorFactory.uncompressIfNeeded(this.source); + return this.source; + } catch (IOException e) { + throw new ElasticSearchParseException("failed to decompress source", e); } - return this.source; } /** diff --git a/src/main/java/org/elasticsearch/index/mapper/core/BinaryFieldMapper.java b/src/main/java/org/elasticsearch/index/mapper/core/BinaryFieldMapper.java index 68756688793..b0531363da4 100644 --- a/src/main/java/org/elasticsearch/index/mapper/core/BinaryFieldMapper.java +++ b/src/main/java/org/elasticsearch/index/mapper/core/BinaryFieldMapper.java @@ -23,11 +23,11 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.Fieldable; import org.elasticsearch.ElasticSearchParseException; import org.elasticsearch.common.Base64; +import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.compress.lzf.LZF; -import org.elasticsearch.common.compress.lzf.LZFDecoder; +import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.stream.CachedStreamOutput; -import org.elasticsearch.common.io.stream.LZFStreamOutput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -127,14 +127,14 @@ public class BinaryFieldMapper extends AbstractFieldMapper { @Override public byte[] value(Fieldable field) { byte[] value = field.getBinaryValue(); - if (value != null && LZF.isCompressed(value)) { - try { - return LZFDecoder.decode(value); - } catch (IOException e) { - throw new ElasticSearchParseException("failed to decompress source", e); - } + if (value == null) { + return value; + } + try { + return CompressorFactory.uncompressIfNeeded(new BytesHolder(value)).bytes(); + } catch (IOException e) { + throw new ElasticSearchParseException("failed to decompress source", e); } - return value; } @Override @@ -167,12 +167,12 @@ public class BinaryFieldMapper extends AbstractFieldMapper { return null; } else { value = context.parser().binaryValue(); - if (compress != null && compress && !LZF.isCompressed(value, 0, 
value.length)) { + if (compress != null && compress && !CompressorFactory.isCompressed(value, 0, value.length)) { if (compressThreshold == -1 || value.length > compressThreshold) { CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); - LZFStreamOutput streamOutput = cachedEntry.cachedLZFBytes(); + StreamOutput streamOutput = cachedEntry.cachedBytes(CompressorFactory.defaultCompressor()); streamOutput.writeBytes(value, 0, value.length); - streamOutput.flush(); + streamOutput.close(); // we copy over the byte array, since we need to push back the cached entry // TODO, we we had a handle into when we are done with parsing, then we push back then and not copy over bytes value = cachedEntry.bytes().copiedByteArray(); diff --git a/src/main/java/org/elasticsearch/index/mapper/internal/SourceFieldMapper.java b/src/main/java/org/elasticsearch/index/mapper/internal/SourceFieldMapper.java index e6be08d1828..dbe76d60895 100644 --- a/src/main/java/org/elasticsearch/index/mapper/internal/SourceFieldMapper.java +++ b/src/main/java/org/elasticsearch/index/mapper/internal/SourceFieldMapper.java @@ -24,11 +24,15 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.Fieldable; import org.elasticsearch.ElasticSearchParseException; +import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.compress.lzf.LZF; -import org.elasticsearch.common.compress.lzf.LZFDecoder; -import org.elasticsearch.common.io.stream.*; +import org.elasticsearch.common.compress.CompressedStreamInput; +import org.elasticsearch.common.compress.Compressor; +import org.elasticsearch.common.compress.CompressorFactory; +import org.elasticsearch.common.io.stream.BytesStreamInput; +import org.elasticsearch.common.io.stream.CachedStreamOutput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.document.ResetFieldSelector; import org.elasticsearch.common.unit.ByteSizeValue; @@ -251,7 +255,7 @@ public class SourceFieldMapper extends AbstractFieldMapper implements In CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); StreamOutput streamOutput; if (compress != null && compress && (compressThreshold == -1 || dataLength > compressThreshold)) { - streamOutput = cachedEntry.cachedLZFBytes(); + streamOutput = cachedEntry.cachedBytes(CompressorFactory.defaultCompressor()); } else { streamOutput = cachedEntry.cachedBytes(); } @@ -267,19 +271,19 @@ public class SourceFieldMapper extends AbstractFieldMapper implements In dataLength = data.length; CachedStreamOutput.pushEntry(cachedEntry); - } else if (compress != null && compress && !LZF.isCompressed(data, dataOffset, dataLength)) { + } else if (compress != null && compress && !CompressorFactory.isCompressed(data, dataOffset, dataLength)) { if (compressThreshold == -1 || dataLength > compressThreshold) { CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); try { XContentType contentType = XContentFactory.xContentType(data, dataOffset, dataLength); if (formatContentType != null && formatContentType != contentType) { - XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, cachedEntry.cachedLZFBytes()); + XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, cachedEntry.cachedBytes(CompressorFactory.defaultCompressor())); 
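The BinaryFieldMapper parse path keeps its compress-above-threshold behaviour but goes through the abstraction: check CompressorFactory.isCompressed to avoid double compression, write through a compressor-backed cached stream, and call close() rather than flush() so the compressor can emit its trailing chunk before the buffer is copied. A rough sketch; the standalone maybeCompress method is illustrative:

    import org.elasticsearch.common.compress.CompressorFactory;
    import org.elasticsearch.common.io.stream.CachedStreamOutput;
    import org.elasticsearch.common.io.stream.StreamOutput;

    import java.io.IOException;

    public class CompressOnThreshold {

        public static byte[] maybeCompress(byte[] value, long compressThreshold) throws IOException {
            if (CompressorFactory.isCompressed(value, 0, value.length)) {
                return value; // never double-compress
            }
            if (compressThreshold != -1 && value.length <= compressThreshold) {
                return value; // below the threshold, keep it as-is
            }
            CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
            try {
                StreamOutput out = cachedEntry.cachedBytes(CompressorFactory.defaultCompressor());
                out.writeBytes(value, 0, value.length);
                // close(), not flush(): the compressor needs to write its final frame
                out.close();
                return cachedEntry.bytes().copiedByteArray();
            } finally {
                CachedStreamOutput.pushEntry(cachedEntry);
            }
        }
    }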
builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(data, dataOffset, dataLength)); builder.close(); } else { - LZFStreamOutput streamOutput = cachedEntry.cachedLZFBytes(); + StreamOutput streamOutput = cachedEntry.cachedBytes(CompressorFactory.defaultCompressor()); streamOutput.writeBytes(data, dataOffset, dataLength); - streamOutput.flush(); + streamOutput.close(); } // we copy over the byte array, since we need to push back the cached entry // TODO, we we had a handle into when we are done with parsing, then we push back then and not copy over bytes @@ -294,18 +298,18 @@ public class SourceFieldMapper extends AbstractFieldMapper implements In } } else if (formatContentType != null) { // see if we need to convert the content type - if (LZF.isCompressed(data, dataOffset, dataLength)) { - BytesStreamInput siBytes = new BytesStreamInput(data, dataOffset, dataLength, false); - LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); - XContentType contentType = XContentFactory.xContentType(siLzf); - siLzf.resetToBufferStart(); + Compressor compressor = CompressorFactory.compressor(data, dataOffset, dataLength); + if (compressor != null) { + CompressedStreamInput compressedStreamInput = compressor.streamInput(new BytesStreamInput(data, dataOffset, dataLength, false)); + XContentType contentType = XContentFactory.xContentType(compressedStreamInput); + compressedStreamInput.resetToBufferStart(); if (contentType != formatContentType) { // we need to reread and store back, compressed.... CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); try { - LZFStreamOutput streamOutput = cachedEntry.cachedLZFBytes(); + StreamOutput streamOutput = cachedEntry.cachedBytes(CompressorFactory.defaultCompressor()); XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, streamOutput); - builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(siLzf)); + builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(compressedStreamInput)); builder.close(); data = cachedEntry.bytes().copiedByteArray(); dataOffset = 0; @@ -315,6 +319,8 @@ public class SourceFieldMapper extends AbstractFieldMapper implements In } finally { CachedStreamOutput.pushEntry(cachedEntry); } + } else { + compressedStreamInput.close(); } } else { XContentType contentType = XContentFactory.xContentType(data, dataOffset, dataLength); @@ -355,14 +361,11 @@ public class SourceFieldMapper extends AbstractFieldMapper implements In if (value == null) { return value; } - if (LZF.isCompressed(value)) { - try { - return LZFDecoder.decode(value); - } catch (IOException e) { - throw new ElasticSearchParseException("failed to decompress source", e); - } + try { + return CompressorFactory.uncompressIfNeeded(new BytesHolder(value)).bytes(); + } catch (IOException e) { + throw new ElasticSearchParseException("failed to decompress source", e); } - return value; } @Override diff --git a/src/main/java/org/elasticsearch/rest/action/support/RestXContentBuilder.java b/src/main/java/org/elasticsearch/rest/action/support/RestXContentBuilder.java index 793128dc849..330150beb35 100644 --- a/src/main/java/org/elasticsearch/rest/action/support/RestXContentBuilder.java +++ b/src/main/java/org/elasticsearch/rest/action/support/RestXContentBuilder.java @@ -19,11 +19,11 @@ package org.elasticsearch.rest.action.support; -import org.elasticsearch.common.compress.lzf.LZF; +import org.elasticsearch.common.compress.CompressedStreamInput; +import 
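The SourceFieldMapper hunk above shows the general detection idiom that replaces LZF.isCompressed: ask CompressorFactory.compressor(...) which compressor, if any, produced the bytes, open a CompressedStreamInput from it, sniff the XContent type, rewind with resetToBufferStart(), and only transcode (recompressed) when the stored format differs from the requested one; the newly added else branch closes the stream when no conversion is needed. A condensed, illustrative sketch of that flow, with the surrounding mapper state omitted:

    import org.elasticsearch.common.compress.CompressedStreamInput;
    import org.elasticsearch.common.compress.Compressor;
    import org.elasticsearch.common.compress.CompressorFactory;
    import org.elasticsearch.common.io.stream.BytesStreamInput;
    import org.elasticsearch.common.io.stream.CachedStreamOutput;
    import org.elasticsearch.common.io.stream.StreamOutput;
    import org.elasticsearch.common.xcontent.XContentBuilder;
    import org.elasticsearch.common.xcontent.XContentFactory;
    import org.elasticsearch.common.xcontent.XContentType;

    import java.io.IOException;

    public class SourceTranscoder {

        public static byte[] convertIfNeeded(byte[] data, int offset, int length,
                                             XContentType formatContentType) throws IOException {
            Compressor compressor = CompressorFactory.compressor(data, offset, length);
            if (compressor == null) {
                return data; // plain source; other branches of the mapper handle this case
            }
            CompressedStreamInput in = compressor.streamInput(new BytesStreamInput(data, offset, length, false));
            XContentType contentType = XContentFactory.xContentType(in);
            in.resetToBufferStart(); // rewind after sniffing the type
            if (contentType == formatContentType) {
                in.close();
                return data; // already stored in the requested format
            }
            CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
            try {
                StreamOutput out = cachedEntry.cachedBytes(CompressorFactory.defaultCompressor());
                XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, out);
                builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(in));
                builder.close();
                return cachedEntry.bytes().copiedByteArray();
            } finally {
                CachedStreamOutput.pushEntry(cachedEntry);
            }
        }
    }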
org.elasticsearch.common.compress.Compressor; +import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.stream.BytesStreamInput; -import org.elasticsearch.common.io.stream.CachedStreamInput; import org.elasticsearch.common.io.stream.CachedStreamOutput; -import org.elasticsearch.common.io.stream.LZFStreamInput; import org.elasticsearch.common.xcontent.*; import org.elasticsearch.rest.RestRequest; @@ -67,15 +67,15 @@ public class RestXContentBuilder { } public static void restDocumentSource(byte[] source, int offset, int length, XContentBuilder builder, ToXContent.Params params) throws IOException { - if (LZF.isCompressed(source, offset, length)) { - BytesStreamInput siBytes = new BytesStreamInput(source, offset, length, false); - LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); - XContentType contentType = XContentFactory.xContentType(siLzf); - siLzf.resetToBufferStart(); + Compressor compressor = CompressorFactory.compressor(source, offset, length); + if (compressor != null) { + CompressedStreamInput compressedStreamInput = compressor.streamInput(new BytesStreamInput(source, offset, length, false)); + XContentType contentType = XContentFactory.xContentType(compressedStreamInput); + compressedStreamInput.resetToBufferStart(); if (contentType == builder.contentType()) { - builder.rawField("_source", siLzf); + builder.rawField("_source", compressedStreamInput); } else { - XContentParser parser = XContentFactory.xContent(contentType).createParser(siLzf); + XContentParser parser = XContentFactory.xContent(contentType).createParser(compressedStreamInput); try { parser.nextToken(); builder.field("_source"); diff --git a/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java b/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java index 489a4b6a9e3..dd0a9ab7852 100644 --- a/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java +++ b/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java @@ -26,8 +26,7 @@ import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.Unicode; -import org.elasticsearch.common.compress.lzf.LZF; -import org.elasticsearch.common.compress.lzf.LZFDecoder; +import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -167,14 +166,12 @@ public class InternalSearchHit implements SearchHit { * Returns bytes reference, also un compress the source if needed. 
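RestXContentBuilder.restDocumentSource applies the same detection at the REST layer: if a compressor recognises the stored _source and its content type already matches the response builder, the decompressing stream is handed to rawField() directly; otherwise the source is parsed and copied structure by structure. A sketch under the assumption that the uncompressed branch keeps its pre-existing handling (elided here); the SourceRenderer wrapper is illustrative:

    import org.elasticsearch.common.compress.CompressedStreamInput;
    import org.elasticsearch.common.compress.Compressor;
    import org.elasticsearch.common.compress.CompressorFactory;
    import org.elasticsearch.common.io.stream.BytesStreamInput;
    import org.elasticsearch.common.xcontent.XContentBuilder;
    import org.elasticsearch.common.xcontent.XContentFactory;
    import org.elasticsearch.common.xcontent.XContentParser;
    import org.elasticsearch.common.xcontent.XContentType;

    import java.io.IOException;

    public class SourceRenderer {

        public static void writeCompressedSource(byte[] source, int offset, int length,
                                                 XContentBuilder builder) throws IOException {
            Compressor compressor = CompressorFactory.compressor(source, offset, length);
            if (compressor == null) {
                return; // uncompressed sources take the pre-existing path, not shown here
            }
            CompressedStreamInput in = compressor.streamInput(new BytesStreamInput(source, offset, length, false));
            XContentType contentType = XContentFactory.xContentType(in);
            in.resetToBufferStart();
            if (contentType == builder.contentType()) {
                builder.rawField("_source", in); // stream the decompressed bytes straight through
            } else {
                XContentParser parser = XContentFactory.xContent(contentType).createParser(in);
                try {
                    parser.nextToken();
                    builder.field("_source");
                    builder.copyCurrentStructure(parser);
                } finally {
                    parser.close();
                }
            }
        }
    }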
*/ public BytesHolder sourceRef() { - if (LZF.isCompressed(source.bytes(), source.offset(), source.length())) { - try { - this.source = new BytesHolder(LZFDecoder.decode(source.bytes(), source.offset(), source.length())); - } catch (IOException e) { - throw new ElasticSearchParseException("failed to decompress source", e); - } + try { + this.source = CompressorFactory.uncompressIfNeeded(this.source); + return this.source; + } catch (IOException e) { + throw new ElasticSearchParseException("failed to decompress source", e); } - return this.source; } /** diff --git a/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index cde0ba20981..e22b6bcd342 100644 --- a/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -159,7 +159,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException { CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); try { - HandlesStreamOutput stream = cachedEntry.cachedHandlesBytes(); + StreamOutput stream = cachedEntry.cachedHandles(); stream.writeLong(requestId); byte status = 0; @@ -169,12 +169,14 @@ public class LocalTransport extends AbstractLifecycleComponent implem stream.writeUTF(action); message.writeTo(stream); + stream.close(); + final LocalTransport targetTransport = connectedNodes.get(node); if (targetTransport == null) { throw new NodeNotConnectedException(node, "Node not connected"); } - final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray(); + final byte[] data = cachedEntry.bytes().copiedByteArray(); transportServiceAdapter.sent(data.length); diff --git a/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java b/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java index 5c5610be4c1..5b686b099ff 100644 --- a/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java +++ b/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java @@ -22,7 +22,7 @@ package org.elasticsearch.transport.local; import org.elasticsearch.common.io.ThrowableObjectOutputStream; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.CachedStreamOutput; -import org.elasticsearch.common.io.stream.HandlesStreamOutput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.transport.NotSerializableTransportException; import org.elasticsearch.transport.RemoteTransportException; @@ -68,12 +68,13 @@ public class LocalTransportChannel implements TransportChannel { public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException { CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); try { - HandlesStreamOutput stream = cachedEntry.cachedHandlesBytes(); + StreamOutput stream = cachedEntry.cachedHandles(); stream.writeLong(requestId); byte status = 0; status = TransportStreams.statusSetResponse(status); stream.writeByte(status); // 0 for request, 1 for response. 
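InternalSearchHit.sourceRef() gets the same uncompressIfNeeded treatment as GetResult, and the local transport stops reaching into the wrapped BytesStreamOutput: it serialises through the generic cachedHandles() stream, closes it, then copies the bytes out of the cached entry. A minimal sketch of that send-side sequence; the LocalSend class is illustrative:

    import org.elasticsearch.common.io.stream.CachedStreamOutput;
    import org.elasticsearch.common.io.stream.StreamOutput;
    import org.elasticsearch.common.io.stream.Streamable;

    import java.io.IOException;

    public class LocalSend {

        public static byte[] serializeRequest(long requestId, byte status, String action,
                                              Streamable message) throws IOException {
            CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
            try {
                StreamOutput stream = cachedEntry.cachedHandles();
                stream.writeLong(requestId);
                stream.writeByte(status);
                stream.writeUTF(action);
                message.writeTo(stream);
                stream.close(); // flush the handles wrapper into the underlying buffer
                // copy from the entry, not from the (no longer exposed) wrapped output
                return cachedEntry.bytes().copiedByteArray();
            } finally {
                CachedStreamOutput.pushEntry(cachedEntry);
            }
        }
    }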
message.writeTo(stream); + stream.close(); final byte[] data = cachedEntry.bytes().copiedByteArray(); targetTransport.threadPool().generic().execute(new Runnable() { @Override diff --git a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index 89e9ebd3d94..acc24a86513 100644 --- a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -19,9 +19,11 @@ package org.elasticsearch.transport.netty; +import org.elasticsearch.ElasticSearchIllegalStateException; +import org.elasticsearch.common.compress.Compressor; +import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.ThrowableObjectInputStream; import org.elasticsearch.common.io.stream.CachedStreamInput; -import org.elasticsearch.common.io.stream.HandlesStreamInput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.logging.ESLogger; @@ -215,9 +217,13 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { byte status = buffer.readByte(); boolean isRequest = TransportStreams.statusIsRequest(status); - HandlesStreamInput wrappedStream; - if (TransportStreams.statusIsCompress(status)) { - wrappedStream = CachedStreamInput.cachedHandlesLzf(streamIn); + StreamInput wrappedStream; + if (TransportStreams.statusIsCompress(status) && buffer.readable()) { + Compressor compressor = CompressorFactory.compressor(buffer); + if (compressor == null) { + throw new ElasticSearchIllegalStateException("stream marked as compressed, but no compressor found"); + } + wrappedStream = CachedStreamInput.cachedHandlesCompressed(compressor, streamIn); } else { wrappedStream = CachedStreamInput.cachedHandles(streamIn); } @@ -254,7 +260,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { buffer.readerIndex(expectedIndexReader); } } - wrappedStream.cleanHandles(); + wrappedStream.close(); } private void handleResponse(StreamInput buffer, final TransportResponseHandler handler) { diff --git a/src/main/java/org/elasticsearch/transport/support/TransportStreams.java b/src/main/java/org/elasticsearch/transport/support/TransportStreams.java index 2d1cf54e70d..e3396420da0 100644 --- a/src/main/java/org/elasticsearch/transport/support/TransportStreams.java +++ b/src/main/java/org/elasticsearch/transport/support/TransportStreams.java @@ -19,8 +19,9 @@ package org.elasticsearch.transport.support; +import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.stream.CachedStreamOutput; -import org.elasticsearch.common.io.stream.HandlesStreamOutput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponseOptions; @@ -105,17 +106,17 @@ public class TransportStreams { if (options.compress()) { status = TransportStreams.statusSetCompress(status); - HandlesStreamOutput stream = cachedEntry.cachedHandlesLzfBytes(); + StreamOutput stream = cachedEntry.cachedHandles(CompressorFactory.defaultCompressor()); cachedEntry.bytes().write(HEADER_PLACEHOLDER); stream.writeUTF(action); message.writeTo(stream); - stream.flush(); + stream.close(); } else { - HandlesStreamOutput stream = cachedEntry.cachedHandlesBytes(); + 
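On the receive side, MessageChannelHandler no longer assumes LZF: when the status byte carries the compress flag it asks CompressorFactory to identify the compressor from the buffer's leading bytes and fails fast with ElasticSearchIllegalStateException if nothing matches, then wraps the stream via cachedHandlesCompressed; the wrapped stream is now closed rather than merely having its handles cleaned. A sketch of just that dispatch, assuming the buffer is the Netty ChannelBuffer the handler already works with:

    import org.elasticsearch.ElasticSearchIllegalStateException;
    import org.elasticsearch.common.compress.Compressor;
    import org.elasticsearch.common.compress.CompressorFactory;
    import org.elasticsearch.common.io.stream.CachedStreamInput;
    import org.elasticsearch.common.io.stream.StreamInput;
    import org.elasticsearch.transport.support.TransportStreams;
    import org.jboss.netty.buffer.ChannelBuffer;

    public class WireStreams {

        public static StreamInput wrap(byte status, ChannelBuffer buffer, StreamInput streamIn) {
            if (TransportStreams.statusIsCompress(status) && buffer.readable()) {
                // identify the compressor from the payload header, not from a hard-wired format
                Compressor compressor = CompressorFactory.compressor(buffer);
                if (compressor == null) {
                    throw new ElasticSearchIllegalStateException("stream marked as compressed, but no compressor found");
                }
                return CachedStreamInput.cachedHandlesCompressed(compressor, streamIn);
            }
            return CachedStreamInput.cachedHandles(streamIn);
        }
    }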
StreamOutput stream = cachedEntry.cachedHandles(); cachedEntry.bytes().write(HEADER_PLACEHOLDER); stream.writeUTF(action); message.writeTo(stream); - stream.flush(); + stream.close(); } TransportStreams.writeHeader(cachedEntry.bytes().underlyingBytes(), cachedEntry.bytes().size(), requestId, status); } @@ -126,15 +127,15 @@ public class TransportStreams { if (options.compress()) { status = TransportStreams.statusSetCompress(status); - HandlesStreamOutput stream = cachedEntry.cachedHandlesLzfBytes(); + StreamOutput stream = cachedEntry.cachedHandles(CompressorFactory.defaultCompressor()); cachedEntry.bytes().write(HEADER_PLACEHOLDER); message.writeTo(stream); - stream.flush(); + stream.close(); } else { - HandlesStreamOutput stream = cachedEntry.cachedHandlesBytes(); + StreamOutput stream = cachedEntry.cachedHandles(); cachedEntry.bytes().write(HEADER_PLACEHOLDER); message.writeTo(stream); - stream.flush(); + stream.close(); } TransportStreams.writeHeader(cachedEntry.bytes().underlyingBytes(), cachedEntry.bytes().size(), requestId, status); } diff --git a/src/test/java/org/elasticsearch/test/unit/index/engine/robin/SimpleRobinEngineTests.java b/src/test/java/org/elasticsearch/test/unit/index/engine/robin/SimpleRobinEngineTests.java index 18a8c9e5b82..6fd1252e86e 100644 --- a/src/test/java/org/elasticsearch/test/unit/index/engine/robin/SimpleRobinEngineTests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/engine/robin/SimpleRobinEngineTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.unit.index.engine.AbstractSimpleEngineTests; -import org.elasticsearch.threadpool.ThreadPool; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; @@ -39,7 +38,7 @@ import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_ public class SimpleRobinEngineTests extends AbstractSimpleEngineTests { protected Engine createEngine(Store store, Translog translog) { - return new RobinEngine(shardId, EMPTY_SETTINGS, new ThreadPool(), new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), new ShardIndexingService(shardId, EMPTY_SETTINGS), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(), + return new RobinEngine(shardId, EMPTY_SETTINGS, threadPool, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), new ShardIndexingService(shardId, EMPTY_SETTINGS), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(), new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NoneBloomCache(shardId.index())); } } diff --git a/src/test/java/org/elasticsearch/test/unit/index/mapper/source/CompressSourceMappingTests.java b/src/test/java/org/elasticsearch/test/unit/index/mapper/source/CompressSourceMappingTests.java index 9cfb8d934af..de196539874 100644 --- a/src/test/java/org/elasticsearch/test/unit/index/mapper/source/CompressSourceMappingTests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/mapper/source/CompressSourceMappingTests.java @@ -19,11 +19,11 @@ package org.elasticsearch.test.unit.index.mapper.source; -import org.elasticsearch.common.compress.lzf.LZF; +import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.mapper.DocumentMapper; -import 
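TransportStreams mirrors this on the sending end: the compress option flips the status bit and selects a compressor-backed handles stream, the header placeholder is written first, and close() finishes the compressed frame before writeHeader patches in the real size and request id. An illustrative sketch that assumes HEADER_PLACEHOLDER and writeHeader are reachable from outside the class (in the patch they are used from within TransportStreams itself):

    import org.elasticsearch.common.compress.CompressorFactory;
    import org.elasticsearch.common.io.stream.CachedStreamOutput;
    import org.elasticsearch.common.io.stream.StreamOutput;
    import org.elasticsearch.common.io.stream.Streamable;
    import org.elasticsearch.transport.support.TransportStreams;

    import java.io.IOException;

    public class BuildMessage {

        public static void build(CachedStreamOutput.Entry cachedEntry, long requestId, byte status,
                                 String action, Streamable message, boolean compress) throws IOException {
            if (compress) {
                status = TransportStreams.statusSetCompress(status);
            }
            StreamOutput stream = compress
                    ? cachedEntry.cachedHandles(CompressorFactory.defaultCompressor())
                    : cachedEntry.cachedHandles();
            cachedEntry.bytes().write(TransportStreams.HEADER_PLACEHOLDER); // reserve room for the header
            stream.writeUTF(action);
            message.writeTo(stream);
            stream.close(); // finish compression so the buffered size is final
            TransportStreams.writeHeader(cachedEntry.bytes().underlyingBytes(),
                    cachedEntry.bytes().size(), requestId, status);
        }
    }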
org.elasticsearch.test.unit.index.mapper.MapperTests; import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.test.unit.index.mapper.MapperTests; import org.testng.annotations.Test; import static org.hamcrest.MatcherAssert.assertThat; @@ -47,7 +47,7 @@ public class CompressSourceMappingTests { .field("field2", "value2") .endObject().copiedBytes()); - assertThat(LZF.isCompressed(doc.rootDoc().getBinaryValue("_source")), equalTo(false)); + assertThat(CompressorFactory.isCompressed(doc.rootDoc().getBinaryValue("_source")), equalTo(false)); } @Test @@ -63,7 +63,7 @@ public class CompressSourceMappingTests { .field("field2", "value2") .endObject().copiedBytes()); - assertThat(LZF.isCompressed(doc.rootDoc().getBinaryValue("_source")), equalTo(true)); + assertThat(CompressorFactory.isCompressed(doc.rootDoc().getBinaryValue("_source")), equalTo(true)); } @Test @@ -78,7 +78,7 @@ public class CompressSourceMappingTests { .field("field1", "value1") .endObject().copiedBytes()); - assertThat(LZF.isCompressed(doc.rootDoc().getBinaryValue("_source")), equalTo(false)); + assertThat(CompressorFactory.isCompressed(doc.rootDoc().getBinaryValue("_source")), equalTo(false)); doc = documentMapper.parse("type", "1", XContentFactory.jsonBuilder().startObject() .field("field1", "value1") @@ -88,6 +88,6 @@ public class CompressSourceMappingTests { .field("field2", "value2 xxxxxxxxxxxxxx yyyyyyyyyyyyyyyyyyy zzzzzzzzzzzzzzzzz") .endObject().copiedBytes()); - assertThat(LZF.isCompressed(doc.rootDoc().getBinaryValue("_source")), equalTo(true)); + assertThat(CompressorFactory.isCompressed(doc.rootDoc().getBinaryValue("_source")), equalTo(true)); } } diff --git a/src/test/java/org/elasticsearch/test/unit/index/mapper/source/DefaultSourceMappingTests.java b/src/test/java/org/elasticsearch/test/unit/index/mapper/source/DefaultSourceMappingTests.java index 476e9e7b5f6..11586e8de2e 100644 --- a/src/test/java/org/elasticsearch/test/unit/index/mapper/source/DefaultSourceMappingTests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/mapper/source/DefaultSourceMappingTests.java @@ -20,8 +20,8 @@ package org.elasticsearch.test.unit.index.mapper.source; import org.apache.lucene.document.Fieldable; -import org.elasticsearch.common.compress.lzf.LZF; -import org.elasticsearch.common.compress.lzf.LZFDecoder; +import org.elasticsearch.common.BytesHolder; +import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.mapper.DocumentMapper; @@ -94,8 +94,8 @@ public class DefaultSourceMappingTests { .field("field", "value") .endObject().copiedBytes()); - assertThat(LZF.isCompressed(doc.source(), doc.sourceOffset(), doc.sourceLength()), equalTo(true)); - byte[] uncompressed = LZFDecoder.decode(doc.source(), doc.sourceOffset(), doc.sourceLength()); + assertThat(CompressorFactory.isCompressed(doc.source(), doc.sourceOffset(), doc.sourceLength()), equalTo(true)); + byte[] uncompressed = CompressorFactory.uncompressIfNeeded(new BytesHolder(doc.source(), doc.sourceOffset(), doc.sourceLength())).copyBytes(); assertThat(XContentFactory.xContentType(uncompressed), equalTo(XContentType.JSON)); documentMapper = MapperTests.newParser().parse(mapping); @@ -103,8 +103,8 @@ public class DefaultSourceMappingTests { .field("field", "value") .endObject().copiedBytes()); - assertThat(LZF.isCompressed(doc.source(), doc.sourceOffset(), doc.sourceLength()), equalTo(true)); - uncompressed 
= LZFDecoder.decode(doc.source(), doc.sourceOffset(), doc.sourceLength()); + assertThat(CompressorFactory.isCompressed(doc.source(), doc.sourceOffset(), doc.sourceLength()), equalTo(true)); + uncompressed = CompressorFactory.uncompressIfNeeded(new BytesHolder(doc.source(), doc.sourceOffset(), doc.sourceLength())).copyBytes(); assertThat(XContentFactory.xContentType(uncompressed), equalTo(XContentType.JSON)); }
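The updated mapper tests assert against CompressorFactory rather than LZF, so they keep passing for whichever default compressor is configured: compressed sources are detected with isCompressed and round-tripped back through uncompressIfNeeded before the content type is checked. A hedged, self-contained sketch of that round trip with a hypothetical JSON payload, assuming the default compressor marks its output with a header that isCompressed recognises (which is what the tests rely on):

    import org.elasticsearch.common.BytesHolder;
    import org.elasticsearch.common.compress.CompressorFactory;
    import org.elasticsearch.common.io.stream.CachedStreamOutput;
    import org.elasticsearch.common.io.stream.StreamOutput;

    import java.io.IOException;
    import java.util.Arrays;

    public class CompressionRoundTrip {

        public static void main(String[] args) throws IOException {
            byte[] original = "{\"field1\":\"value1\",\"field2\":\"value2\"}".getBytes("UTF-8");

            // compress through the default compressor's cached stream
            CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
            byte[] compressed;
            try {
                StreamOutput out = cachedEntry.cachedBytes(CompressorFactory.defaultCompressor());
                out.writeBytes(original, 0, original.length);
                out.close();
                compressed = cachedEntry.bytes().copiedByteArray();
            } finally {
                CachedStreamOutput.pushEntry(cachedEntry);
            }

            // detection and decompression are compressor-agnostic, the same checks the tests use
            if (!CompressorFactory.isCompressed(compressed, 0, compressed.length)) {
                throw new IllegalStateException("expected the payload to be detected as compressed");
            }
            byte[] roundTripped = CompressorFactory.uncompressIfNeeded(new BytesHolder(compressed)).copyBytes();
            if (!Arrays.equals(original, roundTripped)) {
                throw new IllegalStateException("round trip did not restore the original bytes");
            }
        }
    }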