From 82a9ba355d9fc37ae6b2bc61b932113a73987d11 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 7 Oct 2015 13:03:15 +0200 Subject: [PATCH] Added pipeline execution service that deals with updating data as it comes in using a dedicated thread pool. Also changed how bulk requests are handled, because before it just didn't work, but added a todo there because it can potentially be handled differently. --- .../java/org/elasticsearch/ingest/Data.java | 7 + .../elasticsearch/ingest/SimpleProcessor.java | 10 +- .../plugin/ingest/IngestModule.java | 1 + .../plugin/ingest/IngestPlugin.java | 19 +- .../ingest/PipelineExecutionService.java | 86 +++++++ .../ingest/transport/IngestActionFilter.java | 105 ++++++--- .../org/elasticsearch/ingest/BasicTests.java | 23 +- .../ingest/PipelineExecutionServiceTests.java | 127 +++++++++++ .../transport/IngestActionFilterTests.java | 209 ++++++++++++++++++ 9 files changed, 543 insertions(+), 44 deletions(-) create mode 100644 plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java create mode 100644 plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java create mode 100644 plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Data.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/Data.java index 83a9aede29b..6a4f2f965ce 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Data.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/Data.java @@ -33,6 +33,8 @@ public final class Data { private final String id; private final Map document; + private boolean modified = false; + public Data(String index, String type, String id, Map document) { this.index = index; this.type = type; @@ -46,6 +48,7 @@ public final class Data { } public void addField(String field, String value) { + modified = true; document.put(field, value); } @@ -64,4 +67,8 @@ public final class Data { public Map getDocument() { return document; } + + public boolean isModified() { + return modified; + } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/SimpleProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/SimpleProcessor.java index 197f000118b..956e444cb48 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/SimpleProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/SimpleProcessor.java @@ -51,7 +51,7 @@ public final class SimpleProcessor implements Processor { public static class Builder implements Processor.Builder { private String path; - private String value; + private String expectedValue; private String addField; private String addFieldValue; @@ -59,8 +59,8 @@ public final class SimpleProcessor implements Processor { this.path = path; } - public void setValue(String value) { - this.value = value; + public void setExpectedValue(String value) { + this.expectedValue = value; } public void setAddField(String addField) { @@ -73,14 +73,14 @@ public final class SimpleProcessor implements Processor { public void fromMap(Map config) { this.path = (String) config.get("path"); - this.value = (String) config.get("value"); + this.expectedValue = (String) config.get("expected_value"); this.addField = (String) config.get("add_field"); this.addFieldValue = (String) config.get("add_field_value"); } @Override public Processor build() { - return new SimpleProcessor(path, value, addField, addFieldValue); + return new SimpleProcessor(path, expectedValue, addField, addFieldValue); } public static class Factory implements Processor.Builder.Factory { diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java index b8b11a898b2..6a2138c9089 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java @@ -37,6 +37,7 @@ public class IngestModule extends AbstractModule { @Override protected void configure() { binder().bind(IngestRestFilter.class).asEagerSingleton(); + binder().bind(PipelineExecutionService.class).asEagerSingleton(); binder().bind(PipelineStore.class).asEagerSingleton(); binder().bind(PipelineConfigDocReader.class).asEagerSingleton(); diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java index 2368ede0b5a..366eea29ae6 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java @@ -23,6 +23,7 @@ package org.elasticsearch.plugin.ingest; import org.elasticsearch.action.ActionModule; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugin.ingest.transport.IngestActionFilter; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.action.RestActionModule; @@ -31,14 +32,23 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import static org.elasticsearch.common.settings.Settings.settingsBuilder; + public class IngestPlugin extends Plugin { public static final String INGEST_CONTEXT_KEY = "__ingest__"; public static final String INGEST_PARAM = "ingest"; + public static final String NAME = "ingest"; + + private final Settings nodeSettings; + + public IngestPlugin(Settings nodeSettings) { + this.nodeSettings = nodeSettings; + } @Override public String name() { - return "ingest"; + return NAME; } @Override @@ -56,6 +66,13 @@ public class IngestPlugin extends Plugin { return Arrays.asList(PipelineStore.class, PipelineConfigDocReader.class); } + @Override + public Settings additionalSettings() { + return settingsBuilder() + .put(PipelineExecutionService.additionalSettings(nodeSettings)) + .build(); + } + public void onModule(ActionModule module) { module.registerFilter(IngestActionFilter.class); } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java new file mode 100644 index 00000000000..18d656813ec --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java @@ -0,0 +1,86 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest; + +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.support.LoggerMessageFormat; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.threadpool.ThreadPool; + +public class PipelineExecutionService { + + static final String THREAD_POOL_NAME = IngestPlugin.NAME; + + private final PipelineStore store; + private final ThreadPool threadPool; + + @Inject + public PipelineExecutionService(PipelineStore store, ThreadPool threadPool) { + this.store = store; + this.threadPool = threadPool; + } + + public void execute(Data data, String pipelineId, Listener listener) { + Pipeline pipeline = store.get(pipelineId); + if (pipeline == null) { + listener.failed(new IllegalArgumentException(LoggerMessageFormat.format("pipeline with id [{}] does not exist", pipelineId))); + return; + } + + threadPool.executor(THREAD_POOL_NAME).execute(new Runnable() { + @Override + public void run() { + try { + pipeline.execute(data); + listener.executed(data); + } catch (Exception e) { + listener.failed(e); + } + } + }); + } + + public interface Listener { + + void executed(Data data); + + void failed(Exception e); + + } + + public static Settings additionalSettings(Settings nodeSettings) { + Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME); + if (!settings.names().isEmpty()) { + // the TP is already configured in the node settings + // no need for additional settings + return Settings.EMPTY; + } + int availableProcessors = EsExecutors.boundedNumberOfProcessors(nodeSettings); + return Settings.builder() + .put("threadpool." + THREAD_POOL_NAME + ".type", "fixed") + .put("threadpool." + THREAD_POOL_NAME + ".size", availableProcessors) + .put("threadpool." + THREAD_POOL_NAME + ".queue_size", 200) + .build(); + } + +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java index 1fc34565afc..228243d3240 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java @@ -25,65 +25,108 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.action.support.ActionFilterChain; +import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.Data; -import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.plugin.ingest.IngestPlugin; -import org.elasticsearch.plugin.ingest.PipelineStore; +import org.elasticsearch.plugin.ingest.PipelineExecutionService; -import java.util.List; +import java.util.Iterator; import java.util.Map; -public class IngestActionFilter extends ActionFilter.Simple { +public class IngestActionFilter extends AbstractComponent implements ActionFilter { - private final PipelineStore pipelineStore; + private final PipelineExecutionService executionService; @Inject - public IngestActionFilter(Settings settings, PipelineStore pipelineStore) { + public IngestActionFilter(Settings settings, PipelineExecutionService executionService) { super(settings); - this.pipelineStore = pipelineStore; + this.executionService = executionService; } @Override - protected boolean apply(String action, ActionRequest request, ActionListener listener) { + public void apply(String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) { String pipelineId = request.getFromContext(IngestPlugin.INGEST_CONTEXT_KEY); if (pipelineId == null) { pipelineId = request.getHeader(IngestPlugin.INGEST_PARAM); if (pipelineId == null) { - return true; + chain.proceed(action, request, listener); + return; } } - Pipeline pipeline = pipelineStore.get(pipelineId); - if (pipeline == null) { - return true; - } if (request instanceof IndexRequest) { - processIndexRequest((IndexRequest) request, pipeline); + processIndexRequest(action, listener, chain, (IndexRequest) request, pipelineId); } else if (request instanceof BulkRequest) { BulkRequest bulkRequest = (BulkRequest) request; - List actionRequests = bulkRequest.requests(); - for (ActionRequest actionRequest : actionRequests) { - if (actionRequest instanceof IndexRequest) { - processIndexRequest((IndexRequest) actionRequest, pipeline); - } - } + processBulkIndexRequest(action, listener, chain, bulkRequest, pipelineId, bulkRequest.requests().iterator()); + } else { + chain.proceed(action, request, listener); } - return true; - } - - // TODO: this should be delegated to a PipelineExecutor service that executes on a different thread (pipeline TP) - void processIndexRequest(IndexRequest indexRequest, Pipeline pipeline) { - Map sourceAsMap = indexRequest.sourceAsMap(); - Data data = new Data(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap); - pipeline.execute(data); - indexRequest.source(data.getDocument()); } @Override - protected boolean apply(String action, ActionResponse response, ActionListener listener) { - return true; + public void apply(String action, ActionResponse response, ActionListener listener, ActionFilterChain chain) { + chain.proceed(action, response, listener); + } + + void processIndexRequest(String action, ActionListener listener, ActionFilterChain chain, IndexRequest indexRequest, String pipelineId) { + Map sourceAsMap = indexRequest.sourceAsMap(); + Data data = new Data(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap); + executionService.execute(data, pipelineId, new PipelineExecutionService.Listener() { + @Override + public void executed(Data data) { + if (data.isModified()) { + indexRequest.source(data.getDocument()); + } + chain.proceed(action, indexRequest, listener); + } + + @Override + public void failed(Exception e) { + logger.error("failed to execute pipeline [{}]", e, pipelineId); + listener.onFailure(e); + } + }); + } + + // TODO: rethink how to deal with bulk requests: + // This doesn't scale very well for a single bulk requests, so it would be great if a bulk requests could be broken up into several chunks so that the ingesting can be paralized + // on the other hand if there are many index/bulk requests then breaking up bulk requests isn't going to help much. + // I think the execution service should be smart enough about when it should break things up in chunks based on the ingest threadpool usage, + // this means that the contract of the execution service should change in order to accept multiple data instances. + void processBulkIndexRequest(String action, ActionListener listener, ActionFilterChain chain, BulkRequest bulkRequest, String pipelineId, Iterator requests) { + if (!requests.hasNext()) { + chain.proceed(action, bulkRequest, listener); + return; + } + + ActionRequest actionRequest = requests.next(); + if (!(actionRequest instanceof IndexRequest)) { + processBulkIndexRequest(action, listener, chain, bulkRequest, pipelineId, requests); + return; + } + + IndexRequest indexRequest = (IndexRequest) actionRequest; + Map sourceAsMap = indexRequest.sourceAsMap(); + Data data = new Data(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap); + executionService.execute(data, pipelineId, new PipelineExecutionService.Listener() { + @Override + public void executed(Data data) { + if (data.isModified()) { + indexRequest.source(data.getDocument()); + } + processBulkIndexRequest(action, listener, chain, bulkRequest, pipelineId, requests); + } + + @Override + public void failed(Exception e) { + logger.error("failed to execute pipeline [{}]", e, pipelineId); + listener.onFailure(e); + } + }); } @Override diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/BasicTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/BasicTests.java index e23d84ab8cf..0c884713374 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/BasicTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/BasicTests.java @@ -48,7 +48,7 @@ public class BasicTests extends ESIntegTestCase { .startObject() .startObject("simple") .field("path", "field2") - .field("value", "abc") + .field("expected_value", "abc") .field("add_field", "field3") .field("add_field_value", "xyz") .endObject() @@ -64,16 +64,25 @@ public class BasicTests extends ESIntegTestCase { .putHeader("ingest", "_id") .get(); - Map doc = client().prepareGet("test", "type", "1") - .get().getSourceAsMap(); - assertThat(doc.get("field3"), equalTo("xyz")); + assertBusy(new Runnable() { + @Override + public void run() { + Map doc = client().prepareGet("test", "type", "1") + .get().getSourceAsMap(); + assertThat(doc.get("field3"), equalTo("xyz")); + } + }); client().prepareBulk().add( client().prepareIndex("test", "type", "2").setSource("field2", "abc") ).putHeader("ingest", "_id").get(); - - doc = client().prepareGet("test", "type", "2").get().getSourceAsMap(); - assertThat(doc.get("field3"), equalTo("xyz")); + assertBusy(new Runnable() { + @Override + public void run() { + Map doc = client().prepareGet("test", "type", "2").get().getSourceAsMap(); + assertThat(doc.get("field3"), equalTo("xyz")); + } + }); } @Override diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java new file mode 100644 index 00000000000..e2f966ba37e --- /dev/null +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java @@ -0,0 +1,127 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.util.Collections; +import java.util.Map; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +public class PipelineExecutionServiceTests extends ESTestCase { + + private PipelineStore store; + private ThreadPool threadPool; + private PipelineExecutionService executionService; + + @Before + public void setup() { + store = mock(PipelineStore.class); + threadPool = new ThreadPool( + Settings.builder() + .put("name", "_name") + .put(PipelineExecutionService.additionalSettings(Settings.EMPTY)) + .build() + ); + executionService = new PipelineExecutionService(store, threadPool); + } + + @After + public void destroy() { + threadPool.shutdown(); + } + + public void testExecute_pipelineDoesNotExist() { + when(store.get("_id")).thenReturn(null); + Data data = new Data("_index", "_type", "_id", Collections.emptyMap()); + PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class); + executionService.execute(data, "_id", listener); + verify(listener).failed(any(IllegalArgumentException.class)); + verify(listener, times(0)).executed(data); + } + + public void testExecute_success() throws Exception { + Pipeline.Builder builder = new Pipeline.Builder("_id"); + Processor processor = mock(Processor.class); + builder.addProcessors(new Processor.Builder() { + @Override + public void fromMap(Map config) { + } + + @Override + public Processor build() { + return processor; + } + }); + + when(store.get("_id")).thenReturn(builder.build()); + + Data data = new Data("_index", "_type", "_id", Collections.emptyMap()); + PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class); + executionService.execute(data, "_id", listener); + assertBusy(new Runnable() { + @Override + public void run() { + verify(processor).execute(data); + verify(listener).executed(data); + verify(listener, times(0)).failed(any(Exception.class)); + } + }); + } + + public void testExecute_failure() throws Exception { + Pipeline.Builder builder = new Pipeline.Builder("_id"); + Processor processor = mock(Processor.class); + builder.addProcessors(new Processor.Builder() { + @Override + public void fromMap(Map config) { + } + + @Override + public Processor build() { + return processor; + } + }); + + when(store.get("_id")).thenReturn(builder.build()); + Data data = new Data("_index", "_type", "_id", Collections.emptyMap()); + doThrow(new RuntimeException()).when(processor).execute(data); + PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class); + executionService.execute(data, "_id", listener); + assertBusy(new Runnable() { + @Override + public void run() { + verify(processor).execute(data); + verify(listener, times(0)).executed(data); + verify(listener).failed(any(RuntimeException.class)); + } + }); + } + +} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java new file mode 100644 index 00000000000..4b19f0bd91b --- /dev/null +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java @@ -0,0 +1,209 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest.transport; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.ActionFilterChain; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.ingest.Data; +import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.SimpleProcessor; +import org.elasticsearch.plugin.ingest.IngestPlugin; +import org.elasticsearch.plugin.ingest.PipelineExecutionService; +import org.elasticsearch.plugin.ingest.PipelineStore; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.Before; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.*; + +public class IngestActionFilterTests extends ESTestCase { + + private IngestActionFilter filter; + private PipelineExecutionService executionService; + + @Before + public void setup() { + executionService = mock(PipelineExecutionService.class); + filter = new IngestActionFilter(Settings.EMPTY, executionService); + } + + public void testApplyNoIngestId() throws Exception { + IndexRequest indexRequest = new IndexRequest(); + ActionListener actionListener = mock(ActionListener.class); + ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); + + filter.apply("_action", indexRequest, actionListener, actionFilterChain); + + verify(actionFilterChain).proceed("_action", indexRequest, actionListener); + verifyZeroInteractions(executionService, actionFilterChain); + } + + public void testApplyIngestIdViaRequestParam() throws Exception { + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id"); + indexRequest.source("field", "value"); + indexRequest.putHeader(IngestPlugin.INGEST_PARAM, "_id"); + ActionListener actionListener = mock(ActionListener.class); + ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); + + filter.apply("_action", indexRequest, actionListener, actionFilterChain); + + verify(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + verifyZeroInteractions(actionFilterChain); + } + + public void testApplyIngestIdViaContext() throws Exception { + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id"); + indexRequest.source("field", "value"); + indexRequest.putInContext(IngestPlugin.INGEST_CONTEXT_KEY, "_id"); + ActionListener actionListener = mock(ActionListener.class); + ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); + + filter.apply("_action", indexRequest, actionListener, actionFilterChain); + + verify(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + verifyZeroInteractions(actionFilterChain); + } + + public void testApply_executed() throws Exception { + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id"); + indexRequest.source("field", "value"); + indexRequest.putHeader(IngestPlugin.INGEST_PARAM, "_id"); + ActionListener actionListener = mock(ActionListener.class); + ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); + + Answer answer = new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + Data data = (Data) invocationOnMock.getArguments()[0]; + PipelineExecutionService.Listener listener = (PipelineExecutionService.Listener) invocationOnMock.getArguments()[2]; + listener.executed(data); + return null; + } + }; + doAnswer(answer).when(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + filter.apply("_action", indexRequest, actionListener, actionFilterChain); + + verify(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + verify(actionFilterChain).proceed("_action", indexRequest, actionListener); + verifyZeroInteractions(actionListener); + } + + public void testApply_failed() throws Exception { + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id"); + indexRequest.source("field", "value"); + indexRequest.putHeader(IngestPlugin.INGEST_PARAM, "_id"); + ActionListener actionListener = mock(ActionListener.class); + ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); + + RuntimeException exception = new RuntimeException(); + Answer answer = new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + PipelineExecutionService.Listener listener = (PipelineExecutionService.Listener) invocationOnMock.getArguments()[2]; + listener.failed(exception); + return null; + } + }; + doAnswer(answer).when(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + filter.apply("_action", indexRequest, actionListener, actionFilterChain); + + verify(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + verify(actionListener).onFailure(exception); + verifyZeroInteractions(actionFilterChain); + } + + public void testApply_withBulkRequest() throws Exception { + ThreadPool threadPool = new ThreadPool( + Settings.builder() + .put("name", "_name") + .put(PipelineExecutionService.additionalSettings(Settings.EMPTY)) + .build() + ); + PipelineStore store = mock(PipelineStore.class); + Pipeline.Builder pipelineBuilder = new Pipeline.Builder("_id"); + SimpleProcessor.Builder processorBuilder = new SimpleProcessor.Builder(); + processorBuilder.setPath("field1"); + processorBuilder.setExpectedValue("value1"); + processorBuilder.setAddField("field2"); + processorBuilder.setAddFieldValue("value2"); + pipelineBuilder.addProcessors(processorBuilder); + when(store.get("_id")).thenReturn(pipelineBuilder.build()); + executionService = new PipelineExecutionService(store, threadPool); + filter = new IngestActionFilter(Settings.EMPTY, executionService); + + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.putHeader(IngestPlugin.INGEST_PARAM, "_id"); + int numRequest = scaledRandomIntBetween(8, 64); + for (int i = 0; i < numRequest; i++) { + if (rarely()) { + ActionRequest request; + if (randomBoolean()) { + request = new DeleteRequest("_index", "_type", "_id"); + } else { + request = new UpdateRequest("_index", "_type", "_id"); + } + bulkRequest.add(request); + } else { + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id"); + indexRequest.source("field1", "value1"); + bulkRequest.add(indexRequest); + } + } + + ActionListener actionListener = mock(ActionListener.class); + ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); + + filter.apply("_action", bulkRequest, actionListener, actionFilterChain); + + assertBusy(new Runnable() { + @Override + public void run() { + verify(actionFilterChain).proceed("_action", bulkRequest, actionListener); + verifyZeroInteractions(actionListener); + + int assertedRequests = 0; + for (ActionRequest actionRequest : bulkRequest.requests()) { + if (actionRequest instanceof IndexRequest) { + IndexRequest indexRequest = (IndexRequest) actionRequest; + assertThat(indexRequest.sourceAsMap().size(), equalTo(2)); + assertThat(indexRequest.sourceAsMap().get("field1"), equalTo("value1")); + assertThat(indexRequest.sourceAsMap().get("field2"), equalTo("value2")); + } + assertedRequests++; + } + assertThat(assertedRequests, equalTo(numRequest)); + } + }); + + threadPool.shutdown(); + } + +}