From 0ba18557406b5fbe6266bee8f1b05afb1f11c72a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 25 Sep 2018 19:29:22 +0200 Subject: [PATCH] INGEST: Tests for Drop Processor (#33430) * INGEST: Tests for Drop Processor * UT for behavior of dropped callback and drop processor * Moved drop processor to `server` project to enable this test * Simple IT * Relates #32278 --- .../ingest/common/IngestCommonPlugin.java | 1 + .../test/ingest/220_drop_processor.yml | 59 +++++++++++++++++++ .../elasticsearch/ingest}/DropProcessor.java | 5 +- .../ingest/IngestServiceTests.java | 39 ++++++++++++ 4 files changed, 100 insertions(+), 4 deletions(-) create mode 100644 modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml rename {modules/ingest-common/src/main/java/org/elasticsearch/ingest/common => server/src/main/java/org/elasticsearch/ingest}/DropProcessor.java (90%) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index d58a48e70c9..41e96253f28 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.grok.Grok; import org.elasticsearch.grok.ThreadWatchdog; +import org.elasticsearch.ingest.DropProcessor; import org.elasticsearch.ingest.PipelineProcessor; import org.elasticsearch.ingest.Processor; import org.elasticsearch.plugins.ActionPlugin; diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml new file mode 100644 index 00000000000..3be038aca24 --- /dev/null +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml @@ -0,0 +1,59 @@ +--- +teardown: +- do: + ingest.delete_pipeline: + id: "my_pipeline" + ignore: 404 + +--- +"Test Drop Processor": +- do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description" : "pipeline with drop", + "processors" : [ + { + "drop" : { + "if": "ctx.foo == 'bar'" + } + } + ] + } +- match: { acknowledged: true } + +- do: + index: + index: test + type: test + id: 1 + pipeline: "my_pipeline" + body: { + foo: "bar" + } + +- do: + index: + index: test + type: test + id: 2 + pipeline: "my_pipeline" + body: { + foo: "blub" + } + +- do: + catch: missing + get: + index: test + type: test + id: 1 +- match: { found: false } + +- do: + get: + index: test + type: test + id: 2 +- match: { _source.foo: "blub" } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DropProcessor.java b/server/src/main/java/org/elasticsearch/ingest/DropProcessor.java similarity index 90% rename from modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DropProcessor.java rename to server/src/main/java/org/elasticsearch/ingest/DropProcessor.java index a0eabe38979..d4ccf7ad635 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DropProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/DropProcessor.java @@ -17,12 +17,9 @@ * under the License. */ -package org.elasticsearch.ingest.common; +package org.elasticsearch.ingest; import java.util.Map; -import org.elasticsearch.ingest.AbstractProcessor; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.Processor; /** * Drop processor only returns {@code null} for the execution result to indicate that any document diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index e3f52f35b79..140c09a93fd 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -812,6 +812,45 @@ public class IngestServiceTests extends ESTestCase { assertThat(ingestService.stats().getStatsPerPipeline(), not(hasKey("_id2"))); } + public void testExecuteWithDrop() { + Map factories = new HashMap<>(); + factories.put("drop", new DropProcessor.Factory()); + factories.put("mock", (processorFactories, tag, config) -> new Processor() { + @Override + public IngestDocument execute(final IngestDocument ingestDocument) { + throw new AssertionError("Document should have been dropped but reached this processor"); + } + + @Override + public String getType() { + return null; + } + + @Override + public String getTag() { + return null; + } + }); + IngestService ingestService = createWithProcessors(factories); + PutPipelineRequest putRequest = new PutPipelineRequest("_id", + new BytesArray("{\"processors\": [{\"drop\" : {}}, {\"mock\" : {}}]}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); + @SuppressWarnings("unchecked") + final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + final Consumer completionHandler = mock(Consumer.class); + @SuppressWarnings("unchecked") + final Consumer dropHandler = mock(Consumer.class); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler); + verify(failureHandler, never()).accept(any(), any()); + verify(completionHandler, times(1)).accept(null); + verify(dropHandler, times(1)).accept(indexRequest); + } + private IngestDocument eqIndexTypeId(final Map source) { return argThat(new IngestDocumentMatcher("_index", "_type", "_id", source)); }