Introduce deterministic task queue (#32197)

The cluster coordination layer relies on timeouts to ensure that a cluster can
successfully form, and must also deal with concurrent activity in the cluster.
This commit introduces some infrastructure that will help us to
deterministically test components that use concurrency and/or timeouts.
This commit is contained in:
David Turner 2018-07-20 16:05:44 +01:00 committed by GitHub
parent 384cc5455b
commit d209210019
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 718 additions and 3 deletions

View File

@ -0,0 +1,33 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.common.unit.TimeValue;
/**
* Device which supports running a task after some delay has elapsed.
*/
public interface FutureExecutor {
/**
* Schedule the given task for execution after the given delay has elapsed.
*/
void schedule(Runnable task, TimeValue delay);
}

View File

@ -362,9 +362,13 @@ public class ThreadPool extends AbstractComponent implements Scheduler, Closeabl
return getThreadContext().preserveContext(command);
}
public void shutdown() {
protected final void stopCachedTimeThread() {
cachedTimeThread.running = false;
cachedTimeThread.interrupt();
}
public void shutdown() {
stopCachedTimeThread();
scheduler.shutdown();
for (ExecutorHolder executor : executors.values()) {
if (executor.executor() instanceof ThreadPoolExecutor) {
@ -374,8 +378,7 @@ public class ThreadPool extends AbstractComponent implements Scheduler, Closeabl
}
public void shutdownNow() {
cachedTimeThread.running = false;
cachedTimeThread.interrupt();
stopCachedTimeThread();
scheduler.shutdownNow();
for (ExecutorHolder executor : executors.values()) {
if (executor.executor() instanceof ThreadPoolExecutor) {

View File

@ -0,0 +1,360 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import org.apache.lucene.util.Counter;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolInfo;
import org.elasticsearch.threadpool.ThreadPoolStats;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class DeterministicTaskQueue extends AbstractComponent {
private final List<Runnable> runnableTasks = new ArrayList<>();
private List<DeferredTask> deferredTasks = new ArrayList<>();
private long currentTimeMillis;
private long nextDeferredTaskExecutionTimeMillis = Long.MAX_VALUE;
public DeterministicTaskQueue(Settings settings) {
super(settings);
}
/**
* @return whether there are any runnable tasks.
*/
public boolean hasRunnableTasks() {
return runnableTasks.isEmpty() == false;
}
/**
* @return whether there are any deferred tasks, i.e. tasks that are scheduled for the future.
*/
public boolean hasDeferredTasks() {
return deferredTasks.isEmpty() == false;
}
/**
* @return the current (simulated) time, in milliseconds.
*/
public long getCurrentTimeMillis() {
return currentTimeMillis;
}
/**
* Runs the first runnable task.
*/
public void runNextTask() {
assert hasRunnableTasks();
runTask(0);
}
/**
* Runs an arbitrary runnable task.
*/
public void runRandomTask(final Random random) {
assert hasRunnableTasks();
runTask(RandomNumbers.randomIntBetween(random, 0, runnableTasks.size() - 1));
}
private void runTask(final int index) {
final Runnable task = runnableTasks.remove(index);
logger.trace("running task {} of {}: {}", index, runnableTasks.size() + 1, task);
task.run();
}
/**
* Schedule a task for immediate execution.
*/
public void scheduleNow(final Runnable task) {
logger.trace("scheduleNow: adding runnable {}", task);
runnableTasks.add(task);
}
/**
* Schedule a task for future execution.
*/
public void scheduleAt(final long executionTimeMillis, final Runnable task) {
if (executionTimeMillis <= currentTimeMillis) {
logger.trace("scheduleAt: [{}ms] is not in the future, adding runnable {}", executionTimeMillis, task);
runnableTasks.add(task);
} else {
final DeferredTask deferredTask = new DeferredTask(executionTimeMillis, task);
logger.trace("scheduleAt: adding {}", deferredTask);
nextDeferredTaskExecutionTimeMillis = Math.min(nextDeferredTaskExecutionTimeMillis, executionTimeMillis);
deferredTasks.add(deferredTask);
}
}
/**
* Advance the current time to the time of the next deferred task, and update the sets of deferred and runnable tasks accordingly.
*/
public void advanceTime() {
assert hasDeferredTasks();
assert currentTimeMillis < nextDeferredTaskExecutionTimeMillis;
logger.trace("advanceTime: from [{}ms] to [{}ms]", currentTimeMillis, nextDeferredTaskExecutionTimeMillis);
currentTimeMillis = nextDeferredTaskExecutionTimeMillis;
nextDeferredTaskExecutionTimeMillis = Long.MAX_VALUE;
List<DeferredTask> remainingDeferredTasks = new ArrayList<>();
for (final DeferredTask deferredTask : deferredTasks) {
assert currentTimeMillis <= deferredTask.getExecutionTimeMillis();
if (deferredTask.getExecutionTimeMillis() == currentTimeMillis) {
logger.trace("advanceTime: no longer deferred: {}", deferredTask);
runnableTasks.add(deferredTask.getTask());
} else {
remainingDeferredTasks.add(deferredTask);
nextDeferredTaskExecutionTimeMillis = Math.min(nextDeferredTaskExecutionTimeMillis, deferredTask.getExecutionTimeMillis());
}
}
deferredTasks = remainingDeferredTasks;
assert deferredTasks.isEmpty() == (nextDeferredTaskExecutionTimeMillis == Long.MAX_VALUE);
}
/**
* @return A <code>FutureExecutor</code> that uses this task queue.
*/
public FutureExecutor getFutureExecutor() {
return (task, delay) -> scheduleAt(currentTimeMillis + delay.millis(), task);
}
/**
* @return A <code>ExecutorService</code> that uses this task queue.
*/
public ExecutorService getExecutorService() {
return new ExecutorService() {
@Override
public void shutdown() {
throw new UnsupportedOperationException();
}
@Override
public List<Runnable> shutdownNow() {
throw new UnsupportedOperationException();
}
@Override
public boolean isShutdown() {
throw new UnsupportedOperationException();
}
@Override
public boolean isTerminated() {
throw new UnsupportedOperationException();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public <T> Future<T> submit(Callable<T> task) {
throw new UnsupportedOperationException();
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
throw new UnsupportedOperationException();
}
@Override
public Future<?> submit(Runnable task) {
throw new UnsupportedOperationException();
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
throw new UnsupportedOperationException();
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) {
throw new UnsupportedOperationException();
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public void execute(Runnable command) {
scheduleNow(command);
}
};
}
/**
* @return A <code>ThreadPool</code> that uses this task queue.
*/
public ThreadPool getThreadPool() {
return new ThreadPool(settings) {
{
stopCachedTimeThread();
}
@Override
public long relativeTimeInMillis() {
return currentTimeMillis;
}
@Override
public long absoluteTimeInMillis() {
return currentTimeMillis;
}
@Override
public Counter estimatedTimeInMillisCounter() {
return new Counter() {
@Override
public long addAndGet(long delta) {
throw new UnsupportedOperationException();
}
@Override
public long get() {
return currentTimeMillis;
}
};
}
@Override
public ThreadPoolInfo info() {
throw new UnsupportedOperationException();
}
@Override
public Info info(String name) {
throw new UnsupportedOperationException();
}
@Override
public ThreadPoolStats stats() {
throw new UnsupportedOperationException();
}
@Override
public ExecutorService generic() {
return getExecutorService();
}
@Override
public ExecutorService executor(String name) {
return getExecutorService();
}
@Override
public ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable command) {
throw new UnsupportedOperationException();
}
@Override
public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, String executor) {
throw new UnsupportedOperationException();
}
@Override
public Runnable preserveContext(Runnable command) {
throw new UnsupportedOperationException();
}
@Override
public void shutdown() {
throw new UnsupportedOperationException();
}
@Override
public void shutdownNow() {
throw new UnsupportedOperationException();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public ScheduledExecutorService scheduler() {
throw new UnsupportedOperationException();
}
@Override
public void close() {
throw new UnsupportedOperationException();
}
@Override
public ThreadContext getThreadContext() {
throw new UnsupportedOperationException();
}
};
}
private static class DeferredTask {
private final long executionTimeMillis;
private final Runnable task;
DeferredTask(long executionTimeMillis, Runnable task) {
this.executionTimeMillis = executionTimeMillis;
this.task = task;
assert executionTimeMillis < Long.MAX_VALUE : "Long.MAX_VALUE is special, cannot be an execution time";
}
long getExecutionTimeMillis() {
return executionTimeMillis;
}
Runnable getTask() {
return task;
}
@Override
public String toString() {
return "DeferredTask{" +
"executionTimeMillis=" + executionTimeMillis +
", task=" + task +
'}';
}
}
}

View File

@ -0,0 +1,319 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.Is.is;
public class DeterministicTaskQueueTests extends ESTestCase {
public void testRunNextTask() {
final DeterministicTaskQueue taskQueue = newTaskQueue();
final List<String> strings = new ArrayList<>(2);
taskQueue.scheduleNow(() -> strings.add("foo"));
taskQueue.scheduleNow(() -> strings.add("bar"));
assertThat(strings, empty());
assertTrue(taskQueue.hasRunnableTasks());
taskQueue.runNextTask();
assertThat(strings, contains("foo"));
assertTrue(taskQueue.hasRunnableTasks());
taskQueue.runNextTask();
assertThat(strings, contains("foo", "bar"));
assertFalse(taskQueue.hasRunnableTasks());
}
public void testRunRandomTask() {
final List<String> strings1 = getResultsOfRunningRandomly(new Random(4520795446362137264L));
final List<String> strings2 = getResultsOfRunningRandomly(new Random(266504691902226821L));
assertThat(strings1, not(equalTo(strings2)));
}
private List<String> getResultsOfRunningRandomly(Random random) {
final DeterministicTaskQueue taskQueue = newTaskQueue();
final List<String> strings = new ArrayList<>(4);
taskQueue.scheduleNow(() -> strings.add("foo"));
taskQueue.scheduleNow(() -> strings.add("bar"));
taskQueue.scheduleNow(() -> strings.add("baz"));
taskQueue.scheduleNow(() -> strings.add("quux"));
assertThat(strings, empty());
while (taskQueue.hasRunnableTasks()) {
taskQueue.runRandomTask(random);
}
assertThat(strings, containsInAnyOrder("foo", "bar", "baz", "quux"));
return strings;
}
public void testStartsAtTimeZero() {
final DeterministicTaskQueue taskQueue = newTaskQueue();
assertThat(taskQueue.getCurrentTimeMillis(), is(0L));
}
private void advanceToRandomTime(DeterministicTaskQueue taskQueue) {
taskQueue.scheduleAt(randomLongBetween(1, 100), () -> {
});
taskQueue.advanceTime();
taskQueue.runNextTask();
assertFalse(taskQueue.hasRunnableTasks());
assertFalse(taskQueue.hasDeferredTasks());
}
public void testDoesNotDeferTasksForImmediateExecution() {
final DeterministicTaskQueue taskQueue = newTaskQueue();
advanceToRandomTime(taskQueue);
final List<String> strings = new ArrayList<>(1);
taskQueue.scheduleAt(taskQueue.getCurrentTimeMillis(), () -> strings.add("foo"));
assertTrue(taskQueue.hasRunnableTasks());
assertFalse(taskQueue.hasDeferredTasks());
taskQueue.runNextTask();
assertThat(strings, contains("foo"));
assertFalse(taskQueue.hasRunnableTasks());
}
public void testDoesNotDeferTasksScheduledInThePast() {
final DeterministicTaskQueue taskQueue = newTaskQueue();
advanceToRandomTime(taskQueue);
final List<String> strings = new ArrayList<>(1);
taskQueue.scheduleAt(taskQueue.getCurrentTimeMillis() - randomInt(200), () -> strings.add("foo"));
assertTrue(taskQueue.hasRunnableTasks());
assertFalse(taskQueue.hasDeferredTasks());
taskQueue.runNextTask();
assertThat(strings, contains("foo"));
assertFalse(taskQueue.hasRunnableTasks());
}
public void testDefersTasksWithPositiveDelays() {
final DeterministicTaskQueue taskQueue = newTaskQueue();
final List<String> strings = new ArrayList<>(1);
final long executionTimeMillis = randomLongBetween(1, 100);
taskQueue.scheduleAt(executionTimeMillis, () -> strings.add("foo"));
assertThat(taskQueue.getCurrentTimeMillis(), is(0L));
assertFalse(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks());
taskQueue.advanceTime();
assertThat(taskQueue.getCurrentTimeMillis(), is(executionTimeMillis));
assertTrue(taskQueue.hasRunnableTasks());
assertFalse(taskQueue.hasDeferredTasks());
taskQueue.runNextTask();
assertThat(strings, contains("foo"));
assertFalse(taskQueue.hasRunnableTasks());
assertFalse(taskQueue.hasDeferredTasks());
}
public void testKeepsFutureTasksDeferred() {
final DeterministicTaskQueue taskQueue = newTaskQueue();
final List<String> strings = new ArrayList<>(2);
final long executionTimeMillis1 = randomLongBetween(1, 100);
final long executionTimeMillis2 = randomLongBetween(executionTimeMillis1 + 1, 200);
taskQueue.scheduleAt(executionTimeMillis1, () -> strings.add("foo"));
taskQueue.scheduleAt(executionTimeMillis2, () -> strings.add("bar"));
assertThat(taskQueue.getCurrentTimeMillis(), is(0L));
assertFalse(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks());
taskQueue.advanceTime();
assertThat(taskQueue.getCurrentTimeMillis(), is(executionTimeMillis1));
assertTrue(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks());
taskQueue.runNextTask();
assertThat(strings, contains("foo"));
assertFalse(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks());
taskQueue.advanceTime();
assertThat(taskQueue.getCurrentTimeMillis(), is(executionTimeMillis2));
assertTrue(taskQueue.hasRunnableTasks());
assertFalse(taskQueue.hasDeferredTasks());
taskQueue.runNextTask();
assertThat(strings, contains("foo", "bar"));
}
public void testExecutesTasksInTimeOrder() {
final DeterministicTaskQueue taskQueue = newTaskQueue();
final List<String> strings = new ArrayList<>(3);
final long executionTimeMillis1 = randomLongBetween(1, 100);
final long executionTimeMillis2 = randomLongBetween(executionTimeMillis1 + 100, 300);
taskQueue.scheduleAt(executionTimeMillis1, () -> strings.add("foo"));
taskQueue.scheduleAt(executionTimeMillis2, () -> strings.add("bar"));
assertThat(taskQueue.getCurrentTimeMillis(), is(0L));
assertFalse(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks());
taskQueue.advanceTime();
assertThat(taskQueue.getCurrentTimeMillis(), is(executionTimeMillis1));
assertTrue(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks());
taskQueue.runNextTask();
assertThat(strings, contains("foo"));
assertFalse(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks());
final long executionTimeMillis3 = randomLongBetween(executionTimeMillis1 + 1, executionTimeMillis2 - 1);
taskQueue.scheduleAt(executionTimeMillis3, () -> strings.add("baz"));
taskQueue.advanceTime();
assertThat(taskQueue.getCurrentTimeMillis(), is(executionTimeMillis3));
assertTrue(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks());
taskQueue.runNextTask();
taskQueue.advanceTime();
taskQueue.runNextTask();
assertThat(strings, contains("foo", "baz", "bar"));
assertThat(taskQueue.getCurrentTimeMillis(), is(executionTimeMillis2));
assertFalse(taskQueue.hasRunnableTasks());
assertFalse(taskQueue.hasDeferredTasks());
}
public void testExecutorServiceEnqueuesTasks() {
final DeterministicTaskQueue taskQueue = newTaskQueue();
final List<String> strings = new ArrayList<>(2);
final ExecutorService executorService = taskQueue.getExecutorService();
assertFalse(taskQueue.hasRunnableTasks());
executorService.execute(() -> strings.add("foo"));
assertTrue(taskQueue.hasRunnableTasks());
executorService.execute(() -> strings.add("bar"));
assertThat(strings, empty());
while (taskQueue.hasRunnableTasks()) {
taskQueue.runRandomTask(random());
}
assertThat(strings, containsInAnyOrder("foo", "bar"));
}
public void testThreadPoolEnqueuesTasks() {
final DeterministicTaskQueue taskQueue = newTaskQueue();
final List<String> strings = new ArrayList<>(2);
final ThreadPool threadPool = taskQueue.getThreadPool();
assertFalse(taskQueue.hasRunnableTasks());
threadPool.generic().execute(() -> strings.add("foo"));
assertTrue(taskQueue.hasRunnableTasks());
threadPool.executor("anything").execute(() -> strings.add("bar"));
assertThat(strings, empty());
while (taskQueue.hasRunnableTasks()) {
taskQueue.runRandomTask(random());
}
assertThat(strings, containsInAnyOrder("foo", "bar"));
}
public void testFutureExecutorSchedulesTasks() {
final DeterministicTaskQueue taskQueue = newTaskQueue();
advanceToRandomTime(taskQueue);
final long startTime = taskQueue.getCurrentTimeMillis();
final List<String> strings = new ArrayList<>(5);
final FutureExecutor futureExecutor = taskQueue.getFutureExecutor();
final long delayMillis = randomLongBetween(1, 100);
futureExecutor.schedule(() -> strings.add("deferred"), TimeValue.timeValueMillis(delayMillis));
assertFalse(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks());
futureExecutor.schedule(() -> strings.add("runnable"), TimeValue.ZERO);
assertTrue(taskQueue.hasRunnableTasks());
futureExecutor.schedule(() -> strings.add("also runnable"), TimeValue.MINUS_ONE);
runAllTasks(taskQueue);
assertThat(taskQueue.getCurrentTimeMillis(), is(startTime + delayMillis));
assertThat(strings, contains("runnable", "also runnable", "deferred"));
final long delayMillis1 = randomLongBetween(2, 100);
final long delayMillis2 = randomLongBetween(1, delayMillis1 - 1);
futureExecutor.schedule(() -> strings.add("further deferred"), TimeValue.timeValueMillis(delayMillis1));
futureExecutor.schedule(() -> strings.add("not quite so deferred"), TimeValue.timeValueMillis(delayMillis2));
assertFalse(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks());
runAllTasks(taskQueue);
assertThat(taskQueue.getCurrentTimeMillis(), is(startTime + delayMillis + delayMillis1));
assertThat(strings, contains("runnable", "also runnable", "deferred", "not quite so deferred", "further deferred"));
}
private static void runAllTasks(DeterministicTaskQueue taskQueue) {
while (true) {
while (taskQueue.hasRunnableTasks()) {
taskQueue.runNextTask();
}
if (taskQueue.hasDeferredTasks()) {
taskQueue.advanceTime();
} else {
break;
}
}
}
private static DeterministicTaskQueue newTaskQueue() {
return new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build());
}
}