Lookup cache bug fixes (#3609)

* Return better lastVersion from JDBCExtractionNamespaceCacheFactory's cache populator callable

* Return the lastVersion if URI lookup last modified date is not later than the last cached, from URIExtractionNamespaceCacheFactory's cache populator callable

* Fix a race condition in NamespaceExtractionCacheManager.cancelFuture()

* Don't delete cache from NamespaceExtractionCacheManager if the ExtractionNamespaceCacheFactory returned the same version as the last; Better exception treatment in the scheduled cache updater runnable in NamespaceExtractionCacheManager (in particular, don't consume Errors); throw AssertionError in StaticMapExtractionNamespaceCacheFactory if the lastVersion != null)

* In NamespaceExtractionCacheManager, put NamespaceImplData.latestVersion update in the same synchronized() block with swapAndClearCache(id, cacheId); Turn getPostRunnable which returns a callback into a simple updateNamespace() method

* In StaticMapExtractionNamespaceCacheFactory.getCachePopulator(), check the input directly, not inside a callback

* In URIExtractionNamespaceCacheFactory, allow URI last modified time to go backwards

* Better logging in NamespaceExtractionCacheManager

* Add comment on lastVersion nullability in URIExtractionNamespaceCacheFactory
This commit is contained in:
Roman Leventov 2016-11-02 10:40:19 -06:00 committed by Fangjin Yang
parent 2961406b90
commit 36a1543222
6 changed files with 80 additions and 92 deletions

View File

@ -74,6 +74,7 @@ public class JDBCExtractionNamespaceCacheFactory
@Override @Override
public String call() public String call()
{ {
final long dbQueryStart = System.currentTimeMillis();
final DBI dbi = ensureDBI(id, namespace); final DBI dbi = ensureDBI(id, namespace);
final String table = namespace.getTable(); final String table = namespace.getTable();
final String valueColumn = namespace.getValueColumn(); final String valueColumn = namespace.getValueColumn();
@ -118,7 +119,11 @@ public class JDBCExtractionNamespaceCacheFactory
cache.put(pair.lhs, pair.rhs); cache.put(pair.lhs, pair.rhs);
} }
LOG.info("Finished loading %d values for namespace[%s]", cache.size(), id); LOG.info("Finished loading %d values for namespace[%s]", cache.size(), id);
return String.format("%d", System.currentTimeMillis()); if (lastDBUpdate != null) {
return lastDBUpdate.toString();
} else {
return String.format("%d", dbQueryStart);
}
} }
}; };
} }

View File

@ -39,17 +39,20 @@ public class StaticMapExtractionNamespaceCacheFactory
final Map<String, String> swap final Map<String, String> swap
) )
{ {
if (lastVersion != null) {
// Throwing AssertionError, because NamespaceExtractionCacheManager doesn't suppress Errors and will stop trying
// to update the cache periodically.
throw new AssertionError(
"StaticMapExtractionNamespaceCacheFactory could only be configured for a namespace which is scheduled " +
"to be updated once, not periodically. Last version: `" + lastVersion + "`");
}
return new Callable<String>() return new Callable<String>()
{ {
@Override @Override
public String call() throws Exception public String call() throws Exception
{ {
if (version.equals(lastVersion)) { swap.putAll(extractionNamespace.getMap());
return null; return version;
} else {
swap.putAll(extractionNamespace.getMap());
return version;
}
} }
}; };
} }

View File

@ -22,8 +22,6 @@ package io.druid.server.lookup.namespace;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.io.ByteSource; import com.google.common.io.ByteSource;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.druid.common.utils.JodaUtils;
import io.druid.data.SearchableVersionedDataFinder; import io.druid.data.SearchableVersionedDataFinder;
import io.druid.data.input.MapPopulator; import io.druid.data.input.MapPopulator;
import io.druid.java.util.common.CompressionUtils; import io.druid.java.util.common.CompressionUtils;
@ -33,9 +31,8 @@ import io.druid.java.util.common.logger.Logger;
import io.druid.query.lookup.namespace.ExtractionNamespaceCacheFactory; import io.druid.query.lookup.namespace.ExtractionNamespaceCacheFactory;
import io.druid.query.lookup.namespace.URIExtractionNamespace; import io.druid.query.lookup.namespace.URIExtractionNamespace;
import io.druid.segment.loading.URIDataPuller; import io.druid.segment.loading.URIDataPuller;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import javax.annotation.Nullable;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -65,11 +62,10 @@ public class URIExtractionNamespaceCacheFactory implements ExtractionNamespaceCa
public Callable<String> getCachePopulator( public Callable<String> getCachePopulator(
final String id, final String id,
final URIExtractionNamespace extractionNamespace, final URIExtractionNamespace extractionNamespace,
final String lastVersion, @Nullable final String lastVersion,
final Map<String, String> cache final Map<String, String> cache
) )
{ {
final long lastCached = lastVersion == null ? JodaUtils.MIN_INSTANT : Long.parseLong(lastVersion);
return new Callable<String>() return new Callable<String>()
{ {
@Override @Override
@ -134,17 +130,16 @@ public class URIExtractionNamespaceCacheFactory implements ExtractionNamespaceCa
{ {
final String version = puller.getVersion(uri); final String version = puller.getVersion(uri);
try { try {
long lastModified = Long.parseLong(version); // Important to call equals() against version because lastVersion could be null
if (lastModified <= lastCached) { if (version.equals(lastVersion)) {
final DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
log.debug( log.debug(
"URI [%s] for namespace [%s] was las modified [%s] but was last cached [%s]. Skipping ", "URI [%s] for namespace [%s] has the same last modified time [%s] as the last cached. " +
"Skipping ",
uri.toString(), uri.toString(),
id, id,
fmt.print(lastModified), version
fmt.print(lastCached)
); );
return version; return lastVersion;
} }
} }
catch (NumberFormatException ex) { catch (NumberFormatException ex) {

View File

@ -76,7 +76,7 @@ public abstract class NamespaceExtractionCacheManager
final Object changeLock = new Object(); final Object changeLock = new Object();
final AtomicBoolean enabled = new AtomicBoolean(false); final AtomicBoolean enabled = new AtomicBoolean(false);
final CountDownLatch firstRun = new CountDownLatch(1); final CountDownLatch firstRun = new CountDownLatch(1);
final AtomicReference<String> latestVersion = new AtomicReference<>(null); volatile String latestVersion = null;
} }
private static final Logger log = new Logger(NamespaceExtractionCacheManager.class); private static final Logger log = new Logger(NamespaceExtractionCacheManager.class);
@ -151,38 +151,29 @@ public abstract class NamespaceExtractionCacheManager
} }
protected Runnable getPostRunnable( protected void updateNamespace(final String id, final String cacheId, final String newVersion)
final String id,
final String cacheId
)
{ {
return new Runnable() final NamespaceImplData namespaceDatum = implData.get(id);
{ if (namespaceDatum == null) {
@Override // was removed
public void run() return;
{ }
final NamespaceImplData namespaceDatum = implData.get(id); try {
if (namespaceDatum == null) { if (!namespaceDatum.enabled.get()) {
// was removed // skip because it was disabled
return;
}
synchronized (namespaceDatum.enabled) {
if (!namespaceDatum.enabled.get()) {
return; return;
} }
try { swapAndClearCache(id, cacheId);
if (!namespaceDatum.enabled.get()) { namespaceDatum.latestVersion = newVersion;
// skip because it was disabled
return;
}
synchronized (namespaceDatum.enabled) {
if (!namespaceDatum.enabled.get()) {
return;
}
swapAndClearCache(id, cacheId);
}
}
finally {
namespaceDatum.firstRun.countDown();
}
} }
}; }
finally {
namespaceDatum.firstRun.countDown();
}
} }
// return value means actually delete or not // return value means actually delete or not
@ -289,10 +280,7 @@ public abstract class NamespaceExtractionCacheManager
} }
} }
); );
if (!future.isDone() future.cancel(true);
&& !future.cancel(true)) { // Interrupt to make sure we don't pollute stuff after we've already cleaned up
throw new ISE("Future for namespace [%s] was not able to be canceled", implDatum.name);
}
try { try {
latch.await(); latch.await();
} }
@ -328,7 +316,7 @@ public abstract class NamespaceExtractionCacheManager
throw new ISE("Cannot find factory for namespace [%s]", namespace); throw new ISE("Cannot find factory for namespace [%s]", namespace);
} }
final String cacheId = String.format("namespace-cache-%s-%s", id, UUID.randomUUID().toString()); final String cacheId = String.format("namespace-cache-%s-%s", id, UUID.randomUUID().toString());
return schedule(id, namespace, factory, getPostRunnable(id, cacheId), cacheId); return schedule(id, namespace, factory, cacheId);
} }
// For testing purposes this is protected // For testing purposes this is protected
@ -336,7 +324,6 @@ public abstract class NamespaceExtractionCacheManager
final String id, final String id,
final T namespace, final T namespace,
final ExtractionNamespaceCacheFactory<T> factory, final ExtractionNamespaceCacheFactory<T> factory,
final Runnable postRunnable,
final String cacheId final String cacheId
) )
{ {
@ -369,31 +356,32 @@ public abstract class NamespaceExtractionCacheManager
throw new NullPointerException(String.format("No data for namespace [%s]", id)); throw new NullPointerException(String.format("No data for namespace [%s]", id));
} }
final Map<String, String> cache = getCacheMap(cacheId); final Map<String, String> cache = getCacheMap(cacheId);
final String preVersion = implData.latestVersion.get(); final String preVersion = implData.latestVersion;
final Callable<String> runnable = factory.getCachePopulator(id, namespace, preVersion, cache); final Callable<String> runnable = factory.getCachePopulator(id, namespace, preVersion, cache);
tasksStarted.incrementAndGet(); tasksStarted.incrementAndGet();
final String newVersion = runnable.call(); final String newVersion = runnable.call();
if (preVersion != null && preVersion.equals(newVersion)) { if (newVersion.equals(preVersion)) {
throw new CancellationException(String.format("Version `%s` already exists", preVersion)); log.debug("Version `%s` already exists, skipping updating cache", preVersion);
} else {
updateNamespace(id, cacheId, newVersion);
log.debug("Namespace [%s] successfully updated", id);
} }
if (newVersion != null) {
if (!implData.latestVersion.compareAndSet(preVersion, newVersion)) {
log.wtf("Somehow multiple threads are updating the same implData for [%s]", id);
}
}
postRunnable.run();
log.debug("Namespace [%s] successfully updated", id);
} }
} }
catch (Throwable t) { catch (Throwable t) {
delete(cacheId); try {
if (t instanceof CancellationException) { delete(cacheId);
log.debug(t, "Namespace [%s] cancelled", id); if (t instanceof InterruptedException) {
} else { log.debug(t, "Namespace [%s] cancelled", id);
log.error(t, "Failed update namespace [%s]", namespace); } else {
log.error(t, "Failed update namespace [%s]", namespace);
}
} }
if (Thread.currentThread().isInterrupted()) { catch (Exception e) {
t.addSuppressed(e);
}
if (Thread.currentThread().isInterrupted() || (t instanceof Error)) {
throw Throwables.propagate(t); throw Throwables.propagate(t);
} }
} }
@ -485,7 +473,7 @@ public abstract class NamespaceExtractionCacheManager
if (implDatum == null) { if (implDatum == null) {
return null; return null;
} }
return implDatum.latestVersion.get(); return implDatum.latestVersion;
} }
public Collection<String> getKnownIDs() public Collection<String> getKnownIDs()

View File

@ -39,6 +39,14 @@ public class StaticMapExtractionNamespaceCacheFactoryTest
final Map<String, String> cache = new HashMap<>(); final Map<String, String> cache = new HashMap<>();
Assert.assertEquals(factory.getVersion(), factory.getCachePopulator(null, namespace, null, cache).call()); Assert.assertEquals(factory.getVersion(), factory.getCachePopulator(null, namespace, null, cache).call());
Assert.assertEquals(MAP, cache); Assert.assertEquals(MAP, cache);
}
@Test(expected = AssertionError.class)
public void testNonNullLastVersionCausesAssertionError() throws Exception
{
final StaticMapExtractionNamespaceCacheFactory factory = new StaticMapExtractionNamespaceCacheFactory();
final StaticMapExtractionNamespace namespace = new StaticMapExtractionNamespace(MAP);
final Map<String, String> cache = new HashMap<>();
Assert.assertNull(factory.getCachePopulator(null, namespace, factory.getVersion(), cache).call()); Assert.assertNull(factory.getCachePopulator(null, namespace, factory.getVersion(), cache).call());
} }
} }

View File

@ -123,30 +123,19 @@ public class NamespaceExtractionCacheManagerExecutorsTest
) )
{ {
@Override @Override
protected Runnable getPostRunnable( protected void updateNamespace(final String id, final String cacheId, final String newVersion)
final String id,
final String cacheId
)
{ {
final Runnable runnable = super.getPostRunnable(id, cacheId);
cacheUpdateAlerts.putIfAbsent(id, new Object()); cacheUpdateAlerts.putIfAbsent(id, new Object());
final Object cacheUpdateAlerter = cacheUpdateAlerts.get(id); final Object cacheUpdateAlerter = cacheUpdateAlerts.get(id);
return new Runnable() synchronized (cacheUpdateAlerter) {
{ try {
@Override super.updateNamespace(id, cacheId, newVersion);
public void run() numRuns.incrementAndGet();
{
synchronized (cacheUpdateAlerter) {
try {
runnable.run();
numRuns.incrementAndGet();
}
finally {
cacheUpdateAlerter.notifyAll();
}
}
} }
}; finally {
cacheUpdateAlerter.notifyAll();
}
}
} }
}; };
tmpFile = Files.createTempFile(tmpDir, "druidTestURIExtractionNS", ".dat").toFile(); tmpFile = Files.createTempFile(tmpDir, "druidTestURIExtractionNS", ".dat").toFile();