[7.x] Per processor description for verbose simulate (#58207) (#60008)

For ingest node processors a per processor description
was recently added. This commit displays that description
in the verbose output of the pipeline simulation.

related #57906
This commit is contained in:
Jake Landis 2020-07-21 17:32:45 -05:00 committed by GitHub
parent 293cb8d48c
commit 55216dabb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 98 additions and 35 deletions

View File

@ -829,7 +829,7 @@ teardown:
--- ---
"Test simulate with provided pipeline definition with description in processors": "Test simulate with provided pipeline definition with tag and description in processors":
- do: - do:
ingest.simulate: ingest.simulate:
verbose: true verbose: true
@ -841,6 +841,7 @@ teardown:
{ {
"set" : { "set" : {
"description": "processor_description", "description": "processor_description",
"tag": "processor_tag",
"field" : "field2", "field" : "field2",
"value" : "_value" "value" : "_value"
} }
@ -860,3 +861,5 @@ teardown:
- length: { docs: 1 } - length: { docs: 1 }
- length: { docs.0.processor_results: 1 } - length: { docs.0.processor_results: 1 }
- match: { docs.0.processor_results.0.doc._source.field2: "_value" } - match: { docs.0.processor_results.0.doc._source.field2: "_value" }
- match: { docs.0.processor_results.0.description: "processor_description" }
- match: { docs.0.processor_results.0.tag: "processor_tag" }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.ingest; package org.elasticsearch.action.ingest;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -39,6 +40,7 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
private static final String IGNORED_ERROR_FIELD = "ignored_error"; private static final String IGNORED_ERROR_FIELD = "ignored_error";
private final String processorTag; private final String processorTag;
private final String description;
private final WriteableIngestDocument ingestDocument; private final WriteableIngestDocument ingestDocument;
private final Exception failure; private final Exception failure;
@ -62,18 +64,20 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
true, true,
a -> { a -> {
String processorTag = a[0] == null ? null : (String)a[0]; String processorTag = a[0] == null ? null : (String)a[0];
IngestDocument document = a[1] == null ? null : ((WriteableIngestDocument)a[1]).getIngestDocument(); String description = a[1] == null ? null : (String)a[1];
IngestDocument document = a[2] == null ? null : ((WriteableIngestDocument)a[2]).getIngestDocument();
Exception failure = null; Exception failure = null;
if (a[2] != null) { if (a[3] != null) {
failure = (ElasticsearchException)a[2];
} else if (a[3] != null) {
failure = (ElasticsearchException)a[3]; failure = (ElasticsearchException)a[3];
} else if (a[4] != null) {
failure = (ElasticsearchException)a[4];
} }
return new SimulateProcessorResult(processorTag, document, failure); return new SimulateProcessorResult(processorTag, description, document, failure);
} }
); );
static { static {
PARSER.declareString(optionalConstructorArg(), new ParseField(ConfigurationUtils.TAG_KEY)); PARSER.declareString(optionalConstructorArg(), new ParseField(ConfigurationUtils.TAG_KEY));
PARSER.declareString(optionalConstructorArg(), new ParseField(ConfigurationUtils.DESCRIPTION_KEY));
PARSER.declareObject( PARSER.declareObject(
optionalConstructorArg(), optionalConstructorArg(),
WriteableIngestDocument.INGEST_DOC_PARSER, WriteableIngestDocument.INGEST_DOC_PARSER,
@ -91,22 +95,24 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
); );
} }
public SimulateProcessorResult(String processorTag, IngestDocument ingestDocument, Exception failure) { public SimulateProcessorResult(String processorTag, String description, IngestDocument ingestDocument,
Exception failure) {
this.processorTag = processorTag; this.processorTag = processorTag;
this.description = description;
this.ingestDocument = (ingestDocument == null) ? null : new WriteableIngestDocument(ingestDocument); this.ingestDocument = (ingestDocument == null) ? null : new WriteableIngestDocument(ingestDocument);
this.failure = failure; this.failure = failure;
} }
public SimulateProcessorResult(String processorTag, IngestDocument ingestDocument) { public SimulateProcessorResult(String processorTag, String description, IngestDocument ingestDocument) {
this(processorTag, ingestDocument, null); this(processorTag, description, ingestDocument, null);
} }
public SimulateProcessorResult(String processorTag, Exception failure) { public SimulateProcessorResult(String processorTag, String description, Exception failure) {
this(processorTag, null, failure); this(processorTag, description, null, failure);
} }
public SimulateProcessorResult(String processorTag) { public SimulateProcessorResult(String processorTag, String description) {
this(processorTag, null, null); this(processorTag, description, null, null);
} }
/** /**
@ -116,6 +122,11 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
this.processorTag = in.readString(); this.processorTag = in.readString();
this.ingestDocument = in.readOptionalWriteable(WriteableIngestDocument::new); this.ingestDocument = in.readOptionalWriteable(WriteableIngestDocument::new);
this.failure = in.readException(); this.failure = in.readException();
if (in.getVersion().onOrAfter(Version.V_7_9_0)) {
this.description = in.readOptionalString();
} else {
this.description = null;
}
} }
@Override @Override
@ -123,6 +134,9 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
out.writeString(processorTag); out.writeString(processorTag);
out.writeOptionalWriteable(ingestDocument); out.writeOptionalWriteable(ingestDocument);
out.writeException(failure); out.writeException(failure);
if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
out.writeOptionalString(description);
}
} }
public IngestDocument getIngestDocument() { public IngestDocument getIngestDocument() {
@ -140,6 +154,10 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
return failure; return failure;
} }
public String getDescription() {
return description;
}
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (processorTag == null && failure == null && ingestDocument == null) { if (processorTag == null && failure == null && ingestDocument == null) {
@ -153,6 +171,10 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
builder.field(ConfigurationUtils.TAG_KEY, processorTag); builder.field(ConfigurationUtils.TAG_KEY, processorTag);
} }
if (description != null) {
builder.field(ConfigurationUtils.DESCRIPTION_KEY, description);
}
if (failure != null && ingestDocument != null) { if (failure != null && ingestDocument != null) {
builder.startObject(IGNORED_ERROR_FIELD); builder.startObject(IGNORED_ERROR_FIELD);
ElasticsearchException.generateFailureXContent(builder, params, failure, true); ElasticsearchException.generateFailureXContent(builder, params, failure, true);

View File

@ -66,9 +66,10 @@ public final class TrackingResultProcessor implements Processor {
if (elasticsearchException.getCause() instanceof IllegalStateException) { if (elasticsearchException.getCause() instanceof IllegalStateException) {
if (ignoreFailure) { if (ignoreFailure) {
processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(), processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(),
new IngestDocument(ingestDocument), e)); pipelineProcessor.getDescription(), new IngestDocument(ingestDocument), e));
} else { } else {
processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(), e)); processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(),
pipelineProcessor.getDescription(), e));
} }
handler.accept(null, elasticsearchException); handler.accept(null, elasticsearchException);
} }
@ -86,17 +87,21 @@ public final class TrackingResultProcessor implements Processor {
actualProcessor.execute(ingestDocument, (result, e) -> { actualProcessor.execute(ingestDocument, (result, e) -> {
if (e != null) { if (e != null) {
if (ignoreFailure) { if (ignoreFailure) {
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument), e)); processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(),
actualProcessor.getDescription(), new IngestDocument(ingestDocument), e));
} else { } else {
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), e)); processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(),
actualProcessor.getDescription(), e));
} }
handler.accept(null, e); handler.accept(null, e);
} else { } else {
if (result != null) { if (result != null) {
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument))); processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(),
actualProcessor.getDescription(), new IngestDocument(ingestDocument)));
handler.accept(result, null); handler.accept(result, null);
} else { } else {
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag())); processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(),
actualProcessor.getDescription()));
handler.accept(null, null); handler.accept(null, null);
} }
} }

View File

@ -19,11 +19,13 @@
package org.elasticsearch.action.ingest; package org.elasticsearch.action.ingest;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.test.AbstractXContentTestCase; import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.test.VersionUtils;
import java.io.IOException; import java.io.IOException;
import java.util.StringJoiner; import java.util.StringJoiner;
@ -50,6 +52,7 @@ public class SimulateProcessorResultTests extends AbstractXContentTestCase<Simul
StreamInput streamInput = out.bytes().streamInput(); StreamInput streamInput = out.bytes().streamInput();
SimulateProcessorResult otherSimulateProcessorResult = new SimulateProcessorResult(streamInput); SimulateProcessorResult otherSimulateProcessorResult = new SimulateProcessorResult(streamInput);
assertThat(otherSimulateProcessorResult.getProcessorTag(), equalTo(simulateProcessorResult.getProcessorTag())); assertThat(otherSimulateProcessorResult.getProcessorTag(), equalTo(simulateProcessorResult.getProcessorTag()));
assertThat(otherSimulateProcessorResult.getDescription(), equalTo(simulateProcessorResult.getDescription()));
if (isSuccessful) { if (isSuccessful) {
assertIngestDocument(otherSimulateProcessorResult.getIngestDocument(), simulateProcessorResult.getIngestDocument()); assertIngestDocument(otherSimulateProcessorResult.getIngestDocument(), simulateProcessorResult.getIngestDocument());
if (isIgnoredException) { if (isIgnoredException) {
@ -67,19 +70,36 @@ public class SimulateProcessorResultTests extends AbstractXContentTestCase<Simul
} }
} }
public void testBWCDescription() throws IOException {
boolean isSuccessful = randomBoolean();
boolean isIgnoredException = randomBoolean();
SimulateProcessorResult simulateProcessorResult = createTestInstance(isSuccessful, isIgnoredException);
BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(VersionUtils.getPreviousVersion(Version.V_7_9_0));
simulateProcessorResult.writeTo(out);
StreamInput in = out.bytes().streamInput();
in.setVersion(VersionUtils.getPreviousVersion(Version.V_7_9_0));
SimulateProcessorResult otherSimulateProcessorResult = new SimulateProcessorResult(in);
assertNull(otherSimulateProcessorResult.getDescription());
}
static SimulateProcessorResult createTestInstance(boolean isSuccessful, static SimulateProcessorResult createTestInstance(boolean isSuccessful,
boolean isIgnoredException) { boolean isIgnoredException) {
String processorTag = randomAlphaOfLengthBetween(1, 10); String processorTag = randomAlphaOfLengthBetween(1, 10);
String description = randomAlphaOfLengthBetween(1, 10);
SimulateProcessorResult simulateProcessorResult; SimulateProcessorResult simulateProcessorResult;
if (isSuccessful) { if (isSuccessful) {
IngestDocument ingestDocument = createRandomIngestDoc(); IngestDocument ingestDocument = createRandomIngestDoc();
if (isIgnoredException) { if (isIgnoredException) {
simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument, new IllegalArgumentException("test")); simulateProcessorResult = new SimulateProcessorResult(processorTag, description, ingestDocument,
new IllegalArgumentException("test"));
} else { } else {
simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument); simulateProcessorResult = new SimulateProcessorResult(processorTag, description, ingestDocument);
} }
} else { } else {
simulateProcessorResult = new SimulateProcessorResult(processorTag, new IllegalArgumentException("test")); simulateProcessorResult = new SimulateProcessorResult(processorTag, description,
new IllegalArgumentException("test"));
} }
return simulateProcessorResult; return simulateProcessorResult;
} }

View File

@ -67,7 +67,8 @@ public class TrackingResultProcessorTests extends ESTestCase {
TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, null, resultList); TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(),
actualProcessor.getDescription(), ingestDocument);
assertThat(actualProcessor.getInvokedCounter(), equalTo(1)); assertThat(actualProcessor.getInvokedCounter(), equalTo(1));
assertThat(resultList.size(), equalTo(1)); assertThat(resultList.size(), equalTo(1));
@ -87,7 +88,8 @@ public class TrackingResultProcessorTests extends ESTestCase {
trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
assertThat(((IngestProcessorException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage())); assertThat(((IngestProcessorException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage()));
SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(),
actualProcessor.getDescription(), ingestDocument);
assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(testProcessor.getInvokedCounter(), equalTo(1));
assertThat(resultList.size(), equalTo(1)); assertThat(resultList.size(), equalTo(1));
assertThat(resultList.get(0).getIngestDocument(), nullValue()); assertThat(resultList.get(0).getIngestDocument(), nullValue());
@ -107,8 +109,10 @@ public class TrackingResultProcessorTests extends ESTestCase {
CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList); CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(),
SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument); failProcessor.getDescription(), ingestDocument);
SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(),
failProcessor.getDescription(), ingestDocument);
assertThat(failProcessor.getInvokedCounter(), equalTo(2)); assertThat(failProcessor.getInvokedCounter(), equalTo(2));
assertThat(onFailureProcessor.getInvokedCounter(), equalTo(2)); assertThat(onFailureProcessor.getInvokedCounter(), equalTo(2));
@ -159,8 +163,10 @@ public class TrackingResultProcessorTests extends ESTestCase {
trackingProcessor.execute(ingestDocument, (result, e) -> { trackingProcessor.execute(ingestDocument, (result, e) -> {
}); });
SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(),
SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument); failProcessor.getDescription(), ingestDocument);
SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(),
onFailureProcessor.getDescription(), ingestDocument);
assertThat(failProcessor.getInvokedCounter(), equalTo(1)); assertThat(failProcessor.getInvokedCounter(), equalTo(1));
assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1)); assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1));
@ -188,7 +194,8 @@ public class TrackingResultProcessorTests extends ESTestCase {
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(),
testProcessor.getDescription(), ingestDocument);
assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(testProcessor.getInvokedCounter(), equalTo(1));
assertThat(resultList.size(), equalTo(1)); assertThat(resultList.size(), equalTo(1));
assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
@ -218,7 +225,8 @@ public class TrackingResultProcessorTests extends ESTestCase {
CompoundProcessor trackingProcessor = decorate(compoundProcessor, null, resultList); CompoundProcessor trackingProcessor = decorate(compoundProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(),
compoundProcessor.getDescription(), ingestDocument);
//the step for key 2 is never executed due to conditional and thus not part of the result set //the step for key 2 is never executed due to conditional and thus not part of the result set
assertThat(resultList.size(), equalTo(2)); assertThat(resultList.size(), equalTo(2));
@ -262,7 +270,8 @@ public class TrackingResultProcessorTests extends ESTestCase {
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(),
actualProcessor.getDescription(), ingestDocument);
expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId); expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId);
verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId); verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId);
@ -331,7 +340,8 @@ public class TrackingResultProcessorTests extends ESTestCase {
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(),
actualProcessor.getDescription(), ingestDocument);
expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId1); expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId1);
verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1); verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1);
@ -401,7 +411,8 @@ public class TrackingResultProcessorTests extends ESTestCase {
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(),
actualProcessor.getDescription(), ingestDocument);
expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId1); expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId1);
verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1); verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1);
@ -453,7 +464,8 @@ public class TrackingResultProcessorTests extends ESTestCase {
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(),
actualProcessor.getDescription(), ingestDocument);
expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId); expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId);
verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId); verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId);
@ -530,7 +542,8 @@ public class TrackingResultProcessorTests extends ESTestCase {
trackingProcessor.execute(ingestDocument, (result, e) -> {}); trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(),
actualProcessor.getDescription(), ingestDocument);
expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId); expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId);
verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId); verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId);