diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index fbb9f65414a..2dad4663dc4 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -25,7 +25,10 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.node.Node; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -75,6 +78,50 @@ public class EsExecutors { return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy(), contextHolder); } + private static final ExecutorService DIRECT_EXECUTOR_SERVICE = new AbstractExecutorService() { + + @Override + public void shutdown() { + throw new UnsupportedOperationException(); + } + + @Override + public List shutdownNow() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public void execute(Runnable command) { + command.run(); + } + + }; + + /** + * Returns an {@link ExecutorService} that executes submitted tasks on the current thread. This executor service does not support being + * shutdown. + * + * @return an {@link ExecutorService} that executes submitted tasks on the current thread + */ + public static ExecutorService newDirectExecutorService() { + return DIRECT_EXECUTOR_SERVICE; + } + public static String threadName(Settings settings, String ... names) { String namePrefix = Arrays diff --git a/core/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java b/core/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java index 9e5469fd16a..0ba7b2d100a 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java +++ b/core/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java @@ -29,7 +29,7 @@ import org.elasticsearch.node.Node; import java.util.Arrays; import java.util.List; import java.util.Locale; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; /** @@ -102,7 +102,7 @@ public final class FixedExecutorBuilder extends ExecutorBuilder command.run(); + static final ExecutorService DIRECT_EXECUTOR = EsExecutors.newDirectExecutorService(); private final ThreadContext threadContext; @@ -278,23 +278,26 @@ public class ThreadPool extends AbstractComponent implements Closeable { } /** - * Get the generic executor. This executor's {@link Executor#execute(Runnable)} method will run the Runnable it is given in - * the {@link ThreadContext} of the thread that queues it. + * Get the generic executor service. This executor service {@link Executor#execute(Runnable)} method will run the {@link Runnable} it + * is given in the {@link ThreadContext} of the thread that queues it. */ - public Executor generic() { + public ExecutorService generic() { return executor(Names.GENERIC); } /** - * Get the executor with the given name. This executor's {@link Executor#execute(Runnable)} method will run the Runnable it is given in - * the {@link ThreadContext} of the thread that queues it. + * Get the executor service with the given name. This executor service's {@link Executor#execute(Runnable)} method will run the + * {@link Runnable} it is given in the {@link ThreadContext} of the thread that queues it. + * + * @param name the name of the executor service to obtain + * @throws IllegalArgumentException if no executor service with the specified name exists */ - public Executor executor(String name) { - Executor executor = executors.get(name).executor(); - if (executor == null) { - throw new IllegalArgumentException("No executor found for [" + name + "]"); + public ExecutorService executor(String name) { + final ExecutorHolder holder = executors.get(name); + if (holder == null) { + throw new IllegalArgumentException("no executor service found for [" + name + "]"); } - return executor; + return holder.executor(); } public ScheduledExecutorService scheduler() { @@ -515,16 +518,16 @@ public class ThreadPool extends AbstractComponent implements Closeable { } static class ExecutorHolder { - private final Executor executor; + private final ExecutorService executor; public final Info info; - ExecutorHolder(Executor executor, Info info) { + ExecutorHolder(ExecutorService executor, Info info) { assert executor instanceof EsThreadPoolExecutor || executor == DIRECT_EXECUTOR; this.executor = executor; this.info = info; } - Executor executor() { + ExecutorService executor() { return executor; } } diff --git a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java index 9dbef147c01..68a79c345e6 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java @@ -29,13 +29,14 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilterChain; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.PipelineExecutionService; import org.elasticsearch.ingest.PipelineStore; -import org.elasticsearch.ingest.TestProcessor; -import org.elasticsearch.ingest.CompoundProcessor; -import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.Processor; +import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.node.service.NodeService; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; @@ -43,6 +44,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.junit.Before; import org.mockito.stubbing.Answer; +import java.util.concurrent.ExecutorService; import java.util.function.Consumer; import static org.hamcrest.Matchers.equalTo; @@ -158,7 +160,8 @@ public class IngestActionFilterTests extends ESTestCase { public void testApplyWithBulkRequest() throws Exception { Task task = mock(Task.class); ThreadPool threadPool = mock(ThreadPool.class); - when(threadPool.executor(any())).thenReturn(Runnable::run); + final ExecutorService executorService = EsExecutors.newDirectExecutorService(); + when(threadPool.executor(any())).thenReturn(executorService); PipelineStore store = mock(PipelineStore.class); Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field2", "value2")); diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 86e2a35f1b5..fdbd2d0548a 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -61,7 +61,7 @@ import java.util.Locale; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -364,8 +364,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice private IndicesClusterStateService createIndicesClusterStateService(DiscoveryNode discoveryNode, final Supplier indicesServiceSupplier) { final ThreadPool threadPool = mock(ThreadPool.class); - final Executor executor = mock(Executor.class); - when(threadPool.generic()).thenReturn(executor); + when(threadPool.generic()).thenReturn(mock(ExecutorService.class)); final MockIndicesService indicesService = indicesServiceSupplier.get(); final Settings settings = Settings.builder().put("node.name", discoveryNode.getName()).build(); final TransportService transportService = new TransportService(settings, null, threadPool, diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java index b9426b83e66..947cb3f18d1 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.CustomTypeSafeMatcher; @@ -39,6 +40,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -68,7 +70,8 @@ public class PipelineExecutionServiceTests extends ESTestCase { public void setup() { store = mock(PipelineStore.class); ThreadPool threadPool = mock(ThreadPool.class); - when(threadPool.executor(anyString())).thenReturn(Runnable::run); + final ExecutorService executorService = EsExecutors.newDirectExecutorService(); + when(threadPool.executor(anyString())).thenReturn(executorService); executionService = new PipelineExecutionService(store, threadPool); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java index 5c2278f59eb..351f9392656 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java @@ -48,6 +48,7 @@ import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.reindex.ScrollableHitSource.Response; import org.elasticsearch.rest.RestStatus; @@ -64,7 +65,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.net.URL; import java.nio.charset.StandardCharsets; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; @@ -90,10 +91,11 @@ public class RemoteScrollableHitSourceTests extends ESTestCase { @Override public void setUp() throws Exception { super.setUp(); + final ExecutorService directExecutor = EsExecutors.newDirectExecutorService(); threadPool = new TestThreadPool(getTestName()) { @Override - public Executor executor(String name) { - return Runnable::run; + public ExecutorService executor(String name) { + return directExecutor; } @Override