Zen Discovery: A node might get into an infinite state of trying to find a master (when client / non_master) nodes exists, closes #247.

This commit is contained in:
kimchy 2010-07-09 00:43:06 +03:00
parent 87eb6bed8f
commit d531d82cfb
4 changed files with 112 additions and 91 deletions

View File

@ -104,6 +104,11 @@ public class Lifecycle {
return state == State.CLOSED;
}
public boolean stoppedOrClosed() {
Lifecycle.State state = this.state;
return state == State.STOPPED || state == State.CLOSED;
}
public boolean canMoveToStarted() throws ElasticSearchIllegalStateException {
State localState = this.state;
if (localState == State.INITIALIZED || localState == State.STOPPED) {

View File

@ -142,16 +142,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
nodesFD.updateNodes(latestDiscoNodes);
pingService.start();
if (nodeAttributes.containsKey("zen.master") && nodeAttributes.get("zen.master").equals("false")) {
// do the join on a different thread
threadPool.execute(new Runnable() {
@Override public void run() {
initialJoin();
}
});
} else {
initialJoin();
}
// do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
threadPool.execute(new Runnable() {
@Override public void run() {
joinCluster();
}
});
}
@Override protected void doStop() throws ElasticSearchException {
@ -221,11 +217,18 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
publishClusterState.publish(clusterState);
}
private void initialJoin() {
private void joinCluster() {
boolean retry = true;
while (retry) {
if (lifecycle.stoppedOrClosed()) {
return;
}
retry = false;
DiscoveryNode masterNode = broadPingTillMasterResolved();
DiscoveryNode masterNode = findMaster();
if (masterNode == null) {
retry = true;
continue;
}
if (localNode.equals(masterNode)) {
this.master = true;
nodesFD.start(); // start the nodes FD
@ -274,11 +277,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
// cool, we found a master, start an FD on it
masterFD.start(masterNode, "initial_join");
}
if (retry) {
if (!lifecycle.started()) {
return;
}
}
}
}
@ -306,7 +304,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
});
}
private void handleMasterGone(final DiscoveryNode masterNode, String reason) {
private void handleMasterGone(final DiscoveryNode masterNode, final String reason) {
if (lifecycleState() != Lifecycle.State.STARTED) {
// not started, ignore a master failure
return;
@ -316,57 +314,73 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return;
}
logger.info("master [{}] left, reason [{}]", masterNode, reason);
List<DiscoveryNode> nodes = newArrayList(latestDiscoNodes.nodes().values());
nodes.remove(masterNode); // remove the master node from the list, it has failed
DiscoveryNode electedMaster = electMaster.electMaster(nodes); // elect master
if (localNode.equals(electedMaster)) {
this.master = true;
masterFD.stop("got elected as new master since master left (reason = " + reason + ")");
nodesFD.start();
clusterService.submitStateUpdateTask("zen-disco-elected_as_master(old master [" + masterNode + "])", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
logger.info("master_left [{}], reason [{}]", masterNode, reason);
clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
if (!masterNode.id().equals(currentState.nodes().masterNodeId())) {
// master got switched on us, no need to send anything
return currentState;
}
List<DiscoveryNode> nodes = newArrayList(currentState.nodes().nodes().values());
nodes.remove(masterNode); // remove the master node from the list, it has failed
final DiscoveryNode electedMaster = electMaster.electMaster(nodes); // elect master
if (localNode.equals(electedMaster)) {
master = true;
masterFD.stop("got elected as new master since master left (reason = " + reason + ")");
nodesFD.start();
DiscoveryNodes.Builder builder = DiscoveryNodes.newNodesBuilder()
.putAll(currentState.nodes())
// make sure the old master node, which has failed, is not part of the nodes we publish
.remove(masterNode.id())
.masterNodeId(localNode.id());
// update the fact that we are the master...
latestDiscoNodes = builder.build();
return newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build();
} else {
nodesFD.stop();
DiscoveryNodes.Builder builder = DiscoveryNodes.newNodesBuilder()
.putAll(currentState.nodes()).remove(masterNode.id());
if (electedMaster != null) {
builder.masterNodeId(electedMaster.id());
masterFD.restart(electedMaster, "possible elected master since master left (reason = " + reason + ")");
} else {
builder.masterNodeId(null);
masterFD.stop("no master elected since master left (reason = " + reason + ")");
// try and join the cluster again...
threadPool.execute(new Runnable() {
@Override public void run() {
joinCluster();
}
});
}
latestDiscoNodes = builder.build();
return newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build();
}
@Override public void clusterStateProcessed(ClusterState clusterState) {
sendInitialStateEventIfNeeded();
}
});
} else {
nodesFD.stop();
if (electedMaster != null) {
// we are not the master, start FD against the possible master
masterFD.restart(electedMaster, "possible elected master since master left (reason = " + reason + ")");
} else {
masterFD.stop("no master elected since master left (reason = " + reason + ")");
}
}
@Override public void clusterStateProcessed(ClusterState clusterState) {
sendInitialStateEventIfNeeded();
}
});
}
void handleNewClusterState(final ClusterState clusterState) {
void handleNewClusterStateFromMaster(final ClusterState clusterState) {
if (master) {
logger.warn("master should not receive new cluster state from [{}]", clusterState.nodes().masterNode());
} else {
latestDiscoNodes = clusterState.nodes();
// check to see that we monitor the correct master of the cluster
if (masterFD.masterNode() == null || !masterFD.masterNode().equals(latestDiscoNodes.masterNode())) {
masterFD.restart(latestDiscoNodes.masterNode(), "new cluster stare received and we monitor the wrong master [" + masterFD.masterNode() + "]");
}
if (clusterState.nodes().localNode() == null) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", clusterState.nodes().masterNode());
} else {
clusterService.submitStateUpdateTask("zen-disco-receive(from [" + clusterState.nodes().masterNode() + "])", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
latestDiscoNodes = clusterState.nodes();
// check to see that we monitor the correct master of the cluster
if (masterFD.masterNode() == null || !masterFD.masterNode().equals(latestDiscoNodes.masterNode())) {
masterFD.restart(latestDiscoNodes.masterNode(), "new cluster stare received and we monitor the wrong master [" + masterFD.masterNode() + "]");
}
return clusterState;
}
@ -418,44 +432,43 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
}
private DiscoveryNode broadPingTillMasterResolved() {
while (true) {
ZenPing.PingResponse[] pingResponses = pingService.pingAndWait(initialPingTimeout);
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder("ping responses:");
if (pingResponses.length == 0) {
sb.append(" {none}");
} else {
for (ZenPing.PingResponse pingResponse : pingResponses) {
sb.append("\n\t--> ").append("target [").append(pingResponse.target()).append("], master [").append(pingResponse.master()).append("]");
}
}
logger.debug(sb.toString());
}
List<DiscoveryNode> pingMasters = newArrayList();
for (ZenPing.PingResponse pingResponse : pingResponses) {
if (pingResponse.master() != null) {
pingMasters.add(pingResponse.master());
}
}
if (pingMasters.isEmpty()) {
// lets tie break between discovered nodes
List<DiscoveryNode> possibleMasterNodes = newArrayList();
possibleMasterNodes.add(localNode);
for (ZenPing.PingResponse pingResponse : pingResponses) {
possibleMasterNodes.add(pingResponse.target());
}
DiscoveryNode electedMaster = electMaster.electMaster(possibleMasterNodes);
if (localNode.equals(electedMaster)) {
return localNode;
}
private DiscoveryNode findMaster() {
ZenPing.PingResponse[] pingResponses = pingService.pingAndWait(initialPingTimeout);
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder("ping responses:");
if (pingResponses.length == 0) {
sb.append(" {none}");
} else {
DiscoveryNode electedMaster = electMaster.electMaster(pingMasters);
if (electedMaster != null) {
return electedMaster;
for (ZenPing.PingResponse pingResponse : pingResponses) {
sb.append("\n\t--> ").append("target [").append(pingResponse.target()).append("], master [").append(pingResponse.master()).append("]");
}
}
logger.debug(sb.toString());
}
List<DiscoveryNode> pingMasters = newArrayList();
for (ZenPing.PingResponse pingResponse : pingResponses) {
if (pingResponse.master() != null) {
pingMasters.add(pingResponse.master());
}
}
if (pingMasters.isEmpty()) {
// lets tie break between discovered nodes
List<DiscoveryNode> possibleMasterNodes = newArrayList();
possibleMasterNodes.add(localNode);
for (ZenPing.PingResponse pingResponse : pingResponses) {
possibleMasterNodes.add(pingResponse.target());
}
DiscoveryNode electedMaster = electMaster.electMaster(possibleMasterNodes);
if (localNode.equals(electedMaster)) {
return localNode;
}
} else {
DiscoveryNode electedMaster = electMaster.electMaster(pingMasters);
if (electedMaster != null) {
return electedMaster;
}
}
return null;
}
private void sendInitialStateEventIfNeeded() {
@ -468,7 +481,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
private class NewClusterStateListener implements PublishClusterStateAction.NewClusterStateListener {
@Override public void onNewClusterState(ClusterState clusterState) {
handleNewClusterState(clusterState);
handleNewClusterStateFromMaster(clusterState);
}
}

View File

@ -93,6 +93,9 @@ public interface ZenPing extends LifecycleComponent<ZenPing> {
@Override public void writeTo(StreamOutput out) throws IOException {
clusterName.writeTo(out);
if (target == null) {
System.out.println("ARGH!");
}
target.writeTo(out);
if (master == null) {
out.writeBoolean(false);

View File

@ -345,14 +345,14 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
// connect to the node if possible
try {
transportService.connectToNode(requestingNode);
transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(false) {
@Override public void handleException(RemoteTransportException exp) {
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}
});
} catch (Exception e) {
logger.warn("failed to connect to requesting node {}", e, requestingNode);
}
transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(false) {
@Override public void handleException(RemoteTransportException exp) {
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}
});
}
});
} else {