Added ASN support for Ingest GeoIP plugin.

Closes #27849
This commit is contained in:
Sian Lerk Lau 2017-12-22 14:51:44 +08:00 committed by Martijn van Groningen
parent cb783bcb57
commit a4a7150b56
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
6 changed files with 183 additions and 17 deletions

View File

@ -5,12 +5,12 @@ The GeoIP processor adds information about the geographical location of IP addre
This processor adds this information by default under the `geoip` field. The `geoip` processor can resolve both IPv4 and This processor adds this information by default under the `geoip` field. The `geoip` processor can resolve both IPv4 and
IPv6 addresses. IPv6 addresses.
The ingest-geoip plugin ships by default with the GeoLite2 City and GeoLite2 Country geoip2 databases from Maxmind made available The ingest-geoip plugin ships by default with the GeoLite2 City, GeoLite2 Country and GeoLite2 ASN geoip2 databases from Maxmind made available
under the CCA-ShareAlike 4.0 license. For more details see, http://dev.maxmind.com/geoip/geoip2/geolite2/ under the CCA-ShareAlike 4.0 license. For more details see, http://dev.maxmind.com/geoip/geoip2/geolite2/
The GeoIP processor can run with other geoip2 databases from Maxmind. The files must be copied into the geoip config directory, The GeoIP processor can run with other geoip2 databases from Maxmind. The files must be copied into the geoip config directory,
and the `database_file` option should be used to specify the filename of the custom database. Custom database files must be compressed and the `database_file` option should be used to specify the filename of the custom database. Custom database files must be compressed
with gzip. The geoip config directory is located at `$ES_HOME/config/ingest-geoip` and holds the shipped databases too. with gzip. The geoip config directory is located at `$ES_HOME/ingest-geoip` and holds the shipped databases too.
:plugin_name: ingest-geoip :plugin_name: ingest-geoip
include::install_remove.asciidoc[] include::install_remove.asciidoc[]
@ -36,7 +36,11 @@ include::install_remove.asciidoc[]
`country_iso_code`, `country_name`, `continent_name`, `region_name`, `city_name`, `timezone`, `latitude`, `longitude` `country_iso_code`, `country_name`, `continent_name`, `region_name`, `city_name`, `timezone`, `latitude`, `longitude`
and `location`. The fields actually added depend on what has been found and which properties were configured in `properties`. and `location`. The fields actually added depend on what has been found and which properties were configured in `properties`.
* If the GeoLite2 Country database is used, then the following fields may be added under the `target_field`: `ip`, * If the GeoLite2 Country database is used, then the following fields may be added under the `target_field`: `ip`,
`country_iso_code`, `country_name` and `continent_name`. The fields actually added depend on what has been found and which properties were configured in `properties`. `country_iso_code`, `country_name` and `continent_name`. The fields actually added depend on what has been found and which properties
were configured in `properties`.
* If the GeoLite2 ASN database is used, then the following fields may be added under the `target_field`: `ip`,
`asn`, and `organization_name`. The fields actually added depend on what has been found and which properties were configured
in `properties`.
Here is an example that uses the default city database and adds the geographical information to the `geoip` field based on the `ip` field: Here is an example that uses the default city database and adds the geographical information to the `geoip` field based on the `ip` field:

View File

@ -23,7 +23,7 @@ esplugin {
} }
dependencies { dependencies {
// Upgrade to 2.10.0 or higher when jackson-core gets upgraded to 2.9.x // Upgrade to 2.10.0 or higher when jackson-core gets upgraded to 2.9.x. Blocked by #27032
compile ('com.maxmind.geoip2:geoip2:2.9.0') compile ('com.maxmind.geoip2:geoip2:2.9.0')
// geoip2 dependencies: // geoip2 dependencies:
compile("com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}") compile("com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}")
@ -36,10 +36,7 @@ dependencies {
task copyDefaultGeoIp2DatabaseFiles(type: Copy) { task copyDefaultGeoIp2DatabaseFiles(type: Copy) {
from { zipTree(configurations.testCompile.files.find { it.name.contains('geolite2-databases')}) } from { zipTree(configurations.testCompile.files.find { it.name.contains('geolite2-databases')}) }
into "${project.buildDir}/ingest-geoip" into "${project.buildDir}/ingest-geoip"
include "*.mmdb.gz"
// For now, do not include GeoLite2-ASN.mmdb.gz file, because it isn't used yet:
include "GeoLite2-City.mmdb.gz"
include "GeoLite2-Country.mmdb.gz"
} }
project.bundlePlugin.dependsOn(copyDefaultGeoIp2DatabaseFiles) project.bundlePlugin.dependsOn(copyDefaultGeoIp2DatabaseFiles)

View File

@ -21,6 +21,7 @@ package org.elasticsearch.ingest.geoip;
import com.maxmind.geoip2.DatabaseReader; import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.exception.AddressNotFoundException; import com.maxmind.geoip2.exception.AddressNotFoundException;
import com.maxmind.geoip2.model.AsnResponse;
import com.maxmind.geoip2.model.CityResponse; import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.model.CountryResponse; import com.maxmind.geoip2.model.CountryResponse;
import com.maxmind.geoip2.record.City; import com.maxmind.geoip2.record.City;
@ -59,6 +60,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
public static final String TYPE = "geoip"; public static final String TYPE = "geoip";
private static final String CITY_DB_SUFFIX = "-City"; private static final String CITY_DB_SUFFIX = "-City";
private static final String COUNTRY_DB_SUFFIX = "-Country"; private static final String COUNTRY_DB_SUFFIX = "-Country";
private static final String ASN_DB_SUFFIX = "-ASN";
private final String field; private final String field;
private final String targetField; private final String targetField;
@ -107,6 +109,12 @@ public final class GeoIpProcessor extends AbstractProcessor {
} catch (AddressNotFoundRuntimeException e) { } catch (AddressNotFoundRuntimeException e) {
geoData = Collections.emptyMap(); geoData = Collections.emptyMap();
} }
} else if (databaseType.endsWith(ASN_DB_SUFFIX)) {
try {
geoData = retrieveAsnGeoData(ipAddress);
} catch (AddressNotFoundRuntimeException e) {
geoData = Collections.emptyMap();
}
} else { } else {
throw new ElasticsearchParseException("Unsupported database type [" + dbReader.getMetadata().getDatabaseType() throw new ElasticsearchParseException("Unsupported database type [" + dbReader.getMetadata().getDatabaseType()
+ "]", new IllegalStateException()); + "]", new IllegalStateException());
@ -256,12 +264,53 @@ public final class GeoIpProcessor extends AbstractProcessor {
return geoData; return geoData;
} }
private Map<String, Object> retrieveAsnGeoData(InetAddress ipAddress) {
SpecialPermission.check();
AsnResponse response = AccessController.doPrivileged((PrivilegedAction<AsnResponse>) () -> {
try {
return dbReader.asn(ipAddress);
} catch (AddressNotFoundException e) {
throw new AddressNotFoundRuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
Integer asn = response.getAutonomousSystemNumber();
String organization_name = response.getAutonomousSystemOrganization();
Map<String, Object> geoData = new HashMap<>();
for (Property property : this.properties) {
switch (property) {
case IP:
geoData.put("ip", NetworkAddress.format(ipAddress));
break;
case ASN:
if (asn != null) {
geoData.put("asn", asn);
}
break;
case ORGANIZATION_NAME:
if (organization_name != null) {
geoData.put("organization_name", organization_name);
}
break;
}
}
return geoData;
}
public static final class Factory implements Processor.Factory { public static final class Factory implements Processor.Factory {
static final Set<Property> DEFAULT_CITY_PROPERTIES = EnumSet.of( static final Set<Property> DEFAULT_CITY_PROPERTIES = EnumSet.of(
Property.CONTINENT_NAME, Property.COUNTRY_ISO_CODE, Property.REGION_NAME, Property.CONTINENT_NAME, Property.COUNTRY_ISO_CODE, Property.REGION_NAME,
Property.CITY_NAME, Property.LOCATION Property.CITY_NAME, Property.LOCATION
); );
static final Set<Property> DEFAULT_COUNTRY_PROPERTIES = EnumSet.of(Property.CONTINENT_NAME, Property.COUNTRY_ISO_CODE); static final Set<Property> DEFAULT_COUNTRY_PROPERTIES = EnumSet.of(
Property.CONTINENT_NAME, Property.COUNTRY_ISO_CODE
);
static final Set<Property> DEFAULT_ASN_PROPERTIES = EnumSet.of(
Property.IP, Property.ASN, Property.ORGANIZATION_NAME
);
private final Map<String, DatabaseReaderLazyLoader> databaseReaders; private final Map<String, DatabaseReaderLazyLoader> databaseReaders;
@ -302,6 +351,8 @@ public final class GeoIpProcessor extends AbstractProcessor {
properties = DEFAULT_CITY_PROPERTIES; properties = DEFAULT_CITY_PROPERTIES;
} else if (databaseType.endsWith(COUNTRY_DB_SUFFIX)) { } else if (databaseType.endsWith(COUNTRY_DB_SUFFIX)) {
properties = DEFAULT_COUNTRY_PROPERTIES; properties = DEFAULT_COUNTRY_PROPERTIES;
} else if (databaseType.endsWith(ASN_DB_SUFFIX)) {
properties = DEFAULT_ASN_PROPERTIES;
} else { } else {
throw newConfigurationException(TYPE, processorTag, "database_file", "Unsupported database type [" throw newConfigurationException(TYPE, processorTag, "database_file", "Unsupported database type ["
+ databaseType + "]"); + databaseType + "]");
@ -331,11 +382,20 @@ public final class GeoIpProcessor extends AbstractProcessor {
REGION_NAME, REGION_NAME,
CITY_NAME, CITY_NAME,
TIMEZONE, TIMEZONE,
LOCATION; LOCATION,
ASN,
ORGANIZATION_NAME;
static final EnumSet<Property> ALL_CITY_PROPERTIES = EnumSet.allOf(Property.class); static final EnumSet<Property> ALL_CITY_PROPERTIES = EnumSet.of(
static final EnumSet<Property> ALL_COUNTRY_PROPERTIES = EnumSet.of(Property.IP, Property.CONTINENT_NAME, Property.IP, Property.COUNTRY_ISO_CODE, Property.COUNTRY_NAME, Property.CONTINENT_NAME,
Property.COUNTRY_NAME, Property.COUNTRY_ISO_CODE); Property.REGION_NAME, Property.CITY_NAME, Property.TIMEZONE, Property.LOCATION
);
static final EnumSet<Property> ALL_COUNTRY_PROPERTIES = EnumSet.of(
Property.IP, Property.CONTINENT_NAME, Property.COUNTRY_NAME, Property.COUNTRY_ISO_CODE
);
static final EnumSet<Property> ALL_ASN_PROPERTIES = EnumSet.of(
Property.IP, Property.ASN, Property.ORGANIZATION_NAME
);
public static Property parseProperty(String databaseType, String value) { public static Property parseProperty(String databaseType, String value) {
Set<Property> validProperties = EnumSet.noneOf(Property.class); Set<Property> validProperties = EnumSet.noneOf(Property.class);
@ -343,6 +403,8 @@ public final class GeoIpProcessor extends AbstractProcessor {
validProperties = ALL_CITY_PROPERTIES; validProperties = ALL_CITY_PROPERTIES;
} else if (databaseType.endsWith(COUNTRY_DB_SUFFIX)) { } else if (databaseType.endsWith(COUNTRY_DB_SUFFIX)) {
validProperties = ALL_COUNTRY_PROPERTIES; validProperties = ALL_COUNTRY_PROPERTIES;
} else if (databaseType.endsWith(ASN_DB_SUFFIX)) {
validProperties = ALL_ASN_PROPERTIES;
} }
try { try {

View File

@ -58,6 +58,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
geoIpConfigDir.resolve("GeoLite2-City.mmdb.gz")); geoIpConfigDir.resolve("GeoLite2-City.mmdb.gz"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb.gz")), Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb.gz")),
geoIpConfigDir.resolve("GeoLite2-Country.mmdb.gz")); geoIpConfigDir.resolve("GeoLite2-Country.mmdb.gz"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb.gz")),
geoIpConfigDir.resolve("GeoLite2-ASN.mmdb.gz"));
NodeCache cache = randomFrom(NoCache.getInstance(), new GeoIpCache(randomNonNegativeLong())); NodeCache cache = randomFrom(NoCache.getInstance(), new GeoIpCache(randomNonNegativeLong()));
databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir, cache); databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir, cache);
@ -122,6 +124,24 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
assertFalse(processor.isIgnoreMissing()); assertFalse(processor.isIgnoreMissing());
} }
public void testAsnBuildDefaults() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-ASN.mmdb.gz");
String processorTag = randomAlphaOfLength(10);
GeoIpProcessor processor = factory.create(null, processorTag, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("geoip"));
assertThat(processor.getDbReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-ASN"));
assertThat(processor.getProperties(), sameInstance(GeoIpProcessor.Factory.DEFAULT_ASN_PROPERTIES));
assertFalse(processor.isIgnoreMissing());
}
public void testBuildTargetField() throws Exception { public void testBuildTargetField() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
Map<String, Object> config = new HashMap<>(); Map<String, Object> config = new HashMap<>();
@ -146,12 +166,31 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
assertFalse(processor.isIgnoreMissing()); assertFalse(processor.isIgnoreMissing());
} }
public void testBuildWithCountryDbAndCityFields() throws Exception { public void testBuildWithCountryDbAndAsnFields() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders); GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
Map<String, Object> config = new HashMap<>(); Map<String, Object> config = new HashMap<>();
config.put("field", "_field"); config.put("field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb.gz"); config.put("database_file", "GeoLite2-Country.mmdb.gz");
EnumSet<GeoIpProcessor.Property> cityOnlyProperties = EnumSet.complementOf(GeoIpProcessor.Property.ALL_COUNTRY_PROPERTIES); EnumSet<GeoIpProcessor.Property> asnOnlyProperties = EnumSet.copyOf(GeoIpProcessor.Property.ALL_ASN_PROPERTIES);
asnOnlyProperties.remove(GeoIpProcessor.Property.IP);
String asnProperty = RandomPicks.randomFrom(Randomness.get(), asnOnlyProperties).toString();
config.put("properties", Collections.singletonList(asnProperty));
try {
factory.create(null, null, config);
fail("Exception expected");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[properties] illegal property value [" + asnProperty +
"]. valid values are [IP, COUNTRY_ISO_CODE, COUNTRY_NAME, CONTINENT_NAME]"));
}
}
public void testBuildWithAsnDbAndCityFields() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-ASN.mmdb.gz");
EnumSet<GeoIpProcessor.Property> cityOnlyProperties = EnumSet.copyOf(GeoIpProcessor.Property.ALL_CITY_PROPERTIES);
cityOnlyProperties.remove(GeoIpProcessor.Property.IP);
String cityProperty = RandomPicks.randomFrom(Randomness.get(), cityOnlyProperties).toString(); String cityProperty = RandomPicks.randomFrom(Randomness.get(), cityOnlyProperties).toString();
config.put("properties", Collections.singletonList(cityProperty)); config.put("properties", Collections.singletonList(cityProperty));
try { try {
@ -159,7 +198,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
fail("Exception expected"); fail("Exception expected");
} catch (ElasticsearchParseException e) { } catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[properties] illegal property value [" + cityProperty + assertThat(e.getMessage(), equalTo("[properties] illegal property value [" + cityProperty +
"]. valid values are [IP, COUNTRY_ISO_CODE, COUNTRY_NAME, CONTINENT_NAME]")); "]. valid values are [IP, ASN, ORGANIZATION_NAME]"));
} }
} }
@ -230,6 +269,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
geoIpConfigDir.resolve("GeoLite2-City.mmdb.gz")); geoIpConfigDir.resolve("GeoLite2-City.mmdb.gz"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb.gz")), Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb.gz")),
geoIpConfigDir.resolve("GeoLite2-Country.mmdb.gz")); geoIpConfigDir.resolve("GeoLite2-Country.mmdb.gz"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb.gz")),
geoIpConfigDir.resolve("GeoLite2-ASN.mmdb.gz"));
// Loading another database reader instances, because otherwise we can't test lazy loading as the // Loading another database reader instances, because otherwise we can't test lazy loading as the
// database readers used at class level are reused between tests. (we want to keep that otherwise running this // database readers used at class level are reused between tests. (we want to keep that otherwise running this
@ -249,6 +290,10 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
config.put("field", "_field"); config.put("field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb.gz"); config.put("database_file", "GeoLite2-Country.mmdb.gz");
factory.create(null, "_tag", config); factory.create(null, "_tag", config);
config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-ASN.mmdb.gz");
factory.create(null, "_tag", config);
for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) { for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) {
assertNotNull(lazyLoader.databaseReader.get()); assertNotNull(lazyLoader.databaseReader.get());

View File

@ -187,6 +187,26 @@ public class GeoIpProcessorTests extends ESTestCase {
assertThat(geoData.get("ip"), equalTo("80.231.5.0")); assertThat(geoData.get("ip"), equalTo("80.231.5.0"));
} }
public void testAsn() throws Exception {
String ip = "82.170.213.79";
InputStream database = getDatabaseFileInputStream("/GeoLite2-ASN.mmdb.gz");
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false);
Map<String, Object> document = new HashMap<>();
document.put("source_field", ip);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata().get("source_field"), equalTo(ip));
@SuppressWarnings("unchecked")
Map<String, Object> geoData = (Map<String, Object>) ingestDocument.getSourceAndMetadata().get("target_field");
assertThat(geoData.size(), equalTo(3));
assertThat(geoData.get("ip"), equalTo(ip));
assertThat(geoData.get("asn"), equalTo(5615));
assertThat(geoData.get("organization_name"), equalTo("KPN B.V."));
}
public void testAddressIsNotInTheDatabase() throws Exception { public void testAddressIsNotInTheDatabase() throws Exception {
InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb.gz"); InputStream database = getDatabaseFileInputStream("/GeoLite2-City.mmdb.gz");
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",

View File

@ -87,7 +87,7 @@
- match: { _source.geoip.continent_name: "North America" } - match: { _source.geoip.continent_name: "North America" }
--- ---
"Test geoip processor with different database file": "Test geoip processor with different database file - GeoLite2-Country":
- do: - do:
ingest.put_pipeline: ingest.put_pipeline:
id: "my_pipeline" id: "my_pipeline"
@ -195,3 +195,41 @@
- match: { _source.geoip.location.lat: 44.9759 } - match: { _source.geoip.location.lat: 44.9759 }
- match: { _source.geoip.region_name: "Minnesota" } - match: { _source.geoip.region_name: "Minnesota" }
- match: { _source.geoip.continent_name: "North America" } - match: { _source.geoip.continent_name: "North America" }
---
"Test geoip processor with different database file - GeoLite2-ASN":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"geoip" : {
"field" : "field1",
"database_file" : "GeoLite2-ASN.mmdb.gz"
}
}
]
}
- match: { acknowledged: true }
- do:
index:
index: test
type: test
id: 1
pipeline: "my_pipeline"
body: {field1: "82.170.213.79"}
- do:
get:
index: test
type: test
id: 1
- match: { _source.field1: "82.170.213.79" }
- length: { _source.geoip: 3 }
- match: { _source.geoip.ip: "82.170.213.79" }
- match: { _source.geoip.asn: 5615 }
- match: { _source.geoip.organization_name: "KPN B.V." }