From 36a1543222dcbae54d68791f1107435aabd7fc52 Mon Sep 17 00:00:00 2001 From: Roman Leventov Date: Wed, 2 Nov 2016 10:40:19 -0600 Subject: [PATCH] Lookup cache bug fixes (#3609) * Return better lastVersion from JDBCExtractionNamespaceCacheFactory's cache populator callable * Return the lastVersion if URI lookup last modified date is not later than the last cached, from URIExtractionNamespaceCacheFactory's cache populator callable * Fix a race condition in NamespaceExtractionCacheManager.cancelFuture() * Don't delete cache from NamespaceExtractionCacheManager if the ExtractionNamespaceCacheFactory returned the same version as the last; Better exception treatment in the scheduled cache updater runnable in NamespaceExtractionCacheManager (in particular, don't consume Errors); throw AssertionError in StaticMapExtractionNamespaceCacheFactory if the lastVersion != null * In NamespaceExtractionCacheManager, put NamespaceImplData.latestVersion update in the same synchronized() block with swapAndClearCache(id, cacheId); Turn getPostRunnable which returns a callback into a simple updateNamespace() method * In StaticMapExtractionNamespaceCacheFactory.getCachePopulator(), check the input directly, not inside a callback * In URIExtractionNamespaceCacheFactory, allow URI last modified time to go backwards * Better logging in NamespaceExtractionCacheManager * Add comment on lastVersion nullability in URIExtractionNamespaceCacheFactory --- .../JDBCExtractionNamespaceCacheFactory.java | 7 +- ...ticMapExtractionNamespaceCacheFactory.java | 15 +-- .../URIExtractionNamespaceCacheFactory.java | 21 ++--- .../NamespaceExtractionCacheManager.java | 92 ++++++++----------- ...apExtractionNamespaceCacheFactoryTest.java | 8 ++ ...ceExtractionCacheManagerExecutorsTest.java | 29 ++---- 6 files changed, 80 insertions(+), 92 deletions(-) diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/JDBCExtractionNamespaceCacheFactory.java 
b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/JDBCExtractionNamespaceCacheFactory.java index 15724bffd6c..3900bd81b98 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/JDBCExtractionNamespaceCacheFactory.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/JDBCExtractionNamespaceCacheFactory.java @@ -74,6 +74,7 @@ public class JDBCExtractionNamespaceCacheFactory @Override public String call() { + final long dbQueryStart = System.currentTimeMillis(); final DBI dbi = ensureDBI(id, namespace); final String table = namespace.getTable(); final String valueColumn = namespace.getValueColumn(); @@ -118,7 +119,11 @@ public class JDBCExtractionNamespaceCacheFactory cache.put(pair.lhs, pair.rhs); } LOG.info("Finished loading %d values for namespace[%s]", cache.size(), id); - return String.format("%d", System.currentTimeMillis()); + if (lastDBUpdate != null) { + return lastDBUpdate.toString(); + } else { + return String.format("%d", dbQueryStart); + } } }; } diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/StaticMapExtractionNamespaceCacheFactory.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/StaticMapExtractionNamespaceCacheFactory.java index 4b13e4d44aa..4a4b8a771eb 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/StaticMapExtractionNamespaceCacheFactory.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/StaticMapExtractionNamespaceCacheFactory.java @@ -39,17 +39,20 @@ public class StaticMapExtractionNamespaceCacheFactory final Map swap ) { + if (lastVersion != null) { + // Throwing AssertionError, because NamespaceExtractionCacheManager doesn't suppress Errors and will stop trying + // to update the cache periodically. 
+ throw new AssertionError( + "StaticMapExtractionNamespaceCacheFactory could only be configured for a namespace which is scheduled " + + "to be updated once, not periodically. Last version: `" + lastVersion + "`"); + } return new Callable() { @Override public String call() throws Exception { - if (version.equals(lastVersion)) { - return null; - } else { - swap.putAll(extractionNamespace.getMap()); - return version; - } + swap.putAll(extractionNamespace.getMap()); + return version; } }; } diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/URIExtractionNamespaceCacheFactory.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/URIExtractionNamespaceCacheFactory.java index 4e09c174419..c79c199987e 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/URIExtractionNamespaceCacheFactory.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/URIExtractionNamespaceCacheFactory.java @@ -22,8 +22,6 @@ package io.druid.server.lookup.namespace; import com.google.common.base.Throwables; import com.google.common.io.ByteSource; import com.google.inject.Inject; - -import io.druid.common.utils.JodaUtils; import io.druid.data.SearchableVersionedDataFinder; import io.druid.data.input.MapPopulator; import io.druid.java.util.common.CompressionUtils; @@ -33,9 +31,8 @@ import io.druid.java.util.common.logger.Logger; import io.druid.query.lookup.namespace.ExtractionNamespaceCacheFactory; import io.druid.query.lookup.namespace.URIExtractionNamespace; import io.druid.segment.loading.URIDataPuller; -import org.joda.time.format.DateTimeFormatter; -import org.joda.time.format.ISODateTimeFormat; +import javax.annotation.Nullable; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -65,11 +62,10 @@ public class URIExtractionNamespaceCacheFactory implements ExtractionNamespaceCa 
public Callable getCachePopulator( final String id, final URIExtractionNamespace extractionNamespace, - final String lastVersion, + @Nullable final String lastVersion, final Map cache ) { - final long lastCached = lastVersion == null ? JodaUtils.MIN_INSTANT : Long.parseLong(lastVersion); return new Callable() { @Override @@ -134,17 +130,16 @@ public class URIExtractionNamespaceCacheFactory implements ExtractionNamespaceCa { final String version = puller.getVersion(uri); try { - long lastModified = Long.parseLong(version); - if (lastModified <= lastCached) { - final DateTimeFormatter fmt = ISODateTimeFormat.dateTime(); + // Important to call equals() against version because lastVersion could be null + if (version.equals(lastVersion)) { log.debug( - "URI [%s] for namespace [%s] was las modified [%s] but was last cached [%s]. Skipping ", + "URI [%s] for namespace [%s] has the same last modified time [%s] as the last cached. " + + "Skipping ", uri.toString(), id, - fmt.print(lastModified), - fmt.print(lastCached) + version ); - return version; + return lastVersion; } } catch (NumberFormatException ex) { diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManager.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManager.java index d73651bdef9..d76d4b94b21 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManager.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManager.java @@ -76,7 +76,7 @@ public abstract class NamespaceExtractionCacheManager final Object changeLock = new Object(); final AtomicBoolean enabled = new AtomicBoolean(false); final CountDownLatch firstRun = new CountDownLatch(1); - final AtomicReference latestVersion = new AtomicReference<>(null); + volatile 
String latestVersion = null; } private static final Logger log = new Logger(NamespaceExtractionCacheManager.class); @@ -151,38 +151,29 @@ public abstract class NamespaceExtractionCacheManager } - protected Runnable getPostRunnable( - final String id, - final String cacheId - ) + protected void updateNamespace(final String id, final String cacheId, final String newVersion) { - return new Runnable() - { - @Override - public void run() - { - final NamespaceImplData namespaceDatum = implData.get(id); - if (namespaceDatum == null) { - // was removed + final NamespaceImplData namespaceDatum = implData.get(id); + if (namespaceDatum == null) { + // was removed + return; + } + try { + if (!namespaceDatum.enabled.get()) { + // skip because it was disabled + return; + } + synchronized (namespaceDatum.enabled) { + if (!namespaceDatum.enabled.get()) { return; } - try { - if (!namespaceDatum.enabled.get()) { - // skip because it was disabled - return; - } - synchronized (namespaceDatum.enabled) { - if (!namespaceDatum.enabled.get()) { - return; - } - swapAndClearCache(id, cacheId); - } - } - finally { - namespaceDatum.firstRun.countDown(); - } + swapAndClearCache(id, cacheId); + namespaceDatum.latestVersion = newVersion; } - }; + } + finally { + namespaceDatum.firstRun.countDown(); + } } // return value means actually delete or not @@ -289,10 +280,7 @@ public abstract class NamespaceExtractionCacheManager } } ); - if (!future.isDone() - && !future.cancel(true)) { // Interrupt to make sure we don't pollute stuff after we've already cleaned up - throw new ISE("Future for namespace [%s] was not able to be canceled", implDatum.name); - } + future.cancel(true); try { latch.await(); } @@ -328,7 +316,7 @@ public abstract class NamespaceExtractionCacheManager throw new ISE("Cannot find factory for namespace [%s]", namespace); } final String cacheId = String.format("namespace-cache-%s-%s", id, UUID.randomUUID().toString()); - return schedule(id, namespace, factory, getPostRunnable(id, 
cacheId), cacheId); + return schedule(id, namespace, factory, cacheId); } // For testing purposes this is protected @@ -336,7 +324,6 @@ public abstract class NamespaceExtractionCacheManager final String id, final T namespace, final ExtractionNamespaceCacheFactory factory, - final Runnable postRunnable, final String cacheId ) { @@ -369,31 +356,32 @@ public abstract class NamespaceExtractionCacheManager throw new NullPointerException(String.format("No data for namespace [%s]", id)); } final Map cache = getCacheMap(cacheId); - final String preVersion = implData.latestVersion.get(); + final String preVersion = implData.latestVersion; final Callable runnable = factory.getCachePopulator(id, namespace, preVersion, cache); tasksStarted.incrementAndGet(); final String newVersion = runnable.call(); - if (preVersion != null && preVersion.equals(newVersion)) { - throw new CancellationException(String.format("Version `%s` already exists", preVersion)); + if (newVersion.equals(preVersion)) { + log.debug("Version `%s` already exists, skipping updating cache", preVersion); + } else { + updateNamespace(id, cacheId, newVersion); + log.debug("Namespace [%s] successfully updated", id); } - if (newVersion != null) { - if (!implData.latestVersion.compareAndSet(preVersion, newVersion)) { - log.wtf("Somehow multiple threads are updating the same implData for [%s]", id); - } - } - postRunnable.run(); - log.debug("Namespace [%s] successfully updated", id); } } catch (Throwable t) { - delete(cacheId); - if (t instanceof CancellationException) { - log.debug(t, "Namespace [%s] cancelled", id); - } else { - log.error(t, "Failed update namespace [%s]", namespace); + try { + delete(cacheId); + if (t instanceof InterruptedException) { + log.debug(t, "Namespace [%s] cancelled", id); + } else { + log.error(t, "Failed update namespace [%s]", namespace); + } } - if (Thread.currentThread().isInterrupted()) { + catch (Exception e) { + t.addSuppressed(e); + } + if (Thread.currentThread().isInterrupted() 
|| (t instanceof Error)) { throw Throwables.propagate(t); } } @@ -485,7 +473,7 @@ public abstract class NamespaceExtractionCacheManager if (implDatum == null) { return null; } - return implDatum.latestVersion.get(); + return implDatum.latestVersion; } public Collection getKnownIDs() diff --git a/extensions-core/lookups-cached-global/src/test/java/io/druid/server/lookup/namespace/StaticMapExtractionNamespaceCacheFactoryTest.java b/extensions-core/lookups-cached-global/src/test/java/io/druid/server/lookup/namespace/StaticMapExtractionNamespaceCacheFactoryTest.java index b3fcaa6daf3..8dfd766dafb 100644 --- a/extensions-core/lookups-cached-global/src/test/java/io/druid/server/lookup/namespace/StaticMapExtractionNamespaceCacheFactoryTest.java +++ b/extensions-core/lookups-cached-global/src/test/java/io/druid/server/lookup/namespace/StaticMapExtractionNamespaceCacheFactoryTest.java @@ -39,6 +39,14 @@ public class StaticMapExtractionNamespaceCacheFactoryTest final Map cache = new HashMap<>(); Assert.assertEquals(factory.getVersion(), factory.getCachePopulator(null, namespace, null, cache).call()); Assert.assertEquals(MAP, cache); + } + + @Test(expected = AssertionError.class) + public void testNonNullLastVersionCausesAssertionError() throws Exception + { + final StaticMapExtractionNamespaceCacheFactory factory = new StaticMapExtractionNamespaceCacheFactory(); + final StaticMapExtractionNamespace namespace = new StaticMapExtractionNamespace(MAP); + final Map cache = new HashMap<>(); Assert.assertNull(factory.getCachePopulator(null, namespace, factory.getVersion(), cache).call()); } } diff --git a/extensions-core/lookups-cached-global/src/test/java/io/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManagerExecutorsTest.java b/extensions-core/lookups-cached-global/src/test/java/io/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManagerExecutorsTest.java index f9d71e3cd61..6e4c40f73f7 100644 --- 
a/extensions-core/lookups-cached-global/src/test/java/io/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManagerExecutorsTest.java +++ b/extensions-core/lookups-cached-global/src/test/java/io/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManagerExecutorsTest.java @@ -123,30 +123,19 @@ public class NamespaceExtractionCacheManagerExecutorsTest ) { @Override - protected Runnable getPostRunnable( - final String id, - final String cacheId - ) + protected void updateNamespace(final String id, final String cacheId, final String newVersion) { - final Runnable runnable = super.getPostRunnable(id, cacheId); cacheUpdateAlerts.putIfAbsent(id, new Object()); final Object cacheUpdateAlerter = cacheUpdateAlerts.get(id); - return new Runnable() - { - @Override - public void run() - { - synchronized (cacheUpdateAlerter) { - try { - runnable.run(); - numRuns.incrementAndGet(); - } - finally { - cacheUpdateAlerter.notifyAll(); - } - } + synchronized (cacheUpdateAlerter) { + try { + super.updateNamespace(id, cacheId, newVersion); + numRuns.incrementAndGet(); } - }; + finally { + cacheUpdateAlerter.notifyAll(); + } + } } }; tmpFile = Files.createTempFile(tmpDir, "druidTestURIExtractionNS", ".dat").toFile();