ingest: added `ignore_failure` option to all processors

If this option is enabled on a processor it silently catches any processor related failure and continues executing the rest of the pipeline.

 Closes #18493
This commit is contained in:
Martijn van Groningen 2016-05-31 11:58:20 +02:00
parent 966bfc3f2d
commit 766789b0f0
15 changed files with 163 additions and 40 deletions

View File

@ -1037,11 +1037,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]store[/\\]IndicesStoreIntegrationIT.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]store[/\\]IndicesStoreIntegrationIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]store[/\\]IndicesStoreTests.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]store[/\\]IndicesStoreTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]template[/\\]SimpleIndexTemplateIT.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]template[/\\]SimpleIndexTemplateIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]PipelineExecutionServiceTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]PipelineStoreTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]core[/\\]CompoundProcessorTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]core[/\\]PipelineFactoryTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]core[/\\]ValueSourceTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]processor[/\\]AbstractStringProcessorTestCase.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]processor[/\\]AbstractStringProcessorTestCase.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]processor[/\\]AppendProcessorTests.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]processor[/\\]AppendProcessorTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]processor[/\\]DateFormatTests.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ingest[/\\]processor[/\\]DateFormatTests.java" checks="LineLength" />

View File

@ -38,19 +38,25 @@ public class CompoundProcessor implements Processor {
public static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type"; public static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type";
public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag"; public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
private final boolean ignoreFailure;
private final List<Processor> processors; private final List<Processor> processors;
private final List<Processor> onFailureProcessors; private final List<Processor> onFailureProcessors;
public CompoundProcessor(Processor... processor) { public CompoundProcessor(Processor... processor) {
this(Arrays.asList(processor), Collections.emptyList()); this(false, Arrays.asList(processor), Collections.emptyList());
} }
public CompoundProcessor(List<Processor> processors, List<Processor> onFailureProcessors) { public CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List<Processor> onFailureProcessors) {
super(); super();
this.ignoreFailure = ignoreFailure;
this.processors = processors; this.processors = processors;
this.onFailureProcessors = onFailureProcessors; this.onFailureProcessors = onFailureProcessors;
} }
public boolean isIgnoreFailure() {
return ignoreFailure;
}
public List<Processor> getOnFailureProcessors() { public List<Processor> getOnFailureProcessors() {
return onFailureProcessors; return onFailureProcessors;
} }
@ -93,6 +99,10 @@ public class CompoundProcessor implements Processor {
try { try {
processor.execute(ingestDocument); processor.execute(ingestDocument);
} catch (Exception e) { } catch (Exception e) {
if (ignoreFailure) {
continue;
}
ElasticsearchException compoundProcessorException = newCompoundProcessorException(e, processor.getType(), processor.getTag()); ElasticsearchException compoundProcessorException = newCompoundProcessorException(e, processor.getType(), processor.getTag());
if (onFailureProcessors.isEmpty()) { if (onFailureProcessors.isEmpty()) {
throw compoundProcessorException; throw compoundProcessorException;

View File

@ -250,6 +250,7 @@ public final class ConfigurationUtils {
private static Processor readProcessor(ProcessorsRegistry processorRegistry, String type, Map<String, Object> config) throws Exception { private static Processor readProcessor(ProcessorsRegistry processorRegistry, String type, Map<String, Object> config) throws Exception {
Processor.Factory factory = processorRegistry.getProcessorFactory(type); Processor.Factory factory = processorRegistry.getProcessorFactory(type);
if (factory != null) { if (factory != null) {
boolean ignoreFailure = ConfigurationUtils.readBooleanProperty(null, null, config, "ignore_failure", false);
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs = List<Map<String, Map<String, Object>>> onFailureProcessorConfigs =
ConfigurationUtils.readOptionalList(null, null, config, Pipeline.ON_FAILURE_KEY); ConfigurationUtils.readOptionalList(null, null, config, Pipeline.ON_FAILURE_KEY);
@ -260,10 +261,11 @@ public final class ConfigurationUtils {
throw new ElasticsearchParseException("processor [{}] doesn't support one or more provided configuration parameters {}", throw new ElasticsearchParseException("processor [{}] doesn't support one or more provided configuration parameters {}",
type, Arrays.toString(config.keySet().toArray())); type, Arrays.toString(config.keySet().toArray()));
} }
if (onFailureProcessors.isEmpty()) { if (onFailureProcessors.size() > 0 || ignoreFailure) {
return new CompoundProcessor(ignoreFailure, Collections.singletonList(processor), onFailureProcessors);
} else {
return processor; return processor;
} }
return new CompoundProcessor(Collections.singletonList(processor), onFailureProcessors);
} }
throw new ElasticsearchParseException("No processor type exists with name [" + type + "]"); throw new ElasticsearchParseException("No processor type exists with name [" + type + "]");
} }

View File

@ -109,7 +109,7 @@ public final class Pipeline {
if (config.isEmpty() == false) { if (config.isEmpty() == false) {
throw new ElasticsearchParseException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); throw new ElasticsearchParseException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
} }
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors)); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors));
return new Pipeline(id, description, compoundProcessor); return new Pipeline(id, description, compoundProcessor);
} }

View File

@ -83,7 +83,7 @@ public final class TrackingResultProcessor implements Processor {
onFailureProcessors.add(new TrackingResultProcessor(processor, processorResultList)); onFailureProcessors.add(new TrackingResultProcessor(processor, processorResultList));
} }
} }
return new CompoundProcessor(processors, onFailureProcessors); return new CompoundProcessor(false, processors, onFailureProcessors);
} }
} }

View File

@ -128,7 +128,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> {}); TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> {});
TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {}); TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", Pipeline pipeline = new Pipeline("_id", "_description",
new CompoundProcessor(new CompoundProcessor(Collections.singletonList(processor1), new CompoundProcessor(new CompoundProcessor(false, Collections.singletonList(processor1),
Collections.singletonList(processor2)), processor3)); Collections.singletonList(processor2)), processor3));
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(processor1.getInvokedCounter(), equalTo(1));

View File

@ -98,7 +98,8 @@ public class PipelineExecutionServiceTests extends ESTestCase {
IndexRequest indexRequest1 = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); IndexRequest indexRequest1 = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
bulkRequest.add(indexRequest1); bulkRequest.add(indexRequest1);
IndexRequest indexRequest2 = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("does_not_exist"); IndexRequest indexRequest2 =
new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("does_not_exist");
bulkRequest.add(indexRequest2); bulkRequest.add(indexRequest2);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
BiConsumer<IndexRequest, Throwable> failureHandler = mock(BiConsumer.class); BiConsumer<IndexRequest, Throwable> failureHandler = mock(BiConsumer.class);
@ -192,7 +193,8 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(processor.getType()).thenReturn("mock_processor_type"); when(processor.getType()).thenReturn("mock_processor_type");
when(processor.getTag()).thenReturn("mock_processor_tag"); when(processor.getTag()).thenReturn("mock_processor_tag");
Processor onFailureProcessor = mock(Processor.class); Processor onFailureProcessor = mock(Processor.class);
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor),
Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor)); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@ -208,7 +210,8 @@ public class PipelineExecutionServiceTests extends ESTestCase {
public void testExecuteFailureWithOnFailure() throws Exception { public void testExecuteFailureWithOnFailure() throws Exception {
Processor processor = mock(Processor.class); Processor processor = mock(Processor.class);
Processor onFailureProcessor = mock(Processor.class); Processor onFailureProcessor = mock(Processor.class);
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor),
Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor)); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@ -227,8 +230,9 @@ public class PipelineExecutionServiceTests extends ESTestCase {
Processor processor = mock(Processor.class); Processor processor = mock(Processor.class);
Processor onFailureProcessor = mock(Processor.class); Processor onFailureProcessor = mock(Processor.class);
Processor onFailureOnFailureProcessor = mock(Processor.class); Processor onFailureOnFailureProcessor = mock(Processor.class);
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor),
Collections.singletonList(new CompoundProcessor(Collections.singletonList(onFailureProcessor), Collections.singletonList(onFailureOnFailureProcessor)))); Collections.singletonList(new CompoundProcessor(false, Collections.singletonList(onFailureProcessor),
Collections.singletonList(onFailureOnFailureProcessor))));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor)); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));

View File

@ -61,7 +61,8 @@ public class PipelineStoreTests extends ESTestCase {
store = new PipelineStore(Settings.EMPTY); store = new PipelineStore(Settings.EMPTY);
ProcessorsRegistry.Builder registryBuilder = new ProcessorsRegistry.Builder(); ProcessorsRegistry.Builder registryBuilder = new ProcessorsRegistry.Builder();
registryBuilder.registerProcessor("set", (templateService, registry) -> new SetProcessor.Factory(TestTemplateService.instance())); registryBuilder.registerProcessor("set", (templateService, registry) -> new SetProcessor.Factory(TestTemplateService.instance()));
registryBuilder.registerProcessor("remove", (templateService, registry) -> new RemoveProcessor.Factory(TestTemplateService.instance())); registryBuilder.registerProcessor("remove", (templateService, registry) ->
new RemoveProcessor.Factory(TestTemplateService.instance()));
store.buildProcessorFactoryRegistry(registryBuilder, null); store.buildProcessorFactoryRegistry(registryBuilder, null);
} }
@ -190,7 +191,8 @@ public class PipelineStoreTests extends ESTestCase {
assertThat(pipeline, nullValue()); assertThat(pipeline, nullValue());
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}")); PutPipelineRequest putRequest = new PutPipelineRequest(id,
new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"));
clusterState = store.innerPut(putRequest, clusterState); clusterState = store.innerPut(putRequest, clusterState);
store.innerUpdatePipelines(clusterState); store.innerUpdatePipelines(clusterState);
pipeline = store.get(id); pipeline = store.get(id);
@ -208,7 +210,8 @@ public class PipelineStoreTests extends ESTestCase {
} }
public void testValidate() throws Exception { public void testValidate() throws Exception {
PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}},{\"remove\" : {\"field\": \"_field\"}}]}")); PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray(
"{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}},{\"remove\" : {\"field\": \"_field\"}}]}"));
DiscoveryNode node1 = new DiscoveryNode("_node_id1", new LocalTransportAddress("_id"), DiscoveryNode node1 = new DiscoveryNode("_node_id1", new LocalTransportAddress("_id"),
emptyMap(), emptySet(), Version.CURRENT); emptyMap(), emptySet(), Version.CURRENT);
@ -230,7 +233,8 @@ public class PipelineStoreTests extends ESTestCase {
} }
public void testValidateNoIngestInfo() throws Exception { public void testValidateNoIngestInfo() throws Exception {
PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}")); PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray(
"{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"));
try { try {
store.validatePipeline(Collections.emptyMap(), putRequest); store.validatePipeline(Collections.emptyMap(), putRequest);
fail("exception expected"); fail("exception expected");

View File

@ -24,6 +24,7 @@ import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.junit.Before; import org.junit.Before;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -31,6 +32,7 @@ import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
public class CompoundProcessorTests extends ESTestCase { public class CompoundProcessorTests extends ESTestCase {
private IngestDocument ingestDocument; private IngestDocument ingestDocument;
@ -51,7 +53,7 @@ public class CompoundProcessorTests extends ESTestCase {
TestProcessor processor = new TestProcessor(ingestDocument -> {}); TestProcessor processor = new TestProcessor(ingestDocument -> {});
CompoundProcessor compoundProcessor = new CompoundProcessor(processor); CompoundProcessor compoundProcessor = new CompoundProcessor(processor);
assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor));
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument);
assertThat(processor.getInvokedCounter(), equalTo(1)); assertThat(processor.getInvokedCounter(), equalTo(1));
@ -61,7 +63,7 @@ public class CompoundProcessorTests extends ESTestCase {
TestProcessor processor = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");}); TestProcessor processor = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");});
CompoundProcessor compoundProcessor = new CompoundProcessor(processor); CompoundProcessor compoundProcessor = new CompoundProcessor(processor);
assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor));
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
try { try {
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument);
@ -72,6 +74,16 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(processor.getInvokedCounter(), equalTo(1)); assertThat(processor.getInvokedCounter(), equalTo(1));
} }
public void testIgnoreFailure() throws Exception {
TestProcessor processor1 = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processor2 = new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue("field", "value");});
CompoundProcessor compoundProcessor = new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList());
compoundProcessor.execute(ingestDocument);
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertThat(processor2.getInvokedCounter(), equalTo(1));
assertThat(ingestDocument.getFieldValue("field", String.class), equalTo("value"));
}
public void testSingleProcessorWithOnFailureProcessor() throws Exception { public void testSingleProcessorWithOnFailureProcessor() throws Exception {
TestProcessor processor1 = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");}); TestProcessor processor1 = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor processor2 = new TestProcessor(ingestDocument -> { TestProcessor processor2 = new TestProcessor(ingestDocument -> {
@ -82,7 +94,8 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id"));
}); });
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor1), Collections.singletonList(processor2)); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1),
Collections.singletonList(processor2));
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument);
assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(processor1.getInvokedCounter(), equalTo(1));
@ -106,8 +119,10 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("second")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("second"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id2")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id2"));
}); });
CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(Collections.singletonList(processorToFail), Collections.singletonList(lastProcessor)); CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(false, Collections.singletonList(processorToFail),
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(compoundOnFailProcessor)); Collections.singletonList(lastProcessor));
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor),
Collections.singletonList(compoundOnFailProcessor));
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument);
assertThat(processorToFail.getInvokedCounter(), equalTo(1)); assertThat(processorToFail.getInvokedCounter(), equalTo(1));
@ -126,7 +141,7 @@ public class CompoundProcessorTests extends ESTestCase {
CompoundProcessor failCompoundProcessor = new CompoundProcessor(firstProcessor); CompoundProcessor failCompoundProcessor = new CompoundProcessor(firstProcessor);
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(failCompoundProcessor), CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor)); Collections.singletonList(secondProcessor));
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument);
@ -136,7 +151,8 @@ public class CompoundProcessorTests extends ESTestCase {
public void testCompoundProcessorExceptionFail() throws Exception { public void testCompoundProcessorExceptionFail() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");}); TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor failProcessor = new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); TestProcessor failProcessor =
new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> { TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata(); Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(3)); assertThat(ingestMetadata.entrySet(), hasSize(3));
@ -145,10 +161,10 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail"));
}); });
CompoundProcessor failCompoundProcessor = new CompoundProcessor(Collections.singletonList(firstProcessor), CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor),
Collections.singletonList(failProcessor)); Collections.singletonList(failProcessor));
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(failCompoundProcessor), CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor)); Collections.singletonList(secondProcessor));
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument);
@ -158,7 +174,8 @@ public class CompoundProcessorTests extends ESTestCase {
public void testCompoundProcessorExceptionFailInOnFailure() throws Exception { public void testCompoundProcessorExceptionFailInOnFailure() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");}); TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor failProcessor = new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");}); TestProcessor failProcessor =
new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> { TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata(); Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(3)); assertThat(ingestMetadata.entrySet(), hasSize(3));
@ -167,10 +184,10 @@ public class CompoundProcessorTests extends ESTestCase {
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail"));
}); });
CompoundProcessor failCompoundProcessor = new CompoundProcessor(Collections.singletonList(firstProcessor), CompoundProcessor failCompoundProcessor = new CompoundProcessor(false, Collections.singletonList(firstProcessor),
Collections.singletonList(new CompoundProcessor(failProcessor))); Collections.singletonList(new CompoundProcessor(failProcessor)));
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(failCompoundProcessor), CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor)); Collections.singletonList(secondProcessor));
compoundProcessor.execute(ingestDocument); compoundProcessor.execute(ingestDocument);

View File

@ -35,6 +35,7 @@ import java.util.Map;
import java.util.prefs.PreferencesFactory; import java.util.prefs.PreferencesFactory;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
public class PipelineFactoryTests extends ESTestCase { public class PipelineFactoryTests extends ESTestCase {
@ -45,7 +46,8 @@ public class PipelineFactoryTests extends ESTestCase {
processorConfig0.put(AbstractProcessorFactory.TAG_KEY, "first-processor"); processorConfig0.put(AbstractProcessorFactory.TAG_KEY, "first-processor");
Map<String, Object> pipelineConfig = new HashMap<>(); Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.PROCESSORS_KEY, Arrays.asList(Collections.singletonMap("test", processorConfig0), Collections.singletonMap("test", processorConfig1))); pipelineConfig.put(Pipeline.PROCESSORS_KEY,
Arrays.asList(Collections.singletonMap("test", processorConfig0), Collections.singletonMap("test", processorConfig1)));
Pipeline.Factory factory = new Pipeline.Factory(); Pipeline.Factory factory = new Pipeline.Factory();
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory())); ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
@ -87,6 +89,28 @@ public class PipelineFactoryTests extends ESTestCase {
assertThat(pipeline.getOnFailureProcessors().get(0).getType(), equalTo("test-processor")); assertThat(pipeline.getOnFailureProcessors().get(0).getType(), equalTo("test-processor"));
} }
public void testCreateWithPipelineIgnoreFailure() throws Exception {
Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put("ignore_failure", true);
ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory()));
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.PROCESSORS_KEY,
Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getProcessors().size(), equalTo(1));
assertThat(pipeline.getOnFailureProcessors().size(), equalTo(0));
CompoundProcessor processor = (CompoundProcessor) pipeline.getProcessors().get(0);
assertThat(processor.isIgnoreFailure(), is(true));
assertThat(processor.getProcessors().get(0).getType(), equalTo("test-processor"));
}
public void testCreateUnusedProcessorOptions() throws Exception { public void testCreateUnusedProcessorOptions() throws Exception {
Map<String, Object> processorConfig = new HashMap<>(); Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put("unused", "value"); processorConfig.put("unused", "value");
@ -121,7 +145,8 @@ public class PipelineFactoryTests extends ESTestCase {
public void testFlattenProcessors() throws Exception { public void testFlattenProcessors() throws Exception {
TestProcessor testProcessor = new TestProcessor(ingestDocument -> {}); TestProcessor testProcessor = new TestProcessor(ingestDocument -> {});
CompoundProcessor processor1 = new CompoundProcessor(testProcessor, testProcessor); CompoundProcessor processor1 = new CompoundProcessor(testProcessor, testProcessor);
CompoundProcessor processor2 = new CompoundProcessor(Collections.singletonList(testProcessor), Collections.singletonList(testProcessor)); CompoundProcessor processor2 =
new CompoundProcessor(false, Collections.singletonList(testProcessor), Collections.singletonList(testProcessor));
Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor1, processor2)); Pipeline pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor1, processor2));
List<Processor> flattened = pipeline.flattenAllProcessors(); List<Processor> flattened = pipeline.flattenAllProcessors();
assertThat(flattened.size(), equalTo(4)); assertThat(flattened.size(), equalTo(4));

View File

@ -51,7 +51,8 @@ public class ValueSourceTests extends ESTestCase {
myPreciousMap.put("field2", "value2"); myPreciousMap.put("field2", "value2");
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
ingestDocument.setFieldValue(TestTemplateService.instance().compile("field1"), ValueSource.wrap(myPreciousMap, TestTemplateService.instance())); ingestDocument.setFieldValue(TestTemplateService.instance().compile("field1"),
ValueSource.wrap(myPreciousMap, TestTemplateService.instance()));
ingestDocument.removeField("field1.field2"); ingestDocument.removeField("field1.field2");
assertThat(myPreciousMap.size(), equalTo(1)); assertThat(myPreciousMap.size(), equalTo(1));
@ -63,7 +64,8 @@ public class ValueSourceTests extends ESTestCase {
myPreciousList.add("value"); myPreciousList.add("value");
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
ingestDocument.setFieldValue(TestTemplateService.instance().compile("field1"), ValueSource.wrap(myPreciousList, TestTemplateService.instance())); ingestDocument.setFieldValue(TestTemplateService.instance().compile("field1"),
ValueSource.wrap(myPreciousList, TestTemplateService.instance()));
ingestDocument.removeField("field1.0"); ingestDocument.removeField("field1.0");
assertThat(myPreciousList.size(), equalTo(1)); assertThat(myPreciousList.size(), equalTo(1));

View File

@ -91,7 +91,7 @@ public class ForEachProcessorTests extends ESTestCase {
Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {}); Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {});
processor = new ForEachProcessor( processor = new ForEachProcessor(
"_tag", "values", "_tag", "values",
Collections.singletonList(new CompoundProcessor(Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor))) Collections.singletonList(new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)))
); );
processor.execute(ingestDocument); processor.execute(ingestDocument);
assertThat(testProcessor.getInvokedCounter(), equalTo(3)); assertThat(testProcessor.getInvokedCounter(), equalTo(3));

View File

@ -91,8 +91,8 @@ public class TrackingResultProcessorTests extends ESTestCase {
RuntimeException exception = new RuntimeException("fail"); RuntimeException exception = new RuntimeException("fail");
TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; }); TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; });
TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {}); TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {});
CompoundProcessor actualProcessor = new CompoundProcessor( CompoundProcessor actualProcessor = new CompoundProcessor(false,
Arrays.asList(new CompoundProcessor( Arrays.asList(new CompoundProcessor(false,
Arrays.asList(failProcessor, onFailureProcessor), Arrays.asList(failProcessor, onFailureProcessor),
Arrays.asList(onFailureProcessor, failProcessor))), Arrays.asList(onFailureProcessor, failProcessor))),
Arrays.asList(onFailureProcessor)); Arrays.asList(onFailureProcessor));

View File

@ -566,6 +566,30 @@ the index to which failed documents get sent.
} }
-------------------------------------------------- --------------------------------------------------
Alternatively instead of defining behaviour in case of processor failure, it is also possible
to ignore a failure and continue with the next processor by specifying the `ignore_failure` setting.
In case in the example below the field `foo` doesn't exist the failure will be caught and the pipeline
continues to execute, which in this case means that the pipeline does nothing.
[source,js]
--------------------------------------------------
{
"description" : "my first pipeline with handled exceptions",
"processors" : [
{
"rename" : {
"field" : "foo",
"target_field" : "bar",
"ignore_failure" : true
}
}
]
}
--------------------------------------------------
The `ignore_failure` can be set on any processor and defaults to `false`.
[float] [float]
[[accessing-error-metadata]] [[accessing-error-metadata]]
=== Accessing Error Metadata From Processors Handling Exceptions === Accessing Error Metadata From Processors Handling Exceptions

View File

@ -106,3 +106,43 @@
- match: { _source.field1: "value1" } - match: { _source.field1: "value1" }
- match: { _source.foofield: "exists" } - match: { _source.foofield: "exists" }
- match: { _source.foofield2: "ran" } - match: { _source.foofield2: "ran" }
---
"Test pipeline with ignore_failure in a processor":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"fail" : {
"message" : "_message",
"ignore_failure": true
}
},
{
"set" : {
"field" : "field",
"value" : "value"
}
}
]
}
- match: { acknowledged: true }
- do:
index:
index: test
type: test
id: 1
pipeline: "my_pipeline"
body: {}
- do:
get:
index: test
type: test
id: 1
- match: { _source.field: "value" }