[INGEST] Lazy load the geoip databases.
Load the geoip database the first time a pipeline gets created that has a geoip processor. This saves memory (measured ~150MB for the city db) in cases when the plugin is installed, but not used.
This commit is contained in:
parent
57b5d1d29b
commit
211d50f7b8
|
@ -0,0 +1,62 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.ingest.geoip;
|
||||||
|
|
||||||
|
import com.maxmind.geoip2.DatabaseReader;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
import org.apache.lucene.util.IOUtils;
|
||||||
|
import org.apache.lucene.util.SetOnce;
|
||||||
|
import org.elasticsearch.common.CheckedSupplier;
|
||||||
|
import org.elasticsearch.common.logging.Loggers;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Facilitates lazy loading of the database reader, so that when the geoip plugin is installed, but not used,
|
||||||
|
* no memory is being wasted on the database reader.
|
||||||
|
*/
|
||||||
|
final class DatabaseReaderLazyLoader implements Closeable {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = Loggers.getLogger(DatabaseReaderLazyLoader.class);
|
||||||
|
|
||||||
|
private final String databaseFileName;
|
||||||
|
private final CheckedSupplier<DatabaseReader, IOException> loader;
|
||||||
|
// package protected for testing only:
|
||||||
|
final SetOnce<DatabaseReader> databaseReader;
|
||||||
|
|
||||||
|
DatabaseReaderLazyLoader(String databaseFileName, CheckedSupplier<DatabaseReader, IOException> loader) {
|
||||||
|
this.databaseFileName = databaseFileName;
|
||||||
|
this.loader = loader;
|
||||||
|
this.databaseReader = new SetOnce<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized DatabaseReader get() throws IOException {
|
||||||
|
if (databaseReader.get() == null) {
|
||||||
|
databaseReader.set(loader.get());
|
||||||
|
LOGGER.debug("Loaded [{}] geoip database", databaseFileName);
|
||||||
|
}
|
||||||
|
return databaseReader.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void close() throws IOException {
|
||||||
|
IOUtils.close(databaseReader.get());
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,19 +19,6 @@
|
||||||
|
|
||||||
package org.elasticsearch.ingest.geoip;
|
package org.elasticsearch.ingest.geoip;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.InetAddress;
|
|
||||||
import java.security.AccessController;
|
|
||||||
import java.security.PrivilegedAction;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.EnumSet;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Locale;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import com.maxmind.geoip2.DatabaseReader;
|
import com.maxmind.geoip2.DatabaseReader;
|
||||||
import com.maxmind.geoip2.exception.AddressNotFoundException;
|
import com.maxmind.geoip2.exception.AddressNotFoundException;
|
||||||
import com.maxmind.geoip2.model.CityResponse;
|
import com.maxmind.geoip2.model.CityResponse;
|
||||||
|
@ -49,6 +36,19 @@ import org.elasticsearch.ingest.AbstractProcessor;
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
import org.elasticsearch.ingest.Processor;
|
import org.elasticsearch.ingest.Processor;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.security.AccessController;
|
||||||
|
import java.security.PrivilegedAction;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
|
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
|
||||||
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
|
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
|
||||||
import static org.elasticsearch.ingest.ConfigurationUtils.readOptionalList;
|
import static org.elasticsearch.ingest.ConfigurationUtils.readOptionalList;
|
||||||
|
@ -264,9 +264,9 @@ public final class GeoIpProcessor extends AbstractProcessor {
|
||||||
);
|
);
|
||||||
static final Set<Property> DEFAULT_COUNTRY_PROPERTIES = EnumSet.of(Property.CONTINENT_NAME, Property.COUNTRY_ISO_CODE);
|
static final Set<Property> DEFAULT_COUNTRY_PROPERTIES = EnumSet.of(Property.CONTINENT_NAME, Property.COUNTRY_ISO_CODE);
|
||||||
|
|
||||||
private final Map<String, DatabaseReader> databaseReaders;
|
private final Map<String, DatabaseReaderLazyLoader> databaseReaders;
|
||||||
|
|
||||||
public Factory(Map<String, DatabaseReader> databaseReaders) {
|
public Factory(Map<String, DatabaseReaderLazyLoader> databaseReaders) {
|
||||||
this.databaseReaders = databaseReaders;
|
this.databaseReaders = databaseReaders;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -279,12 +279,13 @@ public final class GeoIpProcessor extends AbstractProcessor {
|
||||||
List<String> propertyNames = readOptionalList(TYPE, processorTag, config, "properties");
|
List<String> propertyNames = readOptionalList(TYPE, processorTag, config, "properties");
|
||||||
boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
|
boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
|
||||||
|
|
||||||
DatabaseReader databaseReader = databaseReaders.get(databaseFile);
|
DatabaseReaderLazyLoader lazyLoader = databaseReaders.get(databaseFile);
|
||||||
if (databaseReader == null) {
|
if (lazyLoader == null) {
|
||||||
throw newConfigurationException(TYPE, processorTag,
|
throw newConfigurationException(TYPE, processorTag,
|
||||||
"database_file", "database file [" + databaseFile + "] doesn't exist");
|
"database_file", "database file [" + databaseFile + "] doesn't exist");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DatabaseReader databaseReader = lazyLoader.get();
|
||||||
String databaseType = databaseReader.getMetadata().getDatabaseType();
|
String databaseType = databaseReader.getMetadata().getDatabaseType();
|
||||||
|
|
||||||
final Set<Property> properties;
|
final Set<Property> properties;
|
||||||
|
|
|
@ -19,6 +19,15 @@
|
||||||
|
|
||||||
package org.elasticsearch.ingest.geoip;
|
package org.elasticsearch.ingest.geoip;
|
||||||
|
|
||||||
|
import com.maxmind.db.NoCache;
|
||||||
|
import com.maxmind.db.NodeCache;
|
||||||
|
import com.maxmind.geoip2.DatabaseReader;
|
||||||
|
import org.apache.lucene.util.IOUtils;
|
||||||
|
import org.elasticsearch.common.settings.Setting;
|
||||||
|
import org.elasticsearch.ingest.Processor;
|
||||||
|
import org.elasticsearch.plugins.IngestPlugin;
|
||||||
|
import org.elasticsearch.plugins.Plugin;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
@ -35,20 +44,11 @@ import java.util.Map;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
import java.util.zip.GZIPInputStream;
|
import java.util.zip.GZIPInputStream;
|
||||||
|
|
||||||
import com.maxmind.db.NoCache;
|
|
||||||
import com.maxmind.db.NodeCache;
|
|
||||||
import com.maxmind.geoip2.DatabaseReader;
|
|
||||||
import org.apache.lucene.util.IOUtils;
|
|
||||||
import org.elasticsearch.common.settings.Setting;
|
|
||||||
import org.elasticsearch.ingest.Processor;
|
|
||||||
import org.elasticsearch.plugins.IngestPlugin;
|
|
||||||
import org.elasticsearch.plugins.Plugin;
|
|
||||||
|
|
||||||
public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable {
|
public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable {
|
||||||
public static final Setting<Long> CACHE_SIZE =
|
public static final Setting<Long> CACHE_SIZE =
|
||||||
Setting.longSetting("ingest.geoip.cache_size", 1000, 0, Setting.Property.NodeScope);
|
Setting.longSetting("ingest.geoip.cache_size", 1000, 0, Setting.Property.NodeScope);
|
||||||
|
|
||||||
private Map<String, DatabaseReader> databaseReaders;
|
private Map<String, DatabaseReaderLazyLoader> databaseReaders;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Setting<?>> getSettings() {
|
public List<Setting<?>> getSettings() {
|
||||||
|
@ -76,12 +76,12 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable
|
||||||
return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders));
|
return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders));
|
||||||
}
|
}
|
||||||
|
|
||||||
static Map<String, DatabaseReader> loadDatabaseReaders(Path geoIpConfigDirectory, NodeCache cache) throws IOException {
|
static Map<String, DatabaseReaderLazyLoader> loadDatabaseReaders(Path geoIpConfigDirectory, NodeCache cache) throws IOException {
|
||||||
if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) {
|
if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) {
|
||||||
throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");
|
throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<String, DatabaseReader> databaseReaders = new HashMap<>();
|
Map<String, DatabaseReaderLazyLoader> databaseReaders = new HashMap<>();
|
||||||
try (Stream<Path> databaseFiles = Files.list(geoIpConfigDirectory)) {
|
try (Stream<Path> databaseFiles = Files.list(geoIpConfigDirectory)) {
|
||||||
PathMatcher pathMatcher = geoIpConfigDirectory.getFileSystem().getPathMatcher("glob:**.mmdb.gz");
|
PathMatcher pathMatcher = geoIpConfigDirectory.getFileSystem().getPathMatcher("glob:**.mmdb.gz");
|
||||||
// Use iterator instead of forEach otherwise IOException needs to be caught twice...
|
// Use iterator instead of forEach otherwise IOException needs to be caught twice...
|
||||||
|
@ -89,10 +89,13 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable
|
||||||
while (iterator.hasNext()) {
|
while (iterator.hasNext()) {
|
||||||
Path databasePath = iterator.next();
|
Path databasePath = iterator.next();
|
||||||
if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) {
|
if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) {
|
||||||
try (InputStream inputStream = new GZIPInputStream(Files.newInputStream(databasePath, StandardOpenOption.READ))) {
|
String databaseFileName = databasePath.getFileName().toString();
|
||||||
databaseReaders.put(databasePath.getFileName().toString(),
|
DatabaseReaderLazyLoader holder = new DatabaseReaderLazyLoader(databaseFileName, () -> {
|
||||||
new DatabaseReader.Builder(inputStream).withCache(cache).build());
|
try (InputStream inputStream = new GZIPInputStream(Files.newInputStream(databasePath, StandardOpenOption.READ))) {
|
||||||
}
|
return new DatabaseReader.Builder(inputStream).withCache(cache).build();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
databaseReaders.put(databaseFileName, holder);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,6 @@ package org.elasticsearch.ingest.geoip;
|
||||||
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
||||||
import com.maxmind.db.NoCache;
|
import com.maxmind.db.NoCache;
|
||||||
import com.maxmind.db.NodeCache;
|
import com.maxmind.db.NodeCache;
|
||||||
import com.maxmind.geoip2.DatabaseReader;
|
|
||||||
import org.elasticsearch.ElasticsearchParseException;
|
import org.elasticsearch.ElasticsearchParseException;
|
||||||
import org.elasticsearch.common.Randomness;
|
import org.elasticsearch.common.Randomness;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
@ -48,7 +47,7 @@ import static org.hamcrest.Matchers.sameInstance;
|
||||||
|
|
||||||
public class GeoIpProcessorFactoryTests extends ESTestCase {
|
public class GeoIpProcessorFactoryTests extends ESTestCase {
|
||||||
|
|
||||||
private static Map<String, DatabaseReader> databaseReaders;
|
private static Map<String, DatabaseReaderLazyLoader> databaseReaders;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void loadDatabaseReaders() throws IOException {
|
public static void loadDatabaseReaders() throws IOException {
|
||||||
|
@ -66,7 +65,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void closeDatabaseReaders() throws IOException {
|
public static void closeDatabaseReaders() throws IOException {
|
||||||
for (DatabaseReader reader : databaseReaders.values()) {
|
for (DatabaseReaderLazyLoader reader : databaseReaders.values()) {
|
||||||
reader.close();
|
reader.close();
|
||||||
}
|
}
|
||||||
databaseReaders = null;
|
databaseReaders = null;
|
||||||
|
@ -222,4 +221,37 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
|
||||||
assertThat(e.getMessage(), equalTo("[properties] property isn't a list, but of type [java.lang.String]"));
|
assertThat(e.getMessage(), equalTo("[properties] property isn't a list, but of type [java.lang.String]"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testLazyLoading() throws Exception {
|
||||||
|
Path configDir = createTempDir();
|
||||||
|
Path geoIpConfigDir = configDir.resolve("ingest-geoip");
|
||||||
|
Files.createDirectories(geoIpConfigDir);
|
||||||
|
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb.gz")),
|
||||||
|
geoIpConfigDir.resolve("GeoLite2-City.mmdb.gz"));
|
||||||
|
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb.gz")),
|
||||||
|
geoIpConfigDir.resolve("GeoLite2-Country.mmdb.gz"));
|
||||||
|
|
||||||
|
// Loading another database reader instances, because otherwise we can't test lazy loading as the the
|
||||||
|
// database readers used at class level are reused between tests. (we want to keep that otherwise running this
|
||||||
|
// test will take roughly 4 times more time)
|
||||||
|
Map<String, DatabaseReaderLazyLoader> databaseReaders =
|
||||||
|
IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir, NoCache.getInstance());
|
||||||
|
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
|
||||||
|
for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) {
|
||||||
|
assertNull(lazyLoader.databaseReader.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, Object> config = new HashMap<>();
|
||||||
|
config.put("field", "_field");
|
||||||
|
config.put("database_file", "GeoLite2-City.mmdb.gz");
|
||||||
|
factory.create(null, "_tag", config);
|
||||||
|
config = new HashMap<>();
|
||||||
|
config.put("field", "_field");
|
||||||
|
config.put("database_file", "GeoLite2-Country.mmdb.gz");
|
||||||
|
factory.create(null, "_tag", config);
|
||||||
|
|
||||||
|
for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) {
|
||||||
|
assertNotNull(lazyLoader.databaseReader.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue