diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java
index 1795d2f5c70..d9c788ba21e 100644
--- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java
+++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java
@@ -46,7 +46,7 @@ public interface Processor {
/**
* A factory that knows how to construct a processor based on a map of maps.
*/
- interface Factory
extends Closeable {
+ interface Factory
{
/**
* Creates a processor based on the specified map of maps config.
@@ -56,14 +56,5 @@ public interface Processor {
*/
P create(Map config) throws Exception;
- /**
- * Sets the configuration directory when needed to read additional config files
- */
- default void setConfigDirectory(Path configDirectory) {
- }
-
- @Override
- default void close() throws IOException {
- }
}
}
diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java
index bbdc1154f13..6fd70cf7828 100644
--- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java
+++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java
@@ -29,6 +29,7 @@ import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.processor.Processor;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
@@ -211,15 +212,19 @@ public final class GeoIpProcessor implements Processor {
return geoData;
}
- public static class Factory implements Processor.Factory {
+ public static class Factory implements Processor.Factory, Closeable {
static final Set DEFAULT_FIELDS = EnumSet.of(
Field.CONTINENT_NAME, Field.COUNTRY_ISO_CODE, Field.REGION_NAME, Field.CITY_NAME, Field.LOCATION
);
- private Path geoIpConfigDirectory;
+ private final Path geoIpConfigDirectory;
private final DatabaseReaderService databaseReaderService = new DatabaseReaderService();
+ public Factory(Path configDirectory) {
+ this.geoIpConfigDirectory = configDirectory.resolve("ingest").resolve("geoip");
+ }
+
public GeoIpProcessor create(Map config) throws Exception {
String ipField = readStringProperty(config, "source_field");
String targetField = readStringProperty(config, "target_field", "geoip");
@@ -250,11 +255,6 @@ public final class GeoIpProcessor implements Processor {
}
}
- @Override
- public void setConfigDirectory(Path configDirectory) {
- geoIpConfigDirectory = configDirectory.resolve("ingest").resolve("geoip");
- }
-
@Override
public void close() throws IOException {
databaseReaderService.close();
diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/grok/GrokProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/grok/GrokProcessor.java
index f78db7e2d8e..320303d2cfa 100644
--- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/grok/GrokProcessor.java
+++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/grok/GrokProcessor.java
@@ -69,7 +69,12 @@ public final class GrokProcessor implements Processor {
}
public static class Factory implements Processor.Factory {
- private Path grokConfigDirectory;
+
+ private final Path grokConfigDirectory;
+
+ public Factory(Path configDirectory) {
+ this.grokConfigDirectory = configDirectory.resolve("ingest").resolve("grok");
+ }
public GrokProcessor create(Map config) throws Exception {
String matchField = ConfigurationUtils.readStringProperty(config, "field");
@@ -90,10 +95,6 @@ public final class GrokProcessor implements Processor {
return new GrokProcessor(grok, matchField);
}
- @Override
- public void setConfigDirectory(Path configDirectory) {
- this.grokConfigDirectory = configDirectory.resolve("ingest").resolve("grok");
- }
}
}
diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java
index 898af1a0f3e..5c6961b8670 100644
--- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java
+++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java
@@ -21,7 +21,6 @@ package org.elasticsearch.plugin.ingest;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
-import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.ingest.processor.set.SetProcessor;
import org.elasticsearch.ingest.processor.convert.ConvertProcessor;
import org.elasticsearch.ingest.processor.date.DateProcessor;
@@ -42,9 +41,11 @@ import org.elasticsearch.plugin.ingest.transport.simulate.SimulateExecutionServi
import java.util.HashMap;
import java.util.Map;
+import static org.elasticsearch.plugin.ingest.PipelineStore.ProcessorFactoryProvider;
+
public class IngestModule extends AbstractModule {
- private final Map processors = new HashMap<>();
+ private final Map processorFactoryProviders = new HashMap<>();
@Override
protected void configure() {
@@ -53,23 +54,23 @@ public class IngestModule extends AbstractModule {
binder().bind(PipelineStore.class).asEagerSingleton();
binder().bind(SimulateExecutionService.class).asEagerSingleton();
- addProcessor(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory());
- addProcessor(GrokProcessor.TYPE, new GrokProcessor.Factory());
- addProcessor(DateProcessor.TYPE, new DateProcessor.Factory());
- addProcessor(SetProcessor.TYPE, new SetProcessor.Factory());
- addProcessor(RenameProcessor.TYPE, new RenameProcessor.Factory());
- addProcessor(RemoveProcessor.TYPE, new RemoveProcessor.Factory());
- addProcessor(SplitProcessor.TYPE, new SplitProcessor.Factory());
- addProcessor(JoinProcessor.TYPE, new JoinProcessor.Factory());
- addProcessor(UppercaseProcessor.TYPE, new UppercaseProcessor.Factory());
- addProcessor(LowercaseProcessor.TYPE, new LowercaseProcessor.Factory());
- addProcessor(TrimProcessor.TYPE, new TrimProcessor.Factory());
- addProcessor(ConvertProcessor.TYPE, new ConvertProcessor.Factory());
- addProcessor(GsubProcessor.TYPE, new GsubProcessor.Factory());
- addProcessor(MetaDataProcessor.TYPE, new MetaDataProcessor.Factory());
+ addProcessor(GeoIpProcessor.TYPE, environment -> new GeoIpProcessor.Factory(environment.configFile()));
+ addProcessor(GrokProcessor.TYPE, environment -> new GrokProcessor.Factory(environment.configFile()));
+ addProcessor(DateProcessor.TYPE, environment -> new DateProcessor.Factory());
+ addProcessor(SetProcessor.TYPE, environment -> new SetProcessor.Factory());
+ addProcessor(RenameProcessor.TYPE, environment -> new RenameProcessor.Factory());
+ addProcessor(RemoveProcessor.TYPE, environment -> new RemoveProcessor.Factory());
+ addProcessor(SplitProcessor.TYPE, environment -> new SplitProcessor.Factory());
+ addProcessor(JoinProcessor.TYPE, environment -> new JoinProcessor.Factory());
+ addProcessor(UppercaseProcessor.TYPE, environment -> new UppercaseProcessor.Factory());
+ addProcessor(LowercaseProcessor.TYPE, environment -> new LowercaseProcessor.Factory());
+ addProcessor(TrimProcessor.TYPE, environment -> new TrimProcessor.Factory());
+ addProcessor(ConvertProcessor.TYPE, environment -> new ConvertProcessor.Factory());
+ addProcessor(GsubProcessor.TYPE, environment -> new GsubProcessor.Factory());
+ addProcessor(MetaDataProcessor.TYPE, environment -> new MetaDataProcessor.Factory());
- MapBinder mapBinder = MapBinder.newMapBinder(binder(), String.class, Processor.Factory.class);
- for (Map.Entry entry : processors.entrySet()) {
+ MapBinder mapBinder = MapBinder.newMapBinder(binder(), String.class, ProcessorFactoryProvider.class);
+ for (Map.Entry entry : processorFactoryProviders.entrySet()) {
mapBinder.addBinding(entry.getKey()).toInstance(entry.getValue());
}
}
@@ -77,8 +78,8 @@ public class IngestModule extends AbstractModule {
/**
* Adds a processor factory under a specific type name.
*/
- public void addProcessor(String type, Processor.Factory factory) {
- processors.put(type, factory);
+ public void addProcessor(String type, ProcessorFactoryProvider processorFactoryProvider) {
+ processorFactoryProviders.put(type, processorFactoryProvider);
}
}
diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java
index 133b40b9cc7..b3e30b51ff9 100644
--- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java
+++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java
@@ -36,6 +36,8 @@ import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.SearchScrollIterator;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Provider;
@@ -54,10 +56,11 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.threadpool.ThreadPool;
+import java.io.Closeable;
import java.io.IOException;
import java.util.*;
-public class PipelineStore extends AbstractComponent {
+public class PipelineStore extends AbstractLifecycleComponent {
public final static String INDEX = ".ingest";
public final static String TYPE = "pipeline";
@@ -74,29 +77,44 @@ public class PipelineStore extends AbstractComponent {
private volatile Map pipelines = new HashMap<>();
@Inject
- public PipelineStore(Settings settings, Provider clientProvider, ThreadPool threadPool, Environment environment, ClusterService clusterService, Map processors) {
+ public PipelineStore(Settings settings, Provider clientProvider, ThreadPool threadPool, Environment environment, ClusterService clusterService, Map processorFactoryProviders) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.clientProvider = clientProvider;
this.scrollTimeout = settings.getAsTime("ingest.pipeline.store.scroll.timeout", TimeValue.timeValueSeconds(30));
this.pipelineUpdateInterval = settings.getAsTime("ingest.pipeline.store.update.interval", TimeValue.timeValueSeconds(1));
- for (Processor.Factory factory : processors.values()) {
- factory.setConfigDirectory(environment.configFile());
+ Map processorFactories = new HashMap<>();
+ for (Map.Entry entry : processorFactoryProviders.entrySet()) {
+ Processor.Factory processorFactory = entry.getValue().get(environment);
+ processorFactories.put(entry.getKey(), processorFactory);
}
- this.processorFactoryRegistry = Collections.unmodifiableMap(processors);
+ this.processorFactoryRegistry = Collections.unmodifiableMap(processorFactories);
clusterService.add(new PipelineStoreListener());
- clusterService.addLifecycleListener(new LifecycleListener() {
- @Override
- public void beforeClose() {
- // Ideally we would implement Closeable, but when a node is stopped this doesn't get invoked:
- try {
- IOUtils.close(processorFactoryRegistry.values());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ }
+
+ @Override
+ protected void doStart() {
+ }
+
+ @Override
+ protected void doStop() {
+ }
+
+ @Override
+ protected void doClose() {
+ // TODO: When org.elasticsearch.node.Node can close Closable instances we should remove this code
+ List closeables = new ArrayList<>();
+ for (Processor.Factory factory : processorFactoryRegistry.values()) {
+ if (factory instanceof Closeable) {
+ closeables.add((Closeable) factory);
}
- });
+ }
+ try {
+ IOUtils.close(closeables);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
/**
@@ -239,6 +257,19 @@ public class PipelineStore extends AbstractComponent {
return client;
}
+ /**
+ * The ingest framework (pipeline, processor and processor factory) can't rely on ES specific code. However some
+ * processors rely on reading files from the config directory. We can't add Environment as a constructor parameter,
+ * so we need some code that provides the physical location of the configuration directory to the processor factories
+ * that need this and this is what this processor factory provider does.
+ */
+ @FunctionalInterface
+ interface ProcessorFactoryProvider {
+
+ Processor.Factory get(Environment environment);
+
+ }
+
class Updater implements Runnable {
@Override
diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessorFactoryTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessorFactoryTests.java
index 010b0ede5c1..d4feeff886e 100644
--- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessorFactoryTests.java
+++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessorFactoryTests.java
@@ -24,7 +24,6 @@ import org.elasticsearch.test.StreamsUtils;
import org.junit.Before;
import java.io.ByteArrayInputStream;
-import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
@@ -47,8 +46,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
}
public void testBuild_defaults() throws Exception {
- GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
- factory.setConfigDirectory(configDir);
+ GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
Map config = new HashMap<>();
config.put("source_field", "_field");
@@ -61,8 +59,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
}
public void testBuild_targetField() throws Exception {
- GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
- factory.setConfigDirectory(configDir);
+ GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
Map config = new HashMap<>();
config.put("source_field", "_field");
config.put("target_field", "_field");
@@ -72,8 +69,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
}
public void testBuild_dbFile() throws Exception {
- GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
- factory.setConfigDirectory(configDir);
+ GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
Map config = new HashMap<>();
config.put("source_field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb");
@@ -84,8 +80,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
}
public void testBuild_nonExistingDbFile() throws Exception {
- GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
- factory.setConfigDirectory(configDir);
+ GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
Map config = new HashMap<>();
config.put("source_field", "_field");
@@ -98,8 +93,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
}
public void testBuild_fields() throws Exception {
- GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
- factory.setConfigDirectory(configDir);
+ GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
Set fields = EnumSet.noneOf(GeoIpProcessor.Field.class);
List fieldNames = new ArrayList<>();
@@ -118,8 +112,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
}
public void testBuild_illegalFieldOption() throws Exception {
- GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
- factory.setConfigDirectory(configDir);
+ GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
Map config = new HashMap<>();
config.put("source_field", "_field");
diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/grok/GrokProcessorFactoryTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/grok/GrokProcessorFactoryTests.java
index 9291c1bb04a..39430fe24bc 100644
--- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/grok/GrokProcessorFactoryTests.java
+++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/grok/GrokProcessorFactoryTests.java
@@ -43,8 +43,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
}
public void testBuild() throws Exception {
- GrokProcessor.Factory factory = new GrokProcessor.Factory();
- factory.setConfigDirectory(configDir);
+ GrokProcessor.Factory factory = new GrokProcessor.Factory(configDir);
Map config = new HashMap<>();
config.put("field", "_field");