[7.x] Enhance the ingest node simulate verbose output (#60433) (#60678)

This commit enhances the verbose output for the
`_ingest/pipeline/_simulate?verbose` api. Specifically
this adds the following:
* the pipeline processor is now included in the output
* the conditional (if) and result is now included in the output iff it was defined
* a status field is always displayed. the possible values of status are
  * `success` - if the processor ran with out errors
  * `error` - if the processor ran but threw an error that was not ingored
  * `error_ignored` - if the processor ran but threw an error that was ingored
  * `skipped` - if the process did not run (currently only possible if the if condition evaluates to false)
  * `dropped` - if the the `drop` processor ran and dropped the document
* a `processor_type` field for the type of processor (e.g. set, rename, etc.)
* throw a better error if trying to simulate with a pipeline that does not exist

closes #56004
This commit is contained in:
Jake Landis 2020-08-27 16:53:09 -05:00 committed by GitHub
parent 1bfebd54ea
commit d2e5f2f532
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 514 additions and 217 deletions

View File

@ -342,84 +342,93 @@ The API returns the following response:
[source,console-result] [source,console-result]
---- ----
{ {
"docs": [ "docs": [
{ {
"processor_results": [ "processor_results": [
{ {
"doc": { "processor_type": "set",
"_id": "id", "status": "success",
"_index": "index", "doc": {
"_type": "_doc", "_index": "index",
"_source": { "_type": "_doc",
"field2": "_value2", "_id": "id",
"foo": "bar" "_source": {
}, "field2": "_value2",
"_ingest": { "foo": "bar"
"timestamp": "2017-05-04T22:46:09.674Z",
"pipeline": "_simulate_pipeline"
}
}
}, },
{ "_ingest": {
"doc": { "pipeline": "_simulate_pipeline",
"_id": "id", "timestamp": "2020-07-30T01:21:24.251836Z"
"_index": "index",
"_type": "_doc",
"_source": {
"field3": "_value3",
"field2": "_value2",
"foo": "bar"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.675Z",
"pipeline": "_simulate_pipeline"
}
}
} }
] }
}, },
{ {
"processor_results": [ "processor_type": "set",
{ "status": "success",
"doc": { "doc": {
"_id": "id", "_index": "index",
"_index": "index", "_type": "_doc",
"_type": "_doc", "_id": "id",
"_source": { "_source": {
"field2": "_value2", "field3": "_value3",
"foo": "rab" "field2": "_value2",
}, "foo": "bar"
"_ingest": {
"timestamp": "2017-05-04T22:46:09.676Z",
"pipeline": "_simulate_pipeline"
}
}
}, },
{ "_ingest": {
"doc": { "pipeline": "_simulate_pipeline",
"_id": "id", "timestamp": "2020-07-30T01:21:24.251836Z"
"_index": "index",
"_type": "_doc",
"_source": {
"field3": "_value3",
"field2": "_value2",
"foo": "rab"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.677Z",
"pipeline": "_simulate_pipeline"
}
}
} }
] }
} }
] ]
},
{
"processor_results": [
{
"processor_type": "set",
"status": "success",
"doc": {
"_index": "index",
"_type": "_doc",
"_id": "id",
"_source": {
"field2": "_value2",
"foo": "rab"
},
"_ingest": {
"pipeline": "_simulate_pipeline",
"timestamp": "2020-07-30T01:21:24.251863Z"
}
}
},
{
"processor_type": "set",
"status": "success",
"doc": {
"_index": "index",
"_type": "_doc",
"_id": "id",
"_source": {
"field3": "_value3",
"field2": "_value2",
"foo": "rab"
},
"_ingest": {
"pipeline": "_simulate_pipeline",
"timestamp": "2020-07-30T01:21:24.251863Z"
}
}
}
]
}
]
} }
---- ----
// TESTRESPONSE[s/"2017-05-04T22:46:09.674Z"/$body.docs.0.processor_results.0.doc._ingest.timestamp/] // TESTRESPONSE[s/"2020-07-30T01:21:24.251836Z"/$body.docs.0.processor_results.0.doc._ingest.timestamp/]
// TESTRESPONSE[s/"2017-05-04T22:46:09.675Z"/$body.docs.0.processor_results.1.doc._ingest.timestamp/] // TESTRESPONSE[s/"2020-07-30T01:21:24.251836Z"/$body.docs.0.processor_results.1.doc._ingest.timestamp/]
// TESTRESPONSE[s/"2017-05-04T22:46:09.676Z"/$body.docs.1.processor_results.0.doc._ingest.timestamp/] // TESTRESPONSE[s/"2020-07-30T01:21:24.251863Z"/$body.docs.1.processor_results.0.doc._ingest.timestamp/]
// TESTRESPONSE[s/"2017-05-04T22:46:09.677Z"/$body.docs.1.processor_results.1.doc._ingest.timestamp/] // TESTRESPONSE[s/"2020-07-30T01:21:24.251863Z"/$body.docs.1.processor_results.1.doc._ingest.timestamp/]
//// ////
[source,console] [source,console]

View File

@ -559,11 +559,15 @@ teardown:
- match: { docs.0.processor_results.0.tag: "setstatus-1" } - match: { docs.0.processor_results.0.tag: "setstatus-1" }
- match: { docs.0.processor_results.0.doc._source.field1: "123.42 400 <foo>" } - match: { docs.0.processor_results.0.doc._source.field1: "123.42 400 <foo>" }
- match: { docs.0.processor_results.0.doc._source.status: 200 } - match: { docs.0.processor_results.0.doc._source.status: 200 }
- match: { docs.0.processor_results.0.status: "success" }
- match: { docs.0.processor_results.0.processor_type: "set" }
- match: { docs.0.processor_results.1.tag: "rename-1" } - match: { docs.0.processor_results.1.tag: "rename-1" }
- match: { docs.0.processor_results.1.ignored_error.error.type: "illegal_argument_exception" } - match: { docs.0.processor_results.1.ignored_error.error.type: "illegal_argument_exception" }
- match: { docs.0.processor_results.1.ignored_error.error.reason: "field [foofield] doesn't exist" } - match: { docs.0.processor_results.1.ignored_error.error.reason: "field [foofield] doesn't exist" }
- match: { docs.0.processor_results.1.doc._source.field1: "123.42 400 <foo>" } - match: { docs.0.processor_results.1.doc._source.field1: "123.42 400 <foo>" }
- match: { docs.0.processor_results.1.doc._source.status: 200 } - match: { docs.0.processor_results.1.doc._source.status: 200 }
- match: { docs.0.processor_results.1.status: "error_ignored" }
- match: { docs.0.processor_results.1.processor_type: "rename" }
--- ---
"Test verbose simulate with ignore_failure and no exception thrown": "Test verbose simulate with ignore_failure and no exception thrown":
@ -605,11 +609,16 @@ teardown:
} }
- length: { docs: 1 } - length: { docs: 1 }
- length: { docs.0.processor_results: 2 } - length: { docs.0.processor_results: 2 }
- length: { docs.0.processor_results.0: 4 }
- match: { docs.0.processor_results.0.tag: "setstatus-1" } - match: { docs.0.processor_results.0.tag: "setstatus-1" }
- match: { docs.0.processor_results.0.status: "success" }
- match: { docs.0.processor_results.0.processor_type: "set" }
- match: { docs.0.processor_results.0.doc._source.field1: "123.42 400 <foo>" } - match: { docs.0.processor_results.0.doc._source.field1: "123.42 400 <foo>" }
- match: { docs.0.processor_results.0.doc._source.status: 200 } - match: { docs.0.processor_results.0.doc._source.status: 200 }
- length: { docs.0.processor_results.1: 2 } - length: { docs.0.processor_results.1: 4 }
- match: { docs.0.processor_results.1.tag: "rename-1" } - match: { docs.0.processor_results.1.tag: "rename-1" }
- match: { docs.0.processor_results.1.status: "success" }
- match: { docs.0.processor_results.1.processor_type: "rename" }
- match: { docs.0.processor_results.1.doc._source.new_status: 200 } - match: { docs.0.processor_results.1.doc._source.new_status: 200 }
--- ---
@ -725,7 +734,8 @@ teardown:
{ {
"set": { "set": {
"field": "pipeline0", "field": "pipeline0",
"value": true "value": true,
"description" : "first_set"
} }
}, },
{ {
@ -747,16 +757,25 @@ teardown:
] ]
} }
- length: { docs: 1 } - length: { docs: 1 }
- length: { docs.0.processor_results: 3 } - length: { docs.0.processor_results: 5 }
- match: { docs.0.processor_results.0.doc._source.pipeline0: true } - match: { docs.0.processor_results.0.doc._source.pipeline0: true }
- match: { docs.0.processor_results.0.status: "success" }
- match: { docs.0.processor_results.0.processor_type: "set" }
- match: { docs.0.processor_results.0.description: "first_set" }
- is_false: docs.0.processor_results.0.doc._source.pipeline1 - is_false: docs.0.processor_results.0.doc._source.pipeline1
- is_false: docs.0.processor_results.0.doc._source.pipeline2 - is_false: docs.0.processor_results.0.doc._source.pipeline2
- match: { docs.0.processor_results.1.doc._source.pipeline0: true } - match: { docs.0.processor_results.1.doc: null }
- match: { docs.0.processor_results.1.doc._source.pipeline1: true } - match: { docs.0.processor_results.1.status: "success" }
- is_false: docs.0.processor_results.1.doc._source.pipeline2 - match: { docs.0.processor_results.1.processor_type: "pipeline" }
- match: { docs.0.processor_results.2.doc._source.pipeline0: true } - match: { docs.0.processor_results.2.doc._source.pipeline0: true }
- match: { docs.0.processor_results.2.doc._source.pipeline1: true } - match: { docs.0.processor_results.2.doc._source.pipeline1: true }
- match: { docs.0.processor_results.2.doc._source.pipeline2: true } - is_false: docs.0.processor_results.2.doc._source.pipeline2
- match: { docs.0.processor_results.3.doc: null }
- match: { docs.0.processor_results.3.status: "success" }
- match: { docs.0.processor_results.3.processor_type: "pipeline" }
- match: { docs.0.processor_results.4.doc._source.pipeline0: true }
- match: { docs.0.processor_results.4.doc._source.pipeline1: true }
- match: { docs.0.processor_results.4.doc._source.pipeline2: true }
--- ---
"Test verbose simulate with true conditional and on failure": "Test verbose simulate with true conditional and on failure":
@ -817,19 +836,27 @@ teardown:
- length: { docs.0.processor_results: 4 } - length: { docs.0.processor_results: 4 }
- match: { docs.0.processor_results.0.tag: "gunna_fail" } - match: { docs.0.processor_results.0.tag: "gunna_fail" }
- match: { docs.0.processor_results.0.error.reason: "field [foo1] doesn't exist" } - match: { docs.0.processor_results.0.error.reason: "field [foo1] doesn't exist" }
- match: { docs.0.processor_results.0.status: "error" }
- match: { docs.0.processor_results.0.processor_type: "rename" }
- match: { docs.0.processor_results.1.tag: "failed1" } - match: { docs.0.processor_results.1.tag: "failed1" }
- match: { docs.0.processor_results.1.doc._source.failed1: "failed1" } - match: { docs.0.processor_results.1.doc._source.failed1: "failed1" }
- match: { docs.0.processor_results.1.doc._ingest.on_failure_processor_tag: "gunna_fail" } - match: { docs.0.processor_results.1.doc._ingest.on_failure_processor_tag: "gunna_fail" }
- match: { docs.0.processor_results.1.status: "success" }
- match: { docs.0.processor_results.1.processor_type: "set" }
- match: { docs.0.processor_results.2.tag: "gunna_fail_again" } - match: { docs.0.processor_results.2.tag: "gunna_fail_again" }
- match: { docs.0.processor_results.2.error.reason: "field [foo2] doesn't exist" } - match: { docs.0.processor_results.2.error.reason: "field [foo2] doesn't exist" }
- match: { docs.0.processor_results.2.status: "error" }
- match: { docs.0.processor_results.2.processor_type: "rename" }
- match: { docs.0.processor_results.3.tag: "failed2" } - match: { docs.0.processor_results.3.tag: "failed2" }
- match: { docs.0.processor_results.3.doc._source.failed1: "failed1" } - match: { docs.0.processor_results.3.doc._source.failed1: "failed1" }
- match: { docs.0.processor_results.3.doc._source.failed2: "failed2" } - match: { docs.0.processor_results.3.doc._source.failed2: "failed2" }
- match: { docs.0.processor_results.3.doc._ingest.on_failure_processor_tag: "gunna_fail_again" } - match: { docs.0.processor_results.3.doc._ingest.on_failure_processor_tag: "gunna_fail_again" }
- match: { docs.0.processor_results.3.status: "success" }
- match: { docs.0.processor_results.3.processor_type: "set" }
--- ---
"Test simulate with provided pipeline definition with tag and description in processors": "Test simulate with pipeline with conditional and skipped and dropped":
- do: - do:
ingest.simulate: ingest.simulate:
verbose: true verbose: true
@ -845,6 +872,16 @@ teardown:
"field" : "field2", "field" : "field2",
"value" : "_value" "value" : "_value"
} }
},
{
"drop" : {
"if": "false"
}
},
{
"drop" : {
"if": "true"
}
} }
] ]
}, },
@ -859,7 +896,43 @@ teardown:
] ]
} }
- length: { docs: 1 } - length: { docs: 1 }
- length: { docs.0.processor_results: 1 } - length: { docs.0.processor_results: 3 }
- match: { docs.0.processor_results.0.doc._source.field2: "_value" } - match: { docs.0.processor_results.0.doc._source.field2: "_value" }
- match: { docs.0.processor_results.0.description: "processor_description" } - match: { docs.0.processor_results.0.description: "processor_description" }
- match: { docs.0.processor_results.0.tag: "processor_tag" } - match: { docs.0.processor_results.0.tag: "processor_tag" }
- match: { docs.0.processor_results.0.status: "success" }
- match: { docs.0.processor_results.0.processor_type: "set" }
- match: { docs.0.processor_results.1.status: "skipped" }
- match: { docs.0.processor_results.1.processor_type: "drop" }
- match: { docs.0.processor_results.1.if.condition: "false" }
- match: { docs.0.processor_results.1.if.result: false }
- match: { docs.0.processor_results.2.status: "dropped" }
- match: { docs.0.processor_results.2.processor_type: "drop" }
- match: { docs.0.processor_results.2.if.condition: "true" }
- match: { docs.0.processor_results.2.if.result: true }
---
"Test simulate with provided pipeline that does not exist":
- do:
catch: bad_request
ingest.simulate:
verbose: true
body: >
{
"pipeline": {
"description": "_description",
"processors": [
{
"pipeline": {
"name": "____pipeline_doesnot_exist___"
}
}
]
},
"docs": [
{
"_source": {}
}
]
}
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [____pipeline_doesnot_exist___]" }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.ingest;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
@ -32,6 +33,7 @@ import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.IngestDocument;
import java.io.IOException; import java.io.IOException;
import java.util.Locale;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
@ -39,10 +41,33 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
public class SimulateProcessorResult implements Writeable, ToXContentObject { public class SimulateProcessorResult implements Writeable, ToXContentObject {
private static final String IGNORED_ERROR_FIELD = "ignored_error"; private static final String IGNORED_ERROR_FIELD = "ignored_error";
private static final String STATUS_FIELD = "status";
private static final String TYPE_FIELD = "processor_type";
private static final String CONDITION_FIELD = "condition";
private static final String RESULT_FIELD = "result";
enum Status {
SUCCESS,
ERROR,
ERROR_IGNORED,
SKIPPED,
DROPPED;
@Override
public String toString() {
return this.name().toLowerCase(Locale.ROOT);
}
public static Status fromString(String string) {
return Status.valueOf(string.toUpperCase(Locale.ROOT));
}
}
private final String type;
private final String processorTag; private final String processorTag;
private final String description; private final String description;
private final WriteableIngestDocument ingestDocument; private final WriteableIngestDocument ingestDocument;
private final Exception failure; private final Exception failure;
private final Tuple<String, Boolean> conditionalWithResult;
private static final ConstructingObjectParser<ElasticsearchException, Void> IGNORED_ERROR_PARSER = private static final ConstructingObjectParser<ElasticsearchException, Void> IGNORED_ERROR_PARSER =
new ConstructingObjectParser<>( new ConstructingObjectParser<>(
@ -58,26 +83,51 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
); );
} }
private static final ConstructingObjectParser<Tuple<String, Boolean>, Void> IF_CONDITION_PARSER =
new ConstructingObjectParser<>(
"if_condition_parser",
true,
a -> {
String condition = a[0] == null ? null : (String) a[0];
Boolean result = a[1] == null ? null : (Boolean) a[1];
return new Tuple<>(condition, result);
}
);
static {
IF_CONDITION_PARSER.declareString(optionalConstructorArg(), new ParseField(CONDITION_FIELD));
IF_CONDITION_PARSER.declareBoolean(optionalConstructorArg(), new ParseField(RESULT_FIELD));
}
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<SimulateProcessorResult, Void> PARSER = public static final ConstructingObjectParser<SimulateProcessorResult, Void> PARSER =
new ConstructingObjectParser<>( new ConstructingObjectParser<>(
"simulate_processor_result", "simulate_processor_result",
true, true,
a -> { a -> {
String processorTag = a[0] == null ? null : (String)a[0]; String type = (String) a[0];
String description = a[1] == null ? null : (String)a[1]; String processorTag = a[1] == null ? null : (String)a[1];
IngestDocument document = a[2] == null ? null : ((WriteableIngestDocument)a[2]).getIngestDocument(); String description = a[2] == null ? null : (String)a[2];
Tuple<String, Boolean> conditionalWithResult = a[3] == null ? null : (Tuple<String, Boolean>)a[3];
IngestDocument document = a[4] == null ? null : ((WriteableIngestDocument)a[4]).getIngestDocument();
Exception failure = null; Exception failure = null;
if (a[3] != null) { if (a[5] != null) {
failure = (ElasticsearchException)a[3]; failure = (ElasticsearchException)a[5];
} else if (a[4] != null) { } else if (a[6] != null) {
failure = (ElasticsearchException)a[4]; failure = (ElasticsearchException)a[6];
} }
return new SimulateProcessorResult(processorTag, description, document, failure);
return new SimulateProcessorResult(type, processorTag, description, document, failure, conditionalWithResult);
} }
); );
static { static {
PARSER.declareString(optionalConstructorArg(), new ParseField(TYPE_FIELD));
PARSER.declareString(optionalConstructorArg(), new ParseField(ConfigurationUtils.TAG_KEY)); PARSER.declareString(optionalConstructorArg(), new ParseField(ConfigurationUtils.TAG_KEY));
PARSER.declareString(optionalConstructorArg(), new ParseField(ConfigurationUtils.DESCRIPTION_KEY)); PARSER.declareString(optionalConstructorArg(), new ParseField(ConfigurationUtils.DESCRIPTION_KEY));
PARSER.declareObject(
optionalConstructorArg(),
IF_CONDITION_PARSER,
new ParseField("if")
);
PARSER.declareObject( PARSER.declareObject(
optionalConstructorArg(), optionalConstructorArg(),
WriteableIngestDocument.INGEST_DOC_PARSER, WriteableIngestDocument.INGEST_DOC_PARSER,
@ -95,24 +145,28 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
); );
} }
public SimulateProcessorResult(String processorTag, String description, IngestDocument ingestDocument, public SimulateProcessorResult(String type, String processorTag, String description, IngestDocument ingestDocument,
Exception failure) { Exception failure, Tuple<String, Boolean> conditionalWithResult) {
this.processorTag = processorTag; this.processorTag = processorTag;
this.description = description; this.description = description;
this.ingestDocument = (ingestDocument == null) ? null : new WriteableIngestDocument(ingestDocument); this.ingestDocument = (ingestDocument == null) ? null : new WriteableIngestDocument(ingestDocument);
this.failure = failure; this.failure = failure;
this.conditionalWithResult = conditionalWithResult;
this.type = type;
} }
public SimulateProcessorResult(String processorTag, String description, IngestDocument ingestDocument) { public SimulateProcessorResult(String type, String processorTag, String description, IngestDocument ingestDocument,
this(processorTag, description, ingestDocument, null); Tuple<String, Boolean> conditionalWithResult) {
this(type, processorTag, description, ingestDocument, null, conditionalWithResult);
} }
public SimulateProcessorResult(String processorTag, String description, Exception failure) { public SimulateProcessorResult(String type, String processorTag, String description, Exception failure,
this(processorTag, description, null, failure); Tuple<String, Boolean> conditionalWithResult ) {
this(type, processorTag, description, null, failure, conditionalWithResult);
} }
public SimulateProcessorResult(String processorTag, String description) { public SimulateProcessorResult(String type, String processorTag, String description, Tuple<String, Boolean> conditionalWithResult) {
this(processorTag, description, null, null); this(type, processorTag, description, null, null, conditionalWithResult);
} }
/** /**
@ -127,6 +181,18 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
} else { } else {
this.description = null; this.description = null;
} }
if (in.getVersion().onOrAfter(Version.V_7_10_0)) {
this.type = in.readString();
boolean hasConditional = in.readBoolean();
if (hasConditional) {
this.conditionalWithResult = new Tuple<>(in.readString(), in.readBoolean());
} else{
this.conditionalWithResult = null; //no condition exists
}
} else {
this.conditionalWithResult = null;
this.type = null;
}
} }
@Override @Override
@ -137,6 +203,14 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
if (out.getVersion().onOrAfter(Version.V_7_9_0)) { if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
out.writeOptionalString(description); out.writeOptionalString(description);
} }
if (out.getVersion().onOrAfter(Version.V_7_10_0)) {
out.writeString(type);
out.writeBoolean(conditionalWithResult != null);
if (conditionalWithResult != null) {
out.writeString(conditionalWithResult.v1());
out.writeBoolean(conditionalWithResult.v2());
}
}
} }
public IngestDocument getIngestDocument() { public IngestDocument getIngestDocument() {
@ -158,21 +232,37 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
return description; return description;
} }
public Tuple<String, Boolean> getConditionalWithResult() {
return conditionalWithResult;
}
public String getType() {
return type;
}
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (processorTag == null && failure == null && ingestDocument == null) { builder.startObject();
builder.nullValue();
return builder; if(type != null){
builder.field(TYPE_FIELD, type);
} }
builder.startObject(); builder.field(STATUS_FIELD, getStatus(type));
if (description != null) {
builder.field(ConfigurationUtils.DESCRIPTION_KEY, description);
}
if (processorTag != null) { if (processorTag != null) {
builder.field(ConfigurationUtils.TAG_KEY, processorTag); builder.field(ConfigurationUtils.TAG_KEY, processorTag);
} }
if (description != null) { if(conditionalWithResult != null){
builder.field(ConfigurationUtils.DESCRIPTION_KEY, description); builder.startObject("if");
builder.field(CONDITION_FIELD, conditionalWithResult.v1());
builder.field(RESULT_FIELD, conditionalWithResult.v2());
builder.endObject();
} }
if (failure != null && ingestDocument != null) { if (failure != null && ingestDocument != null) {
@ -194,4 +284,34 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
public static SimulateProcessorResult fromXContent(XContentParser parser) { public static SimulateProcessorResult fromXContent(XContentParser parser) {
return PARSER.apply(parser, null); return PARSER.apply(parser, null);
} }
Status getStatus(String type) {
//if no condition, or condition passed
if (conditionalWithResult == null || (conditionalWithResult != null && conditionalWithResult.v2())) {
if (failure != null) {
if (ingestDocument == null) {
return Status.ERROR;
} else {
return Status.ERROR_IGNORED;
}
} else if (ingestDocument == null && "pipeline".equals(type) == false) {
return Status.DROPPED;
}
return Status.SUCCESS;
} else { //has condition that failed the check
return Status.SKIPPED;
}
}
@Override
public String toString() {
return "SimulateProcessorResult{" +
"type='" + type + '\'' +
", processorTag='" + processorTag + '\'' +
", description='" + description + '\'' +
", ingestDocument=" + ingestDocument +
", failure=" + failure +
", conditionalWithResult=" + conditionalWithResult +
'}';
}
} }

View File

@ -144,6 +144,10 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
return TYPE; return TYPE;
} }
public String getCondition(){
return condition.getIdOrCode();
}
private static Object wrapUnmodifiable(Object raw) { private static Object wrapUnmodifiable(Object raw) {
// Wraps all mutable types that the JSON parser can create by immutable wrappers. // Wraps all mutable types that the JSON parser can create by immutable wrappers.
// Any inputs not wrapped are assumed to be immutable // Any inputs not wrapped are assumed to be immutable

View File

@ -55,8 +55,11 @@ public class PipelineProcessor extends AbstractProcessor {
} }
Pipeline getPipeline(IngestDocument ingestDocument) { Pipeline getPipeline(IngestDocument ingestDocument) {
String pipelineName = ingestDocument.renderTemplate(this.pipelineTemplate); return ingestService.getPipeline(getPipelineToCallName(ingestDocument));
return ingestService.getPipeline(pipelineName); }
String getPipelineToCallName(IngestDocument ingestDocument){
return ingestDocument.renderTemplate(this.pipelineTemplate);
} }
@Override @Override

View File

@ -21,6 +21,7 @@ package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ingest.SimulateProcessorResult; import org.elasticsearch.action.ingest.SimulateProcessorResult;
import org.elasticsearch.common.collect.Tuple;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -46,11 +47,19 @@ public final class TrackingResultProcessor implements Processor {
@Override @Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) { public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
if (conditionalProcessor != null ) { Tuple<String, Boolean> conditionalWithResult;
if (conditionalProcessor != null) {
if (conditionalProcessor.evaluate(ingestDocument) == false) { if (conditionalProcessor.evaluate(ingestDocument) == false) {
conditionalWithResult = new Tuple<>(conditionalProcessor.getCondition(), Boolean.FALSE);
processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
actualProcessor.getDescription(), conditionalWithResult));
handler.accept(ingestDocument, null); handler.accept(ingestDocument, null);
return; return;
} else {
conditionalWithResult = new Tuple<>(conditionalProcessor.getCondition(), Boolean.TRUE);
} }
} else {
conditionalWithResult = null; //no condition
} }
if (actualProcessor instanceof PipelineProcessor) { if (actualProcessor instanceof PipelineProcessor) {
@ -58,24 +67,32 @@ public final class TrackingResultProcessor implements Processor {
Pipeline pipeline = pipelineProcessor.getPipeline(ingestDocument); Pipeline pipeline = pipelineProcessor.getPipeline(ingestDocument);
//runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines //runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines
IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument); IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument);
ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(ingestDocument), (result, e) -> { Pipeline pipelineToCall = pipelineProcessor.getPipeline(ingestDocument);
if (pipelineToCall == null) {
throw new IllegalArgumentException("Pipeline processor configured for non-existent pipeline [" +
pipelineProcessor.getPipelineToCallName(ingestDocument) + ']');
}
ingestDocumentCopy.executePipeline(pipelineToCall, (result, e) -> {
// do nothing, let the tracking processors throw the exception while recording the path up to the failure // do nothing, let the tracking processors throw the exception while recording the path up to the failure
if (e instanceof ElasticsearchException) { if (e instanceof ElasticsearchException) {
ElasticsearchException elasticsearchException = (ElasticsearchException) e; ElasticsearchException elasticsearchException = (ElasticsearchException) e;
//else do nothing, let the tracking processors throw the exception while recording the path up to the failure //else do nothing, let the tracking processors throw the exception while recording the path up to the failure
if (elasticsearchException.getCause() instanceof IllegalStateException) { if (elasticsearchException.getCause() instanceof IllegalStateException) {
if (ignoreFailure) { if (ignoreFailure) {
processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(), processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getType(), pipelineProcessor.getTag(),
pipelineProcessor.getDescription(), new IngestDocument(ingestDocument), e)); pipelineProcessor.getDescription(), new IngestDocument(ingestDocument), e, conditionalWithResult));
} else { } else {
processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(), processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getType(), pipelineProcessor.getTag(),
pipelineProcessor.getDescription(), e)); pipelineProcessor.getDescription(), e, conditionalWithResult));
} }
handler.accept(null, elasticsearchException); handler.accept(null, elasticsearchException);
} }
} else { } else {
//now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it //now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList); CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList);
//add the pipeline process to the results
processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
actualProcessor.getDescription(), conditionalWithResult));
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor); verbosePipelineProcessor);
ingestDocument.executePipeline(verbosePipeline, handler); ingestDocument.executePipeline(verbosePipeline, handler);
@ -87,21 +104,21 @@ public final class TrackingResultProcessor implements Processor {
actualProcessor.execute(ingestDocument, (result, e) -> { actualProcessor.execute(ingestDocument, (result, e) -> {
if (e != null) { if (e != null) {
if (ignoreFailure) { if (ignoreFailure) {
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
actualProcessor.getDescription(), new IngestDocument(ingestDocument), e)); actualProcessor.getDescription(), new IngestDocument(ingestDocument), e, conditionalWithResult));
} else { } else {
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
actualProcessor.getDescription(), e)); actualProcessor.getDescription(), e, conditionalWithResult));
} }
handler.accept(null, e); handler.accept(null, e);
} else { } else {
if (result != null) { if (result != null) {
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
actualProcessor.getDescription(), new IngestDocument(ingestDocument))); actualProcessor.getDescription(), new IngestDocument(ingestDocument), conditionalWithResult));
handler.accept(result, null); handler.accept(result, null);
} else { } else {
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
actualProcessor.getDescription())); actualProcessor.getDescription(), conditionalWithResult));
handler.accept(null, null); handler.accept(null, null);
} }
} }

View File

@ -36,9 +36,10 @@ public class SimulateDocumentVerboseResultTests extends AbstractXContentTestCase
for (int i = 0; i<numDocs; i++) { for (int i = 0; i<numDocs; i++) {
boolean isSuccessful = !(withFailures && randomBoolean()); boolean isSuccessful = !(withFailures && randomBoolean());
boolean isIgnoredError = withFailures && randomBoolean(); boolean isIgnoredError = withFailures && randomBoolean();
boolean hasCondition = withFailures && randomBoolean();
results.add( results.add(
SimulateProcessorResultTests SimulateProcessorResultTests
.createTestInstance(isSuccessful, isIgnoredError) .createTestInstance(isSuccessful, isIgnoredError, hasCondition)
); );
} }
return new SimulateDocumentVerboseResult(results); return new SimulateDocumentVerboseResult(results);

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.ingest; package org.elasticsearch.action.ingest;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
@ -32,8 +33,8 @@ import java.util.StringJoiner;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.function.Supplier; import java.util.function.Supplier;
import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.elasticsearch.action.ingest.WriteableIngestDocumentTests.createRandomIngestDoc; import static org.elasticsearch.action.ingest.WriteableIngestDocumentTests.createRandomIngestDoc;
import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
@ -45,7 +46,8 @@ public class SimulateProcessorResultTests extends AbstractXContentTestCase<Simul
public void testSerialization() throws IOException { public void testSerialization() throws IOException {
boolean isSuccessful = randomBoolean(); boolean isSuccessful = randomBoolean();
boolean isIgnoredException = randomBoolean(); boolean isIgnoredException = randomBoolean();
SimulateProcessorResult simulateProcessorResult = createTestInstance(isSuccessful, isIgnoredException); boolean hasCondition = randomBoolean();
SimulateProcessorResult simulateProcessorResult = createTestInstance(isSuccessful, isIgnoredException, hasCondition);
BytesStreamOutput out = new BytesStreamOutput(); BytesStreamOutput out = new BytesStreamOutput();
simulateProcessorResult.writeTo(out); simulateProcessorResult.writeTo(out);
@ -73,8 +75,9 @@ public class SimulateProcessorResultTests extends AbstractXContentTestCase<Simul
public void testBWCDescription() throws IOException { public void testBWCDescription() throws IOException {
boolean isSuccessful = randomBoolean(); boolean isSuccessful = randomBoolean();
boolean isIgnoredException = randomBoolean(); boolean isIgnoredException = randomBoolean();
SimulateProcessorResult simulateProcessorResult = createTestInstance(isSuccessful, isIgnoredException); boolean hasCondition = randomBoolean();
SimulateProcessorResult simulateProcessorResult = createTestInstance(isSuccessful, isIgnoredException, hasCondition);
BytesStreamOutput out = new BytesStreamOutput(); BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(VersionUtils.getPreviousVersion(Version.V_7_9_0)); out.setVersion(VersionUtils.getPreviousVersion(Version.V_7_9_0));
simulateProcessorResult.writeTo(out); simulateProcessorResult.writeTo(out);
@ -85,21 +88,23 @@ public class SimulateProcessorResultTests extends AbstractXContentTestCase<Simul
} }
static SimulateProcessorResult createTestInstance(boolean isSuccessful, static SimulateProcessorResult createTestInstance(boolean isSuccessful,
boolean isIgnoredException) { boolean isIgnoredException, boolean hasCondition) {
String type = randomAlphaOfLengthBetween(1, 10);
String processorTag = randomAlphaOfLengthBetween(1, 10); String processorTag = randomAlphaOfLengthBetween(1, 10);
String description = randomAlphaOfLengthBetween(1, 10); String description = randomAlphaOfLengthBetween(1, 10);
Tuple<String, Boolean> conditionWithResult = hasCondition ? new Tuple<>(randomAlphaOfLengthBetween(1, 10), randomBoolean()) : null;
SimulateProcessorResult simulateProcessorResult; SimulateProcessorResult simulateProcessorResult;
if (isSuccessful) { if (isSuccessful) {
IngestDocument ingestDocument = createRandomIngestDoc(); IngestDocument ingestDocument = createRandomIngestDoc();
if (isIgnoredException) { if (isIgnoredException) {
simulateProcessorResult = new SimulateProcessorResult(processorTag, description, ingestDocument, simulateProcessorResult = new SimulateProcessorResult(type, processorTag, description, ingestDocument,
new IllegalArgumentException("test")); new IllegalArgumentException("test"), conditionWithResult);
} else { } else {
simulateProcessorResult = new SimulateProcessorResult(processorTag, description, ingestDocument); simulateProcessorResult = new SimulateProcessorResult(type, processorTag, description, ingestDocument, conditionWithResult);
} }
} else { } else {
simulateProcessorResult = new SimulateProcessorResult(processorTag, description, simulateProcessorResult = new SimulateProcessorResult(type, processorTag, description,
new IllegalArgumentException("test")); new IllegalArgumentException("test"), conditionWithResult);
} }
return simulateProcessorResult; return simulateProcessorResult;
} }
@ -107,13 +112,14 @@ public class SimulateProcessorResultTests extends AbstractXContentTestCase<Simul
private static SimulateProcessorResult createTestInstanceWithFailures() { private static SimulateProcessorResult createTestInstanceWithFailures() {
boolean isSuccessful = randomBoolean(); boolean isSuccessful = randomBoolean();
boolean isIgnoredException = randomBoolean(); boolean isIgnoredException = randomBoolean();
return createTestInstance(isSuccessful, isIgnoredException); boolean hasCondition = randomBoolean();
return createTestInstance(isSuccessful, isIgnoredException, hasCondition);
} }
@Override @Override
protected SimulateProcessorResult createTestInstance() { protected SimulateProcessorResult createTestInstance() {
// we test failures separately since comparing XContent is not possible with failures // we test failures separately since comparing XContent is not possible with failures
return createTestInstance(true, false); return createTestInstance(true, false, true);
} }
@Override @Override
@ -178,4 +184,37 @@ public class SimulateProcessorResultTests extends AbstractXContentTestCase<Simul
getShuffleFieldsExceptions(), getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance, getShuffleFieldsExceptions(), getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance,
this::assertEqualInstances, assertToXContentEquivalence, getToXContentParams()); this::assertEqualInstances, assertToXContentEquivalence, getToXContentParams());
} }
public void testStatus(){
SimulateProcessorResult result;
// conditional returned false
result = new SimulateProcessorResult(null, null, null, createRandomIngestDoc(), null,
new Tuple<>(randomAlphaOfLengthBetween(1, 10), false));
assertEquals(SimulateProcessorResult.Status.SKIPPED, result.getStatus("set"));
// no ingest doc
result = new SimulateProcessorResult(null, null, null, null, null, null);
assertEquals(SimulateProcessorResult.Status.DROPPED, result.getStatus(null));
// no ingest doc - as pipeline processor
result = new SimulateProcessorResult(null, null, null, null, null, null);
assertEquals(SimulateProcessorResult.Status.SUCCESS, result.getStatus("pipeline"));
// failure
result = new SimulateProcessorResult(null, null, null, null, new RuntimeException(""), null);
assertEquals(SimulateProcessorResult.Status.ERROR, result.getStatus("rename"));
// failure, but ignored
result = new SimulateProcessorResult(null, null, null, createRandomIngestDoc(), new RuntimeException(""), null);
assertEquals(SimulateProcessorResult.Status.ERROR_IGNORED, result.getStatus(""));
//success - no conditional
result = new SimulateProcessorResult(null, null, null, createRandomIngestDoc(), null, null);
assertEquals(SimulateProcessorResult.Status.SUCCESS, result.getStatus(null));
//success - conditional true
result = new SimulateProcessorResult(null, null, null, createRandomIngestDoc(), null,
new Tuple<>(randomAlphaOfLengthBetween(1, 10), true));
assertEquals(SimulateProcessorResult.Status.SUCCESS, result.getStatus(null));
}
} }

View File

@ -54,6 +54,8 @@ import static org.mockito.Mockito.when;
public class ConditionalProcessorTests extends ESTestCase { public class ConditionalProcessorTests extends ESTestCase {
private static final String scriptName = "conditionalScript";
public void testChecksCondition() throws Exception { public void testChecksCondition() throws Exception {
String conditionalField = "field1"; String conditionalField = "field1";
String scriptName = "conditionalScript"; String scriptName = "conditionalScript";
@ -114,6 +116,7 @@ public class ConditionalProcessorTests extends ESTestCase {
assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue)); assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue));
assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo"))); assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo")));
assertStats(processor, 0, 0, 0); assertStats(processor, 0, 0, 0);
assertEquals(scriptName, processor.getCondition());
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
ingestDocument.setFieldValue(conditionalField, falseValue); ingestDocument.setFieldValue(conditionalField, falseValue);
@ -148,7 +151,7 @@ public class ConditionalProcessorTests extends ESTestCase {
} }
public void testTypeDeprecation() throws Exception { public void testTypeDeprecation() throws Exception {
String scriptName = "conditionalScript";
ScriptService scriptService = new ScriptService(Settings.builder().build(), ScriptService scriptService = new ScriptService(Settings.builder().build(),
Collections.singletonMap( Collections.singletonMap(
Script.DEFAULT_SCRIPT_LANG, Script.DEFAULT_SCRIPT_LANG,

View File

@ -46,6 +46,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -67,8 +68,8 @@ public class TrackingResultProcessorTests extends ESTestCase {
TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, null, resultList); TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
actualProcessor.getDescription(), ingestDocument); actualProcessor.getDescription(), ingestDocument, null);
assertThat(actualProcessor.getInvokedCounter(), equalTo(1)); assertThat(actualProcessor.getInvokedCounter(), equalTo(1));
assertThat(resultList.size(), equalTo(1)); assertThat(resultList.size(), equalTo(1));
@ -88,8 +89,8 @@ public class TrackingResultProcessorTests extends ESTestCase {
trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
assertThat(((IngestProcessorException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage())); assertThat(((IngestProcessorException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage()));
SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getType(), testProcessor.getTag(),
actualProcessor.getDescription(), ingestDocument); actualProcessor.getDescription(), ingestDocument, null);
assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(testProcessor.getInvokedCounter(), equalTo(1));
assertThat(resultList.size(), equalTo(1)); assertThat(resultList.size(), equalTo(1));
assertThat(resultList.get(0).getIngestDocument(), nullValue()); assertThat(resultList.get(0).getIngestDocument(), nullValue());
@ -109,10 +110,10 @@ public class TrackingResultProcessorTests extends ESTestCase {
CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getType(), failProcessor.getTag(),
failProcessor.getDescription(), ingestDocument); failProcessor.getDescription(), ingestDocument, null);
SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getType(),
failProcessor.getDescription(), ingestDocument); onFailureProcessor.getTag(), failProcessor.getDescription(), ingestDocument, null);
assertThat(failProcessor.getInvokedCounter(), equalTo(2)); assertThat(failProcessor.getInvokedCounter(), equalTo(2));
assertThat(onFailureProcessor.getInvokedCounter(), equalTo(2)); assertThat(onFailureProcessor.getInvokedCounter(), equalTo(2));
@ -163,18 +164,21 @@ public class TrackingResultProcessorTests extends ESTestCase {
trackingProcessor.execute(ingestDocument, (result, e) -> { trackingProcessor.execute(ingestDocument, (result, e) -> {
}); });
SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getType(), failProcessor.getTag(),
failProcessor.getDescription(), ingestDocument); failProcessor.getDescription(), ingestDocument, null);
SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getType(),
onFailureProcessor.getDescription(), ingestDocument); onFailureProcessor.getTag(), onFailureProcessor.getDescription(), ingestDocument, null);
assertThat(failProcessor.getInvokedCounter(), equalTo(1)); assertThat(failProcessor.getInvokedCounter(), equalTo(1));
assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1)); assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1));
assertThat(resultList.size(), equalTo(2)); assertThat(resultList.size(), equalTo(2));
assertThat(resultList.get(0).getIngestDocument(), nullValue()); assertThat(resultList.get(0).getIngestDocument(), nullValue());
assertThat(resultList.get(0).getFailure(), equalTo(exception)); assertThat(resultList.get(0).getFailure(), equalTo(exception));
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag()));
assertThat(resultList.get(0).getConditionalWithResult().v1(), equalTo(scriptName));
assertThat(resultList.get(0).getConditionalWithResult().v2(), is(Boolean.TRUE));
Map<String, Object> metadata = resultList.get(1).getIngestDocument().getIngestMetadata(); Map<String, Object> metadata = resultList.get(1).getIngestDocument().getIngestMetadata();
assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail"));
@ -194,8 +198,8 @@ public class TrackingResultProcessorTests extends ESTestCase {
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getType(), testProcessor.getTag(),
testProcessor.getDescription(), ingestDocument); testProcessor.getDescription(), ingestDocument, null);
assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(testProcessor.getInvokedCounter(), equalTo(1));
assertThat(resultList.size(), equalTo(1)); assertThat(resultList.size(), equalTo(1));
assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
@ -225,23 +229,25 @@ public class TrackingResultProcessorTests extends ESTestCase {
CompoundProcessor trackingProcessor = decorate(compoundProcessor, null, resultList); CompoundProcessor trackingProcessor = decorate(compoundProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getType(), compoundProcessor.getTag(),
compoundProcessor.getDescription(), ingestDocument); compoundProcessor.getDescription(), ingestDocument, null);
//the step for key 2 is never executed due to conditional and thus not part of the result set assertThat(resultList.size(), equalTo(3));
assertThat(resultList.size(), equalTo(2));
assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); assertTrue(resultList.get(0).getIngestDocument().hasField(key1));
assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); assertFalse(resultList.get(0).getIngestDocument().hasField(key2));
assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); assertFalse(resultList.get(0).getIngestDocument().hasField(key3));
assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); assertThat(resultList.get(1).getConditionalWithResult().v1(), equalTo(scriptName));
assertFalse(resultList.get(1).getIngestDocument().hasField(key2)); assertThat(resultList.get(1).getConditionalWithResult().v2(), is(Boolean.FALSE));
assertTrue(resultList.get(1).getIngestDocument().hasField(key3));
assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); assertTrue(resultList.get(2).getIngestDocument().hasField(key1));
assertThat(resultList.get(1).getFailure(), nullValue()); assertFalse(resultList.get(2).getIngestDocument().hasField(key2));
assertThat(resultList.get(1).getProcessorTag(), nullValue()); assertTrue(resultList.get(2).getIngestDocument().hasField(key3));
assertThat(resultList.get(2).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
assertThat(resultList.get(2).getFailure(), nullValue());
assertThat(resultList.get(2).getProcessorTag(), nullValue());
} }
public void testActualPipelineProcessor() throws Exception { public void testActualPipelineProcessor() throws Exception {
@ -270,24 +276,28 @@ public class TrackingResultProcessorTests extends ESTestCase {
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
actualProcessor.getDescription(), ingestDocument); actualProcessor.getDescription(), ingestDocument, null);
expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId); expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId);
verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId); verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId);
assertThat(resultList.size(), equalTo(3));
assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); assertThat(resultList.size(), equalTo(4));
assertFalse(resultList.get(0).getIngestDocument().hasField(key2));
assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); assertNull(resultList.get(0).getConditionalWithResult());
assertThat(resultList.get(0).getType(), equalTo("pipeline"));
assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); assertTrue(resultList.get(1).getIngestDocument().hasField(key1));
assertTrue(resultList.get(1).getIngestDocument().hasField(key2)); assertFalse(resultList.get(1).getIngestDocument().hasField(key2));
assertFalse(resultList.get(1).getIngestDocument().hasField(key3)); assertFalse(resultList.get(1).getIngestDocument().hasField(key3));
assertThat(resultList.get(2).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); assertTrue(resultList.get(2).getIngestDocument().hasField(key1));
assertThat(resultList.get(2).getFailure(), nullValue()); assertTrue(resultList.get(2).getIngestDocument().hasField(key2));
assertThat(resultList.get(2).getProcessorTag(), nullValue()); assertFalse(resultList.get(2).getIngestDocument().hasField(key3));
assertThat(resultList.get(3).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
assertThat(resultList.get(3).getFailure(), nullValue());
assertThat(resultList.get(3).getProcessorTag(), nullValue());
} }
public void testActualPipelineProcessorWithTrueConditional() throws Exception { public void testActualPipelineProcessorWithTrueConditional() throws Exception {
@ -320,7 +330,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
randomAlphaOfLength(10), randomAlphaOfLength(10),
null, null,
new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService, new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService,
factory.create(Collections.emptyMap(), null, null, pipelineConfig2)), factory.create(Collections.emptyMap(), "pipeline1", null, pipelineConfig2)),
new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key3, randomInt()); }) new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key3, randomInt()); })
) )
); );
@ -332,33 +342,42 @@ public class TrackingResultProcessorTests extends ESTestCase {
when(ingestService.getPipeline(pipelineId1)).thenReturn(pipeline1); when(ingestService.getPipeline(pipelineId1)).thenReturn(pipeline1);
when(ingestService.getPipeline(pipelineId2)).thenReturn(pipeline2); when(ingestService.getPipeline(pipelineId2)).thenReturn(pipeline2);
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, null, pipelineConfig0); PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), "pipeline0", null, pipelineConfig0);
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), actualProcessor.getDescription(), ingestDocument, null);
actualProcessor.getDescription(), ingestDocument);
expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId1); expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId1);
verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1); verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1);
verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId2); verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId2);
assertThat(resultList.size(), equalTo(3));
assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); assertThat(resultList.size(), equalTo(5));
assertFalse(resultList.get(0).getIngestDocument().hasField(key2));
assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); assertNull(resultList.get(0).getConditionalWithResult());
assertThat(resultList.get(0).getType(), equalTo("pipeline"));
assertThat(resultList.get(0).getProcessorTag(), equalTo("pipeline0"));
assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); assertTrue(resultList.get(1).getIngestDocument().hasField(key1));
assertTrue(resultList.get(1).getIngestDocument().hasField(key2)); assertFalse(resultList.get(1).getIngestDocument().hasField(key2));
assertFalse(resultList.get(1).getIngestDocument().hasField(key3)); assertFalse(resultList.get(1).getIngestDocument().hasField(key3));
assertThat(resultList.get(2).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); assertThat(resultList.get(2).getConditionalWithResult().v1(), equalTo(scriptName));
assertThat(resultList.get(2).getFailure(), nullValue()); assertThat(resultList.get(2).getConditionalWithResult().v2(), is(Boolean.TRUE));
assertThat(resultList.get(2).getProcessorTag(), nullValue()); assertThat(resultList.get(2).getType(), equalTo("pipeline"));
assertThat(resultList.get(2).getProcessorTag(), equalTo("pipeline1"));
assertTrue(resultList.get(3).getIngestDocument().hasField(key1));
assertTrue(resultList.get(3).getIngestDocument().hasField(key2));
assertFalse(resultList.get(3).getIngestDocument().hasField(key3));
assertThat(resultList.get(4).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
assertThat(resultList.get(4).getFailure(), nullValue());
assertThat(resultList.get(4).getProcessorTag(), nullValue());
} }
public void testActualPipelineProcessorWithFalseConditional() throws Exception { public void testActualPipelineProcessorWithFalseConditional() throws Exception {
@ -410,26 +429,28 @@ public class TrackingResultProcessorTests extends ESTestCase {
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), actualProcessor.getDescription(), ingestDocument, null);
actualProcessor.getDescription(), ingestDocument);
expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId1); expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId1);
verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1); verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1);
verify(ingestService, Mockito.never()).getPipeline(pipelineId2); verify(ingestService, Mockito.never()).getPipeline(pipelineId2);
assertThat(resultList.size(), equalTo(2));
assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); assertThat(resultList.size(), equalTo(4));
assertFalse(resultList.get(0).getIngestDocument().hasField(key2));
assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); assertNull(resultList.get(0).getConditionalWithResult());
assertThat(resultList.get(0).getType(), equalTo("pipeline"));
assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); assertTrue(resultList.get(1).getIngestDocument().hasField(key1));
assertFalse(resultList.get(1).getIngestDocument().hasField(key2)); assertFalse(resultList.get(1).getIngestDocument().hasField(key2));
assertTrue(resultList.get(1).getIngestDocument().hasField(key3)); assertFalse(resultList.get(1).getIngestDocument().hasField(key3));
assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); assertThat(resultList.get(2).getConditionalWithResult().v1(), equalTo(scriptName));
assertThat(resultList.get(1).getFailure(), nullValue()); assertThat(resultList.get(2).getConditionalWithResult().v2(), is(Boolean.FALSE));
assertThat(resultList.get(1).getProcessorTag(), nullValue());
assertThat(resultList.get(3).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
assertThat(resultList.get(3).getFailure(), nullValue());
assertThat(resultList.get(3).getProcessorTag(), nullValue());
} }
public void testActualPipelineProcessorWithHandledFailure() throws Exception { public void testActualPipelineProcessorWithHandledFailure() throws Exception {
@ -464,28 +485,31 @@ public class TrackingResultProcessorTests extends ESTestCase {
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
actualProcessor.getDescription(), ingestDocument); actualProcessor.getDescription(), ingestDocument, null);
expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId); expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId);
verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId); verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId);
assertThat(resultList.size(), equalTo(4)); assertThat(resultList.size(), equalTo(5));
assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); assertNull(resultList.get(0).getConditionalWithResult());
assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); assertThat(resultList.get(0).getType(), equalTo("pipeline"));
assertFalse(resultList.get(0).getIngestDocument().hasField(key3));
assertTrue(resultList.get(1).getIngestDocument().hasField(key1));
assertFalse(resultList.get(1).getIngestDocument().hasField(key2));
assertFalse(resultList.get(1).getIngestDocument().hasField(key3));
//failed processor //failed processor
assertNull(resultList.get(1).getIngestDocument()); assertNull(resultList.get(2).getIngestDocument());
assertThat(resultList.get(1).getFailure().getMessage(), equalTo(exception.getMessage())); assertThat(resultList.get(2).getFailure().getMessage(), equalTo(exception.getMessage()));
assertTrue(resultList.get(2).getIngestDocument().hasField(key1)); assertTrue(resultList.get(3).getIngestDocument().hasField(key1));
assertTrue(resultList.get(2).getIngestDocument().hasField(key2)); assertTrue(resultList.get(3).getIngestDocument().hasField(key2));
assertFalse(resultList.get(2).getIngestDocument().hasField(key3)); assertFalse(resultList.get(3).getIngestDocument().hasField(key3));
assertThat(resultList.get(3).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); assertThat(resultList.get(4).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
assertThat(resultList.get(3).getFailure(), nullValue()); assertThat(resultList.get(4).getFailure(), nullValue());
assertThat(resultList.get(3).getProcessorTag(), nullValue()); assertThat(resultList.get(4).getProcessorTag(), nullValue());
} }
public void testActualPipelineProcessorWithCycle() throws Exception { public void testActualPipelineProcessorWithCycle() throws Exception {
@ -536,32 +560,36 @@ public class TrackingResultProcessorTests extends ESTestCase {
); );
when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline);
// calls the same pipeline twice
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor);
CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
actualProcessor.getDescription(), ingestDocument); actualProcessor.getDescription(), ingestDocument, null);
expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId); expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId);
verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId); verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId);
assertThat(resultList.size(), equalTo(2)); assertThat(resultList.size(), equalTo(4));
assertThat(resultList.get(0).getIngestDocument(), not(equalTo(expectedResult.getIngestDocument()))); assertNull(resultList.get(0).getConditionalWithResult());
assertThat(resultList.get(0).getFailure(), nullValue()); assertThat(resultList.get(0).getType(), equalTo("pipeline"));
assertThat(resultList.get(0).getProcessorTag(), nullValue());
assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); assertThat(resultList.get(1).getIngestDocument(), not(equalTo(expectedResult.getIngestDocument())));
assertThat(resultList.get(1).getFailure(), nullValue()); assertThat(resultList.get(1).getFailure(), nullValue());
assertThat(resultList.get(1).getProcessorTag(), nullValue()); assertThat(resultList.get(1).getProcessorTag(), nullValue());
assertNull(resultList.get(2).getConditionalWithResult());
assertThat(resultList.get(2).getType(), equalTo("pipeline"));
assertThat(resultList.get(3).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
assertThat(resultList.get(3).getFailure(), nullValue());
assertThat(resultList.get(3).getProcessorTag(), nullValue());
//each invocation updates key1 with a random int //each invocation updates key1 with a random int
assertNotEquals(resultList.get(0).getIngestDocument().getSourceAndMetadata().get(key1), assertNotEquals(resultList.get(1).getIngestDocument().getSourceAndMetadata().get(key1),
resultList.get(1).getIngestDocument().getSourceAndMetadata().get(key1)); resultList.get(3).getIngestDocument().getSourceAndMetadata().get(key1));
} }
} }