only serialize the cluster state ones when publishing it to multiple nodes

This commit is contained in:
kimchy 2011-04-13 15:36:28 +03:00
parent e9bc7f7d95
commit b80324db08
1 changed files with 27 additions and 16 deletions

View File

@ -22,10 +22,7 @@ package org.elasticsearch.discovery.zen.publish;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.*;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -48,8 +45,6 @@ public class PublishClusterStateAction extends AbstractComponent {
private final NewClusterStateListener listener; private final NewClusterStateListener listener;
private final boolean compesss;
public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider,
NewClusterStateListener listener) { NewClusterStateListener listener) {
super(settings); super(settings);
@ -57,8 +52,6 @@ public class PublishClusterStateAction extends AbstractComponent {
this.nodesProvider = nodesProvider; this.nodesProvider = nodesProvider;
this.listener = listener; this.listener = listener;
this.compesss = componentSettings.getAsBoolean("compress", true);
transportService.registerHandler(PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequestHandler()); transportService.registerHandler(PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequestHandler());
} }
@ -68,14 +61,28 @@ public class PublishClusterStateAction extends AbstractComponent {
public void publish(ClusterState clusterState) { public void publish(ClusterState clusterState) {
DiscoveryNode localNode = nodesProvider.nodes().localNode(); DiscoveryNode localNode = nodesProvider.nodes().localNode();
// serialize the cluster state here, so we won't do it several times per node
byte[] clusterStateInBytes;
try {
HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesLzfBytes();
ClusterState.Builder.writeTo(clusterState, stream);
stream.flush();
BytesStreamOutput wrapped = ((BytesStreamOutput) ((LZFStreamOutput) stream.wrappedOut()).wrappedOut());
clusterStateInBytes = wrapped.copiedByteArray();
} catch (Exception e) {
logger.warn("failed to serialize cluster_state before publishing it to nodes", e);
return;
}
for (final DiscoveryNode node : clusterState.nodes()) { for (final DiscoveryNode node : clusterState.nodes()) {
if (node.equals(localNode)) { if (node.equals(localNode)) {
// no need to send to our self // no need to send to our self
continue; continue;
} }
transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION, transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION,
new PublishClusterStateRequest(clusterState), new PublishClusterStateRequest(clusterStateInBytes),
TransportRequestOptions.options().withHighType().withCompress(compesss), TransportRequestOptions.options().withHighType().withCompress(false), // no need to compress, we already compressed the bytes
new VoidTransportResponseHandler(ThreadPool.Names.SAME) { new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override public void handleException(TransportException exp) { @Override public void handleException(TransportException exp) {
@ -87,21 +94,23 @@ public class PublishClusterStateAction extends AbstractComponent {
private class PublishClusterStateRequest implements Streamable { private class PublishClusterStateRequest implements Streamable {
private ClusterState clusterState; private byte[] clusterStateInBytes;
private PublishClusterStateRequest() { private PublishClusterStateRequest() {
} }
private PublishClusterStateRequest(ClusterState clusterState) { private PublishClusterStateRequest(byte[] clusterStateInBytes) {
this.clusterState = clusterState; this.clusterStateInBytes = clusterStateInBytes;
} }
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {
clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); clusterStateInBytes = new byte[in.readVInt()];
in.readFully(clusterStateInBytes);
} }
@Override public void writeTo(StreamOutput out) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException {
ClusterState.Builder.writeTo(clusterState, out); out.writeVInt(clusterStateInBytes.length);
out.writeBytes(clusterStateInBytes);
} }
} }
@ -114,7 +123,9 @@ public class PublishClusterStateAction extends AbstractComponent {
} }
@Override public void messageReceived(PublishClusterStateRequest request, TransportChannel channel) throws Exception { @Override public void messageReceived(PublishClusterStateRequest request, TransportChannel channel) throws Exception {
listener.onNewClusterState(request.clusterState); StreamInput in = CachedStreamInput.cachedHandlesLzf(new BytesStreamInput(request.clusterStateInBytes));
ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
listener.onNewClusterState(clusterState);
channel.sendResponse(VoidStreamable.INSTANCE); channel.sendResponse(VoidStreamable.INSTANCE);
} }