mirror of https://github.com/apache/nifi.git
NIFI-271
This commit is contained in:
parent
5a559e4557
commit
90ae022e54
|
@ -60,20 +60,20 @@ import com.maxmind.geoip2.record.Subdivision;
|
|||
@SupportsBatching
|
||||
@Tags({"geo", "enrich", "ip", "maxmind"})
|
||||
@CapabilityDescription("Looks up geolocation information for an IP address and adds the geo information to FlowFile attributes. The "
|
||||
+ "geo data is provided as a MaxMind database. The attribute that contains the IP address to lookup is provided by the "
|
||||
+ "'IP Address Attribute' property. If the name of the attribute provided is 'X', then the the attributes added by enrichment "
|
||||
+ "will take the form X.geo.<fieldName>")
|
||||
+ "geo data is provided as a MaxMind database. The attribute that contains the IP address to lookup is provided by the "
|
||||
+ "'IP Address Attribute' property. If the name of the attribute provided is 'X', then the the attributes added by enrichment "
|
||||
+ "will take the form X.geo.<fieldName>")
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute="X.geo.lookup.micros", description="The number of microseconds that the geo lookup took"),
|
||||
@WritesAttribute(attribute="X.geo.city", description="The city identified for the IP address"),
|
||||
@WritesAttribute(attribute="X.geo.latitude", description="The latitude identified for this IP address"),
|
||||
@WritesAttribute(attribute="X.geo.longitude", description="The longitude identified for this IP address"),
|
||||
@WritesAttribute(attribute="X.geo.subdivision.N", description="Each subdivision that is identified for this IP address is added with a one-up number appended to the attribute name, starting with 0"),
|
||||
@WritesAttribute(attribute="X.geo.subdivision.isocode.N", description="The ISO code for the subdivision that is identified by X.geo.subdivision.N"),
|
||||
@WritesAttribute(attribute="X.geo.country", description="The country identified for this IP address"),
|
||||
@WritesAttribute(attribute="X.geo.country.isocode", description="The ISO Code for the country identified"),
|
||||
@WritesAttribute(attribute="X.geo.postalcode", description="The postal code for the country identified"),
|
||||
})
|
||||
@WritesAttribute(attribute = "X.geo.lookup.micros", description = "The number of microseconds that the geo lookup took"),
|
||||
@WritesAttribute(attribute = "X.geo.city", description = "The city identified for the IP address"),
|
||||
@WritesAttribute(attribute = "X.geo.latitude", description = "The latitude identified for this IP address"),
|
||||
@WritesAttribute(attribute = "X.geo.longitude", description = "The longitude identified for this IP address"),
|
||||
@WritesAttribute(attribute = "X.geo.subdivision.N",
|
||||
description = "Each subdivision that is identified for this IP address is added with a one-up number appended to the attribute name, starting with 0"),
|
||||
@WritesAttribute(attribute = "X.geo.subdivision.isocode.N", description = "The ISO code for the subdivision that is identified by X.geo.subdivision.N"),
|
||||
@WritesAttribute(attribute = "X.geo.country", description = "The country identified for this IP address"),
|
||||
@WritesAttribute(attribute = "X.geo.country.isocode", description = "The ISO Code for the country identified"),
|
||||
@WritesAttribute(attribute = "X.geo.postalcode", description = "The postal code for the country identified"),})
|
||||
public class GeoEnrichIP extends AbstractProcessor {
|
||||
|
||||
public static final PropertyDescriptor GEO_DATABASE_FILE = new PropertyDescriptor.Builder()
|
||||
|
@ -127,12 +127,12 @@ public class GeoEnrichIP extends AbstractProcessor {
|
|||
|
||||
@OnStopped
|
||||
public void closeReader() throws IOException {
|
||||
final DatabaseReader reader = databaseReaderRef.get();
|
||||
if ( reader != null ) {
|
||||
reader.close();
|
||||
}
|
||||
final DatabaseReader reader = databaseReaderRef.get();
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void init(final ProcessorInitializationContext context) {
|
||||
final Set<Relationship> rels = new HashSet<>();
|
||||
|
@ -180,7 +180,7 @@ public class GeoEnrichIP extends AbstractProcessor {
|
|||
getLogger().warn("Failure while trying to find enrichment data for {} due to {}", new Object[]{flowFile, ex}, ex);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
if (response == null) {
|
||||
session.transfer(flowFile, REL_NOT_FOUND);
|
||||
return;
|
||||
|
|
|
@ -43,15 +43,15 @@ import com.maxmind.geoip2.model.IspResponse;
|
|||
|
||||
/**
|
||||
* <p>
|
||||
* This class was copied from
|
||||
* This class was copied from
|
||||
* https://raw.githubusercontent.com/maxmind/GeoIP2-java/master/src/main/java/com/maxmind/geoip2/DatabaseReader.java
|
||||
* It is written by Maxmind and it is available under Apache Software License V2
|
||||
*
|
||||
* The modification we're making to the code below is to stop using exceptions for
|
||||
* mainline flow control. Specifically we don't want to throw an exception
|
||||
*
|
||||
* The modification we're making to the code below is to stop using exceptions
|
||||
* for mainline flow control. Specifically we don't want to throw an exception
|
||||
* simply because an address was not found.
|
||||
* </p>
|
||||
*
|
||||
*
|
||||
* Instances of this class provide a reader for the GeoIP2 database format. IP
|
||||
* addresses can be looked up using the <code>get</code> method.
|
||||
*/
|
||||
|
@ -96,6 +96,7 @@ public class DatabaseReader implements GeoIp2Provider, Closeable {
|
|||
* </p>
|
||||
*/
|
||||
public final static class Builder {
|
||||
|
||||
final File database;
|
||||
final InputStream stream;
|
||||
|
||||
|
@ -120,7 +121,7 @@ public class DatabaseReader implements GeoIp2Provider, Closeable {
|
|||
|
||||
/**
|
||||
* @param val List of locale codes to use in name property from most
|
||||
* preferred to least preferred.
|
||||
* preferred to least preferred.
|
||||
* @return Builder object
|
||||
*/
|
||||
public Builder locales(List<String> val) {
|
||||
|
@ -131,9 +132,9 @@ public class DatabaseReader implements GeoIp2Provider, Closeable {
|
|||
/**
|
||||
* @param val The file mode used to open the GeoIP2 database
|
||||
* @return Builder object
|
||||
* @throws java.lang.IllegalArgumentException if you initialized the Builder with a URL, which uses
|
||||
* {@link FileMode#MEMORY}, but you provided a different
|
||||
* FileMode to this method.
|
||||
* @throws java.lang.IllegalArgumentException if you initialized the
|
||||
* Builder with a URL, which uses {@link FileMode#MEMORY}, but you
|
||||
* provided a different FileMode to this method.
|
||||
*/
|
||||
public Builder fileMode(FileMode val) {
|
||||
if (this.stream != null && !FileMode.MEMORY.equals(val)) {
|
||||
|
@ -156,12 +157,13 @@ public class DatabaseReader implements GeoIp2Provider, Closeable {
|
|||
|
||||
/**
|
||||
* @param ipAddress IPv4 or IPv6 address to lookup.
|
||||
* @return A <T> object with the data for the IP address or null if no
|
||||
* @return An object of type T with the data for the IP address or null if no
|
||||
* information could be found for the given IP address
|
||||
* @throws IOException if there is an error opening or reading from the file.
|
||||
* @throws IOException if there is an error opening or reading from the
|
||||
* file.
|
||||
*/
|
||||
private <T> T get(InetAddress ipAddress, Class<T> cls, boolean hasTraits,
|
||||
String type) throws IOException, AddressNotFoundException {
|
||||
String type) throws IOException, AddressNotFoundException {
|
||||
|
||||
String databaseType = this.getMetadata().getDatabaseType();
|
||||
if (!databaseType.contains(type)) {
|
||||
|
@ -169,7 +171,7 @@ public class DatabaseReader implements GeoIp2Provider, Closeable {
|
|||
.getMethodName();
|
||||
throw new UnsupportedOperationException(
|
||||
"Invalid attempt to open a " + databaseType
|
||||
+ " database using the " + caller + " method");
|
||||
+ " database using the " + caller + " method");
|
||||
}
|
||||
|
||||
ObjectNode node = (ObjectNode) this.reader.get(ipAddress);
|
||||
|
@ -199,9 +201,9 @@ public class DatabaseReader implements GeoIp2Provider, Closeable {
|
|||
* <p>
|
||||
* If you are using <code>FileMode.MEMORY_MAPPED</code>, this will
|
||||
* <em>not</em> unmap the underlying file due to a limitation in Java's
|
||||
* <code>MappedByteBuffer</code>. It will however set the reference to
|
||||
* the buffer to <code>null</code>, allowing the garbage collector to
|
||||
* collect it.
|
||||
* <code>MappedByteBuffer</code>. It will however set the reference to the
|
||||
* buffer to <code>null</code>, allowing the garbage collector to collect
|
||||
* it.
|
||||
* </p>
|
||||
*
|
||||
* @throws IOException if an I/O error occurs.
|
||||
|
@ -229,7 +231,7 @@ public class DatabaseReader implements GeoIp2Provider, Closeable {
|
|||
* @param ipAddress IPv4 or IPv6 address to lookup.
|
||||
* @return a AnonymousIpResponse for the requested IP address.
|
||||
* @throws GeoIp2Exception if there is an error looking up the IP
|
||||
* @throws IOException if there is an IO error
|
||||
* @throws IOException if there is an IO error
|
||||
*/
|
||||
public AnonymousIpResponse anonymousIp(InetAddress ipAddress) throws IOException,
|
||||
GeoIp2Exception {
|
||||
|
@ -242,7 +244,7 @@ public class DatabaseReader implements GeoIp2Provider, Closeable {
|
|||
* @param ipAddress IPv4 or IPv6 address to lookup.
|
||||
* @return a ConnectTypeResponse for the requested IP address.
|
||||
* @throws GeoIp2Exception if there is an error looking up the IP
|
||||
* @throws IOException if there is an IO error
|
||||
* @throws IOException if there is an IO error
|
||||
*/
|
||||
public ConnectionTypeResponse connectionType(InetAddress ipAddress)
|
||||
throws IOException, GeoIp2Exception {
|
||||
|
@ -256,7 +258,7 @@ public class DatabaseReader implements GeoIp2Provider, Closeable {
|
|||
* @param ipAddress IPv4 or IPv6 address to lookup.
|
||||
* @return a DomainResponse for the requested IP address.
|
||||
* @throws GeoIp2Exception if there is an error looking up the IP
|
||||
* @throws IOException if there is an IO error
|
||||
* @throws IOException if there is an IO error
|
||||
*/
|
||||
public DomainResponse domain(InetAddress ipAddress) throws IOException,
|
||||
GeoIp2Exception {
|
||||
|
@ -270,7 +272,7 @@ public class DatabaseReader implements GeoIp2Provider, Closeable {
|
|||
* @param ipAddress IPv4 or IPv6 address to lookup.
|
||||
* @return an IspResponse for the requested IP address.
|
||||
* @throws GeoIp2Exception if there is an error looking up the IP
|
||||
* @throws IOException if there is an IO error
|
||||
* @throws IOException if there is an IO error
|
||||
*/
|
||||
public IspResponse isp(InetAddress ipAddress) throws IOException,
|
||||
GeoIp2Exception {
|
||||
|
@ -283,4 +285,4 @@ public class DatabaseReader implements GeoIp2Provider, Closeable {
|
|||
public Metadata getMetadata() {
|
||||
return this.reader.getMetadata();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue