diff --git a/plugins/ingest-geoip/build.gradle b/plugins/ingest-geoip/build.gradle index 387bc58c574..02f8e465c30 100644 --- a/plugins/ingest-geoip/build.gradle +++ b/plugins/ingest-geoip/build.gradle @@ -46,7 +46,7 @@ compileTestJava.options.compilerArgs << "-Xlint:-rawtypes,-unchecked" bundlePlugin { from("${project.buildDir}/ingest-geoip") { - into 'config/' + into '/' } } diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index a45b7d5e966..aad7051b8d1 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.Booleans; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.cache.Cache; import org.elasticsearch.common.cache.CacheBuilder; +import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ingest.Processor; @@ -54,6 +55,8 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable public static final Setting CACHE_SIZE = Setting.longSetting("ingest.geoip.cache_size", 1000, 0, Setting.Property.NodeScope); + static String[] DEFAULT_DATABASE_FILENAMES = new String[]{"GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"}; + private Map databaseReaders; @Override @@ -66,48 +69,89 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable if (databaseReaders != null) { throw new IllegalStateException("getProcessors called twice for geoip plugin!!"); } - Path geoIpConfigDirectory = parameters.env.configFile().resolve("ingest-geoip"); + final Path geoIpDirectory = getGeoIpDirectory(parameters); + final Path geoIpConfigDirectory = parameters.env.configFile().resolve("ingest-geoip"); long cacheSize = CACHE_SIZE.get(parameters.env.settings()); try { - databaseReaders = loadDatabaseReaders(geoIpConfigDirectory); + databaseReaders = loadDatabaseReaders(geoIpDirectory, geoIpConfigDirectory); } catch (IOException e) { throw new RuntimeException(e); } return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(cacheSize))); } - static Map loadDatabaseReaders(Path geoIpConfigDirectory) throws IOException { - if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) { - throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist"); + /* + * In GeoIpProcessorNonIngestNodeTests, ingest-geoip is loaded on the classpath. This means that the plugin is never unbundled into a + * directory where the database files would live. Therefore, we have to copy these database files ourselves. To do this, we need the + * ability to specify where those database files would go. We do this by adding a plugin that registers ingest.geoip.database_path as + * an actual setting. Otherwise, in production code, this setting is not registered and the database path is not configurable. + */ + @SuppressForbidden(reason = "PathUtils#get") + private Path getGeoIpDirectory(Processor.Parameters parameters) { + final Path geoIpDirectory; + if (parameters.env.settings().get("ingest.geoip.database_path") == null) { + geoIpDirectory = parameters.env.pluginsFile().resolve("ingest-geoip"); + } else { + geoIpDirectory = PathUtils.get(parameters.env.settings().get("ingest.geoip.database_path")); } - boolean loadDatabaseOnHeap = Booleans.parseBoolean(System.getProperty("es.geoip.load_db_on_heap", "false")); - Map databaseReaders = new HashMap<>(); - try (Stream databaseFiles = Files.list(geoIpConfigDirectory)) { - PathMatcher pathMatcher = geoIpConfigDirectory.getFileSystem().getPathMatcher("glob:**.mmdb"); - // Use iterator instead of forEach otherwise IOException needs to be caught twice... - Iterator iterator = databaseFiles.iterator(); - while (iterator.hasNext()) { - Path databasePath = iterator.next(); - if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) { - String databaseFileName = databasePath.getFileName().toString(); - DatabaseReaderLazyLoader holder = new DatabaseReaderLazyLoader( - databasePath, - () -> { - DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(NoCache.getInstance()); - if (loadDatabaseOnHeap) { - builder.fileMode(Reader.FileMode.MEMORY); - } else { - builder.fileMode(Reader.FileMode.MEMORY_MAPPED); - } - return builder.build(); - }); - databaseReaders.put(databaseFileName, holder); + return geoIpDirectory; + } + + static Map loadDatabaseReaders(Path geoIpDirectory, Path geoIpConfigDirectory) throws IOException { + assertDatabaseExistence(geoIpDirectory, true); + assertDatabaseExistence(geoIpConfigDirectory, false); + final boolean loadDatabaseOnHeap = Booleans.parseBoolean(System.getProperty("es.geoip.load_db_on_heap", "false")); + final Map databaseReaders = new HashMap<>(); + + // load the default databases + for (final String databaseFilename : DEFAULT_DATABASE_FILENAMES) { + final Path databasePath = geoIpDirectory.resolve(databaseFilename); + final DatabaseReaderLazyLoader loader = createLoader(databasePath, loadDatabaseOnHeap); + databaseReaders.put(databaseFilename, loader); + } + + // load any custom databases + if (Files.exists(geoIpConfigDirectory)) { + try (Stream databaseFiles = Files.list(geoIpConfigDirectory)) { + PathMatcher pathMatcher = geoIpConfigDirectory.getFileSystem().getPathMatcher("glob:**.mmdb"); + // Use iterator instead of forEach otherwise IOException needs to be caught twice... + Iterator iterator = databaseFiles.iterator(); + while (iterator.hasNext()) { + Path databasePath = iterator.next(); + if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) { + String databaseFileName = databasePath.getFileName().toString(); + final DatabaseReaderLazyLoader loader = createLoader(databasePath, loadDatabaseOnHeap); + databaseReaders.put(databaseFileName, loader); + } } } } return Collections.unmodifiableMap(databaseReaders); } + private static DatabaseReaderLazyLoader createLoader(Path databasePath, boolean loadDatabaseOnHeap) { + return new DatabaseReaderLazyLoader( + databasePath, + () -> { + DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(NoCache.getInstance()); + if (loadDatabaseOnHeap) { + builder.fileMode(Reader.FileMode.MEMORY); + } else { + builder.fileMode(Reader.FileMode.MEMORY_MAPPED); + } + return builder.build(); + }); + } + + private static void assertDatabaseExistence(final Path path, final boolean exists) throws IOException { + for (final String database : DEFAULT_DATABASE_FILENAMES) { + if (Files.exists(path.resolve(database)) != exists) { + final String message = "expected database [" + database + "] to " + (exists ? "" : "not ") + "exist in [" + path + "]"; + throw new IOException(message); + } + } + } + @SuppressForbidden(reason = "Maxmind API requires java.io.File") private static DatabaseReader.Builder createDatabaseBuilder(Path databasePath) { return new DatabaseReader.Builder(databasePath.toFile()); diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index e4006d9eb10..a962e879397 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -44,7 +44,9 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.sameInstance; public class GeoIpProcessorFactoryTests extends ESTestCase { @@ -60,17 +62,13 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { return; } - Path configDir = createTempDir(); - Path geoIpConfigDir = configDir.resolve("ingest-geoip"); + final Path geoIpDir = createTempDir(); + final Path configDir = createTempDir(); + final Path geoIpConfigDir = configDir.resolve("ingest-geoip"); Files.createDirectories(geoIpConfigDir); - Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")), - geoIpConfigDir.resolve("GeoLite2-City.mmdb")); - Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")), - geoIpConfigDir.resolve("GeoLite2-Country.mmdb")); - Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")), - geoIpConfigDir.resolve("GeoLite2-ASN.mmdb")); + copyDatabaseFiles(geoIpDir); - databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir); + databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpDir, geoIpConfigDir); } @AfterClass @@ -297,21 +295,16 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { // This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected. // As a consequence, the corresponding file appears to be still in use and Windows cannot delete it. assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS); - Path configDir = createTempDir(); - Path geoIpConfigDir = configDir.resolve("ingest-geoip"); + final Path geoIpDir = createTempDir(); + final Path configDir = createTempDir(); + final Path geoIpConfigDir = configDir.resolve("ingest-geoip"); Files.createDirectories(geoIpConfigDir); - Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")), - geoIpConfigDir.resolve("GeoLite2-City.mmdb")); - Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")), - geoIpConfigDir.resolve("GeoLite2-Country.mmdb")); - Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")), - geoIpConfigDir.resolve("GeoLite2-ASN.mmdb")); + copyDatabaseFiles(geoIpDir); // Loading another database reader instances, because otherwise we can't test lazy loading as the // database readers used at class level are reused between tests. (we want to keep that otherwise running this // test will take roughly 4 times more time) - Map databaseReaders = - IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir); + Map databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpDir, geoIpConfigDir); GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) { assertNull(lazyLoader.databaseReader.get()); @@ -354,4 +347,79 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { assertNotNull(databaseReaders.get("GeoLite2-ASN.mmdb").databaseReader.get()); } + public void testLoadingCustomDatabase() throws IOException { + final Path geoIpDir = createTempDir(); + final Path configDir = createTempDir(); + final Path geoIpConfigDir = configDir.resolve("ingest-geoip"); + Files.createDirectories(geoIpConfigDir); + copyDatabaseFiles(geoIpDir); + // fake the GeoIP2-City database + copyDatabaseFile(geoIpConfigDir, "GeoLite2-City.mmdb"); + Files.move(geoIpConfigDir.resolve("GeoLite2-City.mmdb"), geoIpConfigDir.resolve("GeoIP2-City.mmdb")); + + /* + * Loading another database reader instances, because otherwise we can't test lazy loading as the database readers used at class + * level are reused between tests. (we want to keep that otherwise running this test will take roughly 4 times more time). + */ + final Map databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpDir, geoIpConfigDir); + final GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000)); + for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) { + assertNull(lazyLoader.databaseReader.get()); + } + + final Map field = Collections.singletonMap("_field", "1.1.1.1"); + final IngestDocument document = new IngestDocument("index", "type", "id", "routing", 1L, VersionType.EXTERNAL, field); + + Map config = new HashMap<>(); + config.put("field", "_field"); + config.put("database_file", "GeoIP2-City.mmdb"); + final GeoIpProcessor city = factory.create(null, "_tag", config); + + // these are lazy loaded until first use so we expect null here + assertNull(databaseReaders.get("GeoIP2-City.mmdb").databaseReader.get()); + city.execute(document); + // the first ingest should trigger a database load + assertNotNull(databaseReaders.get("GeoIP2-City.mmdb").databaseReader.get()); + } + + public void testDatabaseNotExistsInDir() throws IOException { + final Path geoIpDir = createTempDir(); + final Path configDir = createTempDir(); + final Path geoIpConfigDir = configDir.resolve("ingest-geoip"); + if (randomBoolean()) { + Files.createDirectories(geoIpConfigDir); + } + copyDatabaseFiles(geoIpDir); + final String databaseFilename = randomFrom(IngestGeoIpPlugin.DEFAULT_DATABASE_FILENAMES); + Files.delete(geoIpDir.resolve(databaseFilename)); + final IOException e = + expectThrows(IOException.class, () -> IngestGeoIpPlugin.loadDatabaseReaders(geoIpDir, geoIpConfigDir)); + assertThat(e, hasToString(containsString("expected database [" + databaseFilename + "] to exist in [" + geoIpDir + "]"))); + } + + public void testDatabaseExistsInConfigDir() throws IOException { + final Path geoIpDir = createTempDir(); + final Path configDir = createTempDir(); + final Path geoIpConfigDir = configDir.resolve("ingest-geoip"); + Files.createDirectories(geoIpConfigDir); + copyDatabaseFiles(geoIpDir); + final String databaseFilename = randomFrom(IngestGeoIpPlugin.DEFAULT_DATABASE_FILENAMES); + copyDatabaseFile(geoIpConfigDir, databaseFilename); + final IOException e = + expectThrows(IOException.class, () -> IngestGeoIpPlugin.loadDatabaseReaders(geoIpDir, geoIpConfigDir)); + assertThat(e, hasToString(containsString("expected database [" + databaseFilename + "] to not exist in [" + geoIpConfigDir + "]"))); + } + + private static void copyDatabaseFile(final Path path, final String databaseFilename) throws IOException { + Files.copy( + new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/" + databaseFilename)), + path.resolve(databaseFilename)); + } + + private static void copyDatabaseFiles(final Path path) throws IOException { + for (final String databaseFilename : IngestGeoIpPlugin.DEFAULT_DATABASE_FILENAMES) { + copyDatabaseFile(path, databaseFilename); + } + } + } diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeTests.java index 92fceab9eeb..9eb32767eb7 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; @@ -41,27 +42,30 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; public class GeoIpProcessorNonIngestNodeTests extends ESIntegTestCase { + public static class IngestGeoIpSettingsPlugin extends Plugin { + + @Override + public List> getSettings() { + return Collections.singletonList(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope)); + } + } + @Override protected Collection> nodePlugins() { - return Collections.singleton(IngestGeoIpPlugin.class); + return Arrays.asList(IngestGeoIpPlugin.class, IngestGeoIpSettingsPlugin.class); } @Override protected Settings nodeSettings(final int nodeOrdinal) { - return Settings.builder().put("node.ingest", false).put(super.nodeSettings(nodeOrdinal)).build(); - } - - @Override - protected Path nodeConfigPath(final int nodeOrdinal) { - final Path configPath = createTempDir(); + final Path databasePath = createTempDir(); try { - final Path databasePath = configPath.resolve("ingest-geoip"); Files.createDirectories(databasePath); Files.copy( new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")), @@ -75,7 +79,11 @@ public class GeoIpProcessorNonIngestNodeTests extends ESIntegTestCase { } catch (final IOException e) { throw new UncheckedIOException(e); } - return configPath; + return Settings.builder() + .put("ingest.geoip.database_path", databasePath) + .put("node.ingest", false) + .put(super.nodeSettings(nodeOrdinal)) + .build(); } /**