[Discovery] join master after first election

Currently, pinging results are only used if the local node is elected master or if they reveal another *already* active master. This has the effect that master election requires two pinging rounds - one for the elected master to take its role and another for the other nodes to detect it and join the cluster. We can be smarter and use the election result of the first round on the other nodes as well: those nodes can try to join the elected master immediately. There is a catch, though - the elected master node may still be processing the election and may reject the join request if it is not ready yet. To compensate, a retry mechanism is introduced that tries again (up to 3 times by default) if this happens.

Closes #6943
This commit is contained in:
Boaz Leskes 2014-07-18 08:14:38 +02:00
parent a40984887b
commit ffcf1077d8
1 changed files with 60 additions and 32 deletions

View File

@ -99,6 +99,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
private final TimeValue pingTimeout;
private final TimeValue joinTimeout;
/**
 * how many join attempts to perform if a join request failed with a retriable error
 * (read from the "discovery.zen.join_retry_attempts" setting, default 3)
 */
private final int joinRetryAttempts;
/**
 * how long to wait before performing another join attempt after a join request failed with a retriable error
 * (read from the "discovery.zen.join_retry_delay" setting, default 100ms)
 */
private final TimeValue joinRetryDelay;
// a flag that should be used only for testing
private final boolean sendLeaveRequest;
@ -144,6 +150,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
// also support direct discovery.zen settings, for cases when it gets extended
this.pingTimeout = settings.getAsTime("discovery.zen.ping.timeout", settings.getAsTime("discovery.zen.ping_timeout", componentSettings.getAsTime("ping_timeout", componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3)))));
this.joinTimeout = settings.getAsTime("discovery.zen.join_timeout", TimeValue.timeValueMillis(pingTimeout.millis() * 20));
this.joinRetryAttempts = settings.getAsInt("discovery.zen.join_retry_attempts", 3);
this.joinRetryDelay = settings.getAsTime("discovery.zen.join_retry_delay", TimeValue.timeValueMillis(100));
this.sendLeaveRequest = componentSettings.getAsBoolean("send_leave_request", true);
this.masterElectionFilterClientNodes = settings.getAsBoolean("discovery.zen.master_election.filter_client", true);
@ -350,30 +358,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
});
} else {
this.master = false;
try {
// first, make sure we can connect to the master
transportService.connectToNode(masterNode);
} catch (Exception e) {
logger.warn("failed to connect to master [{}], retrying...", e, masterNode);
retry = true;
continue;
}
// send join request
try {
membership.sendJoinRequestBlocking(masterNode, localNode, joinTimeout);
} catch (Exception e) {
if (e instanceof ElasticsearchException) {
logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ((ElasticsearchException) e).getDetailedMessage());
} else {
logger.info("failed to send join request to master [{}], reason [{}]", masterNode, e.getMessage());
}
if (logger.isTraceEnabled()) {
logger.trace("detailed failed reason", e);
}
// failed to send the join request, retry
retry = true;
retry = !joinElectedMaster(masterNode);
if (retry) {
continue;
}
masterFD.start(masterNode, "initial_join");
// no need to submit the received cluster state, we will get it from the master when it publishes
// the fact that we joined
@ -381,6 +371,52 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
}
/**
 * Join a newly elected master.
 * <p>
 * First connects to the master, then sends a blocking join request. A join request that fails with an
 * {@link ElasticsearchIllegalStateException} is treated as retriable (presumably the master is still
 * processing its own election and is not ready to accept joins yet): the request is re-sent after
 * {@code joinRetryDelay}, for up to {@code joinRetryAttempts} attempts in total. Any other failure,
 * including a failure to connect, aborts immediately.
 *
 * @param masterNode the elected master to join
 * @return true if successful
 */
private boolean joinElectedMaster(DiscoveryNode masterNode) {
    try {
        // first, make sure we can connect to the master
        transportService.connectToNode(masterNode);
    } catch (Exception e) {
        logger.warn("failed to connect to master [{}], retrying...", e, masterNode);
        return false;
    }
    // always send at least one join request, even if the setting is zero or negative
    final int maxAttempts = Math.max(1, this.joinRetryAttempts);
    // every path through the loop body either returns or falls through to the retry delay
    for (int attemptsDone = 1; ; attemptsDone++) {
        try {
            logger.trace("joining master {}", masterNode);
            membership.sendJoinRequestBlocking(masterNode, localNode, joinTimeout);
            return true;
        } catch (ElasticsearchIllegalStateException e) {
            if (attemptsDone >= maxAttempts) {
                // out of attempts: report the failure and give up without sleeping again
                logger.info("failed to send join request to master [{}], reason [{}]. Tried [{}] times",
                        masterNode, e.getDetailedMessage(), attemptsDone);
                return false;
            }
            logger.trace("master {} failed with [{}]. retrying... (attempts done: [{}])", masterNode, e.getDetailedMessage(), attemptsDone);
        } catch (Exception e) {
            // non-retriable failure: log it (with the stack trace when tracing) and let the caller decide
            if (logger.isTraceEnabled()) {
                logger.trace("failed to send join request to master [{}]", e, masterNode);
            } else if (e instanceof ElasticsearchException) {
                logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ((ElasticsearchException) e).getDetailedMessage());
            } else {
                logger.info("failed to send join request to master [{}], reason [{}]", masterNode, e.getMessage());
            }
            return false;
        }

        try {
            Thread.sleep(this.joinRetryDelay.millis());
        } catch (InterruptedException e) {
            // preserve the interrupt status for the caller; remaining attempts proceed without delay
            Thread.currentThread().interrupt();
        }
    }
}
private void handleLeaveRequest(final DiscoveryNode node) {
if (lifecycleState() != Lifecycle.State.STARTED) {
// not started, ignore a node failure
@ -887,17 +923,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return null;
}
// lets tie break between discovered nodes
DiscoveryNode electedMaster = electMaster.electMaster(possibleMasterNodes);
if (localNode.equals(electedMaster)) {
return localNode;
}
return electMaster.electMaster(possibleMasterNodes);
} else {
DiscoveryNode electedMaster = electMaster.electMaster(pingMasters);
if (electedMaster != null) {
return electedMaster;
}
return electMaster.electMaster(pingMasters);
}
return null;
}
private ClusterState rejoin(ClusterState clusterState, String reason) {
@ -1028,8 +1057,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
public void onFailure(String source, Throwable t) {
if (t instanceof ClusterService.NoLongerMasterException) {
logger.debug("not processing [{}] as we are no longer master", source);
}
else {
} else {
logger.error("unexpected failure during [{}]", t, source);
}
}