diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 36f15fb86d6..bd8205b5621 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.UUID; @@ -128,7 +129,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener()); this.pingService.setNodesProvider(this); - this.membership = new MembershipAction(settings, transportService, new MembershipListener()); + this.membership = new MembershipAction(settings, transportService, this, new MembershipListener()); } @Override protected void doStart() throws ElasticSearchException { @@ -268,8 +269,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen continue; } // send join request + ClusterState clusterState; try { - membership.sendJoinRequestBlocking(masterNode, localNode, initialPingTimeout); + clusterState = membership.sendJoinRequestBlocking(masterNode, localNode, initialPingTimeout); } catch (Exception e) { if (e instanceof ElasticSearchException) { logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ((ElasticSearchException) e).getDetailedMessage()); @@ -284,10 +286,15 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen continue; } masterFD.start(masterNode, "initial_join"); + + // we update the metadata once we managed to join, so we pre-create indices and so on (no shards allocation) + final MetaData metaData = clusterState.metaData(); clusterService.submitStateUpdateTask("zen-disco-join (detected master)", new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(NO_MASTER_BLOCK).build(); - return newClusterStateBuilder().state(currentState).blocks(clusterBlocks).build(); + // make sure we have the local node id set, we might need it as a result of the new metadata + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.newNodesBuilder().putAll(currentState.nodes()).put(localNode).localNodeId(localNode.id()); + return newClusterStateBuilder().state(currentState).nodes(nodesBuilder).blocks(clusterBlocks).metaData(metaData).build(); } @Override public void clusterStateProcessed(ClusterState clusterState) { @@ -508,8 +515,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } private class MembershipListener implements MembershipAction.MembershipListener { - @Override public void onJoin(DiscoveryNode node) { + @Override public ClusterState onJoin(DiscoveryNode node) { handleJoinRequest(node); + return clusterService.state(); } @Override public void onLeave(DiscoveryNode node) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java index fffb69f9069..31c62a797b3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.discovery.zen.membership; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; @@ -28,10 +29,8 @@ import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.VoidStreamable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.transport.BaseTransportRequestHandler; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.transport.VoidTransportResponseHandler; +import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; +import org.elasticsearch.transport.*; import java.io.IOException; import java.util.concurrent.TimeUnit; @@ -42,18 +41,21 @@ import java.util.concurrent.TimeUnit; public class MembershipAction extends AbstractComponent { public static interface MembershipListener { - void onJoin(DiscoveryNode node); + ClusterState onJoin(DiscoveryNode node); void onLeave(DiscoveryNode node); } private final TransportService transportService; + private final DiscoveryNodesProvider nodesProvider; + private final MembershipListener listener; - public MembershipAction(Settings settings, TransportService transportService, MembershipListener listener) { + public MembershipAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, MembershipListener listener) { super(settings); this.transportService = transportService; + this.nodesProvider = nodesProvider; this.listener = listener; transportService.registerHandler(JoinRequestRequestHandler.ACTION, new JoinRequestRequestHandler()); @@ -74,30 +76,59 @@ public class MembershipAction extends AbstractComponent { } public void sendJoinRequest(DiscoveryNode masterNode, DiscoveryNode node) { - transportService.sendRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node), VoidTransportResponseHandler.INSTANCE_NOSPAWN); + transportService.sendRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node, false), VoidTransportResponseHandler.INSTANCE_NOSPAWN); } - public void sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) throws ElasticSearchException { - transportService.submitRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node), VoidTransportResponseHandler.INSTANCE_NOSPAWN).txGet(timeout.millis(), TimeUnit.MILLISECONDS); + public ClusterState sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) throws ElasticSearchException { + return transportService.submitRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node, true), new FutureTransportResponseHandler() { + @Override public JoinResponse newInstance() { + return new JoinResponse(); + } + }).txGet(timeout.millis(), TimeUnit.MILLISECONDS).clusterState; } - private static class JoinRequest implements Streamable { + static class JoinRequest implements Streamable { - private DiscoveryNode node; + DiscoveryNode node; + + boolean withClusterState; private JoinRequest() { } - private JoinRequest(DiscoveryNode node) { + private JoinRequest(DiscoveryNode node, boolean withClusterState) { this.node = node; + this.withClusterState = withClusterState; } @Override public void readFrom(StreamInput in) throws IOException { node = DiscoveryNode.readNode(in); + withClusterState = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { node.writeTo(out); + out.writeBoolean(withClusterState); + } + } + + class JoinResponse implements Streamable { + + ClusterState clusterState; + + JoinResponse() { + } + + JoinResponse(ClusterState clusterState) { + this.clusterState = clusterState; + } + + @Override public void readFrom(StreamInput in) throws IOException { + clusterState = ClusterState.Builder.readFrom(in, settings, nodesProvider.nodes().localNode()); + } + + @Override public void writeTo(StreamOutput out) throws IOException { + ClusterState.Builder.writeTo(clusterState, out); } } @@ -110,8 +141,12 @@ public class MembershipAction extends AbstractComponent { } @Override public void messageReceived(JoinRequest request, TransportChannel channel) throws Exception { - listener.onJoin(request.node); - channel.sendResponse(VoidStreamable.INSTANCE); + ClusterState clusterState = listener.onJoin(request.node); + if (request.withClusterState) { + channel.sendResponse(new JoinResponse(clusterState)); + } else { + channel.sendResponse(VoidStreamable.INSTANCE); + } } }