add on_failure exception metadata to ingest document for verbose simulate

This commit is contained in:
Tal Levy 2016-02-09 14:17:57 -08:00
parent 7f0134e725
commit cabc4b1636
7 changed files with 348 additions and 117 deletions

View File

@ -23,13 +23,14 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.ingest.processor.TrackingResultProcessor.decorate;
class SimulateExecutionService {
private static final String THREAD_POOL_NAME = ThreadPool.Names.MANAGEMENT;
@ -40,40 +41,16 @@ class SimulateExecutionService {
this.threadPool = threadPool;
}
void executeVerboseDocument(Processor processor, IngestDocument ingestDocument, List<SimulateProcessorResult> processorResultList) throws Exception {
if (processor instanceof CompoundProcessor) {
CompoundProcessor cp = (CompoundProcessor) processor;
try {
for (Processor p : cp.getProcessors()) {
executeVerboseDocument(p, ingestDocument, processorResultList);
}
} catch (Exception e) {
for (Processor p : cp.getOnFailureProcessors()) {
executeVerboseDocument(p, ingestDocument, processorResultList);
}
}
} else {
try {
processor.execute(ingestDocument);
processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument)));
} catch (Exception e) {
processorResultList.add(new SimulateProcessorResult(processor.getTag(), e));
throw e;
}
}
}
SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) {
if (verbose) {
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
IngestDocument currentIngestDocument = new IngestDocument(ingestDocument);
CompoundProcessor pipelineProcessor = new CompoundProcessor(pipeline.getProcessors(), pipeline.getOnFailureProcessors());
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
try {
executeVerboseDocument(pipelineProcessor, currentIngestDocument, processorResultList);
verbosePipelineProcessor.execute(ingestDocument);
return new SimulateDocumentVerboseResult(processorResultList);
} catch (Exception e) {
return new SimulateDocumentBaseResult(e);
return new SimulateDocumentVerboseResult(processorResultList);
}
return new SimulateDocumentVerboseResult(processorResultList);
} else {
try {
pipeline.execute(ingestDocument);

View File

@ -28,15 +28,16 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* A Processor that executes a list of other "processors". It executes a separate list of
* "onFailureProcessors" when any of the processors throw an {@link Exception}.
*/
public class CompoundProcessor implements Processor {
static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message";
static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type";
static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
public static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message";
public static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type";
public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
private final List<Processor> processors;
private final List<Processor> onFailureProcessors;
@ -84,7 +85,7 @@ public class CompoundProcessor implements Processor {
@Override
public String getTag() {
return "compound-processor-" + Objects.hash(processors, onFailureProcessors);
return "CompoundProcessor-" + flattenProcessors().stream().map(Processor::getTag).collect(Collectors.joining("-"));
}
@Override
@ -104,18 +105,27 @@ public class CompoundProcessor implements Processor {
}
void executeOnFailure(IngestDocument ingestDocument, Exception cause, String failedProcessorType, String failedProcessorTag) throws Exception {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
try {
ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getMessage());
ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType);
ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag);
putFailureMetadata(ingestDocument, cause, failedProcessorType, failedProcessorTag);
for (Processor processor : onFailureProcessors) {
processor.execute(ingestDocument);
}
} finally {
ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD);
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD);
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
removeFailureMetadata(ingestDocument);
}
}
private void putFailureMetadata(IngestDocument ingestDocument, Exception cause, String failedProcessorType, String failedProcessorTag) {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getMessage());
ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType);
ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag);
}
private void removeFailureMetadata(IngestDocument ingestDocument) {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD);
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD);
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
}
}

View File

@ -68,6 +68,13 @@ public final class Pipeline {
return description;
}
/**
* Get the underlying {@link CompoundProcessor} containing the Pipeline's processors
*/
public CompoundProcessor getCompoundProcessor() {
return compoundProcessor;
}
/**
* Unmodifiable list containing each processor that operates on the data.
*/

View File

@ -0,0 +1,89 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
import org.elasticsearch.action.ingest.SimulateProcessorResult;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Processor;
import java.util.ArrayList;
import java.util.List;
/**
* Processor to be used within Simulate API to keep track of processors executed in pipeline.
*/
public final class TrackingResultProcessor implements Processor {
private final Processor actualProcessor;
private final List<SimulateProcessorResult> processorResultList;
public TrackingResultProcessor(Processor actualProcessor, List<SimulateProcessorResult> processorResultList) {
this.processorResultList = processorResultList;
if (actualProcessor instanceof CompoundProcessor) {
CompoundProcessor trackedCompoundProcessor = decorate((CompoundProcessor) actualProcessor, processorResultList);
this.actualProcessor = trackedCompoundProcessor;
} else {
this.actualProcessor = actualProcessor;
}
}
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
try {
actualProcessor.execute(ingestDocument);
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument)));
} catch (Exception e) {
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), e));
throw e;
}
}
@Override
public String getType() {
return actualProcessor.getType();
}
@Override
public String getTag() {
return actualProcessor.getTag();
}
public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List<SimulateProcessorResult> processorResultList) {
List<Processor> processors = new ArrayList<>(compoundProcessor.getProcessors().size());
for (Processor processor : compoundProcessor.getProcessors()) {
if (processor instanceof CompoundProcessor) {
processors.add(decorate((CompoundProcessor) processor, processorResultList));
} else {
processors.add(new TrackingResultProcessor(processor, processorResultList));
}
}
List<Processor> onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size());
for (Processor processor : compoundProcessor.getOnFailureProcessors()) {
if (processor instanceof CompoundProcessor) {
onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList));
} else {
onFailureProcessors.add(new TrackingResultProcessor(processor, processorResultList));
}
}
return new CompoundProcessor(processors, onFailureProcessors);
}
}

View File

@ -31,10 +31,8 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
@ -46,7 +44,6 @@ public class SimulateExecutionServiceTests extends ESTestCase {
private ThreadPool threadPool;
private SimulateExecutionService executionService;
private Pipeline pipeline;
private Processor processor;
private IngestDocument ingestDocument;
@ -59,7 +56,6 @@ public class SimulateExecutionServiceTests extends ESTestCase {
);
executionService = new SimulateExecutionService(threadPool);
processor = new TestProcessor("id", "mock", ingestDocument -> {});
pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
}
@ -68,74 +64,6 @@ public class SimulateExecutionServiceTests extends ESTestCase {
threadPool.shutdown();
}
public void testExecuteVerboseDocumentSimple() throws Exception {
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
executionService.executeVerboseDocument(processor, ingestDocument, processorResultList);
SimulateProcessorResult result = new SimulateProcessorResult("id", ingestDocument);
assertThat(processorResultList.size(), equalTo(1));
assertThat(processorResultList.get(0).getProcessorTag(), equalTo(result.getProcessorTag()));
assertThat(processorResultList.get(0).getIngestDocument(), equalTo(result.getIngestDocument()));
assertThat(processorResultList.get(0).getFailure(), nullValue());
}
public void testExecuteVerboseDocumentSimpleException() throws Exception {
RuntimeException exception = new RuntimeException("mock_exception");
TestProcessor processor = new TestProcessor("id", "mock", ingestDocument -> { throw exception; });
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
try {
executionService.executeVerboseDocument(processor, ingestDocument, processorResultList);
fail("should throw exception");
} catch (RuntimeException e) {
assertThat(e.getMessage(), equalTo("mock_exception"));
}
SimulateProcessorResult result = new SimulateProcessorResult("id", exception);
assertThat(processorResultList.size(), equalTo(1));
assertThat(processorResultList.get(0).getProcessorTag(), equalTo(result.getProcessorTag()));
assertThat(processorResultList.get(0).getFailure(), equalTo(result.getFailure()));
}
public void testExecuteVerboseDocumentCompoundSuccess() throws Exception {
TestProcessor processor1 = new TestProcessor("p1", "mock", ingestDocument -> { });
TestProcessor processor2 = new TestProcessor("p2", "mock", ingestDocument -> { });
Processor compoundProcessor = new CompoundProcessor(processor1, processor2);
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
executionService.executeVerboseDocument(compoundProcessor, ingestDocument, processorResultList);
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(processor2.getInvokedCounter(), equalTo(1));
assertThat(processorResultList.size(), equalTo(2));
assertThat(processorResultList.get(0).getProcessorTag(), equalTo("p1"));
assertThat(processorResultList.get(0).getIngestDocument(), equalTo(ingestDocument));
assertThat(processorResultList.get(0).getFailure(), nullValue());
assertThat(processorResultList.get(1).getProcessorTag(), equalTo("p2"));
assertThat(processorResultList.get(1).getIngestDocument(), equalTo(ingestDocument));
assertThat(processorResultList.get(1).getFailure(), nullValue());
}
public void testExecuteVerboseDocumentCompoundOnFailure() throws Exception {
TestProcessor processor1 = new TestProcessor("p1", "mock", ingestDocument -> { });
TestProcessor processor2 = new TestProcessor("p2", "mock", ingestDocument -> { throw new RuntimeException("p2_exception"); });
TestProcessor onFailureProcessor1 = new TestProcessor("fail_p1", "mock", ingestDocument -> { });
TestProcessor onFailureProcessor2 = new TestProcessor("fail_p2", "mock", ingestDocument -> { throw new RuntimeException("fail_p2_exception"); });
TestProcessor onFailureProcessor3 = new TestProcessor("fail_p3", "mock", ingestDocument -> { });
CompoundProcessor onFailureCompoundProcessor = new CompoundProcessor(Collections.singletonList(onFailureProcessor2), Collections.singletonList(onFailureProcessor3));
Processor compoundProcessor = new CompoundProcessor(Arrays.asList(processor1, processor2), Arrays.asList(onFailureProcessor1, onFailureCompoundProcessor));
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
executionService.executeVerboseDocument(compoundProcessor, ingestDocument, processorResultList);
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(processor2.getInvokedCounter(), equalTo(1));
assertThat(onFailureProcessor1.getInvokedCounter(), equalTo(1));
assertThat(onFailureProcessor2.getInvokedCounter(), equalTo(1));
assertThat(onFailureProcessor3.getInvokedCounter(), equalTo(1));
assertThat(processorResultList.size(), equalTo(5));
assertThat(processorResultList.get(0).getProcessorTag(), equalTo("p1"));
assertThat(processorResultList.get(1).getProcessorTag(), equalTo("p2"));
assertThat(processorResultList.get(2).getProcessorTag(), equalTo("fail_p1"));
assertThat(processorResultList.get(3).getProcessorTag(), equalTo("fail_p2"));
assertThat(processorResultList.get(4).getProcessorTag(), equalTo("fail_p3"));
}
public void testExecuteVerboseItem() throws Exception {
TestProcessor processor = new TestProcessor("test-id", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
@ -170,16 +98,43 @@ public class SimulateExecutionServiceTests extends ESTestCase {
assertThat(simulateDocumentBaseResult.getFailure(), nullValue());
}
public void testExecuteVerboseItemWithFailure() throws Exception {
public void testExecuteVerboseItemExceptionWithoutOnFailure() throws Exception {
TestProcessor processor1 = new TestProcessor("processor_0", "mock", ingestDocument -> {});
TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> { throw new RuntimeException("processor failed"); });
TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor1, processor2, processor3));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(processor2.getInvokedCounter(), equalTo(1));
assertThat(processor3.getInvokedCounter(), equalTo(0));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), equalTo(ingestDocument));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(), not(sameInstance(ingestDocument.getSourceAndMetadata())));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("processor_1"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), instanceOf(RuntimeException.class));
RuntimeException runtimeException = (RuntimeException) simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure();
assertThat(runtimeException.getMessage(), equalTo("processor failed"));
}
public void testExecuteVerboseItemWithOnFailure() throws Exception {
TestProcessor processor1 = new TestProcessor("processor_0", "mock", ingestDocument -> { throw new RuntimeException("processor failed"); });
TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(Collections.singletonList(processor1), Collections.singletonList(processor2)));
TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description",
new CompoundProcessor(new CompoundProcessor(Collections.singletonList(processor1),
Collections.singletonList(processor2)), processor3));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(processor2.getInvokedCounter(), equalTo(1));
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(3));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), instanceOf(RuntimeException.class));
@ -187,8 +142,20 @@ public class SimulateExecutionServiceTests extends ESTestCase {
assertThat(runtimeException.getMessage(), equalTo("processor failed"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("processor_1"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), not(sameInstance(ingestDocument)));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), equalTo(ingestDocument));
IngestDocument ingestDocumentWithOnFailureMetadata = new IngestDocument(ingestDocument);
Map<String, String> metadata = ingestDocumentWithOnFailureMetadata.getIngestMetadata();
metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD, "mock");
metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD, "processor_0");
metadata.put(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD, "processor failed");
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), equalTo(ingestDocumentWithOnFailureMetadata));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getProcessorTag(), equalTo("processor_2"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getIngestDocument(), not(sameInstance(ingestDocument)));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getIngestDocument(), equalTo(ingestDocument));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getFailure(), nullValue());
}
public void testExecuteItemWithFailure() throws Exception {

View File

@ -0,0 +1,129 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
import org.elasticsearch.action.ingest.SimulateProcessorResult;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.ingest.core.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD;
import static org.elasticsearch.ingest.core.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD;
import static org.elasticsearch.ingest.core.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD;
import static org.elasticsearch.ingest.processor.TrackingResultProcessor.decorate;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
public class TrackingResultProcessorTests extends ESTestCase {
private IngestDocument ingestDocument;
private List<SimulateProcessorResult> resultList;
@Before
public void init() {
ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
resultList = new ArrayList<>();
}
public void testActualProcessor() throws Exception {
TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {});
TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument);
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
assertThat(actualProcessor.getInvokedCounter(), equalTo(1));
assertThat(resultList.size(), equalTo(1));
assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument()));
assertThat(resultList.get(0).getFailure(), nullValue());
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag()));
}
public void testActualCompoundProcessorWithoutOnFailure() throws Exception {
RuntimeException exception = new RuntimeException("processor failed");
TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; });
CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor);
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
try {
trackingProcessor.execute(ingestDocument);
} catch (Exception e) {
assertThat(e.getMessage(), equalTo(exception.getMessage()));
}
SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument);
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
assertThat(resultList.size(), equalTo(1));
assertThat(resultList.get(0).getIngestDocument(), nullValue());
assertThat(resultList.get(0).getFailure(), equalTo(exception));
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFirstResult.getProcessorTag()));
}
public void testActualCompoundProcessorWithOnFailure() throws Exception {
RuntimeException exception = new RuntimeException("fail");
TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; });
TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {});
CompoundProcessor actualProcessor = new CompoundProcessor(
Arrays.asList(new CompoundProcessor(
Arrays.asList(failProcessor, onFailureProcessor),
Arrays.asList(onFailureProcessor, failProcessor))),
Arrays.asList(onFailureProcessor));
CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList);
trackingProcessor.execute(ingestDocument);
SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument);
SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument);
assertThat(failProcessor.getInvokedCounter(), equalTo(2));
assertThat(onFailureProcessor.getInvokedCounter(), equalTo(2));
assertThat(resultList.size(), equalTo(4));
assertThat(resultList.get(0).getIngestDocument(), nullValue());
assertThat(resultList.get(0).getFailure(), equalTo(exception));
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag()));
Map<String, String> metadata = resultList.get(1).getIngestDocument().getIngestMetadata();
assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail"));
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test"));
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail"));
assertThat(resultList.get(1).getFailure(), nullValue());
assertThat(resultList.get(1).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag()));
assertThat(resultList.get(2).getIngestDocument(), nullValue());
assertThat(resultList.get(2).getFailure(), equalTo(exception));
assertThat(resultList.get(2).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag()));
metadata = resultList.get(3).getIngestDocument().getIngestMetadata();
assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail"));
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("compound"));
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("CompoundProcessor-fail-success-success-fail"));
assertThat(resultList.get(3).getFailure(), nullValue());
assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag()));
}
}

View File

@ -274,3 +274,55 @@
id: 1
- length: { _source: 2 }
- match: { _source.values_flat: ["foo_bar", "foo_baz"] }
---
"Test verbose simulate with error context":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.simulate:
verbose: true
body: >
{
"pipeline" : {
"description": "_description",
"processors": [
{
"rename" : {
"tag" : "rename-status",
"field" : "status",
"to" : "bar",
"on_failure" : [
{
"set" : {
"tag" : "set_on_rename_failure",
"field" : "error",
"value" : "processor {{ _ingest.on_failure_processor_tag }} [{{ _ingest.on_failure_processor_type }}]: {{ _ingest.on_failure_message }}"
}
}
]
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"foo": "bar"
}
}
]
}
- length: { docs: 1 }
- length: { docs.0.processor_results: 2 }
- match: { docs.0.processor_results.0.tag: "rename-status" }
- match: { docs.0.processor_results.0.error.type: "illegal_argument_exception" }
- match: { docs.0.processor_results.0.error.reason: "field [status] doesn't exist" }
- match: { docs.0.processor_results.1.tag: "set_on_rename_failure" }
- length: { docs.0.processor_results.1.doc._source: 2 }
- match: { docs.0.processor_results.1.doc._source.foo: "bar" }
- match: { docs.0.processor_results.1.doc._source.error: "processor rename-status [rename]: field [status] doesn't exist" }