Handle error conditions when simulating ingest pipelines with verbosity enabled (#63327) (#63484)

This commit is contained in:
Dan Hermann 2020-10-08 09:21:05 -05:00 committed by GitHub
parent 25fc6746cc
commit 85886e71c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 108 additions and 14 deletions

View File

@ -417,6 +417,62 @@ teardown:
- is_true: docs.1.processor_results.1.doc._ingest.timestamp
- is_true: docs.1.processor_results.1.doc._ingest.pipeline
---
"Test verbose simulate with error in pipeline":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"rename" : {
"field" : "does_not_exist",
"target_field" : "_value"
}
}
]
}
- match: { acknowledged: true }
- do:
ingest.simulate:
verbose: true
body: >
{
"pipeline": {
"description": "_description",
"processors": [
{
"pipeline" : {
"name" : "my_pipeline"
}
}
]
},
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "bar",
"bar": "hello"
}
}
]
}
- length: { docs: 1 }
- length: { docs.0.processor_results: 2 }
- match: { docs.0.processor_results.0.processor_type: "pipeline" }
- match: { docs.0.processor_results.0.status: "success" }
- match: { docs.0.processor_results.1.processor_type: "rename" }
- match: { docs.0.processor_results.1.status: "error" }
- match: { docs.0.processor_results.1.error.root_cause.0.type: "illegal_argument_exception" }
- match: { docs.0.processor_results.1.error.root_cause.0.reason: "field [does_not_exist] doesn't exist" }
- match: { docs.0.processor_results.1.error.type: "illegal_argument_exception" }
- match: { docs.0.processor_results.1.error.reason: "field [does_not_exist] doesn't exist" }
---
"Test verbose simulate with on_failure":
- do:

View File

@ -51,6 +51,7 @@ import java.util.function.BiConsumer;
public final class IngestDocument {
public static final String INGEST_KEY = "_ingest";
public static final String PIPELINE_CYCLE_ERROR_MESSAGE = "Cycle detected for pipeline: ";
private static final String INGEST_KEY_PREFIX = INGEST_KEY + ".";
private static final String SOURCE_PREFIX = SourceFieldMapper.NAME + ".";
@ -748,7 +749,7 @@ public final class IngestDocument {
handler.accept(result, e);
});
} else {
handler.accept(null, new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId()));
handler.accept(null, new IllegalStateException(PIPELINE_CYCLE_ERROR_MESSAGE + pipeline.getId()));
}
}

View File

@ -27,6 +27,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import static org.elasticsearch.ingest.IngestDocument.PIPELINE_CYCLE_ERROR_MESSAGE;
/**
* Processor to be used within Simulate API to keep track of processors executed in pipeline.
*/
@ -73,11 +75,10 @@ public final class TrackingResultProcessor implements Processor {
pipelineProcessor.getPipelineToCallName(ingestDocument) + ']');
}
ingestDocumentCopy.executePipeline(pipelineToCall, (result, e) -> {
// do nothing, let the tracking processors throw the exception while recording the path up to the failure
if (e instanceof ElasticsearchException) {
ElasticsearchException elasticsearchException = (ElasticsearchException) e;
//else do nothing, let the tracking processors throw the exception while recording the path up to the failure
if (elasticsearchException.getCause() instanceof IllegalStateException) {
// special handling for pipeline cycle errors
if (e instanceof ElasticsearchException &&
e.getCause() instanceof IllegalStateException &&
e.getCause().getMessage().startsWith(PIPELINE_CYCLE_ERROR_MESSAGE)) {
if (ignoreFailure) {
processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getType(), pipelineProcessor.getTag(),
pipelineProcessor.getDescription(), new IngestDocument(ingestDocument), e, conditionalWithResult));
@ -85,8 +86,7 @@ public final class TrackingResultProcessor implements Processor {
processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getType(), pipelineProcessor.getTag(),
pipelineProcessor.getDescription(), e, conditionalWithResult));
}
handler.accept(null, elasticsearchException);
}
handler.accept(null, e);
} else {
//now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList);

View File

@ -512,6 +512,43 @@ public class TrackingResultProcessorTests extends ESTestCase {
assertThat(resultList.get(4).getProcessorTag(), nullValue());
}
public void testActualPipelineProcessorWithUnhandledFailure() throws Exception {
String pipelineId = "pipeline1";
IngestService ingestService = createIngestService();
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("name", pipelineId);
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
String key1 = randomAlphaOfLength(10);
IllegalStateException exception = new IllegalStateException("Not a pipeline cycle error");
Pipeline pipeline = new Pipeline(
pipelineId, null, null, new CompoundProcessor(
new TestProcessor(ingestDocument -> ingestDocument.setFieldValue(key1, randomInt())),
new TestProcessor(ingestDocument -> { throw exception; }))
);
when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline);
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, null, pipelineConfig);
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
trackingProcessor.execute(ingestDocument, (result, e) -> {});
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
actualProcessor.getDescription(), ingestDocument, null);
expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId);
verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId);
assertThat(resultList.size(), equalTo(3));
assertNull(resultList.get(0).getConditionalWithResult());
assertThat(resultList.get(0).getType(), equalTo("pipeline"));
assertTrue(resultList.get(1).getIngestDocument().hasField(key1));
assertThat(resultList.get(2).getFailure(), equalTo(exception));
}
public void testActualPipelineProcessorWithCycle() throws Exception {
String pipelineId1 = "pipeline1";
String pipelineId2 = "pipeline2";