Compression: Support snappy as a compression option, closes #2081.

Shay Banon 2012-07-04 17:14:12 +02:00
parent 2436f645aa
commit 57023c8ba9
53 changed files with 1173 additions and 133 deletions
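Before the per-file diffs: the commit introduces a pluggable compressor registry (CompressorFactory) holding LZF plus the new snappy support, with the default chosen through the new compress.default.type setting. A minimal usage sketch, assuming snappy-java loaded successfully so the real snappy compressor is registered (ImmutableSettings is the standard settings builder in this codebase; values are illustrative):

import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;

public class SnappyDefaultExample {
    public static void main(String[] args) {
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("compress.default.type", "snappy") // built-in default stays "lzf"
                .build();
        CompressorFactory.configure(settings);
        System.out.println(CompressorFactory.defaultCompressor().type()); // prints: snappy
    }
}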

pom.xml

@@ -184,6 +184,14 @@
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.0.4.1</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<!-- We don't use this since the publish pom is then messed up -->
<!--
<dependency>
@@ -494,7 +502,7 @@
</data>
<data>
<src>${project.build.directory}/lib</src>
<includes>lucene*, log4j*, jna*</includes>
<includes>lucene*, log4j*, jna*, snappy-java-*</includes>
<type>directory</type>
<mapper>
<type>perm</type>

View File

@@ -7,6 +7,7 @@
<include>org.apache.lucene:lucene*</include>
<include>log4j:log4j</include>
<include>net.java.dev.jna:jna</include>
<include>org.xerial.snappy:snappy-java</include>
</includes>
</dependencySet>
<dependencySet>

TransportClient.java

@@ -50,6 +50,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.io.CachedStreams;
@@ -155,6 +156,8 @@ public class TransportClient extends AbstractClient {
this.pluginsService = new PluginsService(tuple.v1(), tuple.v2());
this.settings = pluginsService.updatedSettings();
CompressorFactory.configure(settings);
ModulesBuilder modules = new ModulesBuilder();
modules.add(new PluginsModule(settings, pluginsService));
modules.add(new EnvironmentModule(environment));

ClusterState.java

@@ -224,7 +224,7 @@ public class ClusterState {
public static byte[] toBytes(ClusterState state) throws IOException {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
BytesStreamOutput os = cachedEntry.cachedBytes();
BytesStreamOutput os = cachedEntry.bytes();
writeTo(state, os);
return os.copiedByteArray();
} finally {

CompressedIndexInput.java

@@ -27,31 +27,34 @@ import java.io.IOException;
/**
*/
public abstract class CompressedIndexInput extends IndexInput {
public abstract class CompressedIndexInput<T extends CompressorContext> extends IndexInput {
private IndexInput in;
protected final T context;
private int version;
private long uncompressedLength;
private long totalUncompressedLength;
private BigLongArray offsets;
private boolean closed;
protected byte[] uncompressed;
protected int uncompressedLength;
private int position = 0;
private int valid = 0;
private int currentOffsetIdx;
private long currentUncompressedChunkPointer;
public CompressedIndexInput(IndexInput in) throws IOException {
public CompressedIndexInput(IndexInput in, T context) throws IOException {
super("compressed(" + in.toString() + ")");
this.in = in;
this.context = context;
readHeader(in);
this.version = in.readInt();
long metaDataPosition = in.readLong();
long headerLength = in.getFilePointer();
in.seek(metaDataPosition);
this.uncompressedLength = in.readVLong();
this.totalUncompressedLength = in.readVLong();
int size = in.readVInt();
offsets = new BigLongArray(size);
for (int i = 0; i < size; i++) {
@@ -133,7 +136,7 @@ public abstract class CompressedIndexInput extends IndexInput {
@Override
public void seek(long pos) throws IOException {
int idx = (int) (pos / uncompressed.length);
int idx = (int) (pos / uncompressedLength);
if (idx >= offsets.size) {
// set the next "readyBuffer" to EOF
currentOffsetIdx = idx;
@@ -151,12 +154,12 @@ public abstract class CompressedIndexInput extends IndexInput {
currentOffsetIdx = idx - 1; // we are going to increase it in readyBuffer...
readyBuffer();
}
position = (int) (pos % uncompressed.length);
position = (int) (pos % uncompressedLength);
}
@Override
public long length() {
return uncompressedLength;
return totalUncompressedLength;
}
@Override
@@ -187,7 +190,7 @@ public abstract class CompressedIndexInput extends IndexInput {
return false;
}
currentOffsetIdx++;
currentUncompressedChunkPointer = ((long) currentOffsetIdx) * uncompressed.length;
currentUncompressedChunkPointer = ((long) currentOffsetIdx) * uncompressedLength;
position = 0;
return (position < valid);
}
@@ -203,8 +206,8 @@ public abstract class CompressedIndexInput extends IndexInput {
public Object clone() {
// we clone and we need to make sure we keep the same positions!
CompressedIndexInput cloned = (CompressedIndexInput) super.clone();
cloned.uncompressed = new byte[uncompressed.length];
System.arraycopy(uncompressed, 0, cloned.uncompressed, 0, uncompressed.length);
cloned.uncompressed = new byte[uncompressedLength];
System.arraycopy(uncompressed, 0, cloned.uncompressed, 0, uncompressedLength);
cloned.in = (IndexInput) cloned.in.clone();
return cloned;
}
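Two things change in CompressedIndexInput above: the class gains a CompressorContext type parameter, and seek/length/clone switch from uncompressed.length to the new uncompressedLength field, because a recycled buffer can be physically larger than the logical chunk it holds. A worked example of the chunk arithmetic, assuming a 32 KB chunk:

// uncompressedLength = 32768 (logical chunk size, not the buffer's physical size)
long pos = 100000;                  // absolute uncompressed position to seek to
int idx = (int) (pos / 32768);      // = 3: the chunk containing pos
int position = (int) (pos % 32768); // = 1696: offset of pos inside chunk 3
// readyBuffer() then decompresses chunk idx and reads continue at position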

CompressedIndexOutput.java

@@ -28,11 +28,13 @@ import java.io.IOException;
/**
*/
public abstract class CompressedIndexOutput extends IndexOutput {
public abstract class CompressedIndexOutput<T extends CompressorContext> extends IndexOutput {
final IndexOutput out;
protected final T context;
protected byte[] uncompressed;
protected int uncompressedLength;
private int position = 0;
private long uncompressedPosition;
@@ -43,8 +45,9 @@ public abstract class CompressedIndexOutput extends IndexOutput {
// need to have a growing segment long array list here...
private TLongArrayList offsets = new TLongArrayList();
public CompressedIndexOutput(IndexOutput out) throws IOException {
public CompressedIndexOutput(IndexOutput out, T context) throws IOException {
this.out = out;
this.context = context;
writeHeader(out);
out.writeInt(0); // version
metaDataPointer = out.getFilePointer();
@@ -57,7 +60,7 @@ public abstract class CompressedIndexOutput extends IndexOutput {
@Override
public void writeByte(byte b) throws IOException {
if (position >= uncompressed.length) {
if (position >= uncompressedLength) {
flushBuffer();
}
uncompressedPosition++;
@@ -70,7 +73,7 @@ public abstract class CompressedIndexOutput extends IndexOutput {
if (length == 0) {
return;
}
final int BUFFER_LEN = uncompressed.length;
final int BUFFER_LEN = uncompressedLength;
// simple case first: buffering only (for trivially short writes)
int free = BUFFER_LEN - position;
@@ -109,7 +112,7 @@ public abstract class CompressedIndexOutput extends IndexOutput {
@Override
public void copyBytes(DataInput input, long length) throws IOException {
final int BUFFER_LEN = uncompressed.length;
final int BUFFER_LEN = uncompressedLength;
// simple case first: buffering only (for trivially short writes)
int free = BUFFER_LEN - position;

CompressedStreamInput.java

@@ -23,13 +23,13 @@ import org.elasticsearch.common.io.stream.StreamInput;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
/**
*/
public abstract class CompressedStreamInput extends StreamInput {
public abstract class CompressedStreamInput<T extends CompressorContext> extends StreamInput {
private final StreamInput in;
protected final CompressorContext context;
private boolean closed;
@@ -37,8 +37,9 @@ public abstract class CompressedStreamInput extends StreamInput {
private int position = 0;
private int valid = 0;
public CompressedStreamInput(StreamInput in) throws IOException {
public CompressedStreamInput(StreamInput in, T context) throws IOException {
this.in = in;
this.context = context;
readHeader(in);
}
@@ -169,6 +170,6 @@ public abstract class CompressedStreamInput extends StreamInput {
/**
* Uncompress the data into the out array, returning the size uncompressed
*/
protected abstract int uncompress(InputStream in, byte[] out) throws IOException;
protected abstract int uncompress(StreamInput in, byte[] out) throws IOException;
}

CompressedStreamOutput.java

@@ -25,23 +25,26 @@ import java.io.IOException;
/**
*/
public abstract class CompressedStreamOutput extends StreamOutput {
public abstract class CompressedStreamOutput<T extends CompressorContext> extends StreamOutput {
private final StreamOutput out;
protected final T context;
protected byte[] uncompressed;
protected int uncompressedLength;
private int position = 0;
private boolean closed;
public CompressedStreamOutput(StreamOutput out) throws IOException {
public CompressedStreamOutput(StreamOutput out, T context) throws IOException {
this.out = out;
this.context = context;
writeHeader(out);
}
@Override
public void write(int b) throws IOException {
if (position >= uncompressed.length) {
if (position >= uncompressedLength) {
flushBuffer();
}
uncompressed[position++] = (byte) b;
@@ -49,7 +52,7 @@ public abstract class CompressedStreamOutput extends StreamOutput {
@Override
public void writeByte(byte b) throws IOException {
if (position >= uncompressed.length) {
if (position >= uncompressedLength) {
flushBuffer();
}
uncompressed[position++] = b;
@@ -61,7 +64,7 @@ public abstract class CompressedStreamOutput extends StreamOutput {
if (length == 0) {
return;
}
final int BUFFER_LEN = uncompressed.length;
final int BUFFER_LEN = uncompressedLength;
// simple case first: buffering only (for trivially short writes)
int free = BUFFER_LEN - position;

Compressor.java

@@ -23,6 +23,7 @@ import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
@@ -33,6 +34,8 @@ public interface Compressor {
String type();
void configure(Settings settings);
boolean isCompressed(byte[] data, int offset, int length);
boolean isCompressed(ChannelBuffer buffer);

CompressorContext.java

@@ -0,0 +1,25 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress;
/**
*/
public interface CompressorContext {
}

CompressorFactory.java

@@ -20,14 +20,22 @@
package org.elasticsearch.common.compress;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.compress.lzf.LZFCompressor;
import org.elasticsearch.common.compress.snappy.UnavailableSnappyCompressor;
import org.elasticsearch.common.compress.snappy.xerial.XerialSnappy;
import org.elasticsearch.common.compress.snappy.xerial.XerialSnappyCompressor;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
/**
*/
@@ -37,20 +45,57 @@ public class CompressorFactory {
private static final Compressor[] compressors;
private static final ImmutableMap<String, Compressor> compressorsByType;
private static Compressor defaultCompressor;
static {
compressors = new Compressor[1];
compressors[0] = LZF;
List<Compressor> compressorsX = Lists.newArrayList();
compressorsX.add(LZF);
boolean addedSnappy = false;
if (XerialSnappy.available) {
compressorsX.add(new XerialSnappyCompressor());
addedSnappy = true;
} else {
Loggers.getLogger(CompressorFactory.class).debug("failed to load xerial snappy-java", XerialSnappy.failure);
}
if (!addedSnappy) {
compressorsX.add(new UnavailableSnappyCompressor());
}
compressors = compressorsX.toArray(new Compressor[compressorsX.size()]);
MapBuilder<String, Compressor> compressorsByTypeX = MapBuilder.newMapBuilder();
for (Compressor compressor : compressors) {
compressorsByTypeX.put(compressor.type(), compressor);
}
compressorsByType = compressorsByTypeX.immutableMap();
defaultCompressor = LZF;
}
public static synchronized void configure(Settings settings) {
for (Compressor compressor : compressors) {
compressor.configure(settings);
}
String defaultType = settings.get("compress.default.type", "lzf").toLowerCase(Locale.ENGLISH);
boolean found = false;
for (Compressor compressor : compressors) {
if (defaultType.equalsIgnoreCase(compressor.type())) {
defaultCompressor = compressor;
found = true;
break;
}
}
if (!found) {
Loggers.getLogger(CompressorFactory.class).warn("failed to find default type [{}]", defaultType);
}
}
public static synchronized void setDefaultCompressor(Compressor defaultCompressor) {
CompressorFactory.defaultCompressor = defaultCompressor;
}
public static Compressor defaultCompressor() {
return LZF;
return defaultCompressor;
}
public static boolean isCompressed(byte[] data) {
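The factory now keeps a registry keyed by each compressor's type() plus a configurable default. A round-trip sketch against the API above (payload illustrative; checked exceptions elided):

Compressor compressor = CompressorFactory.defaultCompressor(); // LZF unless reconfigured
byte[] data = "example payload".getBytes("UTF-8");
byte[] compressed = compressor.compress(data, 0, data.length);
boolean detected = CompressorFactory.isCompressed(compressed); // true: the header is recognized
byte[] restored = compressor.uncompress(compressed, 0, compressed.length);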

LZFCompressedIndexInput.java

@@ -30,17 +30,18 @@ import java.util.Arrays;
/**
*/
public class LZFCompressedIndexInput extends CompressedIndexInput {
public class LZFCompressedIndexInput extends CompressedIndexInput<LZFCompressorContext> {
private final ChunkDecoder decoder;
// scratch area buffer
private byte[] inputBuffer;
public LZFCompressedIndexInput(IndexInput in, ChunkDecoder decoder) throws IOException {
super(in);
super(in, LZFCompressorContext.INSTANCE);
this.decoder = decoder;
this.uncompressed = new byte[LZFChunk.MAX_CHUNK_LEN];
this.uncompressedLength = LZFChunk.MAX_CHUNK_LEN;
this.inputBuffer = new byte[LZFChunk.MAX_CHUNK_LEN];
}

LZFCompressedIndexOutput.java

@@ -30,15 +30,16 @@ import java.io.IOException;
/**
*/
public class LZFCompressedIndexOutput extends CompressedIndexOutput {
public class LZFCompressedIndexOutput extends CompressedIndexOutput<LZFCompressorContext> {
private final BufferRecycler recycler;
private final ChunkEncoder encoder;
public LZFCompressedIndexOutput(IndexOutput out) throws IOException {
super(out);
super(out, LZFCompressorContext.INSTANCE);
this.recycler = BufferRecycler.instance();
this.uncompressed = this.recycler.allocOutputBuffer(LZFChunk.MAX_CHUNK_LEN);
this.uncompressedLength = LZFChunk.MAX_CHUNK_LEN;
this.encoder = new ChunkEncoder(LZFChunk.MAX_CHUNK_LEN);
}

LZFCompressedStreamInput.java

@@ -26,11 +26,10 @@ import org.elasticsearch.common.compress.CompressedStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
import java.io.InputStream;
/**
*/
public class LZFCompressedStreamInput extends CompressedStreamInput {
public class LZFCompressedStreamInput extends CompressedStreamInput<LZFCompressorContext> {
private final BufferRecycler recycler;
@@ -40,7 +39,7 @@ public class LZFCompressedStreamInput extends CompressedStreamInput {
private byte[] inputBuffer;
public LZFCompressedStreamInput(StreamInput in, ChunkDecoder decoder) throws IOException {
super(in);
super(in, LZFCompressorContext.INSTANCE);
this.recycler = BufferRecycler.instance();
this.decoder = decoder;
@@ -54,7 +53,7 @@ public class LZFCompressedStreamInput extends CompressedStreamInput {
}
@Override
public int uncompress(InputStream in, byte[] out) throws IOException {
public int uncompress(StreamInput in, byte[] out) throws IOException {
return decoder.decodeChunk(in, inputBuffer, out);
}

LZFCompressedStreamOutput.java

@@ -29,15 +29,16 @@ import java.io.IOException;
/**
*/
public class LZFCompressedStreamOutput extends CompressedStreamOutput {
public class LZFCompressedStreamOutput extends CompressedStreamOutput<LZFCompressorContext> {
private final BufferRecycler recycler;
private final ChunkEncoder encoder;
public LZFCompressedStreamOutput(StreamOutput out) throws IOException {
super(out);
super(out, LZFCompressorContext.INSTANCE);
this.recycler = BufferRecycler.instance();
this.uncompressed = this.recycler.allocOutputBuffer(LZFChunk.MAX_CHUNK_LEN);
this.uncompressedLength = LZFChunk.MAX_CHUNK_LEN;
this.encoder = new ChunkEncoder(LZFChunk.MAX_CHUNK_LEN);
}

LZFCompressor.java

@@ -29,6 +29,7 @@ import org.elasticsearch.common.compress.*;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
@@ -53,6 +54,10 @@ public class LZFCompressor implements Compressor {
return TYPE;
}
@Override
public void configure(Settings settings) {
}
@Override
public boolean isCompressed(byte[] data, int offset, int length) {
return length >= 3 &&

LZFCompressorContext.java

@@ -0,0 +1,29 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.lzf;
import org.elasticsearch.common.compress.CompressorContext;
/**
*/
public class LZFCompressorContext implements CompressorContext {
public static final LZFCompressorContext INSTANCE = new LZFCompressorContext();
}

SnappyCompressedIndexInput.java

@@ -0,0 +1,67 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.snappy;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.common.compress.CompressedIndexInput;
import java.io.IOException;
import java.util.Arrays;
/**
*/
public abstract class SnappyCompressedIndexInput extends CompressedIndexInput<SnappyCompressorContext> {
protected int chunkSize;
protected int maxCompressedChunkLength;
protected byte[] inputBuffer;
public SnappyCompressedIndexInput(IndexInput in, SnappyCompressorContext context) throws IOException {
super(in, context);
this.uncompressed = new byte[chunkSize];
this.uncompressedLength = chunkSize;
this.inputBuffer = new byte[Math.max(chunkSize, maxCompressedChunkLength)];
}
@Override
protected void readHeader(IndexInput in) throws IOException {
byte[] header = new byte[SnappyCompressor.HEADER.length];
in.readBytes(header, 0, header.length);
if (!Arrays.equals(header, SnappyCompressor.HEADER)) {
throw new IOException("wrong snappy compressed header [" + Arrays.toString(header) + "]");
}
this.chunkSize = in.readVInt();
this.maxCompressedChunkLength = in.readVInt();
}
@Override
protected void doClose() throws IOException {
// nothing to do here
}
@Override
public Object clone() {
SnappyCompressedIndexInput cloned = (SnappyCompressedIndexInput) super.clone();
cloned.inputBuffer = new byte[inputBuffer.length];
return cloned;
}
}

SnappyCompressedIndexOutput.java

@@ -0,0 +1,64 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.snappy;
import com.ning.compress.BufferRecycler;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.common.compress.CompressedIndexOutput;
import java.io.IOException;
/**
*/
public abstract class SnappyCompressedIndexOutput extends CompressedIndexOutput<SnappyCompressorContext> {
protected final BufferRecycler recycler;
protected byte[] compressedBuffer;
public SnappyCompressedIndexOutput(IndexOutput out, SnappyCompressorContext context) throws IOException {
super(out, context);
this.recycler = BufferRecycler.instance();
this.uncompressed = this.recycler.allocOutputBuffer(context.compressChunkLength());
this.uncompressedLength = context.compressChunkLength();
this.compressedBuffer = recycler.allocEncodingBuffer(context.compressMaxCompressedChunkLength());
}
@Override
protected void writeHeader(IndexOutput out) throws IOException {
out.writeBytes(SnappyCompressor.HEADER, SnappyCompressor.HEADER.length);
out.writeVInt(context.compressChunkLength());
out.writeVInt(context.compressMaxCompressedChunkLength());
}
@Override
protected void doClose() throws IOException {
byte[] buf = uncompressed;
if (buf != null) {
uncompressed = null;
recycler.releaseOutputBuffer(buf);
}
buf = compressedBuffer;
if (buf != null) {
compressedBuffer = null;
recycler.releaseEncodeBuffer(buf);
}
}
}

SnappyCompressedStreamInput.java

@@ -0,0 +1,72 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.snappy;
import com.ning.compress.BufferRecycler;
import org.elasticsearch.common.compress.CompressedStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
import java.util.Arrays;
/**
*/
public abstract class SnappyCompressedStreamInput extends CompressedStreamInput<SnappyCompressorContext> {
protected final BufferRecycler recycler;
protected int chunkSize;
protected int maxCompressedChunkLength;
protected byte[] inputBuffer;
public SnappyCompressedStreamInput(StreamInput in, SnappyCompressorContext context) throws IOException {
super(in, context);
this.recycler = BufferRecycler.instance();
this.uncompressed = recycler.allocDecodeBuffer(Math.max(chunkSize, maxCompressedChunkLength));
this.inputBuffer = recycler.allocInputBuffer(Math.max(chunkSize, maxCompressedChunkLength));
}
@Override
public void readHeader(StreamInput in) throws IOException {
byte[] header = new byte[SnappyCompressor.HEADER.length];
in.readBytes(header, 0, header.length);
if (!Arrays.equals(header, SnappyCompressor.HEADER)) {
throw new IOException("wrong snappy compressed header [" + Arrays.toString(header) + "]");
}
this.chunkSize = in.readVInt();
this.maxCompressedChunkLength = in.readVInt();
}
@Override
protected void doClose() throws IOException {
byte[] buf = uncompressed;
if (buf != null) {
uncompressed = null;
recycler.releaseDecodeBuffer(buf); // release the saved reference; the field was just nulled
}
buf = inputBuffer;
if (buf != null) {
inputBuffer = null;
recycler.releaseInputBuffer(buf); // release the saved reference; the field was just nulled
}
}
}

SnappyCompressedStreamOutput.java

@@ -0,0 +1,64 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.snappy;
import com.ning.compress.BufferRecycler;
import org.elasticsearch.common.compress.CompressedStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
*/
public abstract class SnappyCompressedStreamOutput extends CompressedStreamOutput<SnappyCompressorContext> {
protected final BufferRecycler recycler;
protected byte[] compressedBuffer;
public SnappyCompressedStreamOutput(StreamOutput out, SnappyCompressorContext context) throws IOException {
super(out, context);
this.recycler = BufferRecycler.instance();
this.uncompressed = this.recycler.allocOutputBuffer(context.compressChunkLength());
this.uncompressedLength = context.compressChunkLength();
this.compressedBuffer = recycler.allocEncodingBuffer(context.compressMaxCompressedChunkLength());
}
@Override
public void writeHeader(StreamOutput out) throws IOException {
out.writeBytes(SnappyCompressor.HEADER);
out.writeVInt(context.compressChunkLength());
out.writeVInt(context.compressMaxCompressedChunkLength());
}
@Override
protected void doClose() throws IOException {
byte[] buf = uncompressed;
if (buf != null) {
uncompressed = null;
recycler.releaseOutputBuffer(buf);
}
buf = compressedBuffer;
if (buf != null) {
compressedBuffer = null;
recycler.releaseEncodeBuffer(buf);
}
}
}

SnappyCompressor.java

@@ -0,0 +1,128 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.snappy;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
/**
*/
public abstract class SnappyCompressor implements Compressor {
public static final byte[] HEADER = {'s', 'n', 'a', 'p', 'p', 'y', 0};
protected SnappyCompressorContext compressorContext;
// default block size (32k)
static final int DEFAULT_CHUNK_SIZE = 1 << 15;
protected SnappyCompressor() {
this.compressorContext = new SnappyCompressorContext(DEFAULT_CHUNK_SIZE, maxCompressedLength(DEFAULT_CHUNK_SIZE));
}
@Override
public void configure(Settings settings) {
int chunkLength = (int) settings.getAsBytesSize("compress.snappy.chunk_size", new ByteSizeValue(compressorContext.compressChunkLength())).bytes();
int maxCompressedChunkLength = maxCompressedLength(chunkLength);
this.compressorContext = new SnappyCompressorContext(chunkLength, maxCompressedChunkLength);
}
protected abstract int maxCompressedLength(int length);
@Override
public boolean isCompressed(byte[] data, int offset, int length) {
if (length < HEADER.length) {
return false;
}
for (int i = 0; i < HEADER.length; i++) {
if (data[offset + i] != HEADER[i]) {
return false;
}
}
return true;
}
@Override
public boolean isCompressed(ChannelBuffer buffer) {
if (buffer.readableBytes() < HEADER.length) {
return false;
}
int offset = buffer.readerIndex();
for (int i = 0; i < HEADER.length; i++) {
if (buffer.getByte(offset + i) != HEADER[i]) {
return false;
}
}
return true;
}
@Override
public boolean isCompressed(IndexInput in) throws IOException {
long currentPointer = in.getFilePointer();
// since we have some metadata before the first compressed header, we check for our specific header
if (in.length() - currentPointer < (HEADER.length)) {
return false;
}
for (int i = 0; i < HEADER.length; i++) {
if (in.readByte() != HEADER[i]) {
in.seek(currentPointer);
return false;
}
}
in.seek(currentPointer);
return true;
}
@Override
public byte[] compress(byte[] data, int offset, int length) throws IOException {
// this needs to be the same format as regular streams reading from it!
CachedStreamOutput.Entry entry = CachedStreamOutput.popEntry();
try {
StreamOutput compressed = entry.bytes(this);
compressed.writeBytes(data, offset, length);
compressed.close();
return entry.bytes().copiedByteArray();
} finally {
CachedStreamOutput.pushEntry(entry);
}
}
@Override
public byte[] uncompress(byte[] data, int offset, int length) throws IOException {
StreamInput compressed = streamInput(new BytesStreamInput(data, offset, length, false));
CachedStreamOutput.Entry entry = CachedStreamOutput.popEntry();
try {
Streams.copy(compressed, entry.bytes());
return entry.bytes().copiedByteArray();
} finally {
CachedStreamOutput.pushEntry(entry);
}
}
}
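All three isCompressed probes above match the same marker, making the snappy format self-identifying: 7 header bytes {'s', 'n', 'a', 'p', 'p', 'y', 0}, then two vints (chunk length and max compressed chunk length), then the chunks. An illustrative check, where snappy is any concrete SnappyCompressor:

byte[] blob = snappy.compress(data, 0, data.length);
boolean yes = snappy.isCompressed(blob, 0, blob.length);           // true: header matches
boolean no = snappy.isCompressed(new byte[]{'s', 'n', 'a'}, 0, 3); // false: shorter than the header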

SnappyCompressorContext.java

@@ -0,0 +1,43 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.snappy;
import org.elasticsearch.common.compress.CompressorContext;
/**
*/
public class SnappyCompressorContext implements CompressorContext {
private final int compressChunkLength;
private final int compressMaxCompressedChunkLength;
public SnappyCompressorContext(int compressChunkLength, int compressMaxCompressedChunkLength) {
this.compressChunkLength = compressChunkLength;
this.compressMaxCompressedChunkLength = compressMaxCompressedChunkLength;
}
public int compressChunkLength() {
return compressChunkLength;
}
public int compressMaxCompressedChunkLength() {
return compressMaxCompressedChunkLength;
}
}

UnavailableSnappyCompressor.java

@@ -0,0 +1,77 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.snappy;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.common.compress.CompressedIndexInput;
import org.elasticsearch.common.compress.CompressedIndexOutput;
import org.elasticsearch.common.compress.CompressedStreamInput;
import org.elasticsearch.common.compress.CompressedStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
*/
public class UnavailableSnappyCompressor extends SnappyCompressor {
@Override
public String type() {
return "snappy";
}
@Override
protected int maxCompressedLength(int length) {
return length;
}
@Override
public byte[] uncompress(byte[] data, int offset, int length) throws IOException {
throw new ElasticSearchIllegalStateException("snappy unavailable");
}
@Override
public byte[] compress(byte[] data, int offset, int length) throws IOException {
throw new ElasticSearchIllegalStateException("snappy unavailable");
}
@Override
public CompressedStreamInput streamInput(StreamInput in) throws IOException {
throw new ElasticSearchIllegalStateException("snappy unavailable");
}
@Override
public CompressedStreamOutput streamOutput(StreamOutput out) throws IOException {
throw new ElasticSearchIllegalStateException("snappy unavailable");
}
@Override
public CompressedIndexInput indexInput(IndexInput in) throws IOException {
throw new ElasticSearchIllegalStateException("snappy unavailable");
}
@Override
public CompressedIndexOutput indexOutput(IndexOutput out) throws IOException {
throw new ElasticSearchIllegalStateException("snappy unavailable");
}
}
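UnavailableSnappyCompressor keeps the "snappy" type registered even when snappy-java cannot be loaded, so configuration still resolves but any actual use fails fast rather than silently falling back. A sketch of the behavior, assuming the library is absent and snappy was configured as the default:

Compressor snappy = CompressorFactory.defaultCompressor();
try {
    snappy.compress(data, 0, data.length); // every operation throws
} catch (ElasticSearchIllegalStateException e) {
    // "snappy unavailable": the node asked for snappy without the library on the classpath
}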

XerialSnappy.java

@@ -0,0 +1,45 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.snappy.xerial;
import org.xerial.snappy.Snappy;
/**
*/
public class XerialSnappy {
public static final boolean available;
public static final Throwable failure;
static {
Throwable failureX = null;
boolean availableX;
try {
byte[] tests = Snappy.compress("test");
Snappy.uncompressString(tests);
availableX = true;
} catch (Throwable e) {
availableX = false;
failureX = e;
}
available = availableX;
failure = failureX;
}
}

XerialSnappyCompressedIndexInput.java

@@ -0,0 +1,49 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.snappy.xerial;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.common.compress.snappy.SnappyCompressedIndexInput;
import org.elasticsearch.common.compress.snappy.SnappyCompressorContext;
import org.xerial.snappy.Snappy;
import java.io.IOException;
/**
*/
public class XerialSnappyCompressedIndexInput extends SnappyCompressedIndexInput {
public XerialSnappyCompressedIndexInput(IndexInput in, SnappyCompressorContext context) throws IOException {
super(in, context);
}
@Override
protected int uncompress(IndexInput in, byte[] out) throws IOException {
boolean compressed = in.readByte() != 0;
int length = in.readVInt();
if (!compressed) {
in.readBytes(out, 0, length);
return length;
} else {
in.readBytes(inputBuffer, 0, length);
return Snappy.rawUncompress(inputBuffer, 0, length, out, 0);
}
}
}

XerialSnappyCompressedIndexOutput.java

@@ -0,0 +1,51 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.snappy.xerial;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.common.compress.snappy.SnappyCompressedIndexOutput;
import org.elasticsearch.common.compress.snappy.SnappyCompressorContext;
import org.xerial.snappy.Snappy;
import java.io.IOException;
/**
*/
public class XerialSnappyCompressedIndexOutput extends SnappyCompressedIndexOutput {
public XerialSnappyCompressedIndexOutput(IndexOutput out, SnappyCompressorContext context) throws IOException {
super(out, context);
}
@Override
protected void compress(byte[] data, int offset, int len, IndexOutput out) throws IOException {
int compressedLength = Snappy.rawCompress(data, offset, len, compressedBuffer, 0);
// use uncompressed input if less than 12.5% compression
if (compressedLength >= (len - (len / 8))) {
out.writeByte((byte) 0);
out.writeVInt(len);
out.writeBytes(data, offset, len);
} else {
out.writeByte((byte) 1);
out.writeVInt(compressedLength);
out.writeBytes(compressedBuffer, 0, compressedLength);
}
}
}
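The cutoff in compress() above stores a chunk raw unless snappy saved at least one eighth (12.5%) of its size, and the flag byte in front of each chunk records which representation follows. With assumed numbers for one 32 KB chunk:

// threshold = len - len / 8 = 32768 - 4096 = 28672
// compressedLength = 30000 -> 30000 >= 28672: flag byte 0, write the 32768 raw bytes
// compressedLength = 20000 -> 20000 <  28672: flag byte 1, write the 20000 compressed bytes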

XerialSnappyCompressedStreamInput.java

@@ -0,0 +1,54 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.snappy.xerial;
import org.elasticsearch.common.compress.snappy.SnappyCompressedStreamInput;
import org.elasticsearch.common.compress.snappy.SnappyCompressorContext;
import org.elasticsearch.common.io.stream.StreamInput;
import org.xerial.snappy.Snappy;
import java.io.IOException;
/**
*/
public class XerialSnappyCompressedStreamInput extends SnappyCompressedStreamInput {
public XerialSnappyCompressedStreamInput(StreamInput in, SnappyCompressorContext context) throws IOException {
super(in, context);
}
@Override
protected int uncompress(StreamInput in, byte[] out) throws IOException {
int compressedByte = in.read();
// if we are on the "tip", just return 0 uncompressed data...
if (compressedByte == -1) {
return 0;
}
boolean compressed = compressedByte == 1;
int length = in.readVInt();
if (!compressed) {
in.readBytes(out, 0, length);
return length;
} else {
in.readBytes(inputBuffer, 0, length);
return Snappy.rawUncompress(inputBuffer, 0, length, out, 0);
}
}
}

XerialSnappyCompressedStreamOutput.java

@@ -0,0 +1,51 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.snappy.xerial;
import org.elasticsearch.common.compress.snappy.SnappyCompressedStreamOutput;
import org.elasticsearch.common.compress.snappy.SnappyCompressorContext;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.xerial.snappy.Snappy;
import java.io.IOException;
/**
*/
public class XerialSnappyCompressedStreamOutput extends SnappyCompressedStreamOutput {
public XerialSnappyCompressedStreamOutput(StreamOutput out, SnappyCompressorContext context) throws IOException {
super(out, context);
}
@Override
protected void compress(byte[] data, int offset, int len, StreamOutput out) throws IOException {
int compressedLength = Snappy.rawCompress(data, offset, len, compressedBuffer, 0);
// use uncompressed input if less than 12.5% compression
if (compressedLength >= (len - (len / 8))) {
out.writeBoolean(false);
out.writeVInt(len);
out.writeBytes(data, offset, len);
} else {
out.writeBoolean(true);
out.writeVInt(compressedLength);
out.writeBytes(compressedBuffer, 0, compressedLength);
}
}
}

XerialSnappyCompressor.java

@@ -0,0 +1,68 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.snappy.xerial;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.common.compress.CompressedIndexInput;
import org.elasticsearch.common.compress.CompressedIndexOutput;
import org.elasticsearch.common.compress.CompressedStreamInput;
import org.elasticsearch.common.compress.CompressedStreamOutput;
import org.elasticsearch.common.compress.snappy.SnappyCompressor;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.xerial.snappy.Snappy;
import java.io.IOException;
/**
*/
public class XerialSnappyCompressor extends SnappyCompressor {
@Override
public String type() {
return "snappy";
}
@Override
protected int maxCompressedLength(int length) {
return Snappy.maxCompressedLength(length);
}
@Override
public CompressedStreamInput streamInput(StreamInput in) throws IOException {
return new XerialSnappyCompressedStreamInput(in, compressorContext);
}
@Override
public CompressedStreamOutput streamOutput(StreamOutput out) throws IOException {
return new XerialSnappyCompressedStreamOutput(out, compressorContext);
}
@Override
public CompressedIndexInput indexInput(IndexInput in) throws IOException {
return new XerialSnappyCompressedIndexInput(in, compressorContext);
}
@Override
public CompressedIndexOutput indexOutput(IndexOutput out) throws IOException {
return new XerialSnappyCompressedIndexOutput(out, compressorContext);
}
}

Streams.java

@@ -32,8 +32,6 @@ import java.io.*;
* <p/>
* <p>Mainly for use within the framework,
* but also useful for application code.
*
*
*/
public abstract class Streams {
@@ -164,7 +162,7 @@ public abstract class Streams {
public static byte[] copyToByteArray(InputStream in) throws IOException {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
BytesStreamOutput out = cachedEntry.cachedBytes();
BytesStreamOutput out = cachedEntry.bytes();
copy(in, out);
return out.copiedByteArray();
} finally {

AdapterStreamOutput.java

@@ -34,7 +34,7 @@ public class AdapterStreamOutput extends StreamOutput {
this.out = out;
}
public void reset(StreamOutput out) throws IOException {
public void setOut(StreamOutput out) {
this.out = out;
}

CachedStreamOutput.java

@@ -47,35 +47,28 @@ public class CachedStreamOutput {
this.handles = handles;
}
public void reset() {
bytes.reset();
handles.setOut(bytes);
handles.clear();
}
/**
* Returns the underlying bytes without any resetting.
*/
public BytesStreamOutput bytes() {
return bytes;
}
/**
* Returns cached bytes that are also reset.
*/
public BytesStreamOutput cachedBytes() {
bytes.reset();
return bytes;
}
public StreamOutput cachedHandles() throws IOException {
handles.reset(bytes);
public StreamOutput handles() throws IOException {
return handles;
}
public StreamOutput cachedBytes(Compressor compressor) throws IOException {
bytes.reset();
public StreamOutput bytes(Compressor compressor) throws IOException {
return compressor.streamOutput(bytes);
}
public StreamOutput cachedHandles(Compressor compressor) throws IOException {
bytes.reset();
public StreamOutput handles(Compressor compressor) throws IOException {
StreamOutput compressed = compressor.streamOutput(bytes);
handles.reset(compressed);
handles.clear();
handles.setOut(compressed);
return handles;
}
}
@@ -118,10 +111,12 @@ public class CachedStreamOutput {
return newEntry();
}
counter.decrementAndGet();
entry.reset();
return entry;
}
public static void pushEntry(Entry entry) {
entry.reset();
if (entry.bytes().underlyingBytes().length > BYTES_LIMIT) {
return;
}
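The renames above also move resetting out of the accessors: popEntry() and pushEntry() now reset the Entry, so bytes()/handles() no longer clear state behind the caller's back. A usage sketch mirroring the call sites updated later in this commit:

CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); // handed back already reset
try {
    StreamOutput stream = cachedEntry.bytes(CompressorFactory.defaultCompressor());
    stream.writeBytes(value, 0, value.length);
    stream.close();
    byte[] compressed = cachedEntry.bytes().copiedByteArray();
} finally {
    CachedStreamOutput.pushEntry(cachedEntry); // resets again before re-caching
}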

HandlesStreamOutput.java

@@ -80,12 +80,14 @@ public class HandlesStreamOutput extends AdapterStreamOutput {
public void reset() throws IOException {
handles.clear();
identityHandles.clear();
if (out != null) {
out.reset();
}
}
public void reset(StreamOutput out) throws IOException {
super.reset(out);
reset();
public void clear() {
handles.clear();
identityHandles.clear();
}
/**

MulticastZenPing.java

@@ -268,7 +268,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
synchronized (sendMutex) {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
StreamOutput out = cachedEntry.cachedHandles();
StreamOutput out = cachedEntry.handles();
out.writeBytes(INTERNAL_HEADER);
Version.writeVersion(Version.CURRENT, out);
out.writeInt(id);

PublishClusterStateAction.java

@@ -69,7 +69,7 @@ public class PublishClusterStateAction extends AbstractComponent {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
byte[] clusterStateInBytes;
try {
StreamOutput stream = cachedEntry.cachedHandles(CompressorFactory.defaultCompressor());
StreamOutput stream = cachedEntry.handles(CompressorFactory.defaultCompressor());
ClusterState.Builder.writeTo(clusterState, stream);
stream.close();
clusterStateInBytes = cachedEntry.bytes().copiedByteArray();

BlobStoreGateway.java

@@ -155,9 +155,9 @@ public abstract class BlobStoreGateway extends SharedStorageGateway {
try {
StreamOutput streamOutput;
if (compress) {
streamOutput = cachedEntry.cachedBytes(CompressorFactory.defaultCompressor());
streamOutput = cachedEntry.bytes(CompressorFactory.defaultCompressor());
} else {
streamOutput = cachedEntry.cachedBytes();
streamOutput = cachedEntry.bytes();
}
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, streamOutput);
builder.startObject();

LocalGatewayMetaState.java

@@ -322,7 +322,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
logger.trace("[{}] writing state, reason [{}]", indexMetaData.index(), reason);
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
XContentBuilder builder = XContentFactory.contentBuilder(format, cachedEntry.cachedBytes());
XContentBuilder builder = XContentFactory.contentBuilder(format, cachedEntry.bytes());
builder.startObject();
IndexMetaData.Builder.toXContent(indexMetaData, builder, formatParams);
builder.endObject();
@@ -373,7 +373,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
XContentBuilder builder = XContentFactory.contentBuilder(format, cachedEntry.cachedBytes());
XContentBuilder builder = XContentFactory.contentBuilder(format, cachedEntry.bytes());
builder.startObject();
MetaData.Builder.toXContent(globalMetaData, builder, formatParams);
builder.endObject();

LocalGatewayShardsState.java

@@ -268,7 +268,7 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
logger.trace("[{}][{}] writing shard state, reason [{}]", shardId.index().name(), shardId.id(), reason);
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, cachedEntry.cachedBytes());
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, cachedEntry.bytes());
builder.prettyPrint();
builder.startObject();
builder.field("version", shardStateInfo.version);

BinaryFieldMapper.java

@@ -170,7 +170,7 @@ public class BinaryFieldMapper extends AbstractFieldMapper<byte[]> {
if (compress != null && compress && !CompressorFactory.isCompressed(value, 0, value.length)) {
if (compressThreshold == -1 || value.length > compressThreshold) {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
StreamOutput streamOutput = cachedEntry.cachedBytes(CompressorFactory.defaultCompressor());
StreamOutput streamOutput = cachedEntry.bytes(CompressorFactory.defaultCompressor());
streamOutput.writeBytes(value, 0, value.length);
streamOutput.close();
// we copy over the byte array, since we need to push back the cached entry

SourceFieldMapper.java

@@ -255,9 +255,9 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
StreamOutput streamOutput;
if (compress != null && compress && (compressThreshold == -1 || dataLength > compressThreshold)) {
streamOutput = cachedEntry.cachedBytes(CompressorFactory.defaultCompressor());
streamOutput = cachedEntry.bytes(CompressorFactory.defaultCompressor());
} else {
streamOutput = cachedEntry.cachedBytes();
streamOutput = cachedEntry.bytes();
}
XContentType contentType = formatContentType;
if (contentType == null) {
@@ -277,11 +277,11 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
try {
XContentType contentType = XContentFactory.xContentType(data, dataOffset, dataLength);
if (formatContentType != null && formatContentType != contentType) {
XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, cachedEntry.cachedBytes(CompressorFactory.defaultCompressor()));
XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, cachedEntry.bytes(CompressorFactory.defaultCompressor()));
builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(data, dataOffset, dataLength));
builder.close();
} else {
StreamOutput streamOutput = cachedEntry.cachedBytes(CompressorFactory.defaultCompressor());
StreamOutput streamOutput = cachedEntry.bytes(CompressorFactory.defaultCompressor());
streamOutput.writeBytes(data, dataOffset, dataLength);
streamOutput.close();
}
@@ -307,7 +307,7 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
// we need to reread and store back, compressed....
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
StreamOutput streamOutput = cachedEntry.cachedBytes(CompressorFactory.defaultCompressor());
StreamOutput streamOutput = cachedEntry.bytes(CompressorFactory.defaultCompressor());
XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, streamOutput);
builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(compressedStreamInput));
builder.close();
@@ -329,7 +329,7 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
// we need to reread and store back, compressed....
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, cachedEntry.cachedBytes());
XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, cachedEntry.bytes());
builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(data, dataOffset, dataLength));
builder.close();
data = cachedEntry.bytes().copiedByteArray();

FsTranslog.java

@@ -323,7 +323,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
rwl.readLock().lock();
try {
BytesStreamOutput out = cachedEntry.cachedBytes();
BytesStreamOutput out = cachedEntry.bytes();
out.writeInt(0); // marker for the size...
TranslogStreams.writeTranslogOperation(out, operation);
out.flush();

InternalNode.java

@@ -35,6 +35,7 @@ import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Injectors;
import org.elasticsearch.common.inject.ModulesBuilder;
@@ -117,6 +118,8 @@ public final class InternalNode implements Node {
this.settings = pluginsService.updatedSettings();
this.environment = tuple.v2();
CompressorFactory.configure(settings);
NodeEnvironment nodeEnvironment = new NodeEnvironment(this.settings, this.environment);
ModulesBuilder modules = new ModulesBuilder();

RestXContentBuilder.java

@@ -47,7 +47,7 @@ public class RestXContentBuilder {
contentType = XContentType.JSON;
}
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
XContentBuilder builder = new XContentBuilder(XContentFactory.xContent(contentType), cachedEntry.cachedBytes(), cachedEntry);
XContentBuilder builder = new XContentBuilder(XContentFactory.xContent(contentType), cachedEntry.bytes(), cachedEntry);
if (request.paramAsBoolean("pretty", false)) {
builder.prettyPrint();
}

LocalTransport.java

@@ -159,7 +159,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
StreamOutput stream = cachedEntry.cachedHandles();
StreamOutput stream = cachedEntry.handles();
stream.writeLong(requestId);
byte status = 0;

LocalTransportChannel.java

@@ -68,7 +68,7 @@
public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
StreamOutput stream = cachedEntry.cachedHandles();
StreamOutput stream = cachedEntry.handles();
stream.writeLong(requestId);
byte status = 0;
status = TransportStreams.statusSetResponse(status);
@ -93,14 +93,15 @@ public class LocalTransportChannel implements TransportChannel {
try {
BytesStreamOutput stream;
try {
stream = cachedEntry.cachedBytes();
stream = cachedEntry.bytes();
writeResponseExceptionHeader(stream);
RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, error);
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
too.writeObject(tx);
too.close();
} catch (NotSerializableException e) {
stream = cachedEntry.cachedBytes();
cachedEntry.reset();
stream = cachedEntry.bytes();
writeResponseExceptionHeader(stream);
RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, new NotSerializableTransportException(error));
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);

View File

@ -85,14 +85,15 @@ public class NettyTransportChannel implements TransportChannel {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
BytesStreamOutput stream;
try {
stream = cachedEntry.cachedBytes();
stream = cachedEntry.bytes();
writeResponseExceptionHeader(stream);
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error);
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
too.writeObject(tx);
too.close();
} catch (NotSerializableException e) {
stream = cachedEntry.cachedBytes();
cachedEntry.reset();
stream = cachedEntry.bytes();
writeResponseExceptionHeader(stream);
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, new NotSerializableTransportException(error));
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
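The substantive fix in both channel hunks is the added cachedEntry.reset(): the failed first serialization attempt has already written into the pooled buffer, and without clearing it the fallback NotSerializableTransportException would be appended after those stale bytes. Condensed from the two hunks (writeResponseExceptionHeader is the channel's own helper):

    BytesStreamOutput stream;
    try {
        stream = cachedEntry.bytes();
        writeResponseExceptionHeader(stream);
        // ... serialize the original exception; may throw NotSerializableException
    } catch (NotSerializableException e) {
        cachedEntry.reset(); // drop the partial first attempt before retrying
        stream = cachedEntry.bytes();
        writeResponseExceptionHeader(stream);
        // ... serialize a NotSerializableTransportException instead
    }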

View File

@ -106,13 +106,13 @@ public class TransportStreams {
if (options.compress()) {
status = TransportStreams.statusSetCompress(status);
StreamOutput stream = cachedEntry.cachedHandles(CompressorFactory.defaultCompressor());
cachedEntry.bytes().write(HEADER_PLACEHOLDER);
StreamOutput stream = cachedEntry.handles(CompressorFactory.defaultCompressor());
stream.writeUTF(action);
message.writeTo(stream);
stream.close();
} else {
StreamOutput stream = cachedEntry.cachedHandles();
StreamOutput stream = cachedEntry.handles();
cachedEntry.bytes().write(HEADER_PLACEHOLDER);
stream.writeUTF(action);
message.writeTo(stream);
@ -127,12 +127,12 @@ public class TransportStreams {
if (options.compress()) {
status = TransportStreams.statusSetCompress(status);
StreamOutput stream = cachedEntry.cachedHandles(CompressorFactory.defaultCompressor());
cachedEntry.bytes().write(HEADER_PLACEHOLDER);
StreamOutput stream = cachedEntry.handles(CompressorFactory.defaultCompressor());
message.writeTo(stream);
stream.close();
} else {
StreamOutput stream = cachedEntry.cachedHandles();
StreamOutput stream = cachedEntry.handles();
cachedEntry.bytes().write(HEADER_PLACEHOLDER);
message.writeTo(stream);
stream.close();
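The reordering in the compressed branches is the point of this hunk: HEADER_PLACEHOLDER must be written through cachedEntry.bytes() -- the raw underlying buffer -- before handles(...) wraps it in a compressing stream. Presumably the header is patched in place once the message length is known, which only works if those leading bytes were never run through the compressor. The corrected sequence:

    status = TransportStreams.statusSetCompress(status);
    cachedEntry.bytes().write(HEADER_PLACEHOLDER); // raw buffer, stays uncompressed
    StreamOutput stream = cachedEntry.handles(CompressorFactory.defaultCompressor());
    stream.writeUTF(action);
    message.writeTo(stream);
    stream.close(); // flushes the compressed payload in after the raw header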

View File

@ -27,8 +27,8 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.NIOFSDirectory;
import org.elasticsearch.common.compress.CompressedDirectory;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.compress.lzf.LZFCompressor;
import org.elasticsearch.common.compress.snappy.xerial.XerialSnappyCompressor;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -45,8 +45,6 @@ public class LuceneCompressionBenchmark {
final long MAX_SIZE = ByteSizeValue.parseBytesSizeValue("50mb").bytes();
final boolean WITH_TV = true;
final Compressor compressor = CompressorFactory.defaultCompressor();
File testFile = new File("target/test/compress/lucene");
FileSystemUtils.deleteRecursively(testFile);
testFile.mkdirs();
@ -54,9 +52,11 @@ public class LuceneCompressionBenchmark {
FSDirectory uncompressedDir = new NIOFSDirectory(new File(testFile, "uncompressed"));
IndexWriter uncompressedWriter = new IndexWriter(uncompressedDir, new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER));
Directory compressedDir = new CompressedDirectory(new NIOFSDirectory(new File(testFile, "compressed")), compressor, false, "fdt", "tvf");
Directory compressedLzfDir = new CompressedDirectory(new NIOFSDirectory(new File(testFile, "compressed_lzf")), new LZFCompressor(), false, "fdt", "tvf");
IndexWriter compressedLzfWriter = new IndexWriter(compressedLzfDir, new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER));
IndexWriter compressedWriter = new IndexWriter(compressedDir, new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER));
Directory compressedSnappyDir = new CompressedDirectory(new NIOFSDirectory(new File(testFile, "compressed_snappy")), new XerialSnappyCompressor(), false, "fdt", "tvf");
IndexWriter compressedSnappyWriter = new IndexWriter(compressedSnappyDir, new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER));
System.out.println("feeding data...");
TestData testData = new TestData();
@ -72,19 +72,24 @@ public class LuceneCompressionBenchmark {
doc.add(field);
}
uncompressedWriter.addDocument(doc);
compressedWriter.addDocument(doc);
compressedLzfWriter.addDocument(doc);
compressedSnappyWriter.addDocument(doc);
}
System.out.println("optimizing...");
uncompressedWriter.forceMerge(1);
compressedWriter.forceMerge(1);
compressedLzfWriter.forceMerge(1);
compressedSnappyWriter.forceMerge(1);
uncompressedWriter.waitForMerges();
compressedWriter.waitForMerges();
compressedLzfWriter.waitForMerges();
compressedSnappyWriter.waitForMerges();
System.out.println("done");
uncompressedDir.close();
compressedWriter.close();
uncompressedWriter.close();
compressedLzfWriter.close();
compressedSnappyWriter.close();
compressedDir.close();
compressedLzfDir.close();
compressedSnappyDir.close();
uncompressedDir.close();
}
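For reference, this is all it takes to compress a Lucene directory's stored fields ("fdt") and term vectors ("tvf") with the new Snappy compressor; the boolean flag is passed exactly as the benchmark does, since its semantics are not spelled out in this diff:

    import java.io.File;
    import java.io.IOException;

    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.NIOFSDirectory;
    import org.elasticsearch.common.compress.CompressedDirectory;
    import org.elasticsearch.common.compress.snappy.xerial.XerialSnappyCompressor;

    public class CompressedDirectoryExample {
        public static void main(String[] args) throws IOException {
            Directory dir = new CompressedDirectory(
                    new NIOFSDirectory(new File("target/test/compress/example")),
                    new XerialSnappyCompressor(), // or new LZFCompressor()
                    false,                        // flag as used in the benchmark above
                    "fdt", "tvf");                // stored fields and term vectors
            // use dir with an IndexWriter exactly as the benchmark above does
            dir.close();
        }
    }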

View File

@ -22,6 +22,10 @@ package org.elasticsearch.test.integration.search.compress;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.compress.lzf.LZFCompressor;
import org.elasticsearch.common.compress.snappy.xerial.XerialSnappy;
import org.elasticsearch.common.compress.snappy.xerial.XerialSnappyCompressor;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
@ -60,19 +64,30 @@ public class SearchSourceCompressTests extends AbstractNodesTests {
}
@Test
public void testSourceFieldCompressed() throws IOException {
public void testSourceCompressionLZF() throws IOException {
CompressorFactory.setDefaultCompressor(new LZFCompressor());
verifySource(true);
}
@Test
public void testSourceFieldPlainExplicit() throws IOException {
verifySource(false);
verifySource(null);
}
@Test
public void testSourceFieldPlain() throws IOException {
public void testSourceCompressionXerialSnappy() throws IOException {
if (XerialSnappy.available) {
CompressorFactory.setDefaultCompressor(new XerialSnappyCompressor());
verifySource(true);
verifySource(false);
verifySource(null);
}
}
@Test
public void testAll() throws IOException {
testSourceCompressionLZF();
testSourceCompressionXerialSnappy();
testSourceCompressionLZF();
testSourceCompressionXerialSnappy();
}
private void verifySource(Boolean compress) throws IOException {
try {

View File

@ -1,3 +1,22 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.unit.common.compress;
import jsr166y.ThreadLocalRandom;
@ -13,10 +32,15 @@ import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RAMDirectory;
import org.elasticsearch.common.RandomStringGenerator;
import org.elasticsearch.common.compress.*;
import org.elasticsearch.common.compress.CompressedDirectory;
import org.elasticsearch.common.compress.CompressedIndexInput;
import org.elasticsearch.common.compress.CompressedIndexOutput;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.lzf.LZFCompressor;
import org.elasticsearch.common.compress.snappy.xerial.XerialSnappy;
import org.elasticsearch.common.compress.snappy.xerial.XerialSnappyCompressor;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.SizeValue;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.EOFException;
@ -29,15 +53,40 @@ import static org.hamcrest.Matchers.equalTo;
@Test
public class CompressIndexInputOutputTests {
private Compressor compressor;
@BeforeClass
public void buildCompressor() {
this.compressor = CompressorFactory.defaultCompressor();
@Test
public void testXerialSnappy() throws Exception {
if (XerialSnappy.available) {
testCompressor(new XerialSnappyCompressor());
}
}
@Test
public void empty() throws Exception {
public void testLZF() throws Exception {
testCompressor(new LZFCompressor());
}
@Test
public void testSideAffects() throws Exception {
if (XerialSnappy.available) {
testCompressor(new XerialSnappyCompressor());
}
testCompressor(new LZFCompressor());
if (XerialSnappy.available) {
testCompressor(new XerialSnappyCompressor());
}
testCompressor(new LZFCompressor());
}
private void testCompressor(Compressor compressor) throws Exception {
empty(compressor);
simple(compressor);
seek1Compressed(compressor);
seek1UnCompressed(compressor);
copyBytes(compressor);
lucene(compressor);
}
private void empty(Compressor compressor) throws Exception {
Directory dir = new RAMDirectory();
IndexOutput out = compressor.indexOutput(dir.createOutput("test"));
out.close();
@ -59,8 +108,7 @@ public class CompressIndexInputOutputTests {
}
}
@Test
public void simple() throws Exception {
private void simple(Compressor compressor) throws Exception {
Directory dir = new RAMDirectory();
IndexOutput out = compressor.indexOutput(dir.createOutput("test"));
long pos1 = out.getFilePointer();
@ -99,17 +147,15 @@ public class CompressIndexInputOutputTests {
in.close();
}
@Test
public void seek1Compressed() throws Exception {
seek1(true);
private void seek1Compressed(Compressor compressor) throws Exception {
seek1(true, compressor);
}
@Test
public void seek1UnCompressed() throws Exception {
seek1(false);
private void seek1UnCompressed(Compressor compressor) throws Exception {
seek1(false, compressor);
}
private void seek1(boolean compressed) throws Exception {
private void seek1(boolean compressed, Compressor compressor) throws Exception {
Directory dir = new RAMDirectory();
IndexOutput out = compressed ? compressor.indexOutput(dir.createOutput("test")) : dir.createOutput("test");
long pos1 = out.getFilePointer();
@ -152,8 +198,7 @@ public class CompressIndexInputOutputTests {
}
}
@Test
public void copyBytes() throws Exception {
private void copyBytes(Compressor compressor) throws Exception {
Directory dir = new RAMDirectory();
IndexOutput out = compressor.indexOutput(dir.createOutput("test"));
long pos1 = out.getFilePointer();
@ -204,8 +249,7 @@ public class CompressIndexInputOutputTests {
}
}
@Test
public void lucene() throws Exception {
private void lucene(Compressor compressor) throws Exception {
CompressedDirectory dir = new CompressedDirectory(new RAMDirectory(), compressor, false, "fdt");
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER));
writer.addDocument(createDoc(1, (int) SizeValue.parseSizeValue("100b").singles()));

View File

@ -20,6 +20,8 @@
package org.elasticsearch.test.unit.common.compress;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.testng.annotations.Test;
import java.io.IOException;
@ -34,7 +36,17 @@ import static org.hamcrest.Matchers.not;
public class CompressedStringTests {
@Test
public void simpleTests() throws IOException {
public void simpleTestsSnappy() throws IOException {
simpleTests("snappy");
}
@Test
public void simpleTestsLZF() throws IOException {
simpleTests("lzf");
}
public void simpleTests(String compressor) throws IOException {
CompressorFactory.configure(ImmutableSettings.settingsBuilder().put("compress.default.type", compressor).build());
String str = "this is a simple string";
CompressedString cstr = new CompressedString(str);
assertThat(cstr.string(), equalTo(str));

View File

@ -36,7 +36,7 @@ public class BytesStreamsTests {
@Test
public void testSimpleStreams() throws Exception {
BytesStreamOutput out = CachedStreamOutput.popEntry().cachedBytes();
BytesStreamOutput out = CachedStreamOutput.popEntry().bytes();
out.writeBoolean(false);
out.writeByte((byte) 1);
out.writeShort((short) -1);