Expose executor service interface from thread pool

This commit exposes the executor service interface from thread
pool. This will enable some high-level concurrency primitives that will
make some code cleaner and simpler.

Relates #21608
This commit is contained in:
Jason Tedor 2016-11-17 09:18:49 -05:00 committed by GitHub
parent 6c9ea0877d
commit b08a2e1f31
8 changed files with 86 additions and 29 deletions

View File

@ -25,7 +25,10 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -75,6 +78,50 @@ public class EsExecutors {
return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy(), contextHolder); return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy(), contextHolder);
} }
private static final ExecutorService DIRECT_EXECUTOR_SERVICE = new AbstractExecutorService() {
@Override
public void shutdown() {
throw new UnsupportedOperationException();
}
@Override
public List<Runnable> shutdownNow() {
throw new UnsupportedOperationException();
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public void execute(Runnable command) {
command.run();
}
};
/**
* Returns an {@link ExecutorService} that executes submitted tasks on the current thread. This executor service does not support being
* shutdown.
*
* @return an {@link ExecutorService} that executes submitted tasks on the current thread
*/
public static ExecutorService newDirectExecutorService() {
return DIRECT_EXECUTOR_SERVICE;
}
public static String threadName(Settings settings, String ... names) { public static String threadName(Settings settings, String ... names) {
String namePrefix = String namePrefix =
Arrays Arrays

View File

@ -29,7 +29,7 @@ import org.elasticsearch.node.Node;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
/** /**
@ -102,7 +102,7 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
int size = settings.size; int size = settings.size;
int queueSize = settings.queueSize; int queueSize = settings.queueSize;
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name())); final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
Executor executor = EsExecutors.newFixed(name(), size, queueSize, threadFactory, threadContext); final ExecutorService executor = EsExecutors.newFixed(name(), size, queueSize, threadFactory, threadContext);
final ThreadPool.Info info = final ThreadPool.Info info =
new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize)); new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize));
return new ThreadPool.ExecutorHolder(executor, info); return new ThreadPool.ExecutorHolder(executor, info);

View File

@ -29,7 +29,7 @@ import org.elasticsearch.node.Node;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -96,7 +96,7 @@ public final class ScalingExecutorBuilder extends ExecutorBuilder<ScalingExecuto
int max = settings.max; int max = settings.max;
final ThreadPool.Info info = new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.SCALING, core, max, keepAlive, null); final ThreadPool.Info info = new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.SCALING, core, max, keepAlive, null);
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name())); final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
final Executor executor = final ExecutorService executor =
EsExecutors.newScaling(name(), core, max, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, threadContext); EsExecutors.newScaling(name(), core, max, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, threadContext);
return new ThreadPool.ExecutorHolder(executor, info); return new ThreadPool.ExecutorHolder(executor, info);
} }

View File

@ -144,7 +144,7 @@ public class ThreadPool extends AbstractComponent implements Closeable {
private final EstimatedTimeThread estimatedTimeThread; private final EstimatedTimeThread estimatedTimeThread;
static final Executor DIRECT_EXECUTOR = command -> command.run(); static final ExecutorService DIRECT_EXECUTOR = EsExecutors.newDirectExecutorService();
private final ThreadContext threadContext; private final ThreadContext threadContext;
@ -278,23 +278,26 @@ public class ThreadPool extends AbstractComponent implements Closeable {
} }
/** /**
* Get the generic executor. This executor's {@link Executor#execute(Runnable)} method will run the Runnable it is given in * Get the generic executor service. This executor service {@link Executor#execute(Runnable)} method will run the {@link Runnable} it
* the {@link ThreadContext} of the thread that queues it. * is given in the {@link ThreadContext} of the thread that queues it.
*/ */
public Executor generic() { public ExecutorService generic() {
return executor(Names.GENERIC); return executor(Names.GENERIC);
} }
/** /**
* Get the executor with the given name. This executor's {@link Executor#execute(Runnable)} method will run the Runnable it is given in * Get the executor service with the given name. This executor service's {@link Executor#execute(Runnable)} method will run the
* the {@link ThreadContext} of the thread that queues it. * {@link Runnable} it is given in the {@link ThreadContext} of the thread that queues it.
*
* @param name the name of the executor service to obtain
* @throws IllegalArgumentException if no executor service with the specified name exists
*/ */
public Executor executor(String name) { public ExecutorService executor(String name) {
Executor executor = executors.get(name).executor(); final ExecutorHolder holder = executors.get(name);
if (executor == null) { if (holder == null) {
throw new IllegalArgumentException("No executor found for [" + name + "]"); throw new IllegalArgumentException("no executor service found for [" + name + "]");
} }
return executor; return holder.executor();
} }
public ScheduledExecutorService scheduler() { public ScheduledExecutorService scheduler() {
@ -515,16 +518,16 @@ public class ThreadPool extends AbstractComponent implements Closeable {
} }
static class ExecutorHolder { static class ExecutorHolder {
private final Executor executor; private final ExecutorService executor;
public final Info info; public final Info info;
ExecutorHolder(Executor executor, Info info) { ExecutorHolder(ExecutorService executor, Info info) {
assert executor instanceof EsThreadPoolExecutor || executor == DIRECT_EXECUTOR; assert executor instanceof EsThreadPoolExecutor || executor == DIRECT_EXECUTOR;
this.executor = executor; this.executor = executor;
this.info = info; this.info = info;
} }
Executor executor() { ExecutorService executor() {
return executor; return executor;
} }
} }

View File

@ -29,13 +29,14 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilterChain; import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.PipelineExecutionService; import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.ingest.PipelineStore; import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.node.service.NodeService; import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -43,6 +44,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before; import org.junit.Before;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer; import java.util.function.Consumer;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
@ -158,7 +160,8 @@ public class IngestActionFilterTests extends ESTestCase {
public void testApplyWithBulkRequest() throws Exception { public void testApplyWithBulkRequest() throws Exception {
Task task = mock(Task.class); Task task = mock(Task.class);
ThreadPool threadPool = mock(ThreadPool.class); ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor(any())).thenReturn(Runnable::run); final ExecutorService executorService = EsExecutors.newDirectExecutorService();
when(threadPool.executor(any())).thenReturn(executorService);
PipelineStore store = mock(PipelineStore.class); PipelineStore store = mock(PipelineStore.class);
Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field2", "value2")); Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field2", "value2"));

View File

@ -61,7 +61,7 @@ import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -364,8 +364,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
private IndicesClusterStateService createIndicesClusterStateService(DiscoveryNode discoveryNode, private IndicesClusterStateService createIndicesClusterStateService(DiscoveryNode discoveryNode,
final Supplier<MockIndicesService> indicesServiceSupplier) { final Supplier<MockIndicesService> indicesServiceSupplier) {
final ThreadPool threadPool = mock(ThreadPool.class); final ThreadPool threadPool = mock(ThreadPool.class);
final Executor executor = mock(Executor.class); when(threadPool.generic()).thenReturn(mock(ExecutorService.class));
when(threadPool.generic()).thenReturn(executor);
final MockIndicesService indicesService = indicesServiceSupplier.get(); final MockIndicesService indicesService = indicesServiceSupplier.get();
final Settings settings = Settings.builder().put("node.name", discoveryNode.getName()).build(); final Settings settings = Settings.builder().put("node.name", discoveryNode.getName()).build();
final TransportService transportService = new TransportService(settings, null, threadPool, final TransportService transportService = new TransportService(settings, null, threadPool,

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.CustomTypeSafeMatcher; import org.hamcrest.CustomTypeSafeMatcher;
@ -39,6 +40,7 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -68,7 +70,8 @@ public class PipelineExecutionServiceTests extends ESTestCase {
public void setup() { public void setup() {
store = mock(PipelineStore.class); store = mock(PipelineStore.class);
ThreadPool threadPool = mock(ThreadPool.class); ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor(anyString())).thenReturn(Runnable::run); final ExecutorService executorService = EsExecutors.newDirectExecutorService();
when(threadPool.executor(anyString())).thenReturn(executorService);
executionService = new PipelineExecutionService(store, threadPool); executionService = new PipelineExecutionService(store, threadPool);
} }

View File

@ -48,6 +48,7 @@ import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.reindex.ScrollableHitSource.Response; import org.elasticsearch.index.reindex.ScrollableHitSource.Response;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
@ -64,7 +65,7 @@ import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.net.URL; import java.net.URL;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -90,10 +91,11 @@ public class RemoteScrollableHitSourceTests extends ESTestCase {
@Override @Override
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
final ExecutorService directExecutor = EsExecutors.newDirectExecutorService();
threadPool = new TestThreadPool(getTestName()) { threadPool = new TestThreadPool(getTestName()) {
@Override @Override
public Executor executor(String name) { public ExecutorService executor(String name) {
return Runnable::run; return directExecutor;
} }
@Override @Override