diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/DatabaseReaderService.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/DatabaseReaderService.java deleted file mode 100644 index 8d61accf92e..00000000000 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/DatabaseReaderService.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.ingest.processor.geoip; - -import com.maxmind.geoip2.DatabaseReader; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.Map; - -final class DatabaseReaderService implements Closeable { - - private final Map databaseReaders = new HashMap<>(); - - synchronized DatabaseReader getOrCreateDatabaseReader(String key, InputStream inputStream) throws IOException { - DatabaseReader databaseReader = databaseReaders.get(key); - if (databaseReader != null) { - return databaseReader; - } - - databaseReader = new DatabaseReader.Builder(inputStream).build(); - databaseReaders.put(key, databaseReader); - return databaseReader; - } - - @Override - public void close() throws IOException { - for (DatabaseReader databaseReader : databaseReaders.values()) { - databaseReader.close(); - } - } -} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java index 6fd70cf7828..83871d54c86 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessor.java @@ -24,6 +24,7 @@ import com.maxmind.geoip2.exception.AddressNotFoundException; import com.maxmind.geoip2.model.CityResponse; import com.maxmind.geoip2.model.CountryResponse; import com.maxmind.geoip2.record.*; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.SpecialPermission; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.ingest.IngestDocument; @@ -40,8 +41,10 @@ import java.nio.file.StandardOpenOption; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.*; +import java.util.stream.Stream; import static org.elasticsearch.ingest.processor.ConfigurationUtils.readList; +import static org.elasticsearch.ingest.processor.ConfigurationUtils.readOptionalList; import static org.elasticsearch.ingest.processor.ConfigurationUtils.readStringProperty; public final class GeoIpProcessor implements Processor { @@ -218,21 +221,41 @@ public final class GeoIpProcessor implements Processor { Field.CONTINENT_NAME, Field.COUNTRY_ISO_CODE, Field.REGION_NAME, Field.CITY_NAME, Field.LOCATION ); - private final Path geoIpConfigDirectory; - private final DatabaseReaderService databaseReaderService = new DatabaseReaderService(); + private final Map databaseReaders; public Factory(Path configDirectory) { - this.geoIpConfigDirectory = configDirectory.resolve("ingest").resolve("geoip"); + Path geoIpConfigDirectory = configDirectory.resolve("ingest").resolve("geoip"); + if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) { + throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist"); + } + + try (Stream databaseFiles = Files.list(geoIpConfigDirectory)) { + Map databaseReaders = new HashMap<>(); + // Use iterator instead of forEach otherwise IOException needs to be caught twice... + Iterator iterator = databaseFiles.iterator(); + while (iterator.hasNext()) { + Path databasePath = iterator.next(); + if (Files.isRegularFile(databasePath)) { + try (InputStream inputStream = Files.newInputStream(databasePath, StandardOpenOption.READ)) { + databaseReaders.put(databasePath.getFileName().toString(), new DatabaseReader.Builder(inputStream).build()); + } + } + } + this.databaseReaders = Collections.unmodifiableMap(databaseReaders); + } catch (IOException e) { + throw new RuntimeException(e); + } } public GeoIpProcessor create(Map config) throws Exception { String ipField = readStringProperty(config, "source_field"); String targetField = readStringProperty(config, "target_field", "geoip"); String databaseFile = readStringProperty(config, "database_file", "GeoLite2-City.mmdb"); + List fieldNames = readOptionalList(config, "fields"); + final Set fields; - if (config.containsKey("fields")) { + if (fieldNames != null) { fields = EnumSet.noneOf(Field.class); - List fieldNames = readList(config, "fields"); for (String fieldName : fieldNames) { try { fields.add(Field.parse(fieldName)); @@ -244,20 +267,16 @@ public final class GeoIpProcessor implements Processor { fields = DEFAULT_FIELDS; } - Path databasePath = geoIpConfigDirectory.resolve(databaseFile); - if (Files.exists(databasePath) && Files.isRegularFile(databasePath)) { - try (InputStream database = Files.newInputStream(databasePath, StandardOpenOption.READ)) { - DatabaseReader databaseReader = databaseReaderService.getOrCreateDatabaseReader(databaseFile, database); - return new GeoIpProcessor(ipField, databaseReader, targetField, fields); - } - } else { - throw new IllegalArgumentException("database file [" + databaseFile + "] doesn't exist in [" + geoIpConfigDirectory + "]"); + DatabaseReader databaseReader = databaseReaders.get(databaseFile); + if (databaseReader == null) { + throw new IllegalArgumentException("database file [" + databaseFile + "] doesn't exist"); } + return new GeoIpProcessor(ipField, databaseReader, targetField, fields); } @Override public void close() throws IOException { - databaseReaderService.close(); + IOUtils.close(databaseReaders.values()); } } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/DatabaseReaderServiceTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/DatabaseReaderServiceTests.java deleted file mode 100644 index ebf3fefdba0..00000000000 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/DatabaseReaderServiceTests.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.ingest.processor.geoip; - -import com.maxmind.geoip2.DatabaseReader; -import org.elasticsearch.test.ESTestCase; -import static org.hamcrest.Matchers.*; - -import java.io.InputStream; - -public class DatabaseReaderServiceTests extends ESTestCase { - - public void testLookup() throws Exception { - InputStream database = DatabaseReaderServiceTests.class.getResourceAsStream("/GeoLite2-City.mmdb"); - - DatabaseReaderService service = new DatabaseReaderService(); - DatabaseReader instance = service.getOrCreateDatabaseReader("key1", database); - assertThat(service.getOrCreateDatabaseReader("key1", database), equalTo(instance)); - - database = DatabaseReaderServiceTests.class.getResourceAsStream("/GeoLite2-City.mmdb"); - assertThat(service.getOrCreateDatabaseReader("key2", database), not(equalTo(instance))); - } - -} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessorFactoryTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessorFactoryTests.java index d4feeff886e..d42f87d8048 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessorFactoryTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/geoip/GeoIpProcessorFactoryTests.java @@ -87,8 +87,9 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { config.put("database_file", "does-not-exist.mmdb"); try { factory.create(config); + fail("Exception expected"); } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), startsWith("database file [does-not-exist.mmdb] doesn't exist in")); + assertThat(e.getMessage(), equalTo("database file [does-not-exist.mmdb] doesn't exist")); } } @@ -119,6 +120,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { config.put("fields", Collections.singletonList("invalid")); try { factory.create(config); + fail("exception expected"); } catch (IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("illegal field option [invalid]. valid values are [[IP, COUNTRY_ISO_CODE, COUNTRY_NAME, CONTINENT_NAME, REGION_NAME, CITY_NAME, TIMEZONE, LATITUDE, LONGITUDE, LOCATION]]")); } @@ -128,6 +130,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase { config.put("fields", "invalid"); try { factory.create(config); + fail("exception expected"); } catch (IllegalArgumentException e) { assertThat(e.getMessage(), equalTo("property [fields] isn't a list, but of type [java.lang.String]")); }