abstract compression

abstract the LZF compression into a compress package allowing for different implementation in the future
This commit is contained in:
Shay Banon 2012-06-19 04:07:11 +02:00
parent 1a98a9184e
commit aebd27afbd
42 changed files with 871 additions and 2243 deletions

12
pom.xml
View File

@ -143,6 +143,13 @@
<version>3.5.0.Final</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.ning</groupId>
<artifactId>compress-lzf</artifactId>
<version>0.9.5</version>
<scope>compile</scope>
</dependency>
<!-- END: dependencies that are shaded -->
<dependency>
@ -307,6 +314,7 @@
<include>org.yaml:snakeyaml</include>
<include>joda-time:joda-time</include>
<include>io.netty:netty</include>
<include>com.ning:compress-lzf</include>
</includes>
</artifactSet>
<relocations>
@ -346,6 +354,10 @@
<pattern>org.jboss.netty</pattern>
<shadedPattern>org.elasticsearch.common.netty</shadedPattern>
</relocation>
<relocation>
<pattern>com.ning.compress</pattern>
<shadedPattern>org.elasticsearch.common.compress</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>

View File

@ -21,7 +21,6 @@ package org.elasticsearch.common;
import gnu.trove.map.hash.*;
import jsr166y.LinkedTransferQueue;
import org.elasticsearch.common.compress.lzf.BufferRecycler;
import org.elasticsearch.common.trove.ExtTDoubleObjectHashMap;
import org.elasticsearch.common.trove.ExtTHashMap;
import org.elasticsearch.common.trove.ExtTLongObjectHashMap;
@ -33,7 +32,6 @@ import java.util.Queue;
public class CacheRecycler {
public static void clear() {
BufferRecycler.clean();
hashMap.clear();
doubleObjectHashMap.clear();
longObjectHashMap.clear();

View File

@ -0,0 +1,174 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
/**
*/
public abstract class CompressedStreamInput extends StreamInput {
private final StreamInput in;
private boolean closed;
protected byte[] uncompressed;
private int position = 0;
private int valid = 0;
public CompressedStreamInput(StreamInput in) throws IOException {
this.in = in;
readHeader(in);
}
/**
* Expert!, resets to buffer start, without the need to decompress it again.
*/
public void resetToBufferStart() {
this.position = 0;
}
/**
* Method is overridden to report number of bytes that can now be read
* from decoded data buffer, without reading bytes from the underlying
* stream.
* Never throws an exception; returns number of bytes available without
* further reads from underlying source; -1 if stream has been closed, or
* 0 if an actual read (and possible blocking) is needed to find out.
*/
@Override
public int available() throws IOException {
// if closed, return -1;
if (closed) {
return -1;
}
int left = (valid - position);
return (left <= 0) ? 0 : left;
}
@Override
public int read() throws IOException {
if (!readyBuffer()) {
return -1;
}
return uncompressed[position++] & 255;
}
@Override
public byte readByte() throws IOException {
if (!readyBuffer()) {
throw new EOFException();
}
return uncompressed[position++];
}
@Override
public int read(byte[] buffer, int offset, int length) throws IOException {
return read(buffer, offset, length, false);
}
public int read(byte[] buffer, int offset, int length, boolean fullRead) throws IOException {
if (length < 1) {
return 0;
}
if (!readyBuffer()) {
return -1;
}
// First let's read however much data we happen to have...
int chunkLength = Math.min(valid - position, length);
System.arraycopy(uncompressed, position, buffer, offset, chunkLength);
position += chunkLength;
if (chunkLength == length || !fullRead) {
return chunkLength;
}
// Need more data, then
int totalRead = chunkLength;
do {
offset += chunkLength;
if (!readyBuffer()) {
break;
}
chunkLength = Math.min(valid - position, (length - totalRead));
System.arraycopy(uncompressed, position, buffer, offset, chunkLength);
position += chunkLength;
totalRead += chunkLength;
} while (totalRead < length);
return totalRead;
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
int result = read(b, offset, len, true /* we want to have full reads, thats the contract... */);
if (result < len) {
throw new EOFException();
}
}
@Override
public void reset() throws IOException {
this.position = 0;
this.valid = 0;
in.reset();
}
@Override
public void close() throws IOException {
position = valid = 0;
if (!closed) {
closed = true;
doClose();
in.close();
}
}
protected abstract void doClose() throws IOException;
/**
* Fill the uncompressed bytes buffer by reading the underlying inputStream.
*/
protected boolean readyBuffer() throws IOException {
if (position < valid) {
return true;
}
if (closed) {
return false;
}
valid = uncompress(in, uncompressed);
if (valid < 0) {
return false;
}
position = 0;
return (position < valid);
}
protected abstract void readHeader(StreamInput in) throws IOException;
/**
* Uncompress the data into the out array, returning the size uncompressed
*/
protected abstract int uncompress(InputStream in, byte[] out) throws IOException;
}

View File

@ -0,0 +1,134 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
*/
public abstract class CompressedStreamOutput extends StreamOutput {
private final StreamOutput out;
protected byte[] uncompressed;
private int position = 0;
private boolean closed;
public CompressedStreamOutput(StreamOutput out) throws IOException {
this.out = out;
writeHeader(out);
}
@Override
public void write(int b) throws IOException {
if (position >= uncompressed.length) {
flushBuffer();
}
uncompressed[position++] = (byte) b;
}
@Override
public void writeByte(byte b) throws IOException {
if (position >= uncompressed.length) {
flushBuffer();
}
uncompressed[position++] = b;
}
@Override
public void writeBytes(byte[] input, int offset, int length) throws IOException {
// ES, check if length is 0, and don't write in this case
if (length == 0) {
return;
}
final int BUFFER_LEN = uncompressed.length;
// simple case first: buffering only (for trivially short writes)
int free = BUFFER_LEN - position;
if (free >= length) {
System.arraycopy(input, offset, uncompressed, position, length);
position += length;
return;
}
// fill partial input as much as possible and flush
if (position > 0) {
System.arraycopy(input, offset, uncompressed, position, free);
position += free;
flushBuffer();
offset += free;
length -= free;
}
// then write intermediate full block, if any, without copying:
while (length >= BUFFER_LEN) {
compress(input, offset, BUFFER_LEN, out);
offset += BUFFER_LEN;
length -= BUFFER_LEN;
}
// and finally, copy leftovers in input, if any
if (length > 0) {
System.arraycopy(input, offset, uncompressed, 0, length);
}
position = length;
}
@Override
public void flush() throws IOException {
flushBuffer();
out.flush();
}
@Override
public void close() throws IOException {
if (!closed) {
flushBuffer();
closed = true;
doClose();
out.close();
}
}
protected abstract void doClose() throws IOException;
@Override
public void reset() throws IOException {
position = 0;
out.reset();
}
private void flushBuffer() throws IOException {
if (position > 0) {
compress(uncompressed, 0, position, out);
position = 0;
}
}
protected abstract void writeHeader(StreamOutput out) throws IOException;
/**
* Compresses the data into the output
*/
protected abstract void compress(byte[] data, int offset, int len, StreamOutput out) throws IOException;
}

View File

@ -21,9 +21,6 @@ package org.elasticsearch.common.compress;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.compress.lzf.LZFDecoder;
import org.elasticsearch.common.compress.lzf.LZFEncoder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -58,16 +55,19 @@ public class CompressedString implements Streamable {
* @throws IOException
*/
public CompressedString(byte[] data, int offset, int length) throws IOException {
if (LZF.isCompressed(data, offset, length)) {
Compressor compressor = CompressorFactory.compressor(data, offset, length);
if (compressor != null) {
// already compressed...
this.bytes = Arrays.copyOfRange(data, offset, offset + length);
} else {
this.bytes = LZFEncoder.encode(data, offset, length);
// default to LZF
this.bytes = CompressorFactory.defaultCompressor().compress(data, offset, length);
}
}
public CompressedString(String str) throws IOException {
UnicodeUtil.UTF8Result result = Unicode.unsafeFromStringAsUtf8(str);
this.bytes = LZFEncoder.encode(result.result, result.length);
this.bytes = CompressorFactory.defaultCompressor().compress(result.result, 0, result.length);
}
public byte[] compressed() {
@ -75,11 +75,12 @@ public class CompressedString implements Streamable {
}
public byte[] uncompressed() throws IOException {
return LZFDecoder.decode(bytes);
Compressor compressor = CompressorFactory.compressor(bytes);
return compressor.uncompress(bytes, 0, bytes.length);
}
public String string() throws IOException {
return Unicode.fromBytes(LZFDecoder.decode(bytes));
return Unicode.fromBytes(uncompressed());
}
public static CompressedString readCompressedString(StreamInput in) throws IOException {

View File

@ -0,0 +1,51 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
/**
*/
public interface Compressor {
String type();
boolean isCompressed(byte[] data, int offset, int length);
boolean isCompressed(ChannelBuffer buffer);
/**
* Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(byte[], int, int)}.
*/
byte[] uncompress(byte[] data, int offset, int length) throws IOException;
/**
* Compresses the provided data, data can be detected as compressed using {@link #isCompressed(byte[], int, int)}.
*/
byte[] compress(byte[] data, int offset, int length) throws IOException;
CompressedStreamInput streamInput(StreamInput in) throws IOException;
CompressedStreamOutput streamOutput(StreamOutput out) throws IOException;
}

View File

@ -0,0 +1,107 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.compress.lzf.LZFCompressor;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
/**
*/
public class CompressorFactory {
private static final LZFCompressor LZF = new LZFCompressor();
private static final Compressor[] compressors;
private static final ImmutableMap<String, Compressor> compressorsByType;
static {
compressors = new Compressor[1];
compressors[0] = LZF;
MapBuilder<String, Compressor> compressorsByTypeX = MapBuilder.newMapBuilder();
for (Compressor compressor : compressors) {
compressorsByTypeX.put(compressor.type(), compressor);
}
compressorsByType = compressorsByTypeX.immutableMap();
}
public static Compressor defaultCompressor() {
return LZF;
}
public static boolean isCompressed(byte[] data) {
return compressor(data, 0, data.length) != null;
}
public static boolean isCompressed(byte[] data, int offset, int length) {
return compressor(data, offset, length) != null;
}
@Nullable
public static Compressor compressor(BytesHolder bytes) {
return compressor(bytes.bytes(), bytes.offset(), bytes.length());
}
@Nullable
public static Compressor compressor(byte[] data) {
return compressor(data, 0, data.length);
}
@Nullable
public static Compressor compressor(byte[] data, int offset, int length) {
for (Compressor compressor : compressors) {
if (compressor.isCompressed(data, offset, length)) {
return compressor;
}
}
return null;
}
@Nullable
public static Compressor compressor(ChannelBuffer buffer) {
for (Compressor compressor : compressors) {
if (compressor.isCompressed(buffer)) {
return compressor;
}
}
return null;
}
public static Compressor compressor(String type) {
return compressorsByType.get(type);
}
/**
* Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(byte[], int, int)}.
*/
public static BytesHolder uncompressIfNeeded(BytesHolder bytes) throws IOException {
Compressor compressor = compressor(bytes);
if (compressor != null) {
return new BytesHolder(compressor.uncompress(bytes.bytes(), bytes.offset(), bytes.length()));
}
return bytes;
}
}

View File

@ -1,163 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.lzf;
import java.lang.ref.SoftReference;
/**
* Simple helper class to encapsulate details of basic buffer
* recycling scheme, which helps a lot (as per profiling) for
* smaller encoding cases.
*
* @author tatu
*/
public class BufferRecycler {
private final static int MIN_ENCODING_BUFFER = 4000;
private final static int MIN_OUTPUT_BUFFER = 8000;
/**
* This <code>ThreadLocal</code> contains a {@link java.lang.ref.SoftReference}
* to a {@link BufferRecycler} used to provide a low-cost
* buffer recycling for buffers we need for encoding, decoding.
*/
final protected static ThreadLocal<SoftReference<BufferRecycler>> _recyclerRef
= new ThreadLocal<SoftReference<BufferRecycler>>();
private byte[] _inputBuffer;
private byte[] _outputBuffer;
private byte[] _decodingBuffer;
private byte[] _encodingBuffer;
private int[] _encodingHash;
/**
* Accessor to get thread-local recycler instance
*/
public static BufferRecycler instance() {
SoftReference<BufferRecycler> ref = _recyclerRef.get();
BufferRecycler br = (ref == null) ? null : ref.get();
if (br == null) {
br = new BufferRecycler();
_recyclerRef.set(new SoftReference<BufferRecycler>(br));
}
return br;
}
public static void clean() {
_recyclerRef.remove();
}
/*
///////////////////////////////////////////////////////////////////////
// Buffers for encoding (output)
///////////////////////////////////////////////////////////////////////
*/
public byte[] allocEncodingBuffer(int minSize) {
byte[] buf = _encodingBuffer;
if (buf == null || buf.length < minSize) {
buf = new byte[Math.max(minSize, MIN_ENCODING_BUFFER)];
} else {
_encodingBuffer = null;
}
return buf;
}
public void releaseEncodeBuffer(byte[] buffer) {
if (_encodingBuffer == null || buffer.length > _encodingBuffer.length) {
_encodingBuffer = buffer;
}
}
public byte[] allocOutputBuffer(int minSize) {
byte[] buf = _outputBuffer;
if (buf == null || buf.length < minSize) {
buf = new byte[Math.max(minSize, MIN_OUTPUT_BUFFER)];
} else {
_outputBuffer = null;
}
return buf;
}
public void releaseOutputBuffer(byte[] buffer) {
if (_outputBuffer == null || (buffer != null && buffer.length > _outputBuffer.length)) {
_outputBuffer = buffer;
}
}
public int[] allocEncodingHash(int suggestedSize) {
int[] buf = _encodingHash;
if (buf == null || buf.length < suggestedSize) {
buf = new int[suggestedSize];
} else {
_encodingHash = null;
}
return buf;
}
public void releaseEncodingHash(int[] buffer) {
if (_encodingHash == null || (buffer != null && buffer.length > _encodingHash.length)) {
_encodingHash = buffer;
}
}
/*
///////////////////////////////////////////////////////////////////////
// Buffers for decoding (input)
///////////////////////////////////////////////////////////////////////
*/
public byte[] allocInputBuffer(int minSize) {
byte[] buf = _inputBuffer;
if (buf == null || buf.length < minSize) {
buf = new byte[Math.max(minSize, MIN_OUTPUT_BUFFER)];
} else {
_inputBuffer = null;
}
return buf;
}
public void releaseInputBuffer(byte[] buffer) {
if (_inputBuffer == null || (buffer != null && buffer.length > _inputBuffer.length)) {
_inputBuffer = buffer;
}
}
public byte[] allocDecodeBuffer(int size) {
byte[] buf = _decodingBuffer;
if (buf == null || buf.length < size) {
buf = new byte[size];
} else {
_decodingBuffer = null;
}
return buf;
}
public void releaseDecodeBuffer(byte[] buffer) {
if (_decodingBuffer == null || (buffer != null && buffer.length > _decodingBuffer.length)) {
_decodingBuffer = buffer;
}
}
}

View File

@ -1,228 +0,0 @@
package org.elasticsearch.common.compress.lzf;
import java.io.IOException;
import java.io.InputStream;
/**
* Decoder that handles decoding of sequence of encoded LZF chunks,
* combining them into a single contiguous result byte array.
*
* @author Tatu Saloranta (tatu@ning.com)
* @since 0.9
*/
public abstract class ChunkDecoder {
protected final static byte BYTE_NULL = 0;
protected final static int HEADER_BYTES = 5;
public ChunkDecoder() {
}
/*
///////////////////////////////////////////////////////////////////////
// Public API
///////////////////////////////////////////////////////////////////////
*/
/**
* Method for decompressing a block of input data encoded in LZF
* block structure (compatible with lzf command line utility),
* and can consist of any number of blocks.
* Note that input MUST consists of a sequence of one or more complete
* chunks; partial chunks can not be handled.
*/
public final byte[] decode(final byte[] inputBuffer) throws IOException {
byte[] result = new byte[calculateUncompressedSize(inputBuffer, 0, inputBuffer.length)];
decode(inputBuffer, 0, inputBuffer.length, result);
return result;
}
/**
* Method for decompressing a block of input data encoded in LZF
* block structure (compatible with lzf command line utility),
* and can consist of any number of blocks.
* Note that input MUST consists of a sequence of one or more complete
* chunks; partial chunks can not be handled.
*/
public final byte[] decode(final byte[] inputBuffer, int inputPtr, int inputLen) throws IOException {
byte[] result = new byte[calculateUncompressedSize(inputBuffer, inputPtr, inputLen)];
decode(inputBuffer, inputPtr, inputLen, result);
return result;
}
/**
* Method for decompressing a block of input data encoded in LZF
* block structure (compatible with lzf command line utility),
* and can consist of any number of blocks.
* Note that input MUST consists of a sequence of one or more complete
* chunks; partial chunks can not be handled.
*/
public final int decode(final byte[] inputBuffer, final byte[] targetBuffer) throws IOException {
return decode(inputBuffer, 0, inputBuffer.length, targetBuffer);
}
/**
* Method for decompressing a block of input data encoded in LZF
* block structure (compatible with lzf command line utility),
* and can consist of any number of blocks.
* Note that input MUST consists of a sequence of one or more complete
* chunks; partial chunks can not be handled.
*/
public int decode(final byte[] sourceBuffer, int inPtr, int inLength,
final byte[] targetBuffer) throws IOException {
byte[] result = targetBuffer;
int outPtr = 0;
int blockNr = 0;
final int end = inPtr + inLength - 1; // -1 to offset possible end marker
while (inPtr < end) {
// let's do basic sanity checks; no point in skimping with these checks
if (sourceBuffer[inPtr] != LZFChunk.BYTE_Z || sourceBuffer[inPtr + 1] != LZFChunk.BYTE_V) {
throw new IOException("Corrupt input data, block #" + blockNr + " (at offset " + inPtr + "): did not start with 'ZV' signature bytes");
}
inPtr += 2;
int type = sourceBuffer[inPtr++];
int len = uint16(sourceBuffer, inPtr);
inPtr += 2;
if (type == LZFChunk.BLOCK_TYPE_NON_COMPRESSED) { // uncompressed
System.arraycopy(sourceBuffer, inPtr, result, outPtr, len);
outPtr += len;
} else { // compressed
int uncompLen = uint16(sourceBuffer, inPtr);
inPtr += 2;
decodeChunk(sourceBuffer, inPtr, result, outPtr, outPtr + uncompLen);
outPtr += uncompLen;
}
inPtr += len;
++blockNr;
}
return outPtr;
}
/**
* Main decode from a stream. Decompressed bytes are placed in the outputBuffer, inputBuffer
* is a "scratch-area".
*
* @param is An input stream of LZF compressed bytes
* @param inputBuffer A byte array used as a scratch area.
* @param outputBuffer A byte array in which the result is returned
* @return The number of bytes placed in the outputBuffer.
*/
public abstract int decodeChunk(final InputStream is, final byte[] inputBuffer, final byte[] outputBuffer)
throws IOException;
/**
* Main decode method for individual chunks.
*/
public abstract void decodeChunk(byte[] in, int inPos, byte[] out, int outPos, int outEnd)
throws IOException;
/*
///////////////////////////////////////////////////////////////////////
// Public static methods
///////////////////////////////////////////////////////////////////////
*/
/**
* Helper method that will calculate total uncompressed size, for sequence of
* one or more LZF blocks stored in given byte array.
* Will do basic sanity checking, so that this method can be called to
* verify against some types of corruption.
*/
public static int calculateUncompressedSize(byte[] data, int ptr, int length) throws IOException {
int uncompressedSize = 0;
int blockNr = 0;
final int end = ptr + length;
while (ptr < end) {
// can use optional end marker
if (ptr == (data.length + 1) && data[ptr] == BYTE_NULL) {
++ptr; // so that we'll be at end
break;
}
// simpler to handle bounds checks by catching exception here...
try {
if (data[ptr] != LZFChunk.BYTE_Z || data[ptr + 1] != LZFChunk.BYTE_V) {
throw new IOException("Corrupt input data, block #" + blockNr + " (at offset " + ptr + "): did not start with 'ZV' signature bytes");
}
int type = (int) data[ptr + 2];
int blockLen = uint16(data, ptr + 3);
if (type == LZFChunk.BLOCK_TYPE_NON_COMPRESSED) { // uncompressed
ptr += 5;
uncompressedSize += blockLen;
} else if (type == LZFChunk.BLOCK_TYPE_COMPRESSED) { // compressed
uncompressedSize += uint16(data, ptr + 5);
ptr += 7;
} else { // unknown... CRC-32 would be 2, but that's not implemented by cli tool
throw new IOException("Corrupt input data, block #" + blockNr + " (at offset " + ptr + "): unrecognized block type " + (type & 0xFF));
}
ptr += blockLen;
} catch (ArrayIndexOutOfBoundsException e) {
throw new IOException("Corrupt input data, block #" + blockNr + " (at offset " + ptr + "): truncated block header");
}
++blockNr;
}
// one more sanity check:
if (ptr != end) {
throw new IOException("Corrupt input data: block #" + blockNr + " extends " + (data.length - ptr) + " beyond end of input");
}
return uncompressedSize;
}
/*
///////////////////////////////////////////////////////////////////////
// Internal methods
///////////////////////////////////////////////////////////////////////
*/
protected final static int uint16(byte[] data, int ptr) {
return ((data[ptr] & 0xFF) << 8) + (data[ptr + 1] & 0xFF);
}
/**
* Helper method to forcibly load header bytes that must be read before
* chunk can be handled.
*/
protected final static int readHeader(final InputStream is, final byte[] inputBuffer)
throws IOException {
// Ok: simple case first, where we just get all data we need
int needed = HEADER_BYTES;
int count = is.read(inputBuffer, 0, needed);
if (count == needed) {
return count;
}
if (count <= 0) {
return 0;
}
// if not, a source that trickles data (network etc); must loop
int offset = count;
needed -= count;
do {
count = is.read(inputBuffer, offset, needed);
if (count <= 0) {
break;
}
offset += count;
needed -= count;
} while (needed > 0);
return offset;
}
protected final static void readFully(InputStream is, boolean compressed,
byte[] outputBuffer, int offset, int len) throws IOException {
int left = len;
while (left > 0) {
int count = is.read(outputBuffer, offset, left);
if (count < 0) { // EOF not allowed here
throw new IOException("EOF in " + len + " byte ("
+ (compressed ? "" : "un") + "compressed) block: could only read "
+ (len - left) + " bytes");
}
offset += count;
left -= count;
}
}
}

View File

@ -1,269 +0,0 @@
/* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
* OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
package org.elasticsearch.common.compress.lzf;
import java.io.IOException;
import java.io.OutputStream;
/**
* Class that handles actual encoding of individual chunks.
* Resulting chunks can be compressed or non-compressed; compression
* is only used if it actually reduces chunk size (including overhead
* of additional header bytes)
*
* @author Tatu Saloranta (tatu@ning.com)
*/
public class ChunkEncoder {
// Beyond certain point we won't be able to compress; let's use 16 bytes as cut-off
private static final int MIN_BLOCK_TO_COMPRESS = 16;
private static final int MIN_HASH_SIZE = 256;
// Not much point in bigger tables, with 8k window
private static final int MAX_HASH_SIZE = 16384;
private static final int MAX_OFF = 1 << 13; // 8k
private static final int MAX_REF = (1 << 8) + (1 << 3); // 264
// // Encoding tables etc
private final BufferRecycler _recycler;
/**
* Hash table contains lookup based on 3-byte sequence; key is hash
* of such triplet, value is offset in buffer.
*/
private int[] _hashTable;
private final int _hashModulo;
/**
* Buffer in which encoded content is stored during processing
*/
private byte[] _encodeBuffer;
/**
* Small buffer passed to LZFChunk, needed for writing chunk header
*/
private byte[] _headerBuffer;
/**
* @param totalLength Total encoded length; used for calculating size
* of hash table to use
*/
// ES: Added recycler as a parameter so we can control its caching
public ChunkEncoder(int totalLength, BufferRecycler recycler) {
int largestChunkLen = Math.max(totalLength, LZFChunk.MAX_CHUNK_LEN);
int suggestedHashLen = calcHashLen(largestChunkLen);
_recycler = recycler;
_hashTable = _recycler.allocEncodingHash(suggestedHashLen);
_hashModulo = _hashTable.length - 1;
// Ok, then, what's the worst case output buffer length?
// length indicator for each 32 literals, so:
int bufferLen = largestChunkLen + ((largestChunkLen + 31) >> 5);
_encodeBuffer = _recycler.allocEncodingBuffer(bufferLen);
}
/*
///////////////////////////////////////////////////////////////////////
// Public API
///////////////////////////////////////////////////////////////////////
*/
/**
* Method to close once encoder is no longer in use. Note: after calling
* this method, further calls to {@link #encodeChunk} will fail
*/
public void close() {
byte[] buf = _encodeBuffer;
if (buf != null) {
_encodeBuffer = null;
_recycler.releaseEncodeBuffer(buf);
}
int[] ibuf = _hashTable;
if (ibuf != null) {
_hashTable = null;
_recycler.releaseEncodingHash(ibuf);
}
}
/**
* Method for compressing (or not) individual chunks
*/
public LZFChunk encodeChunk(byte[] data, int offset, int len) {
if (len >= MIN_BLOCK_TO_COMPRESS) {
/* If we have non-trivial block, and can compress it by at least
* 2 bytes (since header is 2 bytes longer), let's compress:
*/
int compLen = tryCompress(data, offset, offset + len, _encodeBuffer, 0);
if (compLen < (len - 2)) { // nah; just return uncompressed
return LZFChunk.createCompressed(len, _encodeBuffer, 0, compLen);
}
}
// Otherwise leave uncompressed:
return LZFChunk.createNonCompressed(data, offset, len);
}
/**
* Method for encoding individual chunk, writing it to given output stream.
*/
public void encodeAndWriteChunk(byte[] data, int offset, int len, OutputStream out)
throws IOException {
byte[] headerBuf = _headerBuffer;
if (headerBuf == null) {
_headerBuffer = headerBuf = new byte[LZFChunk.MAX_HEADER_LEN];
}
if (len >= MIN_BLOCK_TO_COMPRESS) {
/* If we have non-trivial block, and can compress it by at least
* 2 bytes (since header is 2 bytes longer), let's compress:
*/
int compLen = tryCompress(data, offset, offset + len, _encodeBuffer, 0);
if (compLen < (len - 2)) { // nah; just return uncompressed
LZFChunk.writeCompressedHeader(len, compLen, out, headerBuf);
out.write(_encodeBuffer, 0, compLen);
return;
}
}
// Otherwise leave uncompressed:
LZFChunk.writeNonCompressedHeader(len, out, headerBuf);
out.write(data, offset, len);
}
/*
///////////////////////////////////////////////////////////////////////
// Internal methods
///////////////////////////////////////////////////////////////////////
*/
private static int calcHashLen(int chunkSize) {
// in general try get hash table size of 2x input size
chunkSize += chunkSize;
// but no larger than max size:
if (chunkSize >= MAX_HASH_SIZE) {
return MAX_HASH_SIZE;
}
// otherwise just need to round up to nearest 2x
int hashLen = MIN_HASH_SIZE;
while (hashLen < chunkSize) {
hashLen += hashLen;
}
return hashLen;
}
private int first(byte[] in, int inPos) {
return (in[inPos] << 8) + (in[inPos + 1] & 255);
}
/*
private static int next(int v, byte[] in, int inPos) {
return (v << 8) + (in[inPos + 2] & 255);
}
*/
private final int hash(int h) {
// or 184117; but this seems to give better hashing?
return ((h * 57321) >> 9) & _hashModulo;
// original lzf-c.c used this:
//return (((h ^ (h << 5)) >> (24 - HLOG) - h*5) & _hashModulo;
// but that didn't seem to provide better matches
}
private int tryCompress(byte[] in, int inPos, int inEnd, byte[] out, int outPos) {
final int[] hashTable = _hashTable;
++outPos;
int seen = first(in, 0); // past 4 bytes we have seen... (last one is LSB)
int literals = 0;
inEnd -= 4;
final int firstPos = inPos; // so that we won't have back references across block boundary
while (inPos < inEnd) {
byte p2 = in[inPos + 2];
// next
seen = (seen << 8) + (p2 & 255);
int off = hash(seen);
int ref = hashTable[off];
hashTable[off] = inPos;
// First expected common case: no back-ref (for whatever reason)
if (ref >= inPos // can't refer forward (i.e. leftovers)
|| ref < firstPos // or to previous block
|| (off = inPos - ref) > MAX_OFF
|| in[ref + 2] != p2 // must match hash
|| in[ref + 1] != (byte) (seen >> 8)
|| in[ref] != (byte) (seen >> 16)) {
out[outPos++] = in[inPos++];
literals++;
if (literals == LZFChunk.MAX_LITERAL) {
out[outPos - 33] = (byte) 31; // <= out[outPos - literals - 1] = MAX_LITERAL_MINUS_1;
literals = 0;
outPos++;
}
continue;
}
// match
int maxLen = inEnd - inPos + 2;
if (maxLen > MAX_REF) {
maxLen = MAX_REF;
}
if (literals == 0) {
outPos--;
} else {
out[outPos - literals - 1] = (byte) (literals - 1);
literals = 0;
}
int len = 3;
while (len < maxLen && in[ref + len] == in[inPos + len]) {
len++;
}
len -= 2;
--off; // was off by one earlier
if (len < 7) {
out[outPos++] = (byte) ((off >> 8) + (len << 5));
} else {
out[outPos++] = (byte) ((off >> 8) + (7 << 5));
out[outPos++] = (byte) (len - 7);
}
out[outPos++] = (byte) off;
outPos++;
inPos += len;
seen = first(in, inPos);
seen = (seen << 8) + (in[inPos + 2] & 255);
hashTable[hash(seen)] = inPos;
++inPos;
seen = (seen << 8) + (in[inPos + 2] & 255); // hash = next(hash, in, inPos);
hashTable[hash(seen)] = inPos;
++inPos;
}
// try offlining the tail
return handleTail(in, inPos, inEnd + 4, out, outPos, literals);
}
private int handleTail(byte[] in, int inPos, int inEnd, byte[] out, int outPos,
int literals) {
while (inPos < inEnd) {
out[outPos++] = in[inPos++];
literals++;
if (literals == LZFChunk.MAX_LITERAL) {
out[outPos - literals - 1] = (byte) (literals - 1);
literals = 0;
outPos++;
}
}
out[outPos - literals - 1] = (byte) (literals - 1);
if (literals == 0) {
outPos--;
}
return outPos;
}
}

View File

@ -1,92 +0,0 @@
/* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
* OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
package org.elasticsearch.common.compress.lzf;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
/**
* Simple command-line utility that can be used for testing LZF
* compression, or as rudimentary command-line tool.
* Arguments are the same as used by the "standard" lzf command line tool
*
* @author tatu@ning.com
*/
public class LZF {
public static boolean isCompressed(final byte[] buffer) {
return buffer.length >= 2 && buffer[0] == LZFChunk.BYTE_Z && buffer[1] == LZFChunk.BYTE_V;
}
public static boolean isCompressed(final byte[] buffer, int offset, int length) {
return length >= 2 && buffer[offset] == LZFChunk.BYTE_Z && buffer[offset + 1] == LZFChunk.BYTE_V;
}
public final static String SUFFIX = ".lzf";
void process(String[] args) throws IOException {
if (args.length == 2) {
String oper = args[0];
boolean compress = "-c".equals(oper);
if (compress || "-d".equals(oper)) {
String filename = args[1];
File src = new File(filename);
if (!src.exists()) {
System.err.println("File '" + filename + "' does not exist.");
System.exit(1);
}
if (!compress && !filename.endsWith(SUFFIX)) {
System.err.println("File '" + filename + "' does end with expected suffix ('" + SUFFIX + "', won't decompress.");
System.exit(1);
}
byte[] data = readData(src);
System.out.println("Read " + data.length + " bytes.");
byte[] result = compress ? LZFEncoder.encode(data) : LZFDecoder.decode(data);
System.out.println("Processed into " + result.length + " bytes.");
File resultFile = compress ? new File(filename + SUFFIX) : new File(filename.substring(0, filename.length() - SUFFIX.length()));
FileOutputStream out = new FileOutputStream(resultFile);
out.write(result);
out.close();
System.out.println("Wrote in file '" + resultFile.getAbsolutePath() + "'.");
return;
}
}
System.err.println("Usage: java " + getClass().getName() + " -c/-d file");
System.exit(1);
}
private byte[] readData(File in) throws IOException {
int len = (int) in.length();
byte[] result = new byte[len];
int offset = 0;
FileInputStream fis = new FileInputStream(in);
while (len > 0) {
int count = fis.read(result, offset, len);
if (count < 0) break;
len -= count;
offset += count;
}
fis.close();
if (len > 0) { // should never occur...
throw new IOException("Could not read the whole file -- received EOF when there was " + len + " bytes left to read");
}
return result;
}
public static void main(String[] args) throws IOException {
new LZF().process(args);
}
}

View File

@ -1,125 +0,0 @@
/* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
* OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
package org.elasticsearch.common.compress.lzf;
import java.io.IOException;
import java.io.OutputStream;
/**
* Helper class used to store LZF encoded segments (compressed and non-compressed)
* that can be sequenced to produce LZF files/streams.
*
* @author tatu@ning.com
*/
public class LZFChunk {
/**
* Maximum length of literal run for LZF encoding.
*/
public static final int MAX_LITERAL = 1 << 5; // 32
// Chunk length is limited by 2-byte length indicator, to 64k
public static final int MAX_CHUNK_LEN = 0xFFFF;
/**
* Header can be either 7 bytes (compressed) or 5 bytes (uncompressed)
* long
*/
public static final int MAX_HEADER_LEN = 7;
public final static byte BYTE_Z = 'Z';
public final static byte BYTE_V = 'V';
public final static int BLOCK_TYPE_NON_COMPRESSED = 0;
public final static int BLOCK_TYPE_COMPRESSED = 1;
protected final byte[] _data;
protected LZFChunk _next;
private LZFChunk(byte[] data) {
_data = data;
}
/**
* Factory method for constructing compressed chunk
*/
public static LZFChunk createCompressed(int origLen, byte[] encData, int encPtr, int encLen) {
byte[] result = new byte[encLen + 7];
result[0] = BYTE_Z;
result[1] = BYTE_V;
result[2] = BLOCK_TYPE_COMPRESSED;
result[3] = (byte) (encLen >> 8);
result[4] = (byte) encLen;
result[5] = (byte) (origLen >> 8);
result[6] = (byte) origLen;
System.arraycopy(encData, encPtr, result, 7, encLen);
return new LZFChunk(result);
}
public static void writeCompressedHeader(int origLen, int encLen, OutputStream out, byte[] headerBuffer)
throws IOException {
headerBuffer[0] = BYTE_Z;
headerBuffer[1] = BYTE_V;
headerBuffer[2] = BLOCK_TYPE_COMPRESSED;
headerBuffer[3] = (byte) (encLen >> 8);
headerBuffer[4] = (byte) encLen;
headerBuffer[5] = (byte) (origLen >> 8);
headerBuffer[6] = (byte) origLen;
out.write(headerBuffer, 0, 7);
}
/**
* Factory method for constructing compressed chunk
*/
public static LZFChunk createNonCompressed(byte[] plainData, int ptr, int len) {
byte[] result = new byte[len + 5];
result[0] = BYTE_Z;
result[1] = BYTE_V;
result[2] = BLOCK_TYPE_NON_COMPRESSED;
result[3] = (byte) (len >> 8);
result[4] = (byte) len;
System.arraycopy(plainData, ptr, result, 5, len);
return new LZFChunk(result);
}
public static void writeNonCompressedHeader(int len, OutputStream out, byte[] headerBuffer)
throws IOException {
headerBuffer[0] = BYTE_Z;
headerBuffer[1] = BYTE_V;
headerBuffer[2] = BLOCK_TYPE_NON_COMPRESSED;
headerBuffer[3] = (byte) (len >> 8);
headerBuffer[4] = (byte) len;
out.write(headerBuffer, 0, 5);
}
public void setNext(LZFChunk next) {
_next = next;
}
public LZFChunk next() {
return _next;
}
public int length() {
return _data.length;
}
public byte[] getData() {
return _data;
}
public int copyTo(byte[] dst, int ptr) {
int len = _data.length;
System.arraycopy(_data, 0, dst, ptr, len);
return ptr + len;
}
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.lzf;
import com.ning.compress.BufferRecycler;
import com.ning.compress.lzf.ChunkDecoder;
import com.ning.compress.lzf.LZFChunk;
import org.elasticsearch.common.compress.CompressedStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
import java.io.InputStream;
/**
*/
public class LZFCompressedStreamInput extends CompressedStreamInput {
private final BufferRecycler recycler;
private final com.ning.compress.lzf.ChunkDecoder decoder;
// scratch area buffer
private byte[] inputBuffer;
public LZFCompressedStreamInput(StreamInput in, ChunkDecoder decoder) throws IOException {
super(in);
this.recycler = BufferRecycler.instance();
this.decoder = decoder;
this.uncompressed = recycler.allocDecodeBuffer(LZFChunk.MAX_CHUNK_LEN);
this.inputBuffer = recycler.allocInputBuffer(LZFChunk.MAX_CHUNK_LEN);
}
@Override
public void readHeader(StreamInput in) throws IOException {
// nothing to do here, each chunk has a header
}
@Override
public int uncompress(InputStream in, byte[] out) throws IOException {
return decoder.decodeChunk(in, inputBuffer, out);
}
@Override
protected void doClose() throws IOException {
byte[] buf = inputBuffer;
if (buf != null) {
inputBuffer = null;
recycler.releaseInputBuffer(buf);
}
buf = uncompressed;
if (buf != null) {
uncompressed = null;
recycler.releaseDecodeBuffer(uncompressed);
}
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.lzf;
import com.ning.compress.BufferRecycler;
import com.ning.compress.lzf.ChunkEncoder;
import com.ning.compress.lzf.LZFChunk;
import org.elasticsearch.common.compress.CompressedStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
*/
public class LZFCompressedStreamOutput extends CompressedStreamOutput {
private final BufferRecycler recycler;
private final com.ning.compress.lzf.ChunkEncoder encoder;
public LZFCompressedStreamOutput(StreamOutput out) throws IOException {
super(out);
this.recycler = BufferRecycler.instance();
this.uncompressed = this.recycler.allocOutputBuffer(LZFChunk.MAX_CHUNK_LEN);
this.encoder = new ChunkEncoder(LZFChunk.MAX_CHUNK_LEN);
}
@Override
public void writeHeader(StreamOutput out) throws IOException {
// nothing to do here, each chunk has a header of its own
}
@Override
protected void compress(byte[] data, int offset, int len, StreamOutput out) throws IOException {
encoder.encodeAndWriteChunk(data, offset, len, out);
}
@Override
protected void doClose() throws IOException {
byte[] buf = uncompressed;
if (buf != null) {
uncompressed = null;
recycler.releaseOutputBuffer(buf);
}
encoder.close();
}
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.lzf;
import com.ning.compress.lzf.ChunkDecoder;
import com.ning.compress.lzf.LZFChunk;
import com.ning.compress.lzf.LZFEncoder;
import com.ning.compress.lzf.util.ChunkDecoderFactory;
import org.elasticsearch.common.compress.CompressedStreamInput;
import org.elasticsearch.common.compress.CompressedStreamOutput;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.Loggers;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
/**
*/
public class LZFCompressor implements Compressor {
public static final String TYPE = "lzf";
private ChunkDecoder decoder;
public LZFCompressor() {
this.decoder = ChunkDecoderFactory.optimalInstance();
Loggers.getLogger(LZFCompressor.class).debug("using [{}] decoder", this.decoder.getClass().getSimpleName());
}
@Override
public String type() {
return TYPE;
}
@Override
public boolean isCompressed(byte[] data, int offset, int length) {
return length >= 3 &&
data[offset] == LZFChunk.BYTE_Z &&
data[offset + 1] == LZFChunk.BYTE_V &&
(data[offset + 2] == LZFChunk.BLOCK_TYPE_COMPRESSED || data[offset + 2] == LZFChunk.BLOCK_TYPE_NON_COMPRESSED);
}
@Override
public boolean isCompressed(ChannelBuffer buffer) {
int offset = buffer.readerIndex();
return buffer.readableBytes() >= 3 &&
buffer.getByte(offset) == LZFChunk.BYTE_Z &&
buffer.getByte(offset + 1) == LZFChunk.BYTE_V &&
(buffer.getByte(offset + 2) == LZFChunk.BLOCK_TYPE_COMPRESSED || buffer.getByte(offset + 2) == LZFChunk.BLOCK_TYPE_NON_COMPRESSED);
}
@Override
public byte[] uncompress(byte[] data, int offset, int length) throws IOException {
return decoder.decode(data, offset, length);
}
@Override
public byte[] compress(byte[] data, int offset, int length) throws IOException {
return LZFEncoder.encode(data, offset, length);
}
@Override
public CompressedStreamInput streamInput(StreamInput in) throws IOException {
return new LZFCompressedStreamInput(in, decoder);
}
@Override
public CompressedStreamOutput streamOutput(StreamOutput out) throws IOException {
return new LZFCompressedStreamOutput(out);
}
}

View File

@ -1,55 +0,0 @@
/* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
* OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
package org.elasticsearch.common.compress.lzf;
import org.elasticsearch.common.compress.lzf.util.ChunkDecoderFactory;
import java.io.IOException;
/**
* Decoder that handles decoding of sequence of encoded LZF chunks,
* combining them into a single contiguous result byte array.
* As of version 0.9, this class has been mostly replaced by
* {@link ChunkDecoder}, although static methods are left here
* and may still be used.
* All static methods use {@link ChunkDecoderFactory#optimalInstance}
* to find actual {@link ChunkDecoder} instance to use.
*
* @author Tatu Saloranta (tatu@ning.com)
*/
public class LZFDecoder {
/*
///////////////////////////////////////////////////////////////////////
// Old API
///////////////////////////////////////////////////////////////////////
*/
public static byte[] decode(final byte[] inputBuffer) throws IOException {
return decode(inputBuffer, 0, inputBuffer.length);
}
public static byte[] decode(final byte[] inputBuffer, int offset, int length) throws IOException {
return ChunkDecoderFactory.optimalInstance().decode(inputBuffer, offset, length);
}
public static int decode(final byte[] inputBuffer, final byte[] targetBuffer) throws IOException {
return decode(inputBuffer, 0, inputBuffer.length, targetBuffer);
}
public static int decode(final byte[] sourceBuffer, int offset, int length, final byte[] targetBuffer) throws IOException {
return ChunkDecoderFactory.optimalInstance().decode(sourceBuffer, offset, length, targetBuffer);
}
public static int calculateUncompressedSize(byte[] data, int offset, int length) throws IOException {
return ChunkDecoder.calculateUncompressedSize(data, offset, length);
}
}

View File

@ -1,96 +0,0 @@
/* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
* OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
package org.elasticsearch.common.compress.lzf;
import java.io.IOException;
/**
* Encoder that handles splitting of input into chunks to encode,
* calls {@link ChunkEncoder} to compress individual chunks and
* combines resulting chunks into contiguous output byte array.
*
* @author tatu@ning.com
*/
public class LZFEncoder {
// Static methods only, no point in instantiating
private LZFEncoder() {
}
public static byte[] encode(byte[] data) throws IOException {
return encode(data, data.length);
}
/**
* Method for compressing given input data using LZF encoding and
* block structure (compatible with lzf command line utility).
* Result consists of a sequence of chunks.
*/
public static byte[] encode(byte[] data, int length) throws IOException {
return encode(data, 0, length);
}
/**
* Method for compressing given input data using LZF encoding and
* block structure (compatible with lzf command line utility).
* Result consists of a sequence of chunks.
*
* @since 0.8.1
*/
public static byte[] encode(byte[] data, int offset, int length) throws IOException {
ChunkEncoder enc = new ChunkEncoder(length, BufferRecycler.instance());
byte[] result = encode(enc, data, offset, length);
// important: may be able to reuse buffers
enc.close();
return result;
}
public static byte[] encode(ChunkEncoder enc, byte[] data, int length)
throws IOException {
return encode(enc, data, 0, length);
}
/**
* @since 0.8.1
*/
public static byte[] encode(ChunkEncoder enc, byte[] data, int offset, int length)
throws IOException {
int left = length;
int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left);
LZFChunk first = enc.encodeChunk(data, offset, chunkLen);
left -= chunkLen;
// shortcut: if it all fit in, no need to coalesce:
if (left < 1) {
return first.getData();
}
// otherwise need to get other chunks:
int resultBytes = first.length();
offset += chunkLen;
LZFChunk last = first;
do {
chunkLen = Math.min(left, LZFChunk.MAX_CHUNK_LEN);
LZFChunk chunk = enc.encodeChunk(data, offset, chunkLen);
offset += chunkLen;
left -= chunkLen;
resultBytes += chunk.length();
last.setNext(chunk);
last = chunk;
} while (left > 0);
// and then coalesce returns into single contiguous byte array
byte[] result = new byte[resultBytes];
int ptr = 0;
for (; first != null; first = first.next()) {
ptr = first.copyTo(result, ptr);
}
return result;
}
}

View File

@ -1,251 +0,0 @@
package org.elasticsearch.common.compress.lzf.impl;
import org.elasticsearch.common.compress.lzf.ChunkDecoder;
import org.elasticsearch.common.compress.lzf.LZFChunk;
import sun.misc.Unsafe;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
/**
* Highly optimized {@link ChunkDecoder} implementation that uses
* Sun JDK's Unsafe class (which may be included by other JDK's as well;
* IBM's apparently does).
* <p/>
* Credits for the idea go to Dain Sundstrom, who kindly suggested this use,
* and is all-around great source for optimization tips and tricks.
*/
@SuppressWarnings("restriction")
public class UnsafeChunkDecoder extends ChunkDecoder {
private static final Unsafe unsafe;
static {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static final long BYTE_ARRAY_OFFSET = unsafe.arrayBaseOffset(byte[].class);
// private static final long SHORT_ARRAY_OFFSET = unsafe.arrayBaseOffset(short[].class);
// private static final long SHORT_ARRAY_STRIDE = unsafe.arrayIndexScale(short[].class);
public UnsafeChunkDecoder() {
}
@Override
public final int decodeChunk(final InputStream is, final byte[] inputBuffer, final byte[] outputBuffer)
throws IOException {
int bytesInOutput;
/* note: we do NOT read more than 5 bytes because otherwise might need to shuffle bytes
* for output buffer (could perhaps optimize in future?)
*/
int bytesRead = readHeader(is, inputBuffer);
if ((bytesRead < HEADER_BYTES)
|| inputBuffer[0] != LZFChunk.BYTE_Z || inputBuffer[1] != LZFChunk.BYTE_V) {
if (bytesRead == 0) { // probably fine, clean EOF
return -1;
}
throw new IOException("Corrupt input data, block did not start with 2 byte signature ('ZV') followed by type byte, 2-byte length)");
}
int type = inputBuffer[2];
int compLen = uint16(inputBuffer, 3);
if (type == LZFChunk.BLOCK_TYPE_NON_COMPRESSED) { // uncompressed
readFully(is, false, outputBuffer, 0, compLen);
bytesInOutput = compLen;
} else { // compressed
readFully(is, true, inputBuffer, 0, 2 + compLen); // first 2 bytes are uncompressed length
int uncompLen = uint16(inputBuffer, 0);
decodeChunk(inputBuffer, 2, outputBuffer, 0, uncompLen);
bytesInOutput = uncompLen;
}
return bytesInOutput;
}
@Override
public final void decodeChunk(byte[] in, int inPos, byte[] out, int outPos, int outEnd)
throws IOException {
main_loop:
do {
int ctrl = in[inPos++] & 255;
while (ctrl < LZFChunk.MAX_LITERAL) { // literal run(s)
copyUpTo32(in, inPos, out, outPos, ctrl);
++ctrl;
inPos += ctrl;
outPos += ctrl;
if (outPos >= outEnd) {
break main_loop;
}
ctrl = in[inPos++] & 255;
}
// back reference
int len = ctrl >> 5;
ctrl = -((ctrl & 0x1f) << 8) - 1;
// short back reference? 2 bytes; run lengths of 2 - 8 bytes
if (len < 7) {
ctrl -= in[inPos++] & 255;
if (ctrl < -7) { // non-overlapping? can use efficient bulk copy
moveLong(out, outPos, outEnd, ctrl);
outPos += len + 2;
continue;
}
// otherwise, byte-by-byte
outPos = copyOverlappingShort(out, outPos, ctrl, len);
continue;
}
// long back reference: 3 bytes, length of up to 264 bytes
len = in[inPos++] & 255;
ctrl -= in[inPos++] & 255;
// First: ovelapping case can't use default handling, off line:
if ((ctrl + len) >= -9) {
outPos = copyOverlappingLong(out, outPos, ctrl, len);
continue;
}
// but non-overlapping is simple
len += 9;
if (len <= 32) {
copyUpTo32(out, outPos + ctrl, out, outPos, len - 1);
} else {
System.arraycopy(out, outPos + ctrl, out, outPos, len);
}
outPos += len;
} while (outPos < outEnd);
// sanity check to guard against corrupt data:
if (outPos != outEnd)
throw new IOException("Corrupt data: overrun in decompress, input offset " + inPos + ", output offset " + outPos);
}
/*
///////////////////////////////////////////////////////////////////////
// Internal methods
///////////////////////////////////////////////////////////////////////
*/
private final int copyOverlappingShort(final byte[] out, int outPos, final int offset, int len) {
out[outPos] = out[outPos++ + offset];
out[outPos] = out[outPos++ + offset];
switch (len) {
case 6:
out[outPos] = out[outPos++ + offset];
case 5:
out[outPos] = out[outPos++ + offset];
case 4:
out[outPos] = out[outPos++ + offset];
case 3:
out[outPos] = out[outPos++ + offset];
case 2:
out[outPos] = out[outPos++ + offset];
case 1:
out[outPos] = out[outPos++ + offset];
}
return outPos;
}
private final static int copyOverlappingLong(final byte[] out, int outPos, final int offset, int len) {
// otherwise manual copy: so first just copy 9 bytes we know are needed
out[outPos] = out[outPos++ + offset];
out[outPos] = out[outPos++ + offset];
out[outPos] = out[outPos++ + offset];
out[outPos] = out[outPos++ + offset];
out[outPos] = out[outPos++ + offset];
out[outPos] = out[outPos++ + offset];
out[outPos] = out[outPos++ + offset];
out[outPos] = out[outPos++ + offset];
out[outPos] = out[outPos++ + offset];
// then loop
// Odd: after extensive profiling, looks like magic number
// for unrolling is 4: with 8 performance is worse (even
// bit less than with no unrolling).
len += outPos;
final int end = len - 3;
while (outPos < end) {
out[outPos] = out[outPos++ + offset];
out[outPos] = out[outPos++ + offset];
out[outPos] = out[outPos++ + offset];
out[outPos] = out[outPos++ + offset];
}
switch (len - outPos) {
case 3:
out[outPos] = out[outPos++ + offset];
case 2:
out[outPos] = out[outPos++ + offset];
case 1:
out[outPos] = out[outPos++ + offset];
}
return outPos;
}
/* Note: 'delta' is negative (back ref); dataEnd is the first location AFTER
* end of expected uncompressed data (i.e. end marker)
*/
private final static void moveLong(byte[] data, int resultOffset, int dataEnd, int delta) {
if ((resultOffset + 8) < dataEnd) {
final long rawOffset = BYTE_ARRAY_OFFSET + resultOffset;
long value = unsafe.getLong(data, rawOffset + delta);
unsafe.putLong(data, rawOffset, value);
return;
}
System.arraycopy(data, resultOffset + delta, data, resultOffset, data.length - resultOffset);
}
private final static void copyUpTo32(byte[] in, int inputIndex, byte[] out, int outputIndex, int lengthMinusOne) {
if ((outputIndex + 32) > out.length) {
System.arraycopy(in, inputIndex, out, outputIndex, lengthMinusOne + 1);
return;
}
long inPtr = BYTE_ARRAY_OFFSET + inputIndex;
long outPtr = BYTE_ARRAY_OFFSET + outputIndex;
switch (lengthMinusOne >>> 3) {
case 3: {
long value = unsafe.getLong(in, inPtr);
unsafe.putLong(out, outPtr, value);
inPtr += 8;
outPtr += 8;
value = unsafe.getLong(in, inPtr);
unsafe.putLong(out, outPtr, value);
inPtr += 8;
outPtr += 8;
value = unsafe.getLong(in, inPtr);
unsafe.putLong(out, outPtr, value);
inPtr += 8;
outPtr += 8;
value = unsafe.getLong(in, inPtr);
unsafe.putLong(out, outPtr, value);
}
break;
case 2: {
long value = unsafe.getLong(in, inPtr);
unsafe.putLong(out, outPtr, value);
inPtr += 8;
outPtr += 8;
value = unsafe.getLong(in, inPtr);
unsafe.putLong(out, outPtr, value);
inPtr += 8;
outPtr += 8;
value = unsafe.getLong(in, inPtr);
unsafe.putLong(out, outPtr, value);
}
break;
case 1: {
long value = unsafe.getLong(in, inPtr);
unsafe.putLong(out, outPtr, value);
inPtr += 8;
outPtr += 8;
value = unsafe.getLong(in, inPtr);
unsafe.putLong(out, outPtr, value);
}
break;
case 0: {
long value = unsafe.getLong(in, inPtr);
unsafe.putLong(out, outPtr, value);
}
}
}
}

View File

@ -1,274 +0,0 @@
package org.elasticsearch.common.compress.lzf.impl;
import org.elasticsearch.common.compress.lzf.ChunkDecoder;
import org.elasticsearch.common.compress.lzf.LZFChunk;
import java.io.IOException;
import java.io.InputStream;
/**
* Safe {@link ChunkDecoder} implementation that can be used on any
* platform.
*/
public class VanillaChunkDecoder extends ChunkDecoder {
public VanillaChunkDecoder() {
}
@Override
public final int decodeChunk(final InputStream is, final byte[] inputBuffer, final byte[] outputBuffer)
throws IOException {
int bytesInOutput;
/* note: we do NOT read more than 5 bytes because otherwise might need to shuffle bytes
* for output buffer (could perhaps optimize in future?)
*/
int bytesRead = readHeader(is, inputBuffer);
if ((bytesRead < HEADER_BYTES)
|| inputBuffer[0] != LZFChunk.BYTE_Z || inputBuffer[1] != LZFChunk.BYTE_V) {
if (bytesRead == 0) { // probably fine, clean EOF
return -1;
}
throw new IOException("Corrupt input data, block did not start with 2 byte signature ('ZV') followed by type byte, 2-byte length)");
}
int type = inputBuffer[2];
int compLen = uint16(inputBuffer, 3);
if (type == LZFChunk.BLOCK_TYPE_NON_COMPRESSED) { // uncompressed
readFully(is, false, outputBuffer, 0, compLen);
bytesInOutput = compLen;
} else { // compressed
readFully(is, true, inputBuffer, 0, 2 + compLen); // first 2 bytes are uncompressed length
int uncompLen = uint16(inputBuffer, 0);
decodeChunk(inputBuffer, 2, outputBuffer, 0, uncompLen);
bytesInOutput = uncompLen;
}
return bytesInOutput;
}
@Override
public final void decodeChunk(byte[] in, int inPos, byte[] out, int outPos, int outEnd)
throws IOException {
do {
int ctrl = in[inPos++] & 255;
if (ctrl < LZFChunk.MAX_LITERAL) { // literal run
switch (ctrl) {
case 31:
out[outPos++] = in[inPos++];
case 30:
out[outPos++] = in[inPos++];
case 29:
out[outPos++] = in[inPos++];
case 28:
out[outPos++] = in[inPos++];
case 27:
out[outPos++] = in[inPos++];
case 26:
out[outPos++] = in[inPos++];
case 25:
out[outPos++] = in[inPos++];
case 24:
out[outPos++] = in[inPos++];
case 23:
out[outPos++] = in[inPos++];
case 22:
out[outPos++] = in[inPos++];
case 21:
out[outPos++] = in[inPos++];
case 20:
out[outPos++] = in[inPos++];
case 19:
out[outPos++] = in[inPos++];
case 18:
out[outPos++] = in[inPos++];
case 17:
out[outPos++] = in[inPos++];
case 16:
out[outPos++] = in[inPos++];
case 15:
out[outPos++] = in[inPos++];
case 14:
out[outPos++] = in[inPos++];
case 13:
out[outPos++] = in[inPos++];
case 12:
out[outPos++] = in[inPos++];
case 11:
out[outPos++] = in[inPos++];
case 10:
out[outPos++] = in[inPos++];
case 9:
out[outPos++] = in[inPos++];
case 8:
out[outPos++] = in[inPos++];
case 7:
out[outPos++] = in[inPos++];
case 6:
out[outPos++] = in[inPos++];
case 5:
out[outPos++] = in[inPos++];
case 4:
out[outPos++] = in[inPos++];
case 3:
out[outPos++] = in[inPos++];
case 2:
out[outPos++] = in[inPos++];
case 1:
out[outPos++] = in[inPos++];
case 0:
out[outPos++] = in[inPos++];
}
continue;
}
// back reference
int len = ctrl >> 5;
ctrl = -((ctrl & 0x1f) << 8) - 1;
if (len < 7) { // 2 bytes; length of 3 - 8 bytes
ctrl -= in[inPos++] & 255;
out[outPos] = out[outPos++ + ctrl];
out[outPos] = out[outPos++ + ctrl];
switch (len) {
case 6:
out[outPos] = out[outPos++ + ctrl];
case 5:
out[outPos] = out[outPos++ + ctrl];
case 4:
out[outPos] = out[outPos++ + ctrl];
case 3:
out[outPos] = out[outPos++ + ctrl];
case 2:
out[outPos] = out[outPos++ + ctrl];
case 1:
out[outPos] = out[outPos++ + ctrl];
}
continue;
}
// long version (3 bytes, length of up to 264 bytes)
len = in[inPos++] & 255;
ctrl -= in[inPos++] & 255;
// First: if there is no overlap, can just use arraycopy:
if ((ctrl + len) < -9) {
len += 9;
if (len <= 32) {
copyUpTo32WithSwitch(out, outPos + ctrl, out, outPos, len - 1);
} else {
System.arraycopy(out, outPos + ctrl, out, outPos, len);
}
outPos += len;
continue;
}
// otherwise manual copy: so first just copy 9 bytes we know are needed
out[outPos] = out[outPos++ + ctrl];
out[outPos] = out[outPos++ + ctrl];
out[outPos] = out[outPos++ + ctrl];
out[outPos] = out[outPos++ + ctrl];
out[outPos] = out[outPos++ + ctrl];
out[outPos] = out[outPos++ + ctrl];
out[outPos] = out[outPos++ + ctrl];
out[outPos] = out[outPos++ + ctrl];
out[outPos] = out[outPos++ + ctrl];
// then loop
// Odd: after extensive profiling, looks like magic number
// for unrolling is 4: with 8 performance is worse (even
// bit less than with no unrolling).
len += outPos;
final int end = len - 3;
while (outPos < end) {
out[outPos] = out[outPos++ + ctrl];
out[outPos] = out[outPos++ + ctrl];
out[outPos] = out[outPos++ + ctrl];
out[outPos] = out[outPos++ + ctrl];
}
switch (len - outPos) {
case 3:
out[outPos] = out[outPos++ + ctrl];
case 2:
out[outPos] = out[outPos++ + ctrl];
case 1:
out[outPos] = out[outPos++ + ctrl];
}
} while (outPos < outEnd);
// sanity check to guard against corrupt data:
if (outPos != outEnd)
throw new IOException("Corrupt data: overrun in decompress, input offset " + inPos + ", output offset " + outPos);
}
/*
///////////////////////////////////////////////////////////////////////
// Internal methods
///////////////////////////////////////////////////////////////////////
*/
protected static final void copyUpTo32WithSwitch(byte[] in, int inPos, byte[] out, int outPos,
int lengthMinusOne) {
switch (lengthMinusOne) {
case 31:
out[outPos++] = in[inPos++];
case 30:
out[outPos++] = in[inPos++];
case 29:
out[outPos++] = in[inPos++];
case 28:
out[outPos++] = in[inPos++];
case 27:
out[outPos++] = in[inPos++];
case 26:
out[outPos++] = in[inPos++];
case 25:
out[outPos++] = in[inPos++];
case 24:
out[outPos++] = in[inPos++];
case 23:
out[outPos++] = in[inPos++];
case 22:
out[outPos++] = in[inPos++];
case 21:
out[outPos++] = in[inPos++];
case 20:
out[outPos++] = in[inPos++];
case 19:
out[outPos++] = in[inPos++];
case 18:
out[outPos++] = in[inPos++];
case 17:
out[outPos++] = in[inPos++];
case 16:
out[outPos++] = in[inPos++];
case 15:
out[outPos++] = in[inPos++];
case 14:
out[outPos++] = in[inPos++];
case 13:
out[outPos++] = in[inPos++];
case 12:
out[outPos++] = in[inPos++];
case 11:
out[outPos++] = in[inPos++];
case 10:
out[outPos++] = in[inPos++];
case 9:
out[outPos++] = in[inPos++];
case 8:
out[outPos++] = in[inPos++];
case 7:
out[outPos++] = in[inPos++];
case 6:
out[outPos++] = in[inPos++];
case 5:
out[outPos++] = in[inPos++];
case 4:
out[outPos++] = in[inPos++];
case 3:
out[outPos++] = in[inPos++];
case 2:
out[outPos++] = in[inPos++];
case 1:
out[outPos++] = in[inPos++];
case 0:
out[outPos++] = in[inPos++];
}
}
}

View File

@ -1,27 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Copy of LZF code from ning compress based on 0.7 version.
*
* Changes:
*
* 1.
*/
package org.elasticsearch.common.compress.lzf;

View File

@ -1,75 +0,0 @@
package org.elasticsearch.common.compress.lzf.util;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.compress.lzf.ChunkDecoder;
import org.elasticsearch.common.compress.lzf.impl.UnsafeChunkDecoder;
import org.elasticsearch.common.compress.lzf.impl.VanillaChunkDecoder;
/**
* Simple helper class used for loading
* {@link ChunkDecoder} implementations, based on criteria
* such as "fastest available".
* <p/>
* Yes, it looks butt-ugly, but does the job. Nonetheless, if anyone
* has lipstick for this pig, let me know.
*
* @since 0.9
*/
public class ChunkDecoderFactory {
private final static ChunkDecoderFactory _instance;
static {
Class<?> impl = null;
try {
// first, try loading optimal one, which uses Sun JDK Unsafe...
impl = (Class<?>) Class.forName(UnsafeChunkDecoder.class.getName());
} catch (Throwable t) {
}
if (impl == null) {
impl = VanillaChunkDecoder.class;
}
// ES: Seems like: https://github.com/ning/compress/issues/13, is fixed, so enable by defualt, but only from 0.19
if (!Booleans.parseBoolean(System.getProperty("compress.lzf.decoder.optimized"), true)) {
impl = VanillaChunkDecoder.class;
}
_instance = new ChunkDecoderFactory(impl);
}
private final Class<? extends ChunkDecoder> _implClass;
@SuppressWarnings("unchecked")
private ChunkDecoderFactory(Class<?> imp) {
_implClass = (Class<? extends ChunkDecoder>) imp;
}
/*
///////////////////////////////////////////////////////////////////////
// Public API
///////////////////////////////////////////////////////////////////////
*/
/**
* Method to use for getting decompressor instance that uses the most optimal
* available methods for underlying data access. It should be safe to call
* this method as implementations are dynamically loaded; however, on some
* non-standard platforms it may be necessary to either directly load
* instances, or use {@link #safeInstance()}.
*/
public static ChunkDecoder optimalInstance() {
try {
return _instance._implClass.newInstance();
} catch (Exception e) {
throw new IllegalStateException("Failed to load a ChunkDecoder instance (" + e.getClass().getName() + "): "
+ e.getMessage(), e);
}
}
/**
* Method that can be used to ensure that a "safe" decompressor instance is loaded.
* Safe here means that it should work on any and all Java platforms.
*/
public static ChunkDecoder safeInstance() {
// this will always succeed loading; no need to use dynamic class loading or instantiation
return new VanillaChunkDecoder();
}
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.compress.Compressor;
import java.io.IOException;
import java.lang.ref.SoftReference;
@ -30,11 +32,9 @@ public class CachedStreamInput {
static class Entry {
char[] chars = new char[80];
final HandlesStreamInput handles;
final LZFStreamInput lzf;
Entry(HandlesStreamInput handles, LZFStreamInput lzf) {
Entry(HandlesStreamInput handles) {
this.handles = handles;
this.lzf = lzf;
}
}
@ -45,8 +45,7 @@ public class CachedStreamInput {
Entry entry = ref == null ? null : ref.get();
if (entry == null) {
HandlesStreamInput handles = new HandlesStreamInput();
LZFStreamInput lzf = new LZFStreamInput(null, true);
entry = new Entry(handles, lzf);
entry = new Entry(handles);
cache.set(new SoftReference<Entry>(entry));
}
return entry;
@ -56,10 +55,8 @@ public class CachedStreamInput {
cache.remove();
}
public static LZFStreamInput cachedLzf(StreamInput in) throws IOException {
LZFStreamInput lzf = instance().lzf;
lzf.reset(in);
return lzf;
public static StreamInput compressed(Compressor compressor, StreamInput in) throws IOException {
return compressor.streamInput(in);
}
public static HandlesStreamInput cachedHandles(StreamInput in) {
@ -68,10 +65,9 @@ public class CachedStreamInput {
return handles;
}
public static HandlesStreamInput cachedHandlesLzf(StreamInput in) throws IOException {
public static HandlesStreamInput cachedHandlesCompressed(Compressor compressor, StreamInput in) throws IOException {
Entry entry = instance();
entry.lzf.reset(in);
entry.handles.reset(entry.lzf);
entry.handles.reset(compressor.streamInput(in));
return entry.handles;
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.common.io.stream;
import jsr166y.LinkedTransferQueue;
import org.elasticsearch.common.compress.Compressor;
import java.io.IOException;
import java.lang.ref.SoftReference;
@ -40,22 +41,12 @@ public class CachedStreamOutput {
public static class Entry {
private final BytesStreamOutput bytes;
private final HandlesStreamOutput handles;
private LZFStreamOutput lzf;
Entry(BytesStreamOutput bytes, HandlesStreamOutput handles) {
this.bytes = bytes;
this.handles = handles;
}
// lazily initialize LZF, so we won't allocate it if we don't do
// any compression
private LZFStreamOutput lzf() {
if (lzf == null) {
lzf = new LZFStreamOutput(bytes, true);
}
return lzf;
}
/**
* Returns the underlying bytes without any resetting.
*/
@ -71,20 +62,20 @@ public class CachedStreamOutput {
return bytes;
}
public LZFStreamOutput cachedLZFBytes() throws IOException {
LZFStreamOutput lzf = lzf();
lzf.reset();
return lzf;
}
public HandlesStreamOutput cachedHandlesLzfBytes() throws IOException {
LZFStreamOutput lzf = lzf();
handles.reset(lzf);
public StreamOutput cachedHandles() throws IOException {
handles.reset(bytes);
return handles;
}
public HandlesStreamOutput cachedHandlesBytes() throws IOException {
handles.reset(bytes);
public StreamOutput cachedBytes(Compressor compressor) throws IOException {
bytes.reset();
return compressor.streamOutput(bytes);
}
public StreamOutput cachedHandles(Compressor compressor) throws IOException {
bytes.reset();
StreamOutput compressed = compressor.streamOutput(bytes);
handles.reset(compressed);
return handles;
}
}

View File

@ -1,236 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.compress.lzf.BufferRecycler;
import org.elasticsearch.common.compress.lzf.ChunkDecoder;
import org.elasticsearch.common.compress.lzf.LZFChunk;
import org.elasticsearch.common.compress.lzf.util.ChunkDecoderFactory;
import java.io.EOFException;
import java.io.IOException;
/**
*
*/
public class LZFStreamInput extends StreamInput {
/**
* Underlying decoder in use.
*/
private final ChunkDecoder _decoder;
/**
* Object that handles details of buffer recycling
*/
private final BufferRecycler _recycler;
/**
* stream to be decompressed
*/
protected StreamInput inputStream;
/**
* Flag that indicates if we have already called 'inputStream.close()'
* (to avoid calling it multiple times)
*/
protected boolean inputStreamClosed;
/**
* Flag that indicates whether we force full reads (reading of as many
* bytes as requested), or 'optimal' reads (up to as many as available,
* but at least one). Default is false, meaning that 'optimal' read
* is used.
*/
protected boolean _cfgFullReads = true; // ES: ALWAYS TRUE since we need to throw EOF when doing readBytes
/* the current buffer of compressed bytes (from which to decode) */
private byte[] _inputBuffer;
/* the buffer of uncompressed bytes from which content is read */
private byte[] _decodedBytes;
/* The current position (next char to output) in the uncompressed bytes buffer. */
private int bufferPosition = 0;
/* Length of the current uncompressed bytes buffer */
private int bufferLength = 0;
// ES: added to support never closing just resetting
private final boolean cached;
public LZFStreamInput(StreamInput in, boolean cached) {
super();
this.cached = cached;
if (cached) {
_recycler = new BufferRecycler();
} else {
_recycler = BufferRecycler.instance();
}
_decoder = ChunkDecoderFactory.optimalInstance();
inputStream = in;
inputStreamClosed = false;
_inputBuffer = _recycler.allocInputBuffer(LZFChunk.MAX_CHUNK_LEN);
_decodedBytes = _recycler.allocDecodeBuffer(LZFChunk.MAX_CHUNK_LEN);
}
/**
* Method is overridden to report number of bytes that can now be read
* from decoded data buffer, without reading bytes from the underlying
* stream.
* Never throws an exception; returns number of bytes available without
* further reads from underlying source; -1 if stream has been closed, or
* 0 if an actual read (and possible blocking) is needed to find out.
*/
@Override
public int available() {
// if closed, return -1;
if (inputStreamClosed) {
return -1;
}
int left = (bufferLength - bufferPosition);
return (left <= 0) ? 0 : left;
}
@Override
public int read() throws IOException {
if (!readyBuffer()) {
return -1;
}
return _decodedBytes[bufferPosition++] & 255;
}
@Override
public int read(final byte[] buffer, int offset, int length) throws IOException {
if (length < 1) {
return 0;
}
if (!readyBuffer()) {
return -1;
}
// First let's read however much data we happen to have...
int chunkLength = Math.min(bufferLength - bufferPosition, length);
System.arraycopy(_decodedBytes, bufferPosition, buffer, offset, chunkLength);
bufferPosition += chunkLength;
if (chunkLength == length || !_cfgFullReads) {
return chunkLength;
}
// Need more data, then
int totalRead = chunkLength;
do {
offset += chunkLength;
if (!readyBuffer()) {
break;
}
chunkLength = Math.min(bufferLength - bufferPosition, (length - totalRead));
System.arraycopy(_decodedBytes, bufferPosition, buffer, offset, chunkLength);
bufferPosition += chunkLength;
totalRead += chunkLength;
} while (totalRead < length);
return totalRead;
}
@Override
public byte readByte() throws IOException {
if (!readyBuffer()) {
throw new EOFException();
}
return _decodedBytes[bufferPosition++];
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
int result = read(b, offset, len);
if (result < len) {
throw new EOFException();
}
}
@Override
public void reset() throws IOException {
this.bufferPosition = 0;
this.bufferLength = 0;
inputStream.reset();
}
public void reset(StreamInput in) throws IOException {
this.inputStream = in;
this.bufferPosition = 0;
this.bufferLength = 0;
}
/**
* Expert!, resets to buffer start, without the need to decompress it again.
*/
public void resetToBufferStart() {
this.bufferPosition = 0;
}
@Override
public void close() throws IOException {
if (cached) {
reset();
return;
}
bufferPosition = bufferLength = 0;
byte[] buf = _inputBuffer;
if (buf != null) {
_inputBuffer = null;
_recycler.releaseInputBuffer(buf);
}
buf = _decodedBytes;
if (buf != null) {
_decodedBytes = null;
_recycler.releaseDecodeBuffer(buf);
}
if (!inputStreamClosed) {
inputStreamClosed = true;
inputStream.close();
}
}
/*
///////////////////////////////////////////////////////////////////////
// Internal methods
///////////////////////////////////////////////////////////////////////
*/
/**
* Fill the uncompressed bytes buffer by reading the underlying inputStream.
*
* @throws IOException
*/
protected boolean readyBuffer() throws IOException {
if (bufferPosition < bufferLength) {
return true;
}
if (inputStreamClosed) {
return false;
}
bufferLength = _decoder.decodeChunk(inputStream, _inputBuffer, _decodedBytes);
if (bufferLength < 0) {
return false;
}
bufferPosition = 0;
return (bufferPosition < bufferLength);
}
}

View File

@ -1,173 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.compress.lzf.BufferRecycler;
import org.elasticsearch.common.compress.lzf.ChunkEncoder;
import org.elasticsearch.common.compress.lzf.LZFChunk;
import java.io.IOException;
/**
*
*/
public class LZFStreamOutput extends StreamOutput {
private static int OUTPUT_BUFFER_SIZE = LZFChunk.MAX_CHUNK_LEN;
private final ChunkEncoder _encoder;
private final BufferRecycler _recycler;
protected StreamOutput _outputStream;
protected byte[] _outputBuffer;
protected int _position = 0;
/**
* Configuration setting that governs whether basic 'flush()' should
* first complete a block or not.
* <p/>
* Default value is 'true'
*
* @since 0.8
*/
protected boolean _cfgFinishBlockOnFlush = true;
private final boolean neverClose;
public LZFStreamOutput(StreamOutput out, boolean neverClose) {
this.neverClose = neverClose;
_recycler = neverClose ? new BufferRecycler() : BufferRecycler.instance();
_encoder = new ChunkEncoder(OUTPUT_BUFFER_SIZE, _recycler);
_outputStream = out;
_outputBuffer = _recycler.allocOutputBuffer(OUTPUT_BUFFER_SIZE);
}
@Override
public void write(final int singleByte) throws IOException {
if (_position >= _outputBuffer.length) {
writeCompressedBlock();
}
_outputBuffer[_position++] = (byte) singleByte;
}
@Override
public void writeByte(byte b) throws IOException {
if (_position >= _outputBuffer.length) {
writeCompressedBlock();
}
_outputBuffer[_position++] = b;
}
@Override
public void writeBytes(byte[] buffer, int offset, int length) throws IOException {
// ES, check if length is 0, and don't write in this case
if (length == 0) {
return;
}
final int BUFFER_LEN = _outputBuffer.length;
// simple case first: buffering only (for trivially short writes)
int free = BUFFER_LEN - _position;
if (free >= length) {
System.arraycopy(buffer, offset, _outputBuffer, _position, length);
_position += length;
return;
}
// otherwise, copy whatever we can, flush
System.arraycopy(buffer, offset, _outputBuffer, _position, free);
offset += free;
length -= free;
_position += free;
writeCompressedBlock();
// then write intermediate full block, if any, without copying:
while (length >= BUFFER_LEN) {
_encoder.encodeAndWriteChunk(buffer, offset, BUFFER_LEN, _outputStream);
offset += BUFFER_LEN;
length -= BUFFER_LEN;
}
// and finally, copy leftovers in buffer, if any
if (length > 0) {
System.arraycopy(buffer, offset, _outputBuffer, 0, length);
}
_position = length;
}
@Override
public void flush() throws IOException {
if (_cfgFinishBlockOnFlush && _position > 0) {
writeCompressedBlock();
}
_outputStream.flush();
}
@Override
public void close() throws IOException {
if (_position > 0) {
writeCompressedBlock();
}
if (neverClose) {
// just reset here the LZF stream (not the underlying stream, since we might want to read from it)
_position = 0;
return;
}
_outputStream.flush();
_encoder.close();
byte[] buf = _outputBuffer;
if (buf != null) {
_outputBuffer = null;
_recycler.releaseOutputBuffer(buf);
}
_outputStream.close();
}
@Override
public void reset() throws IOException {
_position = 0;
_outputStream.reset();
}
public void reset(StreamOutput out) throws IOException {
this._outputStream = out;
reset();
}
public StreamOutput wrappedOut() {
return this._outputStream;
}
/**
* Compress and write the current block to the OutputStream
*/
private void writeCompressedBlock() throws IOException {
int left = _position;
_position = 0;
int offset = 0;
do {
int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left);
_encoder.encodeAndWriteChunk(_outputBuffer, offset, chunkLen, _outputStream);
offset += chunkLen;
left -= chunkLen;
} while (left > 0);
}
}

View File

@ -23,10 +23,10 @@ import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.compress.CompressedStreamInput;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.LZFStreamInput;
import java.io.IOException;
import java.util.ArrayList;
@ -40,12 +40,12 @@ import java.util.Map;
public class XContentHelper {
public static XContentParser createParser(byte[] data, int offset, int length) throws IOException {
if (LZF.isCompressed(data, offset, length)) {
BytesStreamInput siBytes = new BytesStreamInput(data, offset, length, false);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
XContentType contentType = XContentFactory.xContentType(siLzf);
siLzf.resetToBufferStart();
return XContentFactory.xContent(contentType).createParser(siLzf);
Compressor compressor = CompressorFactory.compressor(data, offset, length);
if (compressor != null) {
CompressedStreamInput compressedInput = compressor.streamInput(new BytesStreamInput(data, offset, length, false));
XContentType contentType = XContentFactory.xContentType(compressedInput);
compressedInput.resetToBufferStart();
return XContentFactory.xContent(contentType).createParser(compressedInput);
} else {
return XContentFactory.xContent(data, offset, length).createParser(data, offset, length);
}
@ -59,12 +59,12 @@ public class XContentHelper {
try {
XContentParser parser;
XContentType contentType;
if (LZF.isCompressed(data, offset, length)) {
BytesStreamInput siBytes = new BytesStreamInput(data, offset, length, false);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
contentType = XContentFactory.xContentType(siLzf);
siLzf.resetToBufferStart();
parser = XContentFactory.xContent(contentType).createParser(siLzf);
Compressor compressor = CompressorFactory.compressor(data, offset, length);
if (compressor != null) {
CompressedStreamInput compressedStreamInput = compressor.streamInput(new BytesStreamInput(data, offset, length, false));
contentType = XContentFactory.xContentType(compressedStreamInput);
compressedStreamInput.resetToBufferStart();
parser = XContentFactory.xContent(contentType).createParser(compressedStreamInput);
} else {
contentType = XContentFactory.xContentType(data, offset, length);
parser = XContentFactory.xContent(contentType).createParser(data, offset, length);

View File

@ -43,7 +43,6 @@ import org.elasticsearch.transport.*;
import java.io.IOException;
import java.net.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@ -269,12 +268,13 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
synchronized (sendMutex) {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
HandlesStreamOutput out = cachedEntry.cachedHandlesBytes();
StreamOutput out = cachedEntry.cachedHandles();
out.writeBytes(INTERNAL_HEADER);
Version.writeVersion(Version.CURRENT, out);
out.writeInt(id);
clusterName.writeTo(out);
nodesProvider.nodes().localNode().writeTo(out);
out.close();
datagramPacketSend.setData(cachedEntry.bytes().copiedByteArray());
multicastSocket.send(datagramPacketSend);
if (logger.isTraceEnabled()) {

View File

@ -23,6 +23,8 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
@ -67,9 +69,9 @@ public class PublishClusterStateAction extends AbstractComponent {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
byte[] clusterStateInBytes;
try {
HandlesStreamOutput stream = cachedEntry.cachedHandlesLzfBytes();
StreamOutput stream = cachedEntry.cachedHandles(CompressorFactory.defaultCompressor());
ClusterState.Builder.writeTo(clusterState, stream);
stream.flush();
stream.close();
clusterStateInBytes = cachedEntry.bytes().copiedByteArray();
} catch (Exception e) {
logger.warn("failed to serialize cluster_state before publishing it to nodes", e);
@ -129,7 +131,14 @@ public class PublishClusterStateAction extends AbstractComponent {
@Override
public void messageReceived(PublishClusterStateRequest request, TransportChannel channel) throws Exception {
StreamInput in = CachedStreamInput.cachedHandlesLzf(new BytesStreamInput(request.clusterStateInBytes.bytes(), request.clusterStateInBytes.offset(), request.clusterStateInBytes.length(), false));
Compressor compressor = CompressorFactory.compressor(request.clusterStateInBytes.bytes(), request.clusterStateInBytes.offset(), request.clusterStateInBytes.length());
BytesStreamInput bytes = new BytesStreamInput(request.clusterStateInBytes.bytes(), request.clusterStateInBytes.offset(), request.clusterStateInBytes.length(), false);
StreamInput in;
if (compressor != null) {
in = CachedStreamInput.cachedHandlesCompressed(compressor, bytes);
} else {
in = CachedStreamInput.cachedHandles(bytes);
}
ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
listener.onNewClusterState(clusterState);
channel.sendResponse(VoidStreamable.INSTANCE);

View File

@ -26,8 +26,9 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.*;
import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.io.stream.*;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.*;
@ -147,7 +148,7 @@ public abstract class BlobStoreGateway extends SharedStorageGateway {
try {
StreamOutput streamOutput;
if (compress) {
streamOutput = cachedEntry.cachedLZFBytes();
streamOutput = cachedEntry.cachedBytes(CompressorFactory.defaultCompressor());
} else {
streamOutput = cachedEntry.cachedBytes();
}
@ -206,13 +207,7 @@ public abstract class BlobStoreGateway extends SharedStorageGateway {
private MetaData readMetaData(byte[] data) throws IOException {
XContentParser parser = null;
try {
if (LZF.isCompressed(data)) {
BytesStreamInput siBytes = new BytesStreamInput(data, false);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf);
} else {
parser = XContentFactory.xContent(XContentType.JSON).createParser(data);
}
parser = XContentHelper.createParser(data, 0, data.length);
return MetaData.Builder.fromXContent(parser);
} finally {
if (parser != null) {

View File

@ -27,14 +27,10 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.LZFStreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.*;
@ -402,13 +398,7 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
try {
Map<ShardId, ShardStateInfo> shardsState = Maps.newHashMap();
if (LZF.isCompressed(data)) {
BytesStreamInput siBytes = new BytesStreamInput(data, false);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf);
} else {
parser = XContentFactory.xContent(XContentType.JSON).createParser(data);
}
parser = XContentHelper.createParser(data, 0, data.length);
String currentFieldName = null;
XContentParser.Token token = parser.nextToken();

View File

@ -23,8 +23,7 @@ import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.compress.lzf.LZFDecoder;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -168,14 +167,12 @@ public class GetResult implements Streamable, Iterable<GetField>, ToXContent {
* Returns bytes reference, also un compress the source if needed.
*/
public BytesHolder sourceRef() {
if (LZF.isCompressed(source.bytes(), source.offset(), source.length())) {
try {
this.source = new BytesHolder(LZFDecoder.decode(source.bytes(), source.offset(), source.length()));
} catch (IOException e) {
throw new ElasticSearchParseException("failed to decompress source", e);
}
try {
this.source = CompressorFactory.uncompressIfNeeded(this.source);
return this.source;
} catch (IOException e) {
throw new ElasticSearchParseException("failed to decompress source", e);
}
return this.source;
}
/**

View File

@ -23,11 +23,11 @@ import org.apache.lucene.document.Field;
import org.apache.lucene.document.Fieldable;
import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.compress.lzf.LZFDecoder;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.LZFStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
@ -127,14 +127,14 @@ public class BinaryFieldMapper extends AbstractFieldMapper<byte[]> {
@Override
public byte[] value(Fieldable field) {
byte[] value = field.getBinaryValue();
if (value != null && LZF.isCompressed(value)) {
try {
return LZFDecoder.decode(value);
} catch (IOException e) {
throw new ElasticSearchParseException("failed to decompress source", e);
}
if (value == null) {
return value;
}
try {
return CompressorFactory.uncompressIfNeeded(new BytesHolder(value)).bytes();
} catch (IOException e) {
throw new ElasticSearchParseException("failed to decompress source", e);
}
return value;
}
@Override
@ -167,12 +167,12 @@ public class BinaryFieldMapper extends AbstractFieldMapper<byte[]> {
return null;
} else {
value = context.parser().binaryValue();
if (compress != null && compress && !LZF.isCompressed(value, 0, value.length)) {
if (compress != null && compress && !CompressorFactory.isCompressed(value, 0, value.length)) {
if (compressThreshold == -1 || value.length > compressThreshold) {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
LZFStreamOutput streamOutput = cachedEntry.cachedLZFBytes();
StreamOutput streamOutput = cachedEntry.cachedBytes(CompressorFactory.defaultCompressor());
streamOutput.writeBytes(value, 0, value.length);
streamOutput.flush();
streamOutput.close();
// we copy over the byte array, since we need to push back the cached entry
// TODO, we we had a handle into when we are done with parsing, then we push back then and not copy over bytes
value = cachedEntry.bytes().copiedByteArray();

View File

@ -24,11 +24,15 @@ import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Fieldable;
import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.compress.lzf.LZFDecoder;
import org.elasticsearch.common.io.stream.*;
import org.elasticsearch.common.compress.CompressedStreamInput;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.document.ResetFieldSelector;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -251,7 +255,7 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
StreamOutput streamOutput;
if (compress != null && compress && (compressThreshold == -1 || dataLength > compressThreshold)) {
streamOutput = cachedEntry.cachedLZFBytes();
streamOutput = cachedEntry.cachedBytes(CompressorFactory.defaultCompressor());
} else {
streamOutput = cachedEntry.cachedBytes();
}
@ -267,19 +271,19 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
dataLength = data.length;
CachedStreamOutput.pushEntry(cachedEntry);
} else if (compress != null && compress && !LZF.isCompressed(data, dataOffset, dataLength)) {
} else if (compress != null && compress && !CompressorFactory.isCompressed(data, dataOffset, dataLength)) {
if (compressThreshold == -1 || dataLength > compressThreshold) {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
XContentType contentType = XContentFactory.xContentType(data, dataOffset, dataLength);
if (formatContentType != null && formatContentType != contentType) {
XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, cachedEntry.cachedLZFBytes());
XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, cachedEntry.cachedBytes(CompressorFactory.defaultCompressor()));
builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(data, dataOffset, dataLength));
builder.close();
} else {
LZFStreamOutput streamOutput = cachedEntry.cachedLZFBytes();
StreamOutput streamOutput = cachedEntry.cachedBytes(CompressorFactory.defaultCompressor());
streamOutput.writeBytes(data, dataOffset, dataLength);
streamOutput.flush();
streamOutput.close();
}
// we copy over the byte array, since we need to push back the cached entry
// TODO, we we had a handle into when we are done with parsing, then we push back then and not copy over bytes
@ -294,18 +298,18 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
}
} else if (formatContentType != null) {
// see if we need to convert the content type
if (LZF.isCompressed(data, dataOffset, dataLength)) {
BytesStreamInput siBytes = new BytesStreamInput(data, dataOffset, dataLength, false);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
XContentType contentType = XContentFactory.xContentType(siLzf);
siLzf.resetToBufferStart();
Compressor compressor = CompressorFactory.compressor(data, dataOffset, dataLength);
if (compressor != null) {
CompressedStreamInput compressedStreamInput = compressor.streamInput(new BytesStreamInput(data, dataOffset, dataLength, false));
XContentType contentType = XContentFactory.xContentType(compressedStreamInput);
compressedStreamInput.resetToBufferStart();
if (contentType != formatContentType) {
// we need to reread and store back, compressed....
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
LZFStreamOutput streamOutput = cachedEntry.cachedLZFBytes();
StreamOutput streamOutput = cachedEntry.cachedBytes(CompressorFactory.defaultCompressor());
XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, streamOutput);
builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(siLzf));
builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(compressedStreamInput));
builder.close();
data = cachedEntry.bytes().copiedByteArray();
dataOffset = 0;
@ -315,6 +319,8 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
}
} else {
compressedStreamInput.close();
}
} else {
XContentType contentType = XContentFactory.xContentType(data, dataOffset, dataLength);
@ -355,14 +361,11 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
if (value == null) {
return value;
}
if (LZF.isCompressed(value)) {
try {
return LZFDecoder.decode(value);
} catch (IOException e) {
throw new ElasticSearchParseException("failed to decompress source", e);
}
try {
return CompressorFactory.uncompressIfNeeded(new BytesHolder(value)).bytes();
} catch (IOException e) {
throw new ElasticSearchParseException("failed to decompress source", e);
}
return value;
}
@Override

View File

@ -19,11 +19,11 @@
package org.elasticsearch.rest.action.support;
import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.compress.CompressedStreamInput;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.LZFStreamInput;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.rest.RestRequest;
@ -67,15 +67,15 @@ public class RestXContentBuilder {
}
public static void restDocumentSource(byte[] source, int offset, int length, XContentBuilder builder, ToXContent.Params params) throws IOException {
if (LZF.isCompressed(source, offset, length)) {
BytesStreamInput siBytes = new BytesStreamInput(source, offset, length, false);
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
XContentType contentType = XContentFactory.xContentType(siLzf);
siLzf.resetToBufferStart();
Compressor compressor = CompressorFactory.compressor(source, offset, length);
if (compressor != null) {
CompressedStreamInput compressedStreamInput = compressor.streamInput(new BytesStreamInput(source, offset, length, false));
XContentType contentType = XContentFactory.xContentType(compressedStreamInput);
compressedStreamInput.resetToBufferStart();
if (contentType == builder.contentType()) {
builder.rawField("_source", siLzf);
builder.rawField("_source", compressedStreamInput);
} else {
XContentParser parser = XContentFactory.xContent(contentType).createParser(siLzf);
XContentParser parser = XContentFactory.xContent(contentType).createParser(compressedStreamInput);
try {
parser.nextToken();
builder.field("_source");

View File

@ -26,8 +26,7 @@ import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.compress.lzf.LZFDecoder;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -167,14 +166,12 @@ public class InternalSearchHit implements SearchHit {
* Returns bytes reference, also un compress the source if needed.
*/
public BytesHolder sourceRef() {
if (LZF.isCompressed(source.bytes(), source.offset(), source.length())) {
try {
this.source = new BytesHolder(LZFDecoder.decode(source.bytes(), source.offset(), source.length()));
} catch (IOException e) {
throw new ElasticSearchParseException("failed to decompress source", e);
}
try {
this.source = CompressorFactory.uncompressIfNeeded(this.source);
return this.source;
} catch (IOException e) {
throw new ElasticSearchParseException("failed to decompress source", e);
}
return this.source;
}
/**

View File

@ -159,7 +159,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
HandlesStreamOutput stream = cachedEntry.cachedHandlesBytes();
StreamOutput stream = cachedEntry.cachedHandles();
stream.writeLong(requestId);
byte status = 0;
@ -169,12 +169,14 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
stream.writeUTF(action);
message.writeTo(stream);
stream.close();
final LocalTransport targetTransport = connectedNodes.get(node);
if (targetTransport == null) {
throw new NodeNotConnectedException(node, "Node not connected");
}
final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray();
final byte[] data = cachedEntry.bytes().copiedByteArray();
transportServiceAdapter.sent(data.length);

View File

@ -22,7 +22,7 @@ package org.elasticsearch.transport.local;
import org.elasticsearch.common.io.ThrowableObjectOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.transport.NotSerializableTransportException;
import org.elasticsearch.transport.RemoteTransportException;
@ -68,12 +68,13 @@ public class LocalTransportChannel implements TransportChannel {
public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
HandlesStreamOutput stream = cachedEntry.cachedHandlesBytes();
StreamOutput stream = cachedEntry.cachedHandles();
stream.writeLong(requestId);
byte status = 0;
status = TransportStreams.statusSetResponse(status);
stream.writeByte(status); // 0 for request, 1 for response.
message.writeTo(stream);
stream.close();
final byte[] data = cachedEntry.bytes().copiedByteArray();
targetTransport.threadPool().generic().execute(new Runnable() {
@Override

View File

@ -19,9 +19,11 @@
package org.elasticsearch.transport.netty;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.ThrowableObjectInputStream;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.HandlesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.logging.ESLogger;
@ -215,9 +217,13 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
byte status = buffer.readByte();
boolean isRequest = TransportStreams.statusIsRequest(status);
HandlesStreamInput wrappedStream;
if (TransportStreams.statusIsCompress(status)) {
wrappedStream = CachedStreamInput.cachedHandlesLzf(streamIn);
StreamInput wrappedStream;
if (TransportStreams.statusIsCompress(status) && buffer.readable()) {
Compressor compressor = CompressorFactory.compressor(buffer);
if (compressor == null) {
throw new ElasticSearchIllegalStateException("stream marked as compressed, but no compressor found");
}
wrappedStream = CachedStreamInput.cachedHandlesCompressed(compressor, streamIn);
} else {
wrappedStream = CachedStreamInput.cachedHandles(streamIn);
}
@ -254,7 +260,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
buffer.readerIndex(expectedIndexReader);
}
}
wrappedStream.cleanHandles();
wrappedStream.close();
}
private void handleResponse(StreamInput buffer, final TransportResponseHandler handler) {

View File

@ -19,8 +19,9 @@
package org.elasticsearch.transport.support;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseOptions;
@ -105,17 +106,17 @@ public class TransportStreams {
if (options.compress()) {
status = TransportStreams.statusSetCompress(status);
HandlesStreamOutput stream = cachedEntry.cachedHandlesLzfBytes();
StreamOutput stream = cachedEntry.cachedHandles(CompressorFactory.defaultCompressor());
cachedEntry.bytes().write(HEADER_PLACEHOLDER);
stream.writeUTF(action);
message.writeTo(stream);
stream.flush();
stream.close();
} else {
HandlesStreamOutput stream = cachedEntry.cachedHandlesBytes();
StreamOutput stream = cachedEntry.cachedHandles();
cachedEntry.bytes().write(HEADER_PLACEHOLDER);
stream.writeUTF(action);
message.writeTo(stream);
stream.flush();
stream.close();
}
TransportStreams.writeHeader(cachedEntry.bytes().underlyingBytes(), cachedEntry.bytes().size(), requestId, status);
}
@ -126,15 +127,15 @@ public class TransportStreams {
if (options.compress()) {
status = TransportStreams.statusSetCompress(status);
HandlesStreamOutput stream = cachedEntry.cachedHandlesLzfBytes();
StreamOutput stream = cachedEntry.cachedHandles(CompressorFactory.defaultCompressor());
cachedEntry.bytes().write(HEADER_PLACEHOLDER);
message.writeTo(stream);
stream.flush();
stream.close();
} else {
HandlesStreamOutput stream = cachedEntry.cachedHandlesBytes();
StreamOutput stream = cachedEntry.cachedHandles();
cachedEntry.bytes().write(HEADER_PLACEHOLDER);
message.writeTo(stream);
stream.flush();
stream.close();
}
TransportStreams.writeHeader(cachedEntry.bytes().underlyingBytes(), cachedEntry.bytes().size(), requestId, status);
}

View File

@ -29,7 +29,6 @@ import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.unit.index.engine.AbstractSimpleEngineTests;
import org.elasticsearch.threadpool.ThreadPool;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
@ -39,7 +38,7 @@ import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_
public class SimpleRobinEngineTests extends AbstractSimpleEngineTests {
protected Engine createEngine(Store store, Translog translog) {
return new RobinEngine(shardId, EMPTY_SETTINGS, new ThreadPool(), new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), new ShardIndexingService(shardId, EMPTY_SETTINGS), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
return new RobinEngine(shardId, EMPTY_SETTINGS, threadPool, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), new ShardIndexingService(shardId, EMPTY_SETTINGS), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NoneBloomCache(shardId.index()));
}
}

View File

@ -19,11 +19,11 @@
package org.elasticsearch.test.unit.index.mapper.source;
import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.test.unit.index.mapper.MapperTests;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.test.unit.index.mapper.MapperTests;
import org.testng.annotations.Test;
import static org.hamcrest.MatcherAssert.assertThat;
@ -47,7 +47,7 @@ public class CompressSourceMappingTests {
.field("field2", "value2")
.endObject().copiedBytes());
assertThat(LZF.isCompressed(doc.rootDoc().getBinaryValue("_source")), equalTo(false));
assertThat(CompressorFactory.isCompressed(doc.rootDoc().getBinaryValue("_source")), equalTo(false));
}
@Test
@ -63,7 +63,7 @@ public class CompressSourceMappingTests {
.field("field2", "value2")
.endObject().copiedBytes());
assertThat(LZF.isCompressed(doc.rootDoc().getBinaryValue("_source")), equalTo(true));
assertThat(CompressorFactory.isCompressed(doc.rootDoc().getBinaryValue("_source")), equalTo(true));
}
@Test
@ -78,7 +78,7 @@ public class CompressSourceMappingTests {
.field("field1", "value1")
.endObject().copiedBytes());
assertThat(LZF.isCompressed(doc.rootDoc().getBinaryValue("_source")), equalTo(false));
assertThat(CompressorFactory.isCompressed(doc.rootDoc().getBinaryValue("_source")), equalTo(false));
doc = documentMapper.parse("type", "1", XContentFactory.jsonBuilder().startObject()
.field("field1", "value1")
@ -88,6 +88,6 @@ public class CompressSourceMappingTests {
.field("field2", "value2 xxxxxxxxxxxxxx yyyyyyyyyyyyyyyyyyy zzzzzzzzzzzzzzzzz")
.endObject().copiedBytes());
assertThat(LZF.isCompressed(doc.rootDoc().getBinaryValue("_source")), equalTo(true));
assertThat(CompressorFactory.isCompressed(doc.rootDoc().getBinaryValue("_source")), equalTo(true));
}
}

View File

@ -20,8 +20,8 @@
package org.elasticsearch.test.unit.index.mapper.source;
import org.apache.lucene.document.Fieldable;
import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.compress.lzf.LZFDecoder;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.mapper.DocumentMapper;
@ -94,8 +94,8 @@ public class DefaultSourceMappingTests {
.field("field", "value")
.endObject().copiedBytes());
assertThat(LZF.isCompressed(doc.source(), doc.sourceOffset(), doc.sourceLength()), equalTo(true));
byte[] uncompressed = LZFDecoder.decode(doc.source(), doc.sourceOffset(), doc.sourceLength());
assertThat(CompressorFactory.isCompressed(doc.source(), doc.sourceOffset(), doc.sourceLength()), equalTo(true));
byte[] uncompressed = CompressorFactory.uncompressIfNeeded(new BytesHolder(doc.source(), doc.sourceOffset(), doc.sourceLength())).copyBytes();
assertThat(XContentFactory.xContentType(uncompressed), equalTo(XContentType.JSON));
documentMapper = MapperTests.newParser().parse(mapping);
@ -103,8 +103,8 @@ public class DefaultSourceMappingTests {
.field("field", "value")
.endObject().copiedBytes());
assertThat(LZF.isCompressed(doc.source(), doc.sourceOffset(), doc.sourceLength()), equalTo(true));
uncompressed = LZFDecoder.decode(doc.source(), doc.sourceOffset(), doc.sourceLength());
assertThat(CompressorFactory.isCompressed(doc.source(), doc.sourceOffset(), doc.sourceLength()), equalTo(true));
uncompressed = CompressorFactory.uncompressIfNeeded(new BytesHolder(doc.source(), doc.sourceOffset(), doc.sourceLength())).copyBytes();
assertThat(XContentFactory.xContentType(uncompressed), equalTo(XContentType.JSON));
}