From b3ad3f35fa70cb6adb6f7a7d0b447c96224b443c Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 8 Oct 2015 13:27:31 +0200 Subject: [PATCH] prevent IndexRequest from being processed multipel times --- .../plugin/ingest/IngestPlugin.java | 3 ++- .../plugin/ingest/rest/IngestRestFilter.java | 4 ++-- .../ingest/transport/IngestActionFilter.java | 11 ++++++++++- .../transport/IngestActionFilterTests.java | 16 +++++++++++++++- 4 files changed, 29 insertions(+), 5 deletions(-) diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java index 366eea29ae6..54fa0ead010 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java @@ -36,8 +36,9 @@ import static org.elasticsearch.common.settings.Settings.settingsBuilder; public class IngestPlugin extends Plugin { - public static final String INGEST_CONTEXT_KEY = "__ingest__"; + public static final String INGEST_PAREM_CONTEXT_KEY = "__ingest__"; public static final String INGEST_PARAM = "ingest"; + public static final String INGEST_ALREADY_PROCESSED = "ingest_already_processed"; public static final String NAME = "ingest"; private final Settings nodeSettings; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/IngestRestFilter.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/IngestRestFilter.java index c6afd2e4ebd..c82e8f1800e 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/IngestRestFilter.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/IngestRestFilter.java @@ -23,7 +23,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.rest.*; import static org.elasticsearch.plugin.ingest.IngestPlugin.*; -import static org.elasticsearch.plugin.ingest.IngestPlugin.INGEST_CONTEXT_KEY; +import static org.elasticsearch.plugin.ingest.IngestPlugin.INGEST_PAREM_CONTEXT_KEY; public class IngestRestFilter extends RestFilter { @@ -34,7 +34,7 @@ public class IngestRestFilter extends RestFilter { @Override public void process(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception { - request.putInContext(INGEST_CONTEXT_KEY, request.param(INGEST_PARAM)); + request.putInContext(INGEST_PAREM_CONTEXT_KEY, request.param(INGEST_PARAM)); filterChain.continueProcessing(request, channel); } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java index 228243d3240..1edd481bbe8 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java @@ -48,7 +48,7 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte @Override public void apply(String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) { - String pipelineId = request.getFromContext(IngestPlugin.INGEST_CONTEXT_KEY); + String pipelineId = request.getFromContext(IngestPlugin.INGEST_PAREM_CONTEXT_KEY); if (pipelineId == null) { pipelineId = request.getHeader(IngestPlugin.INGEST_PARAM); if (pipelineId == null) { @@ -73,6 +73,14 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte } void processIndexRequest(String action, ActionListener listener, ActionFilterChain chain, IndexRequest indexRequest, String pipelineId) { + // The IndexRequest has the same type on the node that receives the request and the node that + // processes the primary action. This could lead to a pipeline being executed twice for the same + // index request, hence this check + if (indexRequest.hasHeader(IngestPlugin.INGEST_ALREADY_PROCESSED)) { + chain.proceed(action, indexRequest, listener); + return; + } + Map sourceAsMap = indexRequest.sourceAsMap(); Data data = new Data(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap); executionService.execute(data, pipelineId, new PipelineExecutionService.Listener() { @@ -81,6 +89,7 @@ public class IngestActionFilter extends AbstractComponent implements ActionFilte if (data.isModified()) { indexRequest.source(data.getDocument()); } + indexRequest.putHeader(IngestPlugin.INGEST_ALREADY_PROCESSED, true); chain.proceed(action, indexRequest, listener); } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java index 4b19f0bd91b..dfa61cf1fd1 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java @@ -82,7 +82,7 @@ public class IngestActionFilterTests extends ESTestCase { public void testApplyIngestIdViaContext() throws Exception { IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id"); indexRequest.source("field", "value"); - indexRequest.putInContext(IngestPlugin.INGEST_CONTEXT_KEY, "_id"); + indexRequest.putInContext(IngestPlugin.INGEST_PAREM_CONTEXT_KEY, "_id"); ActionListener actionListener = mock(ActionListener.class); ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); @@ -92,6 +92,20 @@ public class IngestActionFilterTests extends ESTestCase { verifyZeroInteractions(actionFilterChain); } + public void testApplyAlreadyProcessed() throws Exception { + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id"); + indexRequest.source("field", "value"); + indexRequest.putHeader(IngestPlugin.INGEST_PARAM, "_id"); + indexRequest.putHeader(IngestPlugin.INGEST_ALREADY_PROCESSED, true); + ActionListener actionListener = mock(ActionListener.class); + ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); + + filter.apply("_action", indexRequest, actionListener, actionFilterChain); + + verify(actionFilterChain).proceed("_action", indexRequest, actionListener); + verifyZeroInteractions(executionService, actionListener); + } + public void testApply_executed() throws Exception { IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id"); indexRequest.source("field", "value");