add on_failure context to ingest metadata during executeOnFailure

This commit is contained in:
Tal Levy 2015-12-23 15:16:24 -08:00
parent d14f6dbd3b
commit f34ce9ddf4
3 changed files with 126 additions and 20 deletions

View File

@ -25,6 +25,7 @@ import org.elasticsearch.ingest.IngestDocument;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
@ -32,6 +33,8 @@ import java.util.stream.Collectors;
* "onFailureProcessors" when any of the processors throw an {@link Exception}.
*/
public class CompoundProcessor implements Processor {
static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message";
static final String ON_FAILURE_PROCESSOR_FIELD = "on_failure_processor";
private final List<Processor> processors;
private final List<Processor> onFailureProcessors;
@ -59,23 +62,31 @@ public class CompoundProcessor implements Processor {
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
try {
for (Processor processor : processors) {
for (Processor processor : processors) {
try {
processor.execute(ingestDocument);
}
} catch (Exception e) {
if (onFailureProcessors.isEmpty()) {
throw e;
} else {
executeOnFailure(ingestDocument);
} catch (Exception e) {
if (onFailureProcessors.isEmpty()) {
throw e;
} else {
executeOnFailure(ingestDocument, e, processor.getType());
}
return;
}
}
}
void executeOnFailure(IngestDocument ingestDocument) throws Exception {
for (Processor processor : onFailureProcessors) {
processor.execute(ingestDocument);
void executeOnFailure(IngestDocument ingestDocument, Exception cause, String failedProcessorType) throws Exception {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
try {
ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getMessage());
ingestMetadata.put(ON_FAILURE_PROCESSOR_FIELD, failedProcessorType);
for (Processor processor : onFailureProcessors) {
processor.execute(ingestDocument);
}
} finally {
ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD);
ingestMetadata.remove(ON_FAILURE_PROCESSOR_FIELD);
}
}
}

View File

@ -22,17 +22,21 @@ package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import org.mockito.stubbing.Answer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.mock.orig.Mockito.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
public class CompoundProcessorTests extends ESTestCase {
private IngestDocument ingestDocument;
@ -61,6 +65,7 @@ public class CompoundProcessorTests extends ESTestCase {
public void testSingleProcessorWithException() throws Exception {
Processor processor = mock(Processor.class);
when(processor.getType()).thenReturn("failed_processor");
doThrow(new RuntimeException("error")).doNothing().when(processor).execute(ingestDocument);
CompoundProcessor compoundProcessor = new CompoundProcessor(processor);
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
@ -72,29 +77,70 @@ public class CompoundProcessorTests extends ESTestCase {
} catch (Exception e) {
assertThat(e.getMessage(), equalTo("error"));
}
verify(processor, times(1)).execute(ingestDocument);
}
public void testSingleProcessorWithOnFailureProcessor() throws Exception {
Exception error = new RuntimeException("error");
Processor processor = mock(Processor.class);
doThrow(new RuntimeException("error")).doNothing().when(processor).execute(ingestDocument);
when(processor.getType()).thenReturn("first");
doThrow(error).doNothing().when(processor).execute(ingestDocument);
Processor processorNext = mock(Processor.class);
CompoundProcessor compoundProcessor = new CompoundProcessor(Arrays.asList(processor), Arrays.asList(processorNext));
Answer checkMetadataAnswer = invocationOnMock -> {
@SuppressWarnings("unchecked")
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(2));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("first"));
return null;
};
doAnswer(checkMetadataAnswer).when(processorNext).execute(ingestDocument);
CompoundProcessor compoundProcessor = spy(new CompoundProcessor(Arrays.asList(processor), Arrays.asList(processorNext)));
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor));
assertThat(compoundProcessor.getOnFailureProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getOnFailureProcessors().get(0), equalTo(processorNext));
compoundProcessor.execute(ingestDocument);
verify(compoundProcessor).executeOnFailure(ingestDocument, error, "first");
verify(processor, times(1)).execute(ingestDocument);
verify(processorNext, times(1)).execute(ingestDocument);
}
public void testSingleProcessorWithNestedFailures() throws Exception {
Exception error = new RuntimeException("error");
Processor processor = mock(Processor.class);
doThrow(new RuntimeException("error")).doNothing().when(processor).execute(ingestDocument);
when(processor.getType()).thenReturn("first");
doThrow(error).doNothing().when(processor).execute(ingestDocument);
Processor processorToFail = mock(Processor.class);
doThrow(new RuntimeException("error")).doNothing().when(processorToFail).execute(ingestDocument);
Answer checkMetadataAnswer = invocationOnMock -> {
@SuppressWarnings("unchecked")
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(2));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("first"));
return null;
};
doAnswer(checkMetadataAnswer).when(processorToFail).execute(ingestDocument);
when(processorToFail.getType()).thenReturn("second");
doThrow(error).doNothing().when(processorToFail).execute(ingestDocument);
Processor lastProcessor = mock(Processor.class);
Answer checkLastMetadataAnswer = invocationOnMock -> {
@SuppressWarnings("unchecked")
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(2));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("second"));
return null;
};
doAnswer(checkLastMetadataAnswer).when(lastProcessor).execute(ingestDocument);
CompoundProcessor innerCompoundOnFailProcessor = new CompoundProcessor(Arrays.asList(processorToFail), Arrays.asList(lastProcessor));
CompoundProcessor compoundOnFailProcessor = spy(innerCompoundOnFailProcessor);
@ -108,10 +154,10 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(compoundProcessor.getOnFailureProcessors().get(0), equalTo(compoundOnFailProcessor));
compoundProcessor.execute(ingestDocument);
verify(processor, times(1)).execute(ingestDocument);
verify(compoundProcessor, times(1)).executeOnFailure(ingestDocument);
verify(compoundProcessor, times(1)).executeOnFailure(ingestDocument, error, "first");
verify(compoundOnFailProcessor, times(1)).execute(ingestDocument);
verify(processorToFail, times(1)).execute(ingestDocument);
verify(compoundOnFailProcessor, times(1)).executeOnFailure(ingestDocument);
verify(compoundOnFailProcessor, times(1)).executeOnFailure(ingestDocument, error, "second");
verify(lastProcessor, times(1)).execute(ingestDocument);
}
}

View File

@ -1,5 +1,5 @@
---
"Test metadata templateing":
"Test metadata templating":
- do:
cluster.health:
wait_for_status: green
@ -45,7 +45,7 @@
- match: { _source.metadata: ["test", "test", "1"] }
---
"Test templateing":
"Test templating":
- do:
cluster.health:
wait_for_status: green
@ -169,3 +169,52 @@
id: 1
- length: { _source: 1 }
- match: { _source.field_to_remove: "field2" }
---
"Test on_failure metadata context templating":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.put_pipeline:
id: "my_handled_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"remove" : {
"field" : "field_to_remove",
"on_failure" : [
{
"set" : {
"field" : "error",
"value" : "processor [{{ _ingest.on_failure_processor }}]: {{ _ingest.on_failure_message }}"
}
}
]
}
}
]
}
- match: { _id: "my_handled_pipeline" }
- do:
ingest.index:
index: test
type: test
id: 1
pipeline: "my_handled_pipeline"
body: {
do_nothing: "foo",
}
- do:
get:
index: test
type: test
id: 1
- length: { _source: 2 }
- match: { _source.do_nothing: "foo" }
- match: { _source.error: "processor [remove]: field [field_to_remove] not present as part of path [field_to_remove]" }