improve join process in cluster, fetch the cluster meta-data on join and handle new meta data

This commit is contained in:
kimchy 2010-07-14 09:28:56 +03:00
parent 2e2f22fbd6
commit 141506afc5
2 changed files with 61 additions and 18 deletions

View File

@ -25,6 +25,7 @@ import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUID; import org.elasticsearch.common.UUID;
@ -128,7 +129,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener()); this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener());
this.pingService.setNodesProvider(this); this.pingService.setNodesProvider(this);
this.membership = new MembershipAction(settings, transportService, new MembershipListener()); this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());
} }
@Override protected void doStart() throws ElasticSearchException { @Override protected void doStart() throws ElasticSearchException {
@ -268,8 +269,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
continue; continue;
} }
// send join request // send join request
ClusterState clusterState;
try { try {
membership.sendJoinRequestBlocking(masterNode, localNode, initialPingTimeout); clusterState = membership.sendJoinRequestBlocking(masterNode, localNode, initialPingTimeout);
} catch (Exception e) { } catch (Exception e) {
if (e instanceof ElasticSearchException) { if (e instanceof ElasticSearchException) {
logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ((ElasticSearchException) e).getDetailedMessage()); logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ((ElasticSearchException) e).getDetailedMessage());
@ -284,10 +286,15 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
continue; continue;
} }
masterFD.start(masterNode, "initial_join"); masterFD.start(masterNode, "initial_join");
// we update the metadata once we managed to join, so we pre-create indices and so on (no shards allocation)
final MetaData metaData = clusterState.metaData();
clusterService.submitStateUpdateTask("zen-disco-join (detected master)", new ProcessedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("zen-disco-join (detected master)", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) { @Override public ClusterState execute(ClusterState currentState) {
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(NO_MASTER_BLOCK).build(); ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(NO_MASTER_BLOCK).build();
return newClusterStateBuilder().state(currentState).blocks(clusterBlocks).build(); // make sure we have the local node id set, we might need it as a result of the new metadata
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.newNodesBuilder().putAll(currentState.nodes()).put(localNode).localNodeId(localNode.id());
return newClusterStateBuilder().state(currentState).nodes(nodesBuilder).blocks(clusterBlocks).metaData(metaData).build();
} }
@Override public void clusterStateProcessed(ClusterState clusterState) { @Override public void clusterStateProcessed(ClusterState clusterState) {
@ -508,8 +515,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
} }
private class MembershipListener implements MembershipAction.MembershipListener { private class MembershipListener implements MembershipAction.MembershipListener {
@Override public void onJoin(DiscoveryNode node) { @Override public ClusterState onJoin(DiscoveryNode node) {
handleJoinRequest(node); handleJoinRequest(node);
return clusterService.state();
} }
@Override public void onLeave(DiscoveryNode node) { @Override public void onLeave(DiscoveryNode node) {

View File

@ -20,6 +20,7 @@
package org.elasticsearch.discovery.zen.membership; package org.elasticsearch.discovery.zen.membership;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -28,10 +29,8 @@ import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.VoidStreamable; import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.BaseTransportRequestHandler; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.*;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.VoidTransportResponseHandler;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -42,18 +41,21 @@ import java.util.concurrent.TimeUnit;
public class MembershipAction extends AbstractComponent { public class MembershipAction extends AbstractComponent {
public static interface MembershipListener { public static interface MembershipListener {
void onJoin(DiscoveryNode node); ClusterState onJoin(DiscoveryNode node);
void onLeave(DiscoveryNode node); void onLeave(DiscoveryNode node);
} }
private final TransportService transportService; private final TransportService transportService;
private final DiscoveryNodesProvider nodesProvider;
private final MembershipListener listener; private final MembershipListener listener;
public MembershipAction(Settings settings, TransportService transportService, MembershipListener listener) { public MembershipAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, MembershipListener listener) {
super(settings); super(settings);
this.transportService = transportService; this.transportService = transportService;
this.nodesProvider = nodesProvider;
this.listener = listener; this.listener = listener;
transportService.registerHandler(JoinRequestRequestHandler.ACTION, new JoinRequestRequestHandler()); transportService.registerHandler(JoinRequestRequestHandler.ACTION, new JoinRequestRequestHandler());
@ -74,30 +76,59 @@ public class MembershipAction extends AbstractComponent {
} }
public void sendJoinRequest(DiscoveryNode masterNode, DiscoveryNode node) { public void sendJoinRequest(DiscoveryNode masterNode, DiscoveryNode node) {
transportService.sendRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node), VoidTransportResponseHandler.INSTANCE_NOSPAWN); transportService.sendRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node, false), VoidTransportResponseHandler.INSTANCE_NOSPAWN);
} }
public void sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) throws ElasticSearchException { public ClusterState sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) throws ElasticSearchException {
transportService.submitRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node), VoidTransportResponseHandler.INSTANCE_NOSPAWN).txGet(timeout.millis(), TimeUnit.MILLISECONDS); return transportService.submitRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node, true), new FutureTransportResponseHandler<JoinResponse>() {
@Override public JoinResponse newInstance() {
return new JoinResponse();
}
}).txGet(timeout.millis(), TimeUnit.MILLISECONDS).clusterState;
} }
private static class JoinRequest implements Streamable { static class JoinRequest implements Streamable {
private DiscoveryNode node; DiscoveryNode node;
boolean withClusterState;
private JoinRequest() { private JoinRequest() {
} }
private JoinRequest(DiscoveryNode node) { private JoinRequest(DiscoveryNode node, boolean withClusterState) {
this.node = node; this.node = node;
this.withClusterState = withClusterState;
} }
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {
node = DiscoveryNode.readNode(in); node = DiscoveryNode.readNode(in);
withClusterState = in.readBoolean();
} }
@Override public void writeTo(StreamOutput out) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out); node.writeTo(out);
out.writeBoolean(withClusterState);
}
}
class JoinResponse implements Streamable {
ClusterState clusterState;
JoinResponse() {
}
JoinResponse(ClusterState clusterState) {
this.clusterState = clusterState;
}
@Override public void readFrom(StreamInput in) throws IOException {
clusterState = ClusterState.Builder.readFrom(in, settings, nodesProvider.nodes().localNode());
}
@Override public void writeTo(StreamOutput out) throws IOException {
ClusterState.Builder.writeTo(clusterState, out);
} }
} }
@ -110,8 +141,12 @@ public class MembershipAction extends AbstractComponent {
} }
@Override public void messageReceived(JoinRequest request, TransportChannel channel) throws Exception { @Override public void messageReceived(JoinRequest request, TransportChannel channel) throws Exception {
listener.onJoin(request.node); ClusterState clusterState = listener.onJoin(request.node);
channel.sendResponse(VoidStreamable.INSTANCE); if (request.withClusterState) {
channel.sendResponse(new JoinResponse(clusterState));
} else {
channel.sendResponse(VoidStreamable.INSTANCE);
}
} }
} }