* Bulk requests can be thousands of items large and take more than O(10ms) time to handle => we should not handle them on the transport threadpool to not block select loops * relates #39128 * relates #39658
This commit is contained in:
parent
2523ad03ca
commit
c77e10b16b
|
@ -117,7 +117,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||||
TransportShardBulkAction shardBulkAction, NodeClient client,
|
TransportShardBulkAction shardBulkAction, NodeClient client,
|
||||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||||
AutoCreateIndex autoCreateIndex, LongSupplier relativeTimeProvider) {
|
AutoCreateIndex autoCreateIndex, LongSupplier relativeTimeProvider) {
|
||||||
super(BulkAction.NAME, transportService, actionFilters, (Supplier<BulkRequest>) BulkRequest::new);
|
super(BulkAction.NAME, transportService, actionFilters, (Supplier<BulkRequest>) BulkRequest::new, ThreadPool.Names.WRITE);
|
||||||
Objects.requireNonNull(relativeTimeProvider);
|
Objects.requireNonNull(relativeTimeProvider);
|
||||||
this.threadPool = threadPool;
|
this.threadPool = threadPool;
|
||||||
this.clusterService = clusterService;
|
this.clusterService = clusterService;
|
||||||
|
@ -258,7 +258,8 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(CreateIndexResponse result) {
|
public void onResponse(CreateIndexResponse result) {
|
||||||
if (counter.decrementAndGet() == 0) {
|
if (counter.decrementAndGet() == 0) {
|
||||||
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
|
threadPool.executor(ThreadPool.Names.WRITE).execute(
|
||||||
|
() -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.action.support;
|
package org.elasticsearch.action.support;
|
||||||
|
|
||||||
import org.apache.logging.log4j.Logger;
|
|
||||||
import org.elasticsearch.action.ActionRequest;
|
import org.elasticsearch.action.ActionRequest;
|
||||||
import org.elasticsearch.action.ActionResponse;
|
import org.elasticsearch.action.ActionResponse;
|
||||||
import org.elasticsearch.common.io.stream.Writeable;
|
import org.elasticsearch.common.io.stream.Writeable;
|
||||||
|
@ -57,6 +56,13 @@ public abstract class HandledTransportAction<Request extends ActionRequest, Resp
|
||||||
new TransportHandler());
|
new TransportHandler());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected HandledTransportAction(String actionName, TransportService transportService, ActionFilters actionFilters,
|
||||||
|
Supplier<Request> request, String executor) {
|
||||||
|
super(actionName, actionFilters, transportService.getTaskManager());
|
||||||
|
transportService.registerRequestHandler(actionName, request, executor, false, true,
|
||||||
|
new TransportHandler());
|
||||||
|
}
|
||||||
|
|
||||||
protected HandledTransportAction(String actionName, boolean canTripCircuitBreaker,
|
protected HandledTransportAction(String actionName, boolean canTripCircuitBreaker,
|
||||||
TransportService transportService, ActionFilters actionFilters,
|
TransportService transportService, ActionFilters actionFilters,
|
||||||
Writeable.Reader<Request> requestReader) {
|
Writeable.Reader<Request> requestReader) {
|
||||||
|
@ -73,9 +79,8 @@ public abstract class HandledTransportAction<Request extends ActionRequest, Resp
|
||||||
|
|
||||||
class TransportHandler implements TransportRequestHandler<Request> {
|
class TransportHandler implements TransportRequestHandler<Request> {
|
||||||
@Override
|
@Override
|
||||||
public final void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
|
public final void messageReceived(final Request request, final TransportChannel channel, Task task) {
|
||||||
// We already got the task created on the network layer - no need to create it again on the transport layer
|
// We already got the task created on the network layer - no need to create it again on the transport layer
|
||||||
Logger logger = HandledTransportAction.this.logger;
|
|
||||||
execute(task, request, new ChannelActionListener<>(channel, actionName, request));
|
execute(task, request, new ChannelActionListener<>(channel, actionName, request));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.index.query.QueryBuilders;
|
import org.elasticsearch.index.query.QueryBuilders;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
|
import org.elasticsearch.transport.RemoteTransportException;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -133,9 +134,14 @@ public class BulkProcessorRetryIT extends ESIntegTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Throwable t = (Throwable) response;
|
if (response instanceof RemoteTransportException
|
||||||
// we're not expecting any other errors
|
&& ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS && rejectedExecutionExpected) {
|
||||||
throw new AssertionError("Unexpected failure", t);
|
// ignored, we exceeded the write queue size with dispatching the initial bulk request
|
||||||
|
} else {
|
||||||
|
Throwable t = (Throwable) response;
|
||||||
|
// we're not expecting any other errors
|
||||||
|
throw new AssertionError("Unexpected failure", t);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,20 +30,24 @@ import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||||
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||||
import org.elasticsearch.index.IndexNotFoundException;
|
import org.elasticsearch.index.IndexNotFoundException;
|
||||||
import org.elasticsearch.index.VersionType;
|
import org.elasticsearch.index.VersionType;
|
||||||
import org.elasticsearch.tasks.Task;
|
import org.elasticsearch.tasks.Task;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import static java.util.Collections.emptySet;
|
import static java.util.Collections.emptySet;
|
||||||
import static java.util.Collections.singleton;
|
import static java.util.Collections.singleton;
|
||||||
|
import static org.mockito.Matchers.anyString;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ -102,7 +106,10 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa
|
||||||
ClusterState state = mock(ClusterState.class);
|
ClusterState state = mock(ClusterState.class);
|
||||||
when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
|
when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
|
||||||
when(clusterService.state()).thenReturn(state);
|
when(clusterService.state()).thenReturn(state);
|
||||||
TransportBulkAction action = new TransportBulkAction(null, mock(TransportService.class), clusterService,
|
final ThreadPool threadPool = mock(ThreadPool.class);
|
||||||
|
final ExecutorService direct = EsExecutors.newDirectExecutorService();
|
||||||
|
when(threadPool.executor(anyString())).thenReturn(direct);
|
||||||
|
TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService,
|
||||||
null, null, null, mock(ActionFilters.class), null, null) {
|
null, null, null, mock(ActionFilters.class), null, null) {
|
||||||
@Override
|
@Override
|
||||||
void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener<BulkResponse> listener,
|
void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener<BulkResponse> listener,
|
||||||
|
|
|
@ -45,11 +45,13 @@ import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||||
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||||
import org.elasticsearch.index.IndexNotFoundException;
|
import org.elasticsearch.index.IndexNotFoundException;
|
||||||
import org.elasticsearch.index.IndexSettings;
|
import org.elasticsearch.index.IndexSettings;
|
||||||
import org.elasticsearch.ingest.IngestService;
|
import org.elasticsearch.ingest.IngestService;
|
||||||
import org.elasticsearch.tasks.Task;
|
import org.elasticsearch.tasks.Task;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportResponseHandler;
|
import org.elasticsearch.transport.TransportResponseHandler;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -61,6 +63,7 @@ import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
@ -68,6 +71,7 @@ import java.util.function.Consumer;
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
import static org.hamcrest.Matchers.sameInstance;
|
import static org.hamcrest.Matchers.sameInstance;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.anyString;
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
@ -92,6 +96,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||||
TransportService transportService;
|
TransportService transportService;
|
||||||
ClusterService clusterService;
|
ClusterService clusterService;
|
||||||
IngestService ingestService;
|
IngestService ingestService;
|
||||||
|
ThreadPool threadPool;
|
||||||
|
|
||||||
/** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */
|
/** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */
|
||||||
@Captor
|
@Captor
|
||||||
|
@ -126,7 +131,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||||
boolean indexCreated = true; // If set to false, will be set to true by call to createIndex
|
boolean indexCreated = true; // If set to false, will be set to true by call to createIndex
|
||||||
|
|
||||||
TestTransportBulkAction() {
|
TestTransportBulkAction() {
|
||||||
super(null, transportService, clusterService, ingestService,
|
super(threadPool, transportService, clusterService, ingestService,
|
||||||
null, null, new ActionFilters(Collections.emptySet()), null,
|
null, null, new ActionFilters(Collections.emptySet()), null,
|
||||||
new AutoCreateIndex(
|
new AutoCreateIndex(
|
||||||
SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
|
SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
|
||||||
|
@ -163,6 +168,9 @@ public class TransportBulkActionIngestTests extends ESTestCase {
|
||||||
@Before
|
@Before
|
||||||
public void setupAction() {
|
public void setupAction() {
|
||||||
// initialize captors, which must be members to use @Capture because of generics
|
// initialize captors, which must be members to use @Capture because of generics
|
||||||
|
threadPool = mock(ThreadPool.class);
|
||||||
|
final ExecutorService direct = EsExecutors.newDirectExecutorService();
|
||||||
|
when(threadPool.executor(anyString())).thenReturn(direct);
|
||||||
MockitoAnnotations.initMocks(this);
|
MockitoAnnotations.initMocks(this);
|
||||||
// setup services that will be called by action
|
// setup services that will be called by action
|
||||||
transportService = mock(TransportService.class);
|
transportService = mock(TransportService.class);
|
||||||
|
|
Loading…
Reference in New Issue