diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index de539026e7a..ac3459b78e9 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -20,10 +20,10 @@ package org.elasticsearch.index.shard; import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.Assertions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; -import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -198,11 +198,14 @@ final class IndexShardOperationPermits implements Closeable { /** * Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided * {@link ActionListener} will be called on the calling thread. During calls of - * {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided {@link ActionListener} - * will then be called using the provided executor once operations are no longer blocked. + * {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. + * The {@link ActionListener#onResponse(Object)} method will then be called using the provided executor once operations are no + * longer blocked. Note that the executor will not be used for {@link ActionListener#onFailure(Exception)} calls. Those will run + * directly on the calling thread, which in case of delays, will be a generic thread. Callers should thus make sure + * that the {@link ActionListener#onFailure(Exception)} method provided here only contains lightweight operations. * * @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed - * @param executorOnDelay executor to use for delayed call + * @param executorOnDelay executor to use for the possibly delayed {@link ActionListener#onResponse(Object)} call * @param forceExecution whether the runnable should force its execution in case it gets rejected */ public void acquire(final ActionListener onAcquired, final String executorOnDelay, final boolean forceExecution) { @@ -217,7 +220,7 @@ final class IndexShardOperationPermits implements Closeable { final Supplier contextSupplier = threadPool.getThreadContext().newRestorableContext(false); if (executorOnDelay != null) { delayedOperations.add( - new ThreadedActionListener<>(logger, threadPool, executorOnDelay, + new PermitAwareThreadedActionListener(threadPool, executorOnDelay, new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution)); } else { delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired)); @@ -269,4 +272,56 @@ final class IndexShardOperationPermits implements Closeable { } } + /** + * A permit-aware action listener wrapper that spawns onResponse listener invocations off on a configurable thread-pool. + * Being permit-aware, it also releases the permit when hitting thread-pool rejections and falls back to the + * invoker's thread to communicate failures. + */ + private static class PermitAwareThreadedActionListener implements ActionListener { + + private final ThreadPool threadPool; + private final String executor; + private final ActionListener listener; + private final boolean forceExecution; + + private PermitAwareThreadedActionListener(ThreadPool threadPool, String executor, ActionListener listener, + boolean forceExecution) { + this.threadPool = threadPool; + this.executor = executor; + this.listener = listener; + this.forceExecution = forceExecution; + } + + @Override + public void onResponse(final Releasable releasable) { + threadPool.executor(executor).execute(new AbstractRunnable() { + @Override + public boolean isForceExecution() { + return forceExecution; + } + + @Override + protected void doRun() throws Exception { + listener.onResponse(releasable); + } + + @Override + public void onRejection(Exception e) { + IOUtils.closeWhileHandlingException(releasable); + super.onRejection(e); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); // will possibly execute on the caller thread + } + }); + } + + @Override + public void onFailure(final Exception e) { + listener.onFailure(e); + } + } + } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index 41dc8f520cc..0f6c8fbee02 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -21,6 +21,9 @@ package org.elasticsearch.index.shard; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -47,6 +50,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; @@ -59,7 +63,18 @@ public class IndexShardOperationPermitsTests extends ESTestCase { @BeforeClass public static void setupThreadPool() { - threadPool = new TestThreadPool("IndexShardOperationsLockTests"); + int bulkThreadPoolSize = randomIntBetween(1, 2); + int bulkThreadPoolQueueSize = randomIntBetween(1, 2); + threadPool = new TestThreadPool("IndexShardOperationsLockTests", + Settings.builder() + .put("thread_pool." + ThreadPool.Names.BULK + ".size", bulkThreadPoolSize) + .put("thread_pool." + ThreadPool.Names.BULK + ".queue_size", bulkThreadPoolQueueSize) + .build()); + assertThat(threadPool.executor(ThreadPool.Names.BULK), instanceOf(EsThreadPoolExecutor.class)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getCorePoolSize(), equalTo(bulkThreadPoolSize)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getMaximumPoolSize(), equalTo(bulkThreadPoolSize)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getQueue().remainingCapacity(), + equalTo(bulkThreadPoolQueueSize)); } @AfterClass @@ -82,33 +97,53 @@ public class IndexShardOperationPermitsTests extends ESTestCase { public void testAllOperationsInvoked() throws InterruptedException, TimeoutException, ExecutionException { int numThreads = 10; + class DummyException extends RuntimeException {} + List> futures = new ArrayList<>(); List operationThreads = new ArrayList<>(); - CountDownLatch latch = new CountDownLatch(numThreads / 2); + CountDownLatch latch = new CountDownLatch(numThreads / 4); + boolean forceExecution = randomBoolean(); for (int i = 0; i < numThreads; i++) { + // the bulk thread pool uses a bounded size and can get rejections, see setupThreadPool + String threadPoolName = randomFrom(ThreadPool.Names.BULK, ThreadPool.Names.GENERIC); + boolean failingListener = randomBoolean(); PlainActionFuture future = new PlainActionFuture() { @Override public void onResponse(Releasable releasable) { releasable.close(); - super.onResponse(releasable); + if (failingListener) { + throw new DummyException(); + } else { + super.onResponse(releasable); + } } }; Thread thread = new Thread() { public void run() { latch.countDown(); - permits.acquire(future, ThreadPool.Names.GENERIC, true); + try { + permits.acquire(future, threadPoolName, forceExecution); + } catch (DummyException dummyException) { + // ok, notify future + assertTrue(failingListener); + future.onFailure(dummyException); + } } }; futures.add(future); operationThreads.add(thread); } + boolean closeAfterBlocking = randomBoolean(); CountDownLatch blockFinished = new CountDownLatch(1); threadPool.generic().execute(() -> { try { latch.await(); blockAndWait().close(); blockFinished.countDown(); + if (closeAfterBlocking) { + permits.close(); + } } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -119,7 +154,16 @@ public class IndexShardOperationPermitsTests extends ESTestCase { } for (PlainActionFuture future : futures) { - assertNotNull(future.get(1, TimeUnit.MINUTES)); + try { + assertNotNull(future.get(1, TimeUnit.MINUTES)); + } catch (ExecutionException e) { + if (closeAfterBlocking) { + assertThat(e.getCause(), either(instanceOf(DummyException.class)).or(instanceOf(EsRejectedExecutionException.class)) + .or(instanceOf(IndexShardClosedException.class))); + } else { + assertThat(e.getCause(), either(instanceOf(DummyException.class)).or(instanceOf(EsRejectedExecutionException.class))); + } + } } for (Thread thread : operationThreads) {