diff --git a/core/src/main/java/org/elasticsearch/ingest/IngestService.java b/core/src/main/java/org/elasticsearch/ingest/IngestService.java index da26903a056..40bc803f08a 100644 --- a/core/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/core/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -21,7 +21,6 @@ package org.elasticsearch.ingest; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.env.Environment; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; @@ -34,14 +33,11 @@ import java.io.IOException; */ public class IngestService implements Closeable { - private final Environment environment; private final PipelineStore pipelineStore; private final PipelineExecutionService pipelineExecutionService; private final ProcessorsRegistry processorsRegistry; - public IngestService(Settings settings, ThreadPool threadPool, Environment environment, - ClusterService clusterService, ProcessorsRegistry processorsRegistry) { - this.environment = environment; + public IngestService(Settings settings, ThreadPool threadPool, ClusterService clusterService, ProcessorsRegistry processorsRegistry) { this.processorsRegistry = processorsRegistry; this.pipelineStore = new PipelineStore(settings, clusterService); this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool); @@ -56,7 +52,7 @@ public class IngestService implements Closeable { } public void setScriptService(ScriptService scriptService) { - pipelineStore.buildProcessorFactoryRegistry(processorsRegistry, environment, scriptService); + pipelineStore.buildProcessorFactoryRegistry(processorsRegistry, scriptService); } @Override diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java b/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java index ab11f99246e..6db1d6c0681 100644 --- a/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java +++ b/core/src/main/java/org/elasticsearch/ingest/PipelineStore.java @@ -21,6 +21,8 @@ package org.elasticsearch.ingest; import org.apache.lucene.util.IOUtils; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ingest.DeletePipelineRequest; +import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.ingest.WritePipelineResponse; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -32,9 +34,6 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.env.Environment; -import org.elasticsearch.action.ingest.DeletePipelineRequest; -import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.ingest.core.Pipeline; import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.ingest.core.TemplateService; @@ -47,7 +46,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.BiFunction; +import java.util.function.Function; public class PipelineStore extends AbstractComponent implements Closeable, ClusterStateListener { @@ -67,11 +66,11 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust clusterService.add(this); } - public void buildProcessorFactoryRegistry(ProcessorsRegistry processorsRegistry, Environment environment, ScriptService scriptService) { + public void buildProcessorFactoryRegistry(ProcessorsRegistry processorsRegistry, ScriptService scriptService) { Map processorFactories = new HashMap<>(); TemplateService templateService = new InternalTemplateService(scriptService); - for (Map.Entry>> entry : processorsRegistry.entrySet()) { - Processor.Factory processorFactory = entry.getValue().apply(environment, templateService); + for (Map.Entry>> entry : processorsRegistry.entrySet()) { + Processor.Factory processorFactory = entry.getValue().apply(templateService); processorFactories.put(entry.getKey(), processorFactory); } this.processorFactoryRegistry = Collections.unmodifiableMap(processorFactories); diff --git a/core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java b/core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java index 3561d8079c9..766ba772932 100644 --- a/core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java +++ b/core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java @@ -19,30 +19,29 @@ package org.elasticsearch.ingest; -import org.elasticsearch.env.Environment; import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.ingest.core.TemplateService; import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.function.BiFunction; +import java.util.function.Function; public class ProcessorsRegistry { - private final Map>> processorFactoryProviders = new HashMap<>(); + private final Map>> processorFactoryProviders = new HashMap<>(); /** * Adds a processor factory under a specific name. */ - public void registerProcessor(String name, BiFunction> processorFactoryProvider) { - BiFunction> provider = processorFactoryProviders.putIfAbsent(name, processorFactoryProvider); + public void registerProcessor(String name, Function> processorFactoryProvider) { + Function> provider = processorFactoryProviders.putIfAbsent(name, processorFactoryProvider); if (provider != null) { throw new IllegalArgumentException("Processor factory already registered for name [" + name + "]"); } } - public Set>>> entrySet() { + public Set>>> entrySet() { return processorFactoryProviders.entrySet(); } } diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index ce9a3742876..1778867f5a1 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -230,6 +230,13 @@ public class Node implements Releasable { return client; } + /** + * Returns the environment of the node + */ + public Environment getEnvironment() { + return environment; + } + /** * Start the node. If the node is already started, this method is no-op. */ diff --git a/core/src/main/java/org/elasticsearch/node/NodeModule.java b/core/src/main/java/org/elasticsearch/node/NodeModule.java index c1b707bfd0f..8ef26296fe2 100644 --- a/core/src/main/java/org/elasticsearch/node/NodeModule.java +++ b/core/src/main/java/org/elasticsearch/node/NodeModule.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.env.Environment; import org.elasticsearch.ingest.ProcessorsRegistry; import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.ingest.core.TemplateService; @@ -45,7 +44,7 @@ import org.elasticsearch.ingest.processor.UppercaseProcessor; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.node.service.NodeService; -import java.util.function.BiFunction; +import java.util.function.Function; /** * @@ -65,19 +64,19 @@ public class NodeModule extends AbstractModule { this.monitorService = monitorService; this.processorsRegistry = new ProcessorsRegistry(); - registerProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory()); - registerProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService)); - registerProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService)); - registerProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory()); - registerProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService)); - registerProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory()); - registerProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory()); - registerProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory()); - registerProcessor(LowercaseProcessor.TYPE, (environment, templateService) -> new LowercaseProcessor.Factory()); - registerProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory()); - registerProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory()); - registerProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory()); - registerProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService)); + registerProcessor(DateProcessor.TYPE, (templateService) -> new DateProcessor.Factory()); + registerProcessor(SetProcessor.TYPE, SetProcessor.Factory::new); + registerProcessor(AppendProcessor.TYPE, AppendProcessor.Factory::new); + registerProcessor(RenameProcessor.TYPE, (templateService) -> new RenameProcessor.Factory()); + registerProcessor(RemoveProcessor.TYPE, RemoveProcessor.Factory::new); + registerProcessor(SplitProcessor.TYPE, (templateService) -> new SplitProcessor.Factory()); + registerProcessor(JoinProcessor.TYPE, (templateService) -> new JoinProcessor.Factory()); + registerProcessor(UppercaseProcessor.TYPE, (templateService) -> new UppercaseProcessor.Factory()); + registerProcessor(LowercaseProcessor.TYPE, (templateService) -> new LowercaseProcessor.Factory()); + registerProcessor(TrimProcessor.TYPE, (templateService) -> new TrimProcessor.Factory()); + registerProcessor(ConvertProcessor.TYPE, (templateService) -> new ConvertProcessor.Factory()); + registerProcessor(GsubProcessor.TYPE, (templateService) -> new GsubProcessor.Factory()); + registerProcessor(FailProcessor.TYPE, FailProcessor.Factory::new); } @Override @@ -99,10 +98,17 @@ public class NodeModule extends AbstractModule { bind(ProcessorsRegistry.class).toInstance(processorsRegistry); } + /** + * Returns the node + */ + public Node getNode() { + return node; + } + /** * Adds a processor factory under a specific type name. */ - public void registerProcessor(String type, BiFunction> processorFactoryProvider) { + public void registerProcessor(String type, Function> processorFactoryProvider) { processorsRegistry.registerProcessor(type, processorFactoryProvider); } diff --git a/core/src/main/java/org/elasticsearch/node/service/NodeService.java b/core/src/main/java/org/elasticsearch/node/service/NodeService.java index ecf5cd07e11..15352eeadd2 100644 --- a/core/src/main/java/org/elasticsearch/node/service/NodeService.java +++ b/core/src/main/java/org/elasticsearch/node/service/NodeService.java @@ -35,8 +35,6 @@ import org.elasticsearch.http.HttpServer; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.ingest.IngestService; -import org.elasticsearch.ingest.PipelineExecutionService; -import org.elasticsearch.ingest.PipelineStore; import org.elasticsearch.ingest.ProcessorsRegistry; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.plugins.PluginsService; @@ -44,7 +42,6 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.io.Closeable; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -89,7 +86,7 @@ public class NodeService extends AbstractComponent { this.version = version; this.pluginService = pluginService; this.circuitBreakerService = circuitBreakerService; - this.ingestService = new IngestService(settings, threadPool, environment, clusterService, processorsRegistry); + this.ingestService = new IngestService(settings, threadPool, clusterService, processorsRegistry); } // can not use constructor injection or there will be a circular dependency diff --git a/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index fe60c9e04ba..6227b01a30c 100644 --- a/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/core/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -226,7 +226,7 @@ public class IngestClientIT extends ESIntegTestCase { } public void onModule(NodeModule nodeModule) { - nodeModule.registerProcessor("test", (environment, templateService) -> config -> + nodeModule.registerProcessor("test", (templateService) -> config -> new TestProcessor("test", ingestDocument -> { ingestDocument.setFieldValue("processed", true); if (ingestDocument.getFieldValue("fail", Boolean.class)) { diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java index 19d56858c8f..c1f14b26eb8 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java @@ -41,8 +41,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; public class PipelineStoreTests extends ESTestCase { @@ -54,8 +52,8 @@ public class PipelineStoreTests extends ESTestCase { ClusterService clusterService = mock(ClusterService.class); store = new PipelineStore(Settings.EMPTY, clusterService); ProcessorsRegistry registry = new ProcessorsRegistry(); - registry.registerProcessor("set", (environment, templateService) -> new SetProcessor.Factory(TestTemplateService.instance())); - store.buildProcessorFactoryRegistry(registry, null, null); + registry.registerProcessor("set", (templateService) -> new SetProcessor.Factory(TestTemplateService.instance())); + store.buildProcessorFactoryRegistry(registry, null); } public void testUpdatePipelines() { diff --git a/core/src/test/java/org/elasticsearch/ingest/ProcessorsRegistryTests.java b/core/src/test/java/org/elasticsearch/ingest/ProcessorsRegistryTests.java index 2869fffbafc..ad18488d990 100644 --- a/core/src/test/java/org/elasticsearch/ingest/ProcessorsRegistryTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/ProcessorsRegistryTests.java @@ -19,14 +19,13 @@ package org.elasticsearch.ingest; -import org.elasticsearch.env.Environment; import org.elasticsearch.ingest.core.Processor; import org.elasticsearch.ingest.core.TemplateService; import org.elasticsearch.test.ESTestCase; import java.util.Map; import java.util.Set; -import java.util.function.BiFunction; +import java.util.function.Function; import static org.hamcrest.CoreMatchers.equalTo; @@ -35,24 +34,24 @@ public class ProcessorsRegistryTests extends ESTestCase { public void testAddProcessor() { ProcessorsRegistry processorsRegistry = new ProcessorsRegistry(); TestProcessor.Factory factory1 = new TestProcessor.Factory(); - processorsRegistry.registerProcessor("1", (environment, templateService) -> factory1); + processorsRegistry.registerProcessor("1", (templateService) -> factory1); TestProcessor.Factory factory2 = new TestProcessor.Factory(); - processorsRegistry.registerProcessor("2", (environment, templateService) -> factory2); + processorsRegistry.registerProcessor("2", (templateService) -> factory2); TestProcessor.Factory factory3 = new TestProcessor.Factory(); try { - processorsRegistry.registerProcessor("1", (environment, templateService) -> factory3); + processorsRegistry.registerProcessor("1", (templateService) -> factory3); fail("addProcessor should have failed"); } catch(IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("Processor factory already registered for name [1]")); } - Set>>> entrySet = processorsRegistry.entrySet(); + Set>>> entrySet = processorsRegistry.entrySet(); assertThat(entrySet.size(), equalTo(2)); - for (Map.Entry>> entry : entrySet) { + for (Map.Entry>> entry : entrySet) { if (entry.getKey().equals("1")) { - assertThat(entry.getValue().apply(null, null), equalTo(factory1)); + assertThat(entry.getValue().apply(null), equalTo(factory1)); } else if (entry.getKey().equals("2")) { - assertThat(entry.getValue().apply(null, null), equalTo(factory2)); + assertThat(entry.getValue().apply(null), equalTo(factory2)); } else { fail("unexpected processor id [" + entry.getKey() + "]"); } diff --git a/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/IngestGrokPlugin.java b/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/IngestGrokPlugin.java index 9ca5bc24c9a..54800ac1603 100644 --- a/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/IngestGrokPlugin.java +++ b/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/IngestGrokPlugin.java @@ -56,7 +56,7 @@ public class IngestGrokPlugin extends Plugin { } public void onModule(NodeModule nodeModule) { - nodeModule.registerProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(builtinPatterns)); + nodeModule.registerProcessor(GrokProcessor.TYPE, (templateService) -> new GrokProcessor.Factory(builtinPatterns)); } static Map loadBuiltinPatterns() throws IOException { diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index ab87d51318b..0ffc3cf3a59 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -37,24 +37,17 @@ import org.elasticsearch.ingest.core.Processor; import java.io.Closeable; import java.io.IOException; -import java.io.InputStream; import java.net.InetAddress; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.PathMatcher; -import java.nio.file.StandardOpenOption; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; -import java.util.stream.Stream; import static org.elasticsearch.ingest.core.ConfigurationUtils.readOptionalList; import static org.elasticsearch.ingest.core.ConfigurationUtils.readStringProperty; @@ -230,31 +223,8 @@ public final class GeoIpProcessor implements Processor { private final Map databaseReaders; - public Factory(Path configDirectory) { - - // TODO(simonw): same as fro grok we should load this outside of the factory in a static method and hass the map to the ctor - Path geoIpConfigDirectory = configDirectory.resolve("ingest-geoip"); - if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) { - throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist"); - } - - try (Stream databaseFiles = Files.list(geoIpConfigDirectory)) { - Map databaseReaders = new HashMap<>(); - PathMatcher pathMatcher = geoIpConfigDirectory.getFileSystem().getPathMatcher("glob:**.mmdb"); - // Use iterator instead of forEach otherwise IOException needs to be caught twice... - Iterator iterator = databaseFiles.iterator(); - while (iterator.hasNext()) { - Path databasePath = iterator.next(); - if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) { - try (InputStream inputStream = Files.newInputStream(databasePath, StandardOpenOption.READ)) { - databaseReaders.put(databasePath.getFileName().toString(), new DatabaseReader.Builder(inputStream).build()); - } - } - } - this.databaseReaders = Collections.unmodifiableMap(databaseReaders); - } catch (IOException e) { - throw new RuntimeException(e); - } + public Factory(Map databaseReaders) { + this.databaseReaders = databaseReaders; } public GeoIpProcessor create(Map config) throws Exception { diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index 4b6a60902ea..f92cb7b479f 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -19,9 +19,22 @@ package org.elasticsearch.ingest.geoip; +import com.maxmind.geoip2.DatabaseReader; import org.elasticsearch.node.NodeModule; import org.elasticsearch.plugins.Plugin; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.PathMatcher; +import java.nio.file.StandardOpenOption; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.stream.Stream; + public class IngestGeoIpPlugin extends Plugin { @Override @@ -31,10 +44,34 @@ public class IngestGeoIpPlugin extends Plugin { @Override public String description() { - return "Plugin that allows to plug in ingest processors"; + return "Ingest processor that adds information about the geographical location of ip addresses"; } - public void onModule(NodeModule nodeModule) { - nodeModule.registerProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile())); + public void onModule(NodeModule nodeModule) throws IOException { + Path geoIpConfigDirectory = nodeModule.getNode().getEnvironment().configFile().resolve("ingest-geoip"); + Map databaseReaders = loadDatabaseReaders(geoIpConfigDirectory); + nodeModule.registerProcessor(GeoIpProcessor.TYPE, (templateService) -> new GeoIpProcessor.Factory(databaseReaders)); + } + + static Map loadDatabaseReaders(Path geoIpConfigDirectory) throws IOException { + if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) { + throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist"); + } + + Map databaseReaders = new HashMap<>(); + try (Stream databaseFiles = Files.list(geoIpConfigDirectory)) { + PathMatcher pathMatcher = geoIpConfigDirectory.getFileSystem().getPathMatcher("glob:**.mmdb"); + // Use iterator instead of forEach otherwise IOException needs to be caught twice... + Iterator iterator = databaseFiles.iterator(); + while (iterator.hasNext()) { + Path databasePath = iterator.next(); + if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) { + try (InputStream inputStream = Files.newInputStream(databasePath, StandardOpenOption.READ)) { + databaseReaders.put(databasePath.getFileName().toString(), new DatabaseReader.Builder(inputStream).build()); + } + } + } + } + return Collections.unmodifiableMap(databaseReaders); } } diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index 78dd86d4fdc..20ffe7fe43a 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -19,11 +19,13 @@ package org.elasticsearch.ingest.geoip; +import com.maxmind.geoip2.DatabaseReader; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.StreamsUtils; -import org.junit.Before; +import org.junit.BeforeClass; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -40,19 +42,20 @@ import static org.hamcrest.Matchers.sameInstance; public class GeoIpProcessorFactoryTests extends ESTestCase { - private Path configDir; + private static Map databaseReaders; - @Before - public void prepareConfigDirectory() throws Exception { - this.configDir = createTempDir(); + @BeforeClass + public static void loadDatabaseReaders() throws IOException { + Path configDir = createTempDir(); Path geoIpConfigDir = configDir.resolve("ingest-geoip"); Files.createDirectories(geoIpConfigDir); Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")), geoIpConfigDir.resolve("GeoLite2-City.mmdb")); Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")), geoIpConfigDir.resolve("GeoLite2-Country.mmdb")); + databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir); } - public void testBuild_defaults() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir); + public void testBuildDefaults() throws Exception { + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); Map config = new HashMap<>(); config.put("source_field", "_field"); @@ -64,8 +67,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { assertThat(processor.getFields(), sameInstance(GeoIpProcessor.Factory.DEFAULT_FIELDS)); } - public void testBuild_targetField() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir); + public void testBuildTargetField() throws Exception { + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); Map config = new HashMap<>(); config.put("source_field", "_field"); config.put("target_field", "_field"); @@ -74,8 +77,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { assertThat(processor.getTargetField(), equalTo("_field")); } - public void testBuild_dbFile() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir); + public void testBuildDbFile() throws Exception { + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); Map config = new HashMap<>(); config.put("source_field", "_field"); config.put("database_file", "GeoLite2-Country.mmdb"); @@ -85,8 +88,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { assertThat(processor.getDbReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-Country")); } - public void testBuild_nonExistingDbFile() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir); + public void testBuildNonExistingDbFile() throws Exception { + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); Map config = new HashMap<>(); config.put("source_field", "_field"); @@ -99,8 +102,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { } } - public void testBuild_fields() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir); + public void testBuildFields() throws Exception { + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); Set fields = EnumSet.noneOf(GeoIpProcessor.Field.class); List fieldNames = new ArrayList<>(); @@ -118,8 +121,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { assertThat(processor.getFields(), equalTo(fields)); } - public void testBuild_illegalFieldOption() throws Exception { - GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir); + public void testBuildIllegalFieldOption() throws Exception { + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); Map config = new HashMap<>(); config.put("source_field", "_field");