Allow list of IPs in geoip ingest processor (#49573) (#49947)

* Allow list of IPs in geoip ingest processor

This change lets you use array of IPs in addition to string in geoip processor source field.
It will set array containing geoip data for each element in source, unless first_only parameter
option is enabled, then only first found will be returned.

Closes #46193
This commit is contained in:
Przemko Robakowski 2019-12-07 00:19:09 +01:00 committed by GitHub
parent 17cda5b2c0
commit d7083a84f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 241 additions and 34 deletions

View File

@ -25,6 +25,7 @@ uncompressed. The `ingest-geoip` config directory is located at `$ES_CONFIG/inge
| `database_file` | no | GeoLite2-City.mmdb | The database filename in the geoip config directory. The ingest-geoip module ships with the GeoLite2-City.mmdb, GeoLite2-Country.mmdb and GeoLite2-ASN.mmdb files.
| `properties` | no | [`continent_name`, `country_iso_code`, `region_iso_code`, `region_name`, `city_name`, `location`] * | Controls what properties are added to the `target_field` based on the geoip lookup.
| `ignore_missing` | no | `false` | If `true` and `field` does not exist, the processor quietly exits without modifying the document
| `first_only` | no | `true` | If `true` only first found geoip data will be returned, even if `field` contains array
|======
*Depends on what is available in `database_file`:

View File

@ -41,6 +41,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
@ -68,6 +69,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
private final Set<Property> properties;
private final boolean ignoreMissing;
private final GeoIpCache cache;
private final boolean firstOnly;
/**
* Construct a geo-IP processor.
@ -79,15 +81,17 @@ public final class GeoIpProcessor extends AbstractProcessor {
* @param properties the properties; ideally this is lazily-loaded once on first use
* @param ignoreMissing true if documents with a missing value for the field should be ignored
* @param cache a geo-IP cache
* @param firstOnly true if only first result should be returned in case of array
*/
GeoIpProcessor(
final String tag,
final String field,
final DatabaseReaderLazyLoader lazyLoader,
final String targetField,
final Set<Property> properties,
final boolean ignoreMissing,
final GeoIpCache cache) {
final String tag,
final String field,
final DatabaseReaderLazyLoader lazyLoader,
final String targetField,
final Set<Property> properties,
final boolean ignoreMissing,
final GeoIpCache cache,
boolean firstOnly) {
super(tag);
this.field = field;
this.targetField = targetField;
@ -95,6 +99,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
this.properties = properties;
this.ignoreMissing = ignoreMissing;
this.cache = cache;
this.firstOnly = firstOnly;
}
boolean isIgnoreMissing() {
@ -103,7 +108,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws IOException {
String ip = ingestDocument.getFieldValue(field, String.class, ignoreMissing);
Object ip = ingestDocument.getFieldValue(field, Object.class, ignoreMissing);
if (ip == null && ignoreMissing) {
return ingestDocument;
@ -111,11 +116,43 @@ public final class GeoIpProcessor extends AbstractProcessor {
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract geoip information.");
}
final InetAddress ipAddress = InetAddresses.forString(ip);
if (ip instanceof String) {
Map<String, Object> geoData = getGeoData((String) ip);
if (geoData.isEmpty() == false) {
ingestDocument.setFieldValue(targetField, geoData);
}
} else if (ip instanceof List) {
boolean match = false;
List<Map<String, Object>> geoDataList = new ArrayList<>(((List) ip).size());
for (Object ipAddr : (List) ip) {
if (ipAddr instanceof String == false) {
throw new IllegalArgumentException("array in field [" + field + "] should only contain strings");
}
Map<String, Object> geoData = getGeoData((String) ipAddr);
if (geoData.isEmpty()) {
geoDataList.add(null);
continue;
}
if (firstOnly) {
ingestDocument.setFieldValue(targetField, geoData);
return ingestDocument;
}
match = true;
geoDataList.add(geoData);
}
if (match) {
ingestDocument.setFieldValue(targetField, geoDataList);
}
} else {
throw new IllegalArgumentException("field [" + field + "] should contain only string or array of strings");
}
return ingestDocument;
}
Map<String, Object> geoData;
private Map<String, Object> getGeoData(String ip) throws IOException {
String databaseType = lazyLoader.getDatabaseType();
final InetAddress ipAddress = InetAddresses.forString(ip);
Map<String, Object> geoData;
if (databaseType.endsWith(CITY_DB_SUFFIX)) {
try {
geoData = retrieveCityGeoData(ipAddress);
@ -136,12 +173,9 @@ public final class GeoIpProcessor extends AbstractProcessor {
}
} else {
throw new ElasticsearchParseException("Unsupported database type [" + lazyLoader.getDatabaseType()
+ "]", new IllegalStateException());
+ "]", new IllegalStateException());
}
if (geoData.isEmpty() == false) {
ingestDocument.setFieldValue(targetField, geoData);
}
return ingestDocument;
return geoData;
}
@Override
@ -360,14 +394,15 @@ public final class GeoIpProcessor extends AbstractProcessor {
@Override
public GeoIpProcessor create(
final Map<String, Processor.Factory> registry,
final String processorTag,
final Map<String, Object> config) throws IOException {
final Map<String, Processor.Factory> registry,
final String processorTag,
final Map<String, Object> config) throws IOException {
String ipField = readStringProperty(TYPE, processorTag, config, "field");
String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "geoip");
String databaseFile = readStringProperty(TYPE, processorTag, config, "database_file", "GeoLite2-City.mmdb");
List<String> propertyNames = readOptionalList(TYPE, processorTag, config, "properties");
boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
boolean firstOnly = readBooleanProperty(TYPE, processorTag, config, "first_only", true);
DatabaseReaderLazyLoader lazyLoader = databaseReaders.get(databaseFile);
if (lazyLoader == null) {
@ -397,11 +432,11 @@ public final class GeoIpProcessor extends AbstractProcessor {
properties = DEFAULT_ASN_PROPERTIES;
} else {
throw newConfigurationException(TYPE, processorTag, "database_file", "Unsupported database type ["
+ databaseType + "]");
+ databaseType + "]");
}
}
return new GeoIpProcessor(processorTag, ipField, lazyLoader, targetField, properties, ignoreMissing, cache);
return new GeoIpProcessor(processorTag, ipField, lazyLoader, targetField, properties, ignoreMissing, cache, firstOnly);
}
}
@ -460,7 +495,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
return property;
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("illegal property value [" + value + "]. valid values are " +
Arrays.toString(validProperties.toArray()));
Arrays.toString(validProperties.toArray()));
}
}
}

View File

@ -29,9 +29,11 @@ import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
@ -39,13 +41,14 @@ import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocumen
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
public class GeoIpProcessorTests extends ESTestCase {
public void testCity() throws Exception {
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000));
new GeoIpCache(1000), false);
Map<String, Object> document = new HashMap<>();
document.put("source_field", "8.8.8.8");
@ -70,7 +73,7 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testNullValueWithIgnoreMissing() throws Exception {
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true,
new GeoIpCache(1000));
new GeoIpCache(1000), false);
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(),
Collections.singletonMap("source_field", null));
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
@ -81,7 +84,7 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testNonExistentWithIgnoreMissing() throws Exception {
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true,
new GeoIpCache(1000));
new GeoIpCache(1000), false);
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
processor.execute(ingestDocument);
@ -91,7 +94,7 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testNullWithoutIgnoreMissing() throws Exception {
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000));
new GeoIpCache(1000), false);
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(),
Collections.singletonMap("source_field", null));
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
@ -102,7 +105,7 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testNonExistentWithoutIgnoreMissing() throws Exception {
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000));
new GeoIpCache(1000), false);
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
Exception exception = expectThrows(Exception.class, () -> processor.execute(ingestDocument));
@ -112,7 +115,7 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testCity_withIpV6() throws Exception {
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000));
new GeoIpCache(1000), false);
String address = "2602:306:33d3:8000::3257:9652";
Map<String, Object> document = new HashMap<>();
@ -141,7 +144,7 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testCityWithMissingLocation() throws Exception {
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000));
new GeoIpCache(1000), false);
Map<String, Object> document = new HashMap<>();
document.put("source_field", "80.231.5.0");
@ -158,7 +161,7 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testCountry() throws Exception {
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
loader("/GeoLite2-Country.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000));
new GeoIpCache(1000), false);
Map<String, Object> document = new HashMap<>();
document.put("source_field", "82.170.213.79");
@ -178,7 +181,7 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testCountryWithMissingLocation() throws Exception {
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
loader("/GeoLite2-Country.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000));
new GeoIpCache(1000), false);
Map<String, Object> document = new HashMap<>();
document.put("source_field", "80.231.5.0");
@ -196,7 +199,7 @@ public class GeoIpProcessorTests extends ESTestCase {
String ip = "82.171.64.0";
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
loader("/GeoLite2-ASN.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000));
new GeoIpCache(1000), false);
Map<String, Object> document = new HashMap<>();
document.put("source_field", ip);
@ -215,7 +218,7 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testAddressIsNotInTheDatabase() throws Exception {
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000));
new GeoIpCache(1000), false);
Map<String, Object> document = new HashMap<>();
document.put("source_field", "127.0.0.1");
@ -228,7 +231,7 @@ public class GeoIpProcessorTests extends ESTestCase {
public void testInvalid() throws Exception {
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000));
new GeoIpCache(1000), false);
Map<String, Object> document = new HashMap<>();
document.put("source_field", "www.google.com");
@ -237,6 +240,93 @@ public class GeoIpProcessorTests extends ESTestCase {
assertThat(e.getMessage(), containsString("not an IP string literal"));
}
public void testListAllValid() throws Exception {
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000), false);
Map<String, Object> document = new HashMap<>();
document.put("source_field", Arrays.asList("8.8.8.8", "82.171.64.0"));
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
processor.execute(ingestDocument);
@SuppressWarnings("unchecked")
List<Map<String, Object>> geoData = (List<Map<String, Object>>) ingestDocument.getSourceAndMetadata().get("target_field");
Map<String, Object> location = new HashMap<>();
location.put("lat", 37.751d);
location.put("lon", -97.822d);
assertThat(geoData.get(0).get("location"), equalTo(location));
assertThat(geoData.get(1).get("city_name"), equalTo("Hoensbroek"));
}
public void testListPartiallyValid() throws Exception {
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000), false);
Map<String, Object> document = new HashMap<>();
document.put("source_field", Arrays.asList("8.8.8.8", "127.0.0.1"));
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
processor.execute(ingestDocument);
@SuppressWarnings("unchecked")
List<Map<String, Object>> geoData = (List<Map<String, Object>>) ingestDocument.getSourceAndMetadata().get("target_field");
Map<String, Object> location = new HashMap<>();
location.put("lat", 37.751d);
location.put("lon", -97.822d);
assertThat(geoData.get(0).get("location"), equalTo(location));
assertThat(geoData.get(1), nullValue());
}
public void testListNoMatches() throws Exception {
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000), false);
Map<String, Object> document = new HashMap<>();
document.put("source_field", Arrays.asList("127.0.0.1", "127.0.0.1"));
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
processor.execute(ingestDocument);
assertFalse(ingestDocument.hasField("target_field"));
}
public void testListFirstOnly() throws Exception {
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000), true);
Map<String, Object> document = new HashMap<>();
document.put("source_field", Arrays.asList("8.8.8.8", "127.0.0.1"));
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
processor.execute(ingestDocument);
@SuppressWarnings("unchecked")
Map<String, Object> geoData = (Map<String, Object>) ingestDocument.getSourceAndMetadata().get("target_field");
Map<String, Object> location = new HashMap<>();
location.put("lat", 37.751d);
location.put("lon", -97.822d);
assertThat(geoData.get("location"), equalTo(location));
}
public void testListFirstOnlyNoMatches() throws Exception {
GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field",
loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false,
new GeoIpCache(1000), true);
Map<String, Object> document = new HashMap<>();
document.put("source_field", Arrays.asList("127.0.0.1", "127.0.0.2"));
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata().containsKey("target_field"), is(false));
}
private DatabaseReaderLazyLoader loader(final String path) {
final Supplier<InputStream> databaseInputStreamSupplier = () -> GeoIpProcessor.class.getResourceAsStream(path);
final CheckedSupplier<DatabaseReader, IOException> loader =

View File

@ -37,6 +37,87 @@
- match: { _source.geoip.region_name: "Minnesota" }
- match: { _source.geoip.continent_name: "North America" }
---
"Test geoip processor with list":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"geoip" : {
"field" : "field1",
"first_only" : false
}
}
]
}
- match: { acknowledged: true }
- do:
index:
index: test
id: 1
pipeline: "my_pipeline"
body: {field1: ["128.101.101.101", "127.0.0.1"]}
- do:
get:
index: test
id: 1
- match: { _source.field1: ["128.101.101.101", "127.0.0.1"] }
- length: { _source.geoip: 2 }
- length: { _source.geoip.0: 6 }
- match: { _source.geoip.0.city_name: "Minneapolis" }
- match: { _source.geoip.0.country_iso_code: "US" }
- match: { _source.geoip.0.location.lon: -93.2548 }
- match: { _source.geoip.0.location.lat: 44.9399 }
- match: { _source.geoip.0.region_iso_code: "US-MN" }
- match: { _source.geoip.0.region_name: "Minnesota" }
- match: { _source.geoip.0.continent_name: "North America" }
- match: { _source.geoip.1: null }
---
"Test geoip processor with lists, first only":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"geoip" : {
"field" : "field1"
}
}
]
}
- match: { acknowledged: true }
- do:
index:
index: test
id: 1
pipeline: "my_pipeline"
body: {field1: ["127.0.0.1", "128.101.101.101", "128.101.101.101"]}
- do:
get:
index: test
id: 1
- match: { _source.field1: ["127.0.0.1", "128.101.101.101", "128.101.101.101"] }
- length: { _source.geoip: 6 }
- match: { _source.geoip.city_name: "Minneapolis" }
- match: { _source.geoip.country_iso_code: "US" }
- match: { _source.geoip.location.lon: -93.2548 }
- match: { _source.geoip.location.lat: 44.9399 }
- match: { _source.geoip.region_iso_code: "US-MN" }
- match: { _source.geoip.region_name: "Minnesota" }
- match: { _source.geoip.continent_name: "North America" }
---
"Test geoip processor with fields":
- do: