Merge pull request #15646 from talevy/error_metadata

[Ingest] add on_failure context to ingest metadata during executeOnFailure
This commit is contained in:
Tal Levy 2016-01-04 11:26:43 -08:00
commit af89b57a39
3 changed files with 126 additions and 20 deletions

View File

@ -25,6 +25,7 @@ import org.elasticsearch.ingest.IngestDocument;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -32,6 +33,8 @@ import java.util.stream.Collectors;
* "onFailureProcessors" when any of the processors throw an {@link Exception}. * "onFailureProcessors" when any of the processors throw an {@link Exception}.
*/ */
public class CompoundProcessor implements Processor { public class CompoundProcessor implements Processor {
static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message";
static final String ON_FAILURE_PROCESSOR_FIELD = "on_failure_processor";
private final List<Processor> processors; private final List<Processor> processors;
private final List<Processor> onFailureProcessors; private final List<Processor> onFailureProcessors;
@ -59,23 +62,31 @@ public class CompoundProcessor implements Processor {
@Override @Override
public void execute(IngestDocument ingestDocument) throws Exception { public void execute(IngestDocument ingestDocument) throws Exception {
try {
for (Processor processor : processors) { for (Processor processor : processors) {
try {
processor.execute(ingestDocument); processor.execute(ingestDocument);
}
} catch (Exception e) { } catch (Exception e) {
if (onFailureProcessors.isEmpty()) { if (onFailureProcessors.isEmpty()) {
throw e; throw e;
} else { } else {
executeOnFailure(ingestDocument); executeOnFailure(ingestDocument, e, processor.getType());
}
return;
}
} }
} }
} void executeOnFailure(IngestDocument ingestDocument, Exception cause, String failedProcessorType) throws Exception {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
void executeOnFailure(IngestDocument ingestDocument) throws Exception { try {
ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getMessage());
ingestMetadata.put(ON_FAILURE_PROCESSOR_FIELD, failedProcessorType);
for (Processor processor : onFailureProcessors) { for (Processor processor : onFailureProcessors) {
processor.execute(ingestDocument); processor.execute(ingestDocument);
} }
} finally {
ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD);
ingestMetadata.remove(ON_FAILURE_PROCESSOR_FIELD);
}
} }
} }

View File

@ -22,17 +22,21 @@ package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.junit.Before; import org.junit.Before;
import org.mockito.stubbing.Answer;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.mock.orig.Mockito.verify; import static org.elasticsearch.mock.orig.Mockito.verify;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
public class CompoundProcessorTests extends ESTestCase { public class CompoundProcessorTests extends ESTestCase {
private IngestDocument ingestDocument; private IngestDocument ingestDocument;
@ -61,6 +65,7 @@ public class CompoundProcessorTests extends ESTestCase {
public void testSingleProcessorWithException() throws Exception { public void testSingleProcessorWithException() throws Exception {
Processor processor = mock(Processor.class); Processor processor = mock(Processor.class);
when(processor.getType()).thenReturn("failed_processor");
doThrow(new RuntimeException("error")).doNothing().when(processor).execute(ingestDocument); doThrow(new RuntimeException("error")).doNothing().when(processor).execute(ingestDocument);
CompoundProcessor compoundProcessor = new CompoundProcessor(processor); CompoundProcessor compoundProcessor = new CompoundProcessor(processor);
assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
@ -72,29 +77,70 @@ public class CompoundProcessorTests extends ESTestCase {
} catch (Exception e) { } catch (Exception e) {
assertThat(e.getMessage(), equalTo("error")); assertThat(e.getMessage(), equalTo("error"));
} }
verify(processor, times(1)).execute(ingestDocument); verify(processor, times(1)).execute(ingestDocument);
} }
public void testSingleProcessorWithOnFailureProcessor() throws Exception { public void testSingleProcessorWithOnFailureProcessor() throws Exception {
Exception error = new RuntimeException("error");
Processor processor = mock(Processor.class); Processor processor = mock(Processor.class);
doThrow(new RuntimeException("error")).doNothing().when(processor).execute(ingestDocument); when(processor.getType()).thenReturn("first");
doThrow(error).doNothing().when(processor).execute(ingestDocument);
Processor processorNext = mock(Processor.class); Processor processorNext = mock(Processor.class);
CompoundProcessor compoundProcessor = new CompoundProcessor(Arrays.asList(processor), Arrays.asList(processorNext)); Answer checkMetadataAnswer = invocationOnMock -> {
@SuppressWarnings("unchecked")
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(2));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("first"));
return null;
};
doAnswer(checkMetadataAnswer).when(processorNext).execute(ingestDocument);
CompoundProcessor compoundProcessor = spy(new CompoundProcessor(Arrays.asList(processor), Arrays.asList(processorNext)));
assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor)); assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor));
assertThat(compoundProcessor.getOnFailureProcessors().size(), equalTo(1)); assertThat(compoundProcessor.getOnFailureProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getOnFailureProcessors().get(0), equalTo(processorNext)); assertThat(compoundProcessor.getOnFailureProcessors().get(0), equalTo(processorNext));
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument);
verify(compoundProcessor).executeOnFailure(ingestDocument, error, "first");
verify(processor, times(1)).execute(ingestDocument); verify(processor, times(1)).execute(ingestDocument);
verify(processorNext, times(1)).execute(ingestDocument); verify(processorNext, times(1)).execute(ingestDocument);
} }
public void testSingleProcessorWithNestedFailures() throws Exception { public void testSingleProcessorWithNestedFailures() throws Exception {
Exception error = new RuntimeException("error");
Processor processor = mock(Processor.class); Processor processor = mock(Processor.class);
doThrow(new RuntimeException("error")).doNothing().when(processor).execute(ingestDocument); when(processor.getType()).thenReturn("first");
doThrow(error).doNothing().when(processor).execute(ingestDocument);
Processor processorToFail = mock(Processor.class); Processor processorToFail = mock(Processor.class);
doThrow(new RuntimeException("error")).doNothing().when(processorToFail).execute(ingestDocument); Answer checkMetadataAnswer = invocationOnMock -> {
@SuppressWarnings("unchecked")
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(2));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("first"));
return null;
};
doAnswer(checkMetadataAnswer).when(processorToFail).execute(ingestDocument);
when(processorToFail.getType()).thenReturn("second");
doThrow(error).doNothing().when(processorToFail).execute(ingestDocument);
Processor lastProcessor = mock(Processor.class); Processor lastProcessor = mock(Processor.class);
Answer checkLastMetadataAnswer = invocationOnMock -> {
@SuppressWarnings("unchecked")
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(2));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("second"));
return null;
};
doAnswer(checkLastMetadataAnswer).when(lastProcessor).execute(ingestDocument);
CompoundProcessor innerCompoundOnFailProcessor = new CompoundProcessor(Arrays.asList(processorToFail), Arrays.asList(lastProcessor)); CompoundProcessor innerCompoundOnFailProcessor = new CompoundProcessor(Arrays.asList(processorToFail), Arrays.asList(lastProcessor));
CompoundProcessor compoundOnFailProcessor = spy(innerCompoundOnFailProcessor); CompoundProcessor compoundOnFailProcessor = spy(innerCompoundOnFailProcessor);
@ -108,10 +154,10 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(compoundProcessor.getOnFailureProcessors().get(0), equalTo(compoundOnFailProcessor)); assertThat(compoundProcessor.getOnFailureProcessors().get(0), equalTo(compoundOnFailProcessor));
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument);
verify(processor, times(1)).execute(ingestDocument); verify(processor, times(1)).execute(ingestDocument);
verify(compoundProcessor, times(1)).executeOnFailure(ingestDocument); verify(compoundProcessor, times(1)).executeOnFailure(ingestDocument, error, "first");
verify(compoundOnFailProcessor, times(1)).execute(ingestDocument); verify(compoundOnFailProcessor, times(1)).execute(ingestDocument);
verify(processorToFail, times(1)).execute(ingestDocument); verify(processorToFail, times(1)).execute(ingestDocument);
verify(compoundOnFailProcessor, times(1)).executeOnFailure(ingestDocument); verify(compoundOnFailProcessor, times(1)).executeOnFailure(ingestDocument, error, "second");
verify(lastProcessor, times(1)).execute(ingestDocument); verify(lastProcessor, times(1)).execute(ingestDocument);
} }
} }

View File

@ -1,5 +1,5 @@
--- ---
"Test metadata templateing": "Test metadata templating":
- do: - do:
cluster.health: cluster.health:
wait_for_status: green wait_for_status: green
@ -45,7 +45,7 @@
- match: { _source.metadata: ["test", "test", "1"] } - match: { _source.metadata: ["test", "test", "1"] }
--- ---
"Test templateing": "Test templating":
- do: - do:
cluster.health: cluster.health:
wait_for_status: green wait_for_status: green
@ -169,3 +169,52 @@
id: 1 id: 1
- length: { _source: 1 } - length: { _source: 1 }
- match: { _source.field_to_remove: "field2" } - match: { _source.field_to_remove: "field2" }
---
"Test on_failure metadata context templating":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.put_pipeline:
id: "my_handled_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"remove" : {
"field" : "field_to_remove",
"on_failure" : [
{
"set" : {
"field" : "error",
"value" : "processor [{{ _ingest.on_failure_processor }}]: {{ _ingest.on_failure_message }}"
}
}
]
}
}
]
}
- match: { _id: "my_handled_pipeline" }
- do:
ingest.index:
index: test
type: test
id: 1
pipeline: "my_handled_pipeline"
body: {
do_nothing: "foo",
}
- do:
get:
index: test
type: test
id: 1
- length: { _source: 2 }
- match: { _source.do_nothing: "foo" }
- match: { _source.error: "processor [remove]: field [field_to_remove] not present as part of path [field_to_remove]" }