From 9dd52ad7d39e2dc04ec9bd34f72cbdcf6bc65b18 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 30 Nov 2015 18:11:23 +0100 Subject: [PATCH] Removed pollution from the Processor.Factory interface. 1) It no longer extends from Closeable. 2) Removed the config directory setter. Implementation that relied on it, now get the location to the config dir via their constructors. --- .../ingest/processor/Processor.java | 11 +--- .../processor/geoip/GeoIpProcessor.java | 14 ++--- .../ingest/processor/grok/GrokProcessor.java | 11 ++-- .../plugin/ingest/IngestModule.java | 41 +++++++------ .../plugin/ingest/PipelineStore.java | 61 ++++++++++++++----- .../geoip/GeoIpProcessorFactoryTests.java | 19 ++---- .../grok/GrokProcessorFactoryTests.java | 3 +- 7 files changed, 88 insertions(+), 72 deletions(-) diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java index 1795d2f5c70..d9c788ba21e 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java @@ -46,7 +46,7 @@ public interface Processor { /** * A factory that knows how to construct a processor based on a map of maps. */ - interface Factory

extends Closeable { + interface Factory

{ /** * Creates a processor based on the specified map of maps config. @@ -56,14 +56,5 @@ public interface Processor { */ P create(Map config) throws Exception; - /** - * Sets the configuration directory when needed to read additional config files - */ - default void setConfigDirectory(Path configDirectory) { - } - - @Override - default void close() throws IOException { - } } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java index bbdc1154f13..6fd70cf7828 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.processor.Processor; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; @@ -211,15 +212,19 @@ public final class GeoIpProcessor implements Processor { return geoData; } - public static class Factory implements Processor.Factory { + public static class Factory implements Processor.Factory, Closeable { static final Set DEFAULT_FIELDS = EnumSet.of( Field.CONTINENT_NAME, Field.COUNTRY_ISO_CODE, Field.REGION_NAME, Field.CITY_NAME, Field.LOCATION ); - private Path geoIpConfigDirectory; + private final Path geoIpConfigDirectory; private final DatabaseReaderService databaseReaderService = new DatabaseReaderService(); + public Factory(Path configDirectory) { + this.geoIpConfigDirectory = configDirectory.resolve("ingest").resolve("geoip"); + } + public GeoIpProcessor create(Map config) throws Exception { String ipField = readStringProperty(config, "source_field"); String targetField = readStringProperty(config, "target_field", "geoip"); @@ -250,11 +255,6 @@ public final class GeoIpProcessor implements Processor { } } - @Override - public void setConfigDirectory(Path configDirectory) { - geoIpConfigDirectory = configDirectory.resolve("ingest").resolve("geoip"); - } - @Override public void close() throws IOException { databaseReaderService.close(); diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/grok/GrokProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/grok/GrokProcessor.java index f78db7e2d8e..320303d2cfa 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/grok/GrokProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/grok/GrokProcessor.java @@ -69,7 +69,12 @@ public final class GrokProcessor implements Processor { } public static class Factory implements Processor.Factory { - private Path grokConfigDirectory; + + private final Path grokConfigDirectory; + + public Factory(Path configDirectory) { + this.grokConfigDirectory = configDirectory.resolve("ingest").resolve("grok"); + } public GrokProcessor create(Map config) throws Exception { String matchField = ConfigurationUtils.readStringProperty(config, "field"); @@ -90,10 +95,6 @@ public final class GrokProcessor implements Processor { return new GrokProcessor(grok, matchField); } - @Override - public void setConfigDirectory(Path configDirectory) { - this.grokConfigDirectory = configDirectory.resolve("ingest").resolve("grok"); - } } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java index 898af1a0f3e..5c6961b8670 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java @@ -21,7 +21,6 @@ package org.elasticsearch.plugin.ingest; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.MapBinder; -import org.elasticsearch.ingest.processor.Processor; import org.elasticsearch.ingest.processor.set.SetProcessor; import org.elasticsearch.ingest.processor.convert.ConvertProcessor; import org.elasticsearch.ingest.processor.date.DateProcessor; @@ -42,9 +41,11 @@ import org.elasticsearch.plugin.ingest.transport.simulate.SimulateExecutionServi import java.util.HashMap; import java.util.Map; +import static org.elasticsearch.plugin.ingest.PipelineStore.ProcessorFactoryProvider; + public class IngestModule extends AbstractModule { - private final Map processors = new HashMap<>(); + private final Map processorFactoryProviders = new HashMap<>(); @Override protected void configure() { @@ -53,23 +54,23 @@ public class IngestModule extends AbstractModule { binder().bind(PipelineStore.class).asEagerSingleton(); binder().bind(SimulateExecutionService.class).asEagerSingleton(); - addProcessor(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory()); - addProcessor(GrokProcessor.TYPE, new GrokProcessor.Factory()); - addProcessor(DateProcessor.TYPE, new DateProcessor.Factory()); - addProcessor(SetProcessor.TYPE, new SetProcessor.Factory()); - addProcessor(RenameProcessor.TYPE, new RenameProcessor.Factory()); - addProcessor(RemoveProcessor.TYPE, new RemoveProcessor.Factory()); - addProcessor(SplitProcessor.TYPE, new SplitProcessor.Factory()); - addProcessor(JoinProcessor.TYPE, new JoinProcessor.Factory()); - addProcessor(UppercaseProcessor.TYPE, new UppercaseProcessor.Factory()); - addProcessor(LowercaseProcessor.TYPE, new LowercaseProcessor.Factory()); - addProcessor(TrimProcessor.TYPE, new TrimProcessor.Factory()); - addProcessor(ConvertProcessor.TYPE, new ConvertProcessor.Factory()); - addProcessor(GsubProcessor.TYPE, new GsubProcessor.Factory()); - addProcessor(MetaDataProcessor.TYPE, new MetaDataProcessor.Factory()); + addProcessor(GeoIpProcessor.TYPE, environment -> new GeoIpProcessor.Factory(environment.configFile())); + addProcessor(GrokProcessor.TYPE, environment -> new GrokProcessor.Factory(environment.configFile())); + addProcessor(DateProcessor.TYPE, environment -> new DateProcessor.Factory()); + addProcessor(SetProcessor.TYPE, environment -> new SetProcessor.Factory()); + addProcessor(RenameProcessor.TYPE, environment -> new RenameProcessor.Factory()); + addProcessor(RemoveProcessor.TYPE, environment -> new RemoveProcessor.Factory()); + addProcessor(SplitProcessor.TYPE, environment -> new SplitProcessor.Factory()); + addProcessor(JoinProcessor.TYPE, environment -> new JoinProcessor.Factory()); + addProcessor(UppercaseProcessor.TYPE, environment -> new UppercaseProcessor.Factory()); + addProcessor(LowercaseProcessor.TYPE, environment -> new LowercaseProcessor.Factory()); + addProcessor(TrimProcessor.TYPE, environment -> new TrimProcessor.Factory()); + addProcessor(ConvertProcessor.TYPE, environment -> new ConvertProcessor.Factory()); + addProcessor(GsubProcessor.TYPE, environment -> new GsubProcessor.Factory()); + addProcessor(MetaDataProcessor.TYPE, environment -> new MetaDataProcessor.Factory()); - MapBinder mapBinder = MapBinder.newMapBinder(binder(), String.class, Processor.Factory.class); - for (Map.Entry entry : processors.entrySet()) { + MapBinder mapBinder = MapBinder.newMapBinder(binder(), String.class, ProcessorFactoryProvider.class); + for (Map.Entry entry : processorFactoryProviders.entrySet()) { mapBinder.addBinding(entry.getKey()).toInstance(entry.getValue()); } } @@ -77,8 +78,8 @@ public class IngestModule extends AbstractModule { /** * Adds a processor factory under a specific type name. */ - public void addProcessor(String type, Processor.Factory factory) { - processors.put(type, factory); + public void addProcessor(String type, ProcessorFactoryProvider processorFactoryProvider) { + processorFactoryProviders.put(type, processorFactoryProvider); } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java index 133b40b9cc7..b3e30b51ff9 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java @@ -36,6 +36,8 @@ import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.common.SearchScrollIterator; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Provider; @@ -54,10 +56,11 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.threadpool.ThreadPool; +import java.io.Closeable; import java.io.IOException; import java.util.*; -public class PipelineStore extends AbstractComponent { +public class PipelineStore extends AbstractLifecycleComponent { public final static String INDEX = ".ingest"; public final static String TYPE = "pipeline"; @@ -74,29 +77,44 @@ public class PipelineStore extends AbstractComponent { private volatile Map pipelines = new HashMap<>(); @Inject - public PipelineStore(Settings settings, Provider clientProvider, ThreadPool threadPool, Environment environment, ClusterService clusterService, Map processors) { + public PipelineStore(Settings settings, Provider clientProvider, ThreadPool threadPool, Environment environment, ClusterService clusterService, Map processorFactoryProviders) { super(settings); this.threadPool = threadPool; this.clusterService = clusterService; this.clientProvider = clientProvider; this.scrollTimeout = settings.getAsTime("ingest.pipeline.store.scroll.timeout", TimeValue.timeValueSeconds(30)); this.pipelineUpdateInterval = settings.getAsTime("ingest.pipeline.store.update.interval", TimeValue.timeValueSeconds(1)); - for (Processor.Factory factory : processors.values()) { - factory.setConfigDirectory(environment.configFile()); + Map processorFactories = new HashMap<>(); + for (Map.Entry entry : processorFactoryProviders.entrySet()) { + Processor.Factory processorFactory = entry.getValue().get(environment); + processorFactories.put(entry.getKey(), processorFactory); } - this.processorFactoryRegistry = Collections.unmodifiableMap(processors); + this.processorFactoryRegistry = Collections.unmodifiableMap(processorFactories); clusterService.add(new PipelineStoreListener()); - clusterService.addLifecycleListener(new LifecycleListener() { - @Override - public void beforeClose() { - // Ideally we would implement Closeable, but when a node is stopped this doesn't get invoked: - try { - IOUtils.close(processorFactoryRegistry.values()); - } catch (IOException e) { - throw new RuntimeException(e); - } + } + + @Override + protected void doStart() { + } + + @Override + protected void doStop() { + } + + @Override + protected void doClose() { + // TODO: When org.elasticsearch.node.Node can close Closable instances we should remove this code + List closeables = new ArrayList<>(); + for (Processor.Factory factory : processorFactoryRegistry.values()) { + if (factory instanceof Closeable) { + closeables.add((Closeable) factory); } - }); + } + try { + IOUtils.close(closeables); + } catch (IOException e) { + throw new RuntimeException(e); + } } /** @@ -239,6 +257,19 @@ public class PipelineStore extends AbstractComponent { return client; } + /** + * The ingest framework (pipeline, processor and processor factory) can't rely on ES specific code. However some + * processors rely on reading files from the config directory. We can't add Environment as a constructor parameter, + * so we need some code that provides the physical location of the configuration directory to the processor factories + * that need this and this is what this processor factory provider does. + */ + @FunctionalInterface + interface ProcessorFactoryProvider { + + Processor.Factory get(Environment environment); + + } + class Updater implements Runnable { @Override diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessorFactoryTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessorFactoryTests.java index 010b0ede5c1..d4feeff886e 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessorFactoryTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessorFactoryTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.test.StreamsUtils; import org.junit.Before; import java.io.ByteArrayInputStream; -import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.*; @@ -47,8 +46,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { } public void testBuild_defaults() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(); - factory.setConfigDirectory(configDir); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir); Map config = new HashMap<>(); config.put("source_field", "_field"); @@ -61,8 +59,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { } public void testBuild_targetField() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(); - factory.setConfigDirectory(configDir); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir); Map config = new HashMap<>(); config.put("source_field", "_field"); config.put("target_field", "_field"); @@ -72,8 +69,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { } public void testBuild_dbFile() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(); - factory.setConfigDirectory(configDir); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir); Map config = new HashMap<>(); config.put("source_field", "_field"); config.put("database_file", "GeoLite2-Country.mmdb"); @@ -84,8 +80,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { } public void testBuild_nonExistingDbFile() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(); - factory.setConfigDirectory(configDir); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir); Map config = new HashMap<>(); config.put("source_field", "_field"); @@ -98,8 +93,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { } public void testBuild_fields() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(); - factory.setConfigDirectory(configDir); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir); Set fields = EnumSet.noneOf(GeoIpProcessor.Field.class); List fieldNames = new ArrayList<>(); @@ -118,8 +112,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { } public void testBuild_illegalFieldOption() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(); - factory.setConfigDirectory(configDir); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir); Map config = new HashMap<>(); config.put("source_field", "_field"); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/grok/GrokProcessorFactoryTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/grok/GrokProcessorFactoryTests.java index 9291c1bb04a..39430fe24bc 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/grok/GrokProcessorFactoryTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/grok/GrokProcessorFactoryTests.java @@ -43,8 +43,7 @@ public class GrokProcessorFactoryTests extends ESTestCase { } public void testBuild() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(); - factory.setConfigDirectory(configDir); + GrokProcessor.Factory factory = new GrokProcessor.Factory(configDir); Map config = new HashMap<>(); config.put("field", "_field");