Removed pollution from the Processor.Factory interface.

1) It no longer extends from Closeable.
2) Removed the config directory setter. Implementation that relied on it, now get the location to the config dir via their constructors.
This commit is contained in:
Martijn van Groningen 2015-11-30 18:11:23 +01:00
parent fa9fcb3b11
commit 9dd52ad7d3
7 changed files with 88 additions and 72 deletions

View File

@ -46,7 +46,7 @@ public interface Processor {
/**
* A factory that knows how to construct a processor based on a map of maps.
*/
interface Factory<P extends Processor> extends Closeable {
interface Factory<P extends Processor> {
/**
* Creates a processor based on the specified map of maps config.
@ -56,14 +56,5 @@ public interface Processor {
*/
P create(Map<String, Object> config) throws Exception;
/**
* Sets the configuration directory when needed to read additional config files
*/
default void setConfigDirectory(Path configDirectory) {
}
@Override
default void close() throws IOException {
}
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.processor.Processor;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
@ -211,15 +212,19 @@ public final class GeoIpProcessor implements Processor {
return geoData;
}
public static class Factory implements Processor.Factory<GeoIpProcessor> {
public static class Factory implements Processor.Factory<GeoIpProcessor>, Closeable {
static final Set<Field> DEFAULT_FIELDS = EnumSet.of(
Field.CONTINENT_NAME, Field.COUNTRY_ISO_CODE, Field.REGION_NAME, Field.CITY_NAME, Field.LOCATION
);
private Path geoIpConfigDirectory;
private final Path geoIpConfigDirectory;
private final DatabaseReaderService databaseReaderService = new DatabaseReaderService();
public Factory(Path configDirectory) {
this.geoIpConfigDirectory = configDirectory.resolve("ingest").resolve("geoip");
}
public GeoIpProcessor create(Map<String, Object> config) throws Exception {
String ipField = readStringProperty(config, "source_field");
String targetField = readStringProperty(config, "target_field", "geoip");
@ -250,11 +255,6 @@ public final class GeoIpProcessor implements Processor {
}
}
@Override
public void setConfigDirectory(Path configDirectory) {
geoIpConfigDirectory = configDirectory.resolve("ingest").resolve("geoip");
}
@Override
public void close() throws IOException {
databaseReaderService.close();

View File

@ -69,7 +69,12 @@ public final class GrokProcessor implements Processor {
}
public static class Factory implements Processor.Factory<GrokProcessor> {
private Path grokConfigDirectory;
private final Path grokConfigDirectory;
public Factory(Path configDirectory) {
this.grokConfigDirectory = configDirectory.resolve("ingest").resolve("grok");
}
public GrokProcessor create(Map<String, Object> config) throws Exception {
String matchField = ConfigurationUtils.readStringProperty(config, "field");
@ -90,10 +95,6 @@ public final class GrokProcessor implements Processor {
return new GrokProcessor(grok, matchField);
}
@Override
public void setConfigDirectory(Path configDirectory) {
this.grokConfigDirectory = configDirectory.resolve("ingest").resolve("grok");
}
}
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.plugin.ingest;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.ingest.processor.set.SetProcessor;
import org.elasticsearch.ingest.processor.convert.ConvertProcessor;
import org.elasticsearch.ingest.processor.date.DateProcessor;
@ -42,9 +41,11 @@ import org.elasticsearch.plugin.ingest.transport.simulate.SimulateExecutionServi
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.plugin.ingest.PipelineStore.ProcessorFactoryProvider;
public class IngestModule extends AbstractModule {
private final Map<String, Processor.Factory> processors = new HashMap<>();
private final Map<String, ProcessorFactoryProvider> processorFactoryProviders = new HashMap<>();
@Override
protected void configure() {
@ -53,23 +54,23 @@ public class IngestModule extends AbstractModule {
binder().bind(PipelineStore.class).asEagerSingleton();
binder().bind(SimulateExecutionService.class).asEagerSingleton();
addProcessor(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory());
addProcessor(GrokProcessor.TYPE, new GrokProcessor.Factory());
addProcessor(DateProcessor.TYPE, new DateProcessor.Factory());
addProcessor(SetProcessor.TYPE, new SetProcessor.Factory());
addProcessor(RenameProcessor.TYPE, new RenameProcessor.Factory());
addProcessor(RemoveProcessor.TYPE, new RemoveProcessor.Factory());
addProcessor(SplitProcessor.TYPE, new SplitProcessor.Factory());
addProcessor(JoinProcessor.TYPE, new JoinProcessor.Factory());
addProcessor(UppercaseProcessor.TYPE, new UppercaseProcessor.Factory());
addProcessor(LowercaseProcessor.TYPE, new LowercaseProcessor.Factory());
addProcessor(TrimProcessor.TYPE, new TrimProcessor.Factory());
addProcessor(ConvertProcessor.TYPE, new ConvertProcessor.Factory());
addProcessor(GsubProcessor.TYPE, new GsubProcessor.Factory());
addProcessor(MetaDataProcessor.TYPE, new MetaDataProcessor.Factory());
addProcessor(GeoIpProcessor.TYPE, environment -> new GeoIpProcessor.Factory(environment.configFile()));
addProcessor(GrokProcessor.TYPE, environment -> new GrokProcessor.Factory(environment.configFile()));
addProcessor(DateProcessor.TYPE, environment -> new DateProcessor.Factory());
addProcessor(SetProcessor.TYPE, environment -> new SetProcessor.Factory());
addProcessor(RenameProcessor.TYPE, environment -> new RenameProcessor.Factory());
addProcessor(RemoveProcessor.TYPE, environment -> new RemoveProcessor.Factory());
addProcessor(SplitProcessor.TYPE, environment -> new SplitProcessor.Factory());
addProcessor(JoinProcessor.TYPE, environment -> new JoinProcessor.Factory());
addProcessor(UppercaseProcessor.TYPE, environment -> new UppercaseProcessor.Factory());
addProcessor(LowercaseProcessor.TYPE, environment -> new LowercaseProcessor.Factory());
addProcessor(TrimProcessor.TYPE, environment -> new TrimProcessor.Factory());
addProcessor(ConvertProcessor.TYPE, environment -> new ConvertProcessor.Factory());
addProcessor(GsubProcessor.TYPE, environment -> new GsubProcessor.Factory());
addProcessor(MetaDataProcessor.TYPE, environment -> new MetaDataProcessor.Factory());
MapBinder<String, Processor.Factory> mapBinder = MapBinder.newMapBinder(binder(), String.class, Processor.Factory.class);
for (Map.Entry<String, Processor.Factory> entry : processors.entrySet()) {
MapBinder<String, ProcessorFactoryProvider> mapBinder = MapBinder.newMapBinder(binder(), String.class, ProcessorFactoryProvider.class);
for (Map.Entry<String, ProcessorFactoryProvider> entry : processorFactoryProviders.entrySet()) {
mapBinder.addBinding(entry.getKey()).toInstance(entry.getValue());
}
}
@ -77,8 +78,8 @@ public class IngestModule extends AbstractModule {
/**
* Adds a processor factory under a specific type name.
*/
public void addProcessor(String type, Processor.Factory factory) {
processors.put(type, factory);
public void addProcessor(String type, ProcessorFactoryProvider processorFactoryProvider) {
processorFactoryProviders.put(type, processorFactoryProvider);
}
}

View File

@ -36,6 +36,8 @@ import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.SearchScrollIterator;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Provider;
@ -54,10 +56,11 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.util.*;
public class PipelineStore extends AbstractComponent {
public class PipelineStore extends AbstractLifecycleComponent {
public final static String INDEX = ".ingest";
public final static String TYPE = "pipeline";
@ -74,30 +77,45 @@ public class PipelineStore extends AbstractComponent {
private volatile Map<String, PipelineDefinition> pipelines = new HashMap<>();
@Inject
public PipelineStore(Settings settings, Provider<Client> clientProvider, ThreadPool threadPool, Environment environment, ClusterService clusterService, Map<String, Processor.Factory> processors) {
public PipelineStore(Settings settings, Provider<Client> clientProvider, ThreadPool threadPool, Environment environment, ClusterService clusterService, Map<String, ProcessorFactoryProvider> processorFactoryProviders) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.clientProvider = clientProvider;
this.scrollTimeout = settings.getAsTime("ingest.pipeline.store.scroll.timeout", TimeValue.timeValueSeconds(30));
this.pipelineUpdateInterval = settings.getAsTime("ingest.pipeline.store.update.interval", TimeValue.timeValueSeconds(1));
for (Processor.Factory factory : processors.values()) {
factory.setConfigDirectory(environment.configFile());
Map<String, Processor.Factory> processorFactories = new HashMap<>();
for (Map.Entry<String, ProcessorFactoryProvider> entry : processorFactoryProviders.entrySet()) {
Processor.Factory processorFactory = entry.getValue().get(environment);
processorFactories.put(entry.getKey(), processorFactory);
}
this.processorFactoryRegistry = Collections.unmodifiableMap(processors);
this.processorFactoryRegistry = Collections.unmodifiableMap(processorFactories);
clusterService.add(new PipelineStoreListener());
clusterService.addLifecycleListener(new LifecycleListener() {
}
@Override
public void beforeClose() {
// Ideally we would implement Closeable, but when a node is stopped this doesn't get invoked:
protected void doStart() {
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
// TODO: When org.elasticsearch.node.Node can close Closable instances we should remove this code
List<Closeable> closeables = new ArrayList<>();
for (Processor.Factory factory : processorFactoryRegistry.values()) {
if (factory instanceof Closeable) {
closeables.add((Closeable) factory);
}
}
try {
IOUtils.close(processorFactoryRegistry.values());
IOUtils.close(closeables);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
}
/**
* Deletes the pipeline specified by id in the request.
@ -239,6 +257,19 @@ public class PipelineStore extends AbstractComponent {
return client;
}
/**
* The ingest framework (pipeline, processor and processor factory) can't rely on ES specific code. However some
* processors rely on reading files from the config directory. We can't add Environment as a constructor parameter,
* so we need some code that provides the physical location of the configuration directory to the processor factories
* that need this and this is what this processor factory provider does.
*/
@FunctionalInterface
interface ProcessorFactoryProvider {
Processor.Factory get(Environment environment);
}
class Updater implements Runnable {
@Override

View File

@ -24,7 +24,6 @@ import org.elasticsearch.test.StreamsUtils;
import org.junit.Before;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
@ -47,8 +46,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
}
public void testBuild_defaults() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
factory.setConfigDirectory(configDir);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
Map<String, Object> config = new HashMap<>();
config.put("source_field", "_field");
@ -61,8 +59,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
}
public void testBuild_targetField() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
factory.setConfigDirectory(configDir);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
Map<String, Object> config = new HashMap<>();
config.put("source_field", "_field");
config.put("target_field", "_field");
@ -72,8 +69,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
}
public void testBuild_dbFile() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
factory.setConfigDirectory(configDir);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
Map<String, Object> config = new HashMap<>();
config.put("source_field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb");
@ -84,8 +80,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
}
public void testBuild_nonExistingDbFile() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
factory.setConfigDirectory(configDir);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
Map<String, Object> config = new HashMap<>();
config.put("source_field", "_field");
@ -98,8 +93,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
}
public void testBuild_fields() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
factory.setConfigDirectory(configDir);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
Set<GeoIpProcessor.Field> fields = EnumSet.noneOf(GeoIpProcessor.Field.class);
List<String> fieldNames = new ArrayList<>();
@ -118,8 +112,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
}
public void testBuild_illegalFieldOption() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
factory.setConfigDirectory(configDir);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
Map<String, Object> config = new HashMap<>();
config.put("source_field", "_field");

View File

@ -43,8 +43,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
}
public void testBuild() throws Exception {
GrokProcessor.Factory factory = new GrokProcessor.Factory();
factory.setConfigDirectory(configDir);
GrokProcessor.Factory factory = new GrokProcessor.Factory(configDir);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");