From 08b3b6264e8618deb125ab5a4825e50a7c04f9ab Mon Sep 17 00:00:00 2001 From: Ryan Ernst Date: Thu, 30 Jun 2016 01:49:22 -0700 Subject: [PATCH] Tests pass, started removing generics from processor factory --- .../ingest/SimulatePipelineRequest.java | 2 +- .../ingest/AbstractProcessorFactory.java | 7 +-- .../ingest/ConfigurationUtils.java | 12 ++--- .../elasticsearch/ingest/IngestService.java | 39 ++++++++------ .../org/elasticsearch/ingest/Pipeline.java | 6 +-- .../elasticsearch/ingest/PipelineStore.java | 33 +++++------- .../org/elasticsearch/ingest/Processor.java | 11 ++-- .../ingest/ProcessorsRegistry.java | 48 ----------------- .../java/org/elasticsearch/node/Node.java | 6 +++ .../org/elasticsearch/node/NodeModule.java | 26 ---------- .../node/service/NodeService.java | 17 +++---- .../SimulatePipelineRequestParsingTests.java | 10 ++-- .../elasticsearch/http/HttpServerTests.java | 7 ++- .../ingest/ConfigurationUtilsTests.java | 20 +++----- .../ingest/PipelineFactoryTests.java | 24 +++------ .../ingest/PipelineStoreTests.java | 9 ++-- .../ingest/ProcessorsRegistryTests.java | 51 ------------------- .../common/AbstractStringProcessor.java | 8 +-- .../ingest/common/AppendProcessor.java | 7 +-- .../ingest/common/ConvertProcessor.java | 7 +-- .../ingest/common/DateIndexNameProcessor.java | 27 +++++----- .../ingest/common/DateProcessor.java | 7 +-- .../ingest/common/FailProcessor.java | 7 +-- .../ingest/common/ForEachProcessor.java | 9 ++-- .../ingest/common/GrokProcessor.java | 7 +-- .../ingest/common/GsubProcessor.java | 7 +-- .../ingest/common/JoinProcessor.java | 7 +-- .../ingest/common/LowercaseProcessor.java | 2 +- .../ingest/common/RemoveProcessor.java | 7 +-- .../ingest/common/RenameProcessor.java | 7 +-- .../ingest/common/ScriptProcessor.java | 7 +-- .../ingest/common/SetProcessor.java | 7 +-- .../ingest/common/SortProcessor.java | 7 +-- .../ingest/common/SplitProcessor.java | 7 +-- .../ingest/common/TrimProcessor.java | 2 +- .../ingest/common/UppercaseProcessor.java | 2 +- .../common/DateIndexNameFactoryTests.java | 4 +- .../common/FailProcessorFactoryTests.java | 2 +- .../common/ForEachProcessorFactoryTests.java | 7 ++- .../common/GrokProcessorFactoryTests.java | 10 ++-- .../common/ScriptProcessorFactoryTests.java | 5 +- .../common/SplitProcessorFactoryTests.java | 2 +- .../ingest/common/SplitProcessorTests.java | 2 +- .../attachment/AttachmentProcessor.java | 7 +-- .../attachment/IngestAttachmentPlugin.java | 20 +++++--- .../ingest/geoip/GeoIpProcessor.java | 46 ++++++++--------- .../ingest/geoip/IngestGeoIpPlugin.java | 2 +- .../geoip/GeoIpProcessorFactoryTests.java | 10 ++-- .../ingest/IngestTestPlugin.java | 29 ++++++----- .../elasticsearch/ingest/TestProcessor.java | 4 +- 50 files changed, 259 insertions(+), 360 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java delete mode 100644 core/src/test/java/org/elasticsearch/ingest/ProcessorsRegistryTests.java diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java index ce24969e8f2..18b1002ed6c 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java @@ -141,7 +141,7 @@ public class SimulatePipelineRequest extends ActionRequest config, boolean verbose, PipelineStore pipelineStore) throws Exception { Map pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE); - Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorRegistry()); + Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories()); List ingestDocumentList = parseDocs(config); return new Parsed(pipeline, ingestDocumentList, verbose); } diff --git a/core/src/main/java/org/elasticsearch/ingest/AbstractProcessorFactory.java b/core/src/main/java/org/elasticsearch/ingest/AbstractProcessorFactory.java index 95c4a93c9bc..81f79c0fd4f 100644 --- a/core/src/main/java/org/elasticsearch/ingest/AbstractProcessorFactory.java +++ b/core/src/main/java/org/elasticsearch/ingest/AbstractProcessorFactory.java @@ -25,14 +25,15 @@ import java.util.Map; * A processor implementation may modify the data belonging to a document. * Whether changes are made and what exactly is modified is up to the implementation. */ -public abstract class AbstractProcessorFactory

implements Processor.Factory

{ +public abstract class AbstractProcessorFactory implements Processor.Factory { public static final String TAG_KEY = "tag"; @Override - public P create(ProcessorsRegistry registry, Map config) throws Exception { + public Processor create(Map registry, Map config) throws Exception { String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); return doCreate(registry, tag, config); } - protected abstract P doCreate(ProcessorsRegistry registry, String tag, Map config) throws Exception; + protected abstract Processor doCreate(Map registry, String tag, + Map config) throws Exception; } diff --git a/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java b/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java index 22eea7d23b7..ff0f55a559b 100644 --- a/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java +++ b/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java @@ -234,12 +234,12 @@ public final class ConfigurationUtils { } public static List readProcessorConfigs(List>> processorConfigs, - ProcessorsRegistry processorRegistry) throws Exception { + Map processorFactories) throws Exception { List processors = new ArrayList<>(); if (processorConfigs != null) { for (Map> processorConfigWithKey : processorConfigs) { for (Map.Entry> entry : processorConfigWithKey.entrySet()) { - processors.add(readProcessor(processorRegistry, entry.getKey(), entry.getValue())); + processors.add(readProcessor(processorFactories, entry.getKey(), entry.getValue())); } } } @@ -247,15 +247,15 @@ public final class ConfigurationUtils { return processors; } - private static Processor readProcessor(ProcessorsRegistry processorRegistry, String type, Map config) throws Exception { - Processor.Factory factory = processorRegistry.getProcessorFactory(type); + private static Processor readProcessor(Map processorFactories, String type, Map config) throws Exception { + Processor.Factory factory = processorFactories.get(type); if (factory != null) { boolean ignoreFailure = ConfigurationUtils.readBooleanProperty(null, null, config, "ignore_failure", false); List>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, Pipeline.ON_FAILURE_KEY); - List onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorRegistry); - Processor processor = factory.create(processorRegistry, config); + List onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorFactories); + Processor processor = factory.create(processorFactories, config); if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) { throw newConfigurationException(processor.getType(), processor.getTag(), Pipeline.ON_FAILURE_KEY, diff --git a/core/src/main/java/org/elasticsearch/ingest/IngestService.java b/core/src/main/java/org/elasticsearch/ingest/IngestService.java index ce7d6dadee5..c540d25334c 100644 --- a/core/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/core/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -19,16 +19,18 @@ package org.elasticsearch.ingest; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.script.ScriptService; -import org.elasticsearch.threadpool.ThreadPool; - -import java.io.Closeable; -import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.plugins.IngestPlugin; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.threadpool.ThreadPool; /** * Holder class for several ingest related services. @@ -37,11 +39,20 @@ public class IngestService { private final PipelineStore pipelineStore; private final PipelineExecutionService pipelineExecutionService; - private final ProcessorsRegistry.Builder processorsRegistryBuilder; - public IngestService(Settings settings, ThreadPool threadPool, ProcessorsRegistry.Builder processorsRegistryBuilder) { - this.processorsRegistryBuilder = processorsRegistryBuilder; - this.pipelineStore = new PipelineStore(settings); + public IngestService(Settings settings, ThreadPool threadPool, + Environment env, ScriptService scriptService, List ingestPlugins) { + final TemplateService templateService = new InternalTemplateService(scriptService); + Map processorFactories = new HashMap<>(); + for (IngestPlugin ingestPlugin : ingestPlugins) { + Map newProcessors = ingestPlugin.getProcessors(env, scriptService, templateService); + for (Map.Entry entry : newProcessors.entrySet()) { + if (processorFactories.put(entry.getKey(), entry.getValue()) != null) { + throw new IllegalArgumentException("Ingest processor [" + entry.getKey() + "] is already registered"); + } + } + } + this.pipelineStore = new PipelineStore(settings, Collections.unmodifiableMap(processorFactories)); this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool); } @@ -53,12 +64,8 @@ public class IngestService { return pipelineExecutionService; } - public void buildProcessorsFactoryRegistry(ScriptService scriptService, ClusterService clusterService) { - pipelineStore.buildProcessorFactoryRegistry(processorsRegistryBuilder, scriptService, clusterService); - } - public IngestInfo info() { - Map processorFactories = pipelineStore.getProcessorRegistry().getProcessorFactories(); + Map processorFactories = pipelineStore.getProcessorFactories(); List processorInfoList = new ArrayList<>(processorFactories.size()); for (Map.Entry entry : processorFactories.entrySet()) { processorInfoList.add(new ProcessorInfo(entry.getKey())); diff --git a/core/src/main/java/org/elasticsearch/ingest/Pipeline.java b/core/src/main/java/org/elasticsearch/ingest/Pipeline.java index d568f5521a4..3f4e05229b1 100644 --- a/core/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/core/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -98,13 +98,13 @@ public final class Pipeline { public final static class Factory { - public Pipeline create(String id, Map config, ProcessorsRegistry processorRegistry) throws Exception { + public Pipeline create(String id, Map config, Map processorFactories) throws Exception { String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); List>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY); - List processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorRegistry); + List processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorFactories); List>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY); - List onFailureProcessors = ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, processorRegistry); + List onFailureProcessors = ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, processorFactories); if (config.isEmpty() == false) { throw new ElasticsearchParseException("pipeline [" + id + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java b/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java index 3118d2b5076..50ae19a2ca1 100644 --- a/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java +++ b/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java @@ -19,6 +19,12 @@ package org.elasticsearch.ingest; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; @@ -37,20 +43,11 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.script.ScriptService; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; public class PipelineStore extends AbstractComponent implements ClusterStateListener { private final Pipeline.Factory factory = new Pipeline.Factory(); - private ProcessorsRegistry processorRegistry; + private final Map processorFactories; // Ideally this should be in IngestMetadata class, but we don't have the processor factories around there. // We know of all the processor factories when a node with all its plugin have been initialized. Also some @@ -58,13 +55,9 @@ public class PipelineStore extends AbstractComponent implements ClusterStateList // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around. volatile Map pipelines = new HashMap<>(); - public PipelineStore(Settings settings) { + public PipelineStore(Settings settings, Map processorFactories) { super(settings); - } - - public void buildProcessorFactoryRegistry(ProcessorsRegistry.Builder processorsRegistryBuilder, ScriptService scriptService, - ClusterService clusterService) { - this.processorRegistry = processorsRegistryBuilder.build(scriptService, clusterService); + this.processorFactories = processorFactories; } @Override @@ -81,7 +74,7 @@ public class PipelineStore extends AbstractComponent implements ClusterStateList Map pipelines = new HashMap<>(); for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { try { - pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorRegistry)); + pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories)); } catch (ElasticsearchParseException e) { throw e; } catch (Exception e) { @@ -157,7 +150,7 @@ public class PipelineStore extends AbstractComponent implements ClusterStateList } Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false).v2(); - Pipeline pipeline = factory.create(request.getId(), pipelineConfig, processorRegistry); + Pipeline pipeline = factory.create(request.getId(), pipelineConfig, processorFactories); List exceptions = new ArrayList<>(); for (Processor processor : pipeline.flattenAllProcessors()) { for (Map.Entry entry : ingestInfos.entrySet()) { @@ -194,8 +187,8 @@ public class PipelineStore extends AbstractComponent implements ClusterStateList return pipelines.get(id); } - public ProcessorsRegistry getProcessorRegistry() { - return processorRegistry; + public Map getProcessorFactories() { + return processorFactories; } /** diff --git a/core/src/main/java/org/elasticsearch/ingest/Processor.java b/core/src/main/java/org/elasticsearch/ingest/Processor.java index 934cf52b91b..3a67bed6ea8 100644 --- a/core/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/core/src/main/java/org/elasticsearch/ingest/Processor.java @@ -45,14 +45,17 @@ public interface Processor { /** * A factory that knows how to construct a processor based on a map of maps. */ - interface Factory

{ + interface Factory { /** * Creates a processor based on the specified map of maps config. * - * Implementations are responsible for removing the used keys, so that after creating a pipeline ingest can - * verify if all configurations settings have been used. + * @param processorFactories Other processors which may be created inside this processor + * @param config The configuration for the processor + * + * Note: Implementations are responsible for removing the used configuration keys, so that after + * creating a pipeline ingest can verify if all configurations settings have been used. */ - P create(ProcessorsRegistry registry, Map config) throws Exception; + Processor create(Map processorFactories, Map config) throws Exception; } } diff --git a/core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java b/core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java deleted file mode 100644 index 1b968f0ffc7..00000000000 --- a/core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.ingest; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.function.Function; - -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.script.ScriptService; - -public final class ProcessorsRegistry { - - private final Map processorFactories; - - public ProcessorsRegistry(Map processors) { - this.processorFactories = Collections.unmodifiableMap(processors); - } - - public Processor.Factory getProcessorFactory(String name) { - return processorFactories.get(name); - } - - // For testing: - Map getProcessorFactories() { - return processorFactories; - } -} diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index ff095f749c8..ec61f0feb91 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -85,12 +85,14 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.indices.ttl.IndicesTTLService; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.node.internal.InternalSettingsPreparer; import org.elasticsearch.node.service.NodeService; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.AnalysisPlugin; +import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; @@ -255,6 +257,9 @@ public class Node implements Closeable { final TribeService tribeService = new TribeService(settings, clusterService); resourcesToClose.add(tribeService); NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(); + final IngestService ingestService = new IngestService(settings, threadPool, environment, + scriptModule.getScriptService(), pluginsService.filterPlugins(IngestPlugin.class)); + ModulesBuilder modules = new ModulesBuilder(); // plugin modules must be added here, before others or we can get crazy injection errors... for (Module pluginModule : pluginsService.nodeModules()) { @@ -293,6 +298,7 @@ public class Node implements Closeable { b.bind(BigArrays.class).toInstance(bigArrays); b.bind(ScriptService.class).toInstance(scriptModule.getScriptService()); b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry()); + b.bind(IngestService.class).toInstance(ingestService); } ); injector = modules.createInjector(); diff --git a/core/src/main/java/org/elasticsearch/node/NodeModule.java b/core/src/main/java/org/elasticsearch/node/NodeModule.java index 6d8be732dd2..8299834435b 100644 --- a/core/src/main/java/org/elasticsearch/node/NodeModule.java +++ b/core/src/main/java/org/elasticsearch/node/NodeModule.java @@ -20,28 +20,17 @@ package org.elasticsearch.node; import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.ingest.ProcessorsRegistry; -import org.elasticsearch.ingest.Processor; -import org.elasticsearch.ingest.TemplateService; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.node.service.NodeService; -import java.util.function.Function; - -/** - * - */ public class NodeModule extends AbstractModule { private final Node node; private final MonitorService monitorService; - private final ProcessorsRegistry.Builder processorsRegistryBuilder; public NodeModule(Node node, MonitorService monitorService) { this.node = node; this.monitorService = monitorService; - this.processorsRegistryBuilder = new ProcessorsRegistry.Builder(); } @Override @@ -49,20 +38,5 @@ public class NodeModule extends AbstractModule { bind(Node.class).toInstance(node); bind(MonitorService.class).toInstance(monitorService); bind(NodeService.class).asEagerSingleton(); - bind(ProcessorsRegistry.Builder.class).toInstance(processorsRegistryBuilder); - } - - /** - * Returns the node - */ - public Node getNode() { - return node; - } - - /** - * Adds a processor factory under a specific type name. - */ - public void registerProcessor(String type, Function> provider) { - processorsRegistryBuilder.registerProcessor(type, provider); } } diff --git a/core/src/main/java/org/elasticsearch/node/service/NodeService.java b/core/src/main/java/org/elasticsearch/node/service/NodeService.java index 0920a84bfd6..79fbb24a177 100644 --- a/core/src/main/java/org/elasticsearch/node/service/NodeService.java +++ b/core/src/main/java/org/elasticsearch/node/service/NodeService.java @@ -19,6 +19,11 @@ package org.elasticsearch.node.service; +import java.io.Closeable; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + import org.elasticsearch.Build; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; @@ -35,18 +40,12 @@ import org.elasticsearch.http.HttpServer; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.ingest.IngestService; -import org.elasticsearch.ingest.ProcessorsRegistry; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.io.Closeable; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; @@ -76,7 +75,7 @@ public class NodeService extends AbstractComponent implements Closeable { public NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery, TransportService transportService, IndicesService indicesService, PluginsService pluginService, CircuitBreakerService circuitBreakerService, - ProcessorsRegistry.Builder processorsRegistryBuilder, ClusterService clusterService, SettingsFilter settingsFilter) { + IngestService ingestService, ClusterService clusterService, SettingsFilter settingsFilter) { super(settings); this.threadPool = threadPool; this.monitorService = monitorService; @@ -86,17 +85,17 @@ public class NodeService extends AbstractComponent implements Closeable { this.pluginService = pluginService; this.circuitBreakerService = circuitBreakerService; this.clusterService = clusterService; - this.ingestService = new IngestService(settings, threadPool, processorsRegistryBuilder); + this.ingestService = ingestService; this.settingsFilter = settingsFilter; clusterService.add(ingestService.getPipelineStore()); clusterService.add(ingestService.getPipelineExecutionService()); } // can not use constructor injection or there will be a circular dependency + // nocommit: try removing this... @Inject(optional = true) public void setScriptService(ScriptService scriptService) { this.scriptService = scriptService; - this.ingestService.buildProcessorsFactoryRegistry(scriptService, clusterService); } public void setHttpServer(@Nullable HttpServer httpServer) { diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java index fa4bdc6525d..6f335aef634 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java @@ -21,9 +21,8 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.ingest.PipelineStore; -import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.TestProcessor; -import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; @@ -59,12 +58,11 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase { TestProcessor processor = new TestProcessor(ingestDocument -> {}); CompoundProcessor pipelineCompoundProcessor = new CompoundProcessor(processor); Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, pipelineCompoundProcessor); - ProcessorsRegistry.Builder processorRegistryBuilder = new ProcessorsRegistry.Builder(); - processorRegistryBuilder.registerProcessor("mock_processor", ((registry) -> mock(Processor.Factory.class))); - ProcessorsRegistry processorRegistry = processorRegistryBuilder.build(mock(ScriptService.class), mock(ClusterService.class)); + Map registry = + Collections.singletonMap("mock_processor", (factories, config) -> processor); store = mock(PipelineStore.class); when(store.get(SIMULATED_PIPELINE_ID)).thenReturn(pipeline); - when(store.getProcessorRegistry()).thenReturn(processorRegistry); + when(store.getProcessorFactories()).thenReturn(registry); } public void testParseUsingPipelineStore() throws Exception { diff --git a/core/src/test/java/org/elasticsearch/http/HttpServerTests.java b/core/src/test/java/org/elasticsearch/http/HttpServerTests.java index 2ba7da84c14..f78269d87d2 100644 --- a/core/src/test/java/org/elasticsearch/http/HttpServerTests.java +++ b/core/src/test/java/org/elasticsearch/http/HttpServerTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.node.service.NodeService; import org.elasticsearch.rest.AbstractRestChannel; import org.elasticsearch.rest.BytesRestResponse; @@ -45,6 +46,7 @@ import org.junit.Before; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.Map; public class HttpServerTests extends ESTestCase { @@ -74,8 +76,9 @@ public class HttpServerTests extends ESTestCase { ClusterService clusterService = new ClusterService(Settings.EMPTY, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null); - NodeService nodeService = new NodeService(Settings.EMPTY, null, null, null, null, null, null, null, null, - clusterService, null); + IngestService ingestService = new IngestService(settings, null, null, null, Collections.emptyList()); + NodeService nodeService = new NodeService(Settings.EMPTY, null, null, null, null, null, null, null, + ingestService, clusterService, null); httpServer = new HttpServer(settings, httpServerTransport, restController, nodeService, circuitBreakerService); httpServer.start(); } diff --git a/core/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java b/core/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java index fa78d5aa16c..eab5793d3f6 100644 --- a/core/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/ConfigurationUtilsTests.java @@ -19,14 +19,6 @@ package org.elasticsearch.ingest; -import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.ingest.ProcessorsRegistry; -import org.elasticsearch.script.ScriptService; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.TestThreadPool; -import org.junit.Before; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -34,6 +26,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; @@ -97,10 +95,8 @@ public class ConfigurationUtilsTests extends ESTestCase { public void testReadProcessors() throws Exception { Processor processor = mock(Processor.class); - ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder(); - builder.registerProcessor("test_processor", (registry) -> config -> processor); - ProcessorsRegistry registry = builder.build(mock(ScriptService.class), mock(ClusterService.class)); - + Map registry = + Collections.singletonMap("test_processor", (factories, config) -> processor); List>> config = new ArrayList<>(); Map emptyConfig = Collections.emptyMap(); diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java index cb7bd849a47..8510c41b3d3 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java @@ -46,7 +46,7 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.PROCESSORS_KEY, Arrays.asList(Collections.singletonMap("test", processorConfig0), Collections.singletonMap("test", processorConfig1))); Pipeline.Factory factory = new Pipeline.Factory(); - ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory())); + Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); @@ -62,7 +62,7 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); Pipeline.Factory factory = new Pipeline.Factory(); try { - factory.create("_id", pipelineConfig, createProcessorRegistry(Collections.emptyMap())); + factory.create("_id", pipelineConfig, Collections.emptyMap()); fail("should fail, missing required [processors] field"); } catch (ElasticsearchParseException e) { assertThat(e.getMessage(), equalTo("[processors] required property is missing")); @@ -76,7 +76,7 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); Pipeline.Factory factory = new Pipeline.Factory(); - ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory())); + Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); @@ -93,7 +93,7 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.emptyList()); Pipeline.Factory factory = new Pipeline.Factory(); - ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory())); + Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create("_id", pipelineConfig, processorRegistry)); assertThat(e.getMessage(), equalTo("pipeline [_id] cannot have an empty on_failure option defined")); } @@ -105,7 +105,7 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); Pipeline.Factory factory = new Pipeline.Factory(); - ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory())); + Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create("_id", pipelineConfig, processorRegistry)); assertThat(e.getMessage(), equalTo("[on_failure] processors list cannot be empty")); } @@ -114,7 +114,7 @@ public class PipelineFactoryTests extends ESTestCase { Map processorConfig = new HashMap<>(); processorConfig.put("ignore_failure", true); - ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory())); + Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); Pipeline.Factory factory = new Pipeline.Factory(); Map pipelineConfig = new HashMap<>(); pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); @@ -139,7 +139,7 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); Pipeline.Factory factory = new Pipeline.Factory(); - ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory())); + Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create("_id", pipelineConfig, processorRegistry)); assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]")); } @@ -152,7 +152,7 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); Pipeline.Factory factory = new Pipeline.Factory(); - ProcessorsRegistry processorRegistry = createProcessorRegistry(Collections.singletonMap("test", new TestProcessor.Factory())); + Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); @@ -169,12 +169,4 @@ public class PipelineFactoryTests extends ESTestCase { List flattened = pipeline.flattenAllProcessors(); assertThat(flattened.size(), equalTo(4)); } - - private ProcessorsRegistry createProcessorRegistry(Map processorRegistry) { - ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder(); - for (Map.Entry entry : processorRegistry.entrySet()) { - builder.registerProcessor(entry.getKey(), ((registry) -> entry.getValue())); - } - return builder.build(mock(ScriptService.class), mock(ClusterService.class)); - } } diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java index 55ea4360ece..fd4c9038e0d 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java @@ -56,9 +56,8 @@ public class PipelineStoreTests extends ESTestCase { @Before public void init() throws Exception { - store = new PipelineStore(Settings.EMPTY); - ProcessorsRegistry.Builder registryBuilder = new ProcessorsRegistry.Builder(); - registryBuilder.registerProcessor("set", (registry) -> config -> { + Map processorFactories = new HashMap<>(); + processorFactories.put("set", (factories, config) -> { String field = (String) config.remove("field"); String value = (String) config.remove("value"); return new Processor() { @@ -78,7 +77,7 @@ public class PipelineStoreTests extends ESTestCase { } }; }); - registryBuilder.registerProcessor("remove", (registry) -> config -> { + processorFactories.put("remove", (factories, config) -> { String field = (String) config.remove("field"); return new Processor() { @Override @@ -97,7 +96,7 @@ public class PipelineStoreTests extends ESTestCase { } }; }); - store.buildProcessorFactoryRegistry(registryBuilder, mock(ScriptService.class), mock(ClusterService.class)); + store = new PipelineStore(Settings.EMPTY, processorFactories); } public void testUpdatePipelines() { diff --git a/core/src/test/java/org/elasticsearch/ingest/ProcessorsRegistryTests.java b/core/src/test/java/org/elasticsearch/ingest/ProcessorsRegistryTests.java deleted file mode 100644 index 41701102584..00000000000 --- a/core/src/test/java/org/elasticsearch/ingest/ProcessorsRegistryTests.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.ingest; - -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.script.ScriptService; -import org.elasticsearch.test.ESTestCase; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.sameInstance; -import static org.mockito.Mockito.mock; - -public class ProcessorsRegistryTests extends ESTestCase { - - public void testBuildProcessorRegistry() { - ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder(); - TestProcessor.Factory factory1 = new TestProcessor.Factory(); - builder.registerProcessor("1", (registry) -> factory1); - TestProcessor.Factory factory2 = new TestProcessor.Factory(); - builder.registerProcessor("2", (registry) -> factory2); - TestProcessor.Factory factory3 = new TestProcessor.Factory(); - try { - builder.registerProcessor("1", (registry) -> factory3); - fail("addProcessor should have failed"); - } catch(IllegalArgumentException e) { - assertThat(e.getMessage(), equalTo("Processor factory already registered for name [1]")); - } - - ProcessorsRegistry registry = builder.build(mock(ScriptService.class), mock(ClusterService.class)); - assertThat(registry.getProcessorFactories().size(), equalTo(2)); - assertThat(registry.getProcessorFactory("1"), sameInstance(factory1)); - assertThat(registry.getProcessorFactory("2"), sameInstance(factory2)); - } -} diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AbstractStringProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AbstractStringProcessor.java index 45ef6835ff0..d491da729f1 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AbstractStringProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AbstractStringProcessor.java @@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.AbstractProcessorFactory; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.Processor; import java.util.Map; @@ -54,7 +54,7 @@ abstract class AbstractStringProcessor extends AbstractProcessor { protected abstract String process(String value); - static abstract class Factory extends AbstractProcessorFactory { + static abstract class Factory extends AbstractProcessorFactory { protected final String processorType; protected Factory(String processorType) { @@ -62,11 +62,11 @@ abstract class AbstractStringProcessor extends AbstractProcessor { } @Override - public T doCreate(ProcessorsRegistry registry, String processorTag, Map config) throws Exception { + public AbstractStringProcessor doCreate(Map registry, String processorTag, Map config) throws Exception { String field = ConfigurationUtils.readStringProperty(processorType, processorTag, config, "field"); return newProcessor(processorTag, field); } - protected abstract T newProcessor(String processorTag, String field); + protected abstract AbstractStringProcessor newProcessor(String processorTag, String field); } } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java index 242e3411d3f..3b810b64d69 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java @@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.AbstractProcessorFactory; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.TemplateService; import org.elasticsearch.ingest.ValueSource; @@ -65,7 +65,7 @@ public final class AppendProcessor extends AbstractProcessor { return TYPE; } - public static final class Factory extends AbstractProcessorFactory { + public static final class Factory extends AbstractProcessorFactory { private final TemplateService templateService; @@ -74,7 +74,8 @@ public final class AppendProcessor extends AbstractProcessor { } @Override - public AppendProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map config) throws Exception { + public AppendProcessor doCreate(Map registry, String processorTag, + Map config) throws Exception { String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value"); return new AppendProcessor(processorTag, templateService.compile(field), ValueSource.wrap(value, templateService)); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConvertProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConvertProcessor.java index 4e35003aa88..c8c0bdcf73f 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConvertProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConvertProcessor.java @@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.AbstractProcessorFactory; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.Processor; import java.util.ArrayList; import java.util.List; @@ -161,9 +161,10 @@ public final class ConvertProcessor extends AbstractProcessor { return TYPE; } - public static final class Factory extends AbstractProcessorFactory { + public static final class Factory extends AbstractProcessorFactory { @Override - public ConvertProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map config) throws Exception { + public ConvertProcessor doCreate(Map registry, String processorTag, + Map config) throws Exception { String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); String typeProperty = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "type"); String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", field); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateIndexNameProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateIndexNameProcessor.java index d89430c4bef..9e38f58d407 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateIndexNameProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateIndexNameProcessor.java @@ -19,17 +19,6 @@ package org.elasticsearch.ingest.common; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.ingest.AbstractProcessor; -import org.elasticsearch.ingest.AbstractProcessorFactory; -import org.elasticsearch.ingest.ConfigurationUtils; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.ProcessorsRegistry; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; - import java.util.ArrayList; import java.util.Collections; import java.util.IllformedLocaleException; @@ -38,6 +27,17 @@ import java.util.Locale; import java.util.Map; import java.util.function.Function; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.AbstractProcessorFactory; +import org.elasticsearch.ingest.ConfigurationUtils; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + public final class DateIndexNameProcessor extends AbstractProcessor { public static final String TYPE = "date_index_name"; @@ -121,10 +121,11 @@ public final class DateIndexNameProcessor extends AbstractProcessor { return dateFormats; } - public static final class Factory extends AbstractProcessorFactory { + public static final class Factory extends AbstractProcessorFactory { @Override - protected DateIndexNameProcessor doCreate(ProcessorsRegistry registry, String tag, Map config) throws Exception { + protected DateIndexNameProcessor doCreate(Map registry, String tag, + Map config) throws Exception { String localeString = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "locale"); String timezoneString = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "timezone"); DateTimeZone timezone = timezoneString == null ? DateTimeZone.UTC : DateTimeZone.forID(timezoneString); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateProcessor.java index 0b17123bcb0..c780401974c 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateProcessor.java @@ -24,7 +24,7 @@ import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.AbstractProcessorFactory; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.Processor; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.format.ISODateTimeFormat; @@ -109,10 +109,11 @@ public final class DateProcessor extends AbstractProcessor { return formats; } - public static final class Factory extends AbstractProcessorFactory { + public static final class Factory extends AbstractProcessorFactory { @SuppressWarnings("unchecked") - public DateProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map config) throws Exception { + public DateProcessor doCreate(Map registry, String processorTag, + Map config) throws Exception { String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", DEFAULT_TARGET_FIELD); String timezoneString = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "timezone"); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/FailProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/FailProcessor.java index 7c806c08cdf..52e882afe16 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/FailProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/FailProcessor.java @@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.AbstractProcessorFactory; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.TemplateService; import java.util.Map; @@ -57,7 +57,7 @@ public final class FailProcessor extends AbstractProcessor { return TYPE; } - public static final class Factory extends AbstractProcessorFactory { + public static final class Factory extends AbstractProcessorFactory { private final TemplateService templateService; @@ -66,7 +66,8 @@ public final class FailProcessor extends AbstractProcessor { } @Override - public FailProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map config) throws Exception { + public FailProcessor doCreate(Map registry, String processorTag, + Map config) throws Exception { String message = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "message"); return new FailProcessor(processorTag, templateService.compile(message)); } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index 9ee244962ab..d5795a4aed0 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -24,7 +24,7 @@ import org.elasticsearch.ingest.AbstractProcessorFactory; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; -import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.Processor; import java.util.ArrayList; import java.util.Collections; @@ -83,12 +83,13 @@ public final class ForEachProcessor extends AbstractProcessor { return processors; } - public static final class Factory extends AbstractProcessorFactory { + public static final class Factory extends AbstractProcessorFactory { @Override - protected ForEachProcessor doCreate(String tag, Map config) throws Exception { + protected ForEachProcessor doCreate(Map factories, String tag, + Map config) throws Exception { String field = readStringProperty(TYPE, tag, config, "field"); List>> processorConfigs = readList(TYPE, tag, config, "processors"); - List processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorRegistry); + List processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, factories); return new ForEachProcessor(tag, field, Collections.unmodifiableList(processors)); } } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java index 1c11cd13487..95c0b2ae179 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java @@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.AbstractProcessorFactory; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.Processor; import java.util.HashMap; import java.util.List; @@ -115,7 +115,7 @@ public final class GrokProcessor extends AbstractProcessor { return combinedPattern; } - public final static class Factory extends AbstractProcessorFactory { + public final static class Factory extends AbstractProcessorFactory { private final Map builtinPatterns; @@ -124,7 +124,8 @@ public final class GrokProcessor extends AbstractProcessor { } @Override - public GrokProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map config) throws Exception { + public GrokProcessor doCreate(Map registry, String processorTag, + Map config) throws Exception { String matchField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); List matchPatterns = ConfigurationUtils.readList(TYPE, processorTag, config, "patterns"); boolean traceMatch = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "trace_match", false); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GsubProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GsubProcessor.java index 124b1ded1ce..d5399f1edc0 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GsubProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GsubProcessor.java @@ -22,7 +22,7 @@ package org.elasticsearch.ingest.common; import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.AbstractProcessorFactory; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.Processor; import java.util.Map; import java.util.regex.Matcher; @@ -79,9 +79,10 @@ public final class GsubProcessor extends AbstractProcessor { return TYPE; } - public static final class Factory extends AbstractProcessorFactory { + public static final class Factory extends AbstractProcessorFactory { @Override - public GsubProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map config) throws Exception { + public GsubProcessor doCreate(Map registry, String processorTag, + Map config) throws Exception { String field = readStringProperty(TYPE, processorTag, config, "field"); String pattern = readStringProperty(TYPE, processorTag, config, "pattern"); String replacement = readStringProperty(TYPE, processorTag, config, "replacement"); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JoinProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JoinProcessor.java index b0752d4fab9..e4963af2f9a 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JoinProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JoinProcessor.java @@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.AbstractProcessorFactory; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.Processor; import java.util.List; import java.util.Map; @@ -71,9 +71,10 @@ public final class JoinProcessor extends AbstractProcessor { return TYPE; } - public final static class Factory extends AbstractProcessorFactory { + public final static class Factory extends AbstractProcessorFactory { @Override - public JoinProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map config) throws Exception { + public JoinProcessor doCreate(Map registry, String processorTag, + Map config) throws Exception { String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); String separator = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "separator"); return new JoinProcessor(processorTag, field, separator); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/LowercaseProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/LowercaseProcessor.java index 9f8ea7a5614..e7a8f3f3e6a 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/LowercaseProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/LowercaseProcessor.java @@ -44,7 +44,7 @@ public final class LowercaseProcessor extends AbstractStringProcessor { return TYPE; } - public final static class Factory extends AbstractStringProcessor.Factory { + public final static class Factory extends AbstractStringProcessor.Factory { public Factory() { super(TYPE); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RemoveProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RemoveProcessor.java index cec88e65d95..cf28d7a68e3 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RemoveProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RemoveProcessor.java @@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.AbstractProcessorFactory; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.TemplateService; import java.util.Map; @@ -56,7 +56,7 @@ public final class RemoveProcessor extends AbstractProcessor { return TYPE; } - public static final class Factory extends AbstractProcessorFactory { + public static final class Factory extends AbstractProcessorFactory { private final TemplateService templateService; @@ -65,7 +65,8 @@ public final class RemoveProcessor extends AbstractProcessor { } @Override - public RemoveProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map config) throws Exception { + public RemoveProcessor doCreate(Map registry, String processorTag, + Map config) throws Exception { String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); return new RemoveProcessor(processorTag, templateService.compile(field)); } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RenameProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RenameProcessor.java index f4c242379e5..97d273b6e95 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RenameProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RenameProcessor.java @@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.AbstractProcessorFactory; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.Processor; import java.util.Map; @@ -76,9 +76,10 @@ public final class RenameProcessor extends AbstractProcessor { return TYPE; } - public static final class Factory extends AbstractProcessorFactory { + public static final class Factory extends AbstractProcessorFactory { @Override - public RenameProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map config) throws Exception { + public RenameProcessor doCreate(Map registry, String processorTag, + Map config) throws Exception { String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field"); return new RenameProcessor(processorTag, field, targetField); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ScriptProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ScriptProcessor.java index cf147e174aa..9a4b6347592 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ScriptProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ScriptProcessor.java @@ -26,7 +26,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.AbstractProcessorFactory; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.script.CompiledScript; import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.Script; @@ -78,7 +78,7 @@ public final class ScriptProcessor extends AbstractProcessor { return TYPE; } - public static final class Factory extends AbstractProcessorFactory { + public static final class Factory extends AbstractProcessorFactory { private final ScriptService scriptService; @@ -87,7 +87,8 @@ public final class ScriptProcessor extends AbstractProcessor { } @Override - public ScriptProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map config) throws Exception { + public ScriptProcessor doCreate(Map registry, String processorTag, + Map config) throws Exception { String field = readOptionalStringProperty(TYPE, processorTag, config, "field"); String lang = readStringProperty(TYPE, processorTag, config, "lang"); String inline = readOptionalStringProperty(TYPE, processorTag, config, "inline"); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SetProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SetProcessor.java index 61053fec7e4..6f574eabfe7 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SetProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SetProcessor.java @@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.AbstractProcessorFactory; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.TemplateService; import org.elasticsearch.ingest.ValueSource; @@ -76,7 +76,7 @@ public final class SetProcessor extends AbstractProcessor { return TYPE; } - public static final class Factory extends AbstractProcessorFactory { + public static final class Factory extends AbstractProcessorFactory { private final TemplateService templateService; @@ -85,7 +85,8 @@ public final class SetProcessor extends AbstractProcessor { } @Override - public SetProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map config) throws Exception { + public SetProcessor doCreate(Map registry, String processorTag, + Map config) throws Exception { String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value"); boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "override", true); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SortProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SortProcessor.java index 152526a9f68..5533e5f5b12 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SortProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SortProcessor.java @@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.AbstractProcessorFactory; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.Processor; import java.util.Collections; import java.util.List; @@ -112,10 +112,11 @@ public final class SortProcessor extends AbstractProcessor { return TYPE; } - public final static class Factory extends AbstractProcessorFactory { + public final static class Factory extends AbstractProcessorFactory { @Override - public SortProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map config) throws Exception { + public SortProcessor doCreate(Map registry, String processorTag, + Map config) throws Exception { String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, FIELD); try { SortOrder direction = SortOrder.fromString( diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SplitProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SplitProcessor.java index 92ce7a02341..3dc67365edc 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SplitProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SplitProcessor.java @@ -23,7 +23,7 @@ import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.AbstractProcessorFactory; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.Processor; import java.util.ArrayList; import java.util.Collections; @@ -73,9 +73,10 @@ public final class SplitProcessor extends AbstractProcessor { return TYPE; } - public static class Factory extends AbstractProcessorFactory { + public static class Factory extends AbstractProcessorFactory { @Override - public SplitProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map config) throws Exception { + public SplitProcessor doCreate(Map registry, String processorTag, + Map config) throws Exception { String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); return new SplitProcessor(processorTag, field, ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "separator")); } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/TrimProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/TrimProcessor.java index a57a25125d6..e852f887da0 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/TrimProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/TrimProcessor.java @@ -41,7 +41,7 @@ public final class TrimProcessor extends AbstractStringProcessor { return TYPE; } - public static final class Factory extends AbstractStringProcessor.Factory { + public static final class Factory extends AbstractStringProcessor.Factory { public Factory() { super(TYPE); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/UppercaseProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/UppercaseProcessor.java index a5c817352a1..5585a130eaf 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/UppercaseProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/UppercaseProcessor.java @@ -43,7 +43,7 @@ public final class UppercaseProcessor extends AbstractStringProcessor { return TYPE; } - public static final class Factory extends AbstractStringProcessor.Factory { + public static final class Factory extends AbstractStringProcessor.Factory { public Factory() { super(TYPE); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/DateIndexNameFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/DateIndexNameFactoryTests.java index 04e2dbf72a4..cc272d0b120 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/DateIndexNameFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/DateIndexNameFactoryTests.java @@ -87,12 +87,12 @@ public class DateIndexNameFactoryTests extends ESTestCase { DateIndexNameProcessor.Factory factory = new DateIndexNameProcessor.Factory(); Map config = new HashMap<>(); config.put("date_rounding", "y"); - ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(config)); + ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, config)); assertThat(e.getMessage(), Matchers.equalTo("[field] required property is missing")); config.clear(); config.put("field", "_field"); - e = expectThrows(ElasticsearchParseException.class, () -> factory.create(config)); + e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, config)); assertThat(e.getMessage(), Matchers.equalTo("[date_rounding] required property is missing")); } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/FailProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/FailProcessorFactoryTests.java index da2a96f368a..ae1058480b4 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/FailProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/FailProcessorFactoryTests.java @@ -45,7 +45,7 @@ public class FailProcessorFactoryTests extends ESTestCase { config.put("message", "error"); String processorTag = randomAsciiOfLength(10); config.put(AbstractProcessorFactory.TAG_KEY, processorTag); - FailProcessor failProcessor = factory.create(null, config); + FailProcessor failProcessor = (FailProcessor)factory.create(null, config); assertThat(failProcessor.getTag(), equalTo(processorTag)); assertThat(failProcessor.getMessage().execute(Collections.emptyMap()), equalTo("error")); } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java index 8f8aa85ae59..898e178422f 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java @@ -21,7 +21,7 @@ package org.elasticsearch.ingest.common; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.ingest.Processor; -import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESTestCase; @@ -37,9 +37,8 @@ public class ForEachProcessorFactoryTests extends ESTestCase { public void testCreate() throws Exception { Processor processor = new TestProcessor(ingestDocument -> {}); - Map processors = new HashMap<>(); - processors.put("_name", (r, c) -> processor); - ProcessorsRegistry registry = new ProcessorsRegistry(processors); + Map registry = new HashMap<>(); + registry.put("_name", (r, c) -> processor); ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(); Map config = new HashMap<>(); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorFactoryTests.java index 4b862f8f92d..2858583c492 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorFactoryTests.java @@ -50,7 +50,7 @@ public class GrokProcessorFactoryTests extends ESTestCase { GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); Map config = new HashMap<>(); config.put("patterns", Collections.singletonList("(?\\w+)")); - ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(config)); + ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, config)); assertThat(e.getMessage(), equalTo("[field] required property is missing")); } @@ -58,7 +58,7 @@ public class GrokProcessorFactoryTests extends ESTestCase { GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); Map config = new HashMap<>(); config.put("field", "foo"); - ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(config)); + ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, config)); assertThat(e.getMessage(), equalTo("[patterns] required property is missing")); } @@ -67,7 +67,7 @@ public class GrokProcessorFactoryTests extends ESTestCase { Map config = new HashMap<>(); config.put("field", "foo"); config.put("patterns", Collections.emptyList()); - ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(config)); + ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, config)); assertThat(e.getMessage(), equalTo("[patterns] List of patterns must not be empty")); } @@ -89,7 +89,7 @@ public class GrokProcessorFactoryTests extends ESTestCase { Map config = new HashMap<>(); config.put("field", "_field"); config.put("patterns", Collections.singletonList("[")); - ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(config)); + ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, config)); assertThat(e.getMessage(), equalTo("[patterns] Invalid regex pattern found in: [[]. premature end of char-class")); } @@ -99,7 +99,7 @@ public class GrokProcessorFactoryTests extends ESTestCase { config.put("field", "_field"); config.put("patterns", Collections.singletonList("%{MY_PATTERN:name}!")); config.put("pattern_definitions", Collections.singletonMap("MY_PATTERN", "[")); - ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(config)); + ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, config)); assertThat(e.getMessage(), equalTo("[patterns] Invalid regex pattern found in: [%{MY_PATTERN:name}!]. premature end of char-class")); } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ScriptProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ScriptProcessorFactoryTests.java index ed47894d4d9..121dbc3a565 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ScriptProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ScriptProcessorFactoryTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESTestCase; import org.junit.Before; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -55,7 +56,7 @@ public class ScriptProcessorFactoryTests extends ESTestCase { configMap.put("lang", "mockscript"); ElasticsearchException exception = expectThrows(ElasticsearchException.class, - () -> factory.doCreate(randomAsciiOfLength(10), configMap)); + () -> factory.doCreate(null, randomAsciiOfLength(10), configMap)); assertThat(exception.getMessage(), is("[null] Only one of [file], [id], or [inline] may be configured")); } @@ -66,7 +67,7 @@ public class ScriptProcessorFactoryTests extends ESTestCase { configMap.put("lang", "mockscript"); ElasticsearchException exception = expectThrows(ElasticsearchException.class, - () -> factory.doCreate(randomAsciiOfLength(10), configMap)); + () -> factory.doCreate(null, randomAsciiOfLength(10), configMap)); assertThat(exception.getMessage(), is("[null] Need [file], [id], or [inline] parameter to refer to scripts")); } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SplitProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SplitProcessorFactoryTests.java index 0ed7c3d7191..ccf93708e86 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SplitProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SplitProcessorFactoryTests.java @@ -37,7 +37,7 @@ public class SplitProcessorFactoryTests extends ESTestCase { config.put("separator", "\\."); String processorTag = randomAsciiOfLength(10); config.put(AbstractProcessorFactory.TAG_KEY, processorTag); - SplitProcessor splitProcessor = factory.create(null, config); + SplitProcessor splitProcessor = (SplitProcessor)factory.create(null, config); assertThat(splitProcessor.getTag(), equalTo(processorTag)); assertThat(splitProcessor.getField(), equalTo("field1")); assertThat(splitProcessor.getSeparator(), equalTo("\\.")); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SplitProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SplitProcessorTests.java index 13d45dc126b..c2bb21d6422 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SplitProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SplitProcessorTests.java @@ -84,7 +84,7 @@ public class SplitProcessorTests extends ESTestCase { Map splitConfig = new HashMap<>(); splitConfig.put("field", "flags"); splitConfig.put("separator", "\\|"); - Processor splitProcessor = (new SplitProcessor.Factory()).create(splitConfig); + Processor splitProcessor = (new SplitProcessor.Factory()).create(null, splitConfig); Map source = new HashMap<>(); source.put("flags", "new|hot|super|fun|interesting"); IngestDocument ingestDocument = new IngestDocument(source, new HashMap<>()); diff --git a/plugins/ingest-attachment/src/main/java/org/elasticsearch/ingest/attachment/AttachmentProcessor.java b/plugins/ingest-attachment/src/main/java/org/elasticsearch/ingest/attachment/AttachmentProcessor.java index 689adfa1acb..73bed16ec0f 100644 --- a/plugins/ingest-attachment/src/main/java/org/elasticsearch/ingest/attachment/AttachmentProcessor.java +++ b/plugins/ingest-attachment/src/main/java/org/elasticsearch/ingest/attachment/AttachmentProcessor.java @@ -27,7 +27,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.AbstractProcessorFactory; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.ProcessorsRegistry; +import org.elasticsearch.ingest.Processor; import java.io.IOException; import java.util.Arrays; @@ -151,12 +151,13 @@ public final class AttachmentProcessor extends AbstractProcessor { return indexedChars; } - public static final class Factory extends AbstractProcessorFactory { + public static final class Factory extends AbstractProcessorFactory { static final Set DEFAULT_PROPERTIES = EnumSet.allOf(Property.class); @Override - public AttachmentProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map config) throws Exception { + public AttachmentProcessor doCreate(Map registry, String processorTag, + Map config) throws Exception { String field = readStringProperty(TYPE, processorTag, config, "field"); String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "attachment"); List properyNames = readOptionalList(TYPE, processorTag, config, "properties"); diff --git a/plugins/ingest-attachment/src/main/java/org/elasticsearch/ingest/attachment/IngestAttachmentPlugin.java b/plugins/ingest-attachment/src/main/java/org/elasticsearch/ingest/attachment/IngestAttachmentPlugin.java index 3156fe381fd..eaba639255b 100644 --- a/plugins/ingest-attachment/src/main/java/org/elasticsearch/ingest/attachment/IngestAttachmentPlugin.java +++ b/plugins/ingest-attachment/src/main/java/org/elasticsearch/ingest/attachment/IngestAttachmentPlugin.java @@ -19,15 +19,21 @@ package org.elasticsearch.ingest.attachment; -import org.elasticsearch.node.NodeModule; +import java.util.Collections; +import java.util.Map; + +import org.elasticsearch.env.Environment; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.ingest.TemplateService; +import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.ScriptService; -import java.io.IOException; +public class IngestAttachmentPlugin extends Plugin implements IngestPlugin { -public class IngestAttachmentPlugin extends Plugin { - - public void onModule(NodeModule nodeModule) throws IOException { - nodeModule.registerProcessor(AttachmentProcessor.TYPE, - (registry) -> new AttachmentProcessor.Factory()); + @Override + public Map getProcessors( + Environment env, ScriptService scriptService, TemplateService templateService) { + return Collections.singletonMap(AttachmentProcessor.TYPE, new AttachmentProcessor.Factory()); } } diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index 9d417c3534c..4f1fcac40f5 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -19,26 +19,6 @@ package org.elasticsearch.ingest.geoip; -import com.maxmind.geoip2.DatabaseReader; -import com.maxmind.geoip2.exception.AddressNotFoundException; -import com.maxmind.geoip2.model.CityResponse; -import com.maxmind.geoip2.model.CountryResponse; -import com.maxmind.geoip2.record.City; -import com.maxmind.geoip2.record.Continent; -import com.maxmind.geoip2.record.Country; -import com.maxmind.geoip2.record.Location; -import com.maxmind.geoip2.record.Subdivision; -import org.apache.lucene.util.IOUtils; -import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.SpecialPermission; -import org.elasticsearch.common.network.InetAddresses; -import org.elasticsearch.common.network.NetworkAddress; -import org.elasticsearch.ingest.AbstractProcessor; -import org.elasticsearch.ingest.AbstractProcessorFactory; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.ProcessorsRegistry; - -import java.io.Closeable; import java.io.IOException; import java.net.InetAddress; import java.security.AccessController; @@ -52,6 +32,24 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import com.maxmind.geoip2.DatabaseReader; +import com.maxmind.geoip2.exception.AddressNotFoundException; +import com.maxmind.geoip2.model.CityResponse; +import com.maxmind.geoip2.model.CountryResponse; +import com.maxmind.geoip2.record.City; +import com.maxmind.geoip2.record.Continent; +import com.maxmind.geoip2.record.Country; +import com.maxmind.geoip2.record.Location; +import com.maxmind.geoip2.record.Subdivision; +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.SpecialPermission; +import org.elasticsearch.common.network.InetAddresses; +import org.elasticsearch.common.network.NetworkAddress; +import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.AbstractProcessorFactory; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; + import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; import static org.elasticsearch.ingest.ConfigurationUtils.readOptionalList; import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty; @@ -218,7 +216,7 @@ public final class GeoIpProcessor extends AbstractProcessor { return geoData; } - public static final class Factory extends AbstractProcessorFactory { + public static final class Factory extends AbstractProcessorFactory { static final Set DEFAULT_CITY_PROPERTIES = EnumSet.of( Property.CONTINENT_NAME, Property.COUNTRY_ISO_CODE, Property.REGION_NAME, Property.CITY_NAME, Property.LOCATION @@ -232,7 +230,8 @@ public final class GeoIpProcessor extends AbstractProcessor { } @Override - public GeoIpProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map config) throws Exception { + public GeoIpProcessor doCreate(Map registry, String processorTag, + Map config) throws Exception { String ipField = readStringProperty(TYPE, processorTag, config, "field"); String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "geoip"); String databaseFile = readStringProperty(TYPE, processorTag, config, "database_file", "GeoLite2-City.mmdb.gz"); @@ -240,7 +239,8 @@ public final class GeoIpProcessor extends AbstractProcessor { DatabaseReader databaseReader = databaseReaders.get(databaseFile); if (databaseReader == null) { - throw newConfigurationException(TYPE, processorTag, "database_file", "database file [" + databaseFile + "] doesn't exist"); + throw newConfigurationException(TYPE, processorTag, + "database_file", "database file [" + databaseFile + "] doesn't exist"); } String databaseType = databaseReader.getMetadata().getDatabaseType(); diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index 1b1753439d7..fb83f595e4c 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -49,7 +49,7 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable @Override public synchronized Map getProcessors( - Environment env, ClusterService clusterService, ScriptService scriptService, TemplateService templateService) { + Environment env, ScriptService scriptService, TemplateService templateService) { if (databaseReaders != null) { throw new IllegalStateException("called onModule twice for geoip plugin!!"); } diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index 0d9309a3948..d2de3b821ca 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -78,7 +78,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { String processorTag = randomAsciiOfLength(10); config.put(AbstractProcessorFactory.TAG_KEY, processorTag); - GeoIpProcessor processor = factory.create(null, config); + GeoIpProcessor processor = (GeoIpProcessor)factory.create(null, config); assertThat(processor.getTag(), equalTo(processorTag)); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("geoip")); @@ -96,7 +96,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { String processorTag = randomAsciiOfLength(10); config.put(AbstractProcessorFactory.TAG_KEY, processorTag); - GeoIpProcessor processor = factory.create(null, config); + GeoIpProcessor processor = (GeoIpProcessor)factory.create(null, config); assertThat(processor.getTag(), equalTo(processorTag)); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("geoip")); @@ -109,7 +109,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { Map config = new HashMap<>(); config.put("field", "_field"); config.put("target_field", "_field"); - GeoIpProcessor processor = factory.create(null, config); + GeoIpProcessor processor = (GeoIpProcessor)factory.create(null, config); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("_field")); } @@ -119,7 +119,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { Map config = new HashMap<>(); config.put("field", "_field"); config.put("database_file", "GeoLite2-Country.mmdb.gz"); - GeoIpProcessor processor = factory.create(null, config); + GeoIpProcessor processor = (GeoIpProcessor)factory.create(null, config); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getTargetField(), equalTo("geoip")); assertThat(processor.getDbReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-Country")); @@ -171,7 +171,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { Map config = new HashMap<>(); config.put("field", "_field"); config.put("properties", fieldNames); - GeoIpProcessor processor = factory.create(null, config); + GeoIpProcessor processor = (GeoIpProcessor)factory.create(null, config); assertThat(processor.getField(), equalTo("_field")); assertThat(processor.getProperties(), equalTo(properties)); } diff --git a/test/framework/src/main/java/org/elasticsearch/ingest/IngestTestPlugin.java b/test/framework/src/main/java/org/elasticsearch/ingest/IngestTestPlugin.java index b32a2eab991..9661fc53dea 100644 --- a/test/framework/src/main/java/org/elasticsearch/ingest/IngestTestPlugin.java +++ b/test/framework/src/main/java/org/elasticsearch/ingest/IngestTestPlugin.java @@ -19,22 +19,27 @@ package org.elasticsearch.ingest; -import org.elasticsearch.node.NodeModule; +import java.util.Collections; +import java.util.Map; + +import org.elasticsearch.env.Environment; +import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.ScriptService; /** * Adds an ingest processor to be used in tests. */ -public class IngestTestPlugin extends Plugin { - - public void onModule(NodeModule nodeModule) { - nodeModule.registerProcessor("test", (registry) -> config -> - new TestProcessor("id", "test", doc -> { - doc.setFieldValue("processed", true); - if (doc.hasField("fail") && doc.getFieldValue("fail", Boolean.class)) { - throw new IllegalArgumentException("test processor failed"); - } - }) - ); +public class IngestTestPlugin extends Plugin implements IngestPlugin { + @Override + public Map getProcessors( + Environment env, ScriptService scriptService, TemplateService templateService) { + return Collections.singletonMap("test", (factories, config) -> + new TestProcessor("id", "test", doc -> { + doc.setFieldValue("processed", true); + if (doc.hasField("fail") && doc.getFieldValue("fail", Boolean.class)) { + throw new IllegalArgumentException("test processor failed"); + } + })); } } diff --git a/test/framework/src/main/java/org/elasticsearch/ingest/TestProcessor.java b/test/framework/src/main/java/org/elasticsearch/ingest/TestProcessor.java index 48381b87142..51138985410 100644 --- a/test/framework/src/main/java/org/elasticsearch/ingest/TestProcessor.java +++ b/test/framework/src/main/java/org/elasticsearch/ingest/TestProcessor.java @@ -64,9 +64,9 @@ public class TestProcessor implements Processor { return invokedCounter.get(); } - public static final class Factory extends AbstractProcessorFactory { + public static final class Factory extends AbstractProcessorFactory { @Override - public TestProcessor doCreate(ProcessorsRegistry registry, String processorTag, Map config) throws Exception { + public TestProcessor doCreate(Map registry, String processorTag, Map config) throws Exception { return new TestProcessor(processorTag, "test-processor", ingestDocument -> {}); } }