From 9ed32f715fc327ad568ed092e5533e050ae229bf Mon Sep 17 00:00:00 2001 From: javanna Date: Tue, 9 Feb 2016 16:59:43 +0100 Subject: [PATCH] Ingest: use bulk thread pool for bulk request processing (was index before) Closes #16503 --- .../action/ingest/IngestActionFilter.java | 4 +-- .../ingest/PipelineExecutionService.java | 10 +++---- .../ingest/IngestActionFilterTests.java | 14 +++++----- .../ingest/PipelineExecutionServiceTests.java | 26 +++++++++---------- 4 files changed, 27 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java b/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java index 3ec0af0e845..0366e10d750 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java @@ -88,7 +88,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio void processIndexRequest(Task task, String action, ActionListener listener, ActionFilterChain chain, IndexRequest indexRequest) { - executionService.execute(indexRequest, t -> { + executionService.executeIndexRequest(indexRequest, t -> { logger.error("failed to execute pipeline [{}]", t, indexRequest.getPipeline()); listener.onFailure(t); }, success -> { @@ -102,7 +102,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio void processBulkIndexRequest(Task task, BulkRequest original, String action, ActionFilterChain chain, ActionListener listener) { BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original); - executionService.execute(() -> bulkRequestModifier, (indexRequest, throwable) -> { + executionService.executeBulkRequest(() -> bulkRequestModifier, (indexRequest, throwable) -> { logger.debug("failed to execute pipeline [{}] for document [{}/{}/{}]", indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id(), throwable); bulkRequestModifier.markCurrentItemAsFailed(throwable); }, (throwable) -> { diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java b/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java index c6a3b4b843d..3f0de550782 100644 --- a/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java +++ b/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java @@ -41,7 +41,7 @@ public class PipelineExecutionService { this.threadPool = threadPool; } - public void execute(IndexRequest request, Consumer failureHandler, Consumer completionHandler) { + public void executeIndexRequest(IndexRequest request, Consumer failureHandler, Consumer completionHandler) { Pipeline pipeline = getPipeline(request.getPipeline()); threadPool.executor(ThreadPool.Names.INDEX).execute(new AbstractRunnable() { @@ -58,10 +58,10 @@ public class PipelineExecutionService { }); } - public void execute(Iterable> actionRequests, - BiConsumer itemFailureHandler, - Consumer completionHandler) { - threadPool.executor(ThreadPool.Names.INDEX).execute(new AbstractRunnable() { + public void executeBulkRequest(Iterable> actionRequests, + BiConsumer itemFailureHandler, + Consumer completionHandler) { + threadPool.executor(ThreadPool.Names.BULK).execute(new AbstractRunnable() { @Override public void onFailure(Throwable t) { diff --git a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java index e1ffe94e63d..5d7dd1c0ea6 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java @@ -106,7 +106,7 @@ public class IngestActionFilterTests extends ESTestCase { filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain); - verify(executionService).execute(same(indexRequest), any(Consumer.class), any(Consumer.class)); + verify(executionService).executeIndexRequest(same(indexRequest), any(Consumer.class), any(Consumer.class)); verifyZeroInteractions(actionFilterChain); } @@ -124,10 +124,10 @@ public class IngestActionFilterTests extends ESTestCase { listener.accept(true); return null; }; - doAnswer(answer).when(executionService).execute(any(IndexRequest.class), any(Consumer.class), any(Consumer.class)); + doAnswer(answer).when(executionService).executeIndexRequest(any(IndexRequest.class), any(Consumer.class), any(Consumer.class)); filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain); - verify(executionService).execute(same(indexRequest), any(Consumer.class), any(Consumer.class)); + verify(executionService).executeIndexRequest(same(indexRequest), any(Consumer.class), any(Consumer.class)); verify(actionFilterChain).proceed(task, IndexAction.NAME, indexRequest, actionListener); verifyZeroInteractions(actionListener); } @@ -146,10 +146,10 @@ public class IngestActionFilterTests extends ESTestCase { handler.accept(exception); return null; }; - doAnswer(answer).when(executionService).execute(same(indexRequest), any(Consumer.class), any(Consumer.class)); + doAnswer(answer).when(executionService).executeIndexRequest(same(indexRequest), any(Consumer.class), any(Consumer.class)); filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain); - verify(executionService).execute(same(indexRequest), any(Consumer.class), any(Consumer.class)); + verify(executionService).executeIndexRequest(same(indexRequest), any(Consumer.class), any(Consumer.class)); verify(actionListener).onFailure(exception); verifyZeroInteractions(actionFilterChain); } @@ -233,7 +233,7 @@ public class IngestActionFilterTests extends ESTestCase { listener.accept(true); return null; }; - doAnswer(answer).when(executionService).execute(any(IndexRequest.class), any(Consumer.class), any(Consumer.class)); + doAnswer(answer).when(executionService).executeIndexRequest(any(IndexRequest.class), any(Consumer.class), any(Consumer.class)); Task task = mock(Task.class); ActionListener actionListener = mock(ActionListener.class); @@ -243,7 +243,7 @@ public class IngestActionFilterTests extends ESTestCase { filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain); assertThat(indexRequest.getPipeline(), nullValue()); filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain); - verify(executionService, times(1)).execute(same(indexRequest), any(Consumer.class), any(Consumer.class)); + verify(executionService, times(1)).executeIndexRequest(same(indexRequest), any(Consumer.class), any(Consumer.class)); verify(actionFilterChain, times(2)).proceed(task, IndexAction.NAME, indexRequest, actionListener); } } diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java index 9126a513e6e..e644df2a83a 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java @@ -77,7 +77,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); try { - executionService.execute(indexRequest, failureHandler, completionHandler); + executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); fail("IllegalArgumentException expected"); } catch (IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("pipeline with id [_id] does not exist")); @@ -99,7 +99,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); - executionService.execute(bulkRequest.requests(), failureHandler, completionHandler); + executionService.executeBulkRequest(bulkRequest.requests(), failureHandler, completionHandler); verify(failureHandler, times(1)).accept( argThat(new CustomTypeSafeMatcher("failure handler was not called with the expected arguments") { @Override @@ -127,7 +127,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { Consumer failureHandler = mock(Consumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); - executionService.execute(indexRequest, failureHandler, completionHandler); + executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); verify(failureHandler, never()).accept(any()); verify(completionHandler, times(1)).accept(true); } @@ -153,7 +153,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { Consumer failureHandler = mock(Consumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); - executionService.execute(indexRequest, failureHandler, completionHandler); + executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); verify(processor).execute(any()); verify(failureHandler, never()).accept(any()); verify(completionHandler, times(1)).accept(true); @@ -176,7 +176,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { Consumer failureHandler = mock(Consumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); - executionService.execute(indexRequest, failureHandler, completionHandler); + executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); verify(failureHandler, times(1)).accept(any(RuntimeException.class)); verify(completionHandler, never()).accept(anyBoolean()); @@ -193,7 +193,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { Consumer failureHandler = mock(Consumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); - executionService.execute(indexRequest, failureHandler, completionHandler); + executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); verify(failureHandler, never()).accept(any(RuntimeException.class)); verify(completionHandler, times(1)).accept(true); } @@ -210,7 +210,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { Consumer failureHandler = mock(Consumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); - executionService.execute(indexRequest, failureHandler, completionHandler); + executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); verify(failureHandler, times(1)).accept(any(RuntimeException.class)); verify(completionHandler, never()).accept(anyBoolean()); @@ -231,7 +231,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { Consumer failureHandler = mock(Consumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); - executionService.execute(indexRequest, failureHandler, completionHandler); + executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); verify(failureHandler, times(1)).accept(any(RuntimeException.class)); verify(completionHandler, never()).accept(anyBoolean()); @@ -246,7 +246,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { Consumer failureHandler = mock(Consumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); - executionService.execute(indexRequest, failureHandler, completionHandler); + executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); assertThat(indexRequest.ttl(), equalTo(TimeValue.parseTimeValue("5d", null, "ttl"))); verify(failureHandler, never()).accept(any()); @@ -262,7 +262,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { Consumer failureHandler = mock(Consumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); - executionService.execute(indexRequest, failureHandler, completionHandler); + executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); verify(failureHandler, times(1)).accept(any(ElasticsearchParseException.class)); verify(completionHandler, never()).accept(anyBoolean()); } @@ -275,7 +275,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { .ttl(1000L); Consumer failureHandler = mock(Consumer.class); Consumer completionHandler = mock(Consumer.class); - executionService.execute(indexRequest, failureHandler, completionHandler); + executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); assertThat(indexRequest.ttl(), equalTo(new TimeValue(1000L))); verify(failureHandler, never()).accept(any()); @@ -312,7 +312,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { BiConsumer requestItemErrorHandler = mock(BiConsumer.class); Consumer completionHandler = mock(Consumer.class); - executionService.execute(bulkRequest.requests(), requestItemErrorHandler, completionHandler); + executionService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler); verify(requestItemErrorHandler, times(numIndexRequests)).accept(any(IndexRequest.class), eq(error)); verify(completionHandler, times(1)).accept(null); @@ -335,7 +335,7 @@ public class PipelineExecutionServiceTests extends ESTestCase { BiConsumer requestItemErrorHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); - executionService.execute(bulkRequest.requests(), requestItemErrorHandler, completionHandler); + executionService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler); verify(requestItemErrorHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(null);