Removed the lazy cache in DatabaseReaderService and eagerly build all available databases.
This commit is contained in:
parent
5e07644788
commit
270a3977bc
|
@ -1,51 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest.processor.geoip;
|
||||
|
||||
import com.maxmind.geoip2.DatabaseReader;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
final class DatabaseReaderService implements Closeable {
|
||||
|
||||
private final Map<String, DatabaseReader> databaseReaders = new HashMap<>();
|
||||
|
||||
synchronized DatabaseReader getOrCreateDatabaseReader(String key, InputStream inputStream) throws IOException {
|
||||
DatabaseReader databaseReader = databaseReaders.get(key);
|
||||
if (databaseReader != null) {
|
||||
return databaseReader;
|
||||
}
|
||||
|
||||
databaseReader = new DatabaseReader.Builder(inputStream).build();
|
||||
databaseReaders.put(key, databaseReader);
|
||||
return databaseReader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
for (DatabaseReader databaseReader : databaseReaders.values()) {
|
||||
databaseReader.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -24,6 +24,7 @@ import com.maxmind.geoip2.exception.AddressNotFoundException;
|
|||
import com.maxmind.geoip2.model.CityResponse;
|
||||
import com.maxmind.geoip2.model.CountryResponse;
|
||||
import com.maxmind.geoip2.record.*;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.SpecialPermission;
|
||||
import org.elasticsearch.common.network.NetworkAddress;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
|
@ -40,8 +41,10 @@ import java.nio.file.StandardOpenOption;
|
|||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.*;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.elasticsearch.ingest.processor.ConfigurationUtils.readList;
|
||||
import static org.elasticsearch.ingest.processor.ConfigurationUtils.readOptionalList;
|
||||
import static org.elasticsearch.ingest.processor.ConfigurationUtils.readStringProperty;
|
||||
|
||||
public final class GeoIpProcessor implements Processor {
|
||||
|
@ -218,21 +221,41 @@ public final class GeoIpProcessor implements Processor {
|
|||
Field.CONTINENT_NAME, Field.COUNTRY_ISO_CODE, Field.REGION_NAME, Field.CITY_NAME, Field.LOCATION
|
||||
);
|
||||
|
||||
private final Path geoIpConfigDirectory;
|
||||
private final DatabaseReaderService databaseReaderService = new DatabaseReaderService();
|
||||
private final Map<String, DatabaseReader> databaseReaders;
|
||||
|
||||
public Factory(Path configDirectory) {
|
||||
this.geoIpConfigDirectory = configDirectory.resolve("ingest").resolve("geoip");
|
||||
Path geoIpConfigDirectory = configDirectory.resolve("ingest").resolve("geoip");
|
||||
if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) {
|
||||
throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");
|
||||
}
|
||||
|
||||
try (Stream<Path> databaseFiles = Files.list(geoIpConfigDirectory)) {
|
||||
Map<String, DatabaseReader> databaseReaders = new HashMap<>();
|
||||
// Use iterator instead of forEach otherwise IOException needs to be caught twice...
|
||||
Iterator<Path> iterator = databaseFiles.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Path databasePath = iterator.next();
|
||||
if (Files.isRegularFile(databasePath)) {
|
||||
try (InputStream inputStream = Files.newInputStream(databasePath, StandardOpenOption.READ)) {
|
||||
databaseReaders.put(databasePath.getFileName().toString(), new DatabaseReader.Builder(inputStream).build());
|
||||
}
|
||||
}
|
||||
}
|
||||
this.databaseReaders = Collections.unmodifiableMap(databaseReaders);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public GeoIpProcessor create(Map<String, Object> config) throws Exception {
|
||||
String ipField = readStringProperty(config, "source_field");
|
||||
String targetField = readStringProperty(config, "target_field", "geoip");
|
||||
String databaseFile = readStringProperty(config, "database_file", "GeoLite2-City.mmdb");
|
||||
List<String> fieldNames = readOptionalList(config, "fields");
|
||||
|
||||
final Set<Field> fields;
|
||||
if (config.containsKey("fields")) {
|
||||
if (fieldNames != null) {
|
||||
fields = EnumSet.noneOf(Field.class);
|
||||
List<String> fieldNames = readList(config, "fields");
|
||||
for (String fieldName : fieldNames) {
|
||||
try {
|
||||
fields.add(Field.parse(fieldName));
|
||||
|
@ -244,20 +267,16 @@ public final class GeoIpProcessor implements Processor {
|
|||
fields = DEFAULT_FIELDS;
|
||||
}
|
||||
|
||||
Path databasePath = geoIpConfigDirectory.resolve(databaseFile);
|
||||
if (Files.exists(databasePath) && Files.isRegularFile(databasePath)) {
|
||||
try (InputStream database = Files.newInputStream(databasePath, StandardOpenOption.READ)) {
|
||||
DatabaseReader databaseReader = databaseReaderService.getOrCreateDatabaseReader(databaseFile, database);
|
||||
return new GeoIpProcessor(ipField, databaseReader, targetField, fields);
|
||||
}
|
||||
} else {
|
||||
throw new IllegalArgumentException("database file [" + databaseFile + "] doesn't exist in [" + geoIpConfigDirectory + "]");
|
||||
DatabaseReader databaseReader = databaseReaders.get(databaseFile);
|
||||
if (databaseReader == null) {
|
||||
throw new IllegalArgumentException("database file [" + databaseFile + "] doesn't exist");
|
||||
}
|
||||
return new GeoIpProcessor(ipField, databaseReader, targetField, fields);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
databaseReaderService.close();
|
||||
IOUtils.close(databaseReaders.values());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,41 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest.processor.geoip;
|
||||
|
||||
import com.maxmind.geoip2.DatabaseReader;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
import java.io.InputStream;
|
||||
|
||||
public class DatabaseReaderServiceTests extends ESTestCase {
|
||||
|
||||
public void testLookup() throws Exception {
|
||||
InputStream database = DatabaseReaderServiceTests.class.getResourceAsStream("/GeoLite2-City.mmdb");
|
||||
|
||||
DatabaseReaderService service = new DatabaseReaderService();
|
||||
DatabaseReader instance = service.getOrCreateDatabaseReader("key1", database);
|
||||
assertThat(service.getOrCreateDatabaseReader("key1", database), equalTo(instance));
|
||||
|
||||
database = DatabaseReaderServiceTests.class.getResourceAsStream("/GeoLite2-City.mmdb");
|
||||
assertThat(service.getOrCreateDatabaseReader("key2", database), not(equalTo(instance)));
|
||||
}
|
||||
|
||||
}
|
|
@ -87,8 +87,9 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
|
|||
config.put("database_file", "does-not-exist.mmdb");
|
||||
try {
|
||||
factory.create(config);
|
||||
fail("Exception expected");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), startsWith("database file [does-not-exist.mmdb] doesn't exist in"));
|
||||
assertThat(e.getMessage(), equalTo("database file [does-not-exist.mmdb] doesn't exist"));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -119,6 +120,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
|
|||
config.put("fields", Collections.singletonList("invalid"));
|
||||
try {
|
||||
factory.create(config);
|
||||
fail("exception expected");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("illegal field option [invalid]. valid values are [[IP, COUNTRY_ISO_CODE, COUNTRY_NAME, CONTINENT_NAME, REGION_NAME, CITY_NAME, TIMEZONE, LATITUDE, LONGITUDE, LOCATION]]"));
|
||||
}
|
||||
|
@ -128,6 +130,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
|
|||
config.put("fields", "invalid");
|
||||
try {
|
||||
factory.create(config);
|
||||
fail("exception expected");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), equalTo("property [fields] isn't a list, but of type [java.lang.String]"));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue