remove BiFunction<Environment, TemplateService, Processor.Factory> in favour of Function<TemplateService, Processor.Factory>

the environment is now available through NodeModule#getNode#getEnvironment and can be retrieved during onModule(NodeModule), no need for this indirection anymore using the BiFunction
This commit is contained in:
javanna 2016-01-15 15:12:53 +01:00 committed by Luca Cavanna
parent dd7cae7c19
commit 050585e89f
10 changed files with 42 additions and 55 deletions

View File

@ -21,7 +21,6 @@ package org.elasticsearch.ingest;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -34,14 +33,11 @@ import java.io.IOException;
*/ */
public class IngestService implements Closeable { public class IngestService implements Closeable {
private final Environment environment;
private final PipelineStore pipelineStore; private final PipelineStore pipelineStore;
private final PipelineExecutionService pipelineExecutionService; private final PipelineExecutionService pipelineExecutionService;
private final ProcessorsRegistry processorsRegistry; private final ProcessorsRegistry processorsRegistry;
public IngestService(Settings settings, ThreadPool threadPool, Environment environment, public IngestService(Settings settings, ThreadPool threadPool, ClusterService clusterService, ProcessorsRegistry processorsRegistry) {
ClusterService clusterService, ProcessorsRegistry processorsRegistry) {
this.environment = environment;
this.processorsRegistry = processorsRegistry; this.processorsRegistry = processorsRegistry;
this.pipelineStore = new PipelineStore(settings, clusterService); this.pipelineStore = new PipelineStore(settings, clusterService);
this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool); this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool);
@ -56,7 +52,7 @@ public class IngestService implements Closeable {
} }
public void setScriptService(ScriptService scriptService) { public void setScriptService(ScriptService scriptService) {
pipelineStore.buildProcessorFactoryRegistry(processorsRegistry, environment, scriptService); pipelineStore.buildProcessorFactoryRegistry(processorsRegistry, scriptService);
} }
@Override @Override

View File

@ -21,6 +21,8 @@ package org.elasticsearch.ingest;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.WritePipelineResponse; import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
@ -32,9 +34,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.env.Environment;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.ingest.core.Pipeline; import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService; import org.elasticsearch.ingest.core.TemplateService;
@ -47,7 +46,7 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.BiFunction; import java.util.function.Function;
public class PipelineStore extends AbstractComponent implements Closeable, ClusterStateListener { public class PipelineStore extends AbstractComponent implements Closeable, ClusterStateListener {
@ -67,11 +66,11 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
clusterService.add(this); clusterService.add(this);
} }
public void buildProcessorFactoryRegistry(ProcessorsRegistry processorsRegistry, Environment environment, ScriptService scriptService) { public void buildProcessorFactoryRegistry(ProcessorsRegistry processorsRegistry, ScriptService scriptService) {
Map<String, Processor.Factory> processorFactories = new HashMap<>(); Map<String, Processor.Factory> processorFactories = new HashMap<>();
TemplateService templateService = new InternalTemplateService(scriptService); TemplateService templateService = new InternalTemplateService(scriptService);
for (Map.Entry<String, BiFunction<Environment, TemplateService, Processor.Factory<?>>> entry : processorsRegistry.entrySet()) { for (Map.Entry<String, Function<TemplateService, Processor.Factory<?>>> entry : processorsRegistry.entrySet()) {
Processor.Factory processorFactory = entry.getValue().apply(environment, templateService); Processor.Factory processorFactory = entry.getValue().apply(templateService);
processorFactories.put(entry.getKey(), processorFactory); processorFactories.put(entry.getKey(), processorFactory);
} }
this.processorFactoryRegistry = Collections.unmodifiableMap(processorFactories); this.processorFactoryRegistry = Collections.unmodifiableMap(processorFactories);

View File

@ -19,30 +19,29 @@
package org.elasticsearch.ingest; package org.elasticsearch.ingest;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService; import org.elasticsearch.ingest.core.TemplateService;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.function.BiFunction; import java.util.function.Function;
public class ProcessorsRegistry { public class ProcessorsRegistry {
private final Map<String, BiFunction<Environment, TemplateService, Processor.Factory<?>>> processorFactoryProviders = new HashMap<>(); private final Map<String, Function<TemplateService, Processor.Factory<?>>> processorFactoryProviders = new HashMap<>();
/** /**
* Adds a processor factory under a specific name. * Adds a processor factory under a specific name.
*/ */
public void registerProcessor(String name, BiFunction<Environment, TemplateService, Processor.Factory<?>> processorFactoryProvider) { public void registerProcessor(String name, Function<TemplateService, Processor.Factory<?>> processorFactoryProvider) {
BiFunction<Environment, TemplateService, Processor.Factory<?>> provider = processorFactoryProviders.putIfAbsent(name, processorFactoryProvider); Function<TemplateService, Processor.Factory<?>> provider = processorFactoryProviders.putIfAbsent(name, processorFactoryProvider);
if (provider != null) { if (provider != null) {
throw new IllegalArgumentException("Processor factory already registered for name [" + name + "]"); throw new IllegalArgumentException("Processor factory already registered for name [" + name + "]");
} }
} }
public Set<Map.Entry<String, BiFunction<Environment, TemplateService, Processor.Factory<?>>>> entrySet() { public Set<Map.Entry<String, Function<TemplateService, Processor.Factory<?>>>> entrySet() {
return processorFactoryProviders.entrySet(); return processorFactoryProviders.entrySet();
} }
} }

View File

@ -25,7 +25,6 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.ProcessorsRegistry; import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService; import org.elasticsearch.ingest.core.TemplateService;
@ -45,7 +44,7 @@ import org.elasticsearch.ingest.processor.UppercaseProcessor;
import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.node.service.NodeService; import org.elasticsearch.node.service.NodeService;
import java.util.function.BiFunction; import java.util.function.Function;
/** /**
* *
@ -65,19 +64,19 @@ public class NodeModule extends AbstractModule {
this.monitorService = monitorService; this.monitorService = monitorService;
this.processorsRegistry = new ProcessorsRegistry(); this.processorsRegistry = new ProcessorsRegistry();
registerProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory()); registerProcessor(DateProcessor.TYPE, (templateService) -> new DateProcessor.Factory());
registerProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService)); registerProcessor(SetProcessor.TYPE, SetProcessor.Factory::new);
registerProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService)); registerProcessor(AppendProcessor.TYPE, AppendProcessor.Factory::new);
registerProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory()); registerProcessor(RenameProcessor.TYPE, (templateService) -> new RenameProcessor.Factory());
registerProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService)); registerProcessor(RemoveProcessor.TYPE, RemoveProcessor.Factory::new);
registerProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory()); registerProcessor(SplitProcessor.TYPE, (templateService) -> new SplitProcessor.Factory());
registerProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory()); registerProcessor(JoinProcessor.TYPE, (templateService) -> new JoinProcessor.Factory());
registerProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory()); registerProcessor(UppercaseProcessor.TYPE, (templateService) -> new UppercaseProcessor.Factory());
registerProcessor(LowercaseProcessor.TYPE, (environment, templateService) -> new LowercaseProcessor.Factory()); registerProcessor(LowercaseProcessor.TYPE, (templateService) -> new LowercaseProcessor.Factory());
registerProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory()); registerProcessor(TrimProcessor.TYPE, (templateService) -> new TrimProcessor.Factory());
registerProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory()); registerProcessor(ConvertProcessor.TYPE, (templateService) -> new ConvertProcessor.Factory());
registerProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory()); registerProcessor(GsubProcessor.TYPE, (templateService) -> new GsubProcessor.Factory());
registerProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService)); registerProcessor(FailProcessor.TYPE, FailProcessor.Factory::new);
} }
@Override @Override
@ -109,7 +108,7 @@ public class NodeModule extends AbstractModule {
/** /**
* Adds a processor factory under a specific type name. * Adds a processor factory under a specific type name.
*/ */
public void registerProcessor(String type, BiFunction<Environment, TemplateService, Processor.Factory<?>> processorFactoryProvider) { public void registerProcessor(String type, Function<TemplateService, Processor.Factory<?>> processorFactoryProvider) {
processorsRegistry.registerProcessor(type, processorFactoryProvider); processorsRegistry.registerProcessor(type, processorFactoryProvider);
} }

View File

@ -35,8 +35,6 @@ import org.elasticsearch.http.HttpServer;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.ProcessorsRegistry; import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.PluginsService;
@ -44,7 +42,6 @@ import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -89,7 +86,7 @@ public class NodeService extends AbstractComponent {
this.version = version; this.version = version;
this.pluginService = pluginService; this.pluginService = pluginService;
this.circuitBreakerService = circuitBreakerService; this.circuitBreakerService = circuitBreakerService;
this.ingestService = new IngestService(settings, threadPool, environment, clusterService, processorsRegistry); this.ingestService = new IngestService(settings, threadPool, clusterService, processorsRegistry);
} }
// can not use constructor injection or there will be a circular dependency // can not use constructor injection or there will be a circular dependency

View File

@ -226,7 +226,7 @@ public class IngestClientIT extends ESIntegTestCase {
} }
public void onModule(NodeModule nodeModule) { public void onModule(NodeModule nodeModule) {
nodeModule.registerProcessor("test", (environment, templateService) -> config -> nodeModule.registerProcessor("test", (templateService) -> config ->
new TestProcessor("test", ingestDocument -> { new TestProcessor("test", ingestDocument -> {
ingestDocument.setFieldValue("processed", true); ingestDocument.setFieldValue("processed", true);
if (ingestDocument.getFieldValue("fail", Boolean.class)) { if (ingestDocument.getFieldValue("fail", Boolean.class)) {

View File

@ -41,8 +41,6 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
public class PipelineStoreTests extends ESTestCase { public class PipelineStoreTests extends ESTestCase {
@ -54,8 +52,8 @@ public class PipelineStoreTests extends ESTestCase {
ClusterService clusterService = mock(ClusterService.class); ClusterService clusterService = mock(ClusterService.class);
store = new PipelineStore(Settings.EMPTY, clusterService); store = new PipelineStore(Settings.EMPTY, clusterService);
ProcessorsRegistry registry = new ProcessorsRegistry(); ProcessorsRegistry registry = new ProcessorsRegistry();
registry.registerProcessor("set", (environment, templateService) -> new SetProcessor.Factory(TestTemplateService.instance())); registry.registerProcessor("set", (templateService) -> new SetProcessor.Factory(TestTemplateService.instance()));
store.buildProcessorFactoryRegistry(registry, null, null); store.buildProcessorFactoryRegistry(registry, null);
} }
public void testUpdatePipelines() { public void testUpdatePipelines() {

View File

@ -19,14 +19,13 @@
package org.elasticsearch.ingest; package org.elasticsearch.ingest;
import org.elasticsearch.env.Environment;
import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService; import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.function.BiFunction; import java.util.function.Function;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
@ -35,24 +34,24 @@ public class ProcessorsRegistryTests extends ESTestCase {
public void testAddProcessor() { public void testAddProcessor() {
ProcessorsRegistry processorsRegistry = new ProcessorsRegistry(); ProcessorsRegistry processorsRegistry = new ProcessorsRegistry();
TestProcessor.Factory factory1 = new TestProcessor.Factory(); TestProcessor.Factory factory1 = new TestProcessor.Factory();
processorsRegistry.registerProcessor("1", (environment, templateService) -> factory1); processorsRegistry.registerProcessor("1", (templateService) -> factory1);
TestProcessor.Factory factory2 = new TestProcessor.Factory(); TestProcessor.Factory factory2 = new TestProcessor.Factory();
processorsRegistry.registerProcessor("2", (environment, templateService) -> factory2); processorsRegistry.registerProcessor("2", (templateService) -> factory2);
TestProcessor.Factory factory3 = new TestProcessor.Factory(); TestProcessor.Factory factory3 = new TestProcessor.Factory();
try { try {
processorsRegistry.registerProcessor("1", (environment, templateService) -> factory3); processorsRegistry.registerProcessor("1", (templateService) -> factory3);
fail("addProcessor should have failed"); fail("addProcessor should have failed");
} catch(IllegalArgumentException e) { } catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("Processor factory already registered for name [1]")); assertThat(e.getMessage(), equalTo("Processor factory already registered for name [1]"));
} }
Set<Map.Entry<String, BiFunction<Environment, TemplateService, Processor.Factory<?>>>> entrySet = processorsRegistry.entrySet(); Set<Map.Entry<String, Function<TemplateService, Processor.Factory<?>>>> entrySet = processorsRegistry.entrySet();
assertThat(entrySet.size(), equalTo(2)); assertThat(entrySet.size(), equalTo(2));
for (Map.Entry<String, BiFunction<Environment, TemplateService, Processor.Factory<?>>> entry : entrySet) { for (Map.Entry<String, Function<TemplateService, Processor.Factory<?>>> entry : entrySet) {
if (entry.getKey().equals("1")) { if (entry.getKey().equals("1")) {
assertThat(entry.getValue().apply(null, null), equalTo(factory1)); assertThat(entry.getValue().apply(null), equalTo(factory1));
} else if (entry.getKey().equals("2")) { } else if (entry.getKey().equals("2")) {
assertThat(entry.getValue().apply(null, null), equalTo(factory2)); assertThat(entry.getValue().apply(null), equalTo(factory2));
} else { } else {
fail("unexpected processor id [" + entry.getKey() + "]"); fail("unexpected processor id [" + entry.getKey() + "]");
} }

View File

@ -56,7 +56,7 @@ public class IngestGrokPlugin extends Plugin {
} }
public void onModule(NodeModule nodeModule) { public void onModule(NodeModule nodeModule) {
nodeModule.registerProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(builtinPatterns)); nodeModule.registerProcessor(GrokProcessor.TYPE, (templateService) -> new GrokProcessor.Factory(builtinPatterns));
} }
static Map<String, String> loadBuiltinPatterns() throws IOException { static Map<String, String> loadBuiltinPatterns() throws IOException {

View File

@ -50,7 +50,7 @@ public class IngestGeoIpPlugin extends Plugin {
public void onModule(NodeModule nodeModule) throws IOException { public void onModule(NodeModule nodeModule) throws IOException {
Path geoIpConfigDirectory = nodeModule.getNode().getEnvironment().configFile().resolve("ingest-geoip"); Path geoIpConfigDirectory = nodeModule.getNode().getEnvironment().configFile().resolve("ingest-geoip");
Map<String, DatabaseReader> databaseReaders = loadDatabaseReaders(geoIpConfigDirectory); Map<String, DatabaseReader> databaseReaders = loadDatabaseReaders(geoIpConfigDirectory);
nodeModule.registerProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(databaseReaders)); nodeModule.registerProcessor(GeoIpProcessor.TYPE, (templateService) -> new GeoIpProcessor.Factory(databaseReaders));
} }
static Map<String, DatabaseReader> loadDatabaseReaders(Path geoIpConfigDirectory) throws IOException { static Map<String, DatabaseReader> loadDatabaseReaders(Path geoIpConfigDirectory) throws IOException {