Merge pull request #16539 from javanna/enhancement/ingest_tp
Ingest: use bulk thread pool for bulk request processing (was index before)
This commit is contained in:
commit
09c32fb1c9
|
@ -88,7 +88,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
|
|||
|
||||
void processIndexRequest(Task task, String action, ActionListener listener, ActionFilterChain chain, IndexRequest indexRequest) {
|
||||
|
||||
executionService.execute(indexRequest, t -> {
|
||||
executionService.executeIndexRequest(indexRequest, t -> {
|
||||
logger.error("failed to execute pipeline [{}]", t, indexRequest.getPipeline());
|
||||
listener.onFailure(t);
|
||||
}, success -> {
|
||||
|
@ -102,7 +102,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
|
|||
|
||||
void processBulkIndexRequest(Task task, BulkRequest original, String action, ActionFilterChain chain, ActionListener<BulkResponse> listener) {
|
||||
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
|
||||
executionService.execute(() -> bulkRequestModifier, (indexRequest, throwable) -> {
|
||||
executionService.executeBulkRequest(() -> bulkRequestModifier, (indexRequest, throwable) -> {
|
||||
logger.debug("failed to execute pipeline [{}] for document [{}/{}/{}]", indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id(), throwable);
|
||||
bulkRequestModifier.markCurrentItemAsFailed(throwable);
|
||||
}, (throwable) -> {
|
||||
|
|
|
@ -41,7 +41,7 @@ public class PipelineExecutionService {
|
|||
this.threadPool = threadPool;
|
||||
}
|
||||
|
||||
public void execute(IndexRequest request, Consumer<Throwable> failureHandler, Consumer<Boolean> completionHandler) {
|
||||
public void executeIndexRequest(IndexRequest request, Consumer<Throwable> failureHandler, Consumer<Boolean> completionHandler) {
|
||||
Pipeline pipeline = getPipeline(request.getPipeline());
|
||||
threadPool.executor(ThreadPool.Names.INDEX).execute(new AbstractRunnable() {
|
||||
|
||||
|
@ -58,10 +58,10 @@ public class PipelineExecutionService {
|
|||
});
|
||||
}
|
||||
|
||||
public void execute(Iterable<ActionRequest<?>> actionRequests,
|
||||
BiConsumer<IndexRequest, Throwable> itemFailureHandler,
|
||||
Consumer<Throwable> completionHandler) {
|
||||
threadPool.executor(ThreadPool.Names.INDEX).execute(new AbstractRunnable() {
|
||||
public void executeBulkRequest(Iterable<ActionRequest<?>> actionRequests,
|
||||
BiConsumer<IndexRequest, Throwable> itemFailureHandler,
|
||||
Consumer<Throwable> completionHandler) {
|
||||
threadPool.executor(ThreadPool.Names.BULK).execute(new AbstractRunnable() {
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
|
|
|
@ -106,7 +106,7 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
|
||||
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
|
||||
|
||||
verify(executionService).execute(same(indexRequest), any(Consumer.class), any(Consumer.class));
|
||||
verify(executionService).executeIndexRequest(same(indexRequest), any(Consumer.class), any(Consumer.class));
|
||||
verifyZeroInteractions(actionFilterChain);
|
||||
}
|
||||
|
||||
|
@ -124,10 +124,10 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
listener.accept(true);
|
||||
return null;
|
||||
};
|
||||
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), any(Consumer.class), any(Consumer.class));
|
||||
doAnswer(answer).when(executionService).executeIndexRequest(any(IndexRequest.class), any(Consumer.class), any(Consumer.class));
|
||||
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
|
||||
|
||||
verify(executionService).execute(same(indexRequest), any(Consumer.class), any(Consumer.class));
|
||||
verify(executionService).executeIndexRequest(same(indexRequest), any(Consumer.class), any(Consumer.class));
|
||||
verify(actionFilterChain).proceed(task, IndexAction.NAME, indexRequest, actionListener);
|
||||
verifyZeroInteractions(actionListener);
|
||||
}
|
||||
|
@ -146,10 +146,10 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
handler.accept(exception);
|
||||
return null;
|
||||
};
|
||||
doAnswer(answer).when(executionService).execute(same(indexRequest), any(Consumer.class), any(Consumer.class));
|
||||
doAnswer(answer).when(executionService).executeIndexRequest(same(indexRequest), any(Consumer.class), any(Consumer.class));
|
||||
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
|
||||
|
||||
verify(executionService).execute(same(indexRequest), any(Consumer.class), any(Consumer.class));
|
||||
verify(executionService).executeIndexRequest(same(indexRequest), any(Consumer.class), any(Consumer.class));
|
||||
verify(actionListener).onFailure(exception);
|
||||
verifyZeroInteractions(actionFilterChain);
|
||||
}
|
||||
|
@ -233,7 +233,7 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
listener.accept(true);
|
||||
return null;
|
||||
};
|
||||
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), any(Consumer.class), any(Consumer.class));
|
||||
doAnswer(answer).when(executionService).executeIndexRequest(any(IndexRequest.class), any(Consumer.class), any(Consumer.class));
|
||||
|
||||
Task task = mock(Task.class);
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
|
@ -243,7 +243,7 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
|
||||
assertThat(indexRequest.getPipeline(), nullValue());
|
||||
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
|
||||
verify(executionService, times(1)).execute(same(indexRequest), any(Consumer.class), any(Consumer.class));
|
||||
verify(executionService, times(1)).executeIndexRequest(same(indexRequest), any(Consumer.class), any(Consumer.class));
|
||||
verify(actionFilterChain, times(2)).proceed(task, IndexAction.NAME, indexRequest, actionListener);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
|||
@SuppressWarnings("unchecked")
|
||||
Consumer<Boolean> completionHandler = mock(Consumer.class);
|
||||
try {
|
||||
executionService.execute(indexRequest, failureHandler, completionHandler);
|
||||
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
|
||||
fail("IllegalArgumentException expected");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("pipeline with id [_id] does not exist"));
|
||||
|
@ -99,7 +99,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
|||
BiConsumer<IndexRequest, Throwable> failureHandler = mock(BiConsumer.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
Consumer<Throwable> completionHandler = mock(Consumer.class);
|
||||
executionService.execute(bulkRequest.requests(), failureHandler, completionHandler);
|
||||
executionService.executeBulkRequest(bulkRequest.requests(), failureHandler, completionHandler);
|
||||
verify(failureHandler, times(1)).accept(
|
||||
argThat(new CustomTypeSafeMatcher<IndexRequest>("failure handler was not called with the expected arguments") {
|
||||
@Override
|
||||
|
@ -127,7 +127,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
|||
Consumer<Throwable> failureHandler = mock(Consumer.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
Consumer<Boolean> completionHandler = mock(Consumer.class);
|
||||
executionService.execute(indexRequest, failureHandler, completionHandler);
|
||||
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
|
||||
verify(failureHandler, never()).accept(any());
|
||||
verify(completionHandler, times(1)).accept(true);
|
||||
}
|
||||
|
@ -153,7 +153,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
|||
Consumer<Throwable> failureHandler = mock(Consumer.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
Consumer<Boolean> completionHandler = mock(Consumer.class);
|
||||
executionService.execute(indexRequest, failureHandler, completionHandler);
|
||||
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
|
||||
verify(processor).execute(any());
|
||||
verify(failureHandler, never()).accept(any());
|
||||
verify(completionHandler, times(1)).accept(true);
|
||||
|
@ -176,7 +176,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
|||
Consumer<Throwable> failureHandler = mock(Consumer.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
Consumer<Boolean> completionHandler = mock(Consumer.class);
|
||||
executionService.execute(indexRequest, failureHandler, completionHandler);
|
||||
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
|
||||
verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
|
||||
verify(failureHandler, times(1)).accept(any(RuntimeException.class));
|
||||
verify(completionHandler, never()).accept(anyBoolean());
|
||||
|
@ -193,7 +193,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
|||
Consumer<Throwable> failureHandler = mock(Consumer.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
Consumer<Boolean> completionHandler = mock(Consumer.class);
|
||||
executionService.execute(indexRequest, failureHandler, completionHandler);
|
||||
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
|
||||
verify(failureHandler, never()).accept(any(RuntimeException.class));
|
||||
verify(completionHandler, times(1)).accept(true);
|
||||
}
|
||||
|
@ -210,7 +210,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
|||
Consumer<Throwable> failureHandler = mock(Consumer.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
Consumer<Boolean> completionHandler = mock(Consumer.class);
|
||||
executionService.execute(indexRequest, failureHandler, completionHandler);
|
||||
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
|
||||
verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
|
||||
verify(failureHandler, times(1)).accept(any(RuntimeException.class));
|
||||
verify(completionHandler, never()).accept(anyBoolean());
|
||||
|
@ -231,7 +231,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
|||
Consumer<Throwable> failureHandler = mock(Consumer.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
Consumer<Boolean> completionHandler = mock(Consumer.class);
|
||||
executionService.execute(indexRequest, failureHandler, completionHandler);
|
||||
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
|
||||
verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
|
||||
verify(failureHandler, times(1)).accept(any(RuntimeException.class));
|
||||
verify(completionHandler, never()).accept(anyBoolean());
|
||||
|
@ -246,7 +246,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
|||
Consumer<Throwable> failureHandler = mock(Consumer.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
Consumer<Boolean> completionHandler = mock(Consumer.class);
|
||||
executionService.execute(indexRequest, failureHandler, completionHandler);
|
||||
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
|
||||
|
||||
assertThat(indexRequest.ttl(), equalTo(TimeValue.parseTimeValue("5d", null, "ttl")));
|
||||
verify(failureHandler, never()).accept(any());
|
||||
|
@ -262,7 +262,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
|||
Consumer<Throwable> failureHandler = mock(Consumer.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
Consumer<Boolean> completionHandler = mock(Consumer.class);
|
||||
executionService.execute(indexRequest, failureHandler, completionHandler);
|
||||
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
|
||||
verify(failureHandler, times(1)).accept(any(ElasticsearchParseException.class));
|
||||
verify(completionHandler, never()).accept(anyBoolean());
|
||||
}
|
||||
|
@ -275,7 +275,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
|||
.ttl(1000L);
|
||||
Consumer<Throwable> failureHandler = mock(Consumer.class);
|
||||
Consumer<Boolean> completionHandler = mock(Consumer.class);
|
||||
executionService.execute(indexRequest, failureHandler, completionHandler);
|
||||
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
|
||||
|
||||
assertThat(indexRequest.ttl(), equalTo(new TimeValue(1000L)));
|
||||
verify(failureHandler, never()).accept(any());
|
||||
|
@ -312,7 +312,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
|||
|
||||
BiConsumer<IndexRequest, Throwable> requestItemErrorHandler = mock(BiConsumer.class);
|
||||
Consumer<Throwable> completionHandler = mock(Consumer.class);
|
||||
executionService.execute(bulkRequest.requests(), requestItemErrorHandler, completionHandler);
|
||||
executionService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler);
|
||||
|
||||
verify(requestItemErrorHandler, times(numIndexRequests)).accept(any(IndexRequest.class), eq(error));
|
||||
verify(completionHandler, times(1)).accept(null);
|
||||
|
@ -335,7 +335,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
|||
BiConsumer<IndexRequest, Throwable> requestItemErrorHandler = mock(BiConsumer.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
Consumer<Throwable> completionHandler = mock(Consumer.class);
|
||||
executionService.execute(bulkRequest.requests(), requestItemErrorHandler, completionHandler);
|
||||
executionService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler);
|
||||
|
||||
verify(requestItemErrorHandler, never()).accept(any(), any());
|
||||
verify(completionHandler, times(1)).accept(null);
|
||||
|
|
Loading…
Reference in New Issue