diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java new file mode 100644 index 00000000000..f73d2ca13c1 --- /dev/null +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java @@ -0,0 +1,62 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.ingest.geoip; + +import com.maxmind.geoip2.DatabaseReader; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.logging.Loggers; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Facilitates lazy loading of the database reader, so that when the geoip plugin is installed, but not used, + * no memory is being wasted on the database reader. + */ +final class DatabaseReaderLazyLoader implements Closeable { + + private static final Logger LOGGER = Loggers.getLogger(DatabaseReaderLazyLoader.class); + + private final String databaseFileName; + private final CheckedSupplier loader; + // package protected for testing only: + final SetOnce databaseReader; + + DatabaseReaderLazyLoader(String databaseFileName, CheckedSupplier loader) { + this.databaseFileName = databaseFileName; + this.loader = loader; + this.databaseReader = new SetOnce<>(); + } + + synchronized DatabaseReader get() throws IOException { + if (databaseReader.get() == null) { + databaseReader.set(loader.get()); + LOGGER.debug("Loaded [{}] geoip database", databaseFileName); + } + return databaseReader.get(); + } + + @Override + public synchronized void close() throws IOException { + IOUtils.close(databaseReader.get()); + } +} diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index 3d1418dc940..2cbaa7a3bb1 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -19,19 +19,6 @@ package org.elasticsearch.ingest.geoip; -import java.io.IOException; -import java.net.InetAddress; -import java.security.AccessController; -import java.security.PrivilegedAction; -import java.util.Arrays; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; - import com.maxmind.geoip2.DatabaseReader; import com.maxmind.geoip2.exception.AddressNotFoundException; import com.maxmind.geoip2.model.CityResponse; @@ -49,6 +36,19 @@ import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; +import java.io.IOException; +import java.net.InetAddress; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; + import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty; import static org.elasticsearch.ingest.ConfigurationUtils.readOptionalList; @@ -264,9 +264,9 @@ public final class GeoIpProcessor extends AbstractProcessor { ); static final Set DEFAULT_COUNTRY_PROPERTIES = EnumSet.of(Property.CONTINENT_NAME, Property.COUNTRY_ISO_CODE); - private final Map databaseReaders; + private final Map databaseReaders; - public Factory(Map databaseReaders) { + public Factory(Map databaseReaders) { this.databaseReaders = databaseReaders; } @@ -279,12 +279,13 @@ public final class GeoIpProcessor extends AbstractProcessor { List propertyNames = readOptionalList(TYPE, processorTag, config, "properties"); boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); - DatabaseReader databaseReader = databaseReaders.get(databaseFile); - if (databaseReader == null) { + DatabaseReaderLazyLoader lazyLoader = databaseReaders.get(databaseFile); + if (lazyLoader == null) { throw newConfigurationException(TYPE, processorTag, "database_file", "database file [" + databaseFile + "] doesn't exist"); } + DatabaseReader databaseReader = lazyLoader.get(); String databaseType = databaseReader.getMetadata().getDatabaseType(); final Set properties; diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index 4e5cc5c0237..1571bc99ea4 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -19,6 +19,15 @@ package org.elasticsearch.ingest.geoip; +import com.maxmind.db.NoCache; +import com.maxmind.db.NodeCache; +import com.maxmind.geoip2.DatabaseReader; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.plugins.IngestPlugin; +import org.elasticsearch.plugins.Plugin; + import java.io.Closeable; import java.io.IOException; import java.io.InputStream; @@ -35,20 +44,11 @@ import java.util.Map; import java.util.stream.Stream; import java.util.zip.GZIPInputStream; -import com.maxmind.db.NoCache; -import com.maxmind.db.NodeCache; -import com.maxmind.geoip2.DatabaseReader; -import org.apache.lucene.util.IOUtils; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.ingest.Processor; -import org.elasticsearch.plugins.IngestPlugin; -import org.elasticsearch.plugins.Plugin; - public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable { public static final Setting CACHE_SIZE = Setting.longSetting("ingest.geoip.cache_size", 1000, 0, Setting.Property.NodeScope); - private Map databaseReaders; + private Map databaseReaders; @Override public List> getSettings() { @@ -76,12 +76,12 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders)); } - static Map loadDatabaseReaders(Path geoIpConfigDirectory, NodeCache cache) throws IOException { + static Map loadDatabaseReaders(Path geoIpConfigDirectory, NodeCache cache) throws IOException { if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) { throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist"); } - Map databaseReaders = new HashMap<>(); + Map databaseReaders = new HashMap<>(); try (Stream databaseFiles = Files.list(geoIpConfigDirectory)) { PathMatcher pathMatcher = geoIpConfigDirectory.getFileSystem().getPathMatcher("glob:**.mmdb.gz"); // Use iterator instead of forEach otherwise IOException needs to be caught twice... @@ -89,10 +89,13 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable while (iterator.hasNext()) { Path databasePath = iterator.next(); if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) { - try (InputStream inputStream = new GZIPInputStream(Files.newInputStream(databasePath, StandardOpenOption.READ))) { - databaseReaders.put(databasePath.getFileName().toString(), - new DatabaseReader.Builder(inputStream).withCache(cache).build()); - } + String databaseFileName = databasePath.getFileName().toString(); + DatabaseReaderLazyLoader holder = new DatabaseReaderLazyLoader(databaseFileName, () -> { + try (InputStream inputStream = new GZIPInputStream(Files.newInputStream(databasePath, StandardOpenOption.READ))) { + return new DatabaseReader.Builder(inputStream).withCache(cache).build(); + } + }); + databaseReaders.put(databaseFileName, holder); } } } diff --git a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java index 0c80bcc71fd..8db0d15f796 100644 --- a/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java +++ b/plugins/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java @@ -22,7 +22,6 @@ package org.elasticsearch.ingest.geoip; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.maxmind.db.NoCache; import com.maxmind.db.NodeCache; -import com.maxmind.geoip2.DatabaseReader; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Randomness; import org.elasticsearch.test.ESTestCase; @@ -48,7 +47,7 @@ import static org.hamcrest.Matchers.sameInstance; public class GeoIpProcessorFactoryTests extends ESTestCase { - private static Map databaseReaders; + private static Map databaseReaders; @BeforeClass public static void loadDatabaseReaders() throws IOException { @@ -66,7 +65,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { @AfterClass public static void closeDatabaseReaders() throws IOException { - for (DatabaseReader reader : databaseReaders.values()) { + for (DatabaseReaderLazyLoader reader : databaseReaders.values()) { reader.close(); } databaseReaders = null; @@ -222,4 +221,37 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { assertThat(e.getMessage(), equalTo("[properties] property isn't a list, but of type [java.lang.String]")); } } + + public void testLazyLoading() throws Exception { + Path configDir = createTempDir(); + Path geoIpConfigDir = configDir.resolve("ingest-geoip"); + Files.createDirectories(geoIpConfigDir); + Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb.gz")), + geoIpConfigDir.resolve("GeoLite2-City.mmdb.gz")); + Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb.gz")), + geoIpConfigDir.resolve("GeoLite2-Country.mmdb.gz")); + + // Loading another database reader instances, because otherwise we can't test lazy loading as the the + // database readers used at class level are reused between tests. (we want to keep that otherwise running this + // test will take roughly 4 times more time) + Map databaseReaders = + IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir, NoCache.getInstance()); + GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); + for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) { + assertNull(lazyLoader.databaseReader.get()); + } + + Map config = new HashMap<>(); + config.put("field", "_field"); + config.put("database_file", "GeoLite2-City.mmdb.gz"); + factory.create(null, "_tag", config); + config = new HashMap<>(); + config.put("field", "_field"); + config.put("database_file", "GeoLite2-Country.mmdb.gz"); + factory.create(null, "_tag", config); + + for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) { + assertNotNull(lazyLoader.databaseReader.get()); + } + } }