diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java index 616d8dc3ae..0da9e224b7 100644 --- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java +++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java @@ -70,6 +70,16 @@ public abstract class AbstractEnrichIP extends AbstractProcessor { .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) .build(); + public static final PropertyDescriptor LOG_LEVEL = new PropertyDescriptor.Builder() + .name("Log Level") + .displayName("Log Level") + .required(true) + .description("The Log Level to use when an IP is not found in the database. Accepted values: INFO, DEBUG, WARN, ERROR.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue(MessageLogLevel.WARN.toString()) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final Relationship REL_FOUND = new Relationship.Builder() .name("found") .description("Where to route flow files after successfully enriching attributes with data provided by database") @@ -80,6 +90,10 @@ public abstract class AbstractEnrichIP extends AbstractProcessor { .description("Where to route flow files after unsuccessfully enriching attributes because no data was found") .build(); + enum MessageLogLevel { + DEBUG, INFO, WARN, ERROR + } + private Set relationships; private List propertyDescriptors; final AtomicReference databaseReaderRef = new AtomicReference<>(null); @@ -134,6 +148,7 @@ public abstract class AbstractEnrichIP extends AbstractProcessor { final List props = new ArrayList<>(); props.add(GEO_DATABASE_FILE); props.add(IP_ADDRESS_ATTRIBUTE); + props.add(LOG_LEVEL); this.propertyDescriptors = Collections.unmodifiableList(props); } diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java index 96ae4ca8ef..dfcd410ac9 100644 --- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java +++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors; import com.maxmind.db.InvalidDatabaseException; import com.maxmind.geoip2.DatabaseReader; +import com.maxmind.geoip2.exception.AddressNotFoundException; import com.maxmind.geoip2.exception.GeoIp2Exception; import com.maxmind.geoip2.model.CityResponse; import com.maxmind.geoip2.record.Subdivision; @@ -93,6 +94,7 @@ public class GeoEnrichIP extends AbstractEnrichIP { } DatabaseReader dbReader = databaseReaderRef.get(); + final MessageLogLevel logLevel = MessageLogLevel.valueOf(context.getProperty(LOG_LEVEL).evaluateAttributeExpressions(flowFile).getValue().toUpperCase()); final String ipAttributeName = context.getProperty(IP_ADDRESS_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue(); final String ipAttributeValue = flowFile.getAttribute(ipAttributeName); @@ -130,6 +132,26 @@ public class GeoEnrichIP extends AbstractEnrichIP { getLogger().warn("Failure while trying to load enrichment data for {} due to {}, rolling back session " + "and will reload the database on the next run", flowFile, idbe.getMessage()); session.rollback(); + return; + } catch (AddressNotFoundException anfe) { + session.transfer(flowFile, REL_NOT_FOUND); + + switch (logLevel) { + case INFO: + getLogger().info("Address not found in the database", anfe); + break; + case WARN: + getLogger().warn("Address not found in the database", anfe); + break; + case ERROR: + getLogger().error("Address not found in the database", anfe); + break; + case DEBUG: + default: + getLogger().debug("Address not found in the database", anfe); + break; + } + return; } catch (GeoIp2Exception | IOException ex) { // Note IOException is captured again as dbReader also makes InetAddress.getByName() calls. diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java index d8c6aec07e..e917b01047 100644 --- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java +++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors; import com.maxmind.db.InvalidDatabaseException; import com.maxmind.geoip2.DatabaseReader; +import com.maxmind.geoip2.exception.AddressNotFoundException; import com.maxmind.geoip2.model.CityResponse; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -160,7 +161,7 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP { private static final List DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( GEO_DATABASE_FILE, READER, WRITER, SPLIT_FOUND_NOT_FOUND, IP_RECORD_PATH, GEO_CITY, GEO_LATITUDE, - GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE + GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE, LOG_LEVEL )); @Override @@ -231,6 +232,8 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP { } String rawIpPath = context.getProperty(IP_RECORD_PATH).evaluateAttributeExpressions(input).getValue(); + final MessageLogLevel logLevel = MessageLogLevel.valueOf(context.getProperty(LOG_LEVEL).evaluateAttributeExpressions(input).getValue().toUpperCase()); + RecordPath ipPath = cache.getCompiled(rawIpPath); RecordReader reader = readerFactory.createRecordReader(input, is, getLogger()); @@ -249,7 +252,7 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP { int notFoundCount = 0; while ((record = reader.nextRecord()) != null) { CityResponse response; - response = geocode(ipPath, record, dbReader); + response = geocode(ipPath, record, dbReader, logLevel); boolean wasEnriched = enrichRecord(response, record, paths); if (wasEnriched) { targetRelationship = REL_FOUND; @@ -314,7 +317,7 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP { return retVal; } - private CityResponse geocode(RecordPath ipPath, Record record, DatabaseReader reader) throws Exception { + private CityResponse geocode(RecordPath ipPath, Record record, DatabaseReader reader, MessageLogLevel logLevel) throws Exception { RecordPathResult result = ipPath.evaluate(record); Optional ipField = result.getSelectedFields().findFirst(); if (ipField.isPresent()) { @@ -326,7 +329,28 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP { String realValue = val.toString(); InetAddress address = InetAddress.getByName(realValue); - return reader.city(address); + try { + return reader.city(address); + } catch (AddressNotFoundException anfe) { + + switch (logLevel) { + case INFO: + getLogger().info("Address not found in the database", anfe); + break; + case WARN: + getLogger().warn("Address not found in the database", anfe); + break; + case ERROR: + getLogger().error("Address not found in the database", anfe); + break; + case DEBUG: + default: + getLogger().debug("Address not found in the database", anfe); + break; + } + + return null; + } } else { return null; } diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java index c7aebb00d1..20a39098b9 100644 --- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java +++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java @@ -55,8 +55,10 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class TestGeoEnrichIPRecord { + private TestRunner runner; private DatabaseReader reader; + @BeforeEach public void setup() throws Exception { reader = mock(DatabaseReader.class); @@ -96,6 +98,7 @@ public class TestGeoEnrichIPRecord { runner.setProperty(GeoEnrichIPRecord.GEO_POSTAL_CODE, "/geo/country_postal"); runner.setProperty(GeoEnrichIPRecord.GEO_LATITUDE, "/geo/lat"); runner.setProperty(GeoEnrichIPRecord.GEO_LONGITUDE, "/geo/lon"); + runner.setProperty(AbstractEnrichIP.LOG_LEVEL, "WARN"); runner.assertValid(); } @@ -129,7 +132,7 @@ public class TestGeoEnrichIPRecord { byte[] raw = runner.getContentAsByteArray(ff); String content = new String(raw); ObjectMapper mapper = new ObjectMapper(); - List> result = (List>)mapper.readValue(content, List.class); + List> result = mapper.readValue(content, List.class); assertNotNull(result); assertEquals(1, result.size()); @@ -152,9 +155,11 @@ public class TestGeoEnrichIPRecord { @Override protected List getSupportedPropertyDescriptors() { return Collections.unmodifiableList(Arrays.asList( - READER, WRITER, IP_RECORD_PATH, SPLIT_FOUND_NOT_FOUND, GEO_CITY, GEO_LATITUDE, GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE + READER, WRITER, IP_RECORD_PATH, SPLIT_FOUND_NOT_FOUND, GEO_CITY, GEO_LATITUDE, GEO_LONGITUDE, + GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE, LOG_LEVEL )); } + @Override @OnScheduled public void onScheduled(ProcessContext context) { databaseReaderRef.set(reader); @@ -162,6 +167,7 @@ public class TestGeoEnrichIPRecord { writerFactory = context.getProperty(WRITER).asControllerService(RecordSetWriterFactory.class); splitOutput = context.getProperty(SPLIT_FOUND_NOT_FOUND).asBoolean(); } + @Override protected void loadDatabaseFile() { // Do nothing, the mock database reader is used }