From b4baa6c7ab5cc993c5f78ebb24e77c84b3d31055 Mon Sep 17 00:00:00 2001 From: javanna Date: Mon, 11 Jan 2016 17:30:58 +0100 Subject: [PATCH] remove use of already processed header in favour of resetting the pipeline id to null --- .../action/ingest/IngestActionFilter.java | 15 ++++-------- .../ingest/PipelineExecutionService.java | 2 ++ .../ingest/IngestActionFilterTests.java | 24 +++++++++++++++++++ 3 files changed, 31 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java b/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java index 81ae2ccc9c4..c9467b07a05 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java @@ -46,8 +46,6 @@ import java.util.Set; public final class IngestActionFilter extends AbstractComponent implements ActionFilter { - static final String PIPELINE_ALREADY_PROCESSED = "ingest_already_processed"; - private final PipelineExecutionService executionService; @Inject @@ -96,18 +94,15 @@ public final class IngestActionFilter extends AbstractComponent implements Actio } void processIndexRequest(Task task, String action, ActionListener listener, ActionFilterChain chain, IndexRequest indexRequest) { - // The IndexRequest has the same type on the node that receives the request and the node that - // processes the primary action. This could lead to a pipeline being executed twice for the same - // index request, hence this check - if (indexRequest.hasHeader(PIPELINE_ALREADY_PROCESSED)) { - chain.proceed(task, action, indexRequest, listener); - return; - } + executionService.execute(indexRequest, t -> { logger.error("failed to execute pipeline [{}]", t, indexRequest.pipeline()); listener.onFailure(t); }, success -> { - indexRequest.putHeader(PIPELINE_ALREADY_PROCESSED, true); + // TransportIndexAction uses IndexRequest and same action name on the node that receives the request and the node that + // processes the primary action. This could lead to a pipeline being executed twice for the same + // index request, hence we set the pipeline to null once its execution completed. + indexRequest.pipeline(null); chain.proceed(task, action, indexRequest, listener); }); } diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java b/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java index 29435769e6b..5553374880e 100644 --- a/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java +++ b/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java @@ -61,6 +61,8 @@ public class PipelineExecutionService { if (Strings.hasText(indexRequest.pipeline())) { try { innerExecute(indexRequest, getPipeline(indexRequest.pipeline())); + //this shouldn't be needed here but we do it for consistency with index api which requires it to prevent double execution + indexRequest.pipeline(null); } catch (Throwable e) { itemFailureHandler.accept(new Tuple<>(indexRequest, e)); } diff --git a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java index 344770b2bba..91c6765520c 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java @@ -45,10 +45,12 @@ import org.mockito.stubbing.Answer; import java.util.function.Consumer; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.same; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; @@ -212,4 +214,26 @@ public class IngestActionFilterTests extends ESTestCase { assertThat(assertedRequests, equalTo(numRequest)); }); } + + @SuppressWarnings("unchecked") + public void testIndexApiSinglePipelineExecution() { + Answer answer = invocationOnMock -> { + @SuppressWarnings("unchecked") + Consumer listener = (Consumer) invocationOnMock.getArguments()[2]; + listener.accept(true); + return null; + }; + doAnswer(answer).when(executionService).execute(any(IndexRequest.class), any(Consumer.class), any(Consumer.class)); + + Task task = mock(Task.class); + ActionListener actionListener = mock(ActionListener.class); + ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); + + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").pipeline("_id").source("field", "value"); + filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain); + assertThat(indexRequest.pipeline(), nullValue()); + filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain); + verify(executionService, times(1)).execute(same(indexRequest), any(Consumer.class), any(Consumer.class)); + verify(actionFilterChain, times(2)).proceed(task, IndexAction.NAME, indexRequest, actionListener); + } }