diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 95bdaef7fa6..d1191543398 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -154,7 +154,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings); this.pingService.setNodesProvider(this); - this.membership = new MembershipAction(settings, transportService, this, new MembershipListener()); + this.membership = new MembershipAction(settings, clusterService, transportService, this, new MembershipListener()); transportService.registerHandler(RejoinClusterRequestHandler.ACTION, new RejoinClusterRequestHandler()); } @@ -720,11 +720,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } else { // try and connect to the node, if it fails, we can raise an exception back to the client... transportService.connectToNode(node); - ClusterState state = clusterService.state(); // validate the join request, will throw a failure if it fails, which will get back to the // node calling the join request - membership.sendValidateJoinRequestBlocking(node, state, joinTimeout); + membership.sendValidateJoinRequestBlocking(node, joinTimeout); clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() { @Override @@ -755,7 +754,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - callback.onSuccess(newState); + callback.onSuccess(); } }); } diff --git a/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java b/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java index 9bef402e0b7..caffd2b618d 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java +++ b/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java @@ -20,6 +20,8 @@ package org.elasticsearch.discovery.zen.membership; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; @@ -40,7 +42,7 @@ import java.util.concurrent.TimeUnit; public class MembershipAction extends AbstractComponent { public static interface JoinCallback { - void onSuccess(ClusterState state); + void onSuccess(); void onFailure(Throwable t); } @@ -57,11 +59,14 @@ public class MembershipAction extends AbstractComponent { private final MembershipListener listener; - public MembershipAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, MembershipListener listener) { + private final ClusterService clusterService; + + public MembershipAction(Settings settings, ClusterService clusterService, TransportService transportService, DiscoveryNodesProvider nodesProvider, MembershipListener listener) { super(settings); this.transportService = transportService; this.nodesProvider = nodesProvider; this.listener = listener; + this.clusterService = clusterService; transportService.registerHandler(JoinRequestRequestHandler.ACTION, new JoinRequestRequestHandler()); transportService.registerHandler(ValidateJoinRequestRequestHandler.ACTION, new ValidateJoinRequestRequestHandler()); @@ -83,23 +88,23 @@ public class MembershipAction extends AbstractComponent { } public void sendJoinRequest(DiscoveryNode masterNode, DiscoveryNode node) { - transportService.sendRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node, false), EmptyTransportResponseHandler.INSTANCE_SAME); + transportService.sendRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node), EmptyTransportResponseHandler.INSTANCE_SAME); } - public ClusterState sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) throws ElasticsearchException { - return transportService.submitRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node, true), new FutureTransportResponseHandler() { + public void sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) throws ElasticsearchException { + transportService.submitRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node), new FutureTransportResponseHandler() { @Override public JoinResponse newInstance() { return new JoinResponse(); } - }).txGet(timeout.millis(), TimeUnit.MILLISECONDS).clusterState; + }).txGet(timeout.millis(), TimeUnit.MILLISECONDS); } /** * Validates the join request, throwing a failure if it failed. */ - public void sendValidateJoinRequestBlocking(DiscoveryNode node, ClusterState clusterState, TimeValue timeout) throws ElasticsearchException { - transportService.submitRequest(node, ValidateJoinRequestRequestHandler.ACTION, new ValidateJoinRequest(clusterState), EmptyTransportResponseHandler.INSTANCE_SAME) + public void sendValidateJoinRequestBlocking(DiscoveryNode node, TimeValue timeout) throws ElasticsearchException { + transportService.submitRequest(node, ValidateJoinRequestRequestHandler.ACTION, new ValidateJoinRequest(), EmptyTransportResponseHandler.INSTANCE_SAME) .txGet(timeout.millis(), TimeUnit.MILLISECONDS); } @@ -107,31 +112,39 @@ public class MembershipAction extends AbstractComponent { DiscoveryNode node; - boolean withClusterState; + // here for backward compatibility. nodes with a version lower than 1.4.0 send this flag + boolean withClusterState = false; private JoinRequest() { } - private JoinRequest(DiscoveryNode node, boolean withClusterState) { + private JoinRequest(DiscoveryNode node) { this.node = node; - this.withClusterState = withClusterState; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); node = DiscoveryNode.readNode(in); - withClusterState = in.readBoolean(); + if (in.getVersion().before(Version.V_1_4_0)) { + withClusterState = in.readBoolean(); + } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); node.writeTo(out); - out.writeBoolean(withClusterState); + if (out.getVersion().before(Version.V_1_4_0)) { + // old with cluster state flag + out.writeBoolean(false); + } } } + + // used to reply to nodes from a version older than 1.4.0 which may expect this + @Deprecated class JoinResponse extends TransportResponse { ClusterState clusterState; @@ -169,10 +182,11 @@ public class MembershipAction extends AbstractComponent { public void messageReceived(final JoinRequest request, final TransportChannel channel) throws Exception { listener.onJoin(request.node, new JoinCallback() { @Override - public void onSuccess(ClusterState state) { + public void onSuccess() { try { + // nodes from a version older than 1.4.0 may ask for this if (request.withClusterState) { - channel.sendResponse(new JoinResponse(state)); + channel.sendResponse(new JoinResponse(clusterService.state())); } else { channel.sendResponse(TransportResponse.Empty.INSTANCE); } @@ -200,25 +214,23 @@ public class MembershipAction extends AbstractComponent { class ValidateJoinRequest extends TransportRequest { - ClusterState clusterState; - ValidateJoinRequest() { } - ValidateJoinRequest(ClusterState clusterState) { - this.clusterState = clusterState; - } - @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); + if (in.getVersion().before(Version.V_1_4_0)) { + ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode()); + } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - ClusterState.Builder.writeTo(clusterState, out); + if (out.getVersion().before(Version.V_1_4_0)) { + ClusterState.Builder.writeTo(clusterService.state(), out); + } } }