diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml index 584f0f9e9f2..2224d56165f 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -417,6 +417,62 @@ teardown: - is_true: docs.1.processor_results.1.doc._ingest.timestamp - is_true: docs.1.processor_results.1.doc._ingest.pipeline +--- +"Test verbose simulate with error in pipeline": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "rename" : { + "field" : "does_not_exist", + "target_field" : "_value" + } + } + ] + } + - match: { acknowledged: true } + + - do: + ingest.simulate: + verbose: true + body: > + { + "pipeline": { + "description": "_description", + "processors": [ + { + "pipeline" : { + "name" : "my_pipeline" + } + } + ] + }, + "docs": [ + { + "_index": "index", + "_id": "id", + "_source": { + "foo": "bar", + "bar": "hello" + } + } + ] + } + - length: { docs: 1 } + - length: { docs.0.processor_results: 2 } + - match: { docs.0.processor_results.0.processor_type: "pipeline" } + - match: { docs.0.processor_results.0.status: "success" } + - match: { docs.0.processor_results.1.processor_type: "rename" } + - match: { docs.0.processor_results.1.status: "error" } + - match: { docs.0.processor_results.1.error.root_cause.0.type: "illegal_argument_exception" } + - match: { docs.0.processor_results.1.error.root_cause.0.reason: "field [does_not_exist] doesn't exist" } + - match: { docs.0.processor_results.1.error.type: "illegal_argument_exception" } + - match: { docs.0.processor_results.1.error.reason: "field [does_not_exist] doesn't exist" } + --- "Test verbose simulate with on_failure": - do: diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 6ff01ca6398..60f77d2ad97 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -51,6 +51,7 @@ import java.util.function.BiConsumer; public final class IngestDocument { public static final String INGEST_KEY = "_ingest"; + public static final String PIPELINE_CYCLE_ERROR_MESSAGE = "Cycle detected for pipeline: "; private static final String INGEST_KEY_PREFIX = INGEST_KEY + "."; private static final String SOURCE_PREFIX = SourceFieldMapper.NAME + "."; @@ -748,7 +749,7 @@ public final class IngestDocument { handler.accept(result, e); }); } else { - handler.accept(null, new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId())); + handler.accept(null, new IllegalStateException(PIPELINE_CYCLE_ERROR_MESSAGE + pipeline.getId())); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index b819230a74b..9a88697d79a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -27,6 +27,8 @@ import java.util.ArrayList; import java.util.List; import java.util.function.BiConsumer; +import static org.elasticsearch.ingest.IngestDocument.PIPELINE_CYCLE_ERROR_MESSAGE; + /** * Processor to be used within Simulate API to keep track of processors executed in pipeline. */ @@ -73,20 +75,18 @@ public final class TrackingResultProcessor implements Processor { pipelineProcessor.getPipelineToCallName(ingestDocument) + ']'); } ingestDocumentCopy.executePipeline(pipelineToCall, (result, e) -> { - // do nothing, let the tracking processors throw the exception while recording the path up to the failure - if (e instanceof ElasticsearchException) { - ElasticsearchException elasticsearchException = (ElasticsearchException) e; - //else do nothing, let the tracking processors throw the exception while recording the path up to the failure - if (elasticsearchException.getCause() instanceof IllegalStateException) { - if (ignoreFailure) { - processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getType(), pipelineProcessor.getTag(), - pipelineProcessor.getDescription(), new IngestDocument(ingestDocument), e, conditionalWithResult)); - } else { - processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getType(), pipelineProcessor.getTag(), - pipelineProcessor.getDescription(), e, conditionalWithResult)); - } - handler.accept(null, elasticsearchException); + // special handling for pipeline cycle errors + if (e instanceof ElasticsearchException && + e.getCause() instanceof IllegalStateException && + e.getCause().getMessage().startsWith(PIPELINE_CYCLE_ERROR_MESSAGE)) { + if (ignoreFailure) { + processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getType(), pipelineProcessor.getTag(), + pipelineProcessor.getDescription(), new IngestDocument(ingestDocument), e, conditionalWithResult)); + } else { + processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getType(), pipelineProcessor.getTag(), + pipelineProcessor.getDescription(), e, conditionalWithResult)); } + handler.accept(null, e); } else { //now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList); diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index 2d5a6f184aa..62289612750 100644 --- a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -512,6 +512,43 @@ public class TrackingResultProcessorTests extends ESTestCase { assertThat(resultList.get(4).getProcessorTag(), nullValue()); } + public void testActualPipelineProcessorWithUnhandledFailure() throws Exception { + String pipelineId = "pipeline1"; + IngestService ingestService = createIngestService(); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("name", pipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + IllegalStateException exception = new IllegalStateException("Not a pipeline cycle error"); + + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> ingestDocument.setFieldValue(key1, randomInt())), + new TestProcessor(ingestDocument -> { throw exception; })) + ); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, null, pipelineConfig); + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); + + trackingProcessor.execute(ingestDocument, (result, e) -> {}); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(), + actualProcessor.getDescription(), ingestDocument, null); + expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId); + + verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId); + + assertThat(resultList.size(), equalTo(3)); + assertNull(resultList.get(0).getConditionalWithResult()); + assertThat(resultList.get(0).getType(), equalTo("pipeline")); + assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); + assertThat(resultList.get(2).getFailure(), equalTo(exception)); + } + public void testActualPipelineProcessorWithCycle() throws Exception { String pipelineId1 = "pipeline1"; String pipelineId2 = "pipeline2";