Merge pull request #16015 from javanna/enhancement/geoip_todos
Move geoip db loading out of the processor factory
This commit is contained in:
commit
20b5796cf3
|
@ -21,7 +21,6 @@ package org.elasticsearch.ingest;
|
|||
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
|
@ -34,14 +33,11 @@ import java.io.IOException;
|
|||
*/
|
||||
public class IngestService implements Closeable {
|
||||
|
||||
private final Environment environment;
|
||||
private final PipelineStore pipelineStore;
|
||||
private final PipelineExecutionService pipelineExecutionService;
|
||||
private final ProcessorsRegistry processorsRegistry;
|
||||
|
||||
public IngestService(Settings settings, ThreadPool threadPool, Environment environment,
|
||||
ClusterService clusterService, ProcessorsRegistry processorsRegistry) {
|
||||
this.environment = environment;
|
||||
public IngestService(Settings settings, ThreadPool threadPool, ClusterService clusterService, ProcessorsRegistry processorsRegistry) {
|
||||
this.processorsRegistry = processorsRegistry;
|
||||
this.pipelineStore = new PipelineStore(settings, clusterService);
|
||||
this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool);
|
||||
|
@ -56,7 +52,7 @@ public class IngestService implements Closeable {
|
|||
}
|
||||
|
||||
public void setScriptService(ScriptService scriptService) {
|
||||
pipelineStore.buildProcessorFactoryRegistry(processorsRegistry, environment, scriptService);
|
||||
pipelineStore.buildProcessorFactoryRegistry(processorsRegistry, scriptService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -21,6 +21,8 @@ package org.elasticsearch.ingest;
|
|||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ingest.DeletePipelineRequest;
|
||||
import org.elasticsearch.action.ingest.PutPipelineRequest;
|
||||
import org.elasticsearch.action.ingest.WritePipelineResponse;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
|
@ -32,9 +34,6 @@ import org.elasticsearch.common.component.AbstractComponent;
|
|||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.action.ingest.DeletePipelineRequest;
|
||||
import org.elasticsearch.action.ingest.PutPipelineRequest;
|
||||
import org.elasticsearch.ingest.core.Pipeline;
|
||||
import org.elasticsearch.ingest.core.Processor;
|
||||
import org.elasticsearch.ingest.core.TemplateService;
|
||||
|
@ -47,7 +46,7 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
|
||||
public class PipelineStore extends AbstractComponent implements Closeable, ClusterStateListener {
|
||||
|
||||
|
@ -67,11 +66,11 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
|
|||
clusterService.add(this);
|
||||
}
|
||||
|
||||
public void buildProcessorFactoryRegistry(ProcessorsRegistry processorsRegistry, Environment environment, ScriptService scriptService) {
|
||||
public void buildProcessorFactoryRegistry(ProcessorsRegistry processorsRegistry, ScriptService scriptService) {
|
||||
Map<String, Processor.Factory> processorFactories = new HashMap<>();
|
||||
TemplateService templateService = new InternalTemplateService(scriptService);
|
||||
for (Map.Entry<String, BiFunction<Environment, TemplateService, Processor.Factory<?>>> entry : processorsRegistry.entrySet()) {
|
||||
Processor.Factory processorFactory = entry.getValue().apply(environment, templateService);
|
||||
for (Map.Entry<String, Function<TemplateService, Processor.Factory<?>>> entry : processorsRegistry.entrySet()) {
|
||||
Processor.Factory processorFactory = entry.getValue().apply(templateService);
|
||||
processorFactories.put(entry.getKey(), processorFactory);
|
||||
}
|
||||
this.processorFactoryRegistry = Collections.unmodifiableMap(processorFactories);
|
||||
|
|
|
@ -19,30 +19,29 @@
|
|||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.ingest.core.Processor;
|
||||
import org.elasticsearch.ingest.core.TemplateService;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
|
||||
public class ProcessorsRegistry {
|
||||
|
||||
private final Map<String, BiFunction<Environment, TemplateService, Processor.Factory<?>>> processorFactoryProviders = new HashMap<>();
|
||||
private final Map<String, Function<TemplateService, Processor.Factory<?>>> processorFactoryProviders = new HashMap<>();
|
||||
|
||||
/**
|
||||
* Adds a processor factory under a specific name.
|
||||
*/
|
||||
public void registerProcessor(String name, BiFunction<Environment, TemplateService, Processor.Factory<?>> processorFactoryProvider) {
|
||||
BiFunction<Environment, TemplateService, Processor.Factory<?>> provider = processorFactoryProviders.putIfAbsent(name, processorFactoryProvider);
|
||||
public void registerProcessor(String name, Function<TemplateService, Processor.Factory<?>> processorFactoryProvider) {
|
||||
Function<TemplateService, Processor.Factory<?>> provider = processorFactoryProviders.putIfAbsent(name, processorFactoryProvider);
|
||||
if (provider != null) {
|
||||
throw new IllegalArgumentException("Processor factory already registered for name [" + name + "]");
|
||||
}
|
||||
}
|
||||
|
||||
public Set<Map.Entry<String, BiFunction<Environment, TemplateService, Processor.Factory<?>>>> entrySet() {
|
||||
public Set<Map.Entry<String, Function<TemplateService, Processor.Factory<?>>>> entrySet() {
|
||||
return processorFactoryProviders.entrySet();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -230,6 +230,13 @@ public class Node implements Releasable {
|
|||
return client;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the environment of the node
|
||||
*/
|
||||
public Environment getEnvironment() {
|
||||
return environment;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the node. If the node is already started, this method is no-op.
|
||||
*/
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
|
|||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.ingest.ProcessorsRegistry;
|
||||
import org.elasticsearch.ingest.core.Processor;
|
||||
import org.elasticsearch.ingest.core.TemplateService;
|
||||
|
@ -45,7 +44,7 @@ import org.elasticsearch.ingest.processor.UppercaseProcessor;
|
|||
import org.elasticsearch.monitor.MonitorService;
|
||||
import org.elasticsearch.node.service.NodeService;
|
||||
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -65,19 +64,19 @@ public class NodeModule extends AbstractModule {
|
|||
this.monitorService = monitorService;
|
||||
this.processorsRegistry = new ProcessorsRegistry();
|
||||
|
||||
registerProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory());
|
||||
registerProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService));
|
||||
registerProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService));
|
||||
registerProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory());
|
||||
registerProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService));
|
||||
registerProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory());
|
||||
registerProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory());
|
||||
registerProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory());
|
||||
registerProcessor(LowercaseProcessor.TYPE, (environment, templateService) -> new LowercaseProcessor.Factory());
|
||||
registerProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory());
|
||||
registerProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory());
|
||||
registerProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory());
|
||||
registerProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService));
|
||||
registerProcessor(DateProcessor.TYPE, (templateService) -> new DateProcessor.Factory());
|
||||
registerProcessor(SetProcessor.TYPE, SetProcessor.Factory::new);
|
||||
registerProcessor(AppendProcessor.TYPE, AppendProcessor.Factory::new);
|
||||
registerProcessor(RenameProcessor.TYPE, (templateService) -> new RenameProcessor.Factory());
|
||||
registerProcessor(RemoveProcessor.TYPE, RemoveProcessor.Factory::new);
|
||||
registerProcessor(SplitProcessor.TYPE, (templateService) -> new SplitProcessor.Factory());
|
||||
registerProcessor(JoinProcessor.TYPE, (templateService) -> new JoinProcessor.Factory());
|
||||
registerProcessor(UppercaseProcessor.TYPE, (templateService) -> new UppercaseProcessor.Factory());
|
||||
registerProcessor(LowercaseProcessor.TYPE, (templateService) -> new LowercaseProcessor.Factory());
|
||||
registerProcessor(TrimProcessor.TYPE, (templateService) -> new TrimProcessor.Factory());
|
||||
registerProcessor(ConvertProcessor.TYPE, (templateService) -> new ConvertProcessor.Factory());
|
||||
registerProcessor(GsubProcessor.TYPE, (templateService) -> new GsubProcessor.Factory());
|
||||
registerProcessor(FailProcessor.TYPE, FailProcessor.Factory::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -99,10 +98,17 @@ public class NodeModule extends AbstractModule {
|
|||
bind(ProcessorsRegistry.class).toInstance(processorsRegistry);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the node
|
||||
*/
|
||||
public Node getNode() {
|
||||
return node;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a processor factory under a specific type name.
|
||||
*/
|
||||
public void registerProcessor(String type, BiFunction<Environment, TemplateService, Processor.Factory<?>> processorFactoryProvider) {
|
||||
public void registerProcessor(String type, Function<TemplateService, Processor.Factory<?>> processorFactoryProvider) {
|
||||
processorsRegistry.registerProcessor(type, processorFactoryProvider);
|
||||
}
|
||||
|
||||
|
|
|
@ -35,8 +35,6 @@ import org.elasticsearch.http.HttpServer;
|
|||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.ingest.IngestService;
|
||||
import org.elasticsearch.ingest.PipelineExecutionService;
|
||||
import org.elasticsearch.ingest.PipelineStore;
|
||||
import org.elasticsearch.ingest.ProcessorsRegistry;
|
||||
import org.elasticsearch.monitor.MonitorService;
|
||||
import org.elasticsearch.plugins.PluginsService;
|
||||
|
@ -44,7 +42,6 @@ import org.elasticsearch.script.ScriptService;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
@ -89,7 +86,7 @@ public class NodeService extends AbstractComponent {
|
|||
this.version = version;
|
||||
this.pluginService = pluginService;
|
||||
this.circuitBreakerService = circuitBreakerService;
|
||||
this.ingestService = new IngestService(settings, threadPool, environment, clusterService, processorsRegistry);
|
||||
this.ingestService = new IngestService(settings, threadPool, clusterService, processorsRegistry);
|
||||
}
|
||||
|
||||
// can not use constructor injection or there will be a circular dependency
|
||||
|
|
|
@ -226,7 +226,7 @@ public class IngestClientIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
public void onModule(NodeModule nodeModule) {
|
||||
nodeModule.registerProcessor("test", (environment, templateService) -> config ->
|
||||
nodeModule.registerProcessor("test", (templateService) -> config ->
|
||||
new TestProcessor("test", ingestDocument -> {
|
||||
ingestDocument.setFieldValue("processed", true);
|
||||
if (ingestDocument.getFieldValue("fail", Boolean.class)) {
|
||||
|
|
|
@ -41,8 +41,6 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class PipelineStoreTests extends ESTestCase {
|
||||
|
@ -54,8 +52,8 @@ public class PipelineStoreTests extends ESTestCase {
|
|||
ClusterService clusterService = mock(ClusterService.class);
|
||||
store = new PipelineStore(Settings.EMPTY, clusterService);
|
||||
ProcessorsRegistry registry = new ProcessorsRegistry();
|
||||
registry.registerProcessor("set", (environment, templateService) -> new SetProcessor.Factory(TestTemplateService.instance()));
|
||||
store.buildProcessorFactoryRegistry(registry, null, null);
|
||||
registry.registerProcessor("set", (templateService) -> new SetProcessor.Factory(TestTemplateService.instance()));
|
||||
store.buildProcessorFactoryRegistry(registry, null);
|
||||
}
|
||||
|
||||
public void testUpdatePipelines() {
|
||||
|
|
|
@ -19,14 +19,13 @@
|
|||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.ingest.core.Processor;
|
||||
import org.elasticsearch.ingest.core.TemplateService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
|
||||
|
@ -35,24 +34,24 @@ public class ProcessorsRegistryTests extends ESTestCase {
|
|||
public void testAddProcessor() {
|
||||
ProcessorsRegistry processorsRegistry = new ProcessorsRegistry();
|
||||
TestProcessor.Factory factory1 = new TestProcessor.Factory();
|
||||
processorsRegistry.registerProcessor("1", (environment, templateService) -> factory1);
|
||||
processorsRegistry.registerProcessor("1", (templateService) -> factory1);
|
||||
TestProcessor.Factory factory2 = new TestProcessor.Factory();
|
||||
processorsRegistry.registerProcessor("2", (environment, templateService) -> factory2);
|
||||
processorsRegistry.registerProcessor("2", (templateService) -> factory2);
|
||||
TestProcessor.Factory factory3 = new TestProcessor.Factory();
|
||||
try {
|
||||
processorsRegistry.registerProcessor("1", (environment, templateService) -> factory3);
|
||||
processorsRegistry.registerProcessor("1", (templateService) -> factory3);
|
||||
fail("addProcessor should have failed");
|
||||
} catch(IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("Processor factory already registered for name [1]"));
|
||||
}
|
||||
|
||||
Set<Map.Entry<String, BiFunction<Environment, TemplateService, Processor.Factory<?>>>> entrySet = processorsRegistry.entrySet();
|
||||
Set<Map.Entry<String, Function<TemplateService, Processor.Factory<?>>>> entrySet = processorsRegistry.entrySet();
|
||||
assertThat(entrySet.size(), equalTo(2));
|
||||
for (Map.Entry<String, BiFunction<Environment, TemplateService, Processor.Factory<?>>> entry : entrySet) {
|
||||
for (Map.Entry<String, Function<TemplateService, Processor.Factory<?>>> entry : entrySet) {
|
||||
if (entry.getKey().equals("1")) {
|
||||
assertThat(entry.getValue().apply(null, null), equalTo(factory1));
|
||||
assertThat(entry.getValue().apply(null), equalTo(factory1));
|
||||
} else if (entry.getKey().equals("2")) {
|
||||
assertThat(entry.getValue().apply(null, null), equalTo(factory2));
|
||||
assertThat(entry.getValue().apply(null), equalTo(factory2));
|
||||
} else {
|
||||
fail("unexpected processor id [" + entry.getKey() + "]");
|
||||
}
|
||||
|
|
|
@ -56,7 +56,7 @@ public class IngestGrokPlugin extends Plugin {
|
|||
}
|
||||
|
||||
public void onModule(NodeModule nodeModule) {
|
||||
nodeModule.registerProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(builtinPatterns));
|
||||
nodeModule.registerProcessor(GrokProcessor.TYPE, (templateService) -> new GrokProcessor.Factory(builtinPatterns));
|
||||
}
|
||||
|
||||
static Map<String, String> loadBuiltinPatterns() throws IOException {
|
||||
|
|
|
@ -37,24 +37,17 @@ import org.elasticsearch.ingest.core.Processor;
|
|||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.InetAddress;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.PathMatcher;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.elasticsearch.ingest.core.ConfigurationUtils.readOptionalList;
|
||||
import static org.elasticsearch.ingest.core.ConfigurationUtils.readStringProperty;
|
||||
|
@ -230,31 +223,8 @@ public final class GeoIpProcessor implements Processor {
|
|||
|
||||
private final Map<String, DatabaseReader> databaseReaders;
|
||||
|
||||
public Factory(Path configDirectory) {
|
||||
|
||||
// TODO(simonw): same as fro grok we should load this outside of the factory in a static method and hass the map to the ctor
|
||||
Path geoIpConfigDirectory = configDirectory.resolve("ingest-geoip");
|
||||
if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) {
|
||||
throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");
|
||||
}
|
||||
|
||||
try (Stream<Path> databaseFiles = Files.list(geoIpConfigDirectory)) {
|
||||
Map<String, DatabaseReader> databaseReaders = new HashMap<>();
|
||||
PathMatcher pathMatcher = geoIpConfigDirectory.getFileSystem().getPathMatcher("glob:**.mmdb");
|
||||
// Use iterator instead of forEach otherwise IOException needs to be caught twice...
|
||||
Iterator<Path> iterator = databaseFiles.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Path databasePath = iterator.next();
|
||||
if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) {
|
||||
try (InputStream inputStream = Files.newInputStream(databasePath, StandardOpenOption.READ)) {
|
||||
databaseReaders.put(databasePath.getFileName().toString(), new DatabaseReader.Builder(inputStream).build());
|
||||
}
|
||||
}
|
||||
}
|
||||
this.databaseReaders = Collections.unmodifiableMap(databaseReaders);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
public Factory(Map<String, DatabaseReader> databaseReaders) {
|
||||
this.databaseReaders = databaseReaders;
|
||||
}
|
||||
|
||||
public GeoIpProcessor create(Map<String, Object> config) throws Exception {
|
||||
|
|
|
@ -19,9 +19,22 @@
|
|||
|
||||
package org.elasticsearch.ingest.geoip;
|
||||
|
||||
import com.maxmind.geoip2.DatabaseReader;
|
||||
import org.elasticsearch.node.NodeModule;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.PathMatcher;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class IngestGeoIpPlugin extends Plugin {
|
||||
|
||||
@Override
|
||||
|
@ -31,10 +44,34 @@ public class IngestGeoIpPlugin extends Plugin {
|
|||
|
||||
@Override
|
||||
public String description() {
|
||||
return "Plugin that allows to plug in ingest processors";
|
||||
return "Ingest processor that adds information about the geographical location of ip addresses";
|
||||
}
|
||||
|
||||
public void onModule(NodeModule nodeModule) {
|
||||
nodeModule.registerProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile()));
|
||||
public void onModule(NodeModule nodeModule) throws IOException {
|
||||
Path geoIpConfigDirectory = nodeModule.getNode().getEnvironment().configFile().resolve("ingest-geoip");
|
||||
Map<String, DatabaseReader> databaseReaders = loadDatabaseReaders(geoIpConfigDirectory);
|
||||
nodeModule.registerProcessor(GeoIpProcessor.TYPE, (templateService) -> new GeoIpProcessor.Factory(databaseReaders));
|
||||
}
|
||||
|
||||
static Map<String, DatabaseReader> loadDatabaseReaders(Path geoIpConfigDirectory) throws IOException {
|
||||
if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) {
|
||||
throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");
|
||||
}
|
||||
|
||||
Map<String, DatabaseReader> databaseReaders = new HashMap<>();
|
||||
try (Stream<Path> databaseFiles = Files.list(geoIpConfigDirectory)) {
|
||||
PathMatcher pathMatcher = geoIpConfigDirectory.getFileSystem().getPathMatcher("glob:**.mmdb");
|
||||
// Use iterator instead of forEach otherwise IOException needs to be caught twice...
|
||||
Iterator<Path> iterator = databaseFiles.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Path databasePath = iterator.next();
|
||||
if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) {
|
||||
try (InputStream inputStream = Files.newInputStream(databasePath, StandardOpenOption.READ)) {
|
||||
databaseReaders.put(databasePath.getFileName().toString(), new DatabaseReader.Builder(inputStream).build());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return Collections.unmodifiableMap(databaseReaders);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,11 +19,13 @@
|
|||
|
||||
package org.elasticsearch.ingest.geoip;
|
||||
|
||||
import com.maxmind.geoip2.DatabaseReader;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.StreamsUtils;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
|
@ -40,19 +42,20 @@ import static org.hamcrest.Matchers.sameInstance;
|
|||
|
||||
public class GeoIpProcessorFactoryTests extends ESTestCase {
|
||||
|
||||
private Path configDir;
|
||||
private static Map<String, DatabaseReader> databaseReaders;
|
||||
|
||||
@Before
|
||||
public void prepareConfigDirectory() throws Exception {
|
||||
this.configDir = createTempDir();
|
||||
@BeforeClass
|
||||
public static void loadDatabaseReaders() throws IOException {
|
||||
Path configDir = createTempDir();
|
||||
Path geoIpConfigDir = configDir.resolve("ingest-geoip");
|
||||
Files.createDirectories(geoIpConfigDir);
|
||||
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")), geoIpConfigDir.resolve("GeoLite2-City.mmdb"));
|
||||
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")), geoIpConfigDir.resolve("GeoLite2-Country.mmdb"));
|
||||
databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir);
|
||||
}
|
||||
|
||||
public void testBuild_defaults() throws Exception {
|
||||
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
|
||||
public void testBuildDefaults() throws Exception {
|
||||
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
|
||||
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("source_field", "_field");
|
||||
|
@ -64,8 +67,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
|
|||
assertThat(processor.getFields(), sameInstance(GeoIpProcessor.Factory.DEFAULT_FIELDS));
|
||||
}
|
||||
|
||||
public void testBuild_targetField() throws Exception {
|
||||
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
|
||||
public void testBuildTargetField() throws Exception {
|
||||
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("source_field", "_field");
|
||||
config.put("target_field", "_field");
|
||||
|
@ -74,8 +77,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
|
|||
assertThat(processor.getTargetField(), equalTo("_field"));
|
||||
}
|
||||
|
||||
public void testBuild_dbFile() throws Exception {
|
||||
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
|
||||
public void testBuildDbFile() throws Exception {
|
||||
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("source_field", "_field");
|
||||
config.put("database_file", "GeoLite2-Country.mmdb");
|
||||
|
@ -85,8 +88,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
|
|||
assertThat(processor.getDbReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-Country"));
|
||||
}
|
||||
|
||||
public void testBuild_nonExistingDbFile() throws Exception {
|
||||
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
|
||||
public void testBuildNonExistingDbFile() throws Exception {
|
||||
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
|
||||
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("source_field", "_field");
|
||||
|
@ -99,8 +102,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testBuild_fields() throws Exception {
|
||||
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
|
||||
public void testBuildFields() throws Exception {
|
||||
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
|
||||
|
||||
Set<GeoIpProcessor.Field> fields = EnumSet.noneOf(GeoIpProcessor.Field.class);
|
||||
List<String> fieldNames = new ArrayList<>();
|
||||
|
@ -118,8 +121,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
|
|||
assertThat(processor.getFields(), equalTo(fields));
|
||||
}
|
||||
|
||||
public void testBuild_illegalFieldOption() throws Exception {
|
||||
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
|
||||
public void testBuildIllegalFieldOption() throws Exception {
|
||||
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
|
||||
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("source_field", "_field");
|
||||
|
|
Loading…
Reference in New Issue