NIFI-5902 Added GeoEnrichIPRecord.

NIFI-5902 Updated GeoEnrichIPRecord with more error handling.

NIFI-5902 Made changes requested in a review.

NIFI-5902 Updated EL support on database path.

NIFI-5902 Made updates based on code review.

NIFI-5902 Made two changes from a code review.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #3231
This commit is contained in:
Mike Thomsen 2018-12-17 15:47:55 -05:00 committed by Matthew Burgess
parent 8bed249f37
commit 3e7816da43
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
12 changed files with 731 additions and 75 deletions

View File

@ -29,6 +29,12 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>${project.version}</version>
<type>nar</type>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-enrich-processors</artifactId> <artifactId>nifi-enrich-processors</artifactId>

View File

@ -88,5 +88,50 @@
<version>1.6.5</version> <version>1.6.5</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-path</artifactId>
<version>1.10.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services</artifactId>
<version>1.10.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<version>1.10.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes combine.children="append">
<exclude>src/test/resources/avro/record_schema.avsc</exclude>
<exclude>src/test/resources/json/one_record.json</exclude>
<exclude>src/test/resources/json/one_record_no_geo.json</exclude>
<exclude>src/test/resources/json/two_records_for_split.json</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project> </project>

View File

@ -50,6 +50,7 @@ public abstract class AbstractEnrichIP extends AbstractProcessor {
.description("Path to Maxmind IP Enrichment Database File") .description("Path to Maxmind IP Enrichment Database File")
.required(true) .required(true)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build(); .build();
public static final PropertyDescriptor IP_ADDRESS_ATTRIBUTE = new PropertyDescriptor.Builder() public static final PropertyDescriptor IP_ADDRESS_ATTRIBUTE = new PropertyDescriptor.Builder()
@ -88,7 +89,7 @@ public abstract class AbstractEnrichIP extends AbstractProcessor {
@OnScheduled @OnScheduled
public void onScheduled(final ProcessContext context) throws IOException { public void onScheduled(final ProcessContext context) throws IOException {
final String dbFileString = context.getProperty(GEO_DATABASE_FILE).getValue(); final String dbFileString = context.getProperty(GEO_DATABASE_FILE).evaluateAttributeExpressions().getValue();
final File dbFile = new File(dbFileString); final File dbFile = new File(dbFileString);
final StopWatch stopWatch = new StopWatch(true); final StopWatch stopWatch = new StopWatch(true);
final DatabaseReader reader = new DatabaseReader.Builder(dbFile).build(); final DatabaseReader reader = new DatabaseReader.Builder(dbFile).build();

View File

@ -0,0 +1,352 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors;
import com.maxmind.geoip2.model.CityResponse;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.maxmind.DatabaseReader;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"geo", "enrich", "ip", "maxmind", "record"})
@CapabilityDescription("Looks up geolocation information for an IP address and adds the geo information to FlowFile attributes. The "
+ "geo data is provided as a MaxMind database. This version uses the NiFi Record API to allow large scale enrichment of record-oriented data sets. "
+ "Each field provided by the MaxMind database can be directed to a field of the user's choosing by providing a record path for that field configuration. ")
public class GeoEnrichIPRecord extends AbstractEnrichIP {
/** Required controller service ({@link RecordReaderFactory}) used to read the records of the incoming flowfile. */
public static final PropertyDescriptor READER = new PropertyDescriptor.Builder()
.name("geo-enrich-ip-record-reader")
.displayName("Record Reader")
.description("Record reader service to use for reading the flowfile contents.")
.required(true)
.identifiesControllerService(RecordReaderFactory.class)
.build();
/** Required controller service ({@link RecordSetWriterFactory}) used to write the processed records. */
public static final PropertyDescriptor WRITER = new PropertyDescriptor.Builder()
.name("geo-enrich-ip-record-writer")
.displayName("Record Writer")
.description("Record writer service to use for enriching the flowfile contents.")
.required(true)
.identifiesControllerService(RecordSetWriterFactory.class)
.build();
/** Required record path of the field holding the IP address to look up; expression language is evaluated against flowfile attributes. */
public static final PropertyDescriptor IP_RECORD_PATH = new PropertyDescriptor.Builder()
.name("geo-enrich-ip-ip-record-path")
.displayName("IP Address Record Path")
.description("The record path to retrieve the IP address for doing the lookup.")
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();
/** Boolean switch (default "false") selecting whether enriched and not enriched records are written to separate flowfiles. */
public static final PropertyDescriptor SPLIT_FOUND_NOT_FOUND = new PropertyDescriptor.Builder()
.name("geo-enrich-ip-split-found-not-found")
.displayName("Separate Enriched From Not Enriched")
.description("Separate records that have been enriched from ones that have not. Default behavior is " +
"to send everything to the found relationship if even one record is enriched.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
// The remaining descriptors are all optional record paths naming the record field that receives one
// piece of the lookup result. Each supports expression language against flowfile attributes.
/** Optional record path of the field that receives the city name. */
public static final PropertyDescriptor GEO_CITY = new PropertyDescriptor.Builder()
.name("geo-enrich-ip-city-record-path")
.displayName("City Record Path")
.description("Record path for putting the city identified for the IP address")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
/** Optional record path of the field that receives the accuracy radius. */
public static final PropertyDescriptor GEO_ACCURACY = new PropertyDescriptor.Builder()
.name("geo-enrich-ip-accuracy-record-path")
.displayName("Accuracy Radius Record Path")
.description("Record path for putting the accuracy radius if provided by the database (in Kilometers)")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
/** Optional record path of the field that receives the latitude. */
public static final PropertyDescriptor GEO_LATITUDE = new PropertyDescriptor.Builder()
.name("geo-enrich-ip-latitude-record-path")
.displayName("Latitude Record Path")
.description("Record path for putting the latitude identified for this IP address")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
/** Optional record path of the field that receives the longitude. */
public static final PropertyDescriptor GEO_LONGITUDE = new PropertyDescriptor.Builder()
.name("geo-enrich-ip-longitude-record-path")
.displayName("Longitude Record Path")
.description("Record path for putting the longitude identified for this IP address")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
/** Optional record path of the field that receives the country name. */
public static final PropertyDescriptor GEO_COUNTRY = new PropertyDescriptor.Builder()
.name("geo-enrich-ip-country-record-path")
.displayName("Country Record Path")
.description("Record path for putting the country identified for this IP address")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
/** Optional record path of the field that receives the country ISO code. */
public static final PropertyDescriptor GEO_COUNTRY_ISO = new PropertyDescriptor.Builder()
.name("geo-enrich-ip-country-iso-record-path")
.displayName("Country ISO Code Record Path")
.description("Record path for putting the ISO Code for the country identified")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
/**
 * Optional record path of the field that receives the postal code.
 * NOTE(review): the property name and display name say "country postal", which reads oddly for a
 * postal code; the name is a persisted identifier, so confirm before renaming.
 */
public static final PropertyDescriptor GEO_POSTAL_CODE = new PropertyDescriptor.Builder()
.name("geo-enrich-ip-country-postal-record-path")
.displayName("Country Postal Code Record Path")
.description("Record path for putting the postal code for the country identified")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
/**
 * Relationship for the untouched input flowfile.
 * NOTE(review): onTrigger only transfers the input here when SPLIT_FOUND_NOT_FOUND is true; in the
 * default mode the input flowfile is removed instead, which does not match the "regardless" wording
 * of the description below. Decide whether the code or the description should change.
 */
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("The original input flowfile goes to this relationship regardless of whether the content was enriched or not.")
.build();
/** Immutable set of every relationship exposed by this processor. */
public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_ORIGINAL, REL_FOUND, REL_NOT_FOUND
)));
/** The optional "target field" descriptors; onTrigger compiles a record path for each one that is set. */
public static final List<PropertyDescriptor> GEO_PROPERTIES = Collections.unmodifiableList(Arrays.asList(
GEO_CITY, GEO_ACCURACY, GEO_LATITUDE, GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE
));
/** Immutable, ordered list of all supported properties as returned by getSupportedPropertyDescriptors(). */
private static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
GEO_DATABASE_FILE, READER, WRITER, SPLIT_FOUND_NOT_FOUND, IP_RECORD_PATH, GEO_CITY, GEO_ACCURACY, GEO_LATITUDE,
GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE
));
/**
 * Returns the fixed, unmodifiable relationship set (REL_ORIGINAL, REL_FOUND, REL_NOT_FOUND).
 *
 * @return the shared RELATIONSHIPS constant
 */
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
/**
 * Returns the fixed, unmodifiable list of properties supported by this processor.
 *
 * @return the shared DESCRIPTORS constant
 */
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
// State captured in onScheduled() and read by onTrigger().
// Record reader service resolved from the READER property.
protected volatile RecordReaderFactory readerFactory;
// Record writer service resolved from the WRITER property.
protected volatile RecordSetWriterFactory writerFactory;
// Value of SPLIT_FOUND_NOT_FOUND. NOTE(review): unlike the two factories this field is not volatile —
// confirm whether that is intentional.
protected boolean splitOutput;
/**
 * Runs the parent class scheduling logic and then caches the configured record reader factory,
 * record writer factory and the split flag so that onTrigger does not have to resolve them again.
 *
 * @param context the process context supplying the property values
 * @throws IOException if the parent scheduling logic fails
 */
@OnScheduled
@Override
public void onScheduled(ProcessContext context) throws IOException {
    // The parent must run first; nothing below is assigned if it throws.
    super.onScheduled(context);

    this.readerFactory = context.getProperty(READER)
            .asControllerService(RecordReaderFactory.class);
    this.writerFactory = context.getProperty(WRITER)
            .asControllerService(RecordSetWriterFactory.class);
    this.splitOutput = context.getProperty(SPLIT_FOUND_NOT_FOUND).asBoolean();
}
/**
 * Reads every record of the incoming flowfile, looks up the IP address found at the configured
 * record path and writes the lookup result into the configured record fields.
 * <p>
 * Without splitting, all records are written to a single flowfile that is routed to REL_FOUND when
 * at least one record was enriched and to REL_NOT_FOUND otherwise; the input flowfile is removed.
 * With splitting, enriched records go to a flowfile routed to REL_FOUND, the others to a flowfile
 * routed to REL_NOT_FOUND (dropped when it would be empty), and the input goes to REL_ORIGINAL.
 * <p>
 * Any exception is logged, the session is rolled back and the processor yields.
 *
 * @param context the process context supplying the property values
 * @param session the session used to read, create and route flowfiles
 * @throws ProcessException declared by the processor contract
 */
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    FlowFile input = session.get();
    if (input == null) {
        return;
    }

    FlowFile output = session.create(input);
    FlowFile notFound = splitOutput ? session.create(input) : null;
    final DatabaseReader dbReader = databaseReaderRef.get();
    try (InputStream is = session.read(input);
         OutputStream os = session.write(output);
         OutputStream osNotFound = splitOutput ? session.write(notFound) : null) {
        // One cache slot per optional geo path plus one for the IP address path.
        RecordPathCache cache = new RecordPathCache(GEO_PROPERTIES.size() + 1);
        Map<PropertyDescriptor, RecordPath> paths = new HashMap<>();
        for (PropertyDescriptor descriptor : GEO_PROPERTIES) {
            if (!context.getProperty(descriptor).isSet()) {
                continue;
            }
            String rawPath = context.getProperty(descriptor).evaluateAttributeExpressions(input).getValue();
            paths.put(descriptor, cache.getCompiled(rawPath));
        }

        String rawIpPath = context.getProperty(IP_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
        RecordPath ipPath = cache.getCompiled(rawIpPath);

        RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
        RecordSchema schema = writerFactory.getSchema(input.getAttributes(), reader.getSchema());
        RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, os);
        RecordSetWriter notFoundWriter = splitOutput ? writerFactory.createWriter(getLogger(), schema, osNotFound) : null;

        Record record;
        // Only used without splitting: becomes REL_FOUND as soon as one record is enriched.
        Relationship targetRelationship = REL_NOT_FOUND;
        writer.beginRecordSet();
        if (notFoundWriter != null) {
            notFoundWriter.beginRecordSet();
        }

        // foundCount is the number of records written to the main writer, which without
        // splitting is every record, enriched or not.
        int foundCount = 0;
        int notFoundCount = 0;
        while ((record = reader.nextRecord()) != null) {
            CityResponse response = geocode(ipPath, record, dbReader);
            boolean wasEnriched = enrichRecord(response, record, paths);
            if (wasEnriched) {
                targetRelationship = REL_FOUND;
            }

            if (!splitOutput || wasEnriched) {
                writer.write(record);
                foundCount++;
            } else {
                notFoundWriter.write(record);
                notFoundCount++;
            }
        }

        writer.finishRecordSet();
        writer.close();
        if (notFoundWriter != null) {
            notFoundWriter.finishRecordSet();
            notFoundWriter.close();
        }

        // Streams are closed explicitly before the flowfiles are handed back to the session;
        // the try-with-resources close remains as the safety net for the failure path.
        is.close();
        os.close();
        if (osNotFound != null) {
            osNotFound.close();
        }

        output = session.putAllAttributes(output, buildAttributes(foundCount, writer.getMimeType()));
        if (!splitOutput) {
            session.transfer(output, targetRelationship);
            session.remove(input);
        } else {
            if (notFoundCount > 0) {
                notFound = session.putAllAttributes(notFound, buildAttributes(notFoundCount, writer.getMimeType()));
                session.transfer(notFound, REL_NOT_FOUND);
                // Report provenance only for a flowfile that is actually kept; previously this
                // was also invoked after the flowfile had been removed from the session.
                session.getProvenanceReporter().modifyContent(notFound);
            } else {
                session.remove(notFound);
            }
            session.transfer(output, REL_FOUND);
            session.transfer(input, REL_ORIGINAL);
        }
        session.getProvenanceReporter().modifyContent(output);
    } catch (Exception ex) {
        getLogger().error("Error enriching records.", ex);
        session.rollback();
        context.yield();
    }
}
/**
 * Builds the attributes attached to an outgoing flowfile.
 *
 * @param recordCount number of records written to the flowfile, stored under "record.count"
 * @param mimeType    MIME type reported by the record writer, stored under the core MIME type key
 * @return a new mutable map holding exactly those two attributes
 */
private Map<String, String> buildAttributes(int recordCount, String mimeType) {
    final Map<String, String> attributes = new HashMap<>();
    attributes.put("record.count", Integer.toString(recordCount));
    attributes.put(CoreAttributes.MIME_TYPE.key(), mimeType);
    return attributes;
}
/**
 * Looks up the city information for the IP address stored in the given record.
 *
 * @param ipPath record path selecting the IP address field
 * @param record the record to read the IP address from
 * @param reader the database reader used for the lookup
 * @return the lookup result, or {@code null} when the path selects no field or the field value is null
 * @throws Exception if the address cannot be resolved or the lookup fails
 */
private CityResponse geocode(RecordPath ipPath, Record record, DatabaseReader reader) throws Exception {
    final Optional<FieldValue> selected = ipPath.evaluate(record).getSelectedFields().findFirst();
    if (!selected.isPresent()) {
        return null;
    }

    final Object rawIp = selected.get().getValue();
    if (rawIp == null) {
        return null;
    }

    // Only the first selected field is used; its string form is resolved to an address.
    return reader.city(InetAddress.getByName(rawIp.toString()));
}
/**
 * Copies the values of a lookup result into the record fields selected by the configured paths.
 *
 * @param response the lookup result, may be {@code null}
 * @param record   the record to enrich in place
 * @param cached   compiled record paths keyed by the property that configured them
 * @return {@code true} when at least one field of the record was updated
 */
private boolean enrichRecord(CityResponse response, Record record, Map<PropertyDescriptor, RecordPath> cached) {
    if (response == null || response.getCity() == null) {
        return false;
    }

    // "|=" (not "||") so every configured field is attempted even after one succeeded.
    boolean enriched = update(GEO_CITY, cached, record, response.getCity().getName());
    // GEO_ACCURACY is documented as the accuracy radius in kilometers, which lives on the
    // location; the previous code wrote the city confidence value instead.
    enriched |= update(GEO_ACCURACY, cached, record, response.getLocation().getAccuracyRadius());
    enriched |= update(GEO_COUNTRY, cached, record, response.getCountry().getName());
    enriched |= update(GEO_COUNTRY_ISO, cached, record, response.getCountry().getIsoCode());
    enriched |= update(GEO_LATITUDE, cached, record, response.getLocation().getLatitude());
    enriched |= update(GEO_LONGITUDE, cached, record, response.getLocation().getLongitude());
    enriched |= update(GEO_POSTAL_CODE, cached, record, response.getPostal().getCode());
    return enriched;
}
/**
 * Writes one lookup value into the record field selected by the path configured for a property.
 *
 * @param descriptor the property whose compiled path should be used
 * @param cached     compiled record paths keyed by property; properties that were not set are absent
 * @param record     the record to update in place
 * @param fieldValue the value to store (as its string form), may be {@code null}
 * @return {@code true} when the field was updated; {@code false} when the property is not
 *         configured, the value is null, the path selects no field, or the selected field has no
 *         parent or a parent whose value is null
 */
private boolean update(PropertyDescriptor descriptor, Map<PropertyDescriptor, RecordPath> cached, Record record, Object fieldValue) {
    final RecordPath targetPath = cached.get(descriptor);
    if (targetPath == null || fieldValue == null) {
        return false;
    }

    // Previously an empty selection raised NoSuchElementException and failed the whole flowfile.
    final Optional<FieldValue> selected = targetPath.evaluate(record).getSelectedFields().findFirst();
    if (!selected.isPresent()) {
        return false;
    }

    final FieldValue target = selected.get();
    final Optional<FieldValue> parent = target.getParent();
    if (!parent.isPresent() || parent.get().getValue() == null) {
        // No parent value to attach the field to, so the record is left untouched.
        return false;
    }

    target.updateValue(fieldValue.toString());
    return true;
}
}

View File

@ -14,6 +14,7 @@
# limitations under the License. # limitations under the License.
org.apache.nifi.processors.GeoEnrichIP org.apache.nifi.processors.GeoEnrichIP
org.apache.nifi.processors.GeoEnrichIPRecord
org.apache.nifi.processors.ISPEnrichIP org.apache.nifi.processors.ISPEnrichIP
org.apache.nifi.processors.enrich.QueryDNS org.apache.nifi.processors.enrich.QueryDNS
org.apache.nifi.processors.enrich.QueryWhois org.apache.nifi.processors.enrich.QueryWhois

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.maxmind.geoip2.model.CityResponse;
import java.util.Collections;
/**
 * Shared fixtures for the geo enrichment processor tests: canned MaxMind city responses
 * deserialized from JSON.
 */
public class GeoEnrichTestUtils {
    /**
     * Builds a city response with city, country, location (including latitude and longitude) and
     * postal data.
     *
     * @return the deserialized response
     * @throws Exception if the JSON cannot be deserialized
     */
    public static CityResponse getFullCityResponse() throws Exception {
        // Taken from MaxMind unit tests.
        final String maxMindCityResponse = "{\"city\":{\"confidence\":76,"
                + "\"geoname_id\":9876,\"names\":{\"en\":\"Minneapolis\""
                + "}},\"continent\":{\"code\":\"NA\","
                + "\"geoname_id\":42,\"names\":{" + "\"en\":\"North America\""
                + "}},\"country\":{\"confidence\":99,"
                + "\"iso_code\":\"US\",\"geoname_id\":1,\"names\":{"
                + "\"en\":\"United States of America\"" + "}" + "},"
                + "\"location\":{" + "\"accuracy_radius\":1500,"
                + "\"latitude\":44.98," + "\"longitude\":93.2636,"
                + "\"metro_code\":765," + "\"time_zone\":\"America/Chicago\""
                + "}," + "\"postal\":{\"confidence\": 33, \"code\":\"55401\"},"
                + "\"registered_country\":{" + "\"geoname_id\":2,"
                + "\"iso_code\":\"CA\"," + "\"names\":{" + "\"en\":\"Canada\""
                + "}" + "}," + "\"represented_country\":{" + "\"geoname_id\":3,"
                + "\"iso_code\":\"GB\"," + "\"names\":{"
                + "\"en\":\"United Kingdom\"" + "}," + "\"type\":\"C<military>\""
                + "}," + "\"subdivisions\":[{" + "\"confidence\":88,"
                + "\"geoname_id\":574635," + "\"iso_code\":\"MN\"," + "\"names\":{"
                + "\"en\":\"Minnesota\"" + "}" + "}," + "{\"iso_code\":\"TT\"}],"
                + "\"traits\":{" + "\"autonomous_system_number\":1234,"
                + "\"autonomous_system_organization\":\"AS Organization\","
                + "\"domain\":\"example.com\"," + "\"ip_address\":\"1.2.3.4\","
                + "\"is_anonymous_proxy\":true,"
                + "\"is_satellite_provider\":true," + "\"isp\":\"Comcast\","
                + "\"organization\":\"Blorg\"," + "\"user_type\":\"college\""
                + "}," + "\"maxmind\":{\"queries_remaining\":11}" + "}";
        return parseCityResponse(maxMindCityResponse);
    }

    /**
     * Builds the same city response as {@link #getFullCityResponse()} except that the location
     * carries no latitude and no longitude.
     *
     * @return the deserialized response
     * @throws Exception if the JSON cannot be deserialized
     */
    public static CityResponse getNullLatAndLongCityResponse() throws Exception {
        // Taken from MaxMind unit tests and modified.
        final String maxMindCityResponse = "{" + "\"city\":{" + "\"confidence\":76,"
                + "\"geoname_id\":9876," + "\"names\":{" + "\"en\":\"Minneapolis\""
                + "}" + "}," + "\"continent\":{" + "\"code\":\"NA\","
                + "\"geoname_id\":42," + "\"names\":{" + "\"en\":\"North America\""
                + "}" + "}," + "\"country\":{" + "\"confidence\":99,"
                + "\"iso_code\":\"US\"," + "\"geoname_id\":1," + "\"names\":{"
                + "\"en\":\"United States of America\"" + "}" + "},"
                + "\"location\":{" + "\"accuracy_radius\":1500,"
                + "\"metro_code\":765," + "\"time_zone\":\"America/Chicago\""
                + "}," + "\"postal\":{\"confidence\": 33, \"code\":\"55401\"},"
                + "\"registered_country\":{" + "\"geoname_id\":2,"
                + "\"iso_code\":\"CA\"," + "\"names\":{" + "\"en\":\"Canada\""
                + "}" + "}," + "\"represented_country\":{" + "\"geoname_id\":3,"
                + "\"iso_code\":\"GB\"," + "\"names\":{"
                + "\"en\":\"United Kingdom\"" + "}," + "\"type\":\"C<military>\""
                + "}," + "\"subdivisions\":[{" + "\"confidence\":88,"
                + "\"geoname_id\":574635," + "\"iso_code\":\"MN\"," + "\"names\":{"
                + "\"en\":\"Minnesota\"" + "}" + "}," + "{\"iso_code\":\"TT\"}],"
                + "\"traits\":{" + "\"autonomous_system_number\":1234,"
                + "\"autonomous_system_organization\":\"AS Organization\","
                + "\"domain\":\"example.com\"," + "\"ip_address\":\"1.2.3.4\","
                + "\"is_anonymous_proxy\":true,"
                + "\"is_satellite_provider\":true," + "\"isp\":\"Comcast\","
                + "\"organization\":\"Blorg\"," + "\"user_type\":\"college\""
                + "}," + "\"maxmind\":{\"queries_remaining\":11}" + "}";
        return parseCityResponse(maxMindCityResponse);
    }

    /**
     * Deserializes a MaxMind city response from JSON with the "en" locale injected.
     * The mapper that ignores unknown properties is the one that performs the read; the previous
     * code configured such a mapper but then read with a fresh, unconfigured one.
     */
    private static CityResponse parseCityResponse(final String json) throws Exception {
        final InjectableValues inject = new InjectableValues.Std().addValue("locales", Collections.singletonList("en"));
        final ObjectMapper mapper = new ObjectMapper();
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        return mapper.reader(CityResponse.class).with(inject).readValue(json);
    }
}

View File

@ -16,9 +16,6 @@
*/ */
package org.apache.nifi.processors; package org.apache.nifi.processors;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.maxmind.geoip2.model.CityResponse; import com.maxmind.geoip2.model.CityResponse;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
@ -41,6 +38,8 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static org.apache.nifi.processors.GeoEnrichTestUtils.getFullCityResponse;
import static org.apache.nifi.processors.GeoEnrichTestUtils.getNullLatAndLongCityResponse;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
@ -283,77 +282,6 @@ public class TestGeoEnrichIP {
verifyNoMoreInteractions(databaseReader); verifyNoMoreInteractions(databaseReader);
} }
private CityResponse getFullCityResponse() throws Exception {
// Taken from MaxMind unit tests.
final String maxMindCityResponse = "{\"city\":{\"confidence\":76,"
+ "\"geoname_id\":9876,\"names\":{\"en\":\"Minneapolis\""
+ "}},\"continent\":{\"code\":\"NA\","
+ "\"geoname_id\":42,\"names\":{" + "\"en\":\"North America\""
+ "}},\"country\":{\"confidence\":99,"
+ "\"iso_code\":\"US\",\"geoname_id\":1,\"names\":{"
+ "\"en\":\"United States of America\"" + "}" + "},"
+ "\"location\":{" + "\"accuracy_radius\":1500,"
+ "\"latitude\":44.98," + "\"longitude\":93.2636,"
+ "\"metro_code\":765," + "\"time_zone\":\"America/Chicago\""
+ "}," + "\"postal\":{\"confidence\": 33, \"code\":\"55401\"},"
+ "\"registered_country\":{" + "\"geoname_id\":2,"
+ "\"iso_code\":\"CA\"," + "\"names\":{" + "\"en\":\"Canada\""
+ "}" + "}," + "\"represented_country\":{" + "\"geoname_id\":3,"
+ "\"iso_code\":\"GB\"," + "\"names\":{"
+ "\"en\":\"United Kingdom\"" + "}," + "\"type\":\"C<military>\""
+ "}," + "\"subdivisions\":[{" + "\"confidence\":88,"
+ "\"geoname_id\":574635," + "\"iso_code\":\"MN\"," + "\"names\":{"
+ "\"en\":\"Minnesota\"" + "}" + "}," + "{\"iso_code\":\"TT\"}],"
+ "\"traits\":{" + "\"autonomous_system_number\":1234,"
+ "\"autonomous_system_organization\":\"AS Organization\","
+ "\"domain\":\"example.com\"," + "\"ip_address\":\"1.2.3.4\","
+ "\"is_anonymous_proxy\":true,"
+ "\"is_satellite_provider\":true," + "\"isp\":\"Comcast\","
+ "\"organization\":\"Blorg\"," + "\"user_type\":\"college\""
+ "}," + "\"maxmind\":{\"queries_remaining\":11}" + "}";
InjectableValues inject = new InjectableValues.Std().addValue("locales", Collections.singletonList("en"));
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return new ObjectMapper().reader(CityResponse.class).with(inject).readValue(maxMindCityResponse);
}
private CityResponse getNullLatAndLongCityResponse() throws Exception {
// Taken from MaxMind unit tests and modified.
final String maxMindCityResponse = "{" + "\"city\":{" + "\"confidence\":76,"
+ "\"geoname_id\":9876," + "\"names\":{" + "\"en\":\"Minneapolis\""
+ "}" + "}," + "\"continent\":{" + "\"code\":\"NA\","
+ "\"geoname_id\":42," + "\"names\":{" + "\"en\":\"North America\""
+ "}" + "}," + "\"country\":{" + "\"confidence\":99,"
+ "\"iso_code\":\"US\"," + "\"geoname_id\":1," + "\"names\":{"
+ "\"en\":\"United States of America\"" + "}" + "},"
+ "\"location\":{" + "\"accuracy_radius\":1500,"
+ "\"metro_code\":765," + "\"time_zone\":\"America/Chicago\""
+ "}," + "\"postal\":{\"confidence\": 33, \"code\":\"55401\"},"
+ "\"registered_country\":{" + "\"geoname_id\":2,"
+ "\"iso_code\":\"CA\"," + "\"names\":{" + "\"en\":\"Canada\""
+ "}" + "}," + "\"represented_country\":{" + "\"geoname_id\":3,"
+ "\"iso_code\":\"GB\"," + "\"names\":{"
+ "\"en\":\"United Kingdom\"" + "}," + "\"type\":\"C<military>\""
+ "}," + "\"subdivisions\":[{" + "\"confidence\":88,"
+ "\"geoname_id\":574635," + "\"iso_code\":\"MN\"," + "\"names\":{"
+ "\"en\":\"Minnesota\"" + "}" + "}," + "{\"iso_code\":\"TT\"}],"
+ "\"traits\":{" + "\"autonomous_system_number\":1234,"
+ "\"autonomous_system_organization\":\"AS Organization\","
+ "\"domain\":\"example.com\"," + "\"ip_address\":\"1.2.3.4\","
+ "\"is_anonymous_proxy\":true,"
+ "\"is_satellite_provider\":true," + "\"isp\":\"Comcast\","
+ "\"organization\":\"Blorg\"," + "\"user_type\":\"college\""
+ "}," + "\"maxmind\":{\"queries_remaining\":11}" + "}";
InjectableValues inject = new InjectableValues.Std().addValue("locales", Collections.singletonList("en"));
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return new ObjectMapper().reader(CityResponse.class).with(inject).readValue(maxMindCityResponse);
}
class TestableGeoEnrichIP extends GeoEnrichIP { class TestableGeoEnrichIP extends GeoEnrichIP {
@OnScheduled @OnScheduled
@Override @Override

View File

@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors;
import com.maxmind.geoip2.model.CityResponse;
import org.apache.avro.Schema;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.maxmind.DatabaseReader;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MockSchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.processors.GeoEnrichTestUtils.getFullCityResponse;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestGeoEnrichIPRecord {
private TestRunner runner;
private DatabaseReader reader;
/**
 * Builds a test runner around TestableGeoEnrichIPRecord: stubs the database reader mock for
 * 1.2.3.4, registers and enables a JSON reader, a JSON writer and a mock schema registry that
 * holds the "record" schema, and points every geo property at a field below /geo.
 */
@Before
public void setup() throws Exception {
    reader = mock(DatabaseReader.class);
    final CityResponse stubbedResponse = getFullCityResponse();
    when(reader.city(InetAddress.getByName("1.2.3.4"))).thenReturn(stubbedResponse);

    runner = TestRunners.newTestRunner(new TestableGeoEnrichIPRecord());

    // Distinct local names keep the DatabaseReader mock field from being shadowed.
    final ControllerService jsonReader = new JsonTreeReader();
    final ControllerService jsonWriter = new JsonRecordSetWriter();
    final MockSchemaRegistry schemaRegistry = new MockSchemaRegistry();
    runner.addControllerService("reader", jsonReader);
    runner.addControllerService("writer", jsonWriter);
    runner.addControllerService("registry", schemaRegistry);

    try (InputStream schemaStream = getClass().getResourceAsStream("/avro/record_schema.avsc")) {
        final String schemaText = IOUtils.toString(schemaStream, "UTF-8");
        final RecordSchema recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(schemaText));
        schemaRegistry.addSchema("record", recordSchema);
    } catch (Exception ex) {
        throw new RuntimeException(ex);
    }

    runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_REGISTRY, "registry");
    runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY);
    runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_REGISTRY, "registry");
    runner.setProperty(GeoEnrichIPRecord.IP_RECORD_PATH, "/ip_address");
    runner.setProperty(GeoEnrichIPRecord.READER, "reader");
    runner.setProperty(GeoEnrichIPRecord.WRITER, "writer");

    // The registry is enabled before the services that reference it.
    runner.enableControllerService(schemaRegistry);
    runner.enableControllerService(jsonReader);
    runner.enableControllerService(jsonWriter);

    runner.setProperty(GeoEnrichIPRecord.GEO_CITY, "/geo/city");
    runner.setProperty(GeoEnrichIPRecord.GEO_ACCURACY, "/geo/accuracy");
    runner.setProperty(GeoEnrichIPRecord.GEO_COUNTRY, "/geo/country");
    runner.setProperty(GeoEnrichIPRecord.GEO_COUNTRY_ISO, "/geo/country_iso");
    runner.setProperty(GeoEnrichIPRecord.GEO_POSTAL_CODE, "/geo/country_postal");
    runner.setProperty(GeoEnrichIPRecord.GEO_LATITUDE, "/geo/lat");
    runner.setProperty(GeoEnrichIPRecord.GEO_LONGITUDE, "/geo/lon");

    runner.assertValid();
}
private void commonTest(String path, int not, int found, int original) {
Map<String, String> attrs = new HashMap<>();
attrs.put("schema.name", "record");
runner.enqueue(getClass().getResourceAsStream(path), attrs);
runner.run();
runner.assertTransferCount(GeoEnrichIPRecord.REL_NOT_FOUND, not);
runner.assertTransferCount(GeoEnrichIPRecord.REL_FOUND, found);
runner.assertTransferCount(GeoEnrichIPRecord.REL_ORIGINAL, original);
}
@Test
public void testSplitOutput() throws Exception {
runner.setProperty(GeoEnrichIPRecord.SPLIT_FOUND_NOT_FOUND, "true");
commonTest("/json/two_records_for_split.json", 1, 1, 1);
}
@Test
public void testEnrichSendToNotFound() throws Exception {
commonTest("/json/one_record_no_geo.json", 1, 0, 0);
}
@Test
public void testEnrichSendToFound() throws Exception {
commonTest("/json/one_record.json", 0, 1, 0);
MockFlowFile ff = runner.getFlowFilesForRelationship(GeoEnrichIPRecord.REL_FOUND).get(0);
byte[] raw = runner.getContentAsByteArray(ff);
String content = new String(raw);
ObjectMapper mapper = new ObjectMapper();
List<Map<String, Object>> result = (List<Map<String, Object>>)mapper.readValue(content, List.class);
Assert.assertNotNull(result);
Assert.assertEquals(1, result.size());
Map<String, Object> element = result.get(0);
Map<String, Object> geo = (Map<String, Object>) element.get("geo");
Assert.assertNotNull(geo);
Assert.assertNotNull(geo.get("accuracy"));
Assert.assertNotNull(geo.get("city"));
Assert.assertNotNull(geo.get("country"));
Assert.assertNotNull(geo.get("country_iso"));
Assert.assertNotNull(geo.get("country_postal"));
Assert.assertNotNull(geo.get("lat"));
Assert.assertNotNull(geo.get("lon"));
}
class TestableGeoEnrichIPRecord extends GeoEnrichIPRecord {
TestableGeoEnrichIPRecord() {}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.unmodifiableList(Arrays.asList(
READER, WRITER, IP_RECORD_PATH, SPLIT_FOUND_NOT_FOUND, GEO_CITY, GEO_ACCURACY, GEO_LATITUDE, GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE
));
}
@OnScheduled
public void onScheduled(ProcessContext context) {
databaseReaderRef.set(reader);
readerFactory = context.getProperty(READER).asControllerService(RecordReaderFactory.class);
writerFactory = context.getProperty(WRITER).asControllerService(RecordSetWriterFactory.class);
splitOutput = context.getProperty(SPLIT_FOUND_NOT_FOUND).asBoolean();
}
}
}

View File

@ -0,0 +1,42 @@
{
"type": "record",
"name": "TestRecord",
"fields": [{
"name": "message",
"type": "string"
},
{
"name": "ip_address",
"type": "string"
},
{
"name": "geo",
"type": ["null", {
"type": "record",
"name": "GeoRecord",
"fields": [{
"name": "city",
"type": "string"
}, {
"name": "country",
"type": "string"
}, {
"name": "country_iso",
"type": "string"
}, {
"name": "country_postal",
"type": "string"
}, {
"name": "lat",
"type": "double"
}, {
"name": "lon",
"type": "double"
}, {
"name": "accuracy",
"type": "int"
}]
}]
}
]
}

View File

@ -0,0 +1,5 @@
[{
"message": "Hello",
"ip_address": "1.2.3.4",
"geo": {}
}]

View File

@ -0,0 +1,4 @@
[{
"message": "Hello",
"ip_address": "1.2.3.4"
}]

View File

@ -0,0 +1,8 @@
[{
"message": "Hello",
"ip_address": "1.2.3.4"
}, {
"message": "Hello",
"ip_address": "1.2.3.4",
"geo": {}
}]