Zen Discovery: Control which nodes are allowed to become masters, closes #248.

This commit is contained in:
kimchy 2010-07-09 04:22:03 +03:00
parent d1f5577c40
commit b657ffc5e7
1 changed files with 24 additions and 4 deletions

View File

@ -22,6 +22,9 @@ package org.elasticsearch.discovery.zen;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUID; import org.elasticsearch.common.UUID;
@ -58,6 +61,8 @@ import static org.elasticsearch.common.unit.TimeValue.*;
*/ */
public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery, DiscoveryNodesProvider { public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery, DiscoveryNodesProvider {
public final ClusterBlock NO_MASTER_BLOCK = new ClusterBlock(2, "no master", ClusterBlockLevel.ALL);
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final TransportService transportService; private final TransportService transportService;
@ -82,6 +87,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
// a flag that should be used only for testing // a flag that should be used only for testing
private final boolean sendLeaveRequest; private final boolean sendLeaveRequest;
private final boolean blockClusterOnNoMaster;
private final ElectMasterService electMaster; private final ElectMasterService electMaster;
@ -107,6 +114,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
this.initialPingTimeout = componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3)); this.initialPingTimeout = componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3));
this.sendLeaveRequest = componentSettings.getAsBoolean("send_leave_request", true); this.sendLeaveRequest = componentSettings.getAsBoolean("send_leave_request", true);
this.blockClusterOnNoMaster = componentSettings.getAsBoolean("block_on_no_master", true);
logger.debug("using initial_ping_timeout [{}]", initialPingTimeout); logger.debug("using initial_ping_timeout [{}]", initialPingTimeout);
@ -232,7 +240,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
if (localNode.equals(masterNode)) { if (localNode.equals(masterNode)) {
this.master = true; this.master = true;
nodesFD.start(); // start the nodes FD nodesFD.start(); // start the nodes FD
clusterService.submitStateUpdateTask("zen-disco-initial_connect(master)", new ProcessedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) { @Override public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder() DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
.localNodeId(localNode.id()) .localNodeId(localNode.id())
@ -241,7 +249,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
.put(localNode); .put(localNode);
// update the fact that we are the master... // update the fact that we are the master...
latestDiscoNodes = builder.build(); latestDiscoNodes = builder.build();
return newClusterStateBuilder().state(currentState).nodes(builder).build(); ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(NO_MASTER_BLOCK).build();
return newClusterStateBuilder().state(currentState).nodes(builder).blocks(clusterBlocks).build();
} }
@Override public void clusterStateProcessed(ClusterState clusterState) { @Override public void clusterStateProcessed(ClusterState clusterState) {
@ -274,8 +283,17 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
retry = true; retry = true;
continue; continue;
} }
// cool, we found a master, start an FD on it
masterFD.start(masterNode, "initial_join"); masterFD.start(masterNode, "initial_join");
clusterService.submitStateUpdateTask("zen-disco-join (detected master)", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(NO_MASTER_BLOCK).build();
return newClusterStateBuilder().state(currentState).blocks(clusterBlocks).build();
}
@Override public void clusterStateProcessed(ClusterState clusterState) {
sendInitialStateEventIfNeeded();
}
});
} }
} }
} }
@ -323,6 +341,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return currentState; return currentState;
} }
ClusterBlocks clusterBlocks = currentState.blocks();
List<DiscoveryNode> nodes = newArrayList(currentState.nodes().nodes().values()); List<DiscoveryNode> nodes = newArrayList(currentState.nodes().nodes().values());
nodes.remove(masterNode); // remove the master node from the list, it has failed nodes.remove(masterNode); // remove the master node from the list, it has failed
final DiscoveryNode electedMaster = electMaster.electMaster(nodes); // elect master final DiscoveryNode electedMaster = electMaster.electMaster(nodes); // elect master
@ -346,6 +365,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
masterFD.restart(electedMaster, "possible elected master since master left (reason = " + reason + ")"); masterFD.restart(electedMaster, "possible elected master since master left (reason = " + reason + ")");
} else { } else {
builder.masterNodeId(null); builder.masterNodeId(null);
clusterBlocks = ClusterBlocks.builder().blocks(clusterBlocks).addGlobalBlock(NO_MASTER_BLOCK).build();
masterFD.stop("no master elected since master left (reason = " + reason + ")"); masterFD.stop("no master elected since master left (reason = " + reason + ")");
// try and join the cluster again... // try and join the cluster again...
threadPool.execute(new Runnable() { threadPool.execute(new Runnable() {
@ -355,7 +375,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}); });
} }
latestDiscoNodes = builder.build(); latestDiscoNodes = builder.build();
return newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build(); return newClusterStateBuilder().state(currentState).blocks(clusterBlocks).nodes(latestDiscoNodes).build();
} }
} }