diff --git a/docs/reference/ingest/apis/simulate-pipeline.asciidoc b/docs/reference/ingest/apis/simulate-pipeline.asciidoc index 0abb2f2ac84..3c53a61506b 100644 --- a/docs/reference/ingest/apis/simulate-pipeline.asciidoc +++ b/docs/reference/ingest/apis/simulate-pipeline.asciidoc @@ -342,84 +342,93 @@ The API returns the following response: [source,console-result] ---- { - "docs": [ - { - "processor_results": [ - { - "doc": { - "_id": "id", - "_index": "index", - "_type": "_doc", - "_source": { - "field2": "_value2", - "foo": "bar" - }, - "_ingest": { - "timestamp": "2017-05-04T22:46:09.674Z", - "pipeline": "_simulate_pipeline" - } - } + "docs": [ + { + "processor_results": [ + { + "processor_type": "set", + "status": "success", + "doc": { + "_index": "index", + "_type": "_doc", + "_id": "id", + "_source": { + "field2": "_value2", + "foo": "bar" }, - { - "doc": { - "_id": "id", - "_index": "index", - "_type": "_doc", - "_source": { - "field3": "_value3", - "field2": "_value2", - "foo": "bar" - }, - "_ingest": { - "timestamp": "2017-05-04T22:46:09.675Z", - "pipeline": "_simulate_pipeline" - } - } + "_ingest": { + "pipeline": "_simulate_pipeline", + "timestamp": "2020-07-30T01:21:24.251836Z" } - ] - }, - { - "processor_results": [ - { - "doc": { - "_id": "id", - "_index": "index", - "_type": "_doc", - "_source": { - "field2": "_value2", - "foo": "rab" - }, - "_ingest": { - "timestamp": "2017-05-04T22:46:09.676Z", - "pipeline": "_simulate_pipeline" - } - } + } + }, + { + "processor_type": "set", + "status": "success", + "doc": { + "_index": "index", + "_type": "_doc", + "_id": "id", + "_source": { + "field3": "_value3", + "field2": "_value2", + "foo": "bar" }, - { - "doc": { - "_id": "id", - "_index": "index", - "_type": "_doc", - "_source": { - "field3": "_value3", - "field2": "_value2", - "foo": "rab" - }, - "_ingest": { - "timestamp": "2017-05-04T22:46:09.677Z", - "pipeline": "_simulate_pipeline" - } - } + "_ingest": { + "pipeline": "_simulate_pipeline", + "timestamp": "2020-07-30T01:21:24.251836Z" } - ] - } - ] + } + } + ] + }, + { + "processor_results": [ + { + "processor_type": "set", + "status": "success", + "doc": { + "_index": "index", + "_type": "_doc", + "_id": "id", + "_source": { + "field2": "_value2", + "foo": "rab" + }, + "_ingest": { + "pipeline": "_simulate_pipeline", + "timestamp": "2020-07-30T01:21:24.251863Z" + } + } + }, + { + "processor_type": "set", + "status": "success", + "doc": { + "_index": "index", + "_type": "_doc", + "_id": "id", + "_source": { + "field3": "_value3", + "field2": "_value2", + "foo": "rab" + }, + "_ingest": { + "pipeline": "_simulate_pipeline", + "timestamp": "2020-07-30T01:21:24.251863Z" + } + } + } + ] + } + ] } + ---- -// TESTRESPONSE[s/"2017-05-04T22:46:09.674Z"/$body.docs.0.processor_results.0.doc._ingest.timestamp/] -// TESTRESPONSE[s/"2017-05-04T22:46:09.675Z"/$body.docs.0.processor_results.1.doc._ingest.timestamp/] -// TESTRESPONSE[s/"2017-05-04T22:46:09.676Z"/$body.docs.1.processor_results.0.doc._ingest.timestamp/] -// TESTRESPONSE[s/"2017-05-04T22:46:09.677Z"/$body.docs.1.processor_results.1.doc._ingest.timestamp/] +// TESTRESPONSE[s/"2020-07-30T01:21:24.251836Z"/$body.docs.0.processor_results.0.doc._ingest.timestamp/] +// TESTRESPONSE[s/"2020-07-30T01:21:24.251836Z"/$body.docs.0.processor_results.1.doc._ingest.timestamp/] +// TESTRESPONSE[s/"2020-07-30T01:21:24.251863Z"/$body.docs.1.processor_results.0.doc._ingest.timestamp/] +// TESTRESPONSE[s/"2020-07-30T01:21:24.251863Z"/$body.docs.1.processor_results.1.doc._ingest.timestamp/] //// [source,console] diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml index b248bb48891..584f0f9e9f2 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -559,11 +559,15 @@ teardown: - match: { docs.0.processor_results.0.tag: "setstatus-1" } - match: { docs.0.processor_results.0.doc._source.field1: "123.42 400 " } - match: { docs.0.processor_results.0.doc._source.status: 200 } + - match: { docs.0.processor_results.0.status: "success" } + - match: { docs.0.processor_results.0.processor_type: "set" } - match: { docs.0.processor_results.1.tag: "rename-1" } - match: { docs.0.processor_results.1.ignored_error.error.type: "illegal_argument_exception" } - match: { docs.0.processor_results.1.ignored_error.error.reason: "field [foofield] doesn't exist" } - match: { docs.0.processor_results.1.doc._source.field1: "123.42 400 " } - match: { docs.0.processor_results.1.doc._source.status: 200 } + - match: { docs.0.processor_results.1.status: "error_ignored" } + - match: { docs.0.processor_results.1.processor_type: "rename" } --- "Test verbose simulate with ignore_failure and no exception thrown": @@ -605,11 +609,16 @@ teardown: } - length: { docs: 1 } - length: { docs.0.processor_results: 2 } + - length: { docs.0.processor_results.0: 4 } - match: { docs.0.processor_results.0.tag: "setstatus-1" } + - match: { docs.0.processor_results.0.status: "success" } + - match: { docs.0.processor_results.0.processor_type: "set" } - match: { docs.0.processor_results.0.doc._source.field1: "123.42 400 " } - match: { docs.0.processor_results.0.doc._source.status: 200 } - - length: { docs.0.processor_results.1: 2 } + - length: { docs.0.processor_results.1: 4 } - match: { docs.0.processor_results.1.tag: "rename-1" } + - match: { docs.0.processor_results.1.status: "success" } + - match: { docs.0.processor_results.1.processor_type: "rename" } - match: { docs.0.processor_results.1.doc._source.new_status: 200 } --- @@ -725,7 +734,8 @@ teardown: { "set": { "field": "pipeline0", - "value": true + "value": true, + "description" : "first_set" } }, { @@ -747,16 +757,25 @@ teardown: ] } - length: { docs: 1 } -- length: { docs.0.processor_results: 3 } +- length: { docs.0.processor_results: 5 } - match: { docs.0.processor_results.0.doc._source.pipeline0: true } +- match: { docs.0.processor_results.0.status: "success" } +- match: { docs.0.processor_results.0.processor_type: "set" } +- match: { docs.0.processor_results.0.description: "first_set" } - is_false: docs.0.processor_results.0.doc._source.pipeline1 - is_false: docs.0.processor_results.0.doc._source.pipeline2 -- match: { docs.0.processor_results.1.doc._source.pipeline0: true } -- match: { docs.0.processor_results.1.doc._source.pipeline1: true } -- is_false: docs.0.processor_results.1.doc._source.pipeline2 +- match: { docs.0.processor_results.1.doc: null } +- match: { docs.0.processor_results.1.status: "success" } +- match: { docs.0.processor_results.1.processor_type: "pipeline" } - match: { docs.0.processor_results.2.doc._source.pipeline0: true } - match: { docs.0.processor_results.2.doc._source.pipeline1: true } -- match: { docs.0.processor_results.2.doc._source.pipeline2: true } +- is_false: docs.0.processor_results.2.doc._source.pipeline2 +- match: { docs.0.processor_results.3.doc: null } +- match: { docs.0.processor_results.3.status: "success" } +- match: { docs.0.processor_results.3.processor_type: "pipeline" } +- match: { docs.0.processor_results.4.doc._source.pipeline0: true } +- match: { docs.0.processor_results.4.doc._source.pipeline1: true } +- match: { docs.0.processor_results.4.doc._source.pipeline2: true } --- "Test verbose simulate with true conditional and on failure": @@ -817,19 +836,27 @@ teardown: - length: { docs.0.processor_results: 4 } - match: { docs.0.processor_results.0.tag: "gunna_fail" } - match: { docs.0.processor_results.0.error.reason: "field [foo1] doesn't exist" } +- match: { docs.0.processor_results.0.status: "error" } +- match: { docs.0.processor_results.0.processor_type: "rename" } - match: { docs.0.processor_results.1.tag: "failed1" } - match: { docs.0.processor_results.1.doc._source.failed1: "failed1" } - match: { docs.0.processor_results.1.doc._ingest.on_failure_processor_tag: "gunna_fail" } +- match: { docs.0.processor_results.1.status: "success" } +- match: { docs.0.processor_results.1.processor_type: "set" } - match: { docs.0.processor_results.2.tag: "gunna_fail_again" } - match: { docs.0.processor_results.2.error.reason: "field [foo2] doesn't exist" } +- match: { docs.0.processor_results.2.status: "error" } +- match: { docs.0.processor_results.2.processor_type: "rename" } - match: { docs.0.processor_results.3.tag: "failed2" } - match: { docs.0.processor_results.3.doc._source.failed1: "failed1" } - match: { docs.0.processor_results.3.doc._source.failed2: "failed2" } - match: { docs.0.processor_results.3.doc._ingest.on_failure_processor_tag: "gunna_fail_again" } +- match: { docs.0.processor_results.3.status: "success" } +- match: { docs.0.processor_results.3.processor_type: "set" } --- -"Test simulate with provided pipeline definition with tag and description in processors": +"Test simulate with pipeline with conditional and skipped and dropped": - do: ingest.simulate: verbose: true @@ -845,6 +872,16 @@ teardown: "field" : "field2", "value" : "_value" } + }, + { + "drop" : { + "if": "false" + } + }, + { + "drop" : { + "if": "true" + } } ] }, @@ -859,7 +896,43 @@ teardown: ] } - length: { docs: 1 } - - length: { docs.0.processor_results: 1 } + - length: { docs.0.processor_results: 3 } - match: { docs.0.processor_results.0.doc._source.field2: "_value" } - match: { docs.0.processor_results.0.description: "processor_description" } - match: { docs.0.processor_results.0.tag: "processor_tag" } + - match: { docs.0.processor_results.0.status: "success" } + - match: { docs.0.processor_results.0.processor_type: "set" } + - match: { docs.0.processor_results.1.status: "skipped" } + - match: { docs.0.processor_results.1.processor_type: "drop" } + - match: { docs.0.processor_results.1.if.condition: "false" } + - match: { docs.0.processor_results.1.if.result: false } + - match: { docs.0.processor_results.2.status: "dropped" } + - match: { docs.0.processor_results.2.processor_type: "drop" } + - match: { docs.0.processor_results.2.if.condition: "true" } + - match: { docs.0.processor_results.2.if.result: true } +--- +"Test simulate with provided pipeline that does not exist": + - do: + catch: bad_request + ingest.simulate: + verbose: true + body: > + { + "pipeline": { + "description": "_description", + "processors": [ + { + "pipeline": { + "name": "____pipeline_doesnot_exist___" + } + } + ] + }, + "docs": [ + { + "_source": {} + } + ] + } + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [____pipeline_doesnot_exist___]" } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java index b9db1dd3c2c..13fe915c91f 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -32,6 +33,7 @@ import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; import java.io.IOException; +import java.util.Locale; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; @@ -39,10 +41,33 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona public class SimulateProcessorResult implements Writeable, ToXContentObject { private static final String IGNORED_ERROR_FIELD = "ignored_error"; + private static final String STATUS_FIELD = "status"; + private static final String TYPE_FIELD = "processor_type"; + private static final String CONDITION_FIELD = "condition"; + private static final String RESULT_FIELD = "result"; + + enum Status { + SUCCESS, + ERROR, + ERROR_IGNORED, + SKIPPED, + DROPPED; + + @Override + public String toString() { + return this.name().toLowerCase(Locale.ROOT); + } + + public static Status fromString(String string) { + return Status.valueOf(string.toUpperCase(Locale.ROOT)); + } + } + private final String type; private final String processorTag; private final String description; private final WriteableIngestDocument ingestDocument; private final Exception failure; + private final Tuple conditionalWithResult; private static final ConstructingObjectParser IGNORED_ERROR_PARSER = new ConstructingObjectParser<>( @@ -58,26 +83,51 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject { ); } + private static final ConstructingObjectParser, Void> IF_CONDITION_PARSER = + new ConstructingObjectParser<>( + "if_condition_parser", + true, + a -> { + String condition = a[0] == null ? null : (String) a[0]; + Boolean result = a[1] == null ? null : (Boolean) a[1]; + return new Tuple<>(condition, result); + } + ); + static { + IF_CONDITION_PARSER.declareString(optionalConstructorArg(), new ParseField(CONDITION_FIELD)); + IF_CONDITION_PARSER.declareBoolean(optionalConstructorArg(), new ParseField(RESULT_FIELD)); + } + + @SuppressWarnings("unchecked") public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "simulate_processor_result", true, a -> { - String processorTag = a[0] == null ? null : (String)a[0]; - String description = a[1] == null ? null : (String)a[1]; - IngestDocument document = a[2] == null ? null : ((WriteableIngestDocument)a[2]).getIngestDocument(); + String type = (String) a[0]; + String processorTag = a[1] == null ? null : (String)a[1]; + String description = a[2] == null ? null : (String)a[2]; + Tuple conditionalWithResult = a[3] == null ? null : (Tuple)a[3]; + IngestDocument document = a[4] == null ? null : ((WriteableIngestDocument)a[4]).getIngestDocument(); Exception failure = null; - if (a[3] != null) { - failure = (ElasticsearchException)a[3]; - } else if (a[4] != null) { - failure = (ElasticsearchException)a[4]; + if (a[5] != null) { + failure = (ElasticsearchException)a[5]; + } else if (a[6] != null) { + failure = (ElasticsearchException)a[6]; } - return new SimulateProcessorResult(processorTag, description, document, failure); + + return new SimulateProcessorResult(type, processorTag, description, document, failure, conditionalWithResult); } ); static { + PARSER.declareString(optionalConstructorArg(), new ParseField(TYPE_FIELD)); PARSER.declareString(optionalConstructorArg(), new ParseField(ConfigurationUtils.TAG_KEY)); PARSER.declareString(optionalConstructorArg(), new ParseField(ConfigurationUtils.DESCRIPTION_KEY)); + PARSER.declareObject( + optionalConstructorArg(), + IF_CONDITION_PARSER, + new ParseField("if") + ); PARSER.declareObject( optionalConstructorArg(), WriteableIngestDocument.INGEST_DOC_PARSER, @@ -95,24 +145,28 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject { ); } - public SimulateProcessorResult(String processorTag, String description, IngestDocument ingestDocument, - Exception failure) { + public SimulateProcessorResult(String type, String processorTag, String description, IngestDocument ingestDocument, + Exception failure, Tuple conditionalWithResult) { this.processorTag = processorTag; this.description = description; this.ingestDocument = (ingestDocument == null) ? null : new WriteableIngestDocument(ingestDocument); this.failure = failure; + this.conditionalWithResult = conditionalWithResult; + this.type = type; } - public SimulateProcessorResult(String processorTag, String description, IngestDocument ingestDocument) { - this(processorTag, description, ingestDocument, null); + public SimulateProcessorResult(String type, String processorTag, String description, IngestDocument ingestDocument, + Tuple conditionalWithResult) { + this(type, processorTag, description, ingestDocument, null, conditionalWithResult); } - public SimulateProcessorResult(String processorTag, String description, Exception failure) { - this(processorTag, description, null, failure); + public SimulateProcessorResult(String type, String processorTag, String description, Exception failure, + Tuple conditionalWithResult ) { + this(type, processorTag, description, null, failure, conditionalWithResult); } - public SimulateProcessorResult(String processorTag, String description) { - this(processorTag, description, null, null); + public SimulateProcessorResult(String type, String processorTag, String description, Tuple conditionalWithResult) { + this(type, processorTag, description, null, null, conditionalWithResult); } /** @@ -127,6 +181,18 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject { } else { this.description = null; } + if (in.getVersion().onOrAfter(Version.V_7_10_0)) { + this.type = in.readString(); + boolean hasConditional = in.readBoolean(); + if (hasConditional) { + this.conditionalWithResult = new Tuple<>(in.readString(), in.readBoolean()); + } else{ + this.conditionalWithResult = null; //no condition exists + } + } else { + this.conditionalWithResult = null; + this.type = null; + } } @Override @@ -137,6 +203,14 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject { if (out.getVersion().onOrAfter(Version.V_7_9_0)) { out.writeOptionalString(description); } + if (out.getVersion().onOrAfter(Version.V_7_10_0)) { + out.writeString(type); + out.writeBoolean(conditionalWithResult != null); + if (conditionalWithResult != null) { + out.writeString(conditionalWithResult.v1()); + out.writeBoolean(conditionalWithResult.v2()); + } + } } public IngestDocument getIngestDocument() { @@ -158,21 +232,37 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject { return description; } + public Tuple getConditionalWithResult() { + return conditionalWithResult; + } + + public String getType() { + return type; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - if (processorTag == null && failure == null && ingestDocument == null) { - builder.nullValue(); - return builder; + builder.startObject(); + + if(type != null){ + builder.field(TYPE_FIELD, type); } - builder.startObject(); + builder.field(STATUS_FIELD, getStatus(type)); + + if (description != null) { + builder.field(ConfigurationUtils.DESCRIPTION_KEY, description); + } if (processorTag != null) { builder.field(ConfigurationUtils.TAG_KEY, processorTag); } - if (description != null) { - builder.field(ConfigurationUtils.DESCRIPTION_KEY, description); + if(conditionalWithResult != null){ + builder.startObject("if"); + builder.field(CONDITION_FIELD, conditionalWithResult.v1()); + builder.field(RESULT_FIELD, conditionalWithResult.v2()); + builder.endObject(); } if (failure != null && ingestDocument != null) { @@ -194,4 +284,34 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject { public static SimulateProcessorResult fromXContent(XContentParser parser) { return PARSER.apply(parser, null); } + + Status getStatus(String type) { + //if no condition, or condition passed + if (conditionalWithResult == null || (conditionalWithResult != null && conditionalWithResult.v2())) { + if (failure != null) { + if (ingestDocument == null) { + return Status.ERROR; + } else { + return Status.ERROR_IGNORED; + } + } else if (ingestDocument == null && "pipeline".equals(type) == false) { + return Status.DROPPED; + } + return Status.SUCCESS; + } else { //has condition that failed the check + return Status.SKIPPED; + } + } + + @Override + public String toString() { + return "SimulateProcessorResult{" + + "type='" + type + '\'' + + ", processorTag='" + processorTag + '\'' + + ", description='" + description + '\'' + + ", ingestDocument=" + ingestDocument + + ", failure=" + failure + + ", conditionalWithResult=" + conditionalWithResult + + '}'; + } } diff --git a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index 9d8d4535cf8..df4e96887df 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -144,6 +144,10 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP return TYPE; } + public String getCondition(){ + return condition.getIdOrCode(); + } + private static Object wrapUnmodifiable(Object raw) { // Wraps all mutable types that the JSON parser can create by immutable wrappers. // Any inputs not wrapped are assumed to be immutable diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index b362259ea03..34754aa7b3e 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -55,8 +55,11 @@ public class PipelineProcessor extends AbstractProcessor { } Pipeline getPipeline(IngestDocument ingestDocument) { - String pipelineName = ingestDocument.renderTemplate(this.pipelineTemplate); - return ingestService.getPipeline(pipelineName); + return ingestService.getPipeline(getPipelineToCallName(ingestDocument)); + } + + String getPipelineToCallName(IngestDocument ingestDocument){ + return ingestDocument.renderTemplate(this.pipelineTemplate); } @Override diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index d2600446ad1..b819230a74b 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -21,6 +21,7 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ingest.SimulateProcessorResult; +import org.elasticsearch.common.collect.Tuple; import java.util.ArrayList; import java.util.List; @@ -46,11 +47,19 @@ public final class TrackingResultProcessor implements Processor { @Override public void execute(IngestDocument ingestDocument, BiConsumer handler) { - if (conditionalProcessor != null ) { + Tuple conditionalWithResult; + if (conditionalProcessor != null) { if (conditionalProcessor.evaluate(ingestDocument) == false) { + conditionalWithResult = new Tuple<>(conditionalProcessor.getCondition(), Boolean.FALSE); + processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(), + actualProcessor.getDescription(), conditionalWithResult)); handler.accept(ingestDocument, null); return; + } else { + conditionalWithResult = new Tuple<>(conditionalProcessor.getCondition(), Boolean.TRUE); } + } else { + conditionalWithResult = null; //no condition } if (actualProcessor instanceof PipelineProcessor) { @@ -58,24 +67,32 @@ public final class TrackingResultProcessor implements Processor { Pipeline pipeline = pipelineProcessor.getPipeline(ingestDocument); //runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument); - ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(ingestDocument), (result, e) -> { + Pipeline pipelineToCall = pipelineProcessor.getPipeline(ingestDocument); + if (pipelineToCall == null) { + throw new IllegalArgumentException("Pipeline processor configured for non-existent pipeline [" + + pipelineProcessor.getPipelineToCallName(ingestDocument) + ']'); + } + ingestDocumentCopy.executePipeline(pipelineToCall, (result, e) -> { // do nothing, let the tracking processors throw the exception while recording the path up to the failure if (e instanceof ElasticsearchException) { ElasticsearchException elasticsearchException = (ElasticsearchException) e; //else do nothing, let the tracking processors throw the exception while recording the path up to the failure if (elasticsearchException.getCause() instanceof IllegalStateException) { if (ignoreFailure) { - processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(), - pipelineProcessor.getDescription(), new IngestDocument(ingestDocument), e)); + processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getType(), pipelineProcessor.getTag(), + pipelineProcessor.getDescription(), new IngestDocument(ingestDocument), e, conditionalWithResult)); } else { - processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(), - pipelineProcessor.getDescription(), e)); + processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getType(), pipelineProcessor.getTag(), + pipelineProcessor.getDescription(), e, conditionalWithResult)); } handler.accept(null, elasticsearchException); } } else { //now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList); + //add the pipeline process to the results + processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(), + actualProcessor.getDescription(), conditionalWithResult)); Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), verbosePipelineProcessor); ingestDocument.executePipeline(verbosePipeline, handler); @@ -87,21 +104,21 @@ public final class TrackingResultProcessor implements Processor { actualProcessor.execute(ingestDocument, (result, e) -> { if (e != null) { if (ignoreFailure) { - processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), - actualProcessor.getDescription(), new IngestDocument(ingestDocument), e)); + processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(), + actualProcessor.getDescription(), new IngestDocument(ingestDocument), e, conditionalWithResult)); } else { - processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), - actualProcessor.getDescription(), e)); + processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(), + actualProcessor.getDescription(), e, conditionalWithResult)); } handler.accept(null, e); } else { if (result != null) { - processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), - actualProcessor.getDescription(), new IngestDocument(ingestDocument))); + processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(), + actualProcessor.getDescription(), new IngestDocument(ingestDocument), conditionalWithResult)); handler.accept(result, null); } else { - processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), - actualProcessor.getDescription())); + processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(), + actualProcessor.getDescription(), conditionalWithResult)); handler.accept(null, null); } } diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentVerboseResultTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentVerboseResultTests.java index 6b673c49efa..b796a997d55 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentVerboseResultTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentVerboseResultTests.java @@ -36,9 +36,10 @@ public class SimulateDocumentVerboseResultTests extends AbstractXContentTestCase for (int i = 0; i conditionWithResult = hasCondition ? new Tuple<>(randomAlphaOfLengthBetween(1, 10), randomBoolean()) : null; SimulateProcessorResult simulateProcessorResult; if (isSuccessful) { IngestDocument ingestDocument = createRandomIngestDoc(); if (isIgnoredException) { - simulateProcessorResult = new SimulateProcessorResult(processorTag, description, ingestDocument, - new IllegalArgumentException("test")); + simulateProcessorResult = new SimulateProcessorResult(type, processorTag, description, ingestDocument, + new IllegalArgumentException("test"), conditionWithResult); } else { - simulateProcessorResult = new SimulateProcessorResult(processorTag, description, ingestDocument); + simulateProcessorResult = new SimulateProcessorResult(type, processorTag, description, ingestDocument, conditionWithResult); } } else { - simulateProcessorResult = new SimulateProcessorResult(processorTag, description, - new IllegalArgumentException("test")); + simulateProcessorResult = new SimulateProcessorResult(type, processorTag, description, + new IllegalArgumentException("test"), conditionWithResult); } return simulateProcessorResult; } @@ -107,13 +112,14 @@ public class SimulateProcessorResultTests extends AbstractXContentTestCase(randomAlphaOfLengthBetween(1, 10), false)); + assertEquals(SimulateProcessorResult.Status.SKIPPED, result.getStatus("set")); + + // no ingest doc + result = new SimulateProcessorResult(null, null, null, null, null, null); + assertEquals(SimulateProcessorResult.Status.DROPPED, result.getStatus(null)); + + // no ingest doc - as pipeline processor + result = new SimulateProcessorResult(null, null, null, null, null, null); + assertEquals(SimulateProcessorResult.Status.SUCCESS, result.getStatus("pipeline")); + + // failure + result = new SimulateProcessorResult(null, null, null, null, new RuntimeException(""), null); + assertEquals(SimulateProcessorResult.Status.ERROR, result.getStatus("rename")); + + // failure, but ignored + result = new SimulateProcessorResult(null, null, null, createRandomIngestDoc(), new RuntimeException(""), null); + assertEquals(SimulateProcessorResult.Status.ERROR_IGNORED, result.getStatus("")); + + //success - no conditional + result = new SimulateProcessorResult(null, null, null, createRandomIngestDoc(), null, null); + assertEquals(SimulateProcessorResult.Status.SUCCESS, result.getStatus(null)); + + //success - conditional true + result = new SimulateProcessorResult(null, null, null, createRandomIngestDoc(), null, + new Tuple<>(randomAlphaOfLengthBetween(1, 10), true)); + assertEquals(SimulateProcessorResult.Status.SUCCESS, result.getStatus(null)); + } } diff --git a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java index 829c4f2b943..8928491a9a8 100644 --- a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java @@ -54,6 +54,8 @@ import static org.mockito.Mockito.when; public class ConditionalProcessorTests extends ESTestCase { + private static final String scriptName = "conditionalScript"; + public void testChecksCondition() throws Exception { String conditionalField = "field1"; String scriptName = "conditionalScript"; @@ -114,6 +116,7 @@ public class ConditionalProcessorTests extends ESTestCase { assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue)); assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo"))); assertStats(processor, 0, 0, 0); + assertEquals(scriptName, processor.getCondition()); ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument.setFieldValue(conditionalField, falseValue); @@ -148,7 +151,7 @@ public class ConditionalProcessorTests extends ESTestCase { } public void testTypeDeprecation() throws Exception { - String scriptName = "conditionalScript"; + ScriptService scriptService = new ScriptService(Settings.builder().build(), Collections.singletonMap( Script.DEFAULT_SCRIPT_LANG, diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index a66aa815c91..2d5a6f184aa 100644 --- a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -46,6 +46,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.verify; @@ -67,8 +68,8 @@ public class TrackingResultProcessorTests extends ESTestCase { TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); - SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), - actualProcessor.getDescription(), ingestDocument); + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(), + actualProcessor.getDescription(), ingestDocument, null); assertThat(actualProcessor.getInvokedCounter(), equalTo(1)); assertThat(resultList.size(), equalTo(1)); @@ -88,8 +89,8 @@ public class TrackingResultProcessorTests extends ESTestCase { trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); assertThat(((IngestProcessorException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage())); - SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), - actualProcessor.getDescription(), ingestDocument); + SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getType(), testProcessor.getTag(), + actualProcessor.getDescription(), ingestDocument, null); assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(resultList.size(), equalTo(1)); assertThat(resultList.get(0).getIngestDocument(), nullValue()); @@ -109,10 +110,10 @@ public class TrackingResultProcessorTests extends ESTestCase { CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); - SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), - failProcessor.getDescription(), ingestDocument); - SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), - failProcessor.getDescription(), ingestDocument); + SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getType(), failProcessor.getTag(), + failProcessor.getDescription(), ingestDocument, null); + SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getType(), + onFailureProcessor.getTag(), failProcessor.getDescription(), ingestDocument, null); assertThat(failProcessor.getInvokedCounter(), equalTo(2)); assertThat(onFailureProcessor.getInvokedCounter(), equalTo(2)); @@ -163,18 +164,21 @@ public class TrackingResultProcessorTests extends ESTestCase { trackingProcessor.execute(ingestDocument, (result, e) -> { }); - SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), - failProcessor.getDescription(), ingestDocument); - SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), - onFailureProcessor.getDescription(), ingestDocument); + SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getType(), failProcessor.getTag(), + failProcessor.getDescription(), ingestDocument, null); + SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getType(), + onFailureProcessor.getTag(), onFailureProcessor.getDescription(), ingestDocument, null); assertThat(failProcessor.getInvokedCounter(), equalTo(1)); assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1)); + assertThat(resultList.size(), equalTo(2)); assertThat(resultList.get(0).getIngestDocument(), nullValue()); assertThat(resultList.get(0).getFailure(), equalTo(exception)); assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); + assertThat(resultList.get(0).getConditionalWithResult().v1(), equalTo(scriptName)); + assertThat(resultList.get(0).getConditionalWithResult().v2(), is(Boolean.TRUE)); Map metadata = resultList.get(1).getIngestDocument().getIngestMetadata(); assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); @@ -194,8 +198,8 @@ public class TrackingResultProcessorTests extends ESTestCase { trackingProcessor.execute(ingestDocument, (result, e) -> {}); - SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), - testProcessor.getDescription(), ingestDocument); + SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getType(), testProcessor.getTag(), + testProcessor.getDescription(), ingestDocument, null); assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(resultList.size(), equalTo(1)); assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); @@ -225,23 +229,25 @@ public class TrackingResultProcessorTests extends ESTestCase { CompoundProcessor trackingProcessor = decorate(compoundProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); - SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), - compoundProcessor.getDescription(), ingestDocument); + SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getType(), compoundProcessor.getTag(), + compoundProcessor.getDescription(), ingestDocument, null); - //the step for key 2 is never executed due to conditional and thus not part of the result set - assertThat(resultList.size(), equalTo(2)); + assertThat(resultList.size(), equalTo(3)); assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); - assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); - assertFalse(resultList.get(1).getIngestDocument().hasField(key2)); - assertTrue(resultList.get(1).getIngestDocument().hasField(key3)); + assertThat(resultList.get(1).getConditionalWithResult().v1(), equalTo(scriptName)); + assertThat(resultList.get(1).getConditionalWithResult().v2(), is(Boolean.FALSE)); - assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); - assertThat(resultList.get(1).getFailure(), nullValue()); - assertThat(resultList.get(1).getProcessorTag(), nullValue()); + assertTrue(resultList.get(2).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(2).getIngestDocument().hasField(key2)); + assertTrue(resultList.get(2).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(2).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(2).getFailure(), nullValue()); + assertThat(resultList.get(2).getProcessorTag(), nullValue()); } public void testActualPipelineProcessor() throws Exception { @@ -270,24 +276,28 @@ public class TrackingResultProcessorTests extends ESTestCase { trackingProcessor.execute(ingestDocument, (result, e) -> {}); - SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), - actualProcessor.getDescription(), ingestDocument); + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(), + actualProcessor.getDescription(), ingestDocument, null); expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId); verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId); - assertThat(resultList.size(), equalTo(3)); - assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); - assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); - assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + assertThat(resultList.size(), equalTo(4)); + + assertNull(resultList.get(0).getConditionalWithResult()); + assertThat(resultList.get(0).getType(), equalTo("pipeline")); assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); - assertTrue(resultList.get(1).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(1).getIngestDocument().hasField(key2)); assertFalse(resultList.get(1).getIngestDocument().hasField(key3)); - assertThat(resultList.get(2).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); - assertThat(resultList.get(2).getFailure(), nullValue()); - assertThat(resultList.get(2).getProcessorTag(), nullValue()); + assertTrue(resultList.get(2).getIngestDocument().hasField(key1)); + assertTrue(resultList.get(2).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(2).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(3).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(3).getFailure(), nullValue()); + assertThat(resultList.get(3).getProcessorTag(), nullValue()); } public void testActualPipelineProcessorWithTrueConditional() throws Exception { @@ -320,7 +330,7 @@ public class TrackingResultProcessorTests extends ESTestCase { randomAlphaOfLength(10), null, new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService, - factory.create(Collections.emptyMap(), null, null, pipelineConfig2)), + factory.create(Collections.emptyMap(), "pipeline1", null, pipelineConfig2)), new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key3, randomInt()); }) ) ); @@ -332,33 +342,42 @@ public class TrackingResultProcessorTests extends ESTestCase { when(ingestService.getPipeline(pipelineId1)).thenReturn(pipeline1); when(ingestService.getPipeline(pipelineId2)).thenReturn(pipeline2); - PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, null, pipelineConfig0); + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), "pipeline0", null, pipelineConfig0); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); - - SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), - actualProcessor.getDescription(), ingestDocument); + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(), + actualProcessor.getDescription(), ingestDocument, null); expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId1); verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1); verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId2); - assertThat(resultList.size(), equalTo(3)); - assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); - assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); - assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + assertThat(resultList.size(), equalTo(5)); + + assertNull(resultList.get(0).getConditionalWithResult()); + assertThat(resultList.get(0).getType(), equalTo("pipeline")); + assertThat(resultList.get(0).getProcessorTag(), equalTo("pipeline0")); assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); - assertTrue(resultList.get(1).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(1).getIngestDocument().hasField(key2)); assertFalse(resultList.get(1).getIngestDocument().hasField(key3)); - assertThat(resultList.get(2).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); - assertThat(resultList.get(2).getFailure(), nullValue()); - assertThat(resultList.get(2).getProcessorTag(), nullValue()); + assertThat(resultList.get(2).getConditionalWithResult().v1(), equalTo(scriptName)); + assertThat(resultList.get(2).getConditionalWithResult().v2(), is(Boolean.TRUE)); + assertThat(resultList.get(2).getType(), equalTo("pipeline")); + assertThat(resultList.get(2).getProcessorTag(), equalTo("pipeline1")); + + assertTrue(resultList.get(3).getIngestDocument().hasField(key1)); + assertTrue(resultList.get(3).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(3).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(4).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(4).getFailure(), nullValue()); + assertThat(resultList.get(4).getProcessorTag(), nullValue()); } public void testActualPipelineProcessorWithFalseConditional() throws Exception { @@ -410,26 +429,28 @@ public class TrackingResultProcessorTests extends ESTestCase { trackingProcessor.execute(ingestDocument, (result, e) -> {}); - - SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), - actualProcessor.getDescription(), ingestDocument); + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(), + actualProcessor.getDescription(), ingestDocument, null); expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId1); verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1); verify(ingestService, Mockito.never()).getPipeline(pipelineId2); - assertThat(resultList.size(), equalTo(2)); - assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); - assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); - assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + assertThat(resultList.size(), equalTo(4)); + + assertNull(resultList.get(0).getConditionalWithResult()); + assertThat(resultList.get(0).getType(), equalTo("pipeline")); assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); assertFalse(resultList.get(1).getIngestDocument().hasField(key2)); - assertTrue(resultList.get(1).getIngestDocument().hasField(key3)); + assertFalse(resultList.get(1).getIngestDocument().hasField(key3)); - assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); - assertThat(resultList.get(1).getFailure(), nullValue()); - assertThat(resultList.get(1).getProcessorTag(), nullValue()); + assertThat(resultList.get(2).getConditionalWithResult().v1(), equalTo(scriptName)); + assertThat(resultList.get(2).getConditionalWithResult().v2(), is(Boolean.FALSE)); + + assertThat(resultList.get(3).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(3).getFailure(), nullValue()); + assertThat(resultList.get(3).getProcessorTag(), nullValue()); } public void testActualPipelineProcessorWithHandledFailure() throws Exception { @@ -464,28 +485,31 @@ public class TrackingResultProcessorTests extends ESTestCase { trackingProcessor.execute(ingestDocument, (result, e) -> {}); - SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), - actualProcessor.getDescription(), ingestDocument); + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(), + actualProcessor.getDescription(), ingestDocument, null); expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId); verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId); - assertThat(resultList.size(), equalTo(4)); + assertThat(resultList.size(), equalTo(5)); - assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); - assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); - assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + assertNull(resultList.get(0).getConditionalWithResult()); + assertThat(resultList.get(0).getType(), equalTo("pipeline")); + + assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(1).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(1).getIngestDocument().hasField(key3)); //failed processor - assertNull(resultList.get(1).getIngestDocument()); - assertThat(resultList.get(1).getFailure().getMessage(), equalTo(exception.getMessage())); + assertNull(resultList.get(2).getIngestDocument()); + assertThat(resultList.get(2).getFailure().getMessage(), equalTo(exception.getMessage())); - assertTrue(resultList.get(2).getIngestDocument().hasField(key1)); - assertTrue(resultList.get(2).getIngestDocument().hasField(key2)); - assertFalse(resultList.get(2).getIngestDocument().hasField(key3)); + assertTrue(resultList.get(3).getIngestDocument().hasField(key1)); + assertTrue(resultList.get(3).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(3).getIngestDocument().hasField(key3)); - assertThat(resultList.get(3).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); - assertThat(resultList.get(3).getFailure(), nullValue()); - assertThat(resultList.get(3).getProcessorTag(), nullValue()); + assertThat(resultList.get(4).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(4).getFailure(), nullValue()); + assertThat(resultList.get(4).getProcessorTag(), nullValue()); } public void testActualPipelineProcessorWithCycle() throws Exception { @@ -536,32 +560,36 @@ public class TrackingResultProcessorTests extends ESTestCase { ); when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + // calls the same pipeline twice CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor); CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); trackingProcessor.execute(ingestDocument, (result, e) -> {}); - SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), - actualProcessor.getDescription(), ingestDocument); + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(), + actualProcessor.getDescription(), ingestDocument, null); expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId); verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId); - assertThat(resultList.size(), equalTo(2)); + assertThat(resultList.size(), equalTo(4)); - assertThat(resultList.get(0).getIngestDocument(), not(equalTo(expectedResult.getIngestDocument()))); - assertThat(resultList.get(0).getFailure(), nullValue()); - assertThat(resultList.get(0).getProcessorTag(), nullValue()); + assertNull(resultList.get(0).getConditionalWithResult()); + assertThat(resultList.get(0).getType(), equalTo("pipeline")); - assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(1).getIngestDocument(), not(equalTo(expectedResult.getIngestDocument()))); assertThat(resultList.get(1).getFailure(), nullValue()); assertThat(resultList.get(1).getProcessorTag(), nullValue()); + assertNull(resultList.get(2).getConditionalWithResult()); + assertThat(resultList.get(2).getType(), equalTo("pipeline")); + + assertThat(resultList.get(3).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(3).getFailure(), nullValue()); + assertThat(resultList.get(3).getProcessorTag(), nullValue()); + //each invocation updates key1 with a random int - assertNotEquals(resultList.get(0).getIngestDocument().getSourceAndMetadata().get(key1), - resultList.get(1).getIngestDocument().getSourceAndMetadata().get(key1)); + assertNotEquals(resultList.get(1).getIngestDocument().getSourceAndMetadata().get(key1), + resultList.get(3).getIngestDocument().getSourceAndMetadata().get(key1)); } - - - }