Add on_failure field to processors and pipelines.

both processors and pipelines now have the ability to define
a separate list of processors to be executed if the original line
of execution throws an Exception.

processors without an on_failure parameter defined will throw an
exception and exit the pipeline immediately. processors with on_failure
defined will catch the exception and allow for further processors to
run. Exceptions within the on_failure block will be treated the same as
the top-level.
This commit is contained in:
Tal Levy 2015-12-07 10:53:36 -08:00
parent 46f99a11a0
commit 0bf4c8fb82
11 changed files with 602 additions and 47 deletions

View File

@ -22,6 +22,7 @@ package org.elasticsearch.ingest;
import org.elasticsearch.ingest.processor.ConfigurationUtils; import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.ingest.processor.CompoundProcessor;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -36,21 +37,19 @@ public final class Pipeline {
private final String id; private final String id;
private final String description; private final String description;
private final List<Processor> processors; private final CompoundProcessor compoundProcessor;
public Pipeline(String id, String description, List<Processor> processors) { public Pipeline(String id, String description, CompoundProcessor compoundProcessor) {
this.id = id; this.id = id;
this.description = description; this.description = description;
this.processors = processors; this.compoundProcessor = compoundProcessor;
} }
/** /**
* Modifies the data of a document to be indexed based on the processor this pipeline holds * Modifies the data of a document to be indexed based on the processor this pipeline holds
*/ */
public void execute(IngestDocument ingestDocument) throws Exception { public void execute(IngestDocument ingestDocument) throws Exception {
for (Processor processor : processors) { compoundProcessor.execute(ingestDocument);
processor.execute(ingestDocument);
}
} }
/** /**
@ -71,33 +70,56 @@ public final class Pipeline {
* Unmodifiable list containing each processor that operates on the data. * Unmodifiable list containing each processor that operates on the data.
*/ */
public List<Processor> getProcessors() { public List<Processor> getProcessors() {
return processors; return compoundProcessor.getProcessors();
}
/**
* Unmodifiable list containing each on_failure processor that operates on the data in case of
* exception thrown in pipeline processors
*/
public List<Processor> getOnFailureProcessors() {
return compoundProcessor.getOnFailureProcessors();
} }
public final static class Factory { public final static class Factory {
private Processor readProcessor(Map<String, Processor.Factory> processorRegistry, String type, Map<String, Object> config) throws Exception {
Processor.Factory factory = processorRegistry.get(type);
if (factory != null) {
List<Processor> onFailureProcessors = readProcessors("on_failure", processorRegistry, config);
Processor processor = factory.create(config);
if (config.isEmpty() == false) {
throw new IllegalArgumentException("processor [" + type + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray()));
}
if (onFailureProcessors.isEmpty()) {
return processor;
} else {
return new CompoundProcessor(Arrays.asList(processor), onFailureProcessors);
}
} else {
throw new IllegalArgumentException("No processor type exist with name [" + type + "]");
}
}
private List<Processor> readProcessors(String fieldName, Map<String, Processor.Factory> processorRegistry, Map<String, Object> config) throws Exception {
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(config, fieldName);
List<Processor> onFailureProcessors = new ArrayList<>();
if (onFailureProcessorConfigs != null) {
for (Map<String, Map<String, Object>> processorConfigWithKey : onFailureProcessorConfigs) {
for (Map.Entry<String, Map<String, Object>> entry : processorConfigWithKey.entrySet()) {
onFailureProcessors.add(readProcessor(processorRegistry, entry.getKey(), entry.getValue()));
}
}
}
return onFailureProcessors;
}
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorRegistry) throws Exception { public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorRegistry) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(config, "description"); String description = ConfigurationUtils.readOptionalStringProperty(config, "description");
List<Processor> processors = new ArrayList<>(); List<Processor> processors = readProcessors("processors", processorRegistry, config);
@SuppressWarnings("unchecked") List<Processor> onFailureProcessors = readProcessors("on_failure", processorRegistry, config);
List<Map<String, Map<String, Object>>> processorConfigs = (List<Map<String, Map<String, Object>>>) config.get("processors"); CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors));
if (processorConfigs != null ) { return new Pipeline(id, description, compoundProcessor);
for (Map<String, Map<String, Object>> processor : processorConfigs) {
for (Map.Entry<String, Map<String, Object>> entry : processor.entrySet()) {
Processor.Factory factory = processorRegistry.get(entry.getKey());
if (factory != null) {
Map<String, Object> processorConfig = entry.getValue();
processors.add(factory.create(processorConfig));
if (processorConfig.isEmpty() == false) {
throw new IllegalArgumentException("processor [" + entry.getKey() + "] doesn't support one or more provided configuration parameters " + Arrays.toString(processorConfig.keySet().toArray()));
}
} else {
throw new IllegalArgumentException("No processor type exist with name [" + entry.getKey() + "]");
}
}
}
}
return new Pipeline(id, description, Collections.unmodifiableList(processors));
} }
} }

View File

@ -0,0 +1,81 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* A Processor that executes a list of other "processors". It executes a separate list of
* "onFailureProcessors" when any of the processors throw an {@link Exception}.
*/
public class CompoundProcessor implements Processor {
private final List<Processor> processors;
private final List<Processor> onFailureProcessors;
public CompoundProcessor(Processor... processor) {
this(Arrays.asList(processor), Collections.emptyList());
}
public CompoundProcessor(List<Processor> processors, List<Processor> onFailureProcessors) {
this.processors = processors;
this.onFailureProcessors = onFailureProcessors;
}
public List<Processor> getOnFailureProcessors() {
return onFailureProcessors;
}
public List<Processor> getProcessors() {
return processors;
}
@Override
public String getType() {
return "compound[" + processors.stream().map(p -> p.getType()).collect(Collectors.joining(",")) + "]";
}
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
try {
for (Processor processor : processors) {
processor.execute(ingestDocument);
}
} catch (Exception e) {
if (onFailureProcessors.isEmpty()) {
throw e;
} else {
executeOnFailure(ingestDocument);
}
}
}
void executeOnFailure(IngestDocument ingestDocument) throws Exception {
for (Processor processor : onFailureProcessors) {
processor.execute(ingestDocument);
}
}
}

View File

@ -52,6 +52,29 @@ public class PipelineFactoryTests extends ESTestCase {
assertThat(pipeline.getProcessors().get(0).getType(), equalTo("test-processor")); assertThat(pipeline.getProcessors().get(0).getType(), equalTo("test-processor"));
} }
public void testCreateWithPipelineOnFailure() throws Exception {
Map<String, Object> processorConfig = new HashMap<>();
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("description", "_description");
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test-processor", processorConfig)));
pipelineConfig.put("on_failure", Collections.singletonList(Collections.singletonMap("test-processor", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
Processor processor = mock(Processor.class);
when(processor.getType()).thenReturn("test-processor");
Processor.Factory processorFactory = mock(Processor.Factory.class);
when(processorFactory.create(processorConfig)).thenReturn(processor);
processorRegistry.put("test-processor", processorFactory);
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getProcessors().size(), equalTo(1));
assertThat(pipeline.getProcessors().get(0).getType(), equalTo("test-processor"));
assertThat(pipeline.getOnFailureProcessors().size(), equalTo(1));
assertThat(pipeline.getOnFailureProcessors().get(0).getType(), equalTo("test-processor"));
}
public void testCreateUnusedProcessorOptions() throws Exception { public void testCreateUnusedProcessorOptions() throws Exception {
Map<String, Object> processorConfig = new HashMap<>(); Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put("unused", "value"); processorConfig.put("unused", "value");
@ -71,4 +94,26 @@ public class PipelineFactoryTests extends ESTestCase {
assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]")); assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]"));
} }
} }
public void testCreateProcessorsWithOnFailureProperties() throws Exception {
Map<String, Object> processorConfig = new HashMap<>();
processorConfig.put("on_failure", Collections.singletonList(Collections.singletonMap("test", new HashMap<>())));
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("description", "_description");
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
Pipeline.Factory factory = new Pipeline.Factory();
Map<String, Processor.Factory> processorFactoryStore = new HashMap<>();
Processor processor = mock(Processor.class);
when(processor.getType()).thenReturn("test-processor");
Processor.Factory processorFactory = mock(Processor.Factory.class);
when(processorFactory.create(processorConfig)).thenReturn(processor);
processorFactoryStore.put("test", processorFactory);
Pipeline pipeline = factory.create("_id", pipelineConfig, processorFactoryStore);
assertThat(pipeline.getId(), equalTo("_id"));
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getProcessors().size(), equalTo(1));
assertThat(pipeline.getProcessors().get(0).getType(), equalTo("compound[test-processor]"));
}
} }

View File

@ -0,0 +1,117 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.Arrays;
import java.util.HashMap;
import static org.elasticsearch.mock.orig.Mockito.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
public class CompoundProcessorTests extends ESTestCase {
private IngestDocument ingestDocument;
@Before
public void init() {
ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
}
public void testEmpty() throws Exception {
CompoundProcessor processor = new CompoundProcessor();
assertThat(processor.getProcessors().isEmpty(), is(true));
assertThat(processor.getOnFailureProcessors().isEmpty(), is(true));
processor.execute(ingestDocument);
}
public void testSingleProcessor() throws Exception {
Processor processor = mock(Processor.class);
CompoundProcessor compoundProcessor = new CompoundProcessor(processor);
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor));
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
compoundProcessor.execute(ingestDocument);
verify(processor, times(1)).execute(ingestDocument);
}
public void testSingleProcessorWithException() throws Exception {
Processor processor = mock(Processor.class);
doThrow(new RuntimeException("error")).doNothing().when(processor).execute(ingestDocument);
CompoundProcessor compoundProcessor = new CompoundProcessor(processor);
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor));
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
try {
compoundProcessor.execute(ingestDocument);
fail("should throw exception");
} catch (Exception e) {
assertThat(e.getMessage(), equalTo("error"));
}
verify(processor, times(1)).execute(ingestDocument);
}
public void testSingleProcessorWithOnFailureProcessor() throws Exception {
Processor processor = mock(Processor.class);
doThrow(new RuntimeException("error")).doNothing().when(processor).execute(ingestDocument);
Processor processorNext = mock(Processor.class);
CompoundProcessor compoundProcessor = new CompoundProcessor(Arrays.asList(processor), Arrays.asList(processorNext));
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor));
assertThat(compoundProcessor.getOnFailureProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getOnFailureProcessors().get(0), equalTo(processorNext));
compoundProcessor.execute(ingestDocument);
verify(processor, times(1)).execute(ingestDocument);
verify(processorNext, times(1)).execute(ingestDocument);
}
public void testSingleProcessorWithNestedFailures() throws Exception {
Processor processor = mock(Processor.class);
doThrow(new RuntimeException("error")).doNothing().when(processor).execute(ingestDocument);
Processor processorToFail = mock(Processor.class);
doThrow(new RuntimeException("error")).doNothing().when(processorToFail).execute(ingestDocument);
Processor lastProcessor = mock(Processor.class);
CompoundProcessor innerCompoundOnFailProcessor = new CompoundProcessor(Arrays.asList(processorToFail), Arrays.asList(lastProcessor));
CompoundProcessor compoundOnFailProcessor = spy(innerCompoundOnFailProcessor);
CompoundProcessor innerCompoundProcessor = new CompoundProcessor(Arrays.asList(processor), Arrays.asList(compoundOnFailProcessor));
CompoundProcessor compoundProcessor = spy(innerCompoundProcessor);
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor));
assertThat(compoundProcessor.getOnFailureProcessors().size(), equalTo(1));
assertThat(compoundProcessor.getOnFailureProcessors().get(0), equalTo(compoundOnFailProcessor));
compoundProcessor.execute(ingestDocument);
verify(processor, times(1)).execute(ingestDocument);
verify(compoundProcessor, times(1)).executeOnFailure(ingestDocument);
verify(compoundOnFailProcessor, times(1)).execute(ingestDocument);
verify(processorToFail, times(1)).execute(ingestDocument);
verify(compoundOnFailProcessor, times(1)).executeOnFailure(ingestDocument);
verify(lastProcessor, times(1)).execute(ingestDocument);
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.processor.CompoundProcessor;
import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.ingest.processor.set.SetProcessor; import org.elasticsearch.ingest.processor.set.SetProcessor;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -38,6 +39,7 @@ import org.mockito.ArgumentMatcher;
import org.mockito.Matchers; import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -85,8 +87,8 @@ public class PipelineExecutionServiceTests extends ESTestCase {
} }
public void testExecuteSuccess() throws Exception { public void testExecuteSuccess() throws Exception {
Processor processor = mock(Processor.class); CompoundProcessor processor = mock(CompoundProcessor.class);
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
Consumer<Throwable> failureHandler = mock(Consumer.class); Consumer<Throwable> failureHandler = mock(Consumer.class);
@ -99,7 +101,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
} }
public void testExecutePropagateAllMetaDataUpdates() throws Exception { public void testExecutePropagateAllMetaDataUpdates() throws Exception {
Processor processor = mock(Processor.class); CompoundProcessor processor = mock(CompoundProcessor.class);
doAnswer((InvocationOnMock invocationOnMock) -> { doAnswer((InvocationOnMock invocationOnMock) -> {
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0]; IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) { for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) {
@ -112,7 +114,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
} }
return null; return null;
}).when(processor).execute(any()); }).when(processor).execute(any());
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
Consumer<Throwable> failureHandler = mock(Consumer.class); Consumer<Throwable> failureHandler = mock(Consumer.class);
@ -132,8 +134,8 @@ public class PipelineExecutionServiceTests extends ESTestCase {
} }
public void testExecuteFailure() throws Exception { public void testExecuteFailure() throws Exception {
Processor processor = mock(Processor.class); CompoundProcessor processor = mock(CompoundProcessor.class);
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
Consumer<Throwable> failureHandler = mock(Consumer.class); Consumer<Throwable> failureHandler = mock(Consumer.class);
@ -144,6 +146,57 @@ public class PipelineExecutionServiceTests extends ESTestCase {
verify(completionHandler, never()).accept(anyBoolean()); verify(completionHandler, never()).accept(anyBoolean());
} }
public void testExecuteSuccessWithOnFailure() throws Exception {
Processor processor = mock(Processor.class);
Processor onFailureProcessor = mock(Processor.class);
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
Consumer<Throwable> failureHandler = mock(Consumer.class);
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
//TODO we remove metadata, this check is not valid anymore, what do we replace it with?
//verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(failureHandler, never()).accept(any(RuntimeException.class));
verify(completionHandler, times(1)).accept(true);
}
public void testExecuteFailureWithOnFailure() throws Exception {
Processor processor = mock(Processor.class);
Processor onFailureProcessor = mock(Processor.class);
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
Consumer<Throwable> failureHandler = mock(Consumer.class);
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(failureHandler, times(1)).accept(any(RuntimeException.class));
verify(completionHandler, never()).accept(anyBoolean());
}
public void testExecuteFailureWithNestedOnFailure() throws Exception {
Processor processor = mock(Processor.class);
Processor onFailureProcessor = mock(Processor.class);
Processor onFailureOnFailureProcessor = mock(Processor.class);
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor),
Collections.singletonList(new CompoundProcessor(Arrays.asList(onFailureProcessor),Arrays.asList(onFailureOnFailureProcessor))));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
Consumer<Throwable> failureHandler = mock(Consumer.class);
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(failureHandler, times(1)).accept(any(RuntimeException.class));
verify(completionHandler, never()).accept(anyBoolean());
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testExecuteTTL() throws Exception { public void testExecuteTTL() throws Exception {
// test with valid ttl // test with valid ttl
@ -152,7 +205,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
config.put("field", "_ttl"); config.put("field", "_ttl");
config.put("value", "5d"); config.put("value", "5d");
Processor processor = metaProcessorFactory.create(config); Processor processor = metaProcessorFactory.create(config);
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor)));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
Consumer<Throwable> failureHandler = mock(Consumer.class); Consumer<Throwable> failureHandler = mock(Consumer.class);
@ -169,7 +222,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
config.put("field", "_ttl"); config.put("field", "_ttl");
config.put("value", "abc"); config.put("value", "abc");
processor = metaProcessorFactory.create(config); processor = metaProcessorFactory.create(config);
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor)));
indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
failureHandler = mock(Consumer.class); failureHandler = mock(Consumer.class);
@ -179,7 +232,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
verify(completionHandler, never()).accept(anyBoolean()); verify(completionHandler, never()).accept(anyBoolean());
// test with provided ttl // test with provided ttl
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.emptyList())); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", mock(CompoundProcessor.class)));
indexRequest = new IndexRequest("_index", "_type", "_id") indexRequest = new IndexRequest("_index", "_type", "_id")
.source(Collections.emptyMap()) .source(Collections.emptyMap())
@ -217,10 +270,10 @@ public class PipelineExecutionServiceTests extends ESTestCase {
String pipelineId = "_id"; String pipelineId = "_id";
Processor pipeline = mock(Processor.class); CompoundProcessor processor = mock(CompoundProcessor.class);
Exception error = new RuntimeException(); Exception error = new RuntimeException();
doThrow(error).when(pipeline).execute(any()); doThrow(error).when(processor).execute(any());
when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, Collections.singletonList(pipeline))); when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, processor));
Consumer<Throwable> requestItemErrorHandler = mock(Consumer.class); Consumer<Throwable> requestItemErrorHandler = mock(Consumer.class);
Consumer<Boolean> completionHandler = mock(Consumer.class); Consumer<Boolean> completionHandler = mock(Consumer.class);
@ -241,7 +294,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
} }
String pipelineId = "_id"; String pipelineId = "_id";
when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, Collections.emptyList())); when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, new CompoundProcessor()));
Consumer<Throwable> requestItemErrorHandler = mock(Consumer.class); Consumer<Throwable> requestItemErrorHandler = mock(Consumer.class);
Consumer<Boolean> completionHandler = mock(Consumer.class); Consumer<Boolean> completionHandler = mock(Consumer.class);

View File

@ -32,6 +32,7 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.CompoundProcessor;
import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.plugin.ingest.IngestBootstrapper; import org.elasticsearch.plugin.ingest.IngestBootstrapper;
import org.elasticsearch.plugin.ingest.IngestPlugin; import org.elasticsearch.plugin.ingest.IngestPlugin;
@ -186,7 +187,7 @@ public class IngestActionFilterTests extends ESTestCase {
return null; return null;
} }
}; };
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor)));
executionService = new PipelineExecutionService(store, threadPool); executionService = new PipelineExecutionService(store, threadPool);
IngestBootstrapper bootstrapper = mock(IngestBootstrapper.class); IngestBootstrapper bootstrapper = mock(IngestBootstrapper.class);
when(bootstrapper.getPipelineExecutionService()).thenReturn(executionService); when(bootstrapper.getPipelineExecutionService()).thenReturn(executionService);

View File

@ -23,7 +23,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.ingest.processor.CompoundProcessor;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After; import org.junit.After;
@ -47,7 +47,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
private ThreadPool threadPool; private ThreadPool threadPool;
private SimulateExecutionService executionService; private SimulateExecutionService executionService;
private Pipeline pipeline; private Pipeline pipeline;
private Processor processor; private CompoundProcessor processor;
private IngestDocument ingestDocument; private IngestDocument ingestDocument;
@Before @Before
@ -58,10 +58,9 @@ public class SimulateExecutionServiceTests extends ESTestCase {
.build() .build()
); );
executionService = new SimulateExecutionService(threadPool); executionService = new SimulateExecutionService(threadPool);
processor = mock(Processor.class); processor = mock(CompoundProcessor.class);
when(processor.getType()).thenReturn("mock"); when(processor.getType()).thenReturn("mock");
pipeline = new Pipeline("_id", "_description", Arrays.asList(processor, processor)); pipeline = new Pipeline("_id", "_description", new CompoundProcessor(processor, processor));
//ingestDocument = new IngestDocument("_index", "_type", "_id", Collections.singletonMap("foo", "bar"));
ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.CompoundProcessor;
import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.plugin.ingest.PipelineStore; import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -28,6 +29,7 @@ import org.junit.Before;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@ -49,7 +51,9 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
@Before @Before
public void init() throws IOException { public void init() throws IOException {
Pipeline pipeline = new Pipeline(SimulatePipelineRequest.SIMULATED_PIPELINE_ID, null, Collections.singletonList(mock(Processor.class))); CompoundProcessor pipelineCompoundProcessor = mock(CompoundProcessor.class);
when(pipelineCompoundProcessor.getProcessors()).thenReturn(Arrays.asList(mock(Processor.class)));
Pipeline pipeline = new Pipeline(SimulatePipelineRequest.SIMULATED_PIPELINE_ID, null, pipelineCompoundProcessor);
Map<String, Processor.Factory> processorRegistry = new HashMap<>(); Map<String, Processor.Factory> processorRegistry = new HashMap<>();
processorRegistry.put("mock_processor", mock(Processor.Factory.class)); processorRegistry.put("mock_processor", mock(Processor.Factory.class));
store = mock(PipelineStore.class); store = mock(PipelineStore.class);
@ -133,9 +137,28 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
List<Map<String, Object>> processors = new ArrayList<>(); List<Map<String, Object>> processors = new ArrayList<>();
int numProcessors = randomIntBetween(1, 10); int numProcessors = randomIntBetween(1, 10);
for (int i = 0; i < numProcessors; i++) { for (int i = 0; i < numProcessors; i++) {
processors.add(Collections.singletonMap("mock_processor", Collections.emptyMap())); Map<String, Object> processorConfig = new HashMap<>();
List<Map<String, Object>> onFailureProcessors = new ArrayList<>();
int numOnFailureProcessors = randomIntBetween(0, 1);
for (int j = 0; j < numOnFailureProcessors; j++) {
onFailureProcessors.add(Collections.singletonMap("mock_processor", Collections.emptyMap()));
}
if (numOnFailureProcessors > 0) {
processorConfig.put("on_failure", onFailureProcessors);
}
processors.add(Collections.singletonMap("mock_processor", processorConfig));
} }
pipelineConfig.put("processors", processors); pipelineConfig.put("processors", processors);
List<Map<String, Object>> onFailureProcessors = new ArrayList<>();
int numOnFailureProcessors = randomIntBetween(0, 1);
for (int i = 0; i < numOnFailureProcessors; i++) {
onFailureProcessors.add(Collections.singletonMap("mock_processor", Collections.emptyMap()));
}
if (numOnFailureProcessors > 0) {
pipelineConfig.put("on_failure", onFailureProcessors);
}
requestContent.put(Fields.PIPELINE, pipelineConfig); requestContent.put(Fields.PIPELINE, pipelineConfig);
SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parse(requestContent, false, store); SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parse(requestContent, false, store);

View File

@ -72,3 +72,53 @@
} }
] ]
} }
---
"Test basic pipeline with on_failure in processor":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value",
"on_failure": [
{
"set" : {
"field" : "field2",
"value" : "_failed_value"
}
}
]
}
}
]
}
- match: { _index: ".ingest" }
- match: { _type: "pipeline" }
- match: { _version: 1 }
- match: { _id: "my_pipeline" }
- do:
ingest.get_pipeline:
id: "my_pipeline"
- match: { my_pipeline._source.description: "_description" }
- match: { my_pipeline._version: 1 }
- do:
ingest.delete_pipeline:
id: "my_pipeline"
- match: { _index: ".ingest" }
- match: { _type: "pipeline" }
- match: { _version: 2 }
- match: { _id: "my_pipeline" }
- match: { found: true }
- do:
catch: missing
ingest.get_pipeline:
id: "my_pipeline"

View File

@ -69,6 +69,48 @@
} }
- length: { docs: 1 } - length: { docs: 1 }
---
"Test simulate with provided pipeline definition with on_failure block":
- do:
ingest.simulate:
body: >
{
"pipeline": {
"description": "_description",
"processors": [
{
"rename" : {
"field" : "does_not_exist",
"to" : "field2",
"on_failure" : [
{
"set" : {
"field" : "field2",
"value" : "_value"
}
}
]
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"foo": "bar"
}
}
]
}
- length: { docs: 1 }
- match: { docs.0.doc._source.foo: "bar" }
- match: { docs.0.doc._source.field2: "_value" }
- length: { docs.0.doc._ingest: 1 }
- is_true: docs.0.doc._ingest.timestamp
--- ---
"Test simulate with no provided pipeline or pipeline_id": "Test simulate with no provided pipeline or pipeline_id":
- do: - do:

View File

@ -0,0 +1,122 @@
---
"Test Pipeline With On Failure Block":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"rename" : {
"field" : "foofield",
"to" : "field1"
}
},
{
"grok" : {
"field" : "field1",
"pattern" : "%{NUMBER:val} %{NUMBER:status} <%{WORD:msg}>"
}
}
],
"on_failure" : [
{
"grok" : {
"field" : "field1",
"pattern" : "%{NUMBER:val:float} %{NUMBER:status:int} <%{WORD:msg}>"
}
},
{
"set" : {
"field" : "_failed",
"value" : true
}
}
]
}
- match: { _id: "my_pipeline" }
- do:
ingest.index:
index: test
type: test
id: 1
pipeline_id: "my_pipeline"
body: {field1: "123.42 400 <foo>"}
- do:
get:
index: test
type: test
id: 1
- match: { _source.val: 123.42 }
- match: { _source.status: 400 }
- match: { _source.msg: "foo" }
- match: { _source._failed: true }
---
"Test Pipeline With Nested Processor On Failures":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"grok" : {
"field" : "field1",
"pattern" : "%{NUMBER:val:float} %{NUMBER:status:int} <%{WORD:msg}>"
}
},
{
"rename" : {
"field" : "foofield",
"to" : "field1",
"on_failure" : [
{
"set" : {
"field" : "foofield",
"value" : "exists"
}
},
{
"rename" : {
"field" : "foofield2",
"to" : "field1",
"on_failure" : [
{
"set" : {
"field" : "foofield2",
"value" : "ran"
}
}
]
}
}
]
}
}
]
}
- match: { _id: "my_pipeline" }
- do:
ingest.index:
index: test
type: test
id: 1
pipeline_id: "my_pipeline"
body: {field1: "123.42 400 <foo>"}
- do:
get:
index: test
type: test
id: 1
- match: { _source.val: 123.42 }
- match: { _source.msg: "foo" }
- match: { _source.status: 400 }
- match: { _source.foofield: "exists" }
- match: { _source.foofield2: "ran" }