geoip: renamed `ip_field` option to `source_field`, because it can hold a ip or hostname.

geoip: add a `fields` option to control what fields are added by geoip processor
geoip: instead of adding all fields, only `country_code`, `city_name`, `location`, `continent_name` and `region_name` fields are added.
This commit is contained in:
Martijn van Groningen 2015-11-10 09:17:28 +07:00
parent 28dbccaced
commit 4da05168f4
5 changed files with 230 additions and 58 deletions

View File

@ -309,18 +309,19 @@ is located at `$ES_HOME/config/ingest/geoip` and holds the shipped databases too
.Geoip options
[options="header"]
|======
| Name | Required | Default | Description
| `ip_field` | yes | - | The field to get the ip address from for the geographical lookup.
| `target_field` | no | geoip | The field that will hold the geographical information looked up from the Maxmind database.
| `database_file` | no | GeoLite2-City.mmdb | The database filename in the geoip config directory. The ingest plugin ships with the GeoLite2-City.mmdb and GeoLite2-Country.mmdb files.
| Name | Required | Default | Description
| `source_field` | yes | - | The field to get the ip address or hostname from for the geographical lookup.
| `target_field` | no | geoip | The field that will hold the geographical information looked up from the Maxmind database.
| `database_file` | no | GeoLite2-City.mmdb | The database filename in the geoip config directory. The ingest plugin ships with the GeoLite2-City.mmdb and GeoLite2-Country.mmdb files.
| `fields` | no | [`continent_name`, `country_iso_code`, `region_name`, `city_name`, `location`] <1> | Controls what properties are added to the `target_field` based on the geoip lookup.
|======
If the GeoLite2 City database is used then the following fields will be added under the `target_field`: `ip`,
<1> Depends on what is available in `database_field`:
* If the GeoLite2 City database is used then the following fields may be added under the `target_field`: `ip`,
`country_iso_code`, `country_name`, `continent_name`, `region_name`, `city_name`, `timezone`, `latitude`, `longitude`
and `location`.
If the GeoLite2 Country database is used then the following fields will be added under the `target_field`: `ip`,
`country_iso_code`, `country_name` and `continent_name`.
and `location`. The fields actually added depend on what has been found and which fields were configured in `fields`.
* If the GeoLite2 Country database is used then the following fields may be added under the `target_field`: `ip`,
`country_iso_code`, `country_name` and `continent_name`.The fields actually added depend on what has been found and which fields were configured in `fields`.
An example that uses the default city database and adds the geographical information to the `geoip` field based on the `ip` field:
@ -331,7 +332,7 @@ An example that uses the default city database and adds the geographical informa
"processors" : [
{
"geoip" : {
"ip_field" : "ip"
"source_field" : "ip"
}
}
]
@ -347,7 +348,7 @@ An example that uses the default country database and add the geographical infor
"processors" : [
{
"geoip" : {
"ip_field" : "ip",
"source_field" : "ip",
"target_field" : "geo",
"database_file" : "GeoLite2-Country.mmdb"
}

View File

@ -21,11 +21,9 @@ package org.elasticsearch.ingest.processor.geoip;
import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.exception.AddressNotFoundException;
import com.maxmind.geoip2.exception.GeoIp2Exception;
import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.model.CountryResponse;
import com.maxmind.geoip2.record.*;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.ingest.Data;
@ -40,29 +38,30 @@ import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import static org.elasticsearch.ingest.processor.ConfigurationUtils.readStringList;
import static org.elasticsearch.ingest.processor.ConfigurationUtils.readStringProperty;
public final class GeoIpProcessor implements Processor {
public static final String TYPE = "geoip";
private final String ipField;
private final String sourceField;
private final String targetField;
private final DatabaseReader dbReader;
private final Set<Field> fields;
GeoIpProcessor(String ipField, DatabaseReader dbReader, String targetField) throws IOException {
this.ipField = ipField;
GeoIpProcessor(String sourceField, DatabaseReader dbReader, String targetField, Set<Field> fields) throws IOException {
this.sourceField = sourceField;
this.targetField = targetField;
this.dbReader = dbReader;
this.fields = fields;
}
@Override
public void execute(Data data) {
String ip = data.getProperty(ipField);
String ip = data.getProperty(sourceField);
final InetAddress ipAddress;
try {
ipAddress = InetAddress.getByName(ip);
@ -92,8 +91,8 @@ public final class GeoIpProcessor implements Processor {
data.addField(targetField, geoData);
}
String getIpField() {
return ipField;
String getSourceField() {
return sourceField;
}
String getTargetField() {
@ -104,6 +103,10 @@ public final class GeoIpProcessor implements Processor {
return dbReader;
}
Set<Field> getFields() {
return fields;
}
private Map<String, Object> retrieveCityGeoData(InetAddress ipAddress) {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
@ -125,18 +128,42 @@ public final class GeoIpProcessor implements Processor {
Continent continent = response.getContinent();
Subdivision subdivision = response.getMostSpecificSubdivision();
Map<String, Object> geoData = new HashMap<String, Object>();
geoData.put("ip", NetworkAddress.formatAddress(ipAddress));
geoData.put("country_iso_code", country.getIsoCode());
geoData.put("country_name", country.getName());
geoData.put("continent_name", continent.getName());
geoData.put("region_name", subdivision.getName());
geoData.put("city_name", city.getName());
geoData.put("timezone", location.getTimeZone());
geoData.put("latitude", location.getLatitude());
geoData.put("longitude", location.getLongitude());
if (location.getLatitude() != null && location.getLongitude() != null) {
geoData.put("location", new double[]{location.getLongitude(), location.getLatitude()});
Map<String, Object> geoData = new HashMap<>();
for (Field field : fields) {
switch (field) {
case IP:
geoData.put("ip", NetworkAddress.formatAddress(ipAddress));
break;
case COUNTRY_ISO_CODE:
geoData.put("country_iso_code", country.getIsoCode());
break;
case COUNTRY_NAME:
geoData.put("country_name", country.getName());
break;
case CONTINENT_NAME:
geoData.put("continent_name", continent.getName());
break;
case REGION_NAME:
geoData.put("region_name", subdivision.getName());
break;
case CITY_NAME:
geoData.put("city_name", city.getName());
break;
case TIMEZONE:
geoData.put("timezone", location.getTimeZone());
break;
case LATITUDE:
geoData.put("latitude", location.getLatitude());
break;
case LONGITUDE:
geoData.put("longitude", location.getLongitude());
break;
case LOCATION:
if (location.getLatitude() != null && location.getLongitude() != null) {
geoData.put("location", new double[]{location.getLongitude(), location.getLatitude()});
}
break;
}
}
return geoData;
}
@ -159,29 +186,59 @@ public final class GeoIpProcessor implements Processor {
Country country = response.getCountry();
Continent continent = response.getContinent();
Map<String, Object> geoData = new HashMap<String, Object>();
geoData.put("ip", NetworkAddress.formatAddress(ipAddress));
geoData.put("country_iso_code", country.getIsoCode());
geoData.put("country_name", country.getName());
geoData.put("continent_name", continent.getName());
Map<String, Object> geoData = new HashMap<>();
for (Field field : fields) {
switch (field) {
case IP:
geoData.put("ip", NetworkAddress.formatAddress(ipAddress));
break;
case COUNTRY_ISO_CODE:
geoData.put("country_iso_code", country.getIsoCode());
break;
case COUNTRY_NAME:
geoData.put("country_name", country.getName());
break;
case CONTINENT_NAME:
geoData.put("continent_name", continent.getName());
break;
}
}
return geoData;
}
public static class Factory implements Processor.Factory<GeoIpProcessor> {
static final Set<Field> DEFAULT_FIELDS = EnumSet.of(
Field.CONTINENT_NAME, Field.COUNTRY_ISO_CODE, Field.REGION_NAME, Field.CITY_NAME, Field.LOCATION
);
private Path geoIpConfigDirectory;
private final DatabaseReaderService databaseReaderService = new DatabaseReaderService();
public GeoIpProcessor create(Map<String, Object> config) throws IOException {
String ipField = readStringProperty(config, "ip_field");
String ipField = readStringProperty(config, "source_field");
String targetField = readStringProperty(config, "target_field", "geoip");
String databaseFile = readStringProperty(config, "database_file", "GeoLite2-City.mmdb");
final Set<Field> fields;
if (config.containsKey("fields")) {
fields = EnumSet.noneOf(Field.class);
List<String> fieldNames = readStringList(config, "fields");
for (String fieldName : fieldNames) {
try {
fields.add(Field.parse(fieldName));
} catch (Exception e) {
throw new IllegalArgumentException("illegal field option [" + fieldName +"]. valid values are [" + Arrays.toString(Field.values()) +"]", e);
}
}
} else {
fields = DEFAULT_FIELDS;
}
Path databasePath = geoIpConfigDirectory.resolve(databaseFile);
if (Files.exists(databasePath) && Files.isRegularFile(databasePath)) {
try (InputStream database = Files.newInputStream(databasePath, StandardOpenOption.READ)) {
DatabaseReader databaseReader = databaseReaderService.getOrCreateDatabaseReader(databaseFile, database);
return new GeoIpProcessor(ipField, databaseReader, targetField);
return new GeoIpProcessor(ipField, databaseReader, targetField, fields);
}
} else {
throw new IllegalArgumentException("database file [" + databaseFile + "] doesn't exist in [" + geoIpConfigDirectory + "]");
@ -209,4 +266,22 @@ public final class GeoIpProcessor implements Processor {
}
}
public enum Field {
IP,
COUNTRY_ISO_CODE,
COUNTRY_NAME,
CONTINENT_NAME,
REGION_NAME,
CITY_NAME,
TIMEZONE,
LATITUDE,
LONGITUDE,
LOCATION;
public static Field parse(String value) {
return valueOf(value.toUpperCase(Locale.ROOT));
}
}
}

View File

@ -24,12 +24,13 @@ import org.elasticsearch.test.StreamsUtils;
import org.junit.Before;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.Matchers.startsWith;
public class GeoIpProcessorFactoryTests extends ESTestCase {
@ -50,22 +51,23 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
factory.setConfigDirectory(configDir);
Map<String, Object> config = new HashMap<>();
config.put("ip_field", "_field");
config.put("source_field", "_field");
GeoIpProcessor processor = factory.create(config);
assertThat(processor.getIpField(), equalTo("_field"));
assertThat(processor.getSourceField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("geoip"));
assertThat(processor.getDbReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-City"));
assertThat(processor.getFields(), sameInstance(GeoIpProcessor.Factory.DEFAULT_FIELDS));
}
public void testBuild_targetField() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
factory.setConfigDirectory(configDir);
Map<String, Object> config = new HashMap<>();
config.put("ip_field", "_field");
config.put("source_field", "_field");
config.put("target_field", "_field");
GeoIpProcessor processor = factory.create(config);
assertThat(processor.getIpField(), equalTo("_field"));
assertThat(processor.getSourceField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("_field"));
}
@ -73,10 +75,10 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
factory.setConfigDirectory(configDir);
Map<String, Object> config = new HashMap<>();
config.put("ip_field", "_field");
config.put("source_field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb");
GeoIpProcessor processor = factory.create(config);
assertThat(processor.getIpField(), equalTo("_field"));
assertThat(processor.getSourceField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("geoip"));
assertThat(processor.getDbReader().getMetadata().getDatabaseType(), equalTo("GeoLite2-Country"));
}
@ -86,7 +88,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
factory.setConfigDirectory(configDir);
Map<String, Object> config = new HashMap<>();
config.put("ip_field", "_field");
config.put("source_field", "_field");
config.put("database_file", "does-not-exist.mmdb");
try {
factory.create(config);
@ -94,4 +96,47 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
assertThat(e.getMessage(), startsWith("database file [does-not-exist.mmdb] doesn't exist in"));
}
}
public void testBuild_fields() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
factory.setConfigDirectory(configDir);
Set<GeoIpProcessor.Field> fields = EnumSet.noneOf(GeoIpProcessor.Field.class);
List<String> fieldNames = new ArrayList<>();
int numFields = scaledRandomIntBetween(1, GeoIpProcessor.Field.values().length);
for (int i = 0; i < numFields; i++) {
GeoIpProcessor.Field field = GeoIpProcessor.Field.values()[i];
fields.add(field);
fieldNames.add(field.name().toLowerCase(Locale.ROOT));
}
Map<String, Object> config = new HashMap<>();
config.put("source_field", "_field");
config.put("fields", fieldNames);
GeoIpProcessor processor = factory.create(config);
assertThat(processor.getSourceField(), equalTo("_field"));
assertThat(processor.getFields(), equalTo(fields));
}
public void testBuild_illegalFieldOption() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory();
factory.setConfigDirectory(configDir);
Map<String, Object> config = new HashMap<>();
config.put("source_field", "_field");
config.put("fields", Collections.singletonList("invalid"));
try {
factory.create(config);
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("illegal field option [invalid]. valid values are [[IP, COUNTRY_ISO_CODE, COUNTRY_NAME, CONTINENT_NAME, REGION_NAME, CITY_NAME, TIMEZONE, LATITUDE, LONGITUDE, LOCATION]]"));
}
config = new HashMap<>();
config.put("source_field", "_field");
config.put("fields", "invalid");
try {
factory.create(config);
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("property [fields] isn't a list, but of type [java.lang.String]"));
}
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.ingest.Data;
import org.elasticsearch.test.ESTestCase;
import java.io.InputStream;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
@ -33,7 +34,7 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testCity() throws Exception {
InputStream database = GeoIpProcessor.class.getResourceAsStream("/GeoLite2-City.mmdb");
GeoIpProcessor processor = new GeoIpProcessor("source_field", new DatabaseReader.Builder(database).build(), "target_field");
GeoIpProcessor processor = new GeoIpProcessor("source_field", new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Field.class));
Map<String, Object> document = new HashMap<>();
document.put("source_field", "82.170.213.79");
@ -59,7 +60,7 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testCountry() throws Exception {
InputStream database = GeoIpProcessor.class.getResourceAsStream("/GeoLite2-Country.mmdb");
GeoIpProcessor processor = new GeoIpProcessor("source_field", new DatabaseReader.Builder(database).build(), "target_field");
GeoIpProcessor processor = new GeoIpProcessor("source_field", new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Field.class));
Map<String, Object> document = new HashMap<>();
document.put("source_field", "82.170.213.79");
@ -79,7 +80,7 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testAddressIsNotInTheDatabase() throws Exception {
InputStream database = GeoIpProcessor.class.getResourceAsStream("/GeoLite2-City.mmdb");
GeoIpProcessor processor = new GeoIpProcessor("source_field", new DatabaseReader.Builder(database).build(), "target_field");
GeoIpProcessor processor = new GeoIpProcessor("source_field", new DatabaseReader.Builder(database).build(), "target_field", EnumSet.allOf(GeoIpProcessor.Field.class));
Map<String, Object> document = new HashMap<>();
document.put("source_field", "202.45.11.11");

View File

@ -13,7 +13,59 @@
"processors": [
{
"geoip" : {
"ip_field" : "field1"
"source_field" : "field1"
}
}
]
}
- match: { _id: "my_pipeline" }
# Simulate a Thread.sleep(), because pipeline are updated in the background
- do:
catch: request_timeout
cluster.health:
wait_for_nodes: 99
timeout: 2s
- match: { "timed_out": true }
- do:
ingest.index:
index: test
type: test
id: 1
pipeline_id: "my_pipeline"
body: {field1: "128.101.101.101"}
- do:
get:
index: test
type: test
id: 1
- match: { _source.field1: "128.101.101.101" }
- length: { _source.geoip: 5 }
- match: { _source.geoip.city_name: "Minneapolis" }
- match: { _source.geoip.country_iso_code: "US" }
- match: { _source.geoip.location: [-93.2166, 44.9759] }
- match: { _source.geoip.region_name: "Minnesota" }
- match: { _source.geoip.continent_name: "North America" }
---
"Test geoip processor with fields":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"geoip" : {
"source_field" : "field1",
"fields" : ["city_name", "country_iso_code", "ip", "latitude", "longitude", "location", "timezone", "country_name", "region_name", "continent_name"]
}
}
]
@ -69,7 +121,7 @@
"processors": [
{
"geoip" : {
"ip_field" : "field1",
"source_field" : "field1",
"database_file" : "GeoLite2-Country.mmdb"
}
}
@ -99,8 +151,6 @@
type: test
id: 1
- match: { _source.field1: "128.101.101.101" }
- length: { _source.geoip: 4 }
- length: { _source.geoip: 2 }
- match: { _source.geoip.country_iso_code: "US" }
- match: { _source.geoip.ip: "128.101.101.101" }
- match: { _source.geoip.country_name: "United States" }
- match: { _source.geoip.continent_name: "North America" }