Fix ignore_failure behavior in _simulate?verbose (#18987)

- fix it so that processors with the `ignore_failure` option do not
record their exception in the response
- add more tests to make empty `on_failure`. This now throws an
  exception
This commit is contained in:
Tal Levy 2016-06-21 13:29:53 -07:00 committed by GitHub
parent ea206237e3
commit 28fd684eef
8 changed files with 209 additions and 18 deletions

View File

@ -24,6 +24,7 @@ import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
@ -33,15 +34,12 @@ public final class TrackingResultProcessor implements Processor {
private final Processor actualProcessor;
private final List<SimulateProcessorResult> processorResultList;
private final boolean ignoreFailure;
public TrackingResultProcessor(Processor actualProcessor, List<SimulateProcessorResult> processorResultList) {
public TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List<SimulateProcessorResult> processorResultList) {
this.ignoreFailure = ignoreFailure;
this.processorResultList = processorResultList;
if (actualProcessor instanceof CompoundProcessor) {
CompoundProcessor trackedCompoundProcessor = decorate((CompoundProcessor) actualProcessor, processorResultList);
this.actualProcessor = trackedCompoundProcessor;
} else {
this.actualProcessor = actualProcessor;
}
this.actualProcessor = actualProcessor;
}
@Override
@ -50,7 +48,11 @@ public final class TrackingResultProcessor implements Processor {
actualProcessor.execute(ingestDocument);
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument)));
} catch (Exception e) {
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), e));
if (ignoreFailure) {
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument)));
} else {
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), e));
}
throw e;
}
}
@ -71,7 +73,7 @@ public final class TrackingResultProcessor implements Processor {
if (processor instanceof CompoundProcessor) {
processors.add(decorate((CompoundProcessor) processor, processorResultList));
} else {
processors.add(new TrackingResultProcessor(processor, processorResultList));
processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList));
}
}
List<Processor> onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size());
@ -79,10 +81,10 @@ public final class TrackingResultProcessor implements Processor {
if (processor instanceof CompoundProcessor) {
onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList));
} else {
onFailureProcessors.add(new TrackingResultProcessor(processor, processorResultList));
onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList));
}
}
return new CompoundProcessor(false, processors, onFailureProcessors);
return new CompoundProcessor(compoundProcessor.isIgnoreFailure(), processors, onFailureProcessors);
}
}

View File

@ -243,6 +243,7 @@ public final class ConfigurationUtils {
}
}
}
return processors;
}
@ -256,7 +257,12 @@ public final class ConfigurationUtils {
List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorRegistry);
Processor processor;
processor = factory.create(config);
if (!config.isEmpty()) {
if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) {
throw newConfigurationException(processor.getType(), processor.getTag(), Pipeline.ON_FAILURE_KEY,
"processors list cannot be empty");
}
if (config.isEmpty() == false) {
throw new ElasticsearchParseException("processor [{}] doesn't support one or more provided configuration parameters {}",
type, Arrays.toString(config.keySet().toArray()));
}

View File

@ -109,6 +109,9 @@ public final class Pipeline {
throw new ElasticsearchParseException("pipeline [" + id +
"] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
}
if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) {
throw new ElasticsearchParseException("pipeline [" + id + "] cannot have an empty on_failure option defined");
}
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.unmodifiableList(processors),
Collections.unmodifiableList(onFailureProcessors));
return new Pipeline(id, description, compoundProcessor);

View File

@ -159,6 +159,22 @@ public class SimulateExecutionServiceTests extends ESTestCase {
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getFailure(), nullValue());
}
public void testExecuteVerboseItemExceptionWithIgnoreFailure() throws Exception {
TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { throw new RuntimeException("processor failed"); });
CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList());
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(1));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(), not(sameInstance(ingestDocument.getSourceAndMetadata())));
}
public void testExecuteItemWithFailure() throws Exception {
TestProcessor processor = new TestProcessor(ingestDocument -> { throw new RuntimeException("processor failed"); });
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));

View File

@ -28,6 +28,7 @@ import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -52,7 +53,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
public void testActualProcessor() throws Exception {
TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {});
TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(actualProcessor, resultList);
TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList);
trackingProcessor.execute(ingestDocument);
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
@ -127,4 +128,21 @@ public class TrackingResultProcessorTests extends ESTestCase {
assertThat(resultList.get(3).getFailure(), nullValue());
assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag()));
}
public void testActualCompoundProcessorWithIgnoreFailure() throws Exception {
RuntimeException exception = new RuntimeException("processor failed");
TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; });
CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor),
Collections.emptyList());
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument);
SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument);
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
assertThat(resultList.size(), equalTo(1));
assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
assertThat(resultList.get(0).getFailure(), nullValue());
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag()));
}
}

View File

@ -86,6 +86,30 @@ public class PipelineFactoryTests extends ESTestCase {
assertThat(pipeline.getOnFailureProcessors().get(0).getType(), equalTo("test-processor"));
}
public void testCreateWithPipelineEmptyOnFailure() throws Exception {
Map<String, Object> processorConfig = new HashMap<>();
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.emptyList());
Pipeline.Factory factory = new Pipeline.Factory();
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create("_id", pipelineConfig, processorRegistry));
assertThat(e.getMessage(), equalTo("pipeline [_id] cannot have an empty on_failure option defined"));
}
public void testCreateWithPipelineEmptyOnFailureInProcessor() throws Exception {
Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put(Pipeline.ON_FAILURE_KEY, Collections.emptyList());
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create("_id", pipelineConfig, processorRegistry));
assertThat(e.getMessage(), equalTo("[on_failure] processors list cannot be empty"));
}
public void testCreateWithPipelineIgnoreFailure() throws Exception {
Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put("ignore_failure", true);
@ -116,11 +140,8 @@ public class PipelineFactoryTests extends ESTestCase {
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
try {
factory.create("_id", pipelineConfig, processorRegistry);
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]"));
}
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create("_id", pipelineConfig, processorRegistry));
assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]"));
}
public void testCreateProcessorsWithOnFailureProperties() throws Exception {

View File

@ -107,6 +107,61 @@
- match: { _source.foofield: "exists" }
- match: { _source.foofield2: "ran" }
---
"Test pipeline with empty on_failure in a processor":
- do:
catch: request
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"fail" : {
"tag" : "emptyfail",
"message" : "_message",
"on_failure": []
}
}
],
"on_failure": [
{
"set" : {
"field": "on_failure_executed",
"value": true
}
}
]
}
- match: { error.root_cause.0.type: "parse_exception" }
- match: { error.root_cause.0.reason: "[on_failure] processors list cannot be empty" }
- match: { error.root_cause.0.header.processor_type: "fail" }
- match: { error.root_cause.0.header.processor_tag: "emptyfail" }
- match: { error.root_cause.0.header.property_name: "on_failure" }
---
"Test pipeline with empty on_failure in pipeline":
- do:
catch: request
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "foo",
"value" : "_message"
}
}
],
"on_failure": []
}
- match: { error.root_cause.0.type: "parse_exception" }
- match: { error.root_cause.0.reason: "pipeline [my_pipeline] cannot have an empty on_failure option defined" }
---
"Test pipeline with ignore_failure in a processor":
- do:

View File

@ -479,3 +479,73 @@
- match: { docs.0.processor_results.4.doc._source.foofield2: "ran" }
- match: { docs.0.processor_results.4.doc._source.field1: "123.42 400 <foo>" }
- match: { docs.0.processor_results.4.doc._source.status: 200 }
---
"Test verbose simulate with ignore_failure":
- do:
ingest.simulate:
verbose: true
body: >
{
"pipeline" : {
"description": "_description",
"processors": [
{
"set" : {
"tag" : "setstatus-1",
"field" : "status",
"value" : 200
}
},
{
"rename" : {
"tag" : "rename-1",
"field" : "foofield",
"target_field" : "field1",
"ignore_failure": true,
"on_failure" : [
{
"set" : {
"tag" : "set on_failure rename",
"field" : "foofield",
"value" : "exists"
}
},
{
"rename" : {
"field" : "foofield2",
"target_field" : "field1",
"on_failure" : [
{
"set" : {
"field" : "foofield2",
"value" : "ran"
}
}
]
}
}
]
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"field1": "123.42 400 <foo>"
}
}
]
}
- length: { docs: 1 }
- length: { docs.0.processor_results: 2 }
- match: { docs.0.processor_results.0.tag: "setstatus-1" }
- match: { docs.0.processor_results.0.doc._source.field1: "123.42 400 <foo>" }
- match: { docs.0.processor_results.0.doc._source.status: 200 }
- match: { docs.0.processor_results.1.tag: "rename-1" }
- match: { docs.0.processor_results.1.doc._source.field1: "123.42 400 <foo>" }
- match: { docs.0.processor_results.1.doc._source.status: 200 }