diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-nar/pom.xml b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-nar/pom.xml
index 864cf956ce..96f377c6ae 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-nar/pom.xml
@@ -29,6 +29,12 @@
+
+ org.apache.nifi
+ nifi-standard-services-api-nar
+ ${project.version}
+ nar
+
org.apache.nifi
nifi-enrich-processors
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml
index 68e0061697..213757e77d 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml
@@ -88,5 +88,50 @@
1.6.5
test
+
+ org.apache.nifi
+ nifi-record-serialization-service-api
+ compile
+
+
+ org.apache.nifi
+ nifi-record-path
+ 1.10.0-SNAPSHOT
+ compile
+
+
+ org.apache.nifi
+ nifi-record-serialization-services
+ 1.10.0-SNAPSHOT
+ test
+
+
+ org.apache.nifi
+ nifi-mock-record-utils
+ 1.10.0-SNAPSHOT
+ test
+
+
+ org.apache.nifi
+ nifi-schema-registry-service-api
+ test
+
+
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+
+
+ src/test/resources/avro/record_schema.avsc
+ src/test/resources/json/one_record.json
+ src/test/resources/json/one_record_no_geo.json
+ src/test/resources/json/two_records_for_split.json
+
+
+
+
+
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
index 30226a9c79..e72697655e 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
@@ -50,6 +50,7 @@ public abstract class AbstractEnrichIP extends AbstractProcessor {
.description("Path to Maxmind IP Enrichment Database File")
.required(true)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor IP_ADDRESS_ATTRIBUTE = new PropertyDescriptor.Builder()
@@ -88,7 +89,7 @@ public abstract class AbstractEnrichIP extends AbstractProcessor {
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
- final String dbFileString = context.getProperty(GEO_DATABASE_FILE).getValue();
+ final String dbFileString = context.getProperty(GEO_DATABASE_FILE).evaluateAttributeExpressions().getValue();
final File dbFile = new File(dbFileString);
final StopWatch stopWatch = new StopWatch(true);
final DatabaseReader reader = new DatabaseReader.Builder(dbFile).build();
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java
new file mode 100644
index 0000000000..4c83d50dce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import com.maxmind.geoip2.model.CityResponse;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.maxmind.DatabaseReader;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"geo", "enrich", "ip", "maxmind", "record"})
+@CapabilityDescription("Looks up geolocation information for an IP address and adds the geo information to FlowFile attributes. The "
+ + "geo data is provided as a MaxMind database. This version uses the NiFi Record API to allow large scale enrichment of record-oriented data sets. "
+ + "Each field provided by the MaxMind database can be directed to a field of the user's choosing by providing a record path for that field configuration. ")
+public class GeoEnrichIPRecord extends AbstractEnrichIP {
+ public static final PropertyDescriptor READER = new PropertyDescriptor.Builder()
+ .name("geo-enrich-ip-record-reader")
+ .displayName("Record Reader")
+ .description("Record reader service to use for reading the flowfile contents.")
+ .required(true)
+ .identifiesControllerService(RecordReaderFactory.class)
+ .build();
+ public static final PropertyDescriptor WRITER = new PropertyDescriptor.Builder()
+ .name("geo-enrich-ip-record-writer")
+ .displayName("Record Writer")
+ .description("Record writer service to use for enriching the flowfile contents.")
+ .required(true)
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .build();
+ public static final PropertyDescriptor IP_RECORD_PATH = new PropertyDescriptor.Builder()
+ .name("geo-enrich-ip-ip-record-path")
+ .displayName("IP Address Record Path")
+ .description("The record path to retrieve the IP address for doing the lookup.")
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .build();
+ public static final PropertyDescriptor SPLIT_FOUND_NOT_FOUND = new PropertyDescriptor.Builder()
+ .name("geo-enrich-ip-split-found-not-found")
+ .displayName("Separate Enriched From Not Enriched")
+ .description("Separate records that have been enriched from ones that have not. Default behavior is " +
+ "to send everything to the found relationship if even one record is enriched.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor GEO_CITY = new PropertyDescriptor.Builder()
+ .name("geo-enrich-ip-city-record-path")
+ .displayName("City Record Path")
+ .description("Record path for putting the city identified for the IP address")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ public static final PropertyDescriptor GEO_ACCURACY = new PropertyDescriptor.Builder()
+ .name("geo-enrich-ip-accuracy-record-path")
+ .displayName("Accuracy Radius Record Path")
+ .description("Record path for putting the accuracy radius if provided by the database (in Kilometers)")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ public static final PropertyDescriptor GEO_LATITUDE = new PropertyDescriptor.Builder()
+ .name("geo-enrich-ip-latitude-record-path")
+ .displayName("Latitude Record Path")
+ .description("Record path for putting the latitude identified for this IP address")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ public static final PropertyDescriptor GEO_LONGITUDE = new PropertyDescriptor.Builder()
+ .name("geo-enrich-ip-longitude-record-path")
+ .displayName("Longitude Record Path")
+ .description("Record path for putting the longitude identified for this IP address")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ public static final PropertyDescriptor GEO_COUNTRY = new PropertyDescriptor.Builder()
+ .name("geo-enrich-ip-country-record-path")
+ .displayName("Country Record Path")
+ .description("Record path for putting the country identified for this IP address")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ public static final PropertyDescriptor GEO_COUNTRY_ISO = new PropertyDescriptor.Builder()
+ .name("geo-enrich-ip-country-iso-record-path")
+ .displayName("Country ISO Code Record Path")
+ .description("Record path for putting the ISO Code for the country identified")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ public static final PropertyDescriptor GEO_POSTAL_CODE = new PropertyDescriptor.Builder()
+ .name("geo-enrich-ip-country-postal-record-path")
+ .displayName("Country Postal Code Record Path")
+ .description("Record path for putting the postal code for the country identified")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("The original input flowfile goes to this relationship regardless of whether the content was enriched or not.")
+ .build();
+
+ public static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ REL_ORIGINAL, REL_FOUND, REL_NOT_FOUND
+ )));
+
+ public static final List GEO_PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+ GEO_CITY, GEO_ACCURACY, GEO_LATITUDE, GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE
+ ));
+
+ private static final List DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+ GEO_DATABASE_FILE, READER, WRITER, SPLIT_FOUND_NOT_FOUND, IP_RECORD_PATH, GEO_CITY, GEO_ACCURACY, GEO_LATITUDE,
+ GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE
+ ));
+
+ @Override
+ public Set getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return DESCRIPTORS;
+ }
+
+ protected volatile RecordReaderFactory readerFactory;
+ protected volatile RecordSetWriterFactory writerFactory;
+ protected boolean splitOutput;
+
+ @Override
+ @OnScheduled
+ public void onScheduled(ProcessContext context) throws IOException {
+ super.onScheduled(context);
+
+ readerFactory = context.getProperty(READER).asControllerService(RecordReaderFactory.class);
+ writerFactory = context.getProperty(WRITER).asControllerService(RecordSetWriterFactory.class);
+ splitOutput = context.getProperty(SPLIT_FOUND_NOT_FOUND).asBoolean();
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ FlowFile input = session.get();
+ if (input == null) {
+ return;
+ }
+
+ FlowFile output = session.create(input);
+ FlowFile notFound = splitOutput ? session.create(input) : null;
+ final DatabaseReader dbReader = databaseReaderRef.get();
+ try (InputStream is = session.read(input);
+ OutputStream os = session.write(output);
+ OutputStream osNotFound = splitOutput ? session.write(notFound) : null) {
+ RecordPathCache cache = new RecordPathCache(GEO_PROPERTIES.size() + 1);
+ Map paths = new HashMap<>();
+ for (PropertyDescriptor descriptor : GEO_PROPERTIES) {
+ if (!context.getProperty(descriptor).isSet()) {
+ continue;
+ }
+ String rawPath = context.getProperty(descriptor).evaluateAttributeExpressions(input).getValue();
+ RecordPath compiled = cache.getCompiled(rawPath);
+ paths.put(descriptor, compiled);
+ }
+
+ String rawIpPath = context.getProperty(IP_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+ RecordPath ipPath = cache.getCompiled(rawIpPath);
+
+ RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
+ RecordSchema schema = writerFactory.getSchema(input.getAttributes(), reader.getSchema());
+ RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, os);
+ RecordSetWriter notFoundWriter = splitOutput ? writerFactory.createWriter(getLogger(), schema, osNotFound) : null;
+ Record record;
+ Relationship targetRelationship = REL_NOT_FOUND;
+ writer.beginRecordSet();
+
+ if (notFoundWriter != null) {
+ notFoundWriter.beginRecordSet();
+ }
+
+ int foundCount = 0;
+ int notFoundCount = 0;
+ while ((record = reader.nextRecord()) != null) {
+ CityResponse response = geocode(ipPath, record, dbReader);
+ boolean wasEnriched = enrichRecord(response, record, paths);
+ if (wasEnriched) {
+ targetRelationship = REL_FOUND;
+ }
+ if (!splitOutput || (splitOutput && wasEnriched)) {
+ writer.write(record);
+ foundCount++;
+ } else {
+ notFoundWriter.write(record);
+ notFoundCount++;
+ }
+ }
+ writer.finishRecordSet();
+ writer.close();
+
+ if (notFoundWriter != null) {
+ notFoundWriter.finishRecordSet();
+ notFoundWriter.close();
+ }
+
+ is.close();
+ os.close();
+ if (osNotFound != null) {
+ osNotFound.close();
+ }
+
+ output = session.putAllAttributes(output, buildAttributes(foundCount, writer.getMimeType()));
+ if (!splitOutput) {
+ session.transfer(output, targetRelationship);
+ session.remove(input);
+ } else {
+ if (notFoundCount > 0) {
+ notFound = session.putAllAttributes(notFound, buildAttributes(notFoundCount, writer.getMimeType()));
+ session.transfer(notFound, REL_NOT_FOUND);
+ } else {
+ session.remove(notFound);
+ }
+ session.transfer(output, REL_FOUND);
+ session.transfer(input, REL_ORIGINAL);
+ session.getProvenanceReporter().modifyContent(notFound);
+ }
+ session.getProvenanceReporter().modifyContent(output);
+ } catch (Exception ex) {
+ getLogger().error("Error enriching records.", ex);
+ session.rollback();
+ context.yield();
+ }
+ }
+
+ private Map buildAttributes(int recordCount, String mimeType) {
+ Map retVal = new HashMap<>();
+ retVal.put(CoreAttributes.MIME_TYPE.key(), mimeType);
+ retVal.put("record.count", String.valueOf(recordCount));
+
+ return retVal;
+ }
+
+ private CityResponse geocode(RecordPath ipPath, Record record, DatabaseReader reader) throws Exception {
+ RecordPathResult result = ipPath.evaluate(record);
+ Optional ipField = result.getSelectedFields().findFirst();
+ if (ipField.isPresent()) {
+ FieldValue value = ipField.get();
+ Object val = value.getValue();
+ if (val == null) {
+ return null;
+ }
+ String realValue = val.toString();
+ InetAddress address = InetAddress.getByName(realValue);
+
+ return reader.city(address);
+ } else {
+ return null;
+ }
+ }
+
+ private boolean enrichRecord(CityResponse response, Record record, Map cached) {
+ boolean retVal;
+
+ if (response == null) {
+ return false;
+ } else if (response.getCity() == null) {
+ return false;
+ }
+
+ boolean city = update(GEO_CITY, cached, record, response.getCity().getName());
+ boolean accuracy = update(GEO_ACCURACY, cached, record, response.getCity().getConfidence());
+ boolean country = update(GEO_COUNTRY, cached, record, response.getCountry().getName());
+ boolean iso = update(GEO_COUNTRY_ISO, cached, record, response.getCountry().getIsoCode());
+ boolean lat = update(GEO_LATITUDE, cached, record, response.getLocation().getLatitude());
+ boolean lon = update(GEO_LONGITUDE, cached, record, response.getLocation().getLongitude());
+ boolean postal = update(GEO_POSTAL_CODE, cached, record, response.getPostal().getCode());
+
+ retVal = (city || accuracy || country || iso || lat || lon || postal);
+
+ return retVal;
+ }
+
+ private boolean update(PropertyDescriptor descriptor, Map cached, Record record, Object fieldValue) {
+ if (!cached.containsKey(descriptor) || fieldValue == null) {
+ return false;
+ }
+ RecordPath cityPath = cached.get(descriptor);
+ RecordPathResult result = cityPath.evaluate(record);
+ FieldValue value = result.getSelectedFields().findFirst().get();
+
+ if (value.getParent().get().getValue() == null) {
+ return false;
+ }
+
+ value.updateValue(fieldValue.toString());
+
+ return true;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index f9ef597b11..63be915cfa 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -14,6 +14,7 @@
# limitations under the License.
org.apache.nifi.processors.GeoEnrichIP
+org.apache.nifi.processors.GeoEnrichIPRecord
org.apache.nifi.processors.ISPEnrichIP
org.apache.nifi.processors.enrich.QueryDNS
org.apache.nifi.processors.enrich.QueryWhois
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/GeoEnrichTestUtils.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/GeoEnrichTestUtils.java
new file mode 100644
index 0000000000..834e99cb3f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/GeoEnrichTestUtils.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.maxmind.geoip2.model.CityResponse;
+
+import java.util.Collections;
+
+public class GeoEnrichTestUtils {
+ public static CityResponse getFullCityResponse() throws Exception {
+ // Taken from MaxMind unit tests.
+ final String maxMindCityResponse = "{\"city\":{\"confidence\":76,"
+ + "\"geoname_id\":9876,\"names\":{\"en\":\"Minneapolis\""
+ + "}},\"continent\":{\"code\":\"NA\","
+ + "\"geoname_id\":42,\"names\":{" + "\"en\":\"North America\""
+ + "}},\"country\":{\"confidence\":99,"
+ + "\"iso_code\":\"US\",\"geoname_id\":1,\"names\":{"
+ + "\"en\":\"United States of America\"" + "}" + "},"
+ + "\"location\":{" + "\"accuracy_radius\":1500,"
+ + "\"latitude\":44.98," + "\"longitude\":93.2636,"
+ + "\"metro_code\":765," + "\"time_zone\":\"America/Chicago\""
+ + "}," + "\"postal\":{\"confidence\": 33, \"code\":\"55401\"},"
+ + "\"registered_country\":{" + "\"geoname_id\":2,"
+ + "\"iso_code\":\"CA\"," + "\"names\":{" + "\"en\":\"Canada\""
+ + "}" + "}," + "\"represented_country\":{" + "\"geoname_id\":3,"
+ + "\"iso_code\":\"GB\"," + "\"names\":{"
+ + "\"en\":\"United Kingdom\"" + "}," + "\"type\":\"C\""
+ + "}," + "\"subdivisions\":[{" + "\"confidence\":88,"
+ + "\"geoname_id\":574635," + "\"iso_code\":\"MN\"," + "\"names\":{"
+ + "\"en\":\"Minnesota\"" + "}" + "}," + "{\"iso_code\":\"TT\"}],"
+ + "\"traits\":{" + "\"autonomous_system_number\":1234,"
+ + "\"autonomous_system_organization\":\"AS Organization\","
+ + "\"domain\":\"example.com\"," + "\"ip_address\":\"1.2.3.4\","
+ + "\"is_anonymous_proxy\":true,"
+ + "\"is_satellite_provider\":true," + "\"isp\":\"Comcast\","
+ + "\"organization\":\"Blorg\"," + "\"user_type\":\"college\""
+ + "}," + "\"maxmind\":{\"queries_remaining\":11}" + "}";
+
+ InjectableValues inject = new InjectableValues.Std().addValue("locales", Collections.singletonList("en"));
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+ return new ObjectMapper().reader(CityResponse.class).with(inject).readValue(maxMindCityResponse);
+ }
+
+ public static CityResponse getNullLatAndLongCityResponse() throws Exception {
+ // Taken from MaxMind unit tests and modified.
+ final String maxMindCityResponse = "{" + "\"city\":{" + "\"confidence\":76,"
+ + "\"geoname_id\":9876," + "\"names\":{" + "\"en\":\"Minneapolis\""
+ + "}" + "}," + "\"continent\":{" + "\"code\":\"NA\","
+ + "\"geoname_id\":42," + "\"names\":{" + "\"en\":\"North America\""
+ + "}" + "}," + "\"country\":{" + "\"confidence\":99,"
+ + "\"iso_code\":\"US\"," + "\"geoname_id\":1," + "\"names\":{"
+ + "\"en\":\"United States of America\"" + "}" + "},"
+ + "\"location\":{" + "\"accuracy_radius\":1500,"
+ + "\"metro_code\":765," + "\"time_zone\":\"America/Chicago\""
+ + "}," + "\"postal\":{\"confidence\": 33, \"code\":\"55401\"},"
+ + "\"registered_country\":{" + "\"geoname_id\":2,"
+ + "\"iso_code\":\"CA\"," + "\"names\":{" + "\"en\":\"Canada\""
+ + "}" + "}," + "\"represented_country\":{" + "\"geoname_id\":3,"
+ + "\"iso_code\":\"GB\"," + "\"names\":{"
+ + "\"en\":\"United Kingdom\"" + "}," + "\"type\":\"C\""
+ + "}," + "\"subdivisions\":[{" + "\"confidence\":88,"
+ + "\"geoname_id\":574635," + "\"iso_code\":\"MN\"," + "\"names\":{"
+ + "\"en\":\"Minnesota\"" + "}" + "}," + "{\"iso_code\":\"TT\"}],"
+ + "\"traits\":{" + "\"autonomous_system_number\":1234,"
+ + "\"autonomous_system_organization\":\"AS Organization\","
+ + "\"domain\":\"example.com\"," + "\"ip_address\":\"1.2.3.4\","
+ + "\"is_anonymous_proxy\":true,"
+ + "\"is_satellite_provider\":true," + "\"isp\":\"Comcast\","
+ + "\"organization\":\"Blorg\"," + "\"user_type\":\"college\""
+ + "}," + "\"maxmind\":{\"queries_remaining\":11}" + "}";
+
+ InjectableValues inject = new InjectableValues.Std().addValue("locales", Collections.singletonList("en"));
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+ return new ObjectMapper().reader(CityResponse.class).with(inject).readValue(maxMindCityResponse);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIP.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIP.java
index f6629a0157..5004ff5a7b 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIP.java
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIP.java
@@ -16,9 +16,6 @@
*/
package org.apache.nifi.processors;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.InjectableValues;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.maxmind.geoip2.model.CityResponse;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.flowfile.FlowFile;
@@ -41,6 +38,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.nifi.processors.GeoEnrichTestUtils.getFullCityResponse;
+import static org.apache.nifi.processors.GeoEnrichTestUtils.getNullLatAndLongCityResponse;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -283,77 +282,6 @@ public class TestGeoEnrichIP {
verifyNoMoreInteractions(databaseReader);
}
- private CityResponse getFullCityResponse() throws Exception {
- // Taken from MaxMind unit tests.
- final String maxMindCityResponse = "{\"city\":{\"confidence\":76,"
- + "\"geoname_id\":9876,\"names\":{\"en\":\"Minneapolis\""
- + "}},\"continent\":{\"code\":\"NA\","
- + "\"geoname_id\":42,\"names\":{" + "\"en\":\"North America\""
- + "}},\"country\":{\"confidence\":99,"
- + "\"iso_code\":\"US\",\"geoname_id\":1,\"names\":{"
- + "\"en\":\"United States of America\"" + "}" + "},"
- + "\"location\":{" + "\"accuracy_radius\":1500,"
- + "\"latitude\":44.98," + "\"longitude\":93.2636,"
- + "\"metro_code\":765," + "\"time_zone\":\"America/Chicago\""
- + "}," + "\"postal\":{\"confidence\": 33, \"code\":\"55401\"},"
- + "\"registered_country\":{" + "\"geoname_id\":2,"
- + "\"iso_code\":\"CA\"," + "\"names\":{" + "\"en\":\"Canada\""
- + "}" + "}," + "\"represented_country\":{" + "\"geoname_id\":3,"
- + "\"iso_code\":\"GB\"," + "\"names\":{"
- + "\"en\":\"United Kingdom\"" + "}," + "\"type\":\"C\""
- + "}," + "\"subdivisions\":[{" + "\"confidence\":88,"
- + "\"geoname_id\":574635," + "\"iso_code\":\"MN\"," + "\"names\":{"
- + "\"en\":\"Minnesota\"" + "}" + "}," + "{\"iso_code\":\"TT\"}],"
- + "\"traits\":{" + "\"autonomous_system_number\":1234,"
- + "\"autonomous_system_organization\":\"AS Organization\","
- + "\"domain\":\"example.com\"," + "\"ip_address\":\"1.2.3.4\","
- + "\"is_anonymous_proxy\":true,"
- + "\"is_satellite_provider\":true," + "\"isp\":\"Comcast\","
- + "\"organization\":\"Blorg\"," + "\"user_type\":\"college\""
- + "}," + "\"maxmind\":{\"queries_remaining\":11}" + "}";
-
- InjectableValues inject = new InjectableValues.Std().addValue("locales", Collections.singletonList("en"));
- ObjectMapper mapper = new ObjectMapper();
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
- return new ObjectMapper().reader(CityResponse.class).with(inject).readValue(maxMindCityResponse);
- }
-
- private CityResponse getNullLatAndLongCityResponse() throws Exception {
- // Taken from MaxMind unit tests and modified.
- final String maxMindCityResponse = "{" + "\"city\":{" + "\"confidence\":76,"
- + "\"geoname_id\":9876," + "\"names\":{" + "\"en\":\"Minneapolis\""
- + "}" + "}," + "\"continent\":{" + "\"code\":\"NA\","
- + "\"geoname_id\":42," + "\"names\":{" + "\"en\":\"North America\""
- + "}" + "}," + "\"country\":{" + "\"confidence\":99,"
- + "\"iso_code\":\"US\"," + "\"geoname_id\":1," + "\"names\":{"
- + "\"en\":\"United States of America\"" + "}" + "},"
- + "\"location\":{" + "\"accuracy_radius\":1500,"
- + "\"metro_code\":765," + "\"time_zone\":\"America/Chicago\""
- + "}," + "\"postal\":{\"confidence\": 33, \"code\":\"55401\"},"
- + "\"registered_country\":{" + "\"geoname_id\":2,"
- + "\"iso_code\":\"CA\"," + "\"names\":{" + "\"en\":\"Canada\""
- + "}" + "}," + "\"represented_country\":{" + "\"geoname_id\":3,"
- + "\"iso_code\":\"GB\"," + "\"names\":{"
- + "\"en\":\"United Kingdom\"" + "}," + "\"type\":\"C\""
- + "}," + "\"subdivisions\":[{" + "\"confidence\":88,"
- + "\"geoname_id\":574635," + "\"iso_code\":\"MN\"," + "\"names\":{"
- + "\"en\":\"Minnesota\"" + "}" + "}," + "{\"iso_code\":\"TT\"}],"
- + "\"traits\":{" + "\"autonomous_system_number\":1234,"
- + "\"autonomous_system_organization\":\"AS Organization\","
- + "\"domain\":\"example.com\"," + "\"ip_address\":\"1.2.3.4\","
- + "\"is_anonymous_proxy\":true,"
- + "\"is_satellite_provider\":true," + "\"isp\":\"Comcast\","
- + "\"organization\":\"Blorg\"," + "\"user_type\":\"college\""
- + "}," + "\"maxmind\":{\"queries_remaining\":11}" + "}";
-
- InjectableValues inject = new InjectableValues.Std().addValue("locales", Collections.singletonList("en"));
- ObjectMapper mapper = new ObjectMapper();
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
- return new ObjectMapper().reader(CityResponse.class).with(inject).readValue(maxMindCityResponse);
- }
-
class TestableGeoEnrichIP extends GeoEnrichIP {
@OnScheduled
@Override
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java
new file mode 100644
index 0000000000..d80105a803
--- /dev/null
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors;
+
+import com.maxmind.geoip2.model.CityResponse;
+import org.apache.avro.Schema;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.maxmind.DatabaseReader;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MockSchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.processors.GeoEnrichTestUtils.getFullCityResponse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestGeoEnrichIPRecord {
+ private TestRunner runner;
+ private DatabaseReader reader;
+ @Before
+ public void setup() throws Exception {
+ reader = mock(DatabaseReader.class);
+ final CityResponse cityResponse = getFullCityResponse();
+ when(reader.city(InetAddress.getByName("1.2.3.4"))).thenReturn(cityResponse);
+ runner = TestRunners.newTestRunner(new TestableGeoEnrichIPRecord());
+ ControllerService reader = new JsonTreeReader();
+ ControllerService writer = new JsonRecordSetWriter();
+ ControllerService registry = new MockSchemaRegistry();
+ runner.addControllerService("reader", reader);
+ runner.addControllerService("writer", writer);
+ runner.addControllerService("registry", registry);
+
+
+ try (InputStream is = getClass().getResourceAsStream("/avro/record_schema.avsc")) {
+ String raw = IOUtils.toString(is, "UTF-8");
+ RecordSchema parsed = AvroTypeUtil.createSchema(new Schema.Parser().parse(raw));
+ ((MockSchemaRegistry) registry).addSchema("record", parsed);
+
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+
+ runner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, "registry");
+ runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY);
+ runner.setProperty(writer, SchemaAccessUtils.SCHEMA_REGISTRY, "registry");
+ runner.setProperty(GeoEnrichIPRecord.IP_RECORD_PATH, "/ip_address");
+ runner.setProperty(GeoEnrichIPRecord.READER, "reader");
+ runner.setProperty(GeoEnrichIPRecord.WRITER, "writer");
+ runner.enableControllerService(registry);
+ runner.enableControllerService(reader);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(GeoEnrichIPRecord.GEO_CITY, "/geo/city");
+ runner.setProperty(GeoEnrichIPRecord.GEO_ACCURACY, "/geo/accuracy");
+ runner.setProperty(GeoEnrichIPRecord.GEO_COUNTRY, "/geo/country");
+ runner.setProperty(GeoEnrichIPRecord.GEO_COUNTRY_ISO, "/geo/country_iso");
+ runner.setProperty(GeoEnrichIPRecord.GEO_POSTAL_CODE, "/geo/country_postal");
+ runner.setProperty(GeoEnrichIPRecord.GEO_LATITUDE, "/geo/lat");
+ runner.setProperty(GeoEnrichIPRecord.GEO_LONGITUDE, "/geo/lon");
+ runner.assertValid();
+ }
+
+ private void commonTest(String path, int not, int found, int original) {
+ Map attrs = new HashMap<>();
+ attrs.put("schema.name", "record");
+ runner.enqueue(getClass().getResourceAsStream(path), attrs);
+ runner.run();
+
+ runner.assertTransferCount(GeoEnrichIPRecord.REL_NOT_FOUND, not);
+ runner.assertTransferCount(GeoEnrichIPRecord.REL_FOUND, found);
+ runner.assertTransferCount(GeoEnrichIPRecord.REL_ORIGINAL, original);
+ }
+
+ @Test
+ public void testSplitOutput() throws Exception {
+ runner.setProperty(GeoEnrichIPRecord.SPLIT_FOUND_NOT_FOUND, "true");
+ commonTest("/json/two_records_for_split.json", 1, 1, 1);
+ }
+
+ @Test
+ public void testEnrichSendToNotFound() throws Exception {
+ commonTest("/json/one_record_no_geo.json", 1, 0, 0);
+ }
+
+ @Test
+ public void testEnrichSendToFound() throws Exception {
+ commonTest("/json/one_record.json", 0, 1, 0);
+
+ MockFlowFile ff = runner.getFlowFilesForRelationship(GeoEnrichIPRecord.REL_FOUND).get(0);
+ byte[] raw = runner.getContentAsByteArray(ff);
+ String content = new String(raw);
+ ObjectMapper mapper = new ObjectMapper();
+ List