diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java index e95d4bf72da..dd52e43efcc 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesOperationAction.java @@ -76,6 +76,10 @@ public abstract class TransportNodesOperationAction() { @Override public void onResponse(Response response) { + TransportResponseOptions options = TransportResponseOptions.options().withCompress(transportCompress()); try { - channel.sendResponse(response); + channel.sendResponse(response, options); } catch (Exception e) { onFailure(e); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java index 67a8b634f4d..4b5f48b554e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java @@ -26,6 +26,11 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.*; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.Lists; +import org.elasticsearch.common.compress.lzf.LZFDecoder; +import org.elasticsearch.common.compress.lzf.LZFEncoder; +import org.elasticsearch.common.io.stream.BytesStreamInput; +import org.elasticsearch.common.io.stream.CachedStreamInput; +import org.elasticsearch.common.io.stream.LZFStreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.*; @@ -53,6 +58,8 @@ public abstract class BlobStoreGateway extends SharedStorageGateway { private ImmutableBlobContainer metaDataBlobContainer; + private boolean compress; + private volatile int currentIndex; protected BlobStoreGateway(Settings settings, ThreadPool threadPool, ClusterService clusterService) { @@ -65,6 +72,7 @@ public abstract class BlobStoreGateway extends SharedStorageGateway { this.basePath = BlobPath.cleanPath().add(clusterName.value()); this.metaDataBlobContainer = blobStore.immutableBlobContainer(basePath.add("metadata")); this.currentIndex = findLatestIndex(); + this.compress = componentSettings.getAsBoolean("compress", true); logger.debug("Latest metadata found at index [" + currentIndex + "]"); } @@ -137,7 +145,6 @@ public abstract class BlobStoreGateway extends SharedStorageGateway { XContentBuilder builder; try { builder = XContentFactory.contentBuilder(XContentType.JSON); - builder.prettyPrint(); builder.startObject(); MetaData.Builder.toXContent(metaData, builder, ToXContent.EMPTY_PARAMS); builder.endObject(); @@ -146,7 +153,15 @@ public abstract class BlobStoreGateway extends SharedStorageGateway { } try { - metaDataBlobContainer.writeBlob(newMetaData, new ByteArrayInputStream(builder.unsafeBytes(), 0, builder.unsafeBytesLength()), builder.unsafeBytesLength()); + byte[] data = builder.unsafeBytes(); + int size = builder.unsafeBytesLength(); + + if (compress) { + data = LZFEncoder.encode(data, size); + size = data.length; + } + + metaDataBlobContainer.writeBlob(newMetaData, new ByteArrayInputStream(data, 0, size), size); } catch (IOException e) { throw new GatewayException("Failed to write metadata [" + newMetaData + "]", e); } @@ -191,7 +206,13 @@ public abstract class BlobStoreGateway extends SharedStorageGateway { private MetaData readMetaData(byte[] data) throws IOException { XContentParser parser = null; try { - parser = XContentFactory.xContent(XContentType.JSON).createParser(data); + if (LZFDecoder.isCompressed(data)) { + BytesStreamInput siBytes = new BytesStreamInput(data); + LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); + parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf); + } else { + parser = XContentFactory.xContent(XContentType.JSON).createParser(data); + } return MetaData.Builder.fromXContent(parser); } finally { if (parser != null) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java index 2d2ad32723e..b7858b8a43b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java @@ -31,10 +31,15 @@ import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.compress.lzf.LZFDecoder; +import org.elasticsearch.common.compress.lzf.LZFOutputStream; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.BytesStreamInput; +import org.elasticsearch.common.io.stream.CachedStreamInput; +import org.elasticsearch.common.io.stream.LZFStreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.*; import org.elasticsearch.env.NodeEnvironment; @@ -65,6 +70,9 @@ public class LocalGateway extends AbstractLifecycleComponent implements private final TransportNodesListGatewayStartedShards listGatewayStartedShards; + + private final boolean compress; + private volatile LocalGatewayMetaState currentMetaState; private volatile LocalGatewayStartedShards currentStartedShards; @@ -80,6 +88,8 @@ public class LocalGateway extends AbstractLifecycleComponent implements this.nodeEnv = nodeEnv; this.listGatewayMetaState = listGatewayMetaState.initGateway(this); this.listGatewayStartedShards = listGatewayStartedShards.initGateway(this); + + this.compress = componentSettings.getAsBoolean("compress", true); } @Override public String type() { @@ -181,13 +191,15 @@ public class LocalGateway extends AbstractLifecycleComponent implements try { LocalGatewayMetaState stateToWrite = builder.build(); XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON); - xContentBuilder.prettyPrint(); xContentBuilder.startObject(); LocalGatewayMetaState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); xContentBuilder.endObject(); File stateFile = new File(location, "metadata-" + event.state().version()); - FileOutputStream fos = new FileOutputStream(stateFile); + OutputStream fos = new FileOutputStream(stateFile); + if (compress) { + fos = new LZFOutputStream(fos); + } fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength()); fos.close(); @@ -246,13 +258,15 @@ public class LocalGateway extends AbstractLifecycleComponent implements try { LocalGatewayStartedShards stateToWrite = builder.build(); XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON); - xContentBuilder.prettyPrint(); xContentBuilder.startObject(); LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); xContentBuilder.endObject(); File stateFile = new File(location, "shards-" + event.state().version()); - FileOutputStream fos = new FileOutputStream(stateFile); + OutputStream fos = new FileOutputStream(stateFile); + if (compress) { + fos = new LZFOutputStream(fos); + } fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength()); fos.close(); @@ -376,7 +390,13 @@ public class LocalGateway extends AbstractLifecycleComponent implements private LocalGatewayMetaState readMetaState(byte[] data) throws IOException { XContentParser parser = null; try { - parser = XContentFactory.xContent(XContentType.JSON).createParser(data); + if (LZFDecoder.isCompressed(data)) { + BytesStreamInput siBytes = new BytesStreamInput(data); + LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); + parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf); + } else { + parser = XContentFactory.xContent(XContentType.JSON).createParser(data); + } return LocalGatewayMetaState.Builder.fromXContent(parser); } finally { if (parser != null) { @@ -388,7 +408,13 @@ public class LocalGateway extends AbstractLifecycleComponent implements private LocalGatewayStartedShards readStartedShards(byte[] data) throws IOException { XContentParser parser = null; try { - parser = XContentFactory.xContent(XContentType.JSON).createParser(data); + if (LZFDecoder.isCompressed(data)) { + BytesStreamInput siBytes = new BytesStreamInput(data); + LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); + parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf); + } else { + parser = XContentFactory.xContent(XContentType.JSON).createParser(data); + } return LocalGatewayStartedShards.Builder.fromXContent(parser); } finally { if (parser != null) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayMetaState.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayMetaState.java index 4eff271ccac..ee6399b8b2a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayMetaState.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayMetaState.java @@ -73,6 +73,10 @@ public class TransportNodesListGatewayMetaState extends TransportNodesOperationA return "/gateway/local/meta-state/node"; } + @Override protected boolean transportCompress() { + return true; // compress since the metadata can become large + } + @Override protected Request newRequest() { return new Request(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayStartedShards.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayStartedShards.java index 0c2d0a3a9cd..3ecfdaf4990 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayStartedShards.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/TransportNodesListGatewayStartedShards.java @@ -73,6 +73,10 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat return "/gateway/local/started-shards/node"; } + @Override protected boolean transportCompress() { + return true; // this can become big... + } + @Override protected Request newRequest() { return new Request(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java index bffbbe5ee55..501b2dcc966 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java @@ -32,8 +32,8 @@ public class TransportResponseOptions { private boolean compress; - public TransportResponseOptions withCompress() { - this.compress = true; + public TransportResponseOptions withCompress(boolean compress) { + this.compress = compress; return this; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index adafb73ed53..32c402464ce 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -67,7 +67,7 @@ public class NettyTransportChannel implements TransportChannel { @Override public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException { if (transport.compress) { - options.withCompress(); + options.withCompress(true); } byte[] data = TransportStreams.buildResponse(requestId, message, options); ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java index 70eb4cb9f20..5ec8bb16a34 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java @@ -130,7 +130,7 @@ public abstract class AbstractSimpleTransportTests { @Override public void messageReceived(StringMessage request, TransportChannel channel) { assertThat("moshe", equalTo(request.message)); try { - channel.sendResponse(new StringMessage("hello " + request.message), TransportResponseOptions.options().withCompress()); + channel.sendResponse(new StringMessage("hello " + request.message), TransportResponseOptions.options().withCompress(true)); } catch (IOException e) { e.printStackTrace(); assertThat(e.getMessage(), false, equalTo(true));