diff --git a/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-nar/pom.xml b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-nar/pom.xml new file mode 100644 index 0000000000..484e291369 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-nar/pom.xml @@ -0,0 +1,33 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-geo-bundle + 0.1.0-incubating-SNAPSHOT + + nifi-geo-nar + nar + NiFi Geo Enrichment NAR + + + + org.apache.nifi + nifi-geo-processors + + + diff --git a/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/.gitignore b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/.gitignore new file mode 100644 index 0000000000..b83d22266a --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/.gitignore @@ -0,0 +1 @@ +/target/ diff --git a/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/pom.xml new file mode 100644 index 0000000000..67bc253a67 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/pom.xml @@ -0,0 +1,43 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-geo-bundle + 0.1.0-incubating-SNAPSHOT + + nifi-geo-processors + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-processor-utils + + + org.apache.nifi + nifi-utils + + + com.maxmind.geoip2 + geoip2 + 2.1.0 + + + diff --git a/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java new file mode 100644 index 0000000000..fed0e7e601 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import 
org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.maxmind.DatabaseReader;
import org.apache.nifi.util.StopWatch;

import com.maxmind.geoip2.exception.GeoIp2Exception;
import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.record.Subdivision;

/**
 * Processor that reads an IP address from a configurable FlowFile attribute, looks it up in a
 * MaxMind database and writes the resulting city/country/location data back as FlowFile
 * attributes named {@code <attribute name>.geo.*}.
 *
 * FlowFiles are routed to {@link #REL_FOUND} when enrichment data was added, and to
 * {@link #REL_NOT_FOUND} when the attribute is missing/empty, the address cannot be resolved,
 * the lookup fails, or the database has no entry for the address.
 */
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"geo", "enrich", "ip", "maxmind"})
@CapabilityDescription("Looks up geolocation information for an IP address and adds the geo information to FlowFile attributes. The "
        + "geo data is provided as a MaxMind database. The attribute that contains the IP address to lookup is provided by the "
        + "'IP Address Attribute' property. If the name of the attribute provided is 'X', then the attributes added by enrichment "
        + "will take the form X.geo.")
@WritesAttributes({
    @WritesAttribute(attribute="X.geo.lookup.micros", description="The number of microseconds that the geo lookup took"),
    @WritesAttribute(attribute="X.geo.city", description="The city identified for the IP address"),
    @WritesAttribute(attribute="X.geo.latitude", description="The latitude identified for this IP address"),
    @WritesAttribute(attribute="X.geo.longitude", description="The longitude identified for this IP address"),
    @WritesAttribute(attribute="X.geo.subdivision.N", description="Each subdivision that is identified for this IP address is added with a one-up number appended to the attribute name, starting with 0"),
    @WritesAttribute(attribute="X.geo.subdivision.isocode.N", description="The ISO code for the subdivision that is identified by X.geo.subdivision.N"),
    @WritesAttribute(attribute="X.geo.country", description="The country identified for this IP address"),
    @WritesAttribute(attribute="X.geo.country.isocode", description="The ISO Code for the country identified"),
    @WritesAttribute(attribute="X.geo.postalcode", description="The postal code for the country identified"),
})
public class GeoEnrichIP extends AbstractProcessor {

    public static final PropertyDescriptor GEO_DATABASE_FILE = new PropertyDescriptor.Builder()
            .name("Geo Database File")
            .description("Path to Maxmind Geo Enrichment Database File")
            .required(true)
            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
            .build();

    public static final PropertyDescriptor IP_ADDRESS_ATTRIBUTE = new PropertyDescriptor.Builder()
            .name("IP Address Attribute")
            .required(true)
            .description("The name of an attribute whose value is a dotted decimal IP address for which enrichment should occur")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final Relationship REL_FOUND = new Relationship.Builder()
            .name("found")
            .description("Where to route flow files after successfully enriching attributes with geo data")
            .build();

    public static final Relationship REL_NOT_FOUND = new Relationship.Builder()
            .name("not found")
            .description("Where to route flow files after unsuccessfully enriching attributes because no geo data was found")
            .build();

    private Set<Relationship> relationships;
    private List<PropertyDescriptor> propertyDescriptors;

    /** Reader opened in {@link #onScheduled} and released in {@link #closeReader}. */
    private final AtomicReference<DatabaseReader> databaseReaderRef = new AtomicReference<>(null);

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    /**
     * Opens the configured MaxMind database file and publishes the reader for use by
     * {@link #onTrigger}; the time taken to build the reader is logged.
     *
     * @param context supplies the value of {@link #GEO_DATABASE_FILE}
     * @throws IOException if the database file cannot be opened or read
     */
    @OnScheduled
    public final void onScheduled(final ProcessContext context) throws IOException {
        final String dbFileString = context.getProperty(GEO_DATABASE_FILE).getValue();
        final File dbFile = new File(dbFileString);
        final StopWatch stopWatch = new StopWatch(true);
        final DatabaseReader reader = new DatabaseReader.Builder(dbFile).build();
        stopWatch.stop();
        getLogger().info("Completed loading of Maxmind Geo Database.  Elapsed time was {} milliseconds.", new Object[]{stopWatch.getDuration(TimeUnit.MILLISECONDS)});
        databaseReaderRef.set(reader);
    }

    /**
     * Closes the database reader, if one is open.
     *
     * @throws IOException if closing the reader fails
     */
    @OnStopped
    public void closeReader() throws IOException {
        // Clear the reference as well so a closed reader can never be picked up again.
        final DatabaseReader reader = databaseReaderRef.getAndSet(null);
        if (reader != null) {
            reader.close();
        }
    }

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final Set<Relationship> rels = new HashSet<>();
        rels.add(REL_FOUND);
        rels.add(REL_NOT_FOUND);
        this.relationships = Collections.unmodifiableSet(rels);

        final List<PropertyDescriptor> props = new ArrayList<>();
        props.add(GEO_DATABASE_FILE);
        props.add(IP_ADDRESS_ATTRIBUTE);
        this.propertyDescriptors = Collections.unmodifiableList(props);
    }

    /**
     * Enriches a single FlowFile: resolves the value of the configured attribute to an
     * {@link InetAddress}, performs a city lookup and, on success, adds the
     * {@code <attribute name>.geo.*} attributes before routing to {@link #REL_FOUND}.
     * Every failure path logs a warning and routes the unmodified FlowFile to
     * {@link #REL_NOT_FOUND}.
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final DatabaseReader dbReader = databaseReaderRef.get();
        final String ipAttributeName = context.getProperty(IP_ADDRESS_ATTRIBUTE).getValue();
        final String ipAttributeValue = flowFile.getAttribute(ipAttributeName);
        // The attribute VALUE must be checked here: the name is a required, non-empty property,
        // and InetAddress.getByName(null) would silently resolve to the loopback address.
        if (StringUtils.isEmpty(ipAttributeValue)) { //TODO need to add additional validation - should look like an IPv4 or IPv6 addr for instance
            session.transfer(flowFile, REL_NOT_FOUND);
            getLogger().warn("Unable to find ip address for {}", new Object[]{flowFile});
            return;
        }

        final InetAddress inetAddress;
        try {
            inetAddress = InetAddress.getByName(ipAttributeValue);
        } catch (final IOException ioe) {
            session.transfer(flowFile, REL_NOT_FOUND);
            getLogger().warn("Could not resolve {} to ip address for {}", new Object[]{ipAttributeValue, flowFile}, ioe);
            return;
        }

        final CityResponse response;
        final StopWatch stopWatch = new StopWatch(true);
        try {
            response = dbReader.city(inetAddress);
            stopWatch.stop();
        } catch (final IOException | GeoIp2Exception ex) {
            session.transfer(flowFile, REL_NOT_FOUND);
            getLogger().warn("Failure while trying to find enrichment data for {} due to {}", new Object[]{flowFile, ex}, ex);
            return;
        }

        // The local DatabaseReader returns null (rather than throwing) for unknown addresses.
        if (response == null) {
            session.transfer(flowFile, REL_NOT_FOUND);
            getLogger().warn("No enrichment data found for ip {} of {}", new Object[]{ipAttributeValue, flowFile});
            return;
        }

        final String prefix = ipAttributeName + ".geo.";
        final Map<String, String> attrs = new HashMap<>();
        attrs.put(prefix + "lookup.micros", String.valueOf(stopWatch.getDuration(TimeUnit.MICROSECONDS)));
        putIfPresent(attrs, prefix + "city", response.getCity().getName());
        // Latitude/longitude are boxed values and may be absent for an address.
        putIfPresent(attrs, prefix + "latitude", response.getLocation().getLatitude());
        putIfPresent(attrs, prefix + "longitude", response.getLocation().getLongitude());
        int i = 0;
        for (final Subdivision subd : response.getSubdivisions()) {
            putIfPresent(attrs, prefix + "subdivision." + i, subd.getName());
            putIfPresent(attrs, prefix + "subdivision.isocode." + i, subd.getIsoCode());
            i++;
        }
        putIfPresent(attrs, prefix + "country", response.getCountry().getName());
        putIfPresent(attrs, prefix + "country.isocode", response.getCountry().getIsoCode());
        putIfPresent(attrs, prefix + "postalcode", response.getPostal().getCode());
        flowFile = session.putAllAttributes(flowFile, attrs);

        session.transfer(flowFile, REL_FOUND);
        getLogger().info("Completed lookup of IP geo information for {}", new Object[]{flowFile});
    }

    /**
     * Adds {@code value.toString()} under {@code key} unless {@code value} is null, so that
     * fields missing from the database record are simply omitted instead of causing a
     * NullPointerException or a null attribute value.
     */
    private static void putIfPresent(final Map<String, String> attrs, final String key, final Object value) {
        if (value != null) {
            attrs.put(key, value.toString());
        }
    }

}
diff --git a/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/java/org/apache/nifi/processors/maxmind/DatabaseReader.java
b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/java/org/apache/nifi/processors/maxmind/DatabaseReader.java new file mode 100644 index 0000000000..796a7aff5c --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/java/org/apache/nifi/processors/maxmind/DatabaseReader.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.maxmind; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.util.Arrays; +import java.util.List; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.maxmind.db.Metadata; +import com.maxmind.db.Reader; +import com.maxmind.db.Reader.FileMode; +import com.maxmind.geoip2.GeoIp2Provider; +import com.maxmind.geoip2.exception.AddressNotFoundException; +import com.maxmind.geoip2.exception.GeoIp2Exception; +import com.maxmind.geoip2.model.AnonymousIpResponse; +import com.maxmind.geoip2.model.CityResponse; +import com.maxmind.geoip2.model.ConnectionTypeResponse; +import com.maxmind.geoip2.model.CountryResponse; +import com.maxmind.geoip2.model.DomainResponse; +import com.maxmind.geoip2.model.IspResponse; + +/** + *

+ * This class was copied from + * https://raw.githubusercontent.com/maxmind/GeoIP2-java/master/src/main/java/com/maxmind/geoip2/DatabaseReader.java + * It is written by Maxmind and it is available under Apache Software License V2 + * Copyright (c) 2013 by MaxMind, Inc. + * The modification we're making to the code below is to stop using exceptions for + * mainline flow control. Specifically we don't want to throw an exception + * simply because an address was not found. + *

+ * + * Instances of this class provide a reader for the GeoIP2 database format. IP + * addresses can be looked up using the get method. + */ +public class DatabaseReader implements GeoIp2Provider, Closeable { + + private final Reader reader; + + private final ObjectMapper om; + + private DatabaseReader(Builder builder) throws IOException { + if (builder.stream != null) { + this.reader = new Reader(builder.stream); + } else if (builder.database != null) { + this.reader = new Reader(builder.database, builder.mode); + } else { + // This should never happen. If it does, review the Builder class + // constructors for errors. + throw new IllegalArgumentException( + "Unsupported Builder configuration: expected either File or URL"); + } + this.om = new ObjectMapper(); + this.om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, + false); + this.om.configure( + DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true); + InjectableValues inject = new InjectableValues.Std().addValue( + "locales", builder.locales); + this.om.setInjectableValues(inject); + } + + /** + *

+ * Constructs a Builder for the DatabaseReader. The file passed to it must + * be a valid GeoIP2 database file. + *

+ *

+ * Builder creates instances of DatabaseReader + * from values set by the methods. + *

+ *

+ * Only the values set in the Builder constructor are required. + *

+ */ + public final static class Builder { + final File database; + final InputStream stream; + + List locales = Arrays.asList("en"); + FileMode mode = FileMode.MEMORY_MAPPED; + + /** + * @param stream the stream containing the GeoIP2 database to use. + */ + public Builder(InputStream stream) { + this.stream = stream; + this.database = null; + } + + /** + * @param database the GeoIP2 database file to use. + */ + public Builder(File database) { + this.database = database; + this.stream = null; + } + + /** + * @param val List of locale codes to use in name property from most + * preferred to least preferred. + * @return Builder object + */ + public Builder locales(List val) { + this.locales = val; + return this; + } + + /** + * @param val The file mode used to open the GeoIP2 database + * @return Builder object + * @throws java.lang.IllegalArgumentException if you initialized the Builder with a URL, which uses + * {@link FileMode#MEMORY}, but you provided a different + * FileMode to this method. + */ + public Builder fileMode(FileMode val) { + if (this.stream != null && !FileMode.MEMORY.equals(val)) { + throw new IllegalArgumentException( + "Only FileMode.MEMORY is supported when using an InputStream."); + } + this.mode = val; + return this; + } + + /** + * @return an instance of DatabaseReader created from the + * fields set on this builder. + * @throws IOException if there is an error reading the database + */ + public DatabaseReader build() throws IOException { + return new DatabaseReader(this); + } + } + + /** + * @param ipAddress IPv4 or IPv6 address to lookup. + * @return A object with the data for the IP address or null if no + * information could be found for the given IP address + * @throws IOException if there is an error opening or reading from the file. 
+ */ + private T get(InetAddress ipAddress, Class cls, boolean hasTraits, + String type) throws IOException, AddressNotFoundException { + + String databaseType = this.getMetadata().getDatabaseType(); + if (!databaseType.contains(type)) { + String caller = Thread.currentThread().getStackTrace()[2] + .getMethodName(); + throw new UnsupportedOperationException( + "Invalid attempt to open a " + databaseType + + " database using the " + caller + " method"); + } + + ObjectNode node = (ObjectNode) this.reader.get(ipAddress); + + if (node == null) { + return null; + } + + ObjectNode ipNode; + if (hasTraits) { + if (!node.has("traits")) { + node.set("traits", this.om.createObjectNode()); + } + ipNode = (ObjectNode) node.get("traits"); + } else { + ipNode = node; + } + ipNode.put("ip_address", ipAddress.getHostAddress()); + + return this.om.treeToValue(node, cls); + } + + /** + *

+ * Closes the database. + *

+ *

+ * If you are using FileMode.MEMORY_MAPPED, this will + * not unmap the underlying file due to a limitation in Java's + * MappedByteBuffer. It will however set the reference to + * the buffer to null, allowing the garbage collector to + * collect it. + *

+ * + * @throws IOException if an I/O error occurs. + */ + @Override + public void close() throws IOException { + this.reader.close(); + } + + @Override + public CountryResponse country(InetAddress ipAddress) throws IOException, + GeoIp2Exception { + return this.get(ipAddress, CountryResponse.class, true, "Country"); + } + + @Override + public CityResponse city(InetAddress ipAddress) throws IOException, + GeoIp2Exception { + return this.get(ipAddress, CityResponse.class, true, "City"); + } + + /** + * Look up an IP address in a GeoIP2 Anonymous IP. + * + * @param ipAddress IPv4 or IPv6 address to lookup. + * @return a AnonymousIpResponse for the requested IP address. + * @throws GeoIp2Exception if there is an error looking up the IP + * @throws IOException if there is an IO error + */ + public AnonymousIpResponse anonymousIp(InetAddress ipAddress) throws IOException, + GeoIp2Exception { + return this.get(ipAddress, AnonymousIpResponse.class, false, "GeoIP2-Anonymous-IP"); + } + + /** + * Look up an IP address in a GeoIP2 Connection Type database. + * + * @param ipAddress IPv4 or IPv6 address to lookup. + * @return a ConnectTypeResponse for the requested IP address. + * @throws GeoIp2Exception if there is an error looking up the IP + * @throws IOException if there is an IO error + */ + public ConnectionTypeResponse connectionType(InetAddress ipAddress) + throws IOException, GeoIp2Exception { + return this.get(ipAddress, ConnectionTypeResponse.class, false, + "GeoIP2-Connection-Type"); + } + + /** + * Look up an IP address in a GeoIP2 Domain database. + * + * @param ipAddress IPv4 or IPv6 address to lookup. + * @return a DomainResponse for the requested IP address. 
+ * @throws GeoIp2Exception if there is an error looking up the IP + * @throws IOException if there is an IO error + */ + public DomainResponse domain(InetAddress ipAddress) throws IOException, + GeoIp2Exception { + return this + .get(ipAddress, DomainResponse.class, false, "GeoIP2-Domain"); + } + + /** + * Look up an IP address in a GeoIP2 ISP database. + * + * @param ipAddress IPv4 or IPv6 address to lookup. + * @return an IspResponse for the requested IP address. + * @throws GeoIp2Exception if there is an error looking up the IP + * @throws IOException if there is an IO error + */ + public IspResponse isp(InetAddress ipAddress) throws IOException, + GeoIp2Exception { + return this.get(ipAddress, IspResponse.class, false, "GeoIP2-ISP"); + } + + /** + * @return the metadata for the open MaxMind DB file. + */ + public Metadata getMetadata() { + return this.reader.getMetadata(); + } +} \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000..9b1be71888 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-geo-bundle/nifi-geo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.nifi.processors.GeoEnrichIP \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-geo-bundle/pom.xml b/nifi/nifi-nar-bundles/nifi-geo-bundle/pom.xml new file mode 100644 index 0000000000..2dbd32fe91 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-geo-bundle/pom.xml @@ -0,0 +1,42 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-nar-bundles + 0.1.0-incubating-SNAPSHOT + + + nifi-geo-bundle + pom + NiFi Geo Enrichment Capability Set + + + nifi-geo-processors + nifi-geo-nar + + + + + + org.apache.nifi + nifi-geo-processors + 0.1.0-incubating-SNAPSHOT + + + +