Cache results of geoip lookups (#22231)
With this commit, we introduce a cache to the geoip ingest processor. The cache is enabled by default and caches the 1000 most recent items. The cache size is controlled by the setting `ingest.geoip.cache_size`. Closes #22074
This commit is contained in:
parent
b2aaeb56f3
commit
655a95a2bb
|
@ -203,3 +203,14 @@ Which returns:
|
|||
}
|
||||
--------------------------------------------------
|
||||
// TESTRESPONSE
|
||||
|
||||
[[ingest-geoip-settings]]
|
||||
===== Node Settings
|
||||
|
||||
The geoip processor supports the following setting:
|
||||
|
||||
`ingest.geoip.cache_size`::
|
||||
|
||||
The maximum number of results that should be cached. Defaults to `1000`.
|
||||
|
||||
Note that these settings are node settings and apply to all geoip processors, i.e. there is one cache for all defined geoip processors.
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.ingest.geoip;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.maxmind.db.NodeCache;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.cache.Cache;
|
||||
import org.elasticsearch.common.cache.CacheBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
final class GeoIpCache implements NodeCache {
|
||||
private final Cache<Integer, JsonNode> cache;
|
||||
|
||||
GeoIpCache(long maxSize) {
|
||||
this.cache = CacheBuilder.<Integer, JsonNode>builder().setMaximumWeight(maxSize).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public JsonNode get(int key, Loader loader) throws IOException {
|
||||
try {
|
||||
return cache.computeIfAbsent(key, loader::load);
|
||||
} catch (ExecutionException e) {
|
||||
Throwable cause = e.getCause() != null ? e.getCause() : e;
|
||||
throw new ElasticsearchException(cause);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -26,38 +26,57 @@ import java.nio.file.Files;
|
|||
import java.nio.file.Path;
|
||||
import java.nio.file.PathMatcher;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
|
||||
import com.maxmind.db.NoCache;
|
||||
import com.maxmind.db.NodeCache;
|
||||
import com.maxmind.geoip2.DatabaseReader;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.ingest.Processor;
|
||||
import org.elasticsearch.plugins.IngestPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
||||
public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable {
|
||||
public static final Setting<Long> CACHE_SIZE =
|
||||
Setting.longSetting("ingest.geoip.cache_size", 1000, 0, Setting.Property.NodeScope);
|
||||
|
||||
private Map<String, DatabaseReader> databaseReaders;
|
||||
|
||||
@Override
|
||||
public List<Setting<?>> getSettings() {
|
||||
return Arrays.asList(CACHE_SIZE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
|
||||
if (databaseReaders != null) {
|
||||
throw new IllegalStateException("getProcessors called twice for geoip plugin!!");
|
||||
}
|
||||
Path geoIpConfigDirectory = parameters.env.configFile().resolve("ingest-geoip");
|
||||
NodeCache cache;
|
||||
long cacheSize = CACHE_SIZE.get(parameters.env.settings());
|
||||
if (cacheSize > 0) {
|
||||
cache = new GeoIpCache(cacheSize);
|
||||
} else {
|
||||
cache = NoCache.getInstance();
|
||||
}
|
||||
try {
|
||||
databaseReaders = loadDatabaseReaders(geoIpConfigDirectory);
|
||||
databaseReaders = loadDatabaseReaders(geoIpConfigDirectory, cache);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders));
|
||||
}
|
||||
|
||||
static Map<String, DatabaseReader> loadDatabaseReaders(Path geoIpConfigDirectory) throws IOException {
|
||||
static Map<String, DatabaseReader> loadDatabaseReaders(Path geoIpConfigDirectory, NodeCache cache) throws IOException {
|
||||
if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) {
|
||||
throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");
|
||||
}
|
||||
|
@ -71,7 +90,8 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable
|
|||
Path databasePath = iterator.next();
|
||||
if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) {
|
||||
try (InputStream inputStream = new GZIPInputStream(Files.newInputStream(databasePath, StandardOpenOption.READ))) {
|
||||
databaseReaders.put(databasePath.getFileName().toString(), new DatabaseReader.Builder(inputStream).build());
|
||||
databaseReaders.put(databasePath.getFileName().toString(),
|
||||
new DatabaseReader.Builder(inputStream).withCache(cache).build());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -85,4 +105,5 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable
|
|||
IOUtils.close(databaseReaders.values());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.ingest.geoip;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.IntNode;
|
||||
import com.maxmind.db.NodeCache;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
public class GeoIpCacheTests extends ESTestCase {
|
||||
public void testCachesAndEvictsResults() throws Exception {
|
||||
GeoIpCache cache = new GeoIpCache(1);
|
||||
final NodeCache.Loader loader = key -> new IntNode(key);
|
||||
|
||||
JsonNode jsonNode1 = cache.get(1, loader);
|
||||
assertSame(jsonNode1, cache.get(1, loader));
|
||||
|
||||
// evict old key by adding another value
|
||||
cache.get(2, loader);
|
||||
|
||||
assertNotSame(jsonNode1, cache.get(1, loader));
|
||||
}
|
||||
|
||||
public void testThrowsElasticsearchException() throws Exception {
|
||||
GeoIpCache cache = new GeoIpCache(1);
|
||||
NodeCache.Loader loader = (int key) -> {
|
||||
throw new IllegalArgumentException("Illegal key");
|
||||
};
|
||||
ElasticsearchException ex = expectThrows(ElasticsearchException.class, () -> cache.get(1, loader));
|
||||
assertTrue("Expected cause to be of type IllegalArgumentException but was [" + ex.getCause().getClass() + "]",
|
||||
ex.getCause() instanceof IllegalArgumentException);
|
||||
assertEquals("Illegal key", ex.getCause().getMessage());
|
||||
}
|
||||
}
|
|
@ -20,6 +20,8 @@
|
|||
package org.elasticsearch.ingest.geoip;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
||||
import com.maxmind.db.NoCache;
|
||||
import com.maxmind.db.NodeCache;
|
||||
import com.maxmind.geoip2.DatabaseReader;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.common.Randomness;
|
||||
|
@ -57,7 +59,9 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
|
|||
geoIpConfigDir.resolve("GeoLite2-City.mmdb.gz"));
|
||||
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb.gz")),
|
||||
geoIpConfigDir.resolve("GeoLite2-Country.mmdb.gz"));
|
||||
databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir);
|
||||
|
||||
NodeCache cache = randomFrom(NoCache.getInstance(), new GeoIpCache(randomPositiveLong()));
|
||||
databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir, cache);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
Loading…
Reference in New Issue