From 690820dae3b82e2140ea720af3dfca5aacc02d89 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 21 Jul 2014 23:00:07 +0300 Subject: [PATCH] [Discovery] remove unneeded cluster state serialization during cluster join process At the moment we serialize the cluster state in JoinResponse and ValidateJoinRequest. However this state is not used anywhere and can be removed to save on network overhead Closes #6949 --- .../discovery/zen/ZenDiscovery.java | 7 +-- .../zen/membership/MembershipAction.java | 58 +++++++++++-------- 2 files changed, 38 insertions(+), 27 deletions(-) diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 95bdaef7fa6..d1191543398 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -154,7 +154,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings); this.pingService.setNodesProvider(this); - this.membership = new MembershipAction(settings, transportService, this, new MembershipListener()); + this.membership = new MembershipAction(settings, clusterService, transportService, this, new MembershipListener()); transportService.registerHandler(RejoinClusterRequestHandler.ACTION, new RejoinClusterRequestHandler()); } @@ -720,11 +720,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } else { // try and connect to the node, if it fails, we can raise an exception back to the client... transportService.connectToNode(node); - ClusterState state = clusterService.state(); // validate the join request, will throw a failure if it fails, which will get back to the // node calling the join request - membership.sendValidateJoinRequestBlocking(node, state, joinTimeout); + membership.sendValidateJoinRequestBlocking(node, joinTimeout); clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() { @Override @@ -755,7 +754,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - callback.onSuccess(newState); + callback.onSuccess(); } }); } diff --git a/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java b/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java index 9bef402e0b7..caffd2b618d 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java +++ b/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java @@ -20,6 +20,8 @@ package org.elasticsearch.discovery.zen.membership; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; @@ -40,7 +42,7 @@ import java.util.concurrent.TimeUnit; public class MembershipAction extends AbstractComponent { public static interface JoinCallback { - void onSuccess(ClusterState state); + void onSuccess(); void onFailure(Throwable t); } @@ -57,11 +59,14 @@ public class MembershipAction extends AbstractComponent { private final MembershipListener listener; - public MembershipAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, MembershipListener listener) { + private final ClusterService clusterService; + + public MembershipAction(Settings settings, ClusterService clusterService, TransportService transportService, DiscoveryNodesProvider nodesProvider, MembershipListener listener) { super(settings); this.transportService = transportService; this.nodesProvider = nodesProvider; this.listener = listener; + this.clusterService = clusterService; transportService.registerHandler(JoinRequestRequestHandler.ACTION, new JoinRequestRequestHandler()); transportService.registerHandler(ValidateJoinRequestRequestHandler.ACTION, new ValidateJoinRequestRequestHandler()); @@ -83,23 +88,23 @@ public class MembershipAction extends AbstractComponent { } public void sendJoinRequest(DiscoveryNode masterNode, DiscoveryNode node) { - transportService.sendRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node, false), EmptyTransportResponseHandler.INSTANCE_SAME); + transportService.sendRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node), EmptyTransportResponseHandler.INSTANCE_SAME); } - public ClusterState sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) throws ElasticsearchException { - return transportService.submitRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node, true), new FutureTransportResponseHandler() { + public void sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) throws ElasticsearchException { + transportService.submitRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node), new FutureTransportResponseHandler() { @Override public JoinResponse newInstance() { return new JoinResponse(); } - }).txGet(timeout.millis(), TimeUnit.MILLISECONDS).clusterState; + }).txGet(timeout.millis(), TimeUnit.MILLISECONDS); } /** * Validates the join request, throwing a failure if it failed. */ - public void sendValidateJoinRequestBlocking(DiscoveryNode node, ClusterState clusterState, TimeValue timeout) throws ElasticsearchException { - transportService.submitRequest(node, ValidateJoinRequestRequestHandler.ACTION, new ValidateJoinRequest(clusterState), EmptyTransportResponseHandler.INSTANCE_SAME) + public void sendValidateJoinRequestBlocking(DiscoveryNode node, TimeValue timeout) throws ElasticsearchException { + transportService.submitRequest(node, ValidateJoinRequestRequestHandler.ACTION, new ValidateJoinRequest(), EmptyTransportResponseHandler.INSTANCE_SAME) .txGet(timeout.millis(), TimeUnit.MILLISECONDS); } @@ -107,31 +112,39 @@ public class MembershipAction extends AbstractComponent { DiscoveryNode node; - boolean withClusterState; + // here for backward compatibility. nodes with a version lower than 1.4.0 send this flag + boolean withClusterState = false; private JoinRequest() { } - private JoinRequest(DiscoveryNode node, boolean withClusterState) { + private JoinRequest(DiscoveryNode node) { this.node = node; - this.withClusterState = withClusterState; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); node = DiscoveryNode.readNode(in); - withClusterState = in.readBoolean(); + if (in.getVersion().before(Version.V_1_4_0)) { + withClusterState = in.readBoolean(); + } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); node.writeTo(out); - out.writeBoolean(withClusterState); + if (out.getVersion().before(Version.V_1_4_0)) { + // old with cluster state flag + out.writeBoolean(false); + } } } + + // used to reply to nodes from a version older than 1.4.0 which may expect this + @Deprecated class JoinResponse extends TransportResponse { ClusterState clusterState; @@ -169,10 +182,11 @@ public class MembershipAction extends AbstractComponent { public void messageReceived(final JoinRequest request, final TransportChannel channel) throws Exception { listener.onJoin(request.node, new JoinCallback() { @Override - public void onSuccess(ClusterState state) { + public void onSuccess() { try { + // nodes from a version older than 1.4.0 may ask for this if (request.withClusterState) { - channel.sendResponse(new JoinResponse(state)); + channel.sendResponse(new JoinResponse(clusterService.state())); } else { channel.sendResponse(TransportResponse.Empty.INSTANCE); } @@ -200,25 +214,23 @@ public class MembershipAction extends AbstractComponent { class ValidateJoinRequest extends TransportRequest { - ClusterState clusterState; - ValidateJoinRequest() { } - ValidateJoinRequest(ClusterState clusterState) { - this.clusterState = clusterState; - } - @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); + if (in.getVersion().before(Version.V_1_4_0)) { + ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); + } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - ClusterState.Builder.writeTo(clusterState, out); + if (out.getVersion().before(Version.V_1_4_0)) { + ClusterState.Builder.writeTo(clusterService.state(), out); + } } }