mirror of https://github.com/apache/nifi.git
NIFI-12230 Add configurable Log Level for IP not found in GeoEnrichIP
NIFI-12253 Route to not found relationship instead of rolling back in GeoEnrichIPRecord This closes #7909 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
6c7d02e22e
commit
184757fede
|
@ -70,6 +70,16 @@ public abstract class AbstractEnrichIP extends AbstractProcessor {
|
||||||
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
|
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor LOG_LEVEL = new PropertyDescriptor.Builder()
|
||||||
|
.name("Log Level")
|
||||||
|
.displayName("Log Level")
|
||||||
|
.required(true)
|
||||||
|
.description("The Log Level to use when an IP is not found in the database. Accepted values: INFO, DEBUG, WARN, ERROR.")
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.defaultValue(MessageLogLevel.WARN.toString())
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||||
|
.build();
|
||||||
|
|
||||||
public static final Relationship REL_FOUND = new Relationship.Builder()
|
public static final Relationship REL_FOUND = new Relationship.Builder()
|
||||||
.name("found")
|
.name("found")
|
||||||
.description("Where to route flow files after successfully enriching attributes with data provided by database")
|
.description("Where to route flow files after successfully enriching attributes with data provided by database")
|
||||||
|
@ -80,6 +90,10 @@ public abstract class AbstractEnrichIP extends AbstractProcessor {
|
||||||
.description("Where to route flow files after unsuccessfully enriching attributes because no data was found")
|
.description("Where to route flow files after unsuccessfully enriching attributes because no data was found")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
enum MessageLogLevel {
|
||||||
|
DEBUG, INFO, WARN, ERROR
|
||||||
|
}
|
||||||
|
|
||||||
private Set<Relationship> relationships;
|
private Set<Relationship> relationships;
|
||||||
private List<PropertyDescriptor> propertyDescriptors;
|
private List<PropertyDescriptor> propertyDescriptors;
|
||||||
final AtomicReference<DatabaseReader> databaseReaderRef = new AtomicReference<>(null);
|
final AtomicReference<DatabaseReader> databaseReaderRef = new AtomicReference<>(null);
|
||||||
|
@ -134,6 +148,7 @@ public abstract class AbstractEnrichIP extends AbstractProcessor {
|
||||||
final List<PropertyDescriptor> props = new ArrayList<>();
|
final List<PropertyDescriptor> props = new ArrayList<>();
|
||||||
props.add(GEO_DATABASE_FILE);
|
props.add(GEO_DATABASE_FILE);
|
||||||
props.add(IP_ADDRESS_ATTRIBUTE);
|
props.add(IP_ADDRESS_ATTRIBUTE);
|
||||||
|
props.add(LOG_LEVEL);
|
||||||
this.propertyDescriptors = Collections.unmodifiableList(props);
|
this.propertyDescriptors = Collections.unmodifiableList(props);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.nifi.processors;
|
||||||
|
|
||||||
import com.maxmind.db.InvalidDatabaseException;
|
import com.maxmind.db.InvalidDatabaseException;
|
||||||
import com.maxmind.geoip2.DatabaseReader;
|
import com.maxmind.geoip2.DatabaseReader;
|
||||||
|
import com.maxmind.geoip2.exception.AddressNotFoundException;
|
||||||
import com.maxmind.geoip2.exception.GeoIp2Exception;
|
import com.maxmind.geoip2.exception.GeoIp2Exception;
|
||||||
import com.maxmind.geoip2.model.CityResponse;
|
import com.maxmind.geoip2.model.CityResponse;
|
||||||
import com.maxmind.geoip2.record.Subdivision;
|
import com.maxmind.geoip2.record.Subdivision;
|
||||||
|
@ -93,6 +94,7 @@ public class GeoEnrichIP extends AbstractEnrichIP {
|
||||||
}
|
}
|
||||||
|
|
||||||
DatabaseReader dbReader = databaseReaderRef.get();
|
DatabaseReader dbReader = databaseReaderRef.get();
|
||||||
|
final MessageLogLevel logLevel = MessageLogLevel.valueOf(context.getProperty(LOG_LEVEL).evaluateAttributeExpressions(flowFile).getValue().toUpperCase());
|
||||||
final String ipAttributeName = context.getProperty(IP_ADDRESS_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
|
final String ipAttributeName = context.getProperty(IP_ADDRESS_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
final String ipAttributeValue = flowFile.getAttribute(ipAttributeName);
|
final String ipAttributeValue = flowFile.getAttribute(ipAttributeName);
|
||||||
|
|
||||||
|
@ -130,6 +132,26 @@ public class GeoEnrichIP extends AbstractEnrichIP {
|
||||||
getLogger().warn("Failure while trying to load enrichment data for {} due to {}, rolling back session "
|
getLogger().warn("Failure while trying to load enrichment data for {} due to {}, rolling back session "
|
||||||
+ "and will reload the database on the next run", flowFile, idbe.getMessage());
|
+ "and will reload the database on the next run", flowFile, idbe.getMessage());
|
||||||
session.rollback();
|
session.rollback();
|
||||||
|
return;
|
||||||
|
} catch (AddressNotFoundException anfe) {
|
||||||
|
session.transfer(flowFile, REL_NOT_FOUND);
|
||||||
|
|
||||||
|
switch (logLevel) {
|
||||||
|
case INFO:
|
||||||
|
getLogger().info("Address not found in the database", anfe);
|
||||||
|
break;
|
||||||
|
case WARN:
|
||||||
|
getLogger().warn("Address not found in the database", anfe);
|
||||||
|
break;
|
||||||
|
case ERROR:
|
||||||
|
getLogger().error("Address not found in the database", anfe);
|
||||||
|
break;
|
||||||
|
case DEBUG:
|
||||||
|
default:
|
||||||
|
getLogger().debug("Address not found in the database", anfe);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
} catch (GeoIp2Exception | IOException ex) {
|
} catch (GeoIp2Exception | IOException ex) {
|
||||||
// Note IOException is captured again as dbReader also makes InetAddress.getByName() calls.
|
// Note IOException is captured again as dbReader also makes InetAddress.getByName() calls.
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.nifi.processors;
|
||||||
|
|
||||||
import com.maxmind.db.InvalidDatabaseException;
|
import com.maxmind.db.InvalidDatabaseException;
|
||||||
import com.maxmind.geoip2.DatabaseReader;
|
import com.maxmind.geoip2.DatabaseReader;
|
||||||
|
import com.maxmind.geoip2.exception.AddressNotFoundException;
|
||||||
import com.maxmind.geoip2.model.CityResponse;
|
import com.maxmind.geoip2.model.CityResponse;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
@ -160,7 +161,7 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
|
||||||
|
|
||||||
private static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
|
private static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
|
||||||
GEO_DATABASE_FILE, READER, WRITER, SPLIT_FOUND_NOT_FOUND, IP_RECORD_PATH, GEO_CITY, GEO_LATITUDE,
|
GEO_DATABASE_FILE, READER, WRITER, SPLIT_FOUND_NOT_FOUND, IP_RECORD_PATH, GEO_CITY, GEO_LATITUDE,
|
||||||
GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE
|
GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE, LOG_LEVEL
|
||||||
));
|
));
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -231,6 +232,8 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
|
||||||
}
|
}
|
||||||
|
|
||||||
String rawIpPath = context.getProperty(IP_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
|
String rawIpPath = context.getProperty(IP_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
|
||||||
|
final MessageLogLevel logLevel = MessageLogLevel.valueOf(context.getProperty(LOG_LEVEL).evaluateAttributeExpressions(input).getValue().toUpperCase());
|
||||||
|
|
||||||
RecordPath ipPath = cache.getCompiled(rawIpPath);
|
RecordPath ipPath = cache.getCompiled(rawIpPath);
|
||||||
|
|
||||||
RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
|
RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
|
||||||
|
@ -249,7 +252,7 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
|
||||||
int notFoundCount = 0;
|
int notFoundCount = 0;
|
||||||
while ((record = reader.nextRecord()) != null) {
|
while ((record = reader.nextRecord()) != null) {
|
||||||
CityResponse response;
|
CityResponse response;
|
||||||
response = geocode(ipPath, record, dbReader);
|
response = geocode(ipPath, record, dbReader, logLevel);
|
||||||
boolean wasEnriched = enrichRecord(response, record, paths);
|
boolean wasEnriched = enrichRecord(response, record, paths);
|
||||||
if (wasEnriched) {
|
if (wasEnriched) {
|
||||||
targetRelationship = REL_FOUND;
|
targetRelationship = REL_FOUND;
|
||||||
|
@ -314,7 +317,7 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
private CityResponse geocode(RecordPath ipPath, Record record, DatabaseReader reader) throws Exception {
|
private CityResponse geocode(RecordPath ipPath, Record record, DatabaseReader reader, MessageLogLevel logLevel) throws Exception {
|
||||||
RecordPathResult result = ipPath.evaluate(record);
|
RecordPathResult result = ipPath.evaluate(record);
|
||||||
Optional<FieldValue> ipField = result.getSelectedFields().findFirst();
|
Optional<FieldValue> ipField = result.getSelectedFields().findFirst();
|
||||||
if (ipField.isPresent()) {
|
if (ipField.isPresent()) {
|
||||||
|
@ -326,7 +329,28 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
|
||||||
String realValue = val.toString();
|
String realValue = val.toString();
|
||||||
InetAddress address = InetAddress.getByName(realValue);
|
InetAddress address = InetAddress.getByName(realValue);
|
||||||
|
|
||||||
|
try {
|
||||||
return reader.city(address);
|
return reader.city(address);
|
||||||
|
} catch (AddressNotFoundException anfe) {
|
||||||
|
|
||||||
|
switch (logLevel) {
|
||||||
|
case INFO:
|
||||||
|
getLogger().info("Address not found in the database", anfe);
|
||||||
|
break;
|
||||||
|
case WARN:
|
||||||
|
getLogger().warn("Address not found in the database", anfe);
|
||||||
|
break;
|
||||||
|
case ERROR:
|
||||||
|
getLogger().error("Address not found in the database", anfe);
|
||||||
|
break;
|
||||||
|
case DEBUG:
|
||||||
|
default:
|
||||||
|
getLogger().debug("Address not found in the database", anfe);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,8 +55,10 @@ import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class TestGeoEnrichIPRecord {
|
public class TestGeoEnrichIPRecord {
|
||||||
|
|
||||||
private TestRunner runner;
|
private TestRunner runner;
|
||||||
private DatabaseReader reader;
|
private DatabaseReader reader;
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
reader = mock(DatabaseReader.class);
|
reader = mock(DatabaseReader.class);
|
||||||
|
@ -96,6 +98,7 @@ public class TestGeoEnrichIPRecord {
|
||||||
runner.setProperty(GeoEnrichIPRecord.GEO_POSTAL_CODE, "/geo/country_postal");
|
runner.setProperty(GeoEnrichIPRecord.GEO_POSTAL_CODE, "/geo/country_postal");
|
||||||
runner.setProperty(GeoEnrichIPRecord.GEO_LATITUDE, "/geo/lat");
|
runner.setProperty(GeoEnrichIPRecord.GEO_LATITUDE, "/geo/lat");
|
||||||
runner.setProperty(GeoEnrichIPRecord.GEO_LONGITUDE, "/geo/lon");
|
runner.setProperty(GeoEnrichIPRecord.GEO_LONGITUDE, "/geo/lon");
|
||||||
|
runner.setProperty(AbstractEnrichIP.LOG_LEVEL, "WARN");
|
||||||
runner.assertValid();
|
runner.assertValid();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -129,7 +132,7 @@ public class TestGeoEnrichIPRecord {
|
||||||
byte[] raw = runner.getContentAsByteArray(ff);
|
byte[] raw = runner.getContentAsByteArray(ff);
|
||||||
String content = new String(raw);
|
String content = new String(raw);
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
List<Map<String, Object>> result = (List<Map<String, Object>>)mapper.readValue(content, List.class);
|
List<Map<String, Object>> result = mapper.readValue(content, List.class);
|
||||||
|
|
||||||
assertNotNull(result);
|
assertNotNull(result);
|
||||||
assertEquals(1, result.size());
|
assertEquals(1, result.size());
|
||||||
|
@ -152,9 +155,11 @@ public class TestGeoEnrichIPRecord {
|
||||||
@Override
|
@Override
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
return Collections.unmodifiableList(Arrays.asList(
|
return Collections.unmodifiableList(Arrays.asList(
|
||||||
READER, WRITER, IP_RECORD_PATH, SPLIT_FOUND_NOT_FOUND, GEO_CITY, GEO_LATITUDE, GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE
|
READER, WRITER, IP_RECORD_PATH, SPLIT_FOUND_NOT_FOUND, GEO_CITY, GEO_LATITUDE, GEO_LONGITUDE,
|
||||||
|
GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE, LOG_LEVEL
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
@Override
|
||||||
@OnScheduled
|
@OnScheduled
|
||||||
public void onScheduled(ProcessContext context) {
|
public void onScheduled(ProcessContext context) {
|
||||||
databaseReaderRef.set(reader);
|
databaseReaderRef.set(reader);
|
||||||
|
@ -162,6 +167,7 @@ public class TestGeoEnrichIPRecord {
|
||||||
writerFactory = context.getProperty(WRITER).asControllerService(RecordSetWriterFactory.class);
|
writerFactory = context.getProperty(WRITER).asControllerService(RecordSetWriterFactory.class);
|
||||||
splitOutput = context.getProperty(SPLIT_FOUND_NOT_FOUND).asBoolean();
|
splitOutput = context.getProperty(SPLIT_FOUND_NOT_FOUND).asBoolean();
|
||||||
}
|
}
|
||||||
|
@Override
|
||||||
protected void loadDatabaseFile() {
|
protected void loadDatabaseFile() {
|
||||||
// Do nothing, the mock database reader is used
|
// Do nothing, the mock database reader is used
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue