From e37e5dfc04c0608a2ef45692302e5ab4c6e3c43b Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Thu, 20 Sep 2018 08:33:07 -0500 Subject: [PATCH] ingest: support simulate with verbose for pipeline processor (#33839) * ingest: support simulate with verbose for pipeline processor This change better supports the use of simulate?verbose with the pipeline processor. Prior to this change any pipeline processors executed with simulate?verbose would not show all intermediate processors for the inner pipelines. This changes also moves the PipelineProcess and TrackingResultProcessor classes to enable instance checks and to avoid overly public classes. As well this updates the error message for when cycles are detected in pipelines calling other pipelines. --- .../ingest/common/IngestCommonPlugin.java | 1 + .../test/ingest/210_pipeline_processor.yml | 2 +- .../rest-api-spec/test/ingest/90_simulate.yml | 147 ++++++++ .../ingest/SimulateExecutionService.java | 14 +- .../elasticsearch/ingest/IngestDocument.java | 2 +- .../ingest}/PipelineProcessor.java | 12 +- .../ingest/TrackingResultProcessor.java | 35 +- .../ingest/TrackingResultProcessorTests.java | 149 --------- .../ingest}/PipelineProcessorTests.java | 5 +- .../ingest/TrackingResultProcessorTests.java | 315 ++++++++++++++++++ 10 files changed, 509 insertions(+), 173 deletions(-) rename {modules/ingest-common/src/main/java/org/elasticsearch/ingest/common => server/src/main/java/org/elasticsearch/ingest}/PipelineProcessor.java (87%) rename server/src/main/java/org/elasticsearch/{action => }/ingest/TrackingResultProcessor.java (65%) delete mode 100644 server/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java rename {modules/ingest-common/src/test/java/org/elasticsearch/ingest/common => server/src/test/java/org/elasticsearch/ingest}/PipelineProcessorTests.java (97%) create mode 100644 server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index d9dba2cc100..d58a48e70c9 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.grok.Grok; import org.elasticsearch.grok.ThreadWatchdog; +import org.elasticsearch.ingest.PipelineProcessor; import org.elasticsearch.ingest.Processor; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.IngestPlugin; diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml index 355ba2d4210..c7c5df1e06f 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml @@ -110,4 +110,4 @@ teardown: pipeline: "outer" body: {} - match: { error.root_cause.0.type: "exception" } -- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Recursive invocation of pipeline [inner] detected." } +- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Cycle detected for pipeline: inner" } diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml index 776a8af0c24..46c4fb0a69e 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -605,3 +605,150 @@ teardown: - length: { docs.0.processor_results.1: 2 } - match: { docs.0.processor_results.1.tag: "rename-1" } - match: { docs.0.processor_results.1.doc._source.new_status: 200 } + +--- +"Test verbose simulate with Pipeline Processor with Circular Pipelines": +- do: + ingest.put_pipeline: + id: "outer" + body: > + { + "description" : "outer pipeline", + "processors" : [ + { + "pipeline" : { + "pipeline": "inner" + } + } + ] + } +- match: { acknowledged: true } + +- do: + ingest.put_pipeline: + id: "inner" + body: > + { + "description" : "inner pipeline", + "processors" : [ + { + "pipeline" : { + "pipeline": "outer" + } + } + ] + } +- match: { acknowledged: true } + +- do: + catch: /illegal_state_exception/ + ingest.simulate: + verbose: true + body: > + { + "pipeline": { + "processors" : [ + { + "pipeline" : { + "pipeline": "outer" + } + } + ] + } + , + "docs": [ + { + "_index": "index", + "_type": "type", + "_id": "id", + "_source": { + "field1": "123.42 400 " + } + } + ] + } +- match: { error.root_cause.0.type: "illegal_state_exception" } +- match: { error.root_cause.0.reason: "Cycle detected for pipeline: inner" } + +--- +"Test verbose simulate with Pipeline Processor with Multiple Pipelines": +- do: + ingest.put_pipeline: + id: "pipeline1" + body: > + { + "processors": [ + { + "set": { + "field": "pipeline1", + "value": true + } + }, + { + "pipeline": { + "pipeline": "pipeline2" + } + } + ] + } +- match: { acknowledged: true } + +- do: + ingest.put_pipeline: + id: "pipeline2" + body: > + { + "processors": [ + { + "set": { + "field": "pipeline2", + "value": true + } + } + ] + } +- match: { acknowledged: true } + +- do: + ingest.simulate: + verbose: true + body: > + { + "pipeline": { + "processors": [ + { + "set": { + "field": "pipeline0", + "value": true + } + }, + { + "pipeline": { + "pipeline": "pipeline1" + } + } + ] + }, + "docs": [ + { + "_index": "index", + "_type": "type", + "_id": "id", + "_source": { + "field1": "123.42 400 " + } + } + ] + } +- length: { docs: 1 } +- length: { docs.0.processor_results: 3 } +- match: { docs.0.processor_results.0.doc._source.pipeline0: true } +- is_false: docs.0.processor_results.0.doc._source.pipeline1 +- is_false: docs.0.processor_results.0.doc._source.pipeline2 +- match: { docs.0.processor_results.1.doc._source.pipeline0: true } +- match: { docs.0.processor_results.1.doc._source.pipeline1: true } +- is_false: docs.0.processor_results.1.doc._source.pipeline2 +- match: { docs.0.processor_results.2.doc._source.pipeline0: true } +- match: { docs.0.processor_results.2.doc._source.pipeline1: true } +- match: { docs.0.processor_results.2.doc._source.pipeline2: true } + diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index 430da9955ba..c081707f4db 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -24,12 +24,16 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.CompoundProcessor; +import org.elasticsearch.ingest.PipelineProcessor; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; +import java.util.Collections; +import java.util.IdentityHashMap; import java.util.List; +import java.util.Set; -import static org.elasticsearch.action.ingest.TrackingResultProcessor.decorate; +import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; class SimulateExecutionService { @@ -42,11 +46,15 @@ class SimulateExecutionService { } SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) { + // Prevent cycles in pipeline decoration + final Set pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>()); if (verbose) { List processorResultList = new ArrayList<>(); - CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); + CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList, pipelinesSeen); try { - verbosePipelineProcessor.execute(ingestDocument); + Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), + verbosePipelineProcessor); + ingestDocument.executePipeline(verbosePipeline); return new SimulateDocumentVerboseResult(processorResultList); } catch (Exception e) { return new SimulateDocumentVerboseResult(processorResultList); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 10cb2fd17fe..719558edbf7 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -647,7 +647,7 @@ public final class IngestDocument { public IngestDocument executePipeline(Pipeline pipeline) throws Exception { try { if (this.executedPipelines.add(pipeline) == false) { - throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected."); + throw new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId()); } return pipeline.execute(this); } finally { diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java similarity index 87% rename from modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java rename to server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index 1958a3e5232..918ff6b8aef 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -17,15 +17,9 @@ * under the License. */ -package org.elasticsearch.ingest.common; +package org.elasticsearch.ingest; import java.util.Map; -import org.elasticsearch.ingest.AbstractProcessor; -import org.elasticsearch.ingest.ConfigurationUtils; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.IngestService; -import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.Processor; public class PipelineProcessor extends AbstractProcessor { @@ -50,6 +44,10 @@ public class PipelineProcessor extends AbstractProcessor { return ingestDocument.executePipeline(pipeline); } + Pipeline getPipeline(){ + return ingestService.getPipeline(pipelineName); + } + @Override public String getType() { return TYPE; diff --git a/server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java similarity index 65% rename from server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java rename to server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 04c0fe7ca49..41a984be5ad 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -17,14 +17,13 @@ * under the License. */ -package org.elasticsearch.action.ingest; +package org.elasticsearch.ingest; -import org.elasticsearch.ingest.CompoundProcessor; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.Processor; +import org.elasticsearch.action.ingest.SimulateProcessorResult; import java.util.ArrayList; import java.util.List; +import java.util.Set; /** * Processor to be used within Simulate API to keep track of processors executed in pipeline. @@ -35,7 +34,7 @@ public final class TrackingResultProcessor implements Processor { private final List processorResultList; private final boolean ignoreFailure; - public TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List processorResultList) { + TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List processorResultList) { this.ignoreFailure = ignoreFailure; this.processorResultList = processorResultList; this.actualProcessor = actualProcessor; @@ -67,19 +66,35 @@ public final class TrackingResultProcessor implements Processor { return actualProcessor.getTag(); } - public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List processorResultList) { + public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List processorResultList, + Set pipelinesSeen) { List processors = new ArrayList<>(compoundProcessor.getProcessors().size()); for (Processor processor : compoundProcessor.getProcessors()) { - if (processor instanceof CompoundProcessor) { - processors.add(decorate((CompoundProcessor) processor, processorResultList)); + if (processor instanceof PipelineProcessor) { + PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); + if (pipelinesSeen.add(pipelineProcessor) == false) { + throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId()); + } + processors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, pipelinesSeen)); + pipelinesSeen.remove(pipelineProcessor); + } else if (processor instanceof CompoundProcessor) { + processors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen)); } else { processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); } } List onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size()); for (Processor processor : compoundProcessor.getOnFailureProcessors()) { - if (processor instanceof CompoundProcessor) { - onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList)); + if (processor instanceof PipelineProcessor) { + PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); + if (pipelinesSeen.add(pipelineProcessor) == false) { + throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId()); + } + onFailureProcessors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, + pipelinesSeen)); + pipelinesSeen.remove(pipelineProcessor); + } else if (processor instanceof CompoundProcessor) { + onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen)); } else { onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); } diff --git a/server/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java deleted file mode 100644 index 3572a04529b..00000000000 --- a/server/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.ingest; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ingest.TestProcessor; -import org.elasticsearch.ingest.CompoundProcessor; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.test.ESTestCase; -import org.junit.Before; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD; -import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD; -import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD; -import static org.elasticsearch.action.ingest.TrackingResultProcessor.decorate; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.sameInstance; - -public class TrackingResultProcessorTests extends ESTestCase { - - private IngestDocument ingestDocument; - private List resultList; - - @Before - public void init() { - ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); - resultList = new ArrayList<>(); - } - - public void testActualProcessor() throws Exception { - TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {}); - TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); - - SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); - - assertThat(actualProcessor.getInvokedCounter(), equalTo(1)); - assertThat(resultList.size(), equalTo(1)); - - assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); - assertThat(resultList.get(0).getFailure(), nullValue()); - assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); - } - - public void testActualCompoundProcessorWithoutOnFailure() throws Exception { - RuntimeException exception = new RuntimeException("processor failed"); - TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); - CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - - try { - trackingProcessor.execute(ingestDocument); - fail("processor should throw exception"); - } catch (ElasticsearchException e) { - assertThat(e.getRootCause().getMessage(), equalTo(exception.getMessage())); - } - - SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); - assertThat(testProcessor.getInvokedCounter(), equalTo(1)); - assertThat(resultList.size(), equalTo(1)); - assertThat(resultList.get(0).getIngestDocument(), nullValue()); - assertThat(resultList.get(0).getFailure(), equalTo(exception)); - assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFirstResult.getProcessorTag())); - } - - public void testActualCompoundProcessorWithOnFailure() throws Exception { - RuntimeException exception = new RuntimeException("fail"); - TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; }); - TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {}); - CompoundProcessor actualProcessor = new CompoundProcessor(false, - Arrays.asList(new CompoundProcessor(false, - Arrays.asList(failProcessor, onFailureProcessor), - Arrays.asList(onFailureProcessor, failProcessor))), - Arrays.asList(onFailureProcessor)); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); - - SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); - SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument); - - assertThat(failProcessor.getInvokedCounter(), equalTo(2)); - assertThat(onFailureProcessor.getInvokedCounter(), equalTo(2)); - assertThat(resultList.size(), equalTo(4)); - - assertThat(resultList.get(0).getIngestDocument(), nullValue()); - assertThat(resultList.get(0).getFailure(), equalTo(exception)); - assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); - - Map metadata = resultList.get(1).getIngestDocument().getIngestMetadata(); - assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); - assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); - assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); - assertThat(resultList.get(1).getFailure(), nullValue()); - assertThat(resultList.get(1).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); - - assertThat(resultList.get(2).getIngestDocument(), nullValue()); - assertThat(resultList.get(2).getFailure(), equalTo(exception)); - assertThat(resultList.get(2).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); - - metadata = resultList.get(3).getIngestDocument().getIngestMetadata(); - assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); - assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); - assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); - assertThat(resultList.get(3).getFailure(), nullValue()); - assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); - } - - public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { - RuntimeException exception = new RuntimeException("processor failed"); - TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); - CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor), - Collections.emptyList()); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - - trackingProcessor.execute(ingestDocument); - - SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); - assertThat(testProcessor.getInvokedCounter(), equalTo(1)); - assertThat(resultList.size(), equalTo(1)); - assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); - assertThat(resultList.get(0).getFailure(), sameInstance(exception)); - assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); - } -} diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java similarity index 97% rename from modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java rename to server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index 6e18bac40d4..99fa7633d08 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.ingest.common; +package org.elasticsearch.ingest; import java.util.Collections; import java.util.HashMap; @@ -27,6 +27,7 @@ import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.PipelineProcessor; import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.test.ESTestCase; @@ -110,7 +111,7 @@ public class PipelineProcessorTests extends ESTestCase { () -> factory.create(Collections.emptyMap(), null, outerConfig).execute(testIngestDocument) ); assertEquals( - "Recursive invocation of pipeline [inner] detected.", e.getRootCause().getMessage() + "Cycle detected for pipeline: inner", e.getRootCause().getMessage() ); } diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java new file mode 100644 index 00000000000..7a7f9b77372 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -0,0 +1,315 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ingest.SimulateProcessorResult; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD; +import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD; +import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD; +import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TrackingResultProcessorTests extends ESTestCase { + + private IngestDocument ingestDocument; + private List resultList; + private Set pipelinesSeen; + + @Before + public void init() { + ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); + resultList = new ArrayList<>(); + pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>()); + } + + public void testActualProcessor() throws Exception { + TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {}); + TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList); + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + assertThat(actualProcessor.getInvokedCounter(), equalTo(1)); + assertThat(resultList.size(), equalTo(1)); + + assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(0).getFailure(), nullValue()); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); + } + + public void testActualCompoundProcessorWithoutOnFailure() throws Exception { + RuntimeException exception = new RuntimeException("processor failed"); + TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); + CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + try { + trackingProcessor.execute(ingestDocument); + fail("processor should throw exception"); + } catch (ElasticsearchException e) { + assertThat(e.getRootCause().getMessage(), equalTo(exception.getMessage())); + } + + SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); + assertThat(testProcessor.getInvokedCounter(), equalTo(1)); + assertThat(resultList.size(), equalTo(1)); + assertThat(resultList.get(0).getIngestDocument(), nullValue()); + assertThat(resultList.get(0).getFailure(), equalTo(exception)); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFirstResult.getProcessorTag())); + } + + public void testActualCompoundProcessorWithOnFailure() throws Exception { + RuntimeException exception = new RuntimeException("fail"); + TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; }); + TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {}); + CompoundProcessor actualProcessor = new CompoundProcessor(false, + Arrays.asList(new CompoundProcessor(false, + Arrays.asList(failProcessor, onFailureProcessor), + Arrays.asList(onFailureProcessor, failProcessor))), + Arrays.asList(onFailureProcessor)); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); + SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument); + + assertThat(failProcessor.getInvokedCounter(), equalTo(2)); + assertThat(onFailureProcessor.getInvokedCounter(), equalTo(2)); + assertThat(resultList.size(), equalTo(4)); + + assertThat(resultList.get(0).getIngestDocument(), nullValue()); + assertThat(resultList.get(0).getFailure(), equalTo(exception)); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); + + Map metadata = resultList.get(1).getIngestDocument().getIngestMetadata(); + assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); + assertThat(resultList.get(1).getFailure(), nullValue()); + assertThat(resultList.get(1).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); + + assertThat(resultList.get(2).getIngestDocument(), nullValue()); + assertThat(resultList.get(2).getFailure(), equalTo(exception)); + assertThat(resultList.get(2).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); + + metadata = resultList.get(3).getIngestDocument().getIngestMetadata(); + assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); + assertThat(resultList.get(3).getFailure(), nullValue()); + assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); + } + + public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { + RuntimeException exception = new RuntimeException("processor failed"); + TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); + CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor), + Collections.emptyList()); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); + assertThat(testProcessor.getInvokedCounter(), equalTo(1)); + assertThat(resultList.size(), equalTo(1)); + assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(0).getFailure(), sameInstance(exception)); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); + } + + public void testActualPipelineProcessor() throws Exception { + String pipelineId = "pipeline1"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("pipeline", pipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + String key2 = randomAlphaOfLength(10); + String key3 = randomAlphaOfLength(10); + + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }), + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key2, randomInt()); }), + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })) + ); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService).getPipeline(pipelineId); + assertThat(resultList.size(), equalTo(3)); + + assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + + assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); + assertTrue(resultList.get(1).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(1).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(2).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(2).getFailure(), nullValue()); + assertThat(resultList.get(2).getProcessorTag(), nullValue()); + } + + public void testActualPipelineProcessorWithHandledFailure() throws Exception { + RuntimeException exception = new RuntimeException("processor failed"); + + String pipelineId = "pipeline1"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("pipeline", pipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + String key2 = randomAlphaOfLength(10); + String key3 = randomAlphaOfLength(10); + + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); }), + new CompoundProcessor( + false, + Collections.singletonList(new TestProcessor(ingestDocument -> { throw exception; })), + Collections.singletonList(new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key2, randomInt()); })) + ), + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })) + ); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService).getPipeline(pipelineId); + assertThat(resultList.size(), equalTo(4)); + + assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + + //failed processor + assertNull(resultList.get(1).getIngestDocument()); + assertThat(resultList.get(1).getFailure().getMessage(), equalTo(exception.getMessage())); + + assertTrue(resultList.get(2).getIngestDocument().hasField(key1)); + assertTrue(resultList.get(2).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(2).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(3).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(3).getFailure(), nullValue()); + assertThat(resultList.get(3).getProcessorTag(), nullValue()); + } + + public void testActualPipelineProcessorWithCycle() throws Exception { + String pipelineId = "pipeline1"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("pipeline", pipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor(pipelineProcessor) + ); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + IllegalStateException exception = expectThrows(IllegalStateException.class, + () -> decorate(actualProcessor, resultList, pipelinesSeen)); + assertThat(exception.getMessage(), equalTo("Cycle detected for pipeline: pipeline1")); + } + + + public void testActualPipelineProcessorRepeatedInvocation() throws Exception { + String pipelineId = "pipeline1"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("pipeline", pipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); })) + ); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService, times(2)).getPipeline(pipelineId); + assertThat(resultList.size(), equalTo(2)); + + assertThat(resultList.get(0).getIngestDocument(), not(equalTo(expectedResult.getIngestDocument()))); + assertThat(resultList.get(0).getFailure(), nullValue()); + assertThat(resultList.get(0).getProcessorTag(), nullValue()); + + assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(1).getFailure(), nullValue()); + assertThat(resultList.get(1).getProcessorTag(), nullValue()); + + //each invocation updates key1 with a random int + assertNotEquals(resultList.get(0).getIngestDocument().getSourceAndMetadata().get(key1), + resultList.get(1).getIngestDocument().getSourceAndMetadata().get(key1)); + } + +}