move ingest api to core
This commit is contained in:
parent
702f712204
commit
2478aafa46
|
@ -18,9 +18,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
||||||
package org.elasticsearch.ingest.processor;
|
package org.elasticsearch.ingest;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -42,6 +40,7 @@ public class CompoundProcessor implements Processor {
|
||||||
public CompoundProcessor(Processor... processor) {
|
public CompoundProcessor(Processor... processor) {
|
||||||
this(Arrays.asList(processor), Collections.emptyList());
|
this(Arrays.asList(processor), Collections.emptyList());
|
||||||
}
|
}
|
||||||
|
|
||||||
public CompoundProcessor(List<Processor> processors, List<Processor> onFailureProcessors) {
|
public CompoundProcessor(List<Processor> processors, List<Processor> onFailureProcessors) {
|
||||||
this.processors = processors;
|
this.processors = processors;
|
||||||
this.onFailureProcessors = onFailureProcessors;
|
this.onFailureProcessors = onFailureProcessors;
|
||||||
|
@ -57,7 +56,7 @@ public class CompoundProcessor implements Processor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getType() {
|
public String getType() {
|
||||||
return "compound[" + processors.stream().map(p -> p.getType()).collect(Collectors.joining(",")) + "]";
|
return "compound[" + processors.stream().map(Processor::getType).collect(Collectors.joining(",")) + "]";
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
|
@ -17,7 +17,7 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.ingest.processor;
|
package org.elasticsearch.ingest;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
|
@ -20,6 +20,14 @@
|
||||||
package org.elasticsearch.ingest;
|
package org.elasticsearch.ingest;
|
||||||
|
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
|
import org.elasticsearch.index.mapper.internal.IdFieldMapper;
|
||||||
|
import org.elasticsearch.index.mapper.internal.IndexFieldMapper;
|
||||||
|
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
|
||||||
|
import org.elasticsearch.index.mapper.internal.RoutingFieldMapper;
|
||||||
|
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
|
||||||
|
import org.elasticsearch.index.mapper.internal.TTLFieldMapper;
|
||||||
|
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
|
||||||
|
import org.elasticsearch.index.mapper.internal.TypeFieldMapper;
|
||||||
|
|
||||||
import java.text.DateFormat;
|
import java.text.DateFormat;
|
||||||
import java.text.SimpleDateFormat;
|
import java.text.SimpleDateFormat;
|
||||||
|
@ -38,7 +46,6 @@ import java.util.TimeZone;
|
||||||
public final class IngestDocument {
|
public final class IngestDocument {
|
||||||
|
|
||||||
public final static String INGEST_KEY = "_ingest";
|
public final static String INGEST_KEY = "_ingest";
|
||||||
public final static String SOURCE_KEY = "_source";
|
|
||||||
|
|
||||||
static final String TIMESTAMP = "timestamp";
|
static final String TIMESTAMP = "timestamp";
|
||||||
|
|
||||||
|
@ -348,7 +355,7 @@ public final class IngestDocument {
|
||||||
if (append) {
|
if (append) {
|
||||||
if (map.containsKey(leafKey)) {
|
if (map.containsKey(leafKey)) {
|
||||||
Object object = map.get(leafKey);
|
Object object = map.get(leafKey);
|
||||||
List<Object> list = appendValues(path, object, value);
|
List<Object> list = appendValues(object, value);
|
||||||
if (list != object) {
|
if (list != object) {
|
||||||
map.put(leafKey, list);
|
map.put(leafKey, list);
|
||||||
}
|
}
|
||||||
|
@ -374,7 +381,7 @@ public final class IngestDocument {
|
||||||
}
|
}
|
||||||
if (append) {
|
if (append) {
|
||||||
Object object = list.get(index);
|
Object object = list.get(index);
|
||||||
List<Object> newList = appendValues(path, object, value);
|
List<Object> newList = appendValues(object, value);
|
||||||
if (newList != object) {
|
if (newList != object) {
|
||||||
list.set(index, newList);
|
list.set(index, newList);
|
||||||
}
|
}
|
||||||
|
@ -387,7 +394,7 @@ public final class IngestDocument {
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private static List<Object> appendValues(String path, Object maybeList, Object value) {
|
private static List<Object> appendValues(Object maybeList, Object value) {
|
||||||
List<Object> list;
|
List<Object> list;
|
||||||
if (maybeList instanceof List) {
|
if (maybeList instanceof List) {
|
||||||
//maybeList is already a list, we append the provided values to it
|
//maybeList is already a list, we append the provided values to it
|
||||||
|
@ -427,7 +434,7 @@ public final class IngestDocument {
|
||||||
|
|
||||||
private Map<String, Object> createTemplateModel() {
|
private Map<String, Object> createTemplateModel() {
|
||||||
Map<String, Object> model = new HashMap<>(sourceAndMetadata);
|
Map<String, Object> model = new HashMap<>(sourceAndMetadata);
|
||||||
model.put(SOURCE_KEY, sourceAndMetadata);
|
model.put(SourceFieldMapper.NAME, sourceAndMetadata);
|
||||||
// If there is a field in the source with the name '_ingest' it gets overwritten here,
|
// If there is a field in the source with the name '_ingest' it gets overwritten here,
|
||||||
// if access to that field is required then it get accessed via '_source._ingest'
|
// if access to that field is required then it get accessed via '_source._ingest'
|
||||||
model.put(INGEST_KEY, ingestMetadata);
|
model.put(INGEST_KEY, ingestMetadata);
|
||||||
|
@ -489,13 +496,13 @@ public final class IngestDocument {
|
||||||
}
|
}
|
||||||
|
|
||||||
public enum MetaData {
|
public enum MetaData {
|
||||||
INDEX("_index"),
|
INDEX(IndexFieldMapper.NAME),
|
||||||
TYPE("_type"),
|
TYPE(TypeFieldMapper.NAME),
|
||||||
ID("_id"),
|
ID(IdFieldMapper.NAME),
|
||||||
ROUTING("_routing"),
|
ROUTING(RoutingFieldMapper.NAME),
|
||||||
PARENT("_parent"),
|
PARENT(ParentFieldMapper.NAME),
|
||||||
TIMESTAMP("_timestamp"),
|
TIMESTAMP(TimestampFieldMapper.NAME),
|
||||||
TTL("_ttl");
|
TTL(TTLFieldMapper.NAME);
|
||||||
|
|
||||||
private final String fieldName;
|
private final String fieldName;
|
||||||
|
|
||||||
|
@ -506,7 +513,6 @@ public final class IngestDocument {
|
||||||
public String getFieldName() {
|
public String getFieldName() {
|
||||||
return fieldName;
|
return fieldName;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private class FieldPath {
|
private class FieldPath {
|
||||||
|
@ -523,7 +529,7 @@ public final class IngestDocument {
|
||||||
newPath = path.substring(8, path.length());
|
newPath = path.substring(8, path.length());
|
||||||
} else {
|
} else {
|
||||||
initialContext = sourceAndMetadata;
|
initialContext = sourceAndMetadata;
|
||||||
if (path.startsWith(SOURCE_KEY + ".")) {
|
if (path.startsWith(SourceFieldMapper.NAME + ".")) {
|
||||||
newPath = path.substring(8, path.length());
|
newPath = path.substring(8, path.length());
|
||||||
} else {
|
} else {
|
||||||
newPath = path;
|
newPath = path;
|
|
@ -20,10 +20,6 @@
|
||||||
|
|
||||||
package org.elasticsearch.ingest;
|
package org.elasticsearch.ingest;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.processor.ConfigurationUtils;
|
|
||||||
import org.elasticsearch.ingest.processor.Processor;
|
|
||||||
import org.elasticsearch.ingest.processor.CompoundProcessor;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -92,12 +88,10 @@ public final class Pipeline {
|
||||||
}
|
}
|
||||||
if (onFailureProcessors.isEmpty()) {
|
if (onFailureProcessors.isEmpty()) {
|
||||||
return processor;
|
return processor;
|
||||||
} else {
|
|
||||||
return new CompoundProcessor(Arrays.asList(processor), onFailureProcessors);
|
|
||||||
}
|
}
|
||||||
} else {
|
return new CompoundProcessor(Collections.singletonList(processor), onFailureProcessors);
|
||||||
throw new IllegalArgumentException("No processor type exist with name [" + type + "]");
|
|
||||||
}
|
}
|
||||||
|
throw new IllegalArgumentException("No processor type exist with name [" + type + "]");
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<Processor> readProcessors(String fieldName, Map<String, Processor.Factory> processorRegistry, Map<String, Object> config) throws Exception {
|
private List<Processor> readProcessors(String fieldName, Map<String, Processor.Factory> processorRegistry, Map<String, Object> config) throws Exception {
|
||||||
|
@ -121,6 +115,5 @@ public final class Pipeline {
|
||||||
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors));
|
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors));
|
||||||
return new Pipeline(id, description, compoundProcessor);
|
return new Pipeline(id, description, compoundProcessor);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -18,9 +18,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
||||||
package org.elasticsearch.ingest.processor;
|
package org.elasticsearch.ingest;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
|
@ -17,11 +17,9 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.plugin.ingest;
|
package org.elasticsearch.ingest;
|
||||||
|
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.ingest.TemplateService;
|
|
||||||
import org.elasticsearch.ingest.processor.Processor;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The ingest framework (pipeline, processor and processor factory) can't rely on ES specific code. However some
|
* The ingest framework (pipeline, processor and processor factory) can't rely on ES specific code. However some
|
||||||
|
@ -29,9 +27,8 @@ import org.elasticsearch.ingest.processor.Processor;
|
||||||
* so we need some code that provides the physical location of the configuration directory to the processor factories
|
* so we need some code that provides the physical location of the configuration directory to the processor factories
|
||||||
* that need this and this is what this processor factory provider does.
|
* that need this and this is what this processor factory provider does.
|
||||||
*/
|
*/
|
||||||
|
//TODO this abstraction could be removed once ingest-core is part of es core?
|
||||||
@FunctionalInterface
|
@FunctionalInterface
|
||||||
interface ProcessorFactoryProvider {
|
public interface ProcessorFactoryProvider {
|
||||||
|
|
||||||
Processor.Factory get(Environment environment, TemplateService templateService);
|
Processor.Factory get(Environment environment, TemplateService templateService);
|
||||||
|
|
||||||
}
|
}
|
|
@ -0,0 +1,51 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.ingest;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.inject.AbstractModule;
|
||||||
|
import org.elasticsearch.common.inject.multibindings.MapBinder;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Registry for processor factories
|
||||||
|
* @see org.elasticsearch.ingest.Processor.Factory
|
||||||
|
* @see ProcessorFactoryProvider
|
||||||
|
*/
|
||||||
|
public class ProcessorsModule extends AbstractModule {
|
||||||
|
|
||||||
|
private final Map<String, ProcessorFactoryProvider> processorFactoryProviders = new HashMap<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void configure() {
|
||||||
|
MapBinder<String, ProcessorFactoryProvider> mapBinder = MapBinder.newMapBinder(binder(), String.class, ProcessorFactoryProvider.class);
|
||||||
|
for (Map.Entry<String, ProcessorFactoryProvider> entry : processorFactoryProviders.entrySet()) {
|
||||||
|
mapBinder.addBinding(entry.getKey()).toInstance(entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds a processor factory under a specific type name.
|
||||||
|
*/
|
||||||
|
public void addProcessor(String type, ProcessorFactoryProvider processorFactoryProvider) {
|
||||||
|
processorFactoryProviders.put(type, processorFactoryProvider);
|
||||||
|
}
|
||||||
|
}
|
|
@ -24,6 +24,7 @@ import java.util.Map;
|
||||||
* Abstraction for the template engine.
|
* Abstraction for the template engine.
|
||||||
*/
|
*/
|
||||||
// NOTE: this abstraction is added because the 'org.elasticsearch.ingest' has the requirement to be ES agnostic
|
// NOTE: this abstraction is added because the 'org.elasticsearch.ingest' has the requirement to be ES agnostic
|
||||||
|
//TODO this abstraction could be removed once ingest-core is part of es core?
|
||||||
public interface TemplateService {
|
public interface TemplateService {
|
||||||
|
|
||||||
Template compile(String template);
|
Template compile(String template);
|
|
@ -0,0 +1,110 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.ingest;
|
||||||
|
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.junit.Before;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
|
||||||
|
public class CompoundProcessorTests extends ESTestCase {
|
||||||
|
private IngestDocument ingestDocument;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void init() {
|
||||||
|
ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testEmpty() throws Exception {
|
||||||
|
CompoundProcessor processor = new CompoundProcessor();
|
||||||
|
assertThat(processor.getProcessors().isEmpty(), is(true));
|
||||||
|
assertThat(processor.getOnFailureProcessors().isEmpty(), is(true));
|
||||||
|
processor.execute(ingestDocument);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSingleProcessor() throws Exception {
|
||||||
|
TestProcessor processor = new TestProcessor(ingestDocument -> {});
|
||||||
|
CompoundProcessor compoundProcessor = new CompoundProcessor(processor);
|
||||||
|
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
|
||||||
|
assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor));
|
||||||
|
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
|
||||||
|
compoundProcessor.execute(ingestDocument);
|
||||||
|
assertThat(processor.getInvokedCounter(), equalTo(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSingleProcessorWithException() throws Exception {
|
||||||
|
TestProcessor processor = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");});
|
||||||
|
CompoundProcessor compoundProcessor = new CompoundProcessor(processor);
|
||||||
|
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
|
||||||
|
assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor));
|
||||||
|
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
|
||||||
|
try {
|
||||||
|
compoundProcessor.execute(ingestDocument);
|
||||||
|
fail("should throw exception");
|
||||||
|
} catch (Exception e) {
|
||||||
|
assertThat(e.getMessage(), equalTo("error"));
|
||||||
|
}
|
||||||
|
assertThat(processor.getInvokedCounter(), equalTo(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSingleProcessorWithOnFailureProcessor() throws Exception {
|
||||||
|
TestProcessor processor1 = new TestProcessor("first", ingestDocument -> {throw new RuntimeException("error");});
|
||||||
|
TestProcessor processor2 = new TestProcessor(ingestDocument -> {
|
||||||
|
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
|
||||||
|
assertThat(ingestMetadata.size(), equalTo(2));
|
||||||
|
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
|
||||||
|
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("first"));
|
||||||
|
});
|
||||||
|
|
||||||
|
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor1), Collections.singletonList(processor2));
|
||||||
|
compoundProcessor.execute(ingestDocument);
|
||||||
|
|
||||||
|
assertThat(processor1.getInvokedCounter(), equalTo(1));
|
||||||
|
assertThat(processor2.getInvokedCounter(), equalTo(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSingleProcessorWithNestedFailures() throws Exception {
|
||||||
|
TestProcessor processor = new TestProcessor("first", ingestDocument -> {throw new RuntimeException("error");});
|
||||||
|
TestProcessor processorToFail = new TestProcessor("second", ingestDocument -> {
|
||||||
|
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
|
||||||
|
assertThat(ingestMetadata.size(), equalTo(2));
|
||||||
|
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
|
||||||
|
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("first"));
|
||||||
|
throw new RuntimeException("error");
|
||||||
|
});
|
||||||
|
TestProcessor lastProcessor = new TestProcessor(ingestDocument -> {
|
||||||
|
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
|
||||||
|
assertThat(ingestMetadata.size(), equalTo(2));
|
||||||
|
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
|
||||||
|
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("second"));
|
||||||
|
});
|
||||||
|
CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(Collections.singletonList(processorToFail), Collections.singletonList(lastProcessor));
|
||||||
|
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(compoundOnFailProcessor));
|
||||||
|
compoundProcessor.execute(ingestDocument);
|
||||||
|
|
||||||
|
assertThat(processorToFail.getInvokedCounter(), equalTo(1));
|
||||||
|
assertThat(lastProcessor.getInvokedCounter(), equalTo(1));
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,7 +17,7 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.ingest.processor;
|
package org.elasticsearch.ingest;
|
||||||
|
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -53,7 +53,7 @@ public class ConfigurationUtilsTests extends ESTestCase {
|
||||||
assertThat(val, equalTo("bar"));
|
assertThat(val, equalTo("bar"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testReadStringProperty_InvalidType() {
|
public void testReadStringPropertyInvalidType() {
|
||||||
try {
|
try {
|
||||||
ConfigurationUtils.readStringProperty(config, "arr");
|
ConfigurationUtils.readStringProperty(config, "arr");
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
|
@ -19,7 +19,6 @@
|
||||||
|
|
||||||
package org.elasticsearch.ingest;
|
package org.elasticsearch.ingest;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.processor.Processor;
|
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -27,8 +26,6 @@ import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
public class PipelineFactoryTests extends ESTestCase {
|
public class PipelineFactoryTests extends ESTestCase {
|
||||||
|
|
||||||
|
@ -38,13 +35,7 @@ public class PipelineFactoryTests extends ESTestCase {
|
||||||
pipelineConfig.put("description", "_description");
|
pipelineConfig.put("description", "_description");
|
||||||
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
|
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
|
||||||
Pipeline.Factory factory = new Pipeline.Factory();
|
Pipeline.Factory factory = new Pipeline.Factory();
|
||||||
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
|
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
|
||||||
Processor processor = mock(Processor.class);
|
|
||||||
when(processor.getType()).thenReturn("test-processor");
|
|
||||||
Processor.Factory processorFactory = mock(Processor.Factory.class);
|
|
||||||
when(processorFactory.create(processorConfig)).thenReturn(processor);
|
|
||||||
processorRegistry.put("test", processorFactory);
|
|
||||||
|
|
||||||
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
|
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
|
||||||
assertThat(pipeline.getId(), equalTo("_id"));
|
assertThat(pipeline.getId(), equalTo("_id"));
|
||||||
assertThat(pipeline.getDescription(), equalTo("_description"));
|
assertThat(pipeline.getDescription(), equalTo("_description"));
|
||||||
|
@ -56,16 +47,10 @@ public class PipelineFactoryTests extends ESTestCase {
|
||||||
Map<String, Object> processorConfig = new HashMap<>();
|
Map<String, Object> processorConfig = new HashMap<>();
|
||||||
Map<String, Object> pipelineConfig = new HashMap<>();
|
Map<String, Object> pipelineConfig = new HashMap<>();
|
||||||
pipelineConfig.put("description", "_description");
|
pipelineConfig.put("description", "_description");
|
||||||
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test-processor", processorConfig)));
|
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
|
||||||
pipelineConfig.put("on_failure", Collections.singletonList(Collections.singletonMap("test-processor", processorConfig)));
|
pipelineConfig.put("on_failure", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
|
||||||
Pipeline.Factory factory = new Pipeline.Factory();
|
Pipeline.Factory factory = new Pipeline.Factory();
|
||||||
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
|
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
|
||||||
Processor processor = mock(Processor.class);
|
|
||||||
when(processor.getType()).thenReturn("test-processor");
|
|
||||||
Processor.Factory processorFactory = mock(Processor.Factory.class);
|
|
||||||
when(processorFactory.create(processorConfig)).thenReturn(processor);
|
|
||||||
processorRegistry.put("test-processor", processorFactory);
|
|
||||||
|
|
||||||
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
|
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
|
||||||
assertThat(pipeline.getId(), equalTo("_id"));
|
assertThat(pipeline.getId(), equalTo("_id"));
|
||||||
assertThat(pipeline.getDescription(), equalTo("_description"));
|
assertThat(pipeline.getDescription(), equalTo("_description"));
|
||||||
|
@ -82,12 +67,7 @@ public class PipelineFactoryTests extends ESTestCase {
|
||||||
pipelineConfig.put("description", "_description");
|
pipelineConfig.put("description", "_description");
|
||||||
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
|
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
|
||||||
Pipeline.Factory factory = new Pipeline.Factory();
|
Pipeline.Factory factory = new Pipeline.Factory();
|
||||||
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
|
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
|
||||||
Processor processor = mock(Processor.class);
|
|
||||||
when(processor.getType()).thenReturn("test-processor");
|
|
||||||
Processor.Factory processorFactory = mock(Processor.Factory.class);
|
|
||||||
when(processorFactory.create(processorConfig)).thenReturn(processor);
|
|
||||||
processorRegistry.put("test", processorFactory);
|
|
||||||
try {
|
try {
|
||||||
factory.create("_id", pipelineConfig, processorRegistry);
|
factory.create("_id", pipelineConfig, processorRegistry);
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
|
@ -103,14 +83,8 @@ public class PipelineFactoryTests extends ESTestCase {
|
||||||
pipelineConfig.put("description", "_description");
|
pipelineConfig.put("description", "_description");
|
||||||
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
|
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig)));
|
||||||
Pipeline.Factory factory = new Pipeline.Factory();
|
Pipeline.Factory factory = new Pipeline.Factory();
|
||||||
Map<String, Processor.Factory> processorFactoryStore = new HashMap<>();
|
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
|
||||||
Processor processor = mock(Processor.class);
|
Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry);
|
||||||
when(processor.getType()).thenReturn("test-processor");
|
|
||||||
Processor.Factory processorFactory = mock(Processor.Factory.class);
|
|
||||||
when(processorFactory.create(processorConfig)).thenReturn(processor);
|
|
||||||
processorFactoryStore.put("test", processorFactory);
|
|
||||||
|
|
||||||
Pipeline pipeline = factory.create("_id", pipelineConfig, processorFactoryStore);
|
|
||||||
assertThat(pipeline.getId(), equalTo("_id"));
|
assertThat(pipeline.getId(), equalTo("_id"));
|
||||||
assertThat(pipeline.getDescription(), equalTo("_description"));
|
assertThat(pipeline.getDescription(), equalTo("_description"));
|
||||||
assertThat(pipeline.getProcessors().size(), equalTo(1));
|
assertThat(pipeline.getProcessors().size(), equalTo(1));
|
|
@ -0,0 +1,66 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.ingest;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Processor used for testing, keeps track of how many times it is invoked and
|
||||||
|
* accepts a {@link Consumer} of {@link IngestDocument} to be called when executed.
|
||||||
|
*/
|
||||||
|
public class TestProcessor implements Processor {
|
||||||
|
|
||||||
|
private final String type;
|
||||||
|
private final Consumer<IngestDocument> ingestDocumentConsumer;
|
||||||
|
private final AtomicInteger invokedCounter = new AtomicInteger();
|
||||||
|
|
||||||
|
public TestProcessor(Consumer<IngestDocument> ingestDocumentConsumer) {
|
||||||
|
this("test-processor", ingestDocumentConsumer);
|
||||||
|
}
|
||||||
|
|
||||||
|
public TestProcessor(String type, Consumer<IngestDocument> ingestDocumentConsumer) {
|
||||||
|
this.ingestDocumentConsumer = ingestDocumentConsumer;
|
||||||
|
this.type = type;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void execute(IngestDocument ingestDocument) throws Exception {
|
||||||
|
invokedCounter.incrementAndGet();
|
||||||
|
ingestDocumentConsumer.accept(ingestDocument);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getType() {
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getInvokedCounter() {
|
||||||
|
return invokedCounter.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class Factory implements Processor.Factory<TestProcessor> {
|
||||||
|
@Override
|
||||||
|
public TestProcessor create(Map<String, Object> config) throws Exception {
|
||||||
|
return new TestProcessor(ingestDocument -> {});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -52,7 +52,5 @@ public class TestTemplateService implements TemplateService {
|
||||||
public String getKey() {
|
public String getKey() {
|
||||||
return expected;
|
return expected;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -67,5 +67,4 @@ public class ValueSourceTests extends ESTestCase {
|
||||||
assertThat(myPreciousList.size(), equalTo(1));
|
assertThat(myPreciousList.size(), equalTo(1));
|
||||||
assertThat(myPreciousList.get(0), equalTo("value"));
|
assertThat(myPreciousList.get(0), equalTo("value"));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -19,7 +19,9 @@
|
||||||
|
|
||||||
package org.elasticsearch.ingest.processor;
|
package org.elasticsearch.ingest.processor;
|
||||||
|
|
||||||
|
import org.elasticsearch.ingest.ConfigurationUtils;
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,8 @@ package org.elasticsearch.ingest.processor;
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
import org.elasticsearch.ingest.TemplateService;
|
import org.elasticsearch.ingest.TemplateService;
|
||||||
import org.elasticsearch.ingest.ValueSource;
|
import org.elasticsearch.ingest.ValueSource;
|
||||||
|
import org.elasticsearch.ingest.ConfigurationUtils;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
package org.elasticsearch.ingest.processor;
|
package org.elasticsearch.ingest.processor;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
|
import org.elasticsearch.ingest.ConfigurationUtils;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
package org.elasticsearch.ingest.processor;
|
package org.elasticsearch.ingest.processor;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
|
import org.elasticsearch.ingest.ConfigurationUtils;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
import org.joda.time.DateTimeZone;
|
import org.joda.time.DateTimeZone;
|
||||||
import org.joda.time.format.ISODateTimeFormat;
|
import org.joda.time.format.ISODateTimeFormat;
|
||||||
|
|
|
@ -19,7 +19,9 @@
|
||||||
|
|
||||||
package org.elasticsearch.ingest.processor;
|
package org.elasticsearch.ingest.processor;
|
||||||
|
|
||||||
|
import org.elasticsearch.ingest.ConfigurationUtils;
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
import org.elasticsearch.ingest.TemplateService;
|
import org.elasticsearch.ingest.TemplateService;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.lucene.util.IOUtils;
|
||||||
import org.elasticsearch.SpecialPermission;
|
import org.elasticsearch.SpecialPermission;
|
||||||
import org.elasticsearch.common.network.NetworkAddress;
|
import org.elasticsearch.common.network.NetworkAddress;
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -55,8 +56,8 @@ import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import static org.elasticsearch.ingest.processor.ConfigurationUtils.readOptionalList;
|
import static org.elasticsearch.ingest.ConfigurationUtils.readOptionalList;
|
||||||
import static org.elasticsearch.ingest.processor.ConfigurationUtils.readStringProperty;
|
import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;
|
||||||
|
|
||||||
public final class GeoIpProcessor implements Processor {
|
public final class GeoIpProcessor implements Processor {
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
package org.elasticsearch.ingest.processor;
|
package org.elasticsearch.ingest.processor;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
|
import org.elasticsearch.ingest.ConfigurationUtils;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
package org.elasticsearch.ingest.processor;
|
package org.elasticsearch.ingest.processor;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
|
import org.elasticsearch.ingest.ConfigurationUtils;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
package org.elasticsearch.ingest.processor;
|
package org.elasticsearch.ingest.processor;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
|
import org.elasticsearch.ingest.ConfigurationUtils;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
|
@ -21,6 +21,8 @@ package org.elasticsearch.ingest.processor;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
import org.elasticsearch.ingest.TemplateService;
|
import org.elasticsearch.ingest.TemplateService;
|
||||||
|
import org.elasticsearch.ingest.ConfigurationUtils;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
package org.elasticsearch.ingest.processor;
|
package org.elasticsearch.ingest.processor;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
|
import org.elasticsearch.ingest.ConfigurationUtils;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,8 @@ package org.elasticsearch.ingest.processor;
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
import org.elasticsearch.ingest.TemplateService;
|
import org.elasticsearch.ingest.TemplateService;
|
||||||
import org.elasticsearch.ingest.ValueSource;
|
import org.elasticsearch.ingest.ValueSource;
|
||||||
|
import org.elasticsearch.ingest.ConfigurationUtils;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
package org.elasticsearch.ingest.processor;
|
package org.elasticsearch.ingest.processor;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
|
import org.elasticsearch.ingest.ConfigurationUtils;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.discovery.DiscoverySettings;
|
import org.elasticsearch.discovery.DiscoverySettings;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.gateway.GatewayService;
|
import org.elasticsearch.gateway.GatewayService;
|
||||||
|
import org.elasticsearch.ingest.ProcessorFactoryProvider;
|
||||||
import org.elasticsearch.script.ScriptService;
|
import org.elasticsearch.script.ScriptService;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
|
@ -20,31 +20,11 @@
|
||||||
package org.elasticsearch.plugin.ingest;
|
package org.elasticsearch.plugin.ingest;
|
||||||
|
|
||||||
import org.elasticsearch.common.inject.AbstractModule;
|
import org.elasticsearch.common.inject.AbstractModule;
|
||||||
import org.elasticsearch.common.inject.multibindings.MapBinder;
|
|
||||||
import org.elasticsearch.ingest.processor.AppendProcessor;
|
|
||||||
import org.elasticsearch.ingest.processor.ConvertProcessor;
|
|
||||||
import org.elasticsearch.ingest.processor.DateProcessor;
|
|
||||||
import org.elasticsearch.ingest.processor.FailProcessor;
|
|
||||||
import org.elasticsearch.ingest.processor.GeoIpProcessor;
|
|
||||||
import org.elasticsearch.ingest.processor.GrokProcessor;
|
|
||||||
import org.elasticsearch.ingest.processor.GsubProcessor;
|
|
||||||
import org.elasticsearch.ingest.processor.JoinProcessor;
|
|
||||||
import org.elasticsearch.ingest.processor.LowercaseProcessor;
|
|
||||||
import org.elasticsearch.ingest.processor.RemoveProcessor;
|
|
||||||
import org.elasticsearch.ingest.processor.RenameProcessor;
|
|
||||||
import org.elasticsearch.ingest.processor.SetProcessor;
|
|
||||||
import org.elasticsearch.ingest.processor.SplitProcessor;
|
|
||||||
import org.elasticsearch.ingest.processor.TrimProcessor;
|
|
||||||
import org.elasticsearch.ingest.processor.UppercaseProcessor;
|
|
||||||
import org.elasticsearch.plugin.ingest.rest.IngestRestFilter;
|
import org.elasticsearch.plugin.ingest.rest.IngestRestFilter;
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
public class IngestModule extends AbstractModule {
|
public class IngestModule extends AbstractModule {
|
||||||
|
|
||||||
private final boolean ingestEnabled;
|
private final boolean ingestEnabled;
|
||||||
private final Map<String, ProcessorFactoryProvider> processorFactoryProviders = new HashMap<>();
|
|
||||||
|
|
||||||
public IngestModule(boolean ingestEnabled) {
|
public IngestModule(boolean ingestEnabled) {
|
||||||
this.ingestEnabled = ingestEnabled;
|
this.ingestEnabled = ingestEnabled;
|
||||||
|
@ -52,41 +32,12 @@ public class IngestModule extends AbstractModule {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void configure() {
|
protected void configure() {
|
||||||
// Even if ingest isn't enable we still need to make sure that rest requests with pipeline
|
// Even if ingest isn't enabled we still need to make sure that rest requests with pipeline
|
||||||
// param copy the pipeline into the context, so that in IngestDisabledActionFilter
|
// param copy the pipeline into the context, so that in IngestDisabledActionFilter
|
||||||
// index/bulk requests can be failed
|
// index/bulk requests can be failed
|
||||||
binder().bind(IngestRestFilter.class).asEagerSingleton();
|
binder().bind(IngestRestFilter.class).asEagerSingleton();
|
||||||
if (ingestEnabled) {
|
if (ingestEnabled) {
|
||||||
binder().bind(IngestBootstrapper.class).asEagerSingleton();
|
binder().bind(IngestBootstrapper.class).asEagerSingleton();
|
||||||
|
|
||||||
addProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile()));
|
|
||||||
addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile()));
|
|
||||||
addProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory());
|
|
||||||
addProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService));
|
|
||||||
addProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService));
|
|
||||||
addProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory());
|
|
||||||
addProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService));
|
|
||||||
addProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory());
|
|
||||||
addProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory());
|
|
||||||
addProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory());
|
|
||||||
addProcessor(LowercaseProcessor.TYPE, (environment, mustacheFactory) -> new LowercaseProcessor.Factory());
|
|
||||||
addProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory());
|
|
||||||
addProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory());
|
|
||||||
addProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory());
|
|
||||||
addProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService));
|
|
||||||
|
|
||||||
MapBinder<String, ProcessorFactoryProvider> mapBinder = MapBinder.newMapBinder(binder(), String.class, ProcessorFactoryProvider.class);
|
|
||||||
for (Map.Entry<String, ProcessorFactoryProvider> entry : processorFactoryProviders.entrySet()) {
|
|
||||||
mapBinder.addBinding(entry.getKey()).toInstance(entry.getValue());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Adds a processor factory under a specific type name.
|
|
||||||
*/
|
|
||||||
public void addProcessor(String type, ProcessorFactoryProvider processorFactoryProvider) {
|
|
||||||
processorFactoryProviders.put(type, processorFactoryProvider);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,11 +26,27 @@ import org.elasticsearch.common.component.LifecycleComponent;
|
||||||
import org.elasticsearch.common.inject.Module;
|
import org.elasticsearch.common.inject.Module;
|
||||||
import org.elasticsearch.common.network.NetworkModule;
|
import org.elasticsearch.common.network.NetworkModule;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.ingest.ProcessorsModule;
|
||||||
|
import org.elasticsearch.ingest.processor.AppendProcessor;
|
||||||
|
import org.elasticsearch.ingest.processor.ConvertProcessor;
|
||||||
|
import org.elasticsearch.ingest.processor.DateProcessor;
|
||||||
|
import org.elasticsearch.ingest.processor.FailProcessor;
|
||||||
|
import org.elasticsearch.ingest.processor.GeoIpProcessor;
|
||||||
|
import org.elasticsearch.ingest.processor.GrokProcessor;
|
||||||
|
import org.elasticsearch.ingest.processor.GsubProcessor;
|
||||||
|
import org.elasticsearch.ingest.processor.JoinProcessor;
|
||||||
|
import org.elasticsearch.ingest.processor.LowercaseProcessor;
|
||||||
|
import org.elasticsearch.ingest.processor.RemoveProcessor;
|
||||||
|
import org.elasticsearch.ingest.processor.RenameProcessor;
|
||||||
|
import org.elasticsearch.ingest.processor.SetProcessor;
|
||||||
|
import org.elasticsearch.ingest.processor.SplitProcessor;
|
||||||
|
import org.elasticsearch.ingest.processor.TrimProcessor;
|
||||||
|
import org.elasticsearch.ingest.processor.UppercaseProcessor;
|
||||||
import org.elasticsearch.plugin.ingest.rest.RestDeletePipelineAction;
|
import org.elasticsearch.plugin.ingest.rest.RestDeletePipelineAction;
|
||||||
import org.elasticsearch.plugin.ingest.rest.RestGetPipelineAction;
|
import org.elasticsearch.plugin.ingest.rest.RestGetPipelineAction;
|
||||||
|
import org.elasticsearch.plugin.ingest.rest.RestIngestDisabledAction;
|
||||||
import org.elasticsearch.plugin.ingest.rest.RestPutPipelineAction;
|
import org.elasticsearch.plugin.ingest.rest.RestPutPipelineAction;
|
||||||
import org.elasticsearch.plugin.ingest.rest.RestSimulatePipelineAction;
|
import org.elasticsearch.plugin.ingest.rest.RestSimulatePipelineAction;
|
||||||
import org.elasticsearch.plugin.ingest.rest.RestIngestDisabledAction;
|
|
||||||
import org.elasticsearch.plugin.ingest.transport.IngestActionFilter;
|
import org.elasticsearch.plugin.ingest.transport.IngestActionFilter;
|
||||||
import org.elasticsearch.plugin.ingest.transport.IngestDisabledActionFilter;
|
import org.elasticsearch.plugin.ingest.transport.IngestDisabledActionFilter;
|
||||||
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction;
|
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction;
|
||||||
|
@ -44,6 +60,7 @@ import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineTransp
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.script.ScriptModule;
|
import org.elasticsearch.script.ScriptModule;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
||||||
|
@ -82,7 +99,25 @@ public class IngestPlugin extends Plugin {
|
||||||
if (transportClient) {
|
if (transportClient) {
|
||||||
return Collections.emptyList();
|
return Collections.emptyList();
|
||||||
} else {
|
} else {
|
||||||
return Collections.singletonList(new IngestModule(ingestEnabled));
|
ProcessorsModule processorsModule = new ProcessorsModule();
|
||||||
|
if (ingestEnabled) {
|
||||||
|
processorsModule.addProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile()));
|
||||||
|
processorsModule.addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile()));
|
||||||
|
processorsModule.addProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory());
|
||||||
|
processorsModule.addProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService));
|
||||||
|
processorsModule.addProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService));
|
||||||
|
processorsModule.addProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory());
|
||||||
|
processorsModule.addProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService));
|
||||||
|
processorsModule.addProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory());
|
||||||
|
processorsModule.addProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory());
|
||||||
|
processorsModule.addProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory());
|
||||||
|
processorsModule.addProcessor(LowercaseProcessor.TYPE, (environment, mustacheFactory) -> new LowercaseProcessor.Factory());
|
||||||
|
processorsModule.addProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory());
|
||||||
|
processorsModule.addProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory());
|
||||||
|
processorsModule.addProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory());
|
||||||
|
processorsModule.addProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService));
|
||||||
|
}
|
||||||
|
return Arrays.asList(new IngestModule(ingestEnabled), processorsModule);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,8 +41,9 @@ import org.elasticsearch.common.xcontent.XContentHelper;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.index.IndexNotFoundException;
|
import org.elasticsearch.index.IndexNotFoundException;
|
||||||
import org.elasticsearch.ingest.Pipeline;
|
import org.elasticsearch.ingest.Pipeline;
|
||||||
|
import org.elasticsearch.ingest.ProcessorFactoryProvider;
|
||||||
import org.elasticsearch.ingest.TemplateService;
|
import org.elasticsearch.ingest.TemplateService;
|
||||||
import org.elasticsearch.ingest.processor.Processor;
|
import org.elasticsearch.ingest.Processor;
|
||||||
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequest;
|
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequest;
|
||||||
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequest;
|
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequest;
|
||||||
import org.elasticsearch.plugin.ingest.transport.reload.ReloadPipelinesAction;
|
import org.elasticsearch.plugin.ingest.transport.reload.ReloadPipelinesAction;
|
||||||
|
|
|
@ -20,15 +20,13 @@
|
||||||
package org.elasticsearch.plugin.ingest.rest;
|
package org.elasticsearch.plugin.ingest.rest;
|
||||||
|
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
import org.elasticsearch.plugin.ingest.IngestPlugin;
|
||||||
import org.elasticsearch.rest.RestChannel;
|
import org.elasticsearch.rest.RestChannel;
|
||||||
import org.elasticsearch.rest.RestController;
|
import org.elasticsearch.rest.RestController;
|
||||||
import org.elasticsearch.rest.RestFilter;
|
import org.elasticsearch.rest.RestFilter;
|
||||||
import org.elasticsearch.rest.RestFilterChain;
|
import org.elasticsearch.rest.RestFilterChain;
|
||||||
import org.elasticsearch.rest.RestRequest;
|
import org.elasticsearch.rest.RestRequest;
|
||||||
|
|
||||||
import static org.elasticsearch.plugin.ingest.IngestPlugin.PIPELINE_ID_PARAM;
|
|
||||||
import static org.elasticsearch.plugin.ingest.IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY;
|
|
||||||
|
|
||||||
public class IngestRestFilter extends RestFilter {
|
public class IngestRestFilter extends RestFilter {
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
|
@ -38,8 +36,8 @@ public class IngestRestFilter extends RestFilter {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void process(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception {
|
public void process(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception {
|
||||||
if (request.hasParam(PIPELINE_ID_PARAM)) {
|
if (request.hasParam(IngestPlugin.PIPELINE_ID_PARAM)) {
|
||||||
request.putInContext(PIPELINE_ID_PARAM_CONTEXT_KEY, request.param(PIPELINE_ID_PARAM));
|
request.putInContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY, request.param(IngestPlugin.PIPELINE_ID_PARAM));
|
||||||
}
|
}
|
||||||
filterChain.continueProcessing(request, channel);
|
filterChain.continueProcessing(request, channel);
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ package org.elasticsearch.plugin.ingest.transport.simulate;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
import org.elasticsearch.ingest.Pipeline;
|
import org.elasticsearch.ingest.Pipeline;
|
||||||
import org.elasticsearch.ingest.processor.Processor;
|
import org.elasticsearch.ingest.Processor;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
|
@ -26,7 +26,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
import org.elasticsearch.ingest.Pipeline;
|
import org.elasticsearch.ingest.Pipeline;
|
||||||
import org.elasticsearch.ingest.processor.ConfigurationUtils;
|
import org.elasticsearch.ingest.ConfigurationUtils;
|
||||||
import org.elasticsearch.plugin.ingest.PipelineStore;
|
import org.elasticsearch.plugin.ingest.PipelineStore;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
package org.elasticsearch.ingest.processor;
|
package org.elasticsearch.ingest.processor;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
|
|
@ -24,8 +24,7 @@ import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||||
import org.elasticsearch.ingest.TemplateService;
|
import org.elasticsearch.ingest.TemplateService;
|
||||||
import org.elasticsearch.ingest.TestTemplateService;
|
import org.elasticsearch.ingest.TestTemplateService;
|
||||||
import org.elasticsearch.ingest.ValueSource;
|
import org.elasticsearch.ingest.ValueSource;
|
||||||
import org.elasticsearch.ingest.processor.AppendProcessor;
|
import org.elasticsearch.ingest.Processor;
|
||||||
import org.elasticsearch.ingest.processor.Processor;
|
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
|
@ -1,163 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to Elasticsearch under one or more contributor
|
|
||||||
* license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright
|
|
||||||
* ownership. Elasticsearch licenses this file to you under
|
|
||||||
* the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
* not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.elasticsearch.ingest.processor;
|
|
||||||
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
|
||||||
import org.elasticsearch.test.ESTestCase;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.mockito.stubbing.Answer;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import static org.elasticsearch.mock.orig.Mockito.verify;
|
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
|
||||||
import static org.hamcrest.Matchers.is;
|
|
||||||
import static org.mockito.Mockito.doAnswer;
|
|
||||||
import static org.mockito.Mockito.doThrow;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.spy;
|
|
||||||
import static org.mockito.Mockito.times;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
public class CompoundProcessorTests extends ESTestCase {
|
|
||||||
private IngestDocument ingestDocument;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void init() {
|
|
||||||
ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testEmpty() throws Exception {
|
|
||||||
CompoundProcessor processor = new CompoundProcessor();
|
|
||||||
assertThat(processor.getProcessors().isEmpty(), is(true));
|
|
||||||
assertThat(processor.getOnFailureProcessors().isEmpty(), is(true));
|
|
||||||
processor.execute(ingestDocument);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testSingleProcessor() throws Exception {
|
|
||||||
Processor processor = mock(Processor.class);
|
|
||||||
CompoundProcessor compoundProcessor = new CompoundProcessor(processor);
|
|
||||||
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
|
|
||||||
assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor));
|
|
||||||
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
|
|
||||||
compoundProcessor.execute(ingestDocument);
|
|
||||||
verify(processor, times(1)).execute(ingestDocument);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testSingleProcessorWithException() throws Exception {
|
|
||||||
Processor processor = mock(Processor.class);
|
|
||||||
when(processor.getType()).thenReturn("failed_processor");
|
|
||||||
doThrow(new RuntimeException("error")).doNothing().when(processor).execute(ingestDocument);
|
|
||||||
CompoundProcessor compoundProcessor = new CompoundProcessor(processor);
|
|
||||||
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
|
|
||||||
assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor));
|
|
||||||
assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true));
|
|
||||||
try {
|
|
||||||
compoundProcessor.execute(ingestDocument);
|
|
||||||
fail("should throw exception");
|
|
||||||
} catch (Exception e) {
|
|
||||||
assertThat(e.getMessage(), equalTo("error"));
|
|
||||||
}
|
|
||||||
|
|
||||||
verify(processor, times(1)).execute(ingestDocument);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testSingleProcessorWithOnFailureProcessor() throws Exception {
|
|
||||||
Exception error = new RuntimeException("error");
|
|
||||||
|
|
||||||
Processor processor = mock(Processor.class);
|
|
||||||
when(processor.getType()).thenReturn("first");
|
|
||||||
doThrow(error).doNothing().when(processor).execute(ingestDocument);
|
|
||||||
|
|
||||||
Processor processorNext = mock(Processor.class);
|
|
||||||
Answer checkMetadataAnswer = invocationOnMock -> {
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
|
|
||||||
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
|
|
||||||
assertThat(ingestMetadata.size(), equalTo(2));
|
|
||||||
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
|
|
||||||
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("first"));
|
|
||||||
return null;
|
|
||||||
};
|
|
||||||
doAnswer(checkMetadataAnswer).when(processorNext).execute(ingestDocument);
|
|
||||||
|
|
||||||
CompoundProcessor compoundProcessor = spy(new CompoundProcessor(Arrays.asList(processor), Arrays.asList(processorNext)));
|
|
||||||
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
|
|
||||||
assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor));
|
|
||||||
assertThat(compoundProcessor.getOnFailureProcessors().size(), equalTo(1));
|
|
||||||
assertThat(compoundProcessor.getOnFailureProcessors().get(0), equalTo(processorNext));
|
|
||||||
compoundProcessor.execute(ingestDocument);
|
|
||||||
verify(compoundProcessor).executeOnFailure(ingestDocument, error, "first");
|
|
||||||
verify(processor, times(1)).execute(ingestDocument);
|
|
||||||
verify(processorNext, times(1)).execute(ingestDocument);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testSingleProcessorWithNestedFailures() throws Exception {
|
|
||||||
Exception error = new RuntimeException("error");
|
|
||||||
Processor processor = mock(Processor.class);
|
|
||||||
when(processor.getType()).thenReturn("first");
|
|
||||||
doThrow(error).doNothing().when(processor).execute(ingestDocument);
|
|
||||||
Processor processorToFail = mock(Processor.class);
|
|
||||||
Answer checkMetadataAnswer = invocationOnMock -> {
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
|
|
||||||
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
|
|
||||||
assertThat(ingestMetadata.size(), equalTo(2));
|
|
||||||
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
|
|
||||||
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("first"));
|
|
||||||
return null;
|
|
||||||
};
|
|
||||||
doAnswer(checkMetadataAnswer).when(processorToFail).execute(ingestDocument);
|
|
||||||
when(processorToFail.getType()).thenReturn("second");
|
|
||||||
doThrow(error).doNothing().when(processorToFail).execute(ingestDocument);
|
|
||||||
Processor lastProcessor = mock(Processor.class);
|
|
||||||
Answer checkLastMetadataAnswer = invocationOnMock -> {
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
|
|
||||||
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
|
|
||||||
assertThat(ingestMetadata.size(), equalTo(2));
|
|
||||||
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
|
|
||||||
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("second"));
|
|
||||||
return null;
|
|
||||||
};
|
|
||||||
doAnswer(checkLastMetadataAnswer).when(lastProcessor).execute(ingestDocument);
|
|
||||||
|
|
||||||
CompoundProcessor innerCompoundOnFailProcessor = new CompoundProcessor(Arrays.asList(processorToFail), Arrays.asList(lastProcessor));
|
|
||||||
CompoundProcessor compoundOnFailProcessor = spy(innerCompoundOnFailProcessor);
|
|
||||||
|
|
||||||
CompoundProcessor innerCompoundProcessor = new CompoundProcessor(Arrays.asList(processor), Arrays.asList(compoundOnFailProcessor));
|
|
||||||
CompoundProcessor compoundProcessor = spy(innerCompoundProcessor);
|
|
||||||
|
|
||||||
assertThat(compoundProcessor.getProcessors().size(), equalTo(1));
|
|
||||||
assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor));
|
|
||||||
assertThat(compoundProcessor.getOnFailureProcessors().size(), equalTo(1));
|
|
||||||
assertThat(compoundProcessor.getOnFailureProcessors().get(0), equalTo(compoundOnFailProcessor));
|
|
||||||
compoundProcessor.execute(ingestDocument);
|
|
||||||
verify(processor, times(1)).execute(ingestDocument);
|
|
||||||
verify(compoundProcessor, times(1)).executeOnFailure(ingestDocument, error, "first");
|
|
||||||
verify(compoundOnFailProcessor, times(1)).execute(ingestDocument);
|
|
||||||
verify(processorToFail, times(1)).execute(ingestDocument);
|
|
||||||
verify(compoundOnFailProcessor, times(1)).executeOnFailure(ingestDocument, error, "second");
|
|
||||||
verify(lastProcessor, times(1)).execute(ingestDocument);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -21,8 +21,7 @@ package org.elasticsearch.ingest.processor;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||||
import org.elasticsearch.ingest.processor.ConvertProcessor;
|
import org.elasticsearch.ingest.Processor;
|
||||||
import org.elasticsearch.ingest.processor.Processor;
|
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
|
@ -20,11 +20,9 @@
|
||||||
package org.elasticsearch.ingest.processor;
|
package org.elasticsearch.ingest.processor;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||||
import org.elasticsearch.ingest.TestTemplateService;
|
import org.elasticsearch.ingest.TestTemplateService;
|
||||||
import org.elasticsearch.ingest.processor.FailProcessor;
|
|
||||||
import org.elasticsearch.ingest.processor.FailProcessorException;
|
|
||||||
import org.elasticsearch.ingest.processor.Processor;
|
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
|
|
@ -21,8 +21,7 @@ package org.elasticsearch.ingest.processor;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||||
import org.elasticsearch.ingest.processor.GsubProcessor;
|
import org.elasticsearch.ingest.Processor;
|
||||||
import org.elasticsearch.ingest.processor.Processor;
|
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.ingest.processor;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.ingest.processor;
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||||
import org.elasticsearch.ingest.TestTemplateService;
|
import org.elasticsearch.ingest.TestTemplateService;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.ingest.processor;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||||
import org.elasticsearch.ingest.TemplateService;
|
import org.elasticsearch.ingest.TemplateService;
|
||||||
import org.elasticsearch.ingest.TestTemplateService;
|
import org.elasticsearch.ingest.TestTemplateService;
|
||||||
import org.elasticsearch.ingest.ValueSource;
|
import org.elasticsearch.ingest.ValueSource;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.hamcrest.Matchers;
|
import org.hamcrest.Matchers;
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.ingest.processor;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
|
@ -29,9 +29,9 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
import org.elasticsearch.ingest.Pipeline;
|
import org.elasticsearch.ingest.Pipeline;
|
||||||
import org.elasticsearch.ingest.TestTemplateService;
|
import org.elasticsearch.ingest.TestTemplateService;
|
||||||
import org.elasticsearch.ingest.processor.CompoundProcessor;
|
|
||||||
import org.elasticsearch.ingest.processor.Processor;
|
|
||||||
import org.elasticsearch.ingest.processor.SetProcessor;
|
import org.elasticsearch.ingest.processor.SetProcessor;
|
||||||
|
import org.elasticsearch.ingest.CompoundProcessor;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.plugin.ingest.transport;
|
||||||
|
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.ActionRequest;
|
import org.elasticsearch.action.ActionRequest;
|
||||||
import org.elasticsearch.action.ActionResponse;
|
|
||||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||||
import org.elasticsearch.action.bulk.BulkRequest;
|
import org.elasticsearch.action.bulk.BulkRequest;
|
||||||
import org.elasticsearch.action.bulk.BulkResponse;
|
import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
|
@ -32,8 +31,8 @@ import org.elasticsearch.action.update.UpdateRequest;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
import org.elasticsearch.ingest.Pipeline;
|
import org.elasticsearch.ingest.Pipeline;
|
||||||
import org.elasticsearch.ingest.processor.CompoundProcessor;
|
import org.elasticsearch.ingest.CompoundProcessor;
|
||||||
import org.elasticsearch.ingest.processor.Processor;
|
import org.elasticsearch.ingest.Processor;
|
||||||
import org.elasticsearch.plugin.ingest.IngestBootstrapper;
|
import org.elasticsearch.plugin.ingest.IngestBootstrapper;
|
||||||
import org.elasticsearch.plugin.ingest.IngestPlugin;
|
import org.elasticsearch.plugin.ingest.IngestPlugin;
|
||||||
import org.elasticsearch.plugin.ingest.PipelineExecutionService;
|
import org.elasticsearch.plugin.ingest.PipelineExecutionService;
|
||||||
|
@ -45,14 +44,12 @@ import org.junit.Before;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
import static org.elasticsearch.plugin.ingest.transport.IngestActionFilter.BulkRequestModifier;
|
import static org.elasticsearch.plugin.ingest.transport.IngestActionFilter.BulkRequestModifier;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.instanceOf;
|
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
|
|
|
@ -23,14 +23,12 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
import org.elasticsearch.ingest.Pipeline;
|
import org.elasticsearch.ingest.Pipeline;
|
||||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||||
import org.elasticsearch.ingest.processor.CompoundProcessor;
|
import org.elasticsearch.ingest.CompoundProcessor;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.instanceOf;
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
import static org.hamcrest.Matchers.not;
|
import static org.hamcrest.Matchers.not;
|
||||||
|
|
|
@ -21,8 +21,8 @@ package org.elasticsearch.plugin.ingest.transport.simulate;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
import org.elasticsearch.ingest.Pipeline;
|
import org.elasticsearch.ingest.Pipeline;
|
||||||
import org.elasticsearch.ingest.processor.CompoundProcessor;
|
import org.elasticsearch.ingest.CompoundProcessor;
|
||||||
import org.elasticsearch.ingest.processor.Processor;
|
import org.elasticsearch.ingest.Processor;
|
||||||
import org.elasticsearch.plugin.ingest.PipelineStore;
|
import org.elasticsearch.plugin.ingest.PipelineStore;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
|
@ -22,7 +22,7 @@ package org.elasticsearch.plugin.ingest;
|
||||||
|
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
import org.elasticsearch.ingest.ValueSource;
|
import org.elasticsearch.ingest.ValueSource;
|
||||||
import org.elasticsearch.ingest.processor.Processor;
|
import org.elasticsearch.ingest.Processor;
|
||||||
import org.elasticsearch.ingest.processor.SetProcessor;
|
import org.elasticsearch.ingest.processor.SetProcessor;
|
||||||
import org.hamcrest.Matchers;
|
import org.hamcrest.Matchers;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue