From f96dccd1ecce0f6e1b882a1e336d34c6168dbd0a Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 10 Nov 2020 12:16:40 -0700 Subject: [PATCH] Propogate rejected execution during bulk actions (#64886) Currently a rejected execution exception can be swallowed when async actions return during transport bulk actions. This includes scenarios where we went async to perform ingest pipelines or index creation. This commit resolves the issue by propagating a rejected exception. --- .../action/bulk/TransportBulkAction.java | 36 ++++++---- .../action/bulk/TransportBulkActionTests.java | 25 ++++++- .../threadpool/TestThreadPool.java | 70 +++++++++++++++++++ 3 files changed, 116 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index c99ffacbf97..555528fdd62 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -58,7 +58,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; @@ -271,8 +270,13 @@ public class TransportBulkAction extends HandledTransportAction executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated)); + threadPool.executor(executorName).execute(new ActionRunnable(listener) { + + @Override + protected void doRun() { + executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated); + } + }); } } @@ -288,11 +292,22 @@ public class TransportBulkAction extends HandledTransportAction executeBulk(task, bulkRequest, startTime, - ActionListener.wrap(listener::onResponse, inner -> { + final ActionListener wrappedListener = ActionListener.wrap(listener::onResponse, inner -> { inner.addSuppressed(e); listener.onFailure(inner); - }), responses, indicesThatCannotBeCreated)); + }); + threadPool.executor(executorName).execute(new ActionRunnable(wrappedListener) { + @Override + protected void doRun() { + executeBulk(task, bulkRequest, startTime, wrappedListener, responses, indicesThatCannotBeCreated); + } + + @Override + public void onRejection(Exception rejectedException) { + rejectedException.addSuppressed(e); + super.onRejection(rejectedException); + } + }); } } }); @@ -731,14 +746,9 @@ public class TransportBulkAction extends HandledTransportAction(listener) { @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - - @Override - protected void doRun() throws Exception { + protected void doRun() { doInternalExecute(task, bulkRequest, executorName, actionListener); } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 3e77001892d..d4d8938f3a8 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.bulk; +import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; @@ -42,6 +43,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.VersionType; @@ -77,12 +79,13 @@ public class TransportBulkActionTests extends ESTestCase { /** Services needed by bulk action */ private TransportService transportService; private ClusterService clusterService; - private ThreadPool threadPool; + private TestThreadPool threadPool; private TestTransportBulkAction bulkAction; class TestTransportBulkAction extends TransportBulkAction { + volatile boolean failIndexCreation = false; boolean indexCreated = false; // set when the "real" index is created TestTransportBulkAction() { @@ -100,7 +103,11 @@ public class TransportBulkActionTests extends ESTestCase { @Override void createIndex(String index, TimeValue timeout, Version minNodeVersion, ActionListener listener) { indexCreated = true; - listener.onResponse(null); + if (failIndexCreation) { + listener.onFailure(new ResourceAlreadyExistsException("index already exists")); + } else { + listener.onResponse(null); + } } } @@ -293,6 +300,20 @@ public class TransportBulkActionTests extends ESTestCase { assertTrue(bulkAction.includesSystem(buildBulkRequest(mixed), indicesLookup, systemIndices)); } + public void testRejectionAfterCreateIndexIsPropagated() throws Exception { + BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap())); + bulkAction.failIndexCreation = randomBoolean(); + + try { + threadPool.startForcingRejections(); + PlainActionFuture future = PlainActionFuture.newFuture(); + ActionTestUtils.execute(bulkAction, null, bulkRequest, future); + expectThrows(EsRejectedExecutionException.class, future::actionGet); + } finally { + threadPool.stopForcingRejections(); + } + } + private BulkRequest buildBulkRequest(List indices) { BulkRequest request = new BulkRequest(); for (String index : indices) { diff --git a/test/framework/src/main/java/org/elasticsearch/threadpool/TestThreadPool.java b/test/framework/src/main/java/org/elasticsearch/threadpool/TestThreadPool.java index 03fcc890596..ee1b757508b 100644 --- a/test/framework/src/main/java/org/elasticsearch/threadpool/TestThreadPool.java +++ b/test/framework/src/main/java/org/elasticsearch/threadpool/TestThreadPool.java @@ -20,10 +20,20 @@ package org.elasticsearch.threadpool; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.node.Node; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; + public class TestThreadPool extends ThreadPool { + private final CountDownLatch blockingLatch = new CountDownLatch(1); + private volatile boolean returnRejectingExecutor = false; + private volatile ThreadPoolExecutor rejectingExecutor; + public TestThreadPool(String name, ExecutorBuilder... customBuilders) { this(name, Settings.EMPTY, customBuilders); } @@ -32,4 +42,64 @@ public class TestThreadPool extends ThreadPool { super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).put(settings).build(), customBuilders); } + @Override + public ExecutorService executor(String name) { + if (returnRejectingExecutor) { + return rejectingExecutor; + } else { + return super.executor(name); + } + } + + public void startForcingRejections() { + if (rejectingExecutor == null) { + createRejectingExecutor(); + } + returnRejectingExecutor = true; + } + + public void stopForcingRejections() { + returnRejectingExecutor = false; + } + + @Override + public void shutdown() { + blockingLatch.countDown(); + if (rejectingExecutor != null) { + rejectingExecutor.shutdown(); + } + super.shutdown(); + } + + @Override + public void shutdownNow() { + blockingLatch.countDown(); + if (rejectingExecutor != null) { + rejectingExecutor.shutdownNow(); + } + super.shutdownNow(); + } + + private synchronized void createRejectingExecutor() { + if (rejectingExecutor != null) { + return; + } + ThreadFactory factory = EsExecutors.daemonThreadFactory("reject_thread"); + rejectingExecutor = EsExecutors.newFixed("rejecting", 1, 0, factory, getThreadContext()); + + CountDownLatch startedLatch = new CountDownLatch(1); + rejectingExecutor.execute(() -> { + try { + startedLatch.countDown(); + blockingLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + try { + startedLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } }