NIFI-3942 Making IPLookupService reload the database file on the fly when detecting the file has changed

This commit is contained in:
Bryan Bende 2017-05-19 15:06:48 -04:00 committed by Mark Payne
parent c49933f03d
commit f35e0ecdd0
1 changed files with 109 additions and 26 deletions

View File

@ -17,16 +17,16 @@
package org.apache.nifi.lookup.maxmind; package org.apache.nifi.lookup.maxmind;
import java.io.File; import com.maxmind.geoip2.model.AnonymousIpResponse;
import java.io.IOException; import com.maxmind.geoip2.model.CityResponse;
import java.net.InetAddress; import com.maxmind.geoip2.model.ConnectionTypeResponse;
import java.util.ArrayList; import com.maxmind.geoip2.model.ConnectionTypeResponse.ConnectionType;
import java.util.HashMap; import com.maxmind.geoip2.model.DomainResponse;
import java.util.List; import com.maxmind.geoip2.model.IspResponse;
import java.util.Map; import com.maxmind.geoip2.record.Country;
import java.util.Optional; import com.maxmind.geoip2.record.Location;
import java.util.concurrent.TimeUnit; import com.maxmind.geoip2.record.Subdivision;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnEnabled;
@ -41,15 +41,19 @@ import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import com.maxmind.geoip2.model.AnonymousIpResponse; import java.io.File;
import com.maxmind.geoip2.model.CityResponse; import java.io.FileInputStream;
import com.maxmind.geoip2.model.ConnectionTypeResponse; import java.io.IOException;
import com.maxmind.geoip2.model.ConnectionTypeResponse.ConnectionType; import java.io.InputStream;
import com.maxmind.geoip2.model.DomainResponse; import java.net.InetAddress;
import com.maxmind.geoip2.model.IspResponse; import java.util.ArrayList;
import com.maxmind.geoip2.record.Country; import java.util.HashMap;
import com.maxmind.geoip2.record.Location; import java.util.List;
import com.maxmind.geoip2.record.Subdivision; import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Tags({"lookup", "enrich", "ip", "geo", "ipgeo", "maxmind", "isp", "domain", "cellular", "anonymous", "tor"}) @Tags({"lookup", "enrich", "ip", "geo", "ipgeo", "maxmind", "isp", "domain", "cellular", "anonymous", "tor"})
@CapabilityDescription("A lookup service that provides several types of enrichment information for IP addresses. The service is configured by providing a MaxMind " @CapabilityDescription("A lookup service that provides several types of enrichment information for IP addresses. The service is configured by providing a MaxMind "
@ -57,7 +61,15 @@ import com.maxmind.geoip2.record.Subdivision;
+ "service to provide all of the available enrichment data may be slower than returning only a portion of the available enrichments. View the Usage of this component " + "service to provide all of the available enrichment data may be slower than returning only a portion of the available enrichments. View the Usage of this component "
+ "and choose to view Additional Details for more information, such as the Schema that pertains to the information that is returned.") + "and choose to view Additional Details for more information, such as the Schema that pertains to the information that is returned.")
public class IPLookupService extends AbstractControllerService implements RecordLookupService { public class IPLookupService extends AbstractControllerService implements RecordLookupService {
private volatile String databaseFile = null;
private volatile DatabaseReader databaseReader = null; private volatile DatabaseReader databaseReader = null;
private volatile String databaseChecksum = null;
private volatile long databaseLastRefreshAttempt = -1;
private final Lock dbWriteLock = new ReentrantLock();
static final long REFRESH_THRESHOLD_MS = 5 * 60 * 1000;
static final PropertyDescriptor GEO_DATABASE_FILE = new PropertyDescriptor.Builder() static final PropertyDescriptor GEO_DATABASE_FILE = new PropertyDescriptor.Builder()
.name("database-file") .name("database-file")
@ -65,6 +77,7 @@ public class IPLookupService extends AbstractControllerService implements Record
.description("Path to Maxmind IP Enrichment Database File") .description("Path to Maxmind IP Enrichment Database File")
.required(true) .required(true)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.expressionLanguageSupported(true)
.build(); .build();
static final PropertyDescriptor LOOKUP_CITY = new PropertyDescriptor.Builder() static final PropertyDescriptor LOOKUP_CITY = new PropertyDescriptor.Builder()
.name("lookup-city") .name("lookup-city")
@ -128,13 +141,23 @@ public class IPLookupService extends AbstractControllerService implements Record
@OnEnabled @OnEnabled
public void onEnabled(final ConfigurationContext context) throws IOException { public void onEnabled(final ConfigurationContext context) throws IOException {
final String dbFileString = context.getProperty(GEO_DATABASE_FILE).getValue(); databaseFile = context.getProperty(GEO_DATABASE_FILE).evaluateAttributeExpressions().getValue();
final File dbFile = new File(dbFileString);
final StopWatch stopWatch = new StopWatch(true); final File dbFile = new File(databaseFile);
final DatabaseReader reader = new DatabaseReader.Builder(dbFile).build(); final String dbFileChecksum = getChecksum(dbFile);
stopWatch.stop(); loadDatabase(dbFile, dbFileChecksum);
getLogger().info("Completed loading of Maxmind Database. Elapsed time was {} milliseconds.", new Object[] {stopWatch.getDuration(TimeUnit.MILLISECONDS)});
databaseReader = reader; // initialize the last refresh attempt to the time the service was enabled
databaseLastRefreshAttempt = System.currentTimeMillis();
}
private String getChecksum(final File file) throws IOException {
String fileChecksum;
try (final InputStream in = new FileInputStream(file)){
fileChecksum = DigestUtils.md5Hex(in);
}
return fileChecksum;
} }
@OnStopped @OnStopped
@ -143,6 +166,11 @@ public class IPLookupService extends AbstractControllerService implements Record
if (reader != null) { if (reader != null) {
reader.close(); reader.close();
} }
databaseFile = null;
databaseReader = null;
databaseChecksum = null;
databaseLastRefreshAttempt = -1;
} }
@Override @Override
@ -151,6 +179,19 @@ public class IPLookupService extends AbstractControllerService implements Record
return Optional.empty(); return Optional.empty();
} }
// determine if we should attempt to refresh the database based on exceeding a certain amount of time since last refresh
if (shouldAttemptDatabaseRefresh()) {
try {
refreshDatabase();
} catch (IOException e) {
throw new LookupFailureException("Failed to refresh database file: " + e.getMessage(), e);
}
}
// assign to a local so we don't need a read lock, this way another thread can update the member variable reference
// while the current thread continues using the local reference
final DatabaseReader databaseReader = this.databaseReader;
final InetAddress inetAddress; final InetAddress inetAddress;
try { try {
inetAddress = InetAddress.getByName(key); inetAddress = InetAddress.getByName(key);
@ -239,6 +280,48 @@ public class IPLookupService extends AbstractControllerService implements Record
return Optional.ofNullable(createContainerRecord(geoRecord, ispRecord, domainName, connectionType, anonymousIpRecord)); return Optional.ofNullable(createContainerRecord(geoRecord, ispRecord, domainName, connectionType, anonymousIpRecord));
} }
// returns true if the reader was never initialized or if the database hasn't been updated in longer than our threshold
private boolean shouldAttemptDatabaseRefresh() {
return System.currentTimeMillis() - databaseLastRefreshAttempt >= REFRESH_THRESHOLD_MS;
}
private void refreshDatabase() throws IOException {
// since this is the only place the write lock is used, if something else has it then we know another thread is
// already refreshing the database so we can just move on if we don't get the lock, no need to block
if (dbWriteLock.tryLock()) {
try {
// now that we have the lock check again to make sure we still need to refresh
if (shouldAttemptDatabaseRefresh()) {
final File dbFile = new File(databaseFile);
final String dbFileChecksum = getChecksum(dbFile);
if (!dbFileChecksum.equals(databaseChecksum)) {
loadDatabase(dbFile, dbFileChecksum);
} else {
getLogger().debug("Checksum hasn't changed, database will not be reloaded");
}
// update the timestamp even if we didn't refresh so that we'll wait a full threshold again
databaseLastRefreshAttempt = System.currentTimeMillis();
} else {
getLogger().debug("Acquired write lock, but no longer need to reload the database");
}
} finally {
dbWriteLock.unlock();
}
} else {
getLogger().debug("Unable to acquire write lock, skipping reload of database");
}
}
private void loadDatabase(final File dbFile, final String dbFileChecksum) throws IOException {
final StopWatch stopWatch = new StopWatch(true);
final DatabaseReader reader = new DatabaseReader.Builder(dbFile).build();
stopWatch.stop();
getLogger().info("Completed loading of Maxmind Database. Elapsed time was {} milliseconds.", new Object[]{stopWatch.getDuration(TimeUnit.MILLISECONDS)});
databaseReader = reader;
databaseChecksum = dbFileChecksum;
}
private Record createRecord(final CityResponse city) { private Record createRecord(final CityResponse city) {
if (city == null) { if (city == null) {
return null; return null;