diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java
index 6924550d79b..ce98767bcc8 100644
--- a/core/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java
+++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java
@@ -24,7 +24,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.ingest.AbstractProcessorFactory;
+import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import java.io.IOException;
@@ -91,7 +91,7 @@ public class SimulateProcessorResult implements Writeable, ToXContent {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (processorTag != null) {
- builder.field(AbstractProcessorFactory.TAG_KEY, processorTag);
+ builder.field(ConfigurationUtils.TAG_KEY, processorTag);
}
if (failure == null) {
ingestDocument.toXContent(builder, params);
diff --git a/core/src/main/java/org/elasticsearch/ingest/AbstractProcessorFactory.java b/core/src/main/java/org/elasticsearch/ingest/AbstractProcessorFactory.java
deleted file mode 100644
index 0fb50228793..00000000000
--- a/core/src/main/java/org/elasticsearch/ingest/AbstractProcessorFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.ingest;
-
-import java.util.Map;
-
-/**
- * A processor implementation may modify the data belonging to a document.
- * Whether changes are made and what exactly is modified is up to the implementation.
- */
-public abstract class AbstractProcessorFactory
implements Processor.Factory
{
- public static final String TAG_KEY = "tag";
-
- @Override
- public P create(Map config) throws Exception {
- String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
- return doCreate(tag, config);
- }
-
- protected abstract P doCreate(String tag, Map config) throws Exception;
-}
diff --git a/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java b/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java
index 623c4f315a4..54fbc4a0237 100644
--- a/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java
+++ b/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java
@@ -29,6 +29,8 @@ import java.util.Map;
public final class ConfigurationUtils {
+ public static final String TAG_KEY = "tag";
+
private ConfigurationUtils() {
}
@@ -255,8 +257,8 @@ public final class ConfigurationUtils {
ConfigurationUtils.readOptionalList(null, null, config, Pipeline.ON_FAILURE_KEY);
List onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorRegistry);
- Processor processor;
- processor = factory.create(config);
+ String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
+ Processor processor = factory.create(tag, config);
if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) {
throw newConfigurationException(processor.getType(), processor.getTag(), Pipeline.ON_FAILURE_KEY,
diff --git a/core/src/main/java/org/elasticsearch/ingest/Processor.java b/core/src/main/java/org/elasticsearch/ingest/Processor.java
index 92c18464e94..d4a5462c1c9 100644
--- a/core/src/main/java/org/elasticsearch/ingest/Processor.java
+++ b/core/src/main/java/org/elasticsearch/ingest/Processor.java
@@ -45,14 +45,17 @@ public interface Processor {
/**
* A factory that knows how to construct a processor based on a map of maps.
*/
- interface Factory {
+ interface Factory {
/**
* Creates a processor based on the specified map of maps config.
*
+ * @param tag The tag for the processor
+ * @param config Configuration for the processor to create
+ *
* Implementations are responsible for removing the used keys, so that after creating a pipeline ingest can
* verify if all configurations settings have been used.
*/
- P create(Map config) throws Exception;
+ Processor create(String tag, Map config) throws Exception;
}
}
diff --git a/core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java b/core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java
index dd16898aea2..461247b6d09 100644
--- a/core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java
+++ b/core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java
@@ -37,12 +37,12 @@ public final class ProcessorsRegistry {
private final ClusterService clusterService;
private ProcessorsRegistry(ScriptService scriptService, ClusterService clusterService,
- Map>> providers) {
+ Map> providers) {
this.templateService = new InternalTemplateService(scriptService);
this.scriptService = scriptService;
this.clusterService = clusterService;
Map processorFactories = new HashMap<>();
- for (Map.Entry>> entry : providers.entrySet()) {
+ for (Map.Entry> entry : providers.entrySet()) {
processorFactories.put(entry.getKey(), entry.getValue().apply(this));
}
this.processorFactories = Collections.unmodifiableMap(processorFactories);
@@ -71,13 +71,13 @@ public final class ProcessorsRegistry {
public static final class Builder {
- private final Map>> providers = new HashMap<>();
+ private final Map> providers = new HashMap<>();
/**
* Adds a processor factory under a specific name.
*/
- public void registerProcessor(String name, Function> provider) {
- Function> previous = this.providers.putIfAbsent(name, provider);
+ public void registerProcessor(String name, Function provider) {
+ Function previous = this.providers.putIfAbsent(name, provider);
if (previous != null) {
throw new IllegalArgumentException("Processor factory already registered for name [" + name + "]");
}
diff --git a/core/src/main/java/org/elasticsearch/node/NodeModule.java b/core/src/main/java/org/elasticsearch/node/NodeModule.java
index 6d8be732dd2..49de57a87b6 100644
--- a/core/src/main/java/org/elasticsearch/node/NodeModule.java
+++ b/core/src/main/java/org/elasticsearch/node/NodeModule.java
@@ -62,7 +62,7 @@ public class NodeModule extends AbstractModule {
/**
* Adds a processor factory under a specific type name.
*/
- public void registerProcessor(String type, Function> provider) {
+ public void registerProcessor(String type, Function provider) {
processorsRegistryBuilder.registerProcessor(type, provider);
}
}
diff --git a/core/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java b/core/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java
index fa78d5aa16c..82886628c85 100644
--- a/core/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java
+++ b/core/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java
@@ -98,7 +98,7 @@ public class ConfigurationUtilsTests extends ESTestCase {
public void testReadProcessors() throws Exception {
Processor processor = mock(Processor.class);
ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder();
- builder.registerProcessor("test_processor", (registry) -> config -> processor);
+ builder.registerProcessor("test_processor", (registry) -> (tag, config) -> processor);
ProcessorsRegistry registry = builder.build(mock(ScriptService.class), mock(ClusterService.class));
diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java
index cb7bd849a47..8fe7ddd84cb 100644
--- a/core/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java
+++ b/core/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java
@@ -40,7 +40,7 @@ public class PipelineFactoryTests extends ESTestCase {
public void testCreate() throws Exception {
Map processorConfig0 = new HashMap<>();
Map processorConfig1 = new HashMap<>();
- processorConfig0.put(AbstractProcessorFactory.TAG_KEY, "first-processor");
+ processorConfig0.put(ConfigurationUtils.TAG_KEY, "first-processor");
Map pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.PROCESSORS_KEY,
diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java
index 55ea4360ece..ec41eda47b0 100644
--- a/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java
+++ b/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java
@@ -58,7 +58,7 @@ public class PipelineStoreTests extends ESTestCase {
public void init() throws Exception {
store = new PipelineStore(Settings.EMPTY);
ProcessorsRegistry.Builder registryBuilder = new ProcessorsRegistry.Builder();
- registryBuilder.registerProcessor("set", (registry) -> config -> {
+ registryBuilder.registerProcessor("set", (registry) -> (tag, config) -> {
String field = (String) config.remove("field");
String value = (String) config.remove("value");
return new Processor() {
@@ -78,7 +78,7 @@ public class PipelineStoreTests extends ESTestCase {
}
};
});
- registryBuilder.registerProcessor("remove", (registry) -> config -> {
+ registryBuilder.registerProcessor("remove", (registry) -> (tag, config) -> {
String field = (String) config.remove("field");
return new Processor() {
@Override
diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AbstractStringProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AbstractStringProcessor.java
index d9ffd34cac1..f35f3fec534 100644
--- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AbstractStringProcessor.java
+++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AbstractStringProcessor.java
@@ -20,9 +20,9 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.ingest.AbstractProcessor;
-import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Processor;
import java.util.Map;
@@ -53,7 +53,7 @@ abstract class AbstractStringProcessor extends AbstractProcessor {
protected abstract String process(String value);
- static abstract class Factory extends AbstractProcessorFactory {
+ static abstract class Factory implements Processor.Factory {
protected final String processorType;
protected Factory(String processorType) {
@@ -61,11 +61,11 @@ abstract class AbstractStringProcessor extends AbstractProcessor {
}
@Override
- public T doCreate(String processorTag, Map config) throws Exception {
+ public AbstractStringProcessor create(String processorTag, Map config) throws Exception {
String field = ConfigurationUtils.readStringProperty(processorType, processorTag, config, "field");
return newProcessor(processorTag, field);
}
- protected abstract T newProcessor(String processorTag, String field);
+ protected abstract AbstractStringProcessor newProcessor(String processorTag, String field);
}
}
diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java
index af163c3c187..26b0e66a63c 100644
--- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java
+++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java
@@ -20,9 +20,10 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.ingest.AbstractProcessor;
-import org.elasticsearch.ingest.AbstractProcessorFactory;
+import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import org.elasticsearch.ingest.ValueSource;
@@ -64,7 +65,7 @@ public final class AppendProcessor extends AbstractProcessor {
return TYPE;
}
- public static final class Factory extends AbstractProcessorFactory {
+ public static final class Factory implements Processor.Factory {
private final TemplateService templateService;
@@ -73,7 +74,7 @@ public final class AppendProcessor extends AbstractProcessor {
}
@Override
- public AppendProcessor doCreate(String processorTag, Map config) throws Exception {
+ public AppendProcessor create(String processorTag, Map config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
return new AppendProcessor(processorTag, templateService.compile(field), ValueSource.wrap(value, templateService));
diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConvertProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConvertProcessor.java
index 015c56c72c3..558bdde96a2 100644
--- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConvertProcessor.java
+++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConvertProcessor.java
@@ -20,9 +20,9 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.ingest.AbstractProcessor;
-import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Processor;
import java.util.ArrayList;
import java.util.List;
@@ -160,9 +160,9 @@ public final class ConvertProcessor extends AbstractProcessor {
return TYPE;
}
- public static final class Factory extends AbstractProcessorFactory {
+ public static final class Factory implements Processor.Factory {
@Override
- public ConvertProcessor doCreate(String processorTag, Map config) throws Exception {
+ public ConvertProcessor create(String processorTag, Map config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String typeProperty = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "type");
String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", field);
diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateIndexNameProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateIndexNameProcessor.java
index a94d4d048a8..c750c84c576 100644
--- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateIndexNameProcessor.java
+++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateIndexNameProcessor.java
@@ -21,9 +21,9 @@ package org.elasticsearch.ingest.common;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ingest.AbstractProcessor;
-import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Processor;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
@@ -120,10 +120,10 @@ public final class DateIndexNameProcessor extends AbstractProcessor {
return dateFormats;
}
- public static final class Factory extends AbstractProcessorFactory {
+ public static final class Factory implements Processor.Factory {
@Override
- protected DateIndexNameProcessor doCreate(String tag, Map config) throws Exception {
+ public DateIndexNameProcessor create(String tag, Map config) throws Exception {
String localeString = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "locale");
String timezoneString = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "timezone");
DateTimeZone timezone = timezoneString == null ? DateTimeZone.UTC : DateTimeZone.forID(timezoneString);
diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateProcessor.java
index b82b9c8b76c..e61ed513114 100644
--- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateProcessor.java
+++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateProcessor.java
@@ -21,9 +21,9 @@ package org.elasticsearch.ingest.common;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ingest.AbstractProcessor;
-import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Processor;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.ISODateTimeFormat;
@@ -108,10 +108,10 @@ public final class DateProcessor extends AbstractProcessor {
return formats;
}
- public static final class Factory extends AbstractProcessorFactory {
+ public static final class Factory implements Processor.Factory {
@SuppressWarnings("unchecked")
- public DateProcessor doCreate(String processorTag, Map config) throws Exception {
+ public DateProcessor create(String processorTag, Map config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", DEFAULT_TARGET_FIELD);
String timezoneString = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "timezone");
diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/FailProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/FailProcessor.java
index 6c434d85d5a..a24322f556f 100644
--- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/FailProcessor.java
+++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/FailProcessor.java
@@ -20,9 +20,9 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.ingest.AbstractProcessor;
-import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TemplateService;
import java.util.Map;
@@ -56,7 +56,7 @@ public final class FailProcessor extends AbstractProcessor {
return TYPE;
}
- public static final class Factory extends AbstractProcessorFactory {
+ public static final class Factory implements Processor.Factory {
private final TemplateService templateService;
@@ -65,7 +65,7 @@ public final class FailProcessor extends AbstractProcessor {
}
@Override
- public FailProcessor doCreate(String processorTag, Map config) throws Exception {
+ public FailProcessor create(String processorTag, Map config) throws Exception {
String message = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "message");
return new FailProcessor(processorTag, templateService.compile(message));
}
diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java
index b6d14d1b8c5..05be47633ba 100644
--- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java
+++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java
@@ -20,7 +20,6 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.ingest.AbstractProcessor;
-import org.elasticsearch.ingest.AbstractProcessorFactory;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
@@ -83,7 +82,7 @@ public final class ForEachProcessor extends AbstractProcessor {
return processors;
}
- public static final class Factory extends AbstractProcessorFactory {
+ public static final class Factory implements Processor.Factory {
private final ProcessorsRegistry processorRegistry;
@@ -92,7 +91,7 @@ public final class ForEachProcessor extends AbstractProcessor {
}
@Override
- protected ForEachProcessor doCreate(String tag, Map config) throws Exception {
+ public ForEachProcessor create(String tag, Map config) throws Exception {
String field = readStringProperty(TYPE, tag, config, "field");
List