Internal: Use DEFLATE instead of LZF for compression.

LZF only stays for backward-compatibility reasons and can only read, not write.
DEFLATE is configured to use level=3, which is a nice trade-off between speed
and compression ratio and is the same as we use for Lucene's high compression
codec.
This commit is contained in:
Adrien Grand 2015-05-22 17:56:51 +02:00
parent 08ee4a87b3
commit b6a3952036
22 changed files with 480 additions and 184 deletions

View File

@ -20,8 +20,14 @@
package org.elasticsearch.client.transport;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.Version;
import org.elasticsearch.action.*;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.client.support.Headers;
@ -30,7 +36,6 @@ import org.elasticsearch.cluster.ClusterNameModule;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.network.NetworkModule;
@ -122,8 +127,6 @@ public class TransportClient extends AbstractClient {
Version version = Version.CURRENT;
CompressorFactory.configure(this.settings);
final ThreadPool threadPool = new ThreadPool(settings);
boolean success = false;

View File

@ -23,7 +23,6 @@ import org.apache.lucene.store.IndexInput;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
@ -32,10 +31,6 @@ import java.io.IOException;
*/
public interface Compressor {
String type();
void configure(Settings settings);
boolean isCompressed(BytesReference bytes);
boolean isCompressed(ChannelBuffer buffer);

View File

@ -19,70 +19,36 @@
package org.elasticsearch.common.compress;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.compress.deflate.DeflateCompressor;
import org.elasticsearch.common.compress.lzf.LZFCompressor;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
/**
*/
public class CompressorFactory {
private static final LZFCompressor LZF = new LZFCompressor();
private static final Compressor[] compressors;
private static final ImmutableMap<String, Compressor> compressorsByType;
private static Compressor defaultCompressor;
private static volatile Compressor defaultCompressor;
static {
List<Compressor> compressorsX = Lists.newArrayList();
compressorsX.add(LZF);
compressors = compressorsX.toArray(new Compressor[compressorsX.size()]);
MapBuilder<String, Compressor> compressorsByTypeX = MapBuilder.newMapBuilder();
for (Compressor compressor : compressors) {
compressorsByTypeX.put(compressor.type(), compressor);
}
compressorsByType = compressorsByTypeX.immutableMap();
defaultCompressor = LZF;
compressors = new Compressor[] {
new LZFCompressor(),
new DeflateCompressor()
};
defaultCompressor = new DeflateCompressor();
}
public static synchronized void configure(Settings settings) {
for (Compressor compressor : compressors) {
compressor.configure(settings);
}
String defaultType = settings.get("compress.default.type", "lzf").toLowerCase(Locale.ENGLISH);
boolean found = false;
for (Compressor compressor : compressors) {
if (defaultType.equalsIgnoreCase(compressor.type())) {
defaultCompressor = compressor;
found = true;
break;
}
}
if (!found) {
Loggers.getLogger(CompressorFactory.class).warn("failed to find default type [{}]", defaultType);
}
}
public static synchronized void setDefaultCompressor(Compressor defaultCompressor) {
public static void setDefaultCompressor(Compressor defaultCompressor) {
CompressorFactory.defaultCompressor = defaultCompressor;
}
@ -94,6 +60,10 @@ public class CompressorFactory {
return compressor(bytes) != null;
}
/**
* @deprecated we don't compress lucene indexes anymore and rely on lucene codecs
*/
@Deprecated
public static boolean isCompressed(IndexInput in) throws IOException {
return compressor(in) != null;
}
@ -127,6 +97,10 @@ public class CompressorFactory {
throw new NotCompressedException();
}
/**
* @deprecated we don't compress lucene indexes anymore and rely on lucene codecs
*/
@Deprecated
@Nullable
public static Compressor compressor(IndexInput in) throws IOException {
for (Compressor compressor : compressors) {
@ -137,10 +111,6 @@ public class CompressorFactory {
return null;
}
public static Compressor compressor(String type) {
return compressorsByType.get(type);
}
/**
* Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(byte[], int, int)}.
*/
@ -160,7 +130,7 @@ public class CompressorFactory {
public static BytesReference uncompress(BytesReference bytes) throws IOException {
Compressor compressor = compressor(bytes);
if (compressor == null) {
throw new IllegalArgumentException("Bytes are not compressed");
throw new NotCompressedException();
}
return uncompress(bytes, compressor);
}

View File

@ -19,9 +19,10 @@
package org.elasticsearch.common.compress;
/** Exception indicating that we were expecting something compressed, which
* was not compressed or corrupted so that the compression format could not
* be detected. */
import org.elasticsearch.common.xcontent.XContent;
/** Exception indicating that we were expecting some {@link XContent} but could
* not detect its type. */
public class NotXContentException extends RuntimeException {
public NotXContentException(String message) {

View File

@ -0,0 +1,156 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.deflate;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedIndexInput;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
/**
* {@link Compressor} implementation based on the DEFLATE compression algorithm.
*/
public class DeflateCompressor implements Compressor {
// An arbitrary header that we use to identify compressed streams
// It needs to be different from other compressors and to not be specific
// enough so that no stream starting with these bytes could be detected as
// a XContent
private static final byte[] HEADER = new byte[] { 'D', 'F', 'L', '\0' };
// 3 is a good trade-off between speed and compression ratio
private static final int LEVEL = 3;
// We use buffering on the input and ouput of in/def-laters in order to
// limit the number of JNI calls
private static final int BUFFER_SIZE = 4096;
@Override
public boolean isCompressed(BytesReference bytes) {
if (bytes.length() < HEADER.length) {
return false;
}
for (int i = 0; i < HEADER.length; ++i) {
if (bytes.get(i) != HEADER[i]) {
return false;
}
}
return true;
}
@Override
public boolean isCompressed(ChannelBuffer buffer) {
if (buffer.readableBytes() < HEADER.length) {
return false;
}
final int offset = buffer.readerIndex();
for (int i = 0; i < HEADER.length; ++i) {
if (buffer.getByte(offset + i) != HEADER[i]) {
return false;
}
}
return true;
}
@Override
public StreamInput streamInput(StreamInput in) throws IOException {
final byte[] headerBytes = new byte[HEADER.length];
int len = 0;
while (len < headerBytes.length) {
final int read = in.read(headerBytes, len, headerBytes.length - len);
if (read == -1) {
break;
}
len += read;
}
if (len != HEADER.length || Arrays.equals(headerBytes, HEADER) == false) {
throw new IllegalArgumentException("Input stream is not compressed with DEFLATE!");
}
final boolean nowrap = true;
final Inflater inflater = new Inflater(nowrap);
InputStream decompressedIn = new InflaterInputStream(in, inflater, BUFFER_SIZE);
decompressedIn = new BufferedInputStream(decompressedIn, BUFFER_SIZE);
return new InputStreamStreamInput(decompressedIn) {
private boolean closed = false;
public void close() throws IOException {
try {
super.close();
} finally {
if (closed == false) {
// important to release native memory
inflater.end();
closed = true;
}
}
}
};
}
@Override
public StreamOutput streamOutput(StreamOutput out) throws IOException {
out.writeBytes(HEADER);
final boolean nowrap = true;
final Deflater deflater = new Deflater(LEVEL, nowrap);
final boolean syncFlush = true;
OutputStream compressedOut = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush);
compressedOut = new BufferedOutputStream(compressedOut, BUFFER_SIZE);
return new OutputStreamStreamOutput(compressedOut) {
private boolean closed = false;
public void close() throws IOException {
try {
super.close();
} finally {
if (closed == false) {
// important to release native memory
deflater.end();
closed = true;
}
}
}
};
}
@Override
public boolean isCompressed(IndexInput in) throws IOException {
return false;
}
@Override
public CompressedIndexInput indexInput(IndexInput in) throws IOException {
throw new UnsupportedOperationException();
}
}

View File

@ -25,25 +25,23 @@ import com.ning.compress.lzf.util.ChunkDecoderFactory;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedIndexInput;
import org.elasticsearch.common.compress.CompressedStreamInput;
import org.elasticsearch.common.compress.CompressedStreamOutput;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.deflate.DeflateCompressor;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.jboss.netty.buffer.ChannelBuffer;
import java.io.IOException;
/**
* @deprecated Use {@link DeflateCompressor} instead
*/
@Deprecated
public class LZFCompressor implements Compressor {
static final byte[] LUCENE_HEADER = {'L', 'Z', 'F', 0};
public static final String TYPE = "lzf";
private ChunkDecoder decoder;
public LZFCompressor() {
@ -52,14 +50,6 @@ public class LZFCompressor implements Compressor {
this.decoder.getClass().getSimpleName());
}
@Override
public String type() {
return TYPE;
}
@Override
public void configure(Settings settings) {}
@Override
public boolean isCompressed(BytesReference bytes) {
return bytes.length() >= 3 &&
@ -95,13 +85,13 @@ public class LZFCompressor implements Compressor {
}
@Override
public CompressedStreamInput streamInput(StreamInput in) throws IOException {
public StreamInput streamInput(StreamInput in) throws IOException {
return new LZFCompressedStreamInput(in, decoder);
}
@Override
public CompressedStreamOutput streamOutput(StreamOutput out) throws IOException {
return new LZFCompressedStreamOutput(out);
public StreamOutput streamOutput(StreamOutput out) throws IOException {
throw new UnsupportedOperationException("LZF is only here for back compat, no write support");
}
@Override

View File

@ -417,10 +417,10 @@ public class XContentHelper {
Compressor compressor = CompressorFactory.compressor(source);
if (compressor != null) {
InputStream compressedStreamInput = compressor.streamInput(source.streamInput());
XContentType contentType = XContentFactory.xContentType(compressedStreamInput);
if (compressedStreamInput.markSupported() == false) {
compressedStreamInput = new BufferedInputStream(compressedStreamInput);
}
XContentType contentType = XContentFactory.xContentType(compressedStreamInput);
if (contentType == builder.contentType()) {
builder.rawField(field, compressedStreamInput);
} else {

View File

@ -227,21 +227,21 @@ public class PublishClusterStateAction extends AbstractComponent {
public static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
BytesStreamOutput bStream = new BytesStreamOutput();
StreamOutput stream = CompressorFactory.defaultCompressor().streamOutput(bStream);
stream.setVersion(nodeVersion);
stream.writeBoolean(true);
clusterState.writeTo(stream);
stream.close();
try (StreamOutput stream = CompressorFactory.defaultCompressor().streamOutput(bStream)) {
stream.setVersion(nodeVersion);
stream.writeBoolean(true);
clusterState.writeTo(stream);
}
return bStream.bytes();
}
public static BytesReference serializeDiffClusterState(Diff diff, Version nodeVersion) throws IOException {
BytesStreamOutput bStream = new BytesStreamOutput();
StreamOutput stream = CompressorFactory.defaultCompressor().streamOutput(bStream);
stream.setVersion(nodeVersion);
stream.writeBoolean(false);
diff.writeTo(stream);
stream.close();
try (StreamOutput stream = CompressorFactory.defaultCompressor().streamOutput(bStream)) {
stream.setVersion(nodeVersion);
stream.writeBoolean(false);
diff.writeTo(stream);
}
return bStream.bytes();
}

View File

@ -468,11 +468,11 @@ public class DocumentMapper implements ToXContent {
private void refreshSource() throws ElasticsearchGenerationException {
try {
BytesStreamOutput bStream = new BytesStreamOutput();
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, CompressorFactory.defaultCompressor().streamOutput(bStream));
builder.startObject();
toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
builder.close();
try (XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, CompressorFactory.defaultCompressor().streamOutput(bStream))) {
builder.startObject();
toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
}
mappingSource = new CompressedXContent(bStream.bytes());
} catch (Exception e) {
throw new ElasticsearchGenerationException("failed to serialize source for type [" + type + "]", e);

View File

@ -37,8 +37,6 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.compress.NotXContentException;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.XContentParser;
@ -150,10 +148,12 @@ public class BinaryFieldMapper extends AbstractFieldMapper {
try {
return CompressorFactory.uncompressIfNeeded(bytes);
} catch (NotXContentException e) {
// This is a BUG! We try to decompress by detecting a header in
// the stored bytes but since we accept arbitrary bytes, we have
// no guarantee that uncompressed bytes will be detected as
// compressed!
// NOTE: previous versions of Elasticsearch used to try to detect if
// data was compressed. However this could cause decompression failures
// as a user may have submitted arbitrary data which looks like it is
// compressed to elasticsearch but is not. So we removed the ability to
// compress binary fields and keep this empty catch block for backward
// compatibility with 1.x
}
}
return bytes;

View File

@ -36,7 +36,6 @@ import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.lease.Releasable;
@ -151,7 +150,6 @@ public class Node implements Releasable {
// create the environment based on the finalized (processed) view of the settings
this.environment = new Environment(this.settings());
CompressorFactory.configure(settings);
final NodeEnvironment nodeEnvironment;
try {
nodeEnvironment = new NodeEnvironment(this.settings, this.environment);

View File

@ -26,12 +26,13 @@ import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
@ -39,13 +40,19 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.compress.NotXContentException;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.shard.IndexShardException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardRepository;
@ -55,14 +62,21 @@ import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.snapshots.*;
import org.elasticsearch.snapshots.InvalidSnapshotNameException;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotCreationException;
import org.elasticsearch.snapshots.SnapshotException;
import org.elasticsearch.snapshots.SnapshotMissingException;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.NoSuchFileException;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static com.google.common.collect.Lists.newArrayList;
@ -230,19 +244,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
}
// Write Global MetaData
// TODO: Check if metadata needs to be written
try (OutputStream output = snapshotsBlobContainer.createOutput(metaDataBlobName(snapshotId))) {
try (StreamOutput output = compressIfNeeded(snapshotsBlobContainer.createOutput(metaDataBlobName(snapshotId)))) {
writeGlobalMetaData(metaData, output);
}
for (String index : indices) {
final IndexMetaData indexMetaData = metaData.index(index);
final BlobPath indexPath = basePath().add("indices").add(index);
final BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
try (OutputStream output = indexMetaDataBlobContainer.createOutput(snapshotBlobName(snapshotId))) {
StreamOutput stream = new OutputStreamStreamOutput(output);
if (isCompress()) {
stream = CompressorFactory.defaultCompressor().streamOutput(stream);
}
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
try (StreamOutput output = compressIfNeeded(indexMetaDataBlobContainer.createOutput(snapshotBlobName(snapshotId)))) {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, output);
builder.startObject();
IndexMetaData.Builder.toXContent(indexMetaData, builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
@ -318,6 +328,22 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
}
}
private StreamOutput compressIfNeeded(OutputStream output) throws IOException {
StreamOutput out = null;
boolean success = false;
try {
out = new OutputStreamStreamOutput(output);
if (isCompress()) {
out = CompressorFactory.defaultCompressor().streamOutput(out);
}
success = true;
return out;
} finally {
if (!success) {
IOUtils.closeWhileHandlingException(out, output);
}
}
}
/**
* {@inheritDoc}
@ -328,7 +354,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
String tempBlobName = tempSnapshotBlobName(snapshotId);
String blobName = snapshotBlobName(snapshotId);
Snapshot blobStoreSnapshot = new Snapshot(snapshotId.getSnapshot(), indices, startTime, failure, System.currentTimeMillis(), totalShards, shardFailures);
try (OutputStream output = snapshotsBlobContainer.createOutput(tempBlobName)) {
try (StreamOutput output = compressIfNeeded(snapshotsBlobContainer.createOutput(tempBlobName))) {
writeSnapshot(blobStoreSnapshot, output);
}
snapshotsBlobContainer.move(tempBlobName, blobName);
@ -387,7 +413,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
}
} catch (FileNotFoundException | NoSuchFileException ex) {
throw new SnapshotMissingException(snapshotId, ex);
} catch (IOException ex) {
} catch (IOException | NotXContentException ex) {
throw new SnapshotException(snapshotId, "failed to get snapshots", ex);
}
}
@ -537,12 +563,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
* @return BytesStreamOutput representing JSON serialized Snapshot
* @throws IOException
*/
private void writeSnapshot(Snapshot snapshot, OutputStream outputStream) throws IOException {
StreamOutput stream = new OutputStreamStreamOutput(outputStream);
if (isCompress()) {
stream = CompressorFactory.defaultCompressor().streamOutput(stream);
}
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
private void writeSnapshot(Snapshot snapshot, StreamOutput outputStream) throws IOException {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, outputStream);
builder.startObject();
snapshot.toXContent(builder, snapshotOnlyFormatParams);
builder.endObject();
@ -556,12 +578,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
* @return BytesStreamOutput representing JSON serialized global MetaData
* @throws IOException
*/
private void writeGlobalMetaData(MetaData metaData, OutputStream outputStream) throws IOException {
StreamOutput stream = new OutputStreamStreamOutput(outputStream);
if (isCompress()) {
stream = CompressorFactory.defaultCompressor().streamOutput(stream);
}
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
private void writeGlobalMetaData(MetaData metaData, StreamOutput outputStream) throws IOException {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, outputStream);
builder.startObject();
MetaData.Builder.toXContent(metaData, builder, snapshotOnlyFormatParams);
builder.endObject();

View File

@ -38,12 +38,12 @@ import java.util.concurrent.CountDownLatch;
/**
* Test streaming compression (e.g. used for recovery)
*/
public class CompressedStreamTests extends ElasticsearchTestCase {
@Override
public void setUp() throws Exception {
super.setUp();
CompressorFactory.configure(Settings.settingsBuilder().put("compress.default.type", "lzf").build());
public abstract class AbstractCompressedStreamTests extends ElasticsearchTestCase {
private final Compressor compressor;
protected AbstractCompressedStreamTests(Compressor compressor) {
this.compressor = compressor;
}
public void testRandom() throws IOException {
@ -54,7 +54,7 @@ public class CompressedStreamTests extends ElasticsearchTestCase {
doTest(bytes);
}
}
public void testRandomThreads() throws Exception {
final Random r = getRandom();
int threadCount = TestUtil.nextInt(r, 2, 10);
@ -85,7 +85,7 @@ public class CompressedStreamTests extends ElasticsearchTestCase {
t.join();
}
}
public void testLineDocs() throws IOException {
Random r = getRandom();
LineFileDocs lineFileDocs = new LineFileDocs(r);
@ -100,7 +100,7 @@ public class CompressedStreamTests extends ElasticsearchTestCase {
}
lineFileDocs.close();
}
public void testLineDocsThreads() throws Exception {
final Random r = getRandom();
int threadCount = TestUtil.nextInt(r, 2, 10);
@ -137,7 +137,7 @@ public class CompressedStreamTests extends ElasticsearchTestCase {
t.join();
}
}
public void testRepetitionsL() throws IOException {
Random r = getRandom();
for (int i = 0; i < 10; i++) {
@ -160,7 +160,7 @@ public class CompressedStreamTests extends ElasticsearchTestCase {
doTest(bos.toByteArray());
}
}
public void testRepetitionsLThreads() throws Exception {
final Random r = getRandom();
int threadCount = TestUtil.nextInt(r, 2, 10);
@ -205,7 +205,7 @@ public class CompressedStreamTests extends ElasticsearchTestCase {
t.join();
}
}
public void testRepetitionsI() throws IOException {
Random r = getRandom();
for (int i = 0; i < 10; i++) {
@ -224,7 +224,7 @@ public class CompressedStreamTests extends ElasticsearchTestCase {
doTest(bos.toByteArray());
}
}
public void testRepetitionsIThreads() throws Exception {
final Random r = getRandom();
int threadCount = TestUtil.nextInt(r, 2, 10);
@ -265,7 +265,7 @@ public class CompressedStreamTests extends ElasticsearchTestCase {
t.join();
}
}
public void testRepetitionsS() throws IOException {
Random r = getRandom();
for (int i = 0; i < 10; i++) {
@ -348,7 +348,7 @@ public class CompressedStreamTests extends ElasticsearchTestCase {
r.nextBytes(bytes);
bos.write(bytes);
}
public void testRepetitionsSThreads() throws Exception {
final Random r = getRandom();
int threadCount = TestUtil.nextInt(r, 2, 10);
@ -387,16 +387,16 @@ public class CompressedStreamTests extends ElasticsearchTestCase {
t.join();
}
}
private void doTest(byte bytes[]) throws IOException {
ByteBuffer bb = ByteBuffer.wrap(bytes);
StreamInput rawIn = new ByteBufferStreamInput(bb);
Compressor c = CompressorFactory.defaultCompressor();
Compressor c = compressor;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
OutputStreamStreamOutput rawOs = new OutputStreamStreamOutput(bos);
StreamOutput os = c.streamOutput(rawOs);
Random r = getRandom();
int bufferSize = r.nextBoolean() ? 65535 : TestUtil.nextInt(getRandom(), 1, 70000);
int prepadding = r.nextInt(70000);
@ -409,27 +409,27 @@ public class CompressedStreamTests extends ElasticsearchTestCase {
}
os.close();
rawIn.close();
// now we have compressed byte array
byte compressed[] = bos.toByteArray();
ByteBuffer bb2 = ByteBuffer.wrap(compressed);
StreamInput compressedIn = new ByteBufferStreamInput(bb2);
StreamInput in = c.streamInput(compressedIn);
// randomize constants again
bufferSize = r.nextBoolean() ? 65535 : TestUtil.nextInt(getRandom(), 1, 70000);
prepadding = r.nextInt(70000);
postpadding = r.nextInt(70000);
buffer = new byte[prepadding + bufferSize + postpadding];
r.nextBytes(buffer); // fill block completely with junk
ByteArrayOutputStream uncompressedOut = new ByteArrayOutputStream();
while ((len = in.read(buffer, prepadding, bufferSize)) != -1) {
uncompressedOut.write(buffer, prepadding, len);
}
uncompressedOut.close();
assertArrayEquals(bytes, uncompressedOut.toByteArray());
}
}

View File

@ -23,10 +23,8 @@ import org.apache.lucene.util.TestUtil;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Random;
@ -37,11 +35,12 @@ import static org.hamcrest.Matchers.not;
/**
*
*/
public class CompressedXContentTests extends ElasticsearchTestCase {
public abstract class AbstractCompressedXContentTests extends ElasticsearchTestCase {
@Test
public void simpleTestsLZF() throws IOException {
simpleTests("lzf");
private final Compressor compressor;
protected AbstractCompressedXContentTests(Compressor compressor) {
this.compressor = compressor;
}
private void assertEquals(CompressedXContent s1, CompressedXContent s2) {
@ -50,38 +49,44 @@ public class CompressedXContentTests extends ElasticsearchTestCase {
assertEquals(s1.hashCode(), s2.hashCode());
}
public void simpleTests(String compressor) throws IOException {
CompressorFactory.configure(Settings.settingsBuilder().put("compress.default.type", compressor).build());
String str = "---\nf:this is a simple string";
CompressedXContent cstr = new CompressedXContent(str);
assertThat(cstr.string(), equalTo(str));
assertThat(new CompressedXContent(str), equalTo(cstr));
public void simpleTests() throws IOException {
Compressor defaultCompressor = CompressorFactory.defaultCompressor();
try {
CompressorFactory.setDefaultCompressor(compressor);
String str = "---\nf:this is a simple string";
CompressedXContent cstr = new CompressedXContent(str);
assertThat(cstr.string(), equalTo(str));
assertThat(new CompressedXContent(str), equalTo(cstr));
String str2 = "---\nf:this is a simple string 2";
CompressedXContent cstr2 = new CompressedXContent(str2);
assertThat(cstr2.string(), not(equalTo(str)));
assertThat(new CompressedXContent(str2), not(equalTo(cstr)));
assertEquals(new CompressedXContent(str2), cstr2);
String str2 = "---\nf:this is a simple string 2";
CompressedXContent cstr2 = new CompressedXContent(str2);
assertThat(cstr2.string(), not(equalTo(str)));
assertThat(new CompressedXContent(str2), not(equalTo(cstr)));
assertEquals(new CompressedXContent(str2), cstr2);
} finally {
CompressorFactory.setDefaultCompressor(defaultCompressor);
}
}
public void testRandom() throws IOException {
String compressor = "lzf";
CompressorFactory.configure(Settings.settingsBuilder().put("compress.default.type", compressor).build());
Random r = getRandom();
for (int i = 0; i < 1000; i++) {
String string = TestUtil.randomUnicodeString(r, 10000);
// hack to make it detected as YAML
string = "---\n" + string;
CompressedXContent compressedXContent = new CompressedXContent(string);
assertThat(compressedXContent.string(), equalTo(string));
Compressor defaultCompressor = CompressorFactory.defaultCompressor();
try {
CompressorFactory.setDefaultCompressor(compressor);
Random r = getRandom();
for (int i = 0; i < 1000; i++) {
String string = TestUtil.randomUnicodeString(r, 10000);
// hack to make it detected as YAML
string = "---\n" + string;
CompressedXContent compressedXContent = new CompressedXContent(string);
assertThat(compressedXContent.string(), equalTo(string));
}
} finally {
CompressorFactory.setDefaultCompressor(defaultCompressor);
}
}
public void testDifferentCompressedRepresentation() throws Exception {
byte[] b = "---\nf:abcdefghijabcdefghij".getBytes("UTF-8");
CompressorFactory.defaultCompressor();
Compressor compressor = CompressorFactory.defaultCompressor();
BytesStreamOutput bout = new BytesStreamOutput();
StreamOutput out = compressor.streamOutput(bout);
out.writeBytes(b);

View File

@ -0,0 +1,30 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.deflate;
import org.elasticsearch.common.compress.AbstractCompressedStreamTests;
public class DeflateCompressedStreamTests extends AbstractCompressedStreamTests {
public DeflateCompressedStreamTests() {
super(new DeflateCompressor());
}
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.deflate;
import org.elasticsearch.common.compress.AbstractCompressedXContentTests;
public class DeflateXContentTests extends AbstractCompressedXContentTests {
public DeflateXContentTests() {
super(new DeflateCompressor());
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.common.compress;
package org.elasticsearch.common.compress.lzf;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamOutput;

View File

@ -23,7 +23,7 @@ import com.ning.compress.BufferRecycler;
import com.ning.compress.lzf.ChunkEncoder;
import com.ning.compress.lzf.LZFChunk;
import com.ning.compress.lzf.util.ChunkEncoderFactory;
import org.elasticsearch.common.compress.CompressedStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;

View File

@ -0,0 +1,30 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.lzf;
import org.elasticsearch.common.compress.AbstractCompressedStreamTests;
public class LZFCompressedStreamTests extends AbstractCompressedStreamTests {
public LZFCompressedStreamTests() {
super(new LZFTestCompressor());
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.lzf;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
// LZF compressor with write support, for testing only
public class LZFTestCompressor extends LZFCompressor {
@Override
public StreamOutput streamOutput(StreamOutput out) throws IOException {
return new LZFCompressedStreamOutput(out);
}
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.lzf;
import org.elasticsearch.common.compress.AbstractCompressedXContentTests;
public class LZFXContentTests extends AbstractCompressedXContentTests {
public LZFXContentTests() {
super(new LZFTestCompressor());
}
}

View File

@ -24,8 +24,9 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.compress.lzf.LZFCompressor;
import org.elasticsearch.common.compress.lzf.LZFTestCompressor;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -41,10 +42,15 @@ public class SearchSourceCompressTests extends ElasticsearchSingleNodeTest {
@Test
public void testSourceCompressionLZF() throws IOException {
CompressorFactory.setDefaultCompressor(new LZFCompressor());
verifySource(true);
verifySource(false);
verifySource(null);
final Compressor defaultCompressor = CompressorFactory.defaultCompressor();
try {
CompressorFactory.setDefaultCompressor(new LZFTestCompressor());
verifySource(true);
verifySource(false);
verifySource(null);
} finally {
CompressorFactory.setDefaultCompressor(defaultCompressor);
}
}
private void verifySource(Boolean compress) throws IOException {