From 050585e89f8a510dcd9bf2749918d84297c839c3 Mon Sep 17 00:00:00 2001 From: javanna Date: Fri, 15 Jan 2016 15:12:53 +0100 Subject: [PATCH] remove BiFunction in favour of Function the environment is now available through NodeModule#getNode#getEnvironment and can be retrieved during onModule(NodeModule), no need for this indirection anymore using the BiFunction --- .../elasticsearch/ingest/IngestService.java | 8 ++--- .../elasticsearch/ingest/PipelineStore.java | 13 ++++---- .../ingest/ProcessorsRegistry.java | 11 +++---- .../org/elasticsearch/node/NodeModule.java | 31 +++++++++---------- .../node/service/NodeService.java | 5 +-- .../elasticsearch/ingest/IngestClientIT.java | 2 +- .../ingest/PipelineStoreTests.java | 6 ++-- .../ingest/ProcessorsRegistryTests.java | 17 +++++----- .../ingest/grok/IngestGrokPlugin.java | 2 +- .../ingest/geoip/IngestGeoIpPlugin.java | 2 +- 10 files changed, 42 insertions(+), 55 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/ingest/IngestService.java b/core/src/main/java/org/elasticsearch/ingest/IngestService.java index da26903a056..40bc803f08a 100644 --- a/core/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/core/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -21,7 +21,6 @@ package org.elasticsearch.ingest; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.env.Environment; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; @@ -34,14 +33,11 @@ import java.io.IOException; */ public class IngestService implements Closeable { - private final Environment environment; private final PipelineStore pipelineStore; private final PipelineExecutionService pipelineExecutionService; private final ProcessorsRegistry processorsRegistry; - public IngestService(Settings settings, ThreadPool threadPool, Environment environment, - ClusterService clusterService, ProcessorsRegistry processorsRegistry) { - this.environment = environment; + public IngestService(Settings settings, ThreadPool threadPool, ClusterService clusterService, ProcessorsRegistry processorsRegistry) { this.processorsRegistry = processorsRegistry; this.pipelineStore = new PipelineStore(settings, clusterService); this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool); @@ -56,7 +52,7 @@ public class IngestService implements Closeable { } public void setScriptService(ScriptService scriptService) { - pipelineStore.buildProcessorFactoryRegistry(processorsRegistry, environment, scriptService); + pipelineStore.buildProcessorFactoryRegistry(processorsRegistry, scriptService); } @Override diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java b/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java index ab11f99246e..6db1d6c0681 100644 --- a/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java +++ b/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java @@ -21,6 +21,8 @@ package org.elasticsearch.ingest; import org.apache.lucene.util.IOUtils; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ingest.DeletePipelineRequest; +import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.ingest.WritePipelineResponse; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -32,9 +34,6 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.env.Environment; -import org.elasticsearch.action.ingest.DeletePipelineRequest; -import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.ingest.core.Pipeline; import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.ingest.core.TemplateService; @@ -47,7 +46,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.BiFunction; +import java.util.function.Function; public class PipelineStore extends AbstractComponent implements Closeable, ClusterStateListener { @@ -67,11 +66,11 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust clusterService.add(this); } - public void buildProcessorFactoryRegistry(ProcessorsRegistry processorsRegistry, Environment environment, ScriptService scriptService) { + public void buildProcessorFactoryRegistry(ProcessorsRegistry processorsRegistry, ScriptService scriptService) { Map processorFactories = new HashMap<>(); TemplateService templateService = new InternalTemplateService(scriptService); - for (Map.Entry>> entry : processorsRegistry.entrySet()) { - Processor.Factory processorFactory = entry.getValue().apply(environment, templateService); + for (Map.Entry>> entry : processorsRegistry.entrySet()) { + Processor.Factory processorFactory = entry.getValue().apply(templateService); processorFactories.put(entry.getKey(), processorFactory); } this.processorFactoryRegistry = Collections.unmodifiableMap(processorFactories); diff --git a/core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java b/core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java index 3561d8079c9..766ba772932 100644 --- a/core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java +++ b/core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java @@ -19,30 +19,29 @@ package org.elasticsearch.ingest; -import org.elasticsearch.env.Environment; import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.ingest.core.TemplateService; import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.function.BiFunction; +import java.util.function.Function; public class ProcessorsRegistry { - private final Map>> processorFactoryProviders = new HashMap<>(); + private final Map>> processorFactoryProviders = new HashMap<>(); /** * Adds a processor factory under a specific name. */ - public void registerProcessor(String name, BiFunction> processorFactoryProvider) { - BiFunction> provider = processorFactoryProviders.putIfAbsent(name, processorFactoryProvider); + public void registerProcessor(String name, Function> processorFactoryProvider) { + Function> provider = processorFactoryProviders.putIfAbsent(name, processorFactoryProvider); if (provider != null) { throw new IllegalArgumentException("Processor factory already registered for name [" + name + "]"); } } - public Set>>> entrySet() { + public Set>>> entrySet() { return processorFactoryProviders.entrySet(); } } diff --git a/core/src/main/java/org/elasticsearch/node/NodeModule.java b/core/src/main/java/org/elasticsearch/node/NodeModule.java index 8622b14e925..8ef26296fe2 100644 --- a/core/src/main/java/org/elasticsearch/node/NodeModule.java +++ b/core/src/main/java/org/elasticsearch/node/NodeModule.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.env.Environment; import org.elasticsearch.ingest.ProcessorsRegistry; import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.ingest.core.TemplateService; @@ -45,7 +44,7 @@ import org.elasticsearch.ingest.processor.UppercaseProcessor; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.node.service.NodeService; -import java.util.function.BiFunction; +import java.util.function.Function; /** * @@ -65,19 +64,19 @@ public class NodeModule extends AbstractModule { this.monitorService = monitorService; this.processorsRegistry = new ProcessorsRegistry(); - registerProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory()); - registerProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService)); - registerProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService)); - registerProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory()); - registerProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService)); - registerProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory()); - registerProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory()); - registerProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory()); - registerProcessor(LowercaseProcessor.TYPE, (environment, templateService) -> new LowercaseProcessor.Factory()); - registerProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory()); - registerProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory()); - registerProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory()); - registerProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService)); + registerProcessor(DateProcessor.TYPE, (templateService) -> new DateProcessor.Factory()); + registerProcessor(SetProcessor.TYPE, SetProcessor.Factory::new); + registerProcessor(AppendProcessor.TYPE, AppendProcessor.Factory::new); + registerProcessor(RenameProcessor.TYPE, (templateService) -> new RenameProcessor.Factory()); + registerProcessor(RemoveProcessor.TYPE, RemoveProcessor.Factory::new); + registerProcessor(SplitProcessor.TYPE, (templateService) -> new SplitProcessor.Factory()); + registerProcessor(JoinProcessor.TYPE, (templateService) -> new JoinProcessor.Factory()); + registerProcessor(UppercaseProcessor.TYPE, (templateService) -> new UppercaseProcessor.Factory()); + registerProcessor(LowercaseProcessor.TYPE, (templateService) -> new LowercaseProcessor.Factory()); + registerProcessor(TrimProcessor.TYPE, (templateService) -> new TrimProcessor.Factory()); + registerProcessor(ConvertProcessor.TYPE, (templateService) -> new ConvertProcessor.Factory()); + registerProcessor(GsubProcessor.TYPE, (templateService) -> new GsubProcessor.Factory()); + registerProcessor(FailProcessor.TYPE, FailProcessor.Factory::new); } @Override @@ -109,7 +108,7 @@ public class NodeModule extends AbstractModule { /** * Adds a processor factory under a specific type name. */ - public void registerProcessor(String type, BiFunction> processorFactoryProvider) { + public void registerProcessor(String type, Function> processorFactoryProvider) { processorsRegistry.registerProcessor(type, processorFactoryProvider); } diff --git a/core/src/main/java/org/elasticsearch/node/service/NodeService.java b/core/src/main/java/org/elasticsearch/node/service/NodeService.java index ecf5cd07e11..15352eeadd2 100644 --- a/core/src/main/java/org/elasticsearch/node/service/NodeService.java +++ b/core/src/main/java/org/elasticsearch/node/service/NodeService.java @@ -35,8 +35,6 @@ import org.elasticsearch.http.HttpServer; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.ingest.IngestService; -import org.elasticsearch.ingest.PipelineExecutionService; -import org.elasticsearch.ingest.PipelineStore; import org.elasticsearch.ingest.ProcessorsRegistry; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.plugins.PluginsService; @@ -44,7 +42,6 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.io.Closeable; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -89,7 +86,7 @@ public class NodeService extends AbstractComponent { this.version = version; this.pluginService = pluginService; this.circuitBreakerService = circuitBreakerService; - this.ingestService = new IngestService(settings, threadPool, environment, clusterService, processorsRegistry); + this.ingestService = new IngestService(settings, threadPool, clusterService, processorsRegistry); } // can not use constructor injection or there will be a circular dependency diff --git a/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index fe60c9e04ba..6227b01a30c 100644 --- a/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -226,7 +226,7 @@ public class IngestClientIT extends ESIntegTestCase { } public void onModule(NodeModule nodeModule) { - nodeModule.registerProcessor("test", (environment, templateService) -> config -> + nodeModule.registerProcessor("test", (templateService) -> config -> new TestProcessor("test", ingestDocument -> { ingestDocument.setFieldValue("processed", true); if (ingestDocument.getFieldValue("fail", Boolean.class)) { diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java index 19d56858c8f..c1f14b26eb8 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java @@ -41,8 +41,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; public class PipelineStoreTests extends ESTestCase { @@ -54,8 +52,8 @@ public class PipelineStoreTests extends ESTestCase { ClusterService clusterService = mock(ClusterService.class); store = new PipelineStore(Settings.EMPTY, clusterService); ProcessorsRegistry registry = new ProcessorsRegistry(); - registry.registerProcessor("set", (environment, templateService) -> new SetProcessor.Factory(TestTemplateService.instance())); - store.buildProcessorFactoryRegistry(registry, null, null); + registry.registerProcessor("set", (templateService) -> new SetProcessor.Factory(TestTemplateService.instance())); + store.buildProcessorFactoryRegistry(registry, null); } public void testUpdatePipelines() { diff --git a/core/src/test/java/org/elasticsearch/ingest/ProcessorsRegistryTests.java b/core/src/test/java/org/elasticsearch/ingest/ProcessorsRegistryTests.java index 2869fffbafc..ad18488d990 100644 --- a/core/src/test/java/org/elasticsearch/ingest/ProcessorsRegistryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/ProcessorsRegistryTests.java @@ -19,14 +19,13 @@ package org.elasticsearch.ingest; -import org.elasticsearch.env.Environment; import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.ingest.core.TemplateService; import org.elasticsearch.test.ESTestCase; import java.util.Map; import java.util.Set; -import java.util.function.BiFunction; +import java.util.function.Function; import static org.hamcrest.CoreMatchers.equalTo; @@ -35,24 +34,24 @@ public class ProcessorsRegistryTests extends ESTestCase { public void testAddProcessor() { ProcessorsRegistry processorsRegistry = new ProcessorsRegistry(); TestProcessor.Factory factory1 = new TestProcessor.Factory(); - processorsRegistry.registerProcessor("1", (environment, templateService) -> factory1); + processorsRegistry.registerProcessor("1", (templateService) -> factory1); TestProcessor.Factory factory2 = new TestProcessor.Factory(); - processorsRegistry.registerProcessor("2", (environment, templateService) -> factory2); + processorsRegistry.registerProcessor("2", (templateService) -> factory2); TestProcessor.Factory factory3 = new TestProcessor.Factory(); try { - processorsRegistry.registerProcessor("1", (environment, templateService) -> factory3); + processorsRegistry.registerProcessor("1", (templateService) -> factory3); fail("addProcessor should have failed"); } catch(IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("Processor factory already registered for name [1]")); } - Set>>> entrySet = processorsRegistry.entrySet(); + Set>>> entrySet = processorsRegistry.entrySet(); assertThat(entrySet.size(), equalTo(2)); - for (Map.Entry>> entry : entrySet) { + for (Map.Entry>> entry : entrySet) { if (entry.getKey().equals("1")) { - assertThat(entry.getValue().apply(null, null), equalTo(factory1)); + assertThat(entry.getValue().apply(null), equalTo(factory1)); } else if (entry.getKey().equals("2")) { - assertThat(entry.getValue().apply(null, null), equalTo(factory2)); + assertThat(entry.getValue().apply(null), equalTo(factory2)); } else { fail("unexpected processor id [" + entry.getKey() + "]"); } diff --git a/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/IngestGrokPlugin.java b/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/IngestGrokPlugin.java index 9ca5bc24c9a..54800ac1603 100644 --- a/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/IngestGrokPlugin.java +++ b/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/IngestGrokPlugin.java @@ -56,7 +56,7 @@ public class IngestGrokPlugin extends Plugin { } public void onModule(NodeModule nodeModule) { - nodeModule.registerProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(builtinPatterns)); + nodeModule.registerProcessor(GrokProcessor.TYPE, (templateService) -> new GrokProcessor.Factory(builtinPatterns)); } static Map loadBuiltinPatterns() throws IOException { diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index 6fdb8703342..4b67ffcbd71 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -50,7 +50,7 @@ public class IngestGeoIpPlugin extends Plugin { public void onModule(NodeModule nodeModule) throws IOException { Path geoIpConfigDirectory = nodeModule.getNode().getEnvironment().configFile().resolve("ingest-geoip"); Map databaseReaders = loadDatabaseReaders(geoIpConfigDirectory); - nodeModule.registerProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(databaseReaders)); + nodeModule.registerProcessor(GeoIpProcessor.TYPE, (templateService) -> new GeoIpProcessor.Factory(databaseReaders)); } static Map loadDatabaseReaders(Path geoIpConfigDirectory) throws IOException {