Drain master task queue when stabilising (#42504)

Today the default stabilisation time is calculated on the assumption that the
elected master has no pending tasks to process when it is elected, but this is
not a safe assumption to make. This can result in a cluster reaching the end of
its stabilisation time without having stabilised. Furthermore in #36943 we
increased the probability that each step in `runRandomly()` enqueues another
task, vastly increasing the chance that we hit such a situation.

This change extends the stabilisation process to allow time for all pending
tasks, plus a task that might currently be in flight.

Fixes #41967, in which the master entered the stabilisation phase with over 800
tasks to process.
This commit is contained in:
David Turner 2019-05-24 14:17:21 +01:00
parent 56677f69cf
commit 4d02ca1633
3 changed files with 11 additions and 1 deletions

View File

@ -1264,7 +1264,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
@Override
public String toString() {
return "scheduled timeout for " + this;
return "scheduled timeout for " + CoordinatorPublication.this;
}
}, publishTimeout, Names.GENERIC);
}

View File

@ -1517,6 +1517,10 @@ public class CoordinatorTests extends ESTestCase {
final ClusterNode leader = getAnyLeader();
final long leaderTerm = leader.coordinator.getCurrentTerm();
final int pendingTaskCount = leader.masterService.getFakeMasterServicePendingTaskCount();
runFor((pendingTaskCount + 1) * DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "draining task queue");
final Matcher<Long> isEqualToLeaderVersion = equalTo(leader.coordinator.getLastAcceptedState().getVersion());
final String leaderId = leader.getId();
@ -1529,6 +1533,8 @@ public class CoordinatorTests extends ESTestCase {
assertFalse(nodeId + " should not have an active publication", clusterNode.coordinator.publicationInProgress());
if (clusterNode == leader) {
assertThat(nodeId + " is still the leader", clusterNode.coordinator.getMode(), is(LEADER));
assertThat(nodeId + " did not change term", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm));
continue;
}

View File

@ -84,6 +84,10 @@ public class FakeThreadPoolMasterService extends MasterService {
};
}
public int getFakeMasterServicePendingTaskCount() {
return pendingTasks.size();
}
private void scheduleNextTaskIfNecessary() {
if (taskInProgress == false && pendingTasks.isEmpty() == false && scheduledNextTask == false) {
scheduledNextTask = true;