NIFI-3942 Added retry logic if a lookup fails due to InvalidDatabaseException which occurs if the underlying file was modified before we could refresh the reader

This closes #1831.
This commit is contained in:
Bryan Bende 2017-05-19 17:58:52 -04:00 committed by Mark Payne
parent f35e0ecdd0
commit 71cd497fef

View File

@ -17,6 +17,7 @@
package org.apache.nifi.lookup.maxmind; package org.apache.nifi.lookup.maxmind;
import com.maxmind.db.InvalidDatabaseException;
import com.maxmind.geoip2.model.AnonymousIpResponse; import com.maxmind.geoip2.model.AnonymousIpResponse;
import com.maxmind.geoip2.model.CityResponse; import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.model.ConnectionTypeResponse; import com.maxmind.geoip2.model.ConnectionTypeResponse;
@ -188,10 +189,41 @@ public class IPLookupService extends AbstractControllerService implements Record
} }
} }
// assign to a local so we don't need a read lock, this way another thread can update the member variable reference // If an external process changes the underlying file before we have a chance to reload the reader, then we'll get an
// while the current thread continues using the local reference // InvalidDatabaseException, so force a reload and then retry the lookup one time, if we still get an error then throw it
try {
final DatabaseReader databaseReader = this.databaseReader; final DatabaseReader databaseReader = this.databaseReader;
return doLookup(databaseReader, key);
} catch (InvalidDatabaseException idbe) {
if (dbWriteLock.tryLock()) {
try {
getLogger().debug("Attempting to reload database after InvalidDatabaseException");
try {
final File dbFile = new File(databaseFile);
final String dbFileChecksum = getChecksum(dbFile);
loadDatabase(dbFile, dbFileChecksum);
databaseLastRefreshAttempt = System.currentTimeMillis();
} catch (IOException ioe) {
throw new LookupFailureException("Error reloading database due to: " + ioe.getMessage(), ioe);
}
getLogger().debug("Attempting to retry lookup after InvalidDatabaseException");
try {
final DatabaseReader databaseReader = this.databaseReader;
return doLookup(databaseReader, key);
} catch (final Exception e) {
throw new LookupFailureException("Error performing look up: " + e.getMessage(), e);
}
} finally {
dbWriteLock.unlock();
}
} else {
throw new LookupFailureException("Failed to lookup the key " + key + " due to " + idbe.getMessage(), idbe);
}
}
}
private Optional<Record> doLookup(final DatabaseReader databaseReader, final String key) throws LookupFailureException, InvalidDatabaseException {
final InetAddress inetAddress; final InetAddress inetAddress;
try { try {
inetAddress = InetAddress.getByName(key); inetAddress = InetAddress.getByName(key);
@ -207,6 +239,8 @@ public class IPLookupService extends AbstractControllerService implements Record
final CityResponse cityResponse; final CityResponse cityResponse;
try { try {
cityResponse = databaseReader.city(inetAddress); cityResponse = databaseReader.city(inetAddress);
} catch (final InvalidDatabaseException idbe) {
throw idbe;
} catch (final Exception e) { } catch (final Exception e) {
throw new LookupFailureException("Failed to lookup City information for IP Address " + inetAddress, e); throw new LookupFailureException("Failed to lookup City information for IP Address " + inetAddress, e);
} }
@ -221,6 +255,8 @@ public class IPLookupService extends AbstractControllerService implements Record
final IspResponse ispResponse; final IspResponse ispResponse;
try { try {
ispResponse = databaseReader.isp(inetAddress); ispResponse = databaseReader.isp(inetAddress);
} catch (final InvalidDatabaseException idbe) {
throw idbe;
} catch (final Exception e) { } catch (final Exception e) {
throw new LookupFailureException("Failed to lookup ISP information for IP Address " + inetAddress, e); throw new LookupFailureException("Failed to lookup ISP information for IP Address " + inetAddress, e);
} }
@ -235,6 +271,8 @@ public class IPLookupService extends AbstractControllerService implements Record
final DomainResponse domainResponse; final DomainResponse domainResponse;
try { try {
domainResponse = databaseReader.domain(inetAddress); domainResponse = databaseReader.domain(inetAddress);
} catch (final InvalidDatabaseException idbe) {
throw idbe;
} catch (final Exception e) { } catch (final Exception e) {
throw new LookupFailureException("Failed to lookup Domain information for IP Address " + inetAddress, e); throw new LookupFailureException("Failed to lookup Domain information for IP Address " + inetAddress, e);
} }
@ -249,6 +287,8 @@ public class IPLookupService extends AbstractControllerService implements Record
final ConnectionTypeResponse connectionTypeResponse; final ConnectionTypeResponse connectionTypeResponse;
try { try {
connectionTypeResponse = databaseReader.connectionType(inetAddress); connectionTypeResponse = databaseReader.connectionType(inetAddress);
} catch (final InvalidDatabaseException idbe) {
throw idbe;
} catch (final Exception e) { } catch (final Exception e) {
throw new LookupFailureException("Failed to lookup Domain information for IP Address " + inetAddress, e); throw new LookupFailureException("Failed to lookup Domain information for IP Address " + inetAddress, e);
} }
@ -268,6 +308,8 @@ public class IPLookupService extends AbstractControllerService implements Record
final AnonymousIpResponse anonymousIpResponse; final AnonymousIpResponse anonymousIpResponse;
try { try {
anonymousIpResponse = databaseReader.anonymousIp(inetAddress); anonymousIpResponse = databaseReader.anonymousIp(inetAddress);
} catch (final InvalidDatabaseException idbe) {
throw idbe;
} catch (final Exception e) { } catch (final Exception e) {
throw new LookupFailureException("Failed to lookup Anonymous IP Information for IP Address " + inetAddress, e); throw new LookupFailureException("Failed to lookup Anonymous IP Information for IP Address " + inetAddress, e);
} }