Stop exposing ExecutorService from DeterministicTaskQueue (#56001)

There are no real users of `DeterministicTaskQueue#getExecutorService()` so we
can remove those public methods and expose the `ExecutorService` only through
the corresponding `ThreadPool`.
This commit is contained in:
David Turner 2020-04-30 11:34:26 +01:00
parent c2afbf20de
commit 445cf32591
2 changed files with 70 additions and 117 deletions

View File

@ -198,86 +198,6 @@ public class DeterministicTaskQueue {
assert deferredTasks.isEmpty() == (nextDeferredTaskExecutionTimeMillis == Long.MAX_VALUE);
}
/**
* @return A <code>ExecutorService</code> that uses this task queue.
*/
public ExecutorService getExecutorService() {
return getExecutorService(Function.identity());
}
/**
* @return A <code>ExecutorService</code> that uses this task queue and wraps <code>Runnable</code>s in the given wrapper.
*/
public ExecutorService getExecutorService(Function<Runnable, Runnable> runnableWrapper) {
return new ExecutorService() {
@Override
public void shutdown() {
throw new UnsupportedOperationException();
}
@Override
public List<Runnable> shutdownNow() {
throw new UnsupportedOperationException();
}
@Override
public boolean isShutdown() {
throw new UnsupportedOperationException();
}
@Override
public boolean isTerminated() {
throw new UnsupportedOperationException();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public <T> Future<T> submit(Callable<T> task) {
throw new UnsupportedOperationException();
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
throw new UnsupportedOperationException();
}
@Override
public Future<?> submit(Runnable task) {
throw new UnsupportedOperationException();
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
throw new UnsupportedOperationException();
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) {
throw new UnsupportedOperationException();
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public void execute(Runnable command) {
scheduleNow(runnableWrapper.apply(command));
}
};
}
/**
* @return A <code>ThreadPool</code> that uses this task queue.
*/
@ -289,15 +209,82 @@ public class DeterministicTaskQueue {
* @return A <code>ThreadPool</code> that uses this task queue and wraps <code>Runnable</code>s in the given wrapper.
*/
public ThreadPool getThreadPool(Function<Runnable, Runnable> runnableWrapper) {
final ExecutorService forkingExecutor = getExecutorService(runnableWrapper);
return new ThreadPool(settings) {
private final Map<String, ThreadPool.Info> infos = new HashMap<>();
{
stopCachedTimeThread();
}
private final Map<String, ThreadPool.Info> infos = new HashMap<>();
private final ExecutorService forkingExecutor = new ExecutorService() {
@Override
public void shutdown() {
throw new UnsupportedOperationException();
}
@Override
public List<Runnable> shutdownNow() {
throw new UnsupportedOperationException();
}
@Override
public boolean isShutdown() {
throw new UnsupportedOperationException();
}
@Override
public boolean isTerminated() {
throw new UnsupportedOperationException();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public <T> Future<T> submit(Callable<T> task) {
throw new UnsupportedOperationException();
}
@Override
public <T> Future<T> submit(Runnable task, T result1) {
throw new UnsupportedOperationException();
}
@Override
public Future<?> submit(Runnable task) {
throw new UnsupportedOperationException();
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
throw new UnsupportedOperationException();
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) {
throw new UnsupportedOperationException();
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public void execute(Runnable command) {
scheduleNow(runnableWrapper.apply(command));
}
};
@Override
public long relativeTimeInMillis() {
return currentTimeMillis;

View File

@ -28,7 +28,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -245,25 +244,6 @@ public class DeterministicTaskQueueTests extends ESTestCase {
assertThat(strings, contains("foo", "bar"));
}
public void testExecutorServiceEnqueuesTasks() {
final DeterministicTaskQueue taskQueue = newTaskQueue();
final List<String> strings = new ArrayList<>(2);
final ExecutorService executorService = taskQueue.getExecutorService();
assertFalse(taskQueue.hasRunnableTasks());
executorService.execute(() -> strings.add("foo"));
assertTrue(taskQueue.hasRunnableTasks());
executorService.execute(() -> strings.add("bar"));
assertThat(strings, empty());
while (taskQueue.hasRunnableTasks()) {
taskQueue.runRandomTask();
}
assertThat(strings, containsInAnyOrder("foo", "bar"));
}
public void testThreadPoolEnqueuesTasks() {
final DeterministicTaskQueue taskQueue = newTaskQueue();
final List<String> strings = new ArrayList<>(2);
@ -297,20 +277,6 @@ public class DeterministicTaskQueueTests extends ESTestCase {
assertTrue(called.get());
}
public void testExecutorServiceWrapsRunnable() {
final DeterministicTaskQueue taskQueue = newTaskQueue();
final AtomicBoolean called = new AtomicBoolean();
final ExecutorService executorService = taskQueue.getExecutorService(runnable -> () -> {
assertFalse(called.get());
called.set(true);
runnable.run();
});
executorService.execute(() -> logger.info("runnable executed"));
assertFalse(called.get());
taskQueue.runAllRunnableTasks();
assertTrue(called.get());
}
public void testThreadPoolSchedulesFutureTasks() {
final DeterministicTaskQueue taskQueue = newTaskQueue();
advanceToRandomTime(taskQueue);