mirror of https://github.com/apache/nifi.git
504: Initial import of geo enrich processors
This commit is contained in:
parent
45416dc66b
commit
ff0bd2c669
|
@ -0,0 +1,33 @@
|
||||||
|
<?xml version="1.0"?>
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<parent>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-geo-bundle</artifactId>
|
||||||
|
<version>0.1.0-incubating-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
<artifactId>nifi-geo-nar</artifactId>
|
||||||
|
<packaging>nar</packaging>
|
||||||
|
<description>NiFi Geo Enrichment NAR</description>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-geo-processors</artifactId>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
</project>
|
|
@ -0,0 +1 @@
|
||||||
|
/target/
|
|
@ -0,0 +1,43 @@
|
||||||
|
<?xml version="1.0"?>
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<parent>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-geo-bundle</artifactId>
|
||||||
|
<version>0.1.0-incubating-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
<artifactId>nifi-geo-processors</artifactId>
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-api</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-processor-utils</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-utils</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.maxmind.geoip2</groupId>
|
||||||
|
<artifactId>geoip2</artifactId>
|
||||||
|
<version>2.1.0</version>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
</project>
|
|
@ -0,0 +1,210 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
|
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
||||||
|
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.processor.AbstractProcessor;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.apache.nifi.processors.maxmind.DatabaseReader;
|
||||||
|
import org.apache.nifi.util.StopWatch;
|
||||||
|
|
||||||
|
import com.maxmind.geoip2.exception.GeoIp2Exception;
|
||||||
|
import com.maxmind.geoip2.model.CityResponse;
|
||||||
|
import com.maxmind.geoip2.record.Subdivision;
|
||||||
|
|
||||||
|
@EventDriven
|
||||||
|
@SideEffectFree
|
||||||
|
@SupportsBatching
|
||||||
|
@Tags({"geo", "enrich", "ip", "maxmind"})
|
||||||
|
@CapabilityDescription("Looks up geolocation information for an IP address and adds the geo information to FlowFile attributes. The "
|
||||||
|
+ "geo data is provided as a MaxMind database. The attribute that contains the IP address to lookup is provided by the "
|
||||||
|
+ "'IP Address Attribute' property. If the name of the attribute provided is 'X', then the the attributes added by enrichment "
|
||||||
|
+ "will take the form X.geo.<fieldName>")
|
||||||
|
@WritesAttributes({
|
||||||
|
@WritesAttribute(attribute="X.geo.lookup.micros", description="The number of microseconds that the geo lookup took"),
|
||||||
|
@WritesAttribute(attribute="X.geo.city", description="The city identified for the IP address"),
|
||||||
|
@WritesAttribute(attribute="X.geo.latitude", description="The latitude identified for this IP address"),
|
||||||
|
@WritesAttribute(attribute="X.geo.longitude", description="The longitude identified for this IP address"),
|
||||||
|
@WritesAttribute(attribute="X.geo.subdivision.N", description="Each subdivision that is identified for this IP address is added with a one-up number appended to the attribute name, starting with 0"),
|
||||||
|
@WritesAttribute(attribute="X.geo.subdivision.isocode.N", description="The ISO code for the subdivision that is identified by X.geo.subdivision.N"),
|
||||||
|
@WritesAttribute(attribute="X.geo.country", description="The country identified for this IP address"),
|
||||||
|
@WritesAttribute(attribute="X.geo.country.isocode", description="The ISO Code for the country identified"),
|
||||||
|
@WritesAttribute(attribute="X.geo.postalcode", description="The postal code for the country identified"),
|
||||||
|
})
|
||||||
|
public class GeoEnrichIP extends AbstractProcessor {
|
||||||
|
|
||||||
|
public static final PropertyDescriptor GEO_DATABASE_FILE = new PropertyDescriptor.Builder()
|
||||||
|
.name("Geo Database File")
|
||||||
|
.description("Path to Maxmind Geo Enrichment Database File")
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor IP_ADDRESS_ATTRIBUTE = new PropertyDescriptor.Builder()
|
||||||
|
.name("IP Address Attribute")
|
||||||
|
.required(true)
|
||||||
|
.description("The name of an attribute whose value is a dotted decimal IP address for which enrichment should occur")
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Relationship REL_FOUND = new Relationship.Builder()
|
||||||
|
.name("found")
|
||||||
|
.description("Where to route flow files after successfully enriching attributes with geo data")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final Relationship REL_NOT_FOUND = new Relationship.Builder()
|
||||||
|
.name("not found")
|
||||||
|
.description("Where to route flow files after unsuccessfully enriching attributes because no geo data was found")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
private Set<Relationship> relationships;
|
||||||
|
private List<PropertyDescriptor> propertyDescriptors;
|
||||||
|
private final AtomicReference<DatabaseReader> databaseReaderRef = new AtomicReference<>(null);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<Relationship> getRelationships() {
|
||||||
|
return relationships;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
return propertyDescriptors;
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnScheduled
|
||||||
|
public final void onScheduled(final ProcessContext context) throws IOException {
|
||||||
|
final String dbFileString = context.getProperty(GEO_DATABASE_FILE).getValue();
|
||||||
|
final File dbFile = new File(dbFileString);
|
||||||
|
final StopWatch stopWatch = new StopWatch(true);
|
||||||
|
final DatabaseReader reader = new DatabaseReader.Builder(dbFile).build();
|
||||||
|
stopWatch.stop();
|
||||||
|
getLogger().info("Completed loading of Maxmind Geo Database. Elapsed time was {} milliseconds.", new Object[]{stopWatch.getDuration(TimeUnit.MILLISECONDS)});
|
||||||
|
databaseReaderRef.set(reader);
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnStopped
|
||||||
|
public void closeReader() throws IOException {
|
||||||
|
final DatabaseReader reader = databaseReaderRef.get();
|
||||||
|
if ( reader != null ) {
|
||||||
|
reader.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void init(final ProcessorInitializationContext context) {
|
||||||
|
final Set<Relationship> rels = new HashSet<>();
|
||||||
|
rels.add(REL_FOUND);
|
||||||
|
rels.add(REL_NOT_FOUND);
|
||||||
|
this.relationships = Collections.unmodifiableSet(rels);
|
||||||
|
|
||||||
|
final List<PropertyDescriptor> props = new ArrayList<>();
|
||||||
|
props.add(GEO_DATABASE_FILE);
|
||||||
|
props.add(IP_ADDRESS_ATTRIBUTE);
|
||||||
|
this.propertyDescriptors = Collections.unmodifiableList(props);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||||
|
FlowFile flowFile = session.get();
|
||||||
|
if (flowFile == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final DatabaseReader dbReader = databaseReaderRef.get();
|
||||||
|
final String ipAttributeName = context.getProperty(IP_ADDRESS_ATTRIBUTE).getValue();
|
||||||
|
final String ipAttributeValue = flowFile.getAttribute(ipAttributeName);
|
||||||
|
if (StringUtils.isEmpty(ipAttributeName)) { //TODO need to add additional validation - should look like an IPv4 or IPv6 addr for instance
|
||||||
|
session.transfer(flowFile, REL_NOT_FOUND);
|
||||||
|
getLogger().warn("Unable to find ip address for {}", new Object[]{flowFile});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
InetAddress inetAddress = null;
|
||||||
|
CityResponse response = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
inetAddress = InetAddress.getByName(ipAttributeValue);
|
||||||
|
} catch (final IOException ioe) {
|
||||||
|
session.transfer(flowFile, REL_NOT_FOUND);
|
||||||
|
getLogger().warn("Could not resolve {} to ip address for {}", new Object[]{ipAttributeValue, flowFile}, ioe);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
final StopWatch stopWatch = new StopWatch(true);
|
||||||
|
try {
|
||||||
|
response = dbReader.city(inetAddress);
|
||||||
|
stopWatch.stop();
|
||||||
|
} catch (final IOException | GeoIp2Exception ex) {
|
||||||
|
session.transfer(flowFile, REL_NOT_FOUND);
|
||||||
|
getLogger().warn("Failure while trying to find enrichment data for {} due to {}", new Object[]{flowFile, ex}, ex);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (response == null) {
|
||||||
|
session.transfer(flowFile, REL_NOT_FOUND);
|
||||||
|
getLogger().warn("No enrichment data found for ip {} of {}", new Object[]{ipAttributeValue, flowFile});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final Map<String, String> attrs = new HashMap<>();
|
||||||
|
attrs.put(new StringBuilder(ipAttributeName).append(".geo.lookup.micros").toString(), String.valueOf(stopWatch.getDuration(TimeUnit.MICROSECONDS)));
|
||||||
|
attrs.put(new StringBuilder(ipAttributeName).append(".geo.city").toString(), response.getCity().getName());
|
||||||
|
attrs.put(new StringBuilder(ipAttributeName).append(".geo.latitude").toString(), response.getLocation().getLatitude().toString());
|
||||||
|
attrs.put(new StringBuilder(ipAttributeName).append(".geo.longitude").toString(), response.getLocation().getLongitude().toString());
|
||||||
|
int i = 0;
|
||||||
|
for (final Subdivision subd : response.getSubdivisions()) {
|
||||||
|
attrs.put(new StringBuilder(ipAttributeName).append(".geo.subdivision.").append(i).toString(), subd.getName());
|
||||||
|
attrs.put(new StringBuilder(ipAttributeName).append(".geo.subdivision.isocode.").append(i).toString(), subd.getIsoCode());
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
attrs.put(new StringBuilder(ipAttributeName).append(".geo.country").toString(), response.getCountry().getName());
|
||||||
|
attrs.put(new StringBuilder(ipAttributeName).append(".geo.country.isocode").toString(), response.getCountry().getIsoCode());
|
||||||
|
attrs.put(new StringBuilder(ipAttributeName).append(".geo.postalcode").toString(), response.getPostal().getCode());
|
||||||
|
flowFile = session.putAllAttributes(flowFile, attrs);
|
||||||
|
|
||||||
|
session.transfer(flowFile, REL_FOUND);
|
||||||
|
getLogger().info("Completed lookup of IP geo information for {}", new Object[]{flowFile});
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,286 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.maxmind;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
|
import com.fasterxml.jackson.databind.InjectableValues;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
|
import com.maxmind.db.Metadata;
|
||||||
|
import com.maxmind.db.Reader;
|
||||||
|
import com.maxmind.db.Reader.FileMode;
|
||||||
|
import com.maxmind.geoip2.GeoIp2Provider;
|
||||||
|
import com.maxmind.geoip2.exception.AddressNotFoundException;
|
||||||
|
import com.maxmind.geoip2.exception.GeoIp2Exception;
|
||||||
|
import com.maxmind.geoip2.model.AnonymousIpResponse;
|
||||||
|
import com.maxmind.geoip2.model.CityResponse;
|
||||||
|
import com.maxmind.geoip2.model.ConnectionTypeResponse;
|
||||||
|
import com.maxmind.geoip2.model.CountryResponse;
|
||||||
|
import com.maxmind.geoip2.model.DomainResponse;
|
||||||
|
import com.maxmind.geoip2.model.IspResponse;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* This class was copied from
|
||||||
|
* https://raw.githubusercontent.com/maxmind/GeoIP2-java/master/src/main/java/com/maxmind/geoip2/DatabaseReader.java
|
||||||
|
* It is written by Maxmind and it is available under Apache Software License V2
|
||||||
|
* Copyright (c) 2013 by MaxMind, Inc.
|
||||||
|
* The modification we're making to the code below is to stop using exceptions for
|
||||||
|
* mainline flow control. Specifically we don't want to throw an exception
|
||||||
|
* simply because an address was not found.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* Instances of this class provide a reader for the GeoIP2 database format. IP
|
||||||
|
* addresses can be looked up using the <code>get</code> method.
|
||||||
|
*/
|
||||||
|
public class DatabaseReader implements GeoIp2Provider, Closeable {
|
||||||
|
|
||||||
|
private final Reader reader;
|
||||||
|
|
||||||
|
private final ObjectMapper om;
|
||||||
|
|
||||||
|
private DatabaseReader(Builder builder) throws IOException {
|
||||||
|
if (builder.stream != null) {
|
||||||
|
this.reader = new Reader(builder.stream);
|
||||||
|
} else if (builder.database != null) {
|
||||||
|
this.reader = new Reader(builder.database, builder.mode);
|
||||||
|
} else {
|
||||||
|
// This should never happen. If it does, review the Builder class
|
||||||
|
// constructors for errors.
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Unsupported Builder configuration: expected either File or URL");
|
||||||
|
}
|
||||||
|
this.om = new ObjectMapper();
|
||||||
|
this.om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
|
||||||
|
false);
|
||||||
|
this.om.configure(
|
||||||
|
DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true);
|
||||||
|
InjectableValues inject = new InjectableValues.Std().addValue(
|
||||||
|
"locales", builder.locales);
|
||||||
|
this.om.setInjectableValues(inject);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Constructs a Builder for the DatabaseReader. The file passed to it must
|
||||||
|
* be a valid GeoIP2 database file.
|
||||||
|
* </p>
|
||||||
|
* <p>
|
||||||
|
* <code>Builder</code> creates instances of <code>DatabaseReader</code>
|
||||||
|
* from values set by the methods.
|
||||||
|
* </p>
|
||||||
|
* <p>
|
||||||
|
* Only the values set in the <code>Builder</code> constructor are required.
|
||||||
|
* </p>
|
||||||
|
*/
|
||||||
|
public final static class Builder {
|
||||||
|
final File database;
|
||||||
|
final InputStream stream;
|
||||||
|
|
||||||
|
List<String> locales = Arrays.asList("en");
|
||||||
|
FileMode mode = FileMode.MEMORY_MAPPED;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param stream the stream containing the GeoIP2 database to use.
|
||||||
|
*/
|
||||||
|
public Builder(InputStream stream) {
|
||||||
|
this.stream = stream;
|
||||||
|
this.database = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param database the GeoIP2 database file to use.
|
||||||
|
*/
|
||||||
|
public Builder(File database) {
|
||||||
|
this.database = database;
|
||||||
|
this.stream = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param val List of locale codes to use in name property from most
|
||||||
|
* preferred to least preferred.
|
||||||
|
* @return Builder object
|
||||||
|
*/
|
||||||
|
public Builder locales(List<String> val) {
|
||||||
|
this.locales = val;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param val The file mode used to open the GeoIP2 database
|
||||||
|
* @return Builder object
|
||||||
|
* @throws java.lang.IllegalArgumentException if you initialized the Builder with a URL, which uses
|
||||||
|
* {@link FileMode#MEMORY}, but you provided a different
|
||||||
|
* FileMode to this method.
|
||||||
|
*/
|
||||||
|
public Builder fileMode(FileMode val) {
|
||||||
|
if (this.stream != null && !FileMode.MEMORY.equals(val)) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Only FileMode.MEMORY is supported when using an InputStream.");
|
||||||
|
}
|
||||||
|
this.mode = val;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return an instance of <code>DatabaseReader</code> created from the
|
||||||
|
* fields set on this builder.
|
||||||
|
* @throws IOException if there is an error reading the database
|
||||||
|
*/
|
||||||
|
public DatabaseReader build() throws IOException {
|
||||||
|
return new DatabaseReader(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param ipAddress IPv4 or IPv6 address to lookup.
|
||||||
|
* @return A <T> object with the data for the IP address or null if no
|
||||||
|
* information could be found for the given IP address
|
||||||
|
* @throws IOException if there is an error opening or reading from the file.
|
||||||
|
*/
|
||||||
|
private <T> T get(InetAddress ipAddress, Class<T> cls, boolean hasTraits,
|
||||||
|
String type) throws IOException, AddressNotFoundException {
|
||||||
|
|
||||||
|
String databaseType = this.getMetadata().getDatabaseType();
|
||||||
|
if (!databaseType.contains(type)) {
|
||||||
|
String caller = Thread.currentThread().getStackTrace()[2]
|
||||||
|
.getMethodName();
|
||||||
|
throw new UnsupportedOperationException(
|
||||||
|
"Invalid attempt to open a " + databaseType
|
||||||
|
+ " database using the " + caller + " method");
|
||||||
|
}
|
||||||
|
|
||||||
|
ObjectNode node = (ObjectNode) this.reader.get(ipAddress);
|
||||||
|
|
||||||
|
if (node == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
ObjectNode ipNode;
|
||||||
|
if (hasTraits) {
|
||||||
|
if (!node.has("traits")) {
|
||||||
|
node.set("traits", this.om.createObjectNode());
|
||||||
|
}
|
||||||
|
ipNode = (ObjectNode) node.get("traits");
|
||||||
|
} else {
|
||||||
|
ipNode = node;
|
||||||
|
}
|
||||||
|
ipNode.put("ip_address", ipAddress.getHostAddress());
|
||||||
|
|
||||||
|
return this.om.treeToValue(node, cls);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Closes the database.
|
||||||
|
* </p>
|
||||||
|
* <p>
|
||||||
|
* If you are using <code>FileMode.MEMORY_MAPPED</code>, this will
|
||||||
|
* <em>not</em> unmap the underlying file due to a limitation in Java's
|
||||||
|
* <code>MappedByteBuffer</code>. It will however set the reference to
|
||||||
|
* the buffer to <code>null</code>, allowing the garbage collector to
|
||||||
|
* collect it.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @throws IOException if an I/O error occurs.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
this.reader.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CountryResponse country(InetAddress ipAddress) throws IOException,
|
||||||
|
GeoIp2Exception {
|
||||||
|
return this.get(ipAddress, CountryResponse.class, true, "Country");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CityResponse city(InetAddress ipAddress) throws IOException,
|
||||||
|
GeoIp2Exception {
|
||||||
|
return this.get(ipAddress, CityResponse.class, true, "City");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Look up an IP address in a GeoIP2 Anonymous IP.
|
||||||
|
*
|
||||||
|
* @param ipAddress IPv4 or IPv6 address to lookup.
|
||||||
|
* @return a AnonymousIpResponse for the requested IP address.
|
||||||
|
* @throws GeoIp2Exception if there is an error looking up the IP
|
||||||
|
* @throws IOException if there is an IO error
|
||||||
|
*/
|
||||||
|
public AnonymousIpResponse anonymousIp(InetAddress ipAddress) throws IOException,
|
||||||
|
GeoIp2Exception {
|
||||||
|
return this.get(ipAddress, AnonymousIpResponse.class, false, "GeoIP2-Anonymous-IP");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Look up an IP address in a GeoIP2 Connection Type database.
|
||||||
|
*
|
||||||
|
* @param ipAddress IPv4 or IPv6 address to lookup.
|
||||||
|
* @return a ConnectTypeResponse for the requested IP address.
|
||||||
|
* @throws GeoIp2Exception if there is an error looking up the IP
|
||||||
|
* @throws IOException if there is an IO error
|
||||||
|
*/
|
||||||
|
public ConnectionTypeResponse connectionType(InetAddress ipAddress)
|
||||||
|
throws IOException, GeoIp2Exception {
|
||||||
|
return this.get(ipAddress, ConnectionTypeResponse.class, false,
|
||||||
|
"GeoIP2-Connection-Type");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Look up an IP address in a GeoIP2 Domain database.
|
||||||
|
*
|
||||||
|
* @param ipAddress IPv4 or IPv6 address to lookup.
|
||||||
|
* @return a DomainResponse for the requested IP address.
|
||||||
|
* @throws GeoIp2Exception if there is an error looking up the IP
|
||||||
|
* @throws IOException if there is an IO error
|
||||||
|
*/
|
||||||
|
public DomainResponse domain(InetAddress ipAddress) throws IOException,
|
||||||
|
GeoIp2Exception {
|
||||||
|
return this
|
||||||
|
.get(ipAddress, DomainResponse.class, false, "GeoIP2-Domain");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Look up an IP address in a GeoIP2 ISP database.
|
||||||
|
*
|
||||||
|
* @param ipAddress IPv4 or IPv6 address to lookup.
|
||||||
|
* @return an IspResponse for the requested IP address.
|
||||||
|
* @throws GeoIp2Exception if there is an error looking up the IP
|
||||||
|
* @throws IOException if there is an IO error
|
||||||
|
*/
|
||||||
|
public IspResponse isp(InetAddress ipAddress) throws IOException,
|
||||||
|
GeoIp2Exception {
|
||||||
|
return this.get(ipAddress, IspResponse.class, false, "GeoIP2-ISP");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the metadata for the open MaxMind DB file.
|
||||||
|
*/
|
||||||
|
public Metadata getMetadata() {
|
||||||
|
return this.reader.getMetadata();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,16 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
org.apache.nifi.processors.GeoEnrichIP
|
|
@ -0,0 +1,42 @@
|
||||||
|
<?xml version="1.0"?>
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<parent>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-nar-bundles</artifactId>
|
||||||
|
<version>0.1.0-incubating-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<artifactId>nifi-geo-bundle</artifactId>
|
||||||
|
<packaging>pom</packaging>
|
||||||
|
<description>NiFi Geo Enrichment Capability Set</description>
|
||||||
|
|
||||||
|
<modules>
|
||||||
|
<module>nifi-geo-processors</module>
|
||||||
|
<module>nifi-geo-nar</module>
|
||||||
|
</modules>
|
||||||
|
|
||||||
|
<dependencyManagement>
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-geo-processors</artifactId>
|
||||||
|
<version>0.1.0-incubating-SNAPSHOT</version>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
</dependencyManagement>
|
||||||
|
</project>
|
Loading…
Reference in New Issue