move DatabaseReaders initialization to IngestGeoIpPlugin#onModule

This commit is contained in:
javanna 2016-01-15 15:02:35 +01:00 committed by Luca Cavanna
parent d2eda422cf
commit dd7cae7c19
3 changed files with 61 additions and 51 deletions

View File

@ -37,24 +37,17 @@ import org.elasticsearch.ingest.core.Processor;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.StandardOpenOption;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import static org.elasticsearch.ingest.core.ConfigurationUtils.readOptionalList;
import static org.elasticsearch.ingest.core.ConfigurationUtils.readStringProperty;
@ -230,31 +223,8 @@ public final class GeoIpProcessor implements Processor {
private final Map<String, DatabaseReader> databaseReaders;
public Factory(Path configDirectory) {
// TODO(simonw): same as fro grok we should load this outside of the factory in a static method and hass the map to the ctor
Path geoIpConfigDirectory = configDirectory.resolve("ingest-geoip");
if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) {
throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");
}
try (Stream<Path> databaseFiles = Files.list(geoIpConfigDirectory)) {
Map<String, DatabaseReader> databaseReaders = new HashMap<>();
PathMatcher pathMatcher = geoIpConfigDirectory.getFileSystem().getPathMatcher("glob:**.mmdb");
// Use iterator instead of forEach otherwise IOException needs to be caught twice...
Iterator<Path> iterator = databaseFiles.iterator();
while (iterator.hasNext()) {
Path databasePath = iterator.next();
if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) {
try (InputStream inputStream = Files.newInputStream(databasePath, StandardOpenOption.READ)) {
databaseReaders.put(databasePath.getFileName().toString(), new DatabaseReader.Builder(inputStream).build());
}
}
}
this.databaseReaders = Collections.unmodifiableMap(databaseReaders);
} catch (IOException e) {
throw new RuntimeException(e);
}
public Factory(Map<String, DatabaseReader> databaseReaders) {
this.databaseReaders = databaseReaders;
}
public GeoIpProcessor create(Map<String, Object> config) throws Exception {

View File

@ -19,9 +19,22 @@
package org.elasticsearch.ingest.geoip;
import com.maxmind.geoip2.DatabaseReader;
import org.elasticsearch.node.NodeModule;
import org.elasticsearch.plugins.Plugin;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Stream;
public class IngestGeoIpPlugin extends Plugin {
@Override
@ -34,7 +47,31 @@ public class IngestGeoIpPlugin extends Plugin {
return "Plugin that allows to plug in ingest processors";
}
public void onModule(NodeModule nodeModule) {
nodeModule.registerProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile()));
public void onModule(NodeModule nodeModule) throws IOException {
Path geoIpConfigDirectory = nodeModule.getNode().getEnvironment().configFile().resolve("ingest-geoip");
Map<String, DatabaseReader> databaseReaders = loadDatabaseReaders(geoIpConfigDirectory);
nodeModule.registerProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(databaseReaders));
}
static Map<String, DatabaseReader> loadDatabaseReaders(Path geoIpConfigDirectory) throws IOException {
if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) {
throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");
}
Map<String, DatabaseReader> databaseReaders = new HashMap<>();
try (Stream<Path> databaseFiles = Files.list(geoIpConfigDirectory)) {
PathMatcher pathMatcher = geoIpConfigDirectory.getFileSystem().getPathMatcher("glob:**.mmdb");
// Use iterator instead of forEach otherwise IOException needs to be caught twice...
Iterator<Path> iterator = databaseFiles.iterator();
while (iterator.hasNext()) {
Path databasePath = iterator.next();
if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) {
try (InputStream inputStream = Files.newInputStream(databasePath, StandardOpenOption.READ)) {
databaseReaders.put(databasePath.getFileName().toString(), new DatabaseReader.Builder(inputStream).build());
}
}
}
}
return Collections.unmodifiableMap(databaseReaders);
}
}

View File

@ -19,11 +19,13 @@
package org.elasticsearch.ingest.geoip;
import com.maxmind.geoip2.DatabaseReader;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.StreamsUtils;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@ -40,19 +42,20 @@ import static org.hamcrest.Matchers.sameInstance;
public class GeoIpProcessorFactoryTests extends ESTestCase {
private Path configDir;
private static Map<String, DatabaseReader> databaseReaders;
@Before
public void prepareConfigDirectory() throws Exception {
this.configDir = createTempDir();
@BeforeClass
public static void loadDatabaseReaders() throws IOException {
Path configDir = createTempDir();
Path geoIpConfigDir = configDir.resolve("ingest-geoip");
Files.createDirectories(geoIpConfigDir);
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")), geoIpConfigDir.resolve("GeoLite2-City.mmdb"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")), geoIpConfigDir.resolve("GeoLite2-Country.mmdb"));
databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir);
}
public void testBuild_defaults() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
public void testBuildDefaults() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
Map<String, Object> config = new HashMap<>();
config.put("source_field", "_field");
@ -64,8 +67,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
assertThat(processor.getFields(), sameInstance(GeoIpProcessor.Factory.DEFAULT_FIELDS));
}
public void testBuild_targetField() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
public void testBuildTargetField() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
Map<String, Object> config = new HashMap<>();
config.put("source_field", "_field");
config.put("target_field", "_field");
@ -74,8 +77,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
assertThat(processor.getTargetField(), equalTo("_field"));
}
public void testBuild_dbFile() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
public void testBuildDbFile() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
Map<String, Object> config = new HashMap<>();
config.put("source_field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb");
@ -85,8 +88,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
assertThat(processor.getDbReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-Country"));
}
public void testBuild_nonExistingDbFile() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
public void testBuildNonExistingDbFile() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
Map<String, Object> config = new HashMap<>();
config.put("source_field", "_field");
@ -99,8 +102,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
}
}
public void testBuild_fields() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
public void testBuildFields() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
Set<GeoIpProcessor.Field> fields = EnumSet.noneOf(GeoIpProcessor.Field.class);
List<String> fieldNames = new ArrayList<>();
@ -118,8 +121,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
assertThat(processor.getFields(), equalTo(fields));
}
public void testBuild_illegalFieldOption() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(configDir);
public void testBuildIllegalFieldOption() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
Map<String, Object> config = new HashMap<>();
config.put("source_field", "_field");