diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 629b7e0ef89..a14faeeb6d3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -22,10 +22,7 @@ package org.elasticsearch.discovery.zen.publish; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.common.io.stream.VoidStreamable; +import org.elasticsearch.common.io.stream.*; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.threadpool.ThreadPool; @@ -48,8 +45,6 @@ public class PublishClusterStateAction extends AbstractComponent { private final NewClusterStateListener listener; - private final boolean compesss; - public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, NewClusterStateListener listener) { super(settings); @@ -57,8 +52,6 @@ public class PublishClusterStateAction extends AbstractComponent { this.nodesProvider = nodesProvider; this.listener = listener; - this.compesss = componentSettings.getAsBoolean("compress", true); - transportService.registerHandler(PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequestHandler()); } @@ -68,14 +61,28 @@ public class PublishClusterStateAction extends AbstractComponent { public void publish(ClusterState clusterState) { DiscoveryNode localNode = nodesProvider.nodes().localNode(); + + // serialize the cluster state here, so we won't do it several times per node + byte[] clusterStateInBytes; + try { + HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesLzfBytes(); + ClusterState.Builder.writeTo(clusterState, stream); + stream.flush(); + BytesStreamOutput wrapped = ((BytesStreamOutput) ((LZFStreamOutput) stream.wrappedOut()).wrappedOut()); + clusterStateInBytes = wrapped.copiedByteArray(); + } catch (Exception e) { + logger.warn("failed to serialize cluster_state before publishing it to nodes", e); + return; + } + for (final DiscoveryNode node : clusterState.nodes()) { if (node.equals(localNode)) { // no need to send to our self continue; } transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION, - new PublishClusterStateRequest(clusterState), - TransportRequestOptions.options().withHighType().withCompress(compesss), + new PublishClusterStateRequest(clusterStateInBytes), + TransportRequestOptions.options().withHighType().withCompress(false), // no need to compress, we already compressed the bytes new VoidTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleException(TransportException exp) { @@ -87,21 +94,23 @@ public class PublishClusterStateAction extends AbstractComponent { private class PublishClusterStateRequest implements Streamable { - private ClusterState clusterState; + private byte[] clusterStateInBytes; private PublishClusterStateRequest() { } - private PublishClusterStateRequest(ClusterState clusterState) { - this.clusterState = clusterState; + private PublishClusterStateRequest(byte[] clusterStateInBytes) { + this.clusterStateInBytes = clusterStateInBytes; } @Override public void readFrom(StreamInput in) throws IOException { - clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); + clusterStateInBytes = new byte[in.readVInt()]; + in.readFully(clusterStateInBytes); } @Override public void writeTo(StreamOutput out) throws IOException { - ClusterState.Builder.writeTo(clusterState, out); + out.writeVInt(clusterStateInBytes.length); + out.writeBytes(clusterStateInBytes); } } @@ -114,7 +123,9 @@ public class PublishClusterStateAction extends AbstractComponent { } @Override public void messageReceived(PublishClusterStateRequest request, TransportChannel channel) throws Exception { - listener.onNewClusterState(request.clusterState); + StreamInput in = CachedStreamInput.cachedHandlesLzf(new BytesStreamInput(request.clusterStateInBytes)); + ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); + listener.onNewClusterState(clusterState); channel.sendResponse(VoidStreamable.INSTANCE); }