diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/CompoundProcessor.java b/core/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java similarity index 93% rename from plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/CompoundProcessor.java rename to core/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 42bb567da46..7c71324300e 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/CompoundProcessor.java +++ b/core/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -18,9 +18,7 @@ */ -package org.elasticsearch.ingest.processor; - -import org.elasticsearch.ingest.IngestDocument; +package org.elasticsearch.ingest; import java.util.Arrays; import java.util.Collections; @@ -42,6 +40,7 @@ public class CompoundProcessor implements Processor { public CompoundProcessor(Processor... processor) { this(Arrays.asList(processor), Collections.emptyList()); } + public CompoundProcessor(List processors, List onFailureProcessors) { this.processors = processors; this.onFailureProcessors = onFailureProcessors; @@ -57,7 +56,7 @@ public class CompoundProcessor implements Processor { @Override public String getType() { - return "compound[" + processors.stream().map(p -> p.getType()).collect(Collectors.joining(",")) + "]"; + return "compound[" + processors.stream().map(Processor::getType).collect(Collectors.joining(",")) + "]"; } @Override diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/ConfigurationUtils.java b/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java similarity index 99% rename from plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/ConfigurationUtils.java rename to core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java index 7ba737eb56e..c51714615a8 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/ConfigurationUtils.java +++ b/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.ingest.processor; +package org.elasticsearch.ingest; import java.util.List; import java.util.Map; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/core/src/main/java/org/elasticsearch/ingest/IngestDocument.java similarity index 95% rename from plugins/ingest/src/main/java/org/elasticsearch/ingest/IngestDocument.java rename to core/src/main/java/org/elasticsearch/ingest/IngestDocument.java index c6356867bd9..565ee9242ff 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/core/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -20,6 +20,14 @@ package org.elasticsearch.ingest; import org.elasticsearch.common.Strings; +import org.elasticsearch.index.mapper.internal.IdFieldMapper; +import org.elasticsearch.index.mapper.internal.IndexFieldMapper; +import org.elasticsearch.index.mapper.internal.ParentFieldMapper; +import org.elasticsearch.index.mapper.internal.RoutingFieldMapper; +import org.elasticsearch.index.mapper.internal.SourceFieldMapper; +import org.elasticsearch.index.mapper.internal.TTLFieldMapper; +import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; +import org.elasticsearch.index.mapper.internal.TypeFieldMapper; import java.text.DateFormat; import java.text.SimpleDateFormat; @@ -38,7 +46,6 @@ import java.util.TimeZone; public final class IngestDocument { public final static String INGEST_KEY = "_ingest"; - public final static String SOURCE_KEY = "_source"; static final String TIMESTAMP = "timestamp"; @@ -348,7 +355,7 @@ public final class IngestDocument { if (append) { if (map.containsKey(leafKey)) { Object object = map.get(leafKey); - List list = appendValues(path, object, value); + List list = appendValues(object, value); if (list != object) { map.put(leafKey, list); } @@ -374,7 +381,7 @@ public final class IngestDocument { } if (append) { Object object = list.get(index); - List newList = appendValues(path, object, value); + List newList = appendValues(object, value); if (newList != object) { list.set(index, newList); } @@ -387,7 +394,7 @@ public final class IngestDocument { } @SuppressWarnings("unchecked") - private static List appendValues(String path, Object maybeList, Object value) { + private static List appendValues(Object maybeList, Object value) { List list; if (maybeList instanceof List) { //maybeList is already a list, we append the provided values to it @@ -427,7 +434,7 @@ public final class IngestDocument { private Map createTemplateModel() { Map model = new HashMap<>(sourceAndMetadata); - model.put(SOURCE_KEY, sourceAndMetadata); + model.put(SourceFieldMapper.NAME, sourceAndMetadata); // If there is a field in the source with the name '_ingest' it gets overwritten here, // if access to that field is required then it get accessed via '_source._ingest' model.put(INGEST_KEY, ingestMetadata); @@ -489,13 +496,13 @@ public final class IngestDocument { } public enum MetaData { - INDEX("_index"), - TYPE("_type"), - ID("_id"), - ROUTING("_routing"), - PARENT("_parent"), - TIMESTAMP("_timestamp"), - TTL("_ttl"); + INDEX(IndexFieldMapper.NAME), + TYPE(TypeFieldMapper.NAME), + ID(IdFieldMapper.NAME), + ROUTING(RoutingFieldMapper.NAME), + PARENT(ParentFieldMapper.NAME), + TIMESTAMP(TimestampFieldMapper.NAME), + TTL(TTLFieldMapper.NAME); private final String fieldName; @@ -506,7 +513,6 @@ public final class IngestDocument { public String getFieldName() { return fieldName; } - } private class FieldPath { @@ -523,7 +529,7 @@ public final class IngestDocument { newPath = path.substring(8, path.length()); } else { initialContext = sourceAndMetadata; - if (path.startsWith(SOURCE_KEY + ".")) { + if (path.startsWith(SourceFieldMapper.NAME + ".")) { newPath = path.substring(8, path.length()); } else { newPath = path; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java b/core/src/main/java/org/elasticsearch/ingest/Pipeline.java similarity index 91% rename from plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java rename to core/src/main/java/org/elasticsearch/ingest/Pipeline.java index b0e0a2a66a8..56b49a5b063 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/core/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -20,10 +20,6 @@ package org.elasticsearch.ingest; -import org.elasticsearch.ingest.processor.ConfigurationUtils; -import org.elasticsearch.ingest.processor.Processor; -import org.elasticsearch.ingest.processor.CompoundProcessor; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -92,12 +88,10 @@ public final class Pipeline { } if (onFailureProcessors.isEmpty()) { return processor; - } else { - return new CompoundProcessor(Arrays.asList(processor), onFailureProcessors); } - } else { - throw new IllegalArgumentException("No processor type exist with name [" + type + "]"); + return new CompoundProcessor(Collections.singletonList(processor), onFailureProcessors); } + throw new IllegalArgumentException("No processor type exist with name [" + type + "]"); } private List readProcessors(String fieldName, Map processorRegistry, Map config) throws Exception { @@ -121,6 +115,5 @@ public final class Pipeline { CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors)); return new Pipeline(id, description, compoundProcessor); } - } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java b/core/src/main/java/org/elasticsearch/ingest/Processor.java similarity index 94% rename from plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java rename to core/src/main/java/org/elasticsearch/ingest/Processor.java index 36bcf9689a6..5b97a485d9c 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java +++ b/core/src/main/java/org/elasticsearch/ingest/Processor.java @@ -18,9 +18,7 @@ */ -package org.elasticsearch.ingest.processor; - -import org.elasticsearch.ingest.IngestDocument; +package org.elasticsearch.ingest; import java.util.Map; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/ProcessorFactoryProvider.java b/core/src/main/java/org/elasticsearch/ingest/ProcessorFactoryProvider.java similarity index 88% rename from plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/ProcessorFactoryProvider.java rename to core/src/main/java/org/elasticsearch/ingest/ProcessorFactoryProvider.java index e99261e6408..e1126b305b1 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/ProcessorFactoryProvider.java +++ b/core/src/main/java/org/elasticsearch/ingest/ProcessorFactoryProvider.java @@ -17,11 +17,9 @@ * under the License. */ -package org.elasticsearch.plugin.ingest; +package org.elasticsearch.ingest; import org.elasticsearch.env.Environment; -import org.elasticsearch.ingest.TemplateService; -import org.elasticsearch.ingest.processor.Processor; /** * The ingest framework (pipeline, processor and processor factory) can't rely on ES specific code. However some @@ -29,9 +27,8 @@ import org.elasticsearch.ingest.processor.Processor; * so we need some code that provides the physical location of the configuration directory to the processor factories * that need this and this is what this processor factory provider does. */ +//TODO this abstraction could be removed once ingest-core is part of es core? @FunctionalInterface -interface ProcessorFactoryProvider { - +public interface ProcessorFactoryProvider { Processor.Factory get(Environment environment, TemplateService templateService); - } diff --git a/core/src/main/java/org/elasticsearch/ingest/ProcessorsModule.java b/core/src/main/java/org/elasticsearch/ingest/ProcessorsModule.java new file mode 100644 index 00000000000..c8dd515c81f --- /dev/null +++ b/core/src/main/java/org/elasticsearch/ingest/ProcessorsModule.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.multibindings.MapBinder; + +import java.util.HashMap; +import java.util.Map; + +/** + * Registry for processor factories + * @see org.elasticsearch.ingest.Processor.Factory + * @see ProcessorFactoryProvider + */ +public class ProcessorsModule extends AbstractModule { + + private final Map processorFactoryProviders = new HashMap<>(); + + @Override + protected void configure() { + MapBinder mapBinder = MapBinder.newMapBinder(binder(), String.class, ProcessorFactoryProvider.class); + for (Map.Entry entry : processorFactoryProviders.entrySet()) { + mapBinder.addBinding(entry.getKey()).toInstance(entry.getValue()); + } + } + + /** + * Adds a processor factory under a specific type name. + */ + public void addProcessor(String type, ProcessorFactoryProvider processorFactoryProvider) { + processorFactoryProviders.put(type, processorFactoryProvider); + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/TemplateService.java b/core/src/main/java/org/elasticsearch/ingest/TemplateService.java similarity index 93% rename from plugins/ingest/src/main/java/org/elasticsearch/ingest/TemplateService.java rename to core/src/main/java/org/elasticsearch/ingest/TemplateService.java index c5bd3e97320..afc89dedfce 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/TemplateService.java +++ b/core/src/main/java/org/elasticsearch/ingest/TemplateService.java @@ -24,6 +24,7 @@ import java.util.Map; * Abstraction for the template engine. */ // NOTE: this abstraction is added because the 'org.elasticsearch.ingest' has the requirement to be ES agnostic +//TODO this abstraction could be removed once ingest-core is part of es core? public interface TemplateService { Template compile(String template); diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/ValueSource.java b/core/src/main/java/org/elasticsearch/ingest/ValueSource.java similarity index 100% rename from plugins/ingest/src/main/java/org/elasticsearch/ingest/ValueSource.java rename to core/src/main/java/org/elasticsearch/ingest/ValueSource.java diff --git a/core/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/core/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java new file mode 100644 index 00000000000..b6a508eb9a1 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java @@ -0,0 +1,110 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class CompoundProcessorTests extends ESTestCase { + private IngestDocument ingestDocument; + + @Before + public void init() { + ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); + } + + public void testEmpty() throws Exception { + CompoundProcessor processor = new CompoundProcessor(); + assertThat(processor.getProcessors().isEmpty(), is(true)); + assertThat(processor.getOnFailureProcessors().isEmpty(), is(true)); + processor.execute(ingestDocument); + } + + public void testSingleProcessor() throws Exception { + TestProcessor processor = new TestProcessor(ingestDocument -> {}); + CompoundProcessor compoundProcessor = new CompoundProcessor(processor); + assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); + assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor)); + assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); + compoundProcessor.execute(ingestDocument); + assertThat(processor.getInvokedCounter(), equalTo(1)); + } + + public void testSingleProcessorWithException() throws Exception { + TestProcessor processor = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");}); + CompoundProcessor compoundProcessor = new CompoundProcessor(processor); + assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); + assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor)); + assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); + try { + compoundProcessor.execute(ingestDocument); + fail("should throw exception"); + } catch (Exception e) { + assertThat(e.getMessage(), equalTo("error")); + } + assertThat(processor.getInvokedCounter(), equalTo(1)); + } + + public void testSingleProcessorWithOnFailureProcessor() throws Exception { + TestProcessor processor1 = new TestProcessor("first", ingestDocument -> {throw new RuntimeException("error");}); + TestProcessor processor2 = new TestProcessor(ingestDocument -> { + Map ingestMetadata = ingestDocument.getIngestMetadata(); + assertThat(ingestMetadata.size(), equalTo(2)); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error")); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("first")); + }); + + CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor1), Collections.singletonList(processor2)); + compoundProcessor.execute(ingestDocument); + + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertThat(processor2.getInvokedCounter(), equalTo(1)); + } + + public void testSingleProcessorWithNestedFailures() throws Exception { + TestProcessor processor = new TestProcessor("first", ingestDocument -> {throw new RuntimeException("error");}); + TestProcessor processorToFail = new TestProcessor("second", ingestDocument -> { + Map ingestMetadata = ingestDocument.getIngestMetadata(); + assertThat(ingestMetadata.size(), equalTo(2)); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error")); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("first")); + throw new RuntimeException("error"); + }); + TestProcessor lastProcessor = new TestProcessor(ingestDocument -> { + Map ingestMetadata = ingestDocument.getIngestMetadata(); + assertThat(ingestMetadata.size(), equalTo(2)); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error")); + assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("second")); + }); + CompoundProcessor compoundOnFailProcessor = new CompoundProcessor(Collections.singletonList(processorToFail), Collections.singletonList(lastProcessor)); + CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(compoundOnFailProcessor)); + compoundProcessor.execute(ingestDocument); + + assertThat(processorToFail.getInvokedCounter(), equalTo(1)); + assertThat(lastProcessor.getInvokedCounter(), equalTo(1)); + } +} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/ConfigurationUtilsTests.java b/core/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java similarity index 95% rename from plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/ConfigurationUtilsTests.java rename to core/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java index b661a598edf..a1d736d1c1e 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/ConfigurationUtilsTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.ingest.processor; +package org.elasticsearch.ingest; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -53,7 +53,7 @@ public class ConfigurationUtilsTests extends ESTestCase { assertThat(val, equalTo("bar")); } - public void testReadStringProperty_InvalidType() { + public void testReadStringPropertyInvalidType() { try { ConfigurationUtils.readStringProperty(config, "arr"); } catch (IllegalArgumentException e) { diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java b/core/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java similarity index 100% rename from plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java rename to core/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java similarity index 70% rename from plugins/ingest/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java rename to core/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java index 5d61f11ac24..f3bb8f38451 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.ingest; -import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.test.ESTestCase; import java.util.Collections; @@ -27,8 +26,6 @@ import java.util.HashMap; import java.util.Map; import static org.hamcrest.CoreMatchers.equalTo; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class PipelineFactoryTests extends ESTestCase { @@ -38,13 +35,7 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put("description", "_description"); pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig))); Pipeline.Factory factory = new Pipeline.Factory(); - Map processorRegistry = new HashMap<>(); - Processor processor = mock(Processor.class); - when(processor.getType()).thenReturn("test-processor"); - Processor.Factory processorFactory = mock(Processor.Factory.class); - when(processorFactory.create(processorConfig)).thenReturn(processor); - processorRegistry.put("test", processorFactory); - + Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); @@ -56,16 +47,10 @@ public class PipelineFactoryTests extends ESTestCase { Map processorConfig = new HashMap<>(); Map pipelineConfig = new HashMap<>(); pipelineConfig.put("description", "_description"); - pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test-processor", processorConfig))); - pipelineConfig.put("on_failure", Collections.singletonList(Collections.singletonMap("test-processor", processorConfig))); + pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig))); + pipelineConfig.put("on_failure", Collections.singletonList(Collections.singletonMap("test", processorConfig))); Pipeline.Factory factory = new Pipeline.Factory(); - Map processorRegistry = new HashMap<>(); - Processor processor = mock(Processor.class); - when(processor.getType()).thenReturn("test-processor"); - Processor.Factory processorFactory = mock(Processor.Factory.class); - when(processorFactory.create(processorConfig)).thenReturn(processor); - processorRegistry.put("test-processor", processorFactory); - + Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); @@ -82,12 +67,7 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put("description", "_description"); pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig))); Pipeline.Factory factory = new Pipeline.Factory(); - Map processorRegistry = new HashMap<>(); - Processor processor = mock(Processor.class); - when(processor.getType()).thenReturn("test-processor"); - Processor.Factory processorFactory = mock(Processor.Factory.class); - when(processorFactory.create(processorConfig)).thenReturn(processor); - processorRegistry.put("test", processorFactory); + Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); try { factory.create("_id", pipelineConfig, processorRegistry); } catch (IllegalArgumentException e) { @@ -103,14 +83,8 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put("description", "_description"); pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("test", processorConfig))); Pipeline.Factory factory = new Pipeline.Factory(); - Map processorFactoryStore = new HashMap<>(); - Processor processor = mock(Processor.class); - when(processor.getType()).thenReturn("test-processor"); - Processor.Factory processorFactory = mock(Processor.Factory.class); - when(processorFactory.create(processorConfig)).thenReturn(processor); - processorFactoryStore.put("test", processorFactory); - - Pipeline pipeline = factory.create("_id", pipelineConfig, processorFactoryStore); + Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); + Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getProcessors().size(), equalTo(1)); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/RandomDocumentPicks.java b/core/src/test/java/org/elasticsearch/ingest/RandomDocumentPicks.java similarity index 100% rename from plugins/ingest/src/test/java/org/elasticsearch/ingest/RandomDocumentPicks.java rename to core/src/test/java/org/elasticsearch/ingest/RandomDocumentPicks.java diff --git a/core/src/test/java/org/elasticsearch/ingest/TestProcessor.java b/core/src/test/java/org/elasticsearch/ingest/TestProcessor.java new file mode 100644 index 00000000000..67a2406da75 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/ingest/TestProcessor.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +/** + * Processor used for testing, keeps track of how many times it is invoked and + * accepts a {@link Consumer} of {@link IngestDocument} to be called when executed. + */ +public class TestProcessor implements Processor { + + private final String type; + private final Consumer ingestDocumentConsumer; + private final AtomicInteger invokedCounter = new AtomicInteger(); + + public TestProcessor(Consumer ingestDocumentConsumer) { + this("test-processor", ingestDocumentConsumer); + } + + public TestProcessor(String type, Consumer ingestDocumentConsumer) { + this.ingestDocumentConsumer = ingestDocumentConsumer; + this.type = type; + } + + @Override + public void execute(IngestDocument ingestDocument) throws Exception { + invokedCounter.incrementAndGet(); + ingestDocumentConsumer.accept(ingestDocument); + } + + @Override + public String getType() { + return type; + } + + public int getInvokedCounter() { + return invokedCounter.get(); + } + + public static final class Factory implements Processor.Factory { + @Override + public TestProcessor create(Map config) throws Exception { + return new TestProcessor(ingestDocument -> {}); + } + } +} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/TestTemplateService.java b/core/src/test/java/org/elasticsearch/ingest/TestTemplateService.java similarity index 99% rename from plugins/ingest/src/test/java/org/elasticsearch/ingest/TestTemplateService.java rename to core/src/test/java/org/elasticsearch/ingest/TestTemplateService.java index 5ef2c8e4bdd..d44764fa8ac 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/TestTemplateService.java +++ b/core/src/test/java/org/elasticsearch/ingest/TestTemplateService.java @@ -52,7 +52,5 @@ public class TestTemplateService implements TemplateService { public String getKey() { return expected; } - } - } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/ValueSourceTests.java b/core/src/test/java/org/elasticsearch/ingest/ValueSourceTests.java similarity index 99% rename from plugins/ingest/src/test/java/org/elasticsearch/ingest/ValueSourceTests.java rename to core/src/test/java/org/elasticsearch/ingest/ValueSourceTests.java index f21f1f2ad44..63eae63a400 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/ValueSourceTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/ValueSourceTests.java @@ -67,5 +67,4 @@ public class ValueSourceTests extends ESTestCase { assertThat(myPreciousList.size(), equalTo(1)); assertThat(myPreciousList.get(0), equalTo("value")); } - } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/AbstractStringProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/AbstractStringProcessor.java index 2769a1dd419..cc704c19984 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/AbstractStringProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/AbstractStringProcessor.java @@ -19,7 +19,9 @@ package org.elasticsearch.ingest.processor; +import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; import java.util.Map; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/AppendProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/AppendProcessor.java index f8d53f07af3..46bba1b9edb 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/AppendProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/AppendProcessor.java @@ -22,6 +22,8 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.TemplateService; import org.elasticsearch.ingest.ValueSource; +import org.elasticsearch.ingest.ConfigurationUtils; +import org.elasticsearch.ingest.Processor; import java.util.Map; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/ConvertProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/ConvertProcessor.java index b3c287dc9ba..68386b18a33 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/ConvertProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/ConvertProcessor.java @@ -20,6 +20,8 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.ConfigurationUtils; +import org.elasticsearch.ingest.Processor; import java.util.ArrayList; import java.util.List; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/DateProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/DateProcessor.java index 6fce95436f9..e919f4f49c3 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/DateProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/DateProcessor.java @@ -20,6 +20,8 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.ConfigurationUtils; +import org.elasticsearch.ingest.Processor; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.format.ISODateTimeFormat; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/FailProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/FailProcessor.java index 727736fd283..6d2bdf8b7d7 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/FailProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/FailProcessor.java @@ -19,7 +19,9 @@ package org.elasticsearch.ingest.processor; +import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.TemplateService; import java.util.Map; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/GeoIpProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/GeoIpProcessor.java index bfeaad4e15e..adb76799d4b 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/GeoIpProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/GeoIpProcessor.java @@ -32,6 +32,7 @@ import org.apache.lucene.util.IOUtils; import org.elasticsearch.SpecialPermission; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; import java.io.Closeable; import java.io.IOException; @@ -55,8 +56,8 @@ import java.util.Map; import java.util.Set; import java.util.stream.Stream; -import static org.elasticsearch.ingest.processor.ConfigurationUtils.readOptionalList; -import static org.elasticsearch.ingest.processor.ConfigurationUtils.readStringProperty; +import static org.elasticsearch.ingest.ConfigurationUtils.readOptionalList; +import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty; public final class GeoIpProcessor implements Processor { diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/GrokProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/GrokProcessor.java index 60c7cee7ead..d85cde13d05 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/GrokProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/GrokProcessor.java @@ -20,6 +20,8 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.ConfigurationUtils; +import org.elasticsearch.ingest.Processor; import java.io.BufferedReader; import java.io.IOException; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/GsubProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/GsubProcessor.java index 897592a4f15..b831ec511e1 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/GsubProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/GsubProcessor.java @@ -20,6 +20,8 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.ConfigurationUtils; +import org.elasticsearch.ingest.Processor; import java.util.Map; import java.util.regex.Matcher; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/JoinProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/JoinProcessor.java index fdc9aade1f8..3582dce278a 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/JoinProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/JoinProcessor.java @@ -20,6 +20,8 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.ConfigurationUtils; +import org.elasticsearch.ingest.Processor; import java.util.List; import java.util.Map; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/RemoveProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/RemoveProcessor.java index bbcf3e3f3d5..efad3707398 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/RemoveProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/RemoveProcessor.java @@ -21,6 +21,8 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.TemplateService; +import org.elasticsearch.ingest.ConfigurationUtils; +import org.elasticsearch.ingest.Processor; import java.util.Map; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/RenameProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/RenameProcessor.java index 9aa76ab2d69..5b134069070 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/RenameProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/RenameProcessor.java @@ -20,6 +20,8 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.ConfigurationUtils; +import org.elasticsearch.ingest.Processor; import java.util.Map; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/SetProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/SetProcessor.java index 2bf69b73772..0f2ac29be49 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/SetProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/SetProcessor.java @@ -22,6 +22,8 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.TemplateService; import org.elasticsearch.ingest.ValueSource; +import org.elasticsearch.ingest.ConfigurationUtils; +import org.elasticsearch.ingest.Processor; import java.util.Map; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/SplitProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/SplitProcessor.java index 8838a384530..b1e50b20cd6 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/SplitProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/SplitProcessor.java @@ -20,6 +20,8 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.ConfigurationUtils; +import org.elasticsearch.ingest.Processor; import java.util.Arrays; import java.util.Map; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestBootstrapper.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestBootstrapper.java index d07b3a8528a..4f114fc61fe 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestBootstrapper.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestBootstrapper.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.env.Environment; import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.ingest.ProcessorFactoryProvider; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java index a63ebdf1ed5..e168173cca0 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java @@ -20,31 +20,11 @@ package org.elasticsearch.plugin.ingest; import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.inject.multibindings.MapBinder; -import org.elasticsearch.ingest.processor.AppendProcessor; -import org.elasticsearch.ingest.processor.ConvertProcessor; -import org.elasticsearch.ingest.processor.DateProcessor; -import org.elasticsearch.ingest.processor.FailProcessor; -import org.elasticsearch.ingest.processor.GeoIpProcessor; -import org.elasticsearch.ingest.processor.GrokProcessor; -import org.elasticsearch.ingest.processor.GsubProcessor; -import org.elasticsearch.ingest.processor.JoinProcessor; -import org.elasticsearch.ingest.processor.LowercaseProcessor; -import org.elasticsearch.ingest.processor.RemoveProcessor; -import org.elasticsearch.ingest.processor.RenameProcessor; -import org.elasticsearch.ingest.processor.SetProcessor; -import org.elasticsearch.ingest.processor.SplitProcessor; -import org.elasticsearch.ingest.processor.TrimProcessor; -import org.elasticsearch.ingest.processor.UppercaseProcessor; import org.elasticsearch.plugin.ingest.rest.IngestRestFilter; -import java.util.HashMap; -import java.util.Map; - public class IngestModule extends AbstractModule { private final boolean ingestEnabled; - private final Map processorFactoryProviders = new HashMap<>(); public IngestModule(boolean ingestEnabled) { this.ingestEnabled = ingestEnabled; @@ -52,41 +32,12 @@ public class IngestModule extends AbstractModule { @Override protected void configure() { - // Even if ingest isn't enable we still need to make sure that rest requests with pipeline + // Even if ingest isn't enabled we still need to make sure that rest requests with pipeline // param copy the pipeline into the context, so that in IngestDisabledActionFilter // index/bulk requests can be failed binder().bind(IngestRestFilter.class).asEagerSingleton(); if (ingestEnabled) { binder().bind(IngestBootstrapper.class).asEagerSingleton(); - - addProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile())); - addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile())); - addProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory()); - addProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService)); - addProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService)); - addProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory()); - addProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService)); - addProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory()); - addProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory()); - addProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory()); - addProcessor(LowercaseProcessor.TYPE, (environment, mustacheFactory) -> new LowercaseProcessor.Factory()); - addProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory()); - addProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory()); - addProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory()); - addProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService)); - - MapBinder mapBinder = MapBinder.newMapBinder(binder(), String.class, ProcessorFactoryProvider.class); - for (Map.Entry entry : processorFactoryProviders.entrySet()) { - mapBinder.addBinding(entry.getKey()).toInstance(entry.getValue()); - } } } - - /** - * Adds a processor factory under a specific type name. - */ - public void addProcessor(String type, ProcessorFactoryProvider processorFactoryProvider) { - processorFactoryProviders.put(type, processorFactoryProvider); - } - } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java index f8eb044d881..a5f4b78cd45 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java @@ -26,11 +26,27 @@ import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.ingest.ProcessorsModule; +import org.elasticsearch.ingest.processor.AppendProcessor; +import org.elasticsearch.ingest.processor.ConvertProcessor; +import org.elasticsearch.ingest.processor.DateProcessor; +import org.elasticsearch.ingest.processor.FailProcessor; +import org.elasticsearch.ingest.processor.GeoIpProcessor; +import org.elasticsearch.ingest.processor.GrokProcessor; +import org.elasticsearch.ingest.processor.GsubProcessor; +import org.elasticsearch.ingest.processor.JoinProcessor; +import org.elasticsearch.ingest.processor.LowercaseProcessor; +import org.elasticsearch.ingest.processor.RemoveProcessor; +import org.elasticsearch.ingest.processor.RenameProcessor; +import org.elasticsearch.ingest.processor.SetProcessor; +import org.elasticsearch.ingest.processor.SplitProcessor; +import org.elasticsearch.ingest.processor.TrimProcessor; +import org.elasticsearch.ingest.processor.UppercaseProcessor; import org.elasticsearch.plugin.ingest.rest.RestDeletePipelineAction; import org.elasticsearch.plugin.ingest.rest.RestGetPipelineAction; +import org.elasticsearch.plugin.ingest.rest.RestIngestDisabledAction; import org.elasticsearch.plugin.ingest.rest.RestPutPipelineAction; import org.elasticsearch.plugin.ingest.rest.RestSimulatePipelineAction; -import org.elasticsearch.plugin.ingest.rest.RestIngestDisabledAction; import org.elasticsearch.plugin.ingest.transport.IngestActionFilter; import org.elasticsearch.plugin.ingest.transport.IngestDisabledActionFilter; import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction; @@ -44,6 +60,7 @@ import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineTransp import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptModule; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -82,7 +99,25 @@ public class IngestPlugin extends Plugin { if (transportClient) { return Collections.emptyList(); } else { - return Collections.singletonList(new IngestModule(ingestEnabled)); + ProcessorsModule processorsModule = new ProcessorsModule(); + if (ingestEnabled) { + processorsModule.addProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile())); + processorsModule.addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile())); + processorsModule.addProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory()); + processorsModule.addProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService)); + processorsModule.addProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService)); + processorsModule.addProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory()); + processorsModule.addProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService)); + processorsModule.addProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory()); + processorsModule.addProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory()); + processorsModule.addProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory()); + processorsModule.addProcessor(LowercaseProcessor.TYPE, (environment, mustacheFactory) -> new LowercaseProcessor.Factory()); + processorsModule.addProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory()); + processorsModule.addProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory()); + processorsModule.addProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory()); + processorsModule.addProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService)); + } + return Arrays.asList(new IngestModule(ingestEnabled), processorsModule); } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java index ba59d8af314..8c575a5b4a8 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java @@ -41,8 +41,9 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.env.Environment; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.ProcessorFactoryProvider; import org.elasticsearch.ingest.TemplateService; -import org.elasticsearch.ingest.processor.Processor; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequest; import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequest; import org.elasticsearch.plugin.ingest.transport.reload.ReloadPipelinesAction; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/IngestRestFilter.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/IngestRestFilter.java index 0c548888ab2..751ff0fcc68 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/IngestRestFilter.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/rest/IngestRestFilter.java @@ -20,15 +20,13 @@ package org.elasticsearch.plugin.ingest.rest; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.plugin.ingest.IngestPlugin; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestFilter; import org.elasticsearch.rest.RestFilterChain; import org.elasticsearch.rest.RestRequest; -import static org.elasticsearch.plugin.ingest.IngestPlugin.PIPELINE_ID_PARAM; -import static org.elasticsearch.plugin.ingest.IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY; - public class IngestRestFilter extends RestFilter { @Inject @@ -38,8 +36,8 @@ public class IngestRestFilter extends RestFilter { @Override public void process(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception { - if (request.hasParam(PIPELINE_ID_PARAM)) { - request.putInContext(PIPELINE_ID_PARAM_CONTEXT_KEY, request.param(PIPELINE_ID_PARAM)); + if (request.hasParam(IngestPlugin.PIPELINE_ID_PARAM)) { + request.putInContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY, request.param(IngestPlugin.PIPELINE_ID_PARAM)); } filterChain.continueProcessing(request, channel); } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionService.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionService.java index 430d6fd7234..1d2fc5d7793 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionService.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionService.java @@ -22,7 +22,7 @@ package org.elasticsearch.plugin.ingest.transport.simulate; import org.elasticsearch.action.ActionListener; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.processor.Processor; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequest.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequest.java index 2d81fa71f63..a4601cb746b 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequest.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequest.java @@ -26,7 +26,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.processor.ConfigurationUtils; +import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.plugin.ingest.PipelineStore; import java.io.IOException; diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/AbstractStringProcessorTestCase.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/AbstractStringProcessorTestCase.java index 0d5f21ff712..a8840b42d7a 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/AbstractStringProcessorTestCase.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/AbstractStringProcessorTestCase.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.test.ESTestCase; diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/AppendProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/AppendProcessorTests.java index df9cc4074c4..8c27d881252 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/AppendProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/AppendProcessorTests.java @@ -24,8 +24,7 @@ import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.ingest.TemplateService; import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.ingest.ValueSource; -import org.elasticsearch.ingest.processor.AppendProcessor; -import org.elasticsearch.ingest.processor.Processor; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/CompoundProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/CompoundProcessorTests.java deleted file mode 100644 index 85bbee1e6d3..00000000000 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/CompoundProcessorTests.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.ingest.processor; - -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.test.ESTestCase; -import org.junit.Before; -import org.mockito.stubbing.Answer; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import static org.elasticsearch.mock.orig.Mockito.verify; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.when; - -public class CompoundProcessorTests extends ESTestCase { - private IngestDocument ingestDocument; - - @Before - public void init() { - ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); - } - - public void testEmpty() throws Exception { - CompoundProcessor processor = new CompoundProcessor(); - assertThat(processor.getProcessors().isEmpty(), is(true)); - assertThat(processor.getOnFailureProcessors().isEmpty(), is(true)); - processor.execute(ingestDocument); - } - - public void testSingleProcessor() throws Exception { - Processor processor = mock(Processor.class); - CompoundProcessor compoundProcessor = new CompoundProcessor(processor); - assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); - assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor)); - assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); - compoundProcessor.execute(ingestDocument); - verify(processor, times(1)).execute(ingestDocument); - } - - public void testSingleProcessorWithException() throws Exception { - Processor processor = mock(Processor.class); - when(processor.getType()).thenReturn("failed_processor"); - doThrow(new RuntimeException("error")).doNothing().when(processor).execute(ingestDocument); - CompoundProcessor compoundProcessor = new CompoundProcessor(processor); - assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); - assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor)); - assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); - try { - compoundProcessor.execute(ingestDocument); - fail("should throw exception"); - } catch (Exception e) { - assertThat(e.getMessage(), equalTo("error")); - } - - verify(processor, times(1)).execute(ingestDocument); - } - - public void testSingleProcessorWithOnFailureProcessor() throws Exception { - Exception error = new RuntimeException("error"); - - Processor processor = mock(Processor.class); - when(processor.getType()).thenReturn("first"); - doThrow(error).doNothing().when(processor).execute(ingestDocument); - - Processor processorNext = mock(Processor.class); - Answer checkMetadataAnswer = invocationOnMock -> { - @SuppressWarnings("unchecked") - IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0]; - Map ingestMetadata = ingestDocument.getIngestMetadata(); - assertThat(ingestMetadata.size(), equalTo(2)); - assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error")); - assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("first")); - return null; - }; - doAnswer(checkMetadataAnswer).when(processorNext).execute(ingestDocument); - - CompoundProcessor compoundProcessor = spy(new CompoundProcessor(Arrays.asList(processor), Arrays.asList(processorNext))); - assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); - assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor)); - assertThat(compoundProcessor.getOnFailureProcessors().size(), equalTo(1)); - assertThat(compoundProcessor.getOnFailureProcessors().get(0), equalTo(processorNext)); - compoundProcessor.execute(ingestDocument); - verify(compoundProcessor).executeOnFailure(ingestDocument, error, "first"); - verify(processor, times(1)).execute(ingestDocument); - verify(processorNext, times(1)).execute(ingestDocument); - - } - - public void testSingleProcessorWithNestedFailures() throws Exception { - Exception error = new RuntimeException("error"); - Processor processor = mock(Processor.class); - when(processor.getType()).thenReturn("first"); - doThrow(error).doNothing().when(processor).execute(ingestDocument); - Processor processorToFail = mock(Processor.class); - Answer checkMetadataAnswer = invocationOnMock -> { - @SuppressWarnings("unchecked") - IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0]; - Map ingestMetadata = ingestDocument.getIngestMetadata(); - assertThat(ingestMetadata.size(), equalTo(2)); - assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error")); - assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("first")); - return null; - }; - doAnswer(checkMetadataAnswer).when(processorToFail).execute(ingestDocument); - when(processorToFail.getType()).thenReturn("second"); - doThrow(error).doNothing().when(processorToFail).execute(ingestDocument); - Processor lastProcessor = mock(Processor.class); - Answer checkLastMetadataAnswer = invocationOnMock -> { - @SuppressWarnings("unchecked") - IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0]; - Map ingestMetadata = ingestDocument.getIngestMetadata(); - assertThat(ingestMetadata.size(), equalTo(2)); - assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error")); - assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_FIELD), equalTo("second")); - return null; - }; - doAnswer(checkLastMetadataAnswer).when(lastProcessor).execute(ingestDocument); - - CompoundProcessor innerCompoundOnFailProcessor = new CompoundProcessor(Arrays.asList(processorToFail), Arrays.asList(lastProcessor)); - CompoundProcessor compoundOnFailProcessor = spy(innerCompoundOnFailProcessor); - - CompoundProcessor innerCompoundProcessor = new CompoundProcessor(Arrays.asList(processor), Arrays.asList(compoundOnFailProcessor)); - CompoundProcessor compoundProcessor = spy(innerCompoundProcessor); - - assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); - assertThat(compoundProcessor.getProcessors().get(0), equalTo(processor)); - assertThat(compoundProcessor.getOnFailureProcessors().size(), equalTo(1)); - assertThat(compoundProcessor.getOnFailureProcessors().get(0), equalTo(compoundOnFailProcessor)); - compoundProcessor.execute(ingestDocument); - verify(processor, times(1)).execute(ingestDocument); - verify(compoundProcessor, times(1)).executeOnFailure(ingestDocument, error, "first"); - verify(compoundOnFailProcessor, times(1)).execute(ingestDocument); - verify(processorToFail, times(1)).execute(ingestDocument); - verify(compoundOnFailProcessor, times(1)).executeOnFailure(ingestDocument, error, "second"); - verify(lastProcessor, times(1)).execute(ingestDocument); - } -} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/ConvertProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/ConvertProcessorTests.java index 77ecd5056c3..487dca4c232 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/ConvertProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/ConvertProcessorTests.java @@ -21,8 +21,7 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.RandomDocumentPicks; -import org.elasticsearch.ingest.processor.ConvertProcessor; -import org.elasticsearch.ingest.processor.Processor; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/FailProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/FailProcessorTests.java index c79cb7b73e5..b6d4df9103a 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/FailProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/FailProcessorTests.java @@ -20,11 +20,9 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.ingest.TestTemplateService; -import org.elasticsearch.ingest.processor.FailProcessor; -import org.elasticsearch.ingest.processor.FailProcessorException; -import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.test.ESTestCase; import static org.hamcrest.Matchers.equalTo; diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/GsubProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/GsubProcessorTests.java index 76e00b218e1..c2873ebf569 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/GsubProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/GsubProcessorTests.java @@ -21,8 +21,7 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.RandomDocumentPicks; -import org.elasticsearch.ingest.processor.GsubProcessor; -import org.elasticsearch.ingest.processor.Processor; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.test.ESTestCase; import java.util.Collections; diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/JoinProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/JoinProcessorTests.java index 3afc8132999..65552b41c02 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/JoinProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/JoinProcessorTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.RandomDocumentPicks; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/RemoveProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/RemoveProcessorTests.java index 39d31d3a0c2..952354cc800 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/RemoveProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/RemoveProcessorTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.ingest.TestTemplateService; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.test.ESTestCase; import java.util.HashMap; diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/RenameProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/RenameProcessorTests.java index 75a5ec8e36b..c9e2e8be51c 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/RenameProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/RenameProcessorTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.RandomDocumentPicks; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/SetProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/SetProcessorTests.java index bf91da0dab5..7de742c908f 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/SetProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/SetProcessorTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.ingest.TemplateService; import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.ingest.ValueSource; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/SplitProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/SplitProcessorTests.java index 7c1f1a13047..b4487a6a09b 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/SplitProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/SplitProcessorTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.ingest.processor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.RandomDocumentPicks; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.test.ESTestCase; import java.util.Arrays; diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java index 9a31b4f9006..41b989088db 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java @@ -29,9 +29,9 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.TestTemplateService; -import org.elasticsearch.ingest.processor.CompoundProcessor; -import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.ingest.processor.SetProcessor; +import org.elasticsearch.ingest.CompoundProcessor; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.junit.Before; diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java index 336c02f7b0d..c841f8cce69 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java @@ -21,7 +21,6 @@ package org.elasticsearch.plugin.ingest.transport; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; @@ -32,8 +31,8 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.processor.CompoundProcessor; -import org.elasticsearch.ingest.processor.Processor; +import org.elasticsearch.ingest.CompoundProcessor; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.plugin.ingest.IngestBootstrapper; import org.elasticsearch.plugin.ingest.IngestPlugin; import org.elasticsearch.plugin.ingest.PipelineExecutionService; @@ -45,14 +44,12 @@ import org.junit.Before; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.function.Consumer; import static org.elasticsearch.plugin.ingest.transport.IngestActionFilter.BulkRequestModifier; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionServiceTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionServiceTests.java index c4d12b23a47..c913db72ca1 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionServiceTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulateExecutionServiceTests.java @@ -23,14 +23,12 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.RandomDocumentPicks; -import org.elasticsearch.ingest.processor.CompoundProcessor; +import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; -import java.util.Arrays; - import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequestParsingTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequestParsingTests.java index bee6ddd141c..985e7ef1424 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequestParsingTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequestParsingTests.java @@ -21,8 +21,8 @@ package org.elasticsearch.plugin.ingest.transport.simulate; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.processor.CompoundProcessor; -import org.elasticsearch.ingest.processor.Processor; +import org.elasticsearch.ingest.CompoundProcessor; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.plugin.ingest.PipelineStore; import org.elasticsearch.test.ESTestCase; import org.junit.Before; diff --git a/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/IngestMustacheSetProcessorIT.java b/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/IngestMustacheSetProcessorIT.java index b496d189f18..47fef1a2228 100644 --- a/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/IngestMustacheSetProcessorIT.java +++ b/qa/ingest-with-mustache/src/test/java/org/elasticsearch/plugin/ingest/IngestMustacheSetProcessorIT.java @@ -22,7 +22,7 @@ package org.elasticsearch.plugin.ingest; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.ValueSource; -import org.elasticsearch.ingest.processor.Processor; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.processor.SetProcessor; import org.hamcrest.Matchers;