[Zen2] Simulate scheduling delays (#34181)

Today we schedule tasks (both immediate and future ones) exactly when
requested. In fact it is more realistic to allow for a small amount of delay in
the scheduling of tasks, and this helps to exercise more interleavings of
actions and therefore to improve test coverage.

This change adds to the DeterministicTaskQueue the ability to add a random
delay to the scheduling of tasks.

This change also provides more explicit timeouts for stabilisation in the
CoordinatorTests.

Using the randomised scheduling feature in the CoordinatorTests also found a
situation in which we could become a leader, then a candidate, and then a
leader again very quickly, causing a clash of the _BECOME_MASTER_ and
_FINISH_ELECTION_ tasks. We change their behaviour to not consider these
duplicates to be problematic.
This commit is contained in:
David Turner 2018-10-02 11:22:05 +01:00 committed by GitHub
parent 412face402
commit a127805b4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 333 additions and 156 deletions

View File

@ -58,7 +58,7 @@ public interface ClusterStateTaskExecutor<T> {
* This allows groupd task description but the submitting source.
*/
default String describeTasks(List<T> tasks) {
return String.join(", ", tasks.stream().map(t -> (CharSequence)t.toString()).filter(t -> t.length() == 0)::iterator);
return String.join(", ", tasks.stream().map(t -> (CharSequence)t.toString()).filter(t -> t.length() > 0)::iterator);
}
/**

View File

@ -417,8 +417,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
synchronized (mutex) {
final Optional<DiscoveryNode> peerFinderLeader = peerFinder.getLeader();
assert peerFinder.getCurrentTerm() == getCurrentTerm();
assert followersChecker.getFastResponseState().term == getCurrentTerm();
assert followersChecker.getFastResponseState().mode == getMode();
assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState();
assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState();
if (mode == Mode.LEADER) {
final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm();

View File

@ -287,9 +287,9 @@ public class JoinHelper extends AbstractComponent {
final String stateUpdateSource = "elected-as-master ([" + pendingAsTasks.size() + "] nodes joined)";
pendingAsTasks.put(JoinTaskExecutor.BECOME_MASTER_TASK, (source, e) -> {
pendingAsTasks.put(JoinTaskExecutor.newBecomeMasterTask(), (source, e) -> {
});
pendingAsTasks.put(JoinTaskExecutor.FINISH_ELECTION_TASK, (source, e) -> {
pendingAsTasks.put(JoinTaskExecutor.newFinishElectionTask(), (source, e) -> {
});
masterService.submitStateUpdateTasks(stateUpdateSource, pendingAsTasks, ClusterStateTaskConfig.build(Priority.URGENT),
joinTaskExecutor);

View File

@ -63,6 +63,17 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
public String toString() {
return node != null ? node + " " + reason : reason;
}
public boolean isBecomeMasterTask() {
return reason.equals(BECOME_MASTER_TASK_REASON);
}
public boolean isFinishElectionTask() {
return reason.equals(FINISH_ELECTION_TASK_REASON);
}
private static final String BECOME_MASTER_TASK_REASON = "_BECOME_MASTER_TASK_";
private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_";
}
public JoinTaskExecutor(AllocationService allocationService, Logger logger) {
@ -78,10 +89,11 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
boolean nodesChanged = false;
ClusterState.Builder newState;
if (joiningNodes.size() == 1 && joiningNodes.get(0).equals(FINISH_ELECTION_TASK)) {
if (joiningNodes.size() == 1 && joiningNodes.get(0).isFinishElectionTask()) {
return results.successes(joiningNodes).build(currentState);
} else if (currentNodes.getMasterNode() == null && joiningNodes.contains(BECOME_MASTER_TASK)) {
assert joiningNodes.contains(FINISH_ELECTION_TASK) : "becoming a master but election is not finished " + joiningNodes;
} else if (currentNodes.getMasterNode() == null && joiningNodes.stream().anyMatch(Task::isBecomeMasterTask)) {
assert joiningNodes.stream().anyMatch(Task::isFinishElectionTask)
: "becoming a master but election is not finished " + joiningNodes;
// use these joins to try and become the master.
// Note that we don't have to do any validation of the amount of joining nodes - the commit
// during the cluster state publishing guarantees that we have enough
@ -104,7 +116,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
final boolean enforceMajorVersion = currentState.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false;
// processing any joins
for (final Task joinTask : joiningNodes) {
if (joinTask.equals(BECOME_MASTER_TASK) || joinTask.equals(FINISH_ELECTION_TASK)) {
if (joinTask.isBecomeMasterTask() || joinTask.isFinishElectionTask()) {
// noop
} else if (currentNodes.nodeExists(joinTask.node())) {
logger.debug("received a join request for an existing node [{}]", joinTask.node());
@ -146,7 +158,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
nodesBuilder.masterNodeId(currentState.nodes().getLocalNodeId());
for (final Task joinTask : joiningNodes) {
if (joinTask.equals(BECOME_MASTER_TASK) || joinTask.equals(FINISH_ELECTION_TASK)) {
if (joinTask.isBecomeMasterTask() || joinTask.isFinishElectionTask()) {
// noop
} else {
final DiscoveryNode joiningNode = joinTask.node();
@ -180,16 +192,17 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
return false;
}
/**
* a task indicated that the current node should become master, if no current master is known
*/
public static final Task BECOME_MASTER_TASK = new Task(null, "_BECOME_MASTER_TASK_");
public static Task newBecomeMasterTask() {
return new Task(null, Task.BECOME_MASTER_TASK_REASON);
}
/**
* a task that is used to signal the election is stopped and we should process pending joins.
* it may be use in combination with {@link JoinTaskExecutor#BECOME_MASTER_TASK}
* it may be used in combination with {@link JoinTaskExecutor#newBecomeMasterTask()}
*/
public static final Task FINISH_ELECTION_TASK = new Task(null, "_FINISH_ELECTION_");
public static Task newFinishElectionTask() {
return new Task(null, Task.FINISH_ELECTION_TASK_REASON);
}
/**
* Ensures that all indices are compatible with the given node version. This will ensure that all indices in the given metadata

View File

@ -22,6 +22,8 @@ package org.elasticsearch.discovery;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@ -69,7 +71,9 @@ public class HandshakingTransportAddressConnector extends AbstractComponent impl
// TODO if transportService is already connected to this address then skip the handshaking
final DiscoveryNode targetNode = new DiscoveryNode(transportAddress.toString(), transportAddress, emptyMap(),
final DiscoveryNode targetNode = new DiscoveryNode("", transportAddress.toString(),
UUIDs.randomBase64UUID(Randomness.get()), // generated deterministically for reproducible tests
transportAddress.address().getHostString(), transportAddress.getAddress(), transportAddress, emptyMap(),
emptySet(), Version.CURRENT.minimumCompatibilityVersion());
logger.trace("[{}] opening probe connection", this);

View File

@ -411,7 +411,7 @@ public abstract class PeerFinder extends AbstractComponent {
@Override
public void handleException(TransportException exp) {
peersRequestInFlight = false;
logger.debug(new ParameterizedMessage("{} peers request failed", this), exp);
logger.debug(new ParameterizedMessage("{} peers request failed", Peer.this), exp);
}
@Override

View File

@ -276,8 +276,8 @@ public class NodeJoinController extends AbstractComponent {
final String source = "zen-disco-elected-as-master ([" + tasks.size() + "] nodes joined)";
// noop listener, the election finished listener determines result
tasks.put(JoinTaskExecutor.BECOME_MASTER_TASK, (source1, e) -> {});
tasks.put(JoinTaskExecutor.FINISH_ELECTION_TASK, electionFinishedListener);
tasks.put(JoinTaskExecutor.newBecomeMasterTask(), (source1, e) -> {});
tasks.put(JoinTaskExecutor.newFinishElectionTask(), electionFinishedListener);
masterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
}
@ -285,7 +285,7 @@ public class NodeJoinController extends AbstractComponent {
innerClose();
Map<JoinTaskExecutor.Task, ClusterStateTaskListener> tasks = getPendingAsTasks(reason);
final String source = "zen-disco-election-stop [" + reason + "]";
tasks.put(JoinTaskExecutor.FINISH_ELECTION_TASK, electionFinishedListener);
tasks.put(JoinTaskExecutor.newFinishElectionTask(), electionFinishedListener);
masterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
}

View File

@ -35,8 +35,10 @@ import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver;
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
import org.elasticsearch.test.ESTestCase;
@ -63,12 +65,16 @@ import static org.elasticsearch.cluster.coordination.CoordinationStateTests.setV
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER;
import static org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.DEFAULT_DELAY_VARIABILITY;
import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING;
import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING;
import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
@ -89,7 +95,7 @@ public class CoordinatorTests extends ESTestCase {
logger.info("--> submitting value [{}] to [{}]", finalValue, leader);
leader.submitValue(finalValue);
cluster.stabilise(); // TODO this should only need a short stabilisation
cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
for (final ClusterNode clusterNode : cluster.clusterNodes) {
final String nodeId = clusterNode.getId();
@ -103,8 +109,15 @@ public class CoordinatorTests extends ESTestCase {
cluster.stabilise();
final long currentTerm = cluster.getAnyLeader().coordinator.getCurrentTerm();
cluster.addNodes(randomIntBetween(1, 2));
cluster.stabilise();
final int newNodesCount = randomIntBetween(1, 2);
cluster.addNodes(newNodesCount);
cluster.stabilise(
// The first pinging discovers the master
defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING)
// One message delay to send a join
+ DEFAULT_DELAY_VARIABILITY
// Commit a new cluster state with the new node(s). Might be split into multiple commits
+ newNodesCount * DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
final long newTerm = cluster.getAnyLeader().coordinator.getCurrentTerm();
assertEquals(currentTerm, newTerm);
@ -118,7 +131,30 @@ public class CoordinatorTests extends ESTestCase {
logger.info("--> disconnecting leader {}", originalLeader);
originalLeader.disconnect();
cluster.stabilise();
cluster.stabilise(Math.max(
// Each follower may have just sent a leader check, which receives no response
// TODO not necessary if notified of disconnection
defaultMillis(LEADER_CHECK_TIMEOUT_SETTING)
// then wait for the follower to check the leader
+ defaultMillis(LEADER_CHECK_INTERVAL_SETTING)
// then wait for the exception response
+ DEFAULT_DELAY_VARIABILITY
// then wait for a new election
+ DEFAULT_ELECTION_DELAY
// then wait for the old leader's removal to be committed
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY,
// ALSO the leader may have just sent a follower check, which receives no response
// TODO unnecessary if notified of disconnection
defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING)
// wait for the leader to check its followers
+ defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING)
// then wait for the exception response
+ DEFAULT_DELAY_VARIABILITY
// then wait for the removal to be committed
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY
));
assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId())));
}
@ -130,13 +166,24 @@ public class CoordinatorTests extends ESTestCase {
logger.info("--> partitioning leader {}", originalLeader);
originalLeader.partition();
cluster.stabilise(
cluster.stabilise(Math.max(
// first wait for all the followers to notice the leader has gone
(LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis() + LEADER_CHECK_TIMEOUT_SETTING.get(Settings.EMPTY).millis())
* LEADER_CHECK_RETRY_COUNT_SETTING.get(Settings.EMPTY)
// then wait for the new leader to notice that the old leader is unresponsive
+ (FOLLOWER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis() + FOLLOWER_CHECK_TIMEOUT_SETTING.get(Settings.EMPTY).millis())
* FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(Settings.EMPTY));
(defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + defaultMillis(LEADER_CHECK_TIMEOUT_SETTING))
* defaultInt(LEADER_CHECK_RETRY_COUNT_SETTING)
// then wait for a follower to be promoted to leader
+ DEFAULT_ELECTION_DELAY
// then wait for the new leader to notice that the old leader is unresponsive
+ (defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING))
* defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING)
// then wait for the new leader to commit a state without the old leader
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY,
// ALSO wait for the leader to notice that its followers are unresponsive
(defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING))
* defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING)
// then wait for the leader to try and commit a state removing them, causing it to stand down
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY
));
assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId())));
}
@ -149,7 +196,25 @@ public class CoordinatorTests extends ESTestCase {
logger.info("--> disconnecting follower {}", follower);
follower.disconnect();
cluster.stabilise();
cluster.stabilise(Math.max(
// the leader may have just sent a follower check, which receives no response
// TODO unnecessary if notified of disconnection
defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING)
// wait for the leader to check the follower
+ defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING)
// then wait for the exception response
+ DEFAULT_DELAY_VARIABILITY
// then wait for the removal to be committed
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY,
// ALSO the follower may have just sent a leader check, which receives no response
// TODO not necessary if notified of disconnection
defaultMillis(LEADER_CHECK_TIMEOUT_SETTING)
// then wait for the follower to check the leader
+ defaultMillis(LEADER_CHECK_INTERVAL_SETTING)
// then wait for the exception response, causing the follower to become a candidate
+ DEFAULT_DELAY_VARIABILITY
));
assertThat(cluster.getAnyLeader().getId(), equalTo(leader.getId()));
}
@ -162,31 +227,82 @@ public class CoordinatorTests extends ESTestCase {
logger.info("--> partitioning follower {}", follower);
follower.partition();
cluster.stabilise(
cluster.stabilise(Math.max(
// wait for the leader to notice that the follower is unresponsive
(FOLLOWER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis() + FOLLOWER_CHECK_TIMEOUT_SETTING.get(Settings.EMPTY).millis())
* FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(Settings.EMPTY));
(defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING))
* defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING)
// then wait for the leader to commit a state without the follower
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY,
// ALSO wait for the follower to notice the leader is unresponsive
(defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + defaultMillis(LEADER_CHECK_TIMEOUT_SETTING))
* defaultInt(LEADER_CHECK_RETRY_COUNT_SETTING)
));
assertThat(cluster.getAnyLeader().getId(), equalTo(leader.getId()));
}
private static long defaultMillis(Setting<TimeValue> setting) {
return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY;
}
private static int defaultInt(Setting<Integer> setting) {
return setting.get(Settings.EMPTY);
}
// Updating the cluster state involves up to 5 delays:
// 1. submit the task to the master service
// 2. send PublishRequest
// 3. receive PublishResponse
// 4. send ApplyCommitRequest
// 5. receive ApplyCommitResponse and apply committed state
private static final long DEFAULT_CLUSTER_STATE_UPDATE_DELAY = 5 * DEFAULT_DELAY_VARIABILITY;
private static final int ELECTION_RETRIES = 10;
// The time it takes to complete an election
private static final long DEFAULT_ELECTION_DELAY
// Pinging all peers twice should be enough to discover all nodes
= defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2
// Then wait for an election to be scheduled; we allow enough time for retries to allow for collisions
+ defaultMillis(ELECTION_INITIAL_TIMEOUT_SETTING) * ELECTION_RETRIES
+ defaultMillis(ELECTION_BACK_OFF_TIME_SETTING) * ELECTION_RETRIES * (ELECTION_RETRIES - 1) / 2
// Allow two round-trip for pre-voting and voting
+ 4 * DEFAULT_DELAY_VARIABILITY
// Then a commit of the new leader's first cluster state
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY;
private static final long DEFAULT_STABILISATION_TIME =
// If leader just blackholed, need to wait for this to be detected
(defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + defaultMillis(LEADER_CHECK_TIMEOUT_SETTING))
* defaultInt(LEADER_CHECK_RETRY_COUNT_SETTING)
// then wait for a follower to be promoted to leader
+ DEFAULT_ELECTION_DELAY
// then wait for the new leader to notice that the old leader is unresponsive
+ (defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING))
* defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING)
// then wait for the new leader to commit a state without the old leader
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY;
private static String nodeIdFromIndex(int nodeIndex) {
return "node" + nodeIndex;
}
class Cluster {
static final long DEFAULT_STABILISATION_TIME = 3000L; // TODO use a real stabilisation time - needs fault detection and disruption
static final long DEFAULT_DELAY_VARIABILITY = 100L;
final List<ClusterNode> clusterNodes;
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(
// TODO does ThreadPool need a node name any more?
Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build());
Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build(), random());
private final VotingConfiguration initialConfiguration;
private final Set<String> disconnectedNodes = new HashSet<>();
private final Set<String> blackholedNodes = new HashSet<>();
Cluster(int initialNodeCount) {
deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
logger.info("--> creating cluster of {} nodes", initialNodeCount);
Set<String> initialNodeIds = new HashSet<>(initialNodeCount);
@ -216,13 +332,16 @@ public class CoordinatorTests extends ESTestCase {
stabilise(DEFAULT_STABILISATION_TIME);
}
void stabilise(long stabilisationTime) {
void stabilise(long stabiliationDurationMillis) {
final long stabilisationStartTime = deterministicTaskQueue.getCurrentTimeMillis();
while (deterministicTaskQueue.getCurrentTimeMillis() < stabilisationStartTime + stabilisationTime) {
final long stabilisationEndTime = stabilisationStartTime + stabiliationDurationMillis;
logger.info("--> stabilising until [{}ms]", stabilisationEndTime);
while (deterministicTaskQueue.getCurrentTimeMillis() < stabilisationEndTime) {
while (deterministicTaskQueue.hasRunnableTasks()) {
try {
deterministicTaskQueue.runRandomTask(random());
deterministicTaskQueue.runRandomTask();
} catch (CoordinationStateRejectedException e) {
logger.debug("ignoring benign exception thrown when stabilising", e);
}

View File

@ -66,7 +66,7 @@ public class ElectionSchedulerFactoryTests extends ESTestCase {
if (deterministicTaskQueue.hasRunnableTasks() == false) {
deterministicTaskQueue.advanceTime();
}
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
}
assertTrue(electionStarted.compareAndSet(true, false));
@ -101,7 +101,7 @@ public class ElectionSchedulerFactoryTests extends ESTestCase {
lastElectionTime = thisElectionTime;
}
}
deterministicTaskQueue.runAllTasks(random());
deterministicTaskQueue.runAllTasks();
assertFalse(electionStarted.get());
}
@ -130,7 +130,7 @@ public class ElectionSchedulerFactoryTests extends ESTestCase {
final long backOffTime = ELECTION_BACK_OFF_TIME_SETTING.get(settings).millis();
final long maxTimeout = ELECTION_MAX_TIMEOUT_SETTING.get(settings).millis();
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings);
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
final ElectionSchedulerFactory electionSchedulerFactory
= new ElectionSchedulerFactory(settings, random(), deterministicTaskQueue.getThreadPool());

View File

@ -68,7 +68,7 @@ public class FollowersCheckerTests extends ESTestCase {
final DiscoveryNodes[] discoveryNodesHolder
= new DiscoveryNodes[]{DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).build()};
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings);
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
final Set<DiscoveryNode> checkedNodes = new HashSet<>();
final AtomicInteger checkCount = new AtomicInteger();
@ -98,7 +98,7 @@ public class FollowersCheckerTests extends ESTestCase {
});
followersChecker.setCurrentNodes(discoveryNodesHolder[0]);
deterministicTaskQueue.runAllTasks(random());
deterministicTaskQueue.runAllTasks();
assertThat(checkedNodes, empty());
assertThat(followersChecker.getFaultyNodes(), empty());
@ -107,7 +107,7 @@ public class FollowersCheckerTests extends ESTestCase {
followersChecker.setCurrentNodes(discoveryNodesHolder[0] = DiscoveryNodes.builder(discoveryNodesHolder[0]).add(otherNode1).build());
while (checkCount.get() < 10) {
if (deterministicTaskQueue.hasRunnableTasks()) {
deterministicTaskQueue.runRandomTask(random());
deterministicTaskQueue.runRandomTask();
} else {
deterministicTaskQueue.advanceTime();
}
@ -121,7 +121,7 @@ public class FollowersCheckerTests extends ESTestCase {
followersChecker.setCurrentNodes(discoveryNodesHolder[0] = DiscoveryNodes.builder(discoveryNodesHolder[0]).add(otherNode2).build());
while (checkCount.get() < 10) {
if (deterministicTaskQueue.hasRunnableTasks()) {
deterministicTaskQueue.runRandomTask(random());
deterministicTaskQueue.runRandomTask();
} else {
deterministicTaskQueue.advanceTime();
}
@ -135,7 +135,7 @@ public class FollowersCheckerTests extends ESTestCase {
= DiscoveryNodes.builder(discoveryNodesHolder[0]).remove(otherNode1).build());
while (checkCount.get() < 10) {
if (deterministicTaskQueue.hasRunnableTasks()) {
deterministicTaskQueue.runRandomTask(random());
deterministicTaskQueue.runRandomTask();
} else {
deterministicTaskQueue.advanceTime();
}
@ -145,7 +145,7 @@ public class FollowersCheckerTests extends ESTestCase {
checkedNodes.clear();
followersChecker.clearCurrentNodes();
deterministicTaskQueue.runAllTasks(random());
deterministicTaskQueue.runAllTasks();
assertThat(checkedNodes, empty());
}
@ -225,7 +225,7 @@ public class FollowersCheckerTests extends ESTestCase {
final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).put(testSettings).build();
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings);
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
final MockTransport mockTransport = new MockTransport() {
@Override
@ -276,41 +276,41 @@ public class FollowersCheckerTests extends ESTestCase {
if (deterministicTaskQueue.hasRunnableTasks() == false) {
deterministicTaskQueue.advanceTime();
}
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
}
assertThat(deterministicTaskQueue.getCurrentTimeMillis(), equalTo(expectedFailureTime));
assertThat(followersChecker.getFaultyNodes(), contains(otherNode));
deterministicTaskQueue.runAllTasks(random()); // checks it does not continue checking a failed node
deterministicTaskQueue.runAllTasks();
// add another node and see that it schedules checks for this new node but keeps on considering the old one faulty
final DiscoveryNode otherNode2 = new DiscoveryNode("other-node-2", buildNewFakeTransportAddress(), Version.CURRENT);
discoveryNodes = DiscoveryNodes.builder(discoveryNodes).add(otherNode2).build();
followersChecker.setCurrentNodes(discoveryNodes);
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
assertThat(followersChecker.getFaultyNodes(), contains(otherNode));
// remove the faulty node and see that it is removed
discoveryNodes = DiscoveryNodes.builder(discoveryNodes).remove(otherNode).build();
followersChecker.setCurrentNodes(discoveryNodes);
assertThat(followersChecker.getFaultyNodes(), empty());
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
// remove the working node and see that everything eventually stops
discoveryNodes = DiscoveryNodes.builder(discoveryNodes).remove(otherNode2).build();
followersChecker.setCurrentNodes(discoveryNodes);
deterministicTaskQueue.runAllTasks(random());
deterministicTaskQueue.runAllTasks();
// add back the faulty node afresh and see that it fails again
discoveryNodes = DiscoveryNodes.builder(discoveryNodes).add(otherNode).build();
followersChecker.setCurrentNodes(discoveryNodes);
nodeFailed.set(false);
assertThat(followersChecker.getFaultyNodes(), empty());
deterministicTaskQueue.runAllTasks(random());
deterministicTaskQueue.runAllTasks();
assertTrue(nodeFailed.get());
assertThat(followersChecker.getFaultyNodes(), contains(otherNode));
}
@ -333,7 +333,7 @@ public class FollowersCheckerTests extends ESTestCase {
final DiscoveryNode leader = new DiscoveryNode("leader", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode follower = new DiscoveryNode("follower", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), follower.getName()).build();
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings);
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
final MockTransport mockTransport = new MockTransport() {
@Override
@ -368,7 +368,7 @@ public class FollowersCheckerTests extends ESTestCase {
final ExpectsSuccess expectsSuccess = new ExpectsSuccess();
transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(term, leader), expectsSuccess);
deterministicTaskQueue.runAllTasks(random());
deterministicTaskQueue.runAllTasks();
assertTrue(expectsSuccess.succeeded());
assertFalse(calledCoordinator.get());
}
@ -398,7 +398,7 @@ public class FollowersCheckerTests extends ESTestCase {
return Names.SAME;
}
});
deterministicTaskQueue.runAllTasks(random());
deterministicTaskQueue.runAllTasks();
assertFalse(calledCoordinator.get());
assertThat(receivedException.get(), not(nullValue()));
}
@ -412,7 +412,7 @@ public class FollowersCheckerTests extends ESTestCase {
final ExpectsSuccess expectsSuccess = new ExpectsSuccess();
transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME,
new FollowerCheckRequest(leaderTerm, leader), expectsSuccess);
deterministicTaskQueue.runAllTasks(random());
deterministicTaskQueue.runAllTasks();
assertTrue(expectsSuccess.succeeded());
assertTrue(calledCoordinator.get());
calledCoordinator.set(false);
@ -425,7 +425,7 @@ public class FollowersCheckerTests extends ESTestCase {
final ExpectsSuccess expectsSuccess = new ExpectsSuccess();
transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(term, leader), expectsSuccess);
deterministicTaskQueue.runAllTasks(random());
deterministicTaskQueue.runAllTasks();
assertTrue(expectsSuccess.succeeded());
assertTrue(calledCoordinator.get());
calledCoordinator.set(false);
@ -457,7 +457,7 @@ public class FollowersCheckerTests extends ESTestCase {
return Names.SAME;
}
});
deterministicTaskQueue.runAllTasks(random());
deterministicTaskQueue.runAllTasks();
assertTrue(calledCoordinator.get());
assertThat(receivedException.get(), not(nullValue()));
assertThat(receivedException.get().getRootCause().getMessage(), equalTo(exceptionMessage));

View File

@ -37,7 +37,7 @@ public class JoinHelperTests extends ESTestCase {
public void testJoinDeduplication() {
DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(
Settings.builder().put(NODE_NAME_SETTING.getKey(), "node0").build());
Settings.builder().put(NODE_NAME_SETTING.getKey(), "node0").build(), random());
CapturingTransport capturingTransport = new CapturingTransport();
DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT);
TransportService transportService = capturingTransport.createTransportService(Settings.EMPTY,

View File

@ -93,7 +93,7 @@ public class LeaderCheckerTests extends ESTestCase {
final Settings settings = settingsBuilder.build();
logger.info("--> using {}", settings);
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings);
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
final MockTransport mockTransport = new MockTransport() {
int consecutiveFailedRequestsCount;
@ -149,13 +149,13 @@ public class LeaderCheckerTests extends ESTestCase {
final long maxCheckCount = randomLongBetween(2, 1000);
logger.info("--> checking that no failure is detected in {} checks", maxCheckCount);
while (checkCount.get() < maxCheckCount) {
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
deterministicTaskQueue.advanceTime();
}
}
logger.info("--> running remaining tasks");
deterministicTaskQueue.runAllTasks(random());
deterministicTaskQueue.runAllTasks();
assertFalse(leaderFailed.get());
logger.info("--> creating second checker");
@ -164,11 +164,11 @@ public class LeaderCheckerTests extends ESTestCase {
final long maxCheckCount = randomLongBetween(2, 1000);
logger.info("--> checking again that no failure is detected in {} checks", maxCheckCount);
while (checkCount.get() < maxCheckCount) {
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
deterministicTaskQueue.advanceTime();
}
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
final long failureTime = deterministicTaskQueue.getCurrentTimeMillis();
allResponsesFail.set(true);
@ -176,7 +176,7 @@ public class LeaderCheckerTests extends ESTestCase {
while (leaderFailed.get() == false) {
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
}
assertThat(deterministicTaskQueue.getCurrentTimeMillis() - failureTime,
@ -197,7 +197,7 @@ public class LeaderCheckerTests extends ESTestCase {
final Response[] responseHolder = new Response[]{Response.SUCCESS};
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getId()).build();
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings);
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
final MockTransport mockTransport = new MockTransport() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
@ -239,38 +239,38 @@ public class LeaderCheckerTests extends ESTestCase {
try (Releasable ignored = leaderChecker.startLeaderChecker(leader)) {
while (deterministicTaskQueue.getCurrentTimeMillis() < 10 * LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis()) {
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
deterministicTaskQueue.advanceTime();
}
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
assertFalse(leaderFailed.get());
responseHolder[0] = Response.REMOTE_ERROR;
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
assertTrue(leaderFailed.get());
}
deterministicTaskQueue.runAllTasks(random());
deterministicTaskQueue.runAllTasks();
leaderFailed.set(false);
responseHolder[0] = Response.SUCCESS;
try (Releasable ignored = leaderChecker.startLeaderChecker(leader)) {
while (deterministicTaskQueue.getCurrentTimeMillis() < 10 * LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis()) {
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
deterministicTaskQueue.advanceTime();
}
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
assertFalse(leaderFailed.get());
responseHolder[0] = Response.DIRECT_ERROR;
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
assertTrue(leaderFailed.get());
}
@ -280,7 +280,7 @@ public class LeaderCheckerTests extends ESTestCase {
final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getId()).build();
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings);
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
final CapturingTransport capturingTransport = new CapturingTransport();
final TransportService transportService = capturingTransport.createTransportService(settings,
@ -298,7 +298,7 @@ public class LeaderCheckerTests extends ESTestCase {
final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler();
transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler);
deterministicTaskQueue.runAllTasks(random());
deterministicTaskQueue.runAllTasks();
assertFalse(handler.successfulResponseReceived);
assertThat(handler.transportException.getRootCause(), instanceOf(CoordinationStateRejectedException.class));
@ -311,7 +311,7 @@ public class LeaderCheckerTests extends ESTestCase {
final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler();
transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler);
deterministicTaskQueue.runAllTasks(random());
deterministicTaskQueue.runAllTasks();
assertTrue(handler.successfulResponseReceived);
assertThat(handler.transportException, nullValue());
@ -322,7 +322,7 @@ public class LeaderCheckerTests extends ESTestCase {
final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler();
transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler);
deterministicTaskQueue.runAllTasks(random());
deterministicTaskQueue.runAllTasks();
assertFalse(handler.successfulResponseReceived);
assertThat(handler.transportException.getRootCause(), instanceOf(CoordinationStateRejectedException.class));

View File

@ -114,7 +114,8 @@ public class NodeJoinTests extends ESTestCase {
}
private void setupFakeMasterServiceAndCoordinator(long term, ClusterState initialState) {
deterministicTaskQueue = new DeterministicTaskQueue(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build());
deterministicTaskQueue
= new DeterministicTaskQueue(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(), random());
FakeThreadPoolMasterService fakeMasterService = new FakeThreadPoolMasterService("test", deterministicTaskQueue::scheduleNow);
setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, deterministicTaskQueue.getThreadPool(), Randomness.get());
fakeMasterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
@ -250,7 +251,7 @@ public class NodeJoinTests extends ESTestCase {
private void joinNodeAndRun(final JoinRequest joinRequest) {
SimpleFuture fut = joinNodeAsync(joinRequest);
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
assertTrue(fut.isDone());
FutureUtils.get(fut);
}
@ -268,7 +269,7 @@ public class NodeJoinTests extends ESTestCase {
SimpleFuture fut = joinNodeAsync(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, initialVersion))));
assertEquals(Coordinator.Mode.LEADER, coordinator.getMode());
assertNull(coordinator.getStateForMasterService().nodes().getMasterNodeId());
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
assertTrue(fut.isDone());
assertTrue(isLocalNodeElectedMaster());
assertTrue(coordinator.getStateForMasterService().nodes().isLocalNodeElectedMaster());
@ -332,12 +333,12 @@ public class NodeJoinTests extends ESTestCase {
long newTerm = initialTerm + randomLongBetween(1, 10);
SimpleFuture futNode0 = joinNodeAsync(new JoinRequest(node0, Optional.of(
new Join(node0, node0, newTerm, initialTerm, initialVersion))));
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
assertFalse(futNode0.isDone());
assertFalse(isLocalNodeElectedMaster());
SimpleFuture futNode1 = joinNodeAsync(new JoinRequest(node1, Optional.of(
new Join(node1, node0, newTerm, initialTerm, initialVersion))));
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
assertFalse(futNode1.isDone());
assertFalse(isLocalNodeElectedMaster());
joinNodeAndRun(new JoinRequest(node2, Optional.of(new Join(node2, node0, newTerm, initialTerm, initialVersion))));
@ -395,7 +396,7 @@ public class NodeJoinTests extends ESTestCase {
new VotingConfiguration(Collections.singleton(node1.getId()))));
long newTerm = initialTerm + randomLongBetween(1, 10);
SimpleFuture fut = joinNodeAsync(new JoinRequest(node0, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion))));
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
assertFalse(fut.isDone());
assertFalse(isLocalNodeElectedMaster());
synchronized (coordinator.mutex) {

View File

@ -68,7 +68,7 @@ public class PreVoteCollectorTests extends ESTestCase {
@Before
public void createObjects() {
Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
deterministicTaskQueue = new DeterministicTaskQueue(settings);
deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
final MockTransport mockTransport = new MockTransport() {
@Override
protected void onSendRequest(final long requestId, final String action, final TransportRequest request,
@ -128,7 +128,7 @@ public class PreVoteCollectorTests extends ESTestCase {
}
private void runCollector() {
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
assertFalse(deterministicTaskQueue.hasDeferredTasks());
assertFalse(deterministicTaskQueue.hasRunnableTasks());
}
@ -275,7 +275,7 @@ public class PreVoteCollectorTests extends ESTestCase {
}
});
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
assertFalse(deterministicTaskQueue.hasDeferredTasks());
final PreVoteResponse response = responseRef.get();

View File

@ -198,7 +198,7 @@ public class PeerFinderTests extends ESTestCase {
addressResolveDelay = 0L;
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
deterministicTaskQueue = new DeterministicTaskQueue(settings);
deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
localNode = newDiscoveryNode("local-node");
@ -228,7 +228,7 @@ public class PeerFinderTests extends ESTestCase {
@After
public void deactivateAndRunRemainingTasks() {
peerFinder.deactivate(localNode);
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
}
public void testAddsReachableNodesFromUnicastHostsList() {
@ -726,7 +726,7 @@ public class PeerFinderTests extends ESTestCase {
return "assertNotifiedOfAllUpdates";
}
});
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.runAllRunnableTasks();
assertNotifiedOfAllUpdates();
}
}

View File

@ -232,8 +232,8 @@ public class ClusterStateChanges extends AbstractComponent {
public ClusterState joinNodesAndBecomeMaster(ClusterState clusterState, List<DiscoveryNode> nodes) {
List<JoinTaskExecutor.Task> joinNodes = new ArrayList<>();
joinNodes.add(JoinTaskExecutor.BECOME_MASTER_TASK);
joinNodes.add(JoinTaskExecutor.FINISH_ELECTION_TASK);
joinNodes.add(JoinTaskExecutor.newBecomeMasterTask());
joinNodes.add(JoinTaskExecutor.newFinishElectionTask());
joinNodes.addAll(nodes.stream().map(node -> new JoinTaskExecutor.Task(node, "dummy reason"))
.collect(Collectors.toList()));

View File

@ -45,43 +45,38 @@ import java.util.function.Function;
public class DeterministicTaskQueue extends AbstractComponent {
private final List<Runnable> runnableTasks = new ArrayList<>();
private final Random random;
private List<DeferredTask> deferredTasks = new ArrayList<>();
private long currentTimeMillis;
private long nextDeferredTaskExecutionTimeMillis = Long.MAX_VALUE;
private long executionDelayVariabilityMillis;
public DeterministicTaskQueue(Settings settings) {
public DeterministicTaskQueue(Settings settings, Random random) {
super(settings);
this.random = random;
}
public void runAllTasks() {
while (true) {
runAllRunnableTasks();
if (hasDeferredTasks()) {
advanceTime();
} else {
break;
}
}
public long getExecutionDelayVariabilityMillis() {
return executionDelayVariabilityMillis;
}
public void setExecutionDelayVariabilityMillis(long executionDelayVariabilityMillis) {
assert executionDelayVariabilityMillis >= 0 : executionDelayVariabilityMillis;
this.executionDelayVariabilityMillis = executionDelayVariabilityMillis;
}
public void runAllRunnableTasks() {
while (hasRunnableTasks()) {
runNextTask();
runRandomTask();
}
}
public void runAllRunnableTasks(Random random) {
while (hasRunnableTasks()) {
runRandomTask(random);
}
}
public void runAllTasks(Random random) {
public void runAllTasks() {
while (hasDeferredTasks() || hasRunnableTasks()) {
if (hasDeferredTasks() && random.nextBoolean()) {
advanceTime();
} else if (hasRunnableTasks()) {
runRandomTask(random);
runRandomTask();
}
}
}
@ -107,18 +102,10 @@ public class DeterministicTaskQueue extends AbstractComponent {
return currentTimeMillis;
}
/**
* Runs the first runnable task.
*/
public void runNextTask() {
assert hasRunnableTasks();
runTask(0);
}
/**
* Runs an arbitrary runnable task.
*/
public void runRandomTask(final Random random) {
public void runRandomTask() {
assert hasRunnableTasks();
runTask(RandomNumbers.randomIntBetween(random, 0, runnableTasks.size() - 1));
}
@ -133,25 +120,38 @@ public class DeterministicTaskQueue extends AbstractComponent {
* Schedule a task for immediate execution.
*/
public void scheduleNow(final Runnable task) {
logger.trace("scheduleNow: adding runnable {}", task);
runnableTasks.add(task);
if (executionDelayVariabilityMillis > 0 && random.nextBoolean()) {
final long executionDelay = RandomNumbers.randomLongBetween(random, 1, executionDelayVariabilityMillis);
final DeferredTask deferredTask = new DeferredTask(currentTimeMillis + executionDelay, task);
logger.trace("scheduleNow: delaying [{}ms], scheduling {}", executionDelay, deferredTask);
scheduleDeferredTask(deferredTask);
} else {
logger.trace("scheduleNow: adding runnable {}", task);
runnableTasks.add(task);
}
}
/**
* Schedule a task for future execution.
*/
public void scheduleAt(final long executionTimeMillis, final Runnable task) {
if (executionTimeMillis <= currentTimeMillis) {
final long extraDelayMillis = RandomNumbers.randomLongBetween(random, 0, executionDelayVariabilityMillis);
final long actualExecutionTimeMillis = executionTimeMillis + extraDelayMillis;
if (actualExecutionTimeMillis <= currentTimeMillis) {
logger.trace("scheduleAt: [{}ms] is not in the future, adding runnable {}", executionTimeMillis, task);
runnableTasks.add(task);
} else {
final DeferredTask deferredTask = new DeferredTask(executionTimeMillis, task);
logger.trace("scheduleAt: adding {}", deferredTask);
nextDeferredTaskExecutionTimeMillis = Math.min(nextDeferredTaskExecutionTimeMillis, executionTimeMillis);
deferredTasks.add(deferredTask);
final DeferredTask deferredTask = new DeferredTask(actualExecutionTimeMillis, task);
logger.trace("scheduleAt: adding {} with extra delay of [{}ms]", deferredTask, extraDelayMillis);
scheduleDeferredTask(deferredTask);
}
}
private void scheduleDeferredTask(DeferredTask deferredTask) {
nextDeferredTaskExecutionTimeMillis = Math.min(nextDeferredTaskExecutionTimeMillis, deferredTask.getExecutionTimeMillis());
deferredTasks.add(deferredTask);
}
/**
* Advance the current time to the time of the next deferred task, and update the sets of deferred and runnable tasks accordingly.
*/

View File

@ -37,12 +37,15 @@ import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.Is.is;
public class DeterministicTaskQueueTests extends ESTestCase {
public void testRunNextTask() {
public void testRunRandomTask() {
final DeterministicTaskQueue taskQueue = newTaskQueue();
final List<String> strings = new ArrayList<>(2);
@ -52,24 +55,24 @@ public class DeterministicTaskQueueTests extends ESTestCase {
assertThat(strings, empty());
assertTrue(taskQueue.hasRunnableTasks());
taskQueue.runNextTask();
assertThat(strings, contains("foo"));
taskQueue.runRandomTask();
assertThat(strings, contains(isOneOf("foo", "bar")));
assertTrue(taskQueue.hasRunnableTasks());
taskQueue.runNextTask();
assertThat(strings, contains("foo", "bar"));
taskQueue.runRandomTask();
assertThat(strings, containsInAnyOrder("foo", "bar"));
assertFalse(taskQueue.hasRunnableTasks());
}
public void testRunRandomTask() {
public void testRunRandomTaskVariesOrder() {
final List<String> strings1 = getResultsOfRunningRandomly(new Random(4520795446362137264L));
final List<String> strings2 = getResultsOfRunningRandomly(new Random(266504691902226821L));
assertThat(strings1, not(equalTo(strings2)));
}
private List<String> getResultsOfRunningRandomly(Random random) {
final DeterministicTaskQueue taskQueue = newTaskQueue();
final DeterministicTaskQueue taskQueue = newTaskQueue(random);
final List<String> strings = new ArrayList<>(4);
taskQueue.scheduleNow(() -> strings.add("foo"));
@ -80,7 +83,7 @@ public class DeterministicTaskQueueTests extends ESTestCase {
assertThat(strings, empty());
while (taskQueue.hasRunnableTasks()) {
taskQueue.runRandomTask(random);
taskQueue.runRandomTask();
}
assertThat(strings, containsInAnyOrder("foo", "bar", "baz", "quux"));
@ -96,7 +99,7 @@ public class DeterministicTaskQueueTests extends ESTestCase {
taskQueue.scheduleAt(randomLongBetween(1, 100), () -> {
});
taskQueue.advanceTime();
taskQueue.runNextTask();
taskQueue.runRandomTask();
assertFalse(taskQueue.hasRunnableTasks());
assertFalse(taskQueue.hasDeferredTasks());
}
@ -110,7 +113,7 @@ public class DeterministicTaskQueueTests extends ESTestCase {
taskQueue.scheduleAt(taskQueue.getCurrentTimeMillis(), () -> strings.add("foo"));
assertTrue(taskQueue.hasRunnableTasks());
assertFalse(taskQueue.hasDeferredTasks());
taskQueue.runNextTask();
taskQueue.runRandomTask();
assertThat(strings, contains("foo"));
assertFalse(taskQueue.hasRunnableTasks());
@ -125,7 +128,7 @@ public class DeterministicTaskQueueTests extends ESTestCase {
taskQueue.scheduleAt(taskQueue.getCurrentTimeMillis() - randomInt(200), () -> strings.add("foo"));
assertTrue(taskQueue.hasRunnableTasks());
assertFalse(taskQueue.hasDeferredTasks());
taskQueue.runNextTask();
taskQueue.runRandomTask();
assertThat(strings, contains("foo"));
assertFalse(taskQueue.hasRunnableTasks());
@ -146,7 +149,7 @@ public class DeterministicTaskQueueTests extends ESTestCase {
assertTrue(taskQueue.hasRunnableTasks());
assertFalse(taskQueue.hasDeferredTasks());
taskQueue.runNextTask();
taskQueue.runRandomTask();
assertThat(strings, contains("foo"));
assertFalse(taskQueue.hasRunnableTasks());
@ -172,7 +175,7 @@ public class DeterministicTaskQueueTests extends ESTestCase {
assertTrue(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks());
taskQueue.runNextTask();
taskQueue.runRandomTask();
assertThat(strings, contains("foo"));
assertFalse(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks());
@ -182,7 +185,7 @@ public class DeterministicTaskQueueTests extends ESTestCase {
assertTrue(taskQueue.hasRunnableTasks());
assertFalse(taskQueue.hasDeferredTasks());
taskQueue.runNextTask();
taskQueue.runRandomTask();
assertThat(strings, contains("foo", "bar"));
}
@ -205,7 +208,7 @@ public class DeterministicTaskQueueTests extends ESTestCase {
assertTrue(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks());
taskQueue.runNextTask();
taskQueue.runRandomTask();
assertThat(strings, contains("foo"));
assertFalse(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks());
@ -218,9 +221,9 @@ public class DeterministicTaskQueueTests extends ESTestCase {
assertTrue(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks());
taskQueue.runNextTask();
taskQueue.runRandomTask();
taskQueue.advanceTime();
taskQueue.runNextTask();
taskQueue.runRandomTask();
assertThat(strings, contains("foo", "baz", "bar"));
assertThat(taskQueue.getCurrentTimeMillis(), is(executionTimeMillis2));
assertFalse(taskQueue.hasRunnableTasks());
@ -240,7 +243,7 @@ public class DeterministicTaskQueueTests extends ESTestCase {
assertThat(strings, empty());
while (taskQueue.hasRunnableTasks()) {
taskQueue.runRandomTask(random());
taskQueue.runRandomTask();
}
assertThat(strings, containsInAnyOrder("foo", "bar"));
@ -259,7 +262,7 @@ public class DeterministicTaskQueueTests extends ESTestCase {
assertThat(strings, empty());
while (taskQueue.hasRunnableTasks()) {
taskQueue.runRandomTask(random());
taskQueue.runRandomTask();
}
assertThat(strings, containsInAnyOrder("foo", "bar"));
@ -315,7 +318,7 @@ public class DeterministicTaskQueueTests extends ESTestCase {
taskQueue.runAllTasks();
assertThat(taskQueue.getCurrentTimeMillis(), is(startTime + delayMillis));
assertThat(strings, contains("runnable", "also runnable", "deferred"));
assertThat(strings, containsInAnyOrder("runnable", "also runnable", "deferred"));
final long delayMillis1 = randomLongBetween(2, 100);
final long delayMillis2 = randomLongBetween(1, delayMillis1 - 1);
@ -333,12 +336,49 @@ public class DeterministicTaskQueueTests extends ESTestCase {
final ScheduledFuture<?> future = threadPool.schedule(cancelledDelay, "", () -> strings.add("cancelled before execution"));
future.cancel(false);
taskQueue.runAllTasks(random());
taskQueue.runAllTasks();
assertThat(strings, contains("runnable", "also runnable", "deferred", "not quite so deferred", "further deferred"));
assertThat(strings, containsInAnyOrder("runnable", "also runnable", "deferred", "not quite so deferred", "further deferred"));
}
public void testDelayVariabilityAppliesToImmediateTasks() {
final DeterministicTaskQueue deterministicTaskQueue = newTaskQueue();
advanceToRandomTime(deterministicTaskQueue);
final long variabilityMillis = randomLongBetween(100, 500);
deterministicTaskQueue.setExecutionDelayVariabilityMillis(variabilityMillis);
for (int i = 0; i < 100; i++) {
deterministicTaskQueue.scheduleNow(() -> {});
}
final long startTime = deterministicTaskQueue.getCurrentTimeMillis();
deterministicTaskQueue.runAllTasks();
final long elapsedTime = deterministicTaskQueue.getCurrentTimeMillis() - startTime;
assertThat(elapsedTime, greaterThan(0L)); // fails with negligible probability 2^{-100}
assertThat(elapsedTime, lessThanOrEqualTo(variabilityMillis));
}
public void testDelayVariabilityAppliesToFutureTasks() {
final DeterministicTaskQueue deterministicTaskQueue = newTaskQueue();
advanceToRandomTime(deterministicTaskQueue);
final long delayMillis = randomLongBetween(30000, 60000);
final long variabilityMillis = randomLongBetween(100, 500);
deterministicTaskQueue.setExecutionDelayVariabilityMillis(variabilityMillis);
for (int i = 0; i < 100; i++) {
deterministicTaskQueue.scheduleAt(delayMillis, () -> {});
}
final long startTime = deterministicTaskQueue.getCurrentTimeMillis();
deterministicTaskQueue.runAllTasks();
final long elapsedTime = deterministicTaskQueue.getCurrentTimeMillis() - startTime;
assertThat(elapsedTime, greaterThan(delayMillis)); // fails with negligible probability
assertThat(elapsedTime, lessThanOrEqualTo(delayMillis + variabilityMillis));
}
private static DeterministicTaskQueue newTaskQueue() {
return new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build());
return newTaskQueue(random());
}
private static DeterministicTaskQueue newTaskQueue(Random random) {
return new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random);
}
}

View File

@ -95,7 +95,7 @@ public class DisruptableMockTransportTests extends ESTestCase {
List<DisruptableMockTransport> transports = new ArrayList<>();
deterministicTaskQueue = new DeterministicTaskQueue(
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "dummy").build());
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "dummy").build(), random());
transport1 = new DisruptableMockTransport(logger) {
@Override