[ingest] geo-ip performance improvements (#33029)

Re-implement the cache to avoid jackson JSON de-serialization for
every IP lookup. The built in maxmind cache caches JsonNode objects.
This requires de-serialization for every lookup, even if the object
is found in cache. Profiling shows that is very expensive (CPU).

The cache will now consist of the fully de-serialized objects.
Profiling shows that the new footprint for the CityDB is ~6KB per cache
entry. This may result in ~6MB increase with the 1000 entry default.
The  performance has been measured up to 40% faster on a modern 4 core/8 thread
CPU for an ingest (minimal indexing) workflow.

Further, the since prior implementation cached the JsonNode objects,
and there is not a 1:1 relationship between an IP lookup / JsonNode
object, the default cache size was most likely too small to be very
effective. While this change does not change the 1000 default cache
size, it will now cache more since there is now a 1:1 relationship between
an IP lookup and value in the cache.
This commit is contained in:
Jake Landis 2018-09-05 14:04:25 -07:00 committed by GitHub
parent ef1066d7f8
commit 6f9c9ab5e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 230 additions and 170 deletions

View File

@ -1,46 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.geoip;
import com.fasterxml.jackson.databind.JsonNode;
import com.maxmind.db.NodeCache;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
final class GeoIpCache implements NodeCache {
private final Cache<Integer, JsonNode> cache;
GeoIpCache(long maxSize) {
this.cache = CacheBuilder.<Integer, JsonNode>builder().setMaximumWeight(maxSize).build();
}
@Override
public JsonNode get(int key, Loader loader) throws IOException {
try {
return cache.computeIfAbsent(key, loader::load);
} catch (ExecutionException e) {
Throwable cause = e.getCause() != null ? e.getCause() : e;
throw new ElasticsearchException(cause);
}
}
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache;
import java.net.InetAddress;
import java.security.AccessController;
@ -66,14 +67,18 @@ public final class GeoIpProcessor extends AbstractProcessor {
private final DatabaseReader dbReader;
private final Set<Property> properties;
private final boolean ignoreMissing;
private final GeoIpCache cache;
GeoIpProcessor(String tag, String field, DatabaseReader dbReader, String targetField, Set<Property> properties, boolean ignoreMissing) {
GeoIpProcessor(String tag, String field, DatabaseReader dbReader, String targetField, Set<Property> properties, boolean ignoreMissing,
GeoIpCache cache) {
super(tag);
this.field = field;
this.targetField = targetField;
this.dbReader = dbReader;
this.properties = properties;
this.ignoreMissing = ignoreMissing;
this.cache = cache;
}
boolean isIgnoreMissing() {
@ -146,15 +151,16 @@ public final class GeoIpProcessor extends AbstractProcessor {
private Map<String, Object> retrieveCityGeoData(InetAddress ipAddress) {
SpecialPermission.check();
CityResponse response = AccessController.doPrivileged((PrivilegedAction<CityResponse>) () -> {
try {
return dbReader.city(ipAddress);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
CityResponse response = AccessController.doPrivileged((PrivilegedAction<CityResponse>) () ->
cache.putIfAbsent(ipAddress, CityResponse.class, ip -> {
try {
return dbReader.city(ip);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}));
Country country = response.getCountry();
City city = response.getCity();
@ -231,15 +237,16 @@ public final class GeoIpProcessor extends AbstractProcessor {
private Map<String, Object> retrieveCountryGeoData(InetAddress ipAddress) {
SpecialPermission.check();
CountryResponse response = AccessController.doPrivileged((PrivilegedAction<CountryResponse>) () -> {
try {
return dbReader.country(ipAddress);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
CountryResponse response = AccessController.doPrivileged((PrivilegedAction<CountryResponse>) () ->
cache.putIfAbsent(ipAddress, CountryResponse.class, ip -> {
try {
return dbReader.country(ip);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}));
Country country = response.getCountry();
Continent continent = response.getContinent();
@ -275,15 +282,16 @@ public final class GeoIpProcessor extends AbstractProcessor {
private Map<String, Object> retrieveAsnGeoData(InetAddress ipAddress) {
SpecialPermission.check();
AsnResponse response = AccessController.doPrivileged((PrivilegedAction<AsnResponse>) () -> {
try {
return dbReader.asn(ipAddress);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
AsnResponse response = AccessController.doPrivileged((PrivilegedAction<AsnResponse>) () ->
cache.putIfAbsent(ipAddress, AsnResponse.class, ip -> {
try {
return dbReader.asn(ip);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}));
Integer asn = response.getAutonomousSystemNumber();
String organization_name = response.getAutonomousSystemOrganization();
@ -322,9 +330,11 @@ public final class GeoIpProcessor extends AbstractProcessor {
);
private final Map<String, DatabaseReaderLazyLoader> databaseReaders;
private final GeoIpCache cache;
public Factory(Map<String, DatabaseReaderLazyLoader> databaseReaders) {
public Factory(Map<String, DatabaseReaderLazyLoader> databaseReaders, GeoIpCache cache) {
this.databaseReaders = databaseReaders;
this.cache = cache;
}
@Override
@ -368,14 +378,15 @@ public final class GeoIpProcessor extends AbstractProcessor {
}
}
return new GeoIpProcessor(processorTag, ipField, databaseReader, targetField, properties, ignoreMissing);
return new GeoIpProcessor(processorTag, ipField, databaseReader, targetField, properties, ignoreMissing, cache);
}
}
// Geoip2's AddressNotFoundException is checked and due to the fact that we need run their code
// inside a PrivilegedAction code block, we are forced to catch any checked exception and rethrow
// it with an unchecked exception.
private static final class AddressNotFoundRuntimeException extends RuntimeException {
//package private for testing
static final class AddressNotFoundRuntimeException extends RuntimeException {
AddressNotFoundRuntimeException(Throwable cause) {
super(cause);

View File

@ -23,16 +23,20 @@ import com.maxmind.db.NoCache;
import com.maxmind.db.NodeCache;
import com.maxmind.db.Reader;
import com.maxmind.geoip2.DatabaseReader;
import org.elasticsearch.core.internal.io.IOUtils;
import com.maxmind.geoip2.model.AbstractResponse;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
@ -42,6 +46,8 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Stream;
public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable {
@ -61,24 +67,18 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable
throw new IllegalStateException("getProcessors called twice for geoip plugin!!");
}
Path geoIpConfigDirectory = parameters.env.configFile().resolve("ingest-geoip");
NodeCache cache;
long cacheSize = CACHE_SIZE.get(parameters.env.settings());
if (cacheSize > 0) {
cache = new GeoIpCache(cacheSize);
} else {
cache = NoCache.getInstance();
}
try {
databaseReaders = loadDatabaseReaders(geoIpConfigDirectory, cache);
databaseReaders = loadDatabaseReaders(geoIpConfigDirectory);
} catch (IOException e) {
throw new RuntimeException(e);
}
return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders));
return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(cacheSize)));
}
static Map<String, DatabaseReaderLazyLoader> loadDatabaseReaders(Path geoIpConfigDirectory, NodeCache cache) throws IOException {
static Map<String, DatabaseReaderLazyLoader> loadDatabaseReaders(Path geoIpConfigDirectory) throws IOException {
if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) {
throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");
throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");
}
boolean loadDatabaseOnHeap = Booleans.parseBoolean(System.getProperty("es.geoip.load_db_on_heap", "false"));
Map<String, DatabaseReaderLazyLoader> databaseReaders = new HashMap<>();
@ -92,7 +92,7 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable
String databaseFileName = databasePath.getFileName().toString();
DatabaseReaderLazyLoader holder = new DatabaseReaderLazyLoader(databaseFileName,
() -> {
DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(cache);
DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(NoCache.getInstance());
if (loadDatabaseOnHeap) {
builder.fileMode(Reader.FileMode.MEMORY);
} else {
@ -119,4 +119,75 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable
}
}
/**
* The in-memory cache for the geoip data. There should only be 1 instance of this class..
* This cache differs from the maxmind's {@link NodeCache} such that this cache stores the deserialized Json objects to avoid the
* cost of deserialization for each lookup (cached or not). This comes at slight expense of higher memory usage, but significant
* reduction of CPU usage.
*/
static class GeoIpCache {
private final Cache<CacheKey, AbstractResponse> cache;
//package private for testing
GeoIpCache(long maxSize) {
if (maxSize < 0) {
throw new IllegalArgumentException("geoip max cache size must be 0 or greater");
}
this.cache = CacheBuilder.<CacheKey, AbstractResponse>builder().setMaximumWeight(maxSize).build();
}
<T extends AbstractResponse> T putIfAbsent(InetAddress ip, Class<T> responseType,
Function<InetAddress, AbstractResponse> retrieveFunction) {
//can't use cache.computeIfAbsent due to the elevated permissions for the jackson (run via the cache loader)
CacheKey<T> cacheKey = new CacheKey<>(ip, responseType);
//intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
AbstractResponse response = cache.get(cacheKey);
if (response == null) {
response = retrieveFunction.apply(ip);
cache.put(cacheKey, response);
}
return responseType.cast(response);
}
//only useful for testing
<T extends AbstractResponse> T get(InetAddress ip, Class<T> responseType) {
CacheKey<T> cacheKey = new CacheKey<>(ip, responseType);
return responseType.cast(cache.get(cacheKey));
}
/**
* The key to use for the cache. Since this cache can span multiple geoip processors that all use different databases, the response
* type is needed to be included in the cache key. For example, if we only used the IP address as the key the City and ASN the same
* IP may be in both with different values and we need to cache both. The response type scopes the IP to the correct database
* provides a means to safely cast the return objects.
* @param <T> The AbstractResponse type used to scope the key and cast the result.
*/
private static class CacheKey<T extends AbstractResponse> {
private final InetAddress ip;
private final Class<T> responseType;
private CacheKey(InetAddress ip, Class<T> responseType) {
this.ip = ip;
this.responseType = responseType;
}
//generated
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CacheKey<?> cacheKey = (CacheKey<?>) o;
return Objects.equals(ip, cacheKey.ip) &&
Objects.equals(responseType, cacheKey.responseType);
}
//generated
@Override
public int hashCode() {
return Objects.hash(ip, responseType);
}
}
}
}

View File

@ -1,51 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.geoip;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.maxmind.db.NodeCache;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.test.ESTestCase;
public class GeoIpCacheTests extends ESTestCase {
public void testCachesAndEvictsResults() throws Exception {
GeoIpCache cache = new GeoIpCache(1);
final NodeCache.Loader loader = key -> new IntNode(key);
JsonNode jsonNode1 = cache.get(1, loader);
assertSame(jsonNode1, cache.get(1, loader));
// evict old key by adding another value
cache.get(2, loader);
assertNotSame(jsonNode1, cache.get(1, loader));
}
public void testThrowsElasticsearchException() throws Exception {
GeoIpCache cache = new GeoIpCache(1);
NodeCache.Loader loader = (int key) -> {
throw new IllegalArgumentException("Illegal key");
};
ElasticsearchException ex = expectThrows(ElasticsearchException.class, () -> cache.get(1, loader));
assertTrue("Expected cause to be of type IllegalArgumentException but was [" + ex.getCause().getClass() + "]",
ex.getCause() instanceof IllegalArgumentException);
assertEquals("Illegal key", ex.getCause().getMessage());
}
}

View File

@ -20,11 +20,10 @@
package org.elasticsearch.ingest.geoip;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.maxmind.db.NoCache;
import com.maxmind.db.NodeCache;
import org.apache.lucene.util.Constants;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.StreamsUtils;
import org.junit.AfterClass;
@ -69,8 +68,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")),
geoIpConfigDir.resolve("GeoLite2-ASN.mmdb"));
NodeCache cache = randomFrom(NoCache.getInstance(), new GeoIpCache(randomNonNegativeLong()));
databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir, cache);
databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir);
}
@AfterClass
@ -92,7 +90,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
// This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected.
// As a consequence, the corresponding file appears to be still in use and Windows cannot delete it.
assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000));
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
@ -111,7 +109,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
// This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected.
// As a consequence, the corresponding file appears to be still in use and Windows cannot delete it.
assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000));
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
@ -131,7 +129,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
// This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected.
// As a consequence, the corresponding file appears to be still in use and Windows cannot delete it.
assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000));
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
@ -152,7 +150,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
// This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected.
// As a consequence, the corresponding file appears to be still in use and Windows cannot delete it.
assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000));
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
@ -173,7 +171,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
// This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected.
// As a consequence, the corresponding file appears to be still in use and Windows cannot delete it.
assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000));
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("target_field", "_field");
@ -187,7 +185,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
// This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected.
// As a consequence, the corresponding file appears to be still in use and Windows cannot delete it.
assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000));
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb");
@ -203,7 +201,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
// This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected.
// As a consequence, the corresponding file appears to be still in use and Windows cannot delete it.
assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000));
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb");
@ -220,7 +218,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
// This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected.
// As a consequence, the corresponding file appears to be still in use and Windows cannot delete it.
assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000));
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-ASN.mmdb");
@ -237,7 +235,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
// This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected.
// As a consequence, the corresponding file appears to be still in use and Windows cannot delete it.
assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000));
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
@ -250,7 +248,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
// This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected.
// As a consequence, the corresponding file appears to be still in use and Windows cannot delete it.
assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000));
Set<GeoIpProcessor.Property> properties = EnumSet.noneOf(GeoIpProcessor.Property.class);
List<String> fieldNames = new ArrayList<>();
@ -277,7 +275,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
// This test uses a MappedByteBuffer which will keep the file mappings active until it is garbage-collected.
// As a consequence, the corresponding file appears to be still in use and Windows cannot delete it.
assumeFalse("windows deletion behavior is asinine", Constants.WINDOWS);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000));
Map<String, Object> config1 = new HashMap<>();
config1.put("field", "_field");
@ -311,8 +309,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
// database readers used at class level are reused between tests. (we want to keep that otherwise running this
// test will take roughly 4 times more time)
Map<String, DatabaseReaderLazyLoader> databaseReaders =
IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir, NoCache.getInstance());
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(1000));
for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) {
assertNull(lazyLoader.databaseReader.get());
}

View File

@ -20,8 +20,9 @@
package org.elasticsearch.ingest.geoip;
import com.maxmind.geoip2.DatabaseReader;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache;
import org.elasticsearch.test.ESTestCase;
import java.io.InputStream;
@ -40,7 +41,8 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testCity() throws Exception {
InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb");
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false);
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000));
Map<String, Object> document = new HashMap<>();
document.put("source_field", "8.8.8.8");
@ -64,7 +66,8 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testNullValueWithIgnoreMissing() throws Exception {
InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb");
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true);
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true,
new GeoIpCache(1000));
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(),
Collections.singletonMap("source_field", null));
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
@ -75,7 +78,8 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testNonExistentWithIgnoreMissing() throws Exception {
InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb");
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true);
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true,
new GeoIpCache(1000));
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
processor.execute(ingestDocument);
@ -85,7 +89,8 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testNullWithoutIgnoreMissing() throws Exception {
InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb");
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false);
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000));
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(),
Collections.singletonMap("source_field", null));
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
@ -96,7 +101,8 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testNonExistentWithoutIgnoreMissing() throws Exception {
InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb");
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false);
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000));
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
Exception exception = expectThrows(Exception.class, () -> processor.execute(ingestDocument));
@ -106,7 +112,8 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testCity_withIpV6() throws Exception {
InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb");
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false);
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000));
String address = "2602:306:33d3:8000::3257:9652";
Map<String, Object> document = new HashMap<>();
@ -135,7 +142,8 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testCityWithMissingLocation() throws Exception {
InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb");
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false);
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000));
Map<String, Object> document = new HashMap<>();
document.put("source_field", "80.231.5.0");
@ -152,7 +160,8 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testCountry() throws Exception {
InputStream database = getDatabaseFileInputStream("/GeoLite2-Country.mmdb");
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false);
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000));
Map<String, Object> document = new HashMap<>();
document.put("source_field", "82.170.213.79");
@ -172,7 +181,8 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testCountryWithMissingLocation() throws Exception {
InputStream database = getDatabaseFileInputStream("/GeoLite2-Country.mmdb");
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false);
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000));
Map<String, Object> document = new HashMap<>();
document.put("source_field", "80.231.5.0");
@ -190,7 +200,8 @@ public class GeoIpProcessorTests extends ESTestCase {
String ip = "82.170.213.79";
InputStream database = getDatabaseFileInputStream("/GeoLite2-ASN.mmdb");
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false);
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000));
Map<String, Object> document = new HashMap<>();
document.put("source_field", ip);
@ -209,7 +220,8 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testAddressIsNotInTheDatabase() throws Exception {
InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb");
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false);
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000));
Map<String, Object> document = new HashMap<>();
document.put("source_field", "127.0.0.1");
@ -222,7 +234,8 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testInvalid() throws Exception {
InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb");
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false);
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000));
Map<String, Object> document = new HashMap<>();
document.put("source_field", "www.google.com");

View File

@ -0,0 +1,64 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.geoip;
import com.maxmind.geoip2.model.AbstractResponse;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache;
import org.elasticsearch.test.ESTestCase;
import static org.mockito.Mockito.mock;
public class IngestGeoIpPluginTests extends ESTestCase {
public void testCachesAndEvictsResults() {
GeoIpCache cache = new GeoIpCache(1);
AbstractResponse response1 = mock(AbstractResponse.class);
AbstractResponse response2 = mock(AbstractResponse.class);
//add a key
AbstractResponse cachedResponse = cache.putIfAbsent(InetAddresses.forString("127.0.0.1"), AbstractResponse.class, ip -> response1);
assertSame(cachedResponse, response1);
assertSame(cachedResponse, cache.putIfAbsent(InetAddresses.forString("127.0.0.1"), AbstractResponse.class, ip -> response1));
assertSame(cachedResponse, cache.get(InetAddresses.forString("127.0.0.1"), AbstractResponse.class));
// evict old key by adding another value
cachedResponse = cache.putIfAbsent(InetAddresses.forString("127.0.0.2"), AbstractResponse.class, ip -> response2);
assertSame(cachedResponse, response2);
assertSame(cachedResponse, cache.putIfAbsent(InetAddresses.forString("127.0.0.2"), AbstractResponse.class, ip -> response2));
assertSame(cachedResponse, cache.get(InetAddresses.forString("127.0.0.2"), AbstractResponse.class));
assertNotSame(response1, cache.get(InetAddresses.forString("127.0.0.1"), AbstractResponse.class));
}
public void testThrowsFunctionsException() {
GeoIpCache cache = new GeoIpCache(1);
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> cache.putIfAbsent(InetAddresses.forString("127.0.0.1"), AbstractResponse.class,
ip -> { throw new IllegalArgumentException("bad"); }));
assertEquals("bad", ex.getMessage());
}
public void testInvalidInit() {
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new GeoIpCache(-1));
assertEquals("geoip max cache size must be 0 or greater", ex.getMessage());
}
}