parent
8fd01554bc
commit
ed768b101f
|
@ -19,6 +19,7 @@
|
|||
package org.elasticsearch.action.ingest;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
|
@ -29,47 +30,48 @@ import org.elasticsearch.ingest.IngestDocument;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
public class SimulateProcessorResult implements Writeable, ToXContent {
|
||||
class SimulateProcessorResult implements Writeable, ToXContent {
|
||||
private final String processorTag;
|
||||
private final WriteableIngestDocument ingestDocument;
|
||||
private final Exception failure;
|
||||
|
||||
public SimulateProcessorResult(String processorTag, IngestDocument ingestDocument) {
|
||||
SimulateProcessorResult(String processorTag, IngestDocument ingestDocument, Exception failure) {
|
||||
this.processorTag = processorTag;
|
||||
this.ingestDocument = new WriteableIngestDocument(ingestDocument);
|
||||
this.failure = null;
|
||||
this.ingestDocument = (ingestDocument == null) ? null : new WriteableIngestDocument(ingestDocument);
|
||||
this.failure = failure;
|
||||
}
|
||||
|
||||
public SimulateProcessorResult(String processorTag, Exception failure) {
|
||||
this.processorTag = processorTag;
|
||||
this.failure = failure;
|
||||
this.ingestDocument = null;
|
||||
SimulateProcessorResult(String processorTag, IngestDocument ingestDocument) {
|
||||
this(processorTag, ingestDocument, null);
|
||||
}
|
||||
|
||||
SimulateProcessorResult(String processorTag, Exception failure) {
|
||||
this(processorTag, null, failure);
|
||||
}
|
||||
|
||||
/**
|
||||
* Read from a stream.
|
||||
*/
|
||||
public SimulateProcessorResult(StreamInput in) throws IOException {
|
||||
SimulateProcessorResult(StreamInput in) throws IOException {
|
||||
this.processorTag = in.readString();
|
||||
if (in.readBoolean()) {
|
||||
this.failure = in.readException();
|
||||
this.ingestDocument = null;
|
||||
} else {
|
||||
this.ingestDocument = new WriteableIngestDocument(in);
|
||||
this.failure = null;
|
||||
} else {
|
||||
this.ingestDocument = null;
|
||||
}
|
||||
this.failure = in.readException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(processorTag);
|
||||
if (failure == null) {
|
||||
if (ingestDocument == null) {
|
||||
out.writeBoolean(false);
|
||||
ingestDocument.writeTo(out);
|
||||
} else {
|
||||
out.writeBoolean(true);
|
||||
out.writeException(failure);
|
||||
ingestDocument.writeTo(out);
|
||||
}
|
||||
out.writeException(failure);
|
||||
}
|
||||
|
||||
public IngestDocument getIngestDocument() {
|
||||
|
@ -90,14 +92,23 @@ public class SimulateProcessorResult implements Writeable, ToXContent {
|
|||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
|
||||
if (processorTag != null) {
|
||||
builder.field(ConfigurationUtils.TAG_KEY, processorTag);
|
||||
}
|
||||
if (failure == null) {
|
||||
ingestDocument.toXContent(builder, params);
|
||||
} else {
|
||||
|
||||
if (failure != null && ingestDocument != null) {
|
||||
builder.startObject("ignored_error");
|
||||
ElasticsearchException.renderException(builder, params, failure);
|
||||
builder.endObject();
|
||||
} else if (failure != null) {
|
||||
ElasticsearchException.renderException(builder, params, failure);
|
||||
}
|
||||
|
||||
if (ingestDocument != null) {
|
||||
ingestDocument.toXContent(builder, params);
|
||||
}
|
||||
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.elasticsearch.ingest.IngestDocument;
|
|||
import org.elasticsearch.ingest.Processor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
|
@ -49,7 +48,7 @@ public final class TrackingResultProcessor implements Processor {
|
|||
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument)));
|
||||
} catch (Exception e) {
|
||||
if (ignoreFailure) {
|
||||
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument)));
|
||||
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument), e));
|
||||
} else {
|
||||
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), e));
|
||||
}
|
||||
|
|
|
@ -160,7 +160,24 @@ public class SimulateExecutionServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testExecuteVerboseItemExceptionWithIgnoreFailure() throws Exception {
|
||||
TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { throw new RuntimeException("processor failed"); });
|
||||
RuntimeException exception = new RuntimeException("processor failed");
|
||||
TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { throw exception; });
|
||||
CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList());
|
||||
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor));
|
||||
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
|
||||
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
|
||||
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
|
||||
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(1));
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), sameInstance(exception));
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
|
||||
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
|
||||
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(), not(sameInstance(ingestDocument.getSourceAndMetadata())));
|
||||
}
|
||||
|
||||
public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws Exception {
|
||||
TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { });
|
||||
CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList());
|
||||
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor));
|
||||
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
|
||||
|
|
|
@ -37,13 +37,18 @@ public class SimulateProcessorResultTests extends ESTestCase {
|
|||
|
||||
public void testSerialization() throws IOException {
|
||||
String processorTag = randomAsciiOfLengthBetween(1, 10);
|
||||
boolean isFailure = randomBoolean();
|
||||
boolean isSuccessful = randomBoolean();
|
||||
boolean isIgnoredException = randomBoolean();
|
||||
SimulateProcessorResult simulateProcessorResult;
|
||||
if (isFailure) {
|
||||
simulateProcessorResult = new SimulateProcessorResult(processorTag, new IllegalArgumentException("test"));
|
||||
} else {
|
||||
if (isSuccessful) {
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument);
|
||||
if (isIgnoredException) {
|
||||
simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument, new IllegalArgumentException("test"));
|
||||
} else {
|
||||
simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument);
|
||||
}
|
||||
} else {
|
||||
simulateProcessorResult = new SimulateProcessorResult(processorTag, new IllegalArgumentException("test"));
|
||||
}
|
||||
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
|
@ -51,13 +56,20 @@ public class SimulateProcessorResultTests extends ESTestCase {
|
|||
StreamInput streamInput = out.bytes().streamInput();
|
||||
SimulateProcessorResult otherSimulateProcessorResult = new SimulateProcessorResult(streamInput);
|
||||
assertThat(otherSimulateProcessorResult.getProcessorTag(), equalTo(simulateProcessorResult.getProcessorTag()));
|
||||
if (isFailure) {
|
||||
assertThat(simulateProcessorResult.getIngestDocument(), is(nullValue()));
|
||||
if (isSuccessful) {
|
||||
assertIngestDocument(otherSimulateProcessorResult.getIngestDocument(), simulateProcessorResult.getIngestDocument());
|
||||
if (isIgnoredException) {
|
||||
assertThat(otherSimulateProcessorResult.getFailure(), instanceOf(IllegalArgumentException.class));
|
||||
IllegalArgumentException e = (IllegalArgumentException) otherSimulateProcessorResult.getFailure();
|
||||
assertThat(e.getMessage(), equalTo("test"));
|
||||
} else {
|
||||
assertThat(otherSimulateProcessorResult.getFailure(), nullValue());
|
||||
}
|
||||
} else {
|
||||
assertThat(otherSimulateProcessorResult.getIngestDocument(), is(nullValue()));
|
||||
assertThat(otherSimulateProcessorResult.getFailure(), instanceOf(IllegalArgumentException.class));
|
||||
IllegalArgumentException e = (IllegalArgumentException) otherSimulateProcessorResult.getFailure();
|
||||
assertThat(e.getMessage(), equalTo("test"));
|
||||
} else {
|
||||
assertIngestDocument(otherSimulateProcessorResult.getIngestDocument(), simulateProcessorResult.getIngestDocument());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TY
|
|||
import static org.elasticsearch.action.ingest.TrackingResultProcessor.decorate;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
||||
public class TrackingResultProcessorTests extends ESTestCase {
|
||||
|
||||
|
@ -142,7 +143,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
|
|||
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
|
||||
assertThat(resultList.size(), equalTo(1));
|
||||
assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
|
||||
assertThat(resultList.get(0).getFailure(), nullValue());
|
||||
assertThat(resultList.get(0).getFailure(), sameInstance(exception));
|
||||
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -481,7 +481,7 @@
|
|||
- match: { docs.0.processor_results.4.doc._source.status: 200 }
|
||||
|
||||
---
|
||||
"Test verbose simulate with ignore_failure":
|
||||
"Test verbose simulate with ignore_failure and thrown exception":
|
||||
- do:
|
||||
ingest.simulate:
|
||||
verbose: true
|
||||
|
@ -547,5 +547,54 @@
|
|||
- match: { docs.0.processor_results.0.doc._source.field1: "123.42 400 <foo>" }
|
||||
- match: { docs.0.processor_results.0.doc._source.status: 200 }
|
||||
- match: { docs.0.processor_results.1.tag: "rename-1" }
|
||||
- match: { docs.0.processor_results.1.ignored_error.error.type: "illegal_argument_exception" }
|
||||
- match: { docs.0.processor_results.1.ignored_error.error.reason: "field [foofield] doesn't exist" }
|
||||
- match: { docs.0.processor_results.1.doc._source.field1: "123.42 400 <foo>" }
|
||||
- match: { docs.0.processor_results.1.doc._source.status: 200 }
|
||||
|
||||
---
|
||||
"Test verbose simulate with ignore_failure and no exception thrown":
|
||||
- do:
|
||||
ingest.simulate:
|
||||
verbose: true
|
||||
body: >
|
||||
{
|
||||
"pipeline" : {
|
||||
"description": "_description",
|
||||
"processors": [
|
||||
{
|
||||
"set" : {
|
||||
"tag" : "setstatus-1",
|
||||
"field" : "status",
|
||||
"value" : 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"rename" : {
|
||||
"tag" : "rename-1",
|
||||
"field" : "status",
|
||||
"target_field" : "new_status",
|
||||
"ignore_failure": true
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"docs": [
|
||||
{
|
||||
"_index": "index",
|
||||
"_type": "type",
|
||||
"_id": "id",
|
||||
"_source": {
|
||||
"field1": "123.42 400 <foo>"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- length: { docs: 1 }
|
||||
- length: { docs.0.processor_results: 2 }
|
||||
- match: { docs.0.processor_results.0.tag: "setstatus-1" }
|
||||
- match: { docs.0.processor_results.0.doc._source.field1: "123.42 400 <foo>" }
|
||||
- match: { docs.0.processor_results.0.doc._source.status: 200 }
|
||||
- length: { docs.0.processor_results.1: 2 }
|
||||
- match: { docs.0.processor_results.1.tag: "rename-1" }
|
||||
- match: { docs.0.processor_results.1.doc._source.new_status: 200 }
|
||||
|
|
Loading…
Reference in New Issue