diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 9adc92e02be..19ee2efeab6 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -117,7 +117,7 @@ public class TransportBulkAction extends HandledTransportAction) BulkRequest::new); + super(BulkAction.NAME, transportService, actionFilters, (Supplier) BulkRequest::new, ThreadPool.Names.WRITE); Objects.requireNonNull(relativeTimeProvider); this.threadPool = threadPool; this.clusterService = clusterService; @@ -258,7 +258,8 @@ public class TransportBulkAction extends HandledTransportAction executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated)); } } diff --git a/server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java b/server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java index c0bc0af8399..ca10583ce24 100644 --- a/server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.action.support; -import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.io.stream.Writeable; @@ -57,6 +56,13 @@ public abstract class HandledTransportAction request, String executor) { + super(actionName, actionFilters, transportService.getTaskManager()); + transportService.registerRequestHandler(actionName, request, executor, false, true, + new TransportHandler()); + } + protected HandledTransportAction(String actionName, boolean canTripCircuitBreaker, TransportService transportService, ActionFilters actionFilters, Writeable.Reader requestReader) { @@ -73,9 +79,8 @@ public abstract class HandledTransportAction { @Override - public final void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception { + public final void messageReceived(final Request request, final TransportChannel channel, Task task) { // We already got the task created on the network layer - no need to create it again on the transport layer - Logger logger = HandledTransportAction.this.logger; execute(task, request, new ChannelActionListener<>(channel, actionName, request)); } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java index 515c539a884..e4b6fff9fc3 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.transport.RemoteTransportException; import java.util.Collections; import java.util.Iterator; @@ -133,9 +134,14 @@ public class BulkProcessorRetryIT extends ESIntegTestCase { } } } else { - Throwable t = (Throwable) response; - // we're not expecting any other errors - throw new AssertionError("Unexpected failure", t); + if (response instanceof RemoteTransportException + && ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS && rejectedExecutionExpected) { + // ignored, we exceeded the write queue size with dispatching the initial bulk request + } else { + Throwable t = (Throwable) response; + // we're not expecting any other errors + throw new AssertionError("Unexpected failure", t); + } } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index 10014c6fb3f..f213b523fbf 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -30,20 +30,24 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.VersionType; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.util.Arrays; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.function.Function; import static java.util.Collections.emptySet; import static java.util.Collections.singleton; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -102,7 +106,10 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa ClusterState state = mock(ClusterState.class); when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA); when(clusterService.state()).thenReturn(state); - TransportBulkAction action = new TransportBulkAction(null, mock(TransportService.class), clusterService, + final ThreadPool threadPool = mock(ThreadPool.class); + final ExecutorService direct = EsExecutors.newDirectExecutorService(); + when(threadPool.executor(anyString())).thenReturn(direct); + TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService, null, null, null, mock(ActionFilters.class), null, null) { @Override void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener listener, diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index b3ecc590767..b570ec8f781 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -45,11 +45,13 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.junit.Before; @@ -61,6 +63,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -68,6 +71,7 @@ import java.util.function.Consumer; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -92,6 +96,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { TransportService transportService; ClusterService clusterService; IngestService ingestService; + ThreadPool threadPool; /** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */ @Captor @@ -126,7 +131,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { boolean indexCreated = true; // If set to false, will be set to true by call to createIndex TestTransportBulkAction() { - super(null, transportService, clusterService, ingestService, + super(threadPool, transportService, clusterService, ingestService, null, null, new ActionFilters(Collections.emptySet()), null, new AutoCreateIndex( SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), @@ -163,6 +168,9 @@ public class TransportBulkActionIngestTests extends ESTestCase { @Before public void setupAction() { // initialize captors, which must be members to use @Capture because of generics + threadPool = mock(ThreadPool.class); + final ExecutorService direct = EsExecutors.newDirectExecutorService(); + when(threadPool.executor(anyString())).thenReturn(direct); MockitoAnnotations.initMocks(this); // setup services that will be called by action transportService = mock(TransportService.class);