diff --git a/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManager.java b/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManager.java index 5996235f1db..27782c9f852 100644 --- a/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManager.java +++ b/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManager.java @@ -336,150 +336,72 @@ public abstract class NamespaceExtractionCacheManager { final String namespaceName = namespace.getNamespace(); log.debug("Trying to update namespace [%s]", namespaceName); - final AtomicReference implDatum = new AtomicReference<>(implData.get(namespaceName)); - if (implDatum.get() != null) { - synchronized (implDatum.get().enabled) { - if (implDatum.get().enabled.get()) { + final NamespaceImplData implDatum = implData.get(namespaceName); + if (implDatum != null) { + synchronized (implDatum.enabled) { + if (implDatum.enabled.get()) { // We also check at the end of the function, but fail fast here throw new IAE("Namespace [%s] already exists! Leaving prior running", namespace.toString()); } } } + final long updateMs = namespace.getPollMs(); final CountDownLatch startLatch = new CountDownLatch(1); + + final Runnable command = new Runnable() + { + @Override + public void run() + { + try { + startLatch.await(); // wait for "election" to leadership or cancellation + if (!Thread.currentThread().isInterrupted()) { + final Map cache = getCacheMap(cacheId); + final String preVersion = lastVersion.get(namespaceName); + final Callable runnable = factory.getCachePopulator(namespace, preVersion, cache); + + tasksStarted.incrementAndGet(); + final String newVersion = runnable.call(); + if (preVersion != null && preVersion.equals(newVersion)) { + throw new IllegalStateException("Already exists"); + } + if (newVersion != null) { + lastVersion.put(namespaceName, newVersion); + } + postRunnable.run(); + log.debug("Namespace [%s] successfully updated", namespaceName); + } + } + catch (Throwable t) { + delete(cacheId); + if (t instanceof CancellationException) { + log.debug(t, "Namespace [%s] cancelled", namespaceName); + } else { + log.error(t, "Failed update namespace [%s]", namespace); + } + } + } + }; + + ListenableFuture future; try { - ListenableFuture future = null; - try { - if (namespace.getPollMs() > 0) { - final long updateMs = namespace.getPollMs(); - future = listeningScheduledExecutorService.scheduleAtFixedRate( - new Runnable() - { - @Override - public void run() - { - try { - startLatch.await(); // wait for "election" to leadership or cancellation - if (!Thread.interrupted()) { - final Map cache = getCacheMap(cacheId); - final String preVersion = lastVersion.get(namespaceName); - final Callable runnable = factory.getCachePopulator(namespace, preVersion, cache); - tasksStarted.incrementAndGet(); - final String newVersion = runnable.call(); - if (newVersion != null) { - lastVersion.put(namespaceName, newVersion); - } - if (preVersion == null || !preVersion.equals(lastVersion.get(namespaceName))) { - postRunnable.run(); - } else { - delete(cacheId); - } - } else { - Thread.currentThread().interrupt(); - } - } - catch (Exception e) { - if (e instanceof CancellationException) { - log.debug("Thread for namespace[%s] cancelled", namespaceName); - } else { - log.error(e, "Error in listener for namespace [%s]", namespaceName); - } - // Don't leave stale cache on error - delete(cacheId); - throw Throwables.propagate(e); - } - } - }, - 0, - updateMs, - TimeUnit.MILLISECONDS - ); - } else { - final Map cache = getCacheMap(cacheId); - final Callable runnable = factory.getCachePopulator(namespace, null, cache); - final ListenableFuture futureWithString = listeningScheduledExecutorService.schedule( - new Callable() - { - @Override - public String call() throws Exception - { - startLatch.await(); // wait for "election" to leadership or cancellation - tasksStarted.incrementAndGet(); - return runnable.call(); - } - }, - 0, - TimeUnit.MILLISECONDS - ); - Futures.addCallback( - futureWithString, new FutureCallback() - { - @Override - public void onSuccess(String result) - { - try { - postRunnable.run(); - } - catch (RuntimeException e) { - delete(cacheId); - throw e; - } - // Must have been set in order to make it here - if (implDatum.get().enabled.get()) { - lastVersion.put(namespaceName, result); - } - } - - @Override - public void onFailure(Throwable t) - { - // NOOP - } - } - ); - future = futureWithString; - } + if (updateMs > 0) { + future = listeningScheduledExecutorService.scheduleAtFixedRate(command, 0, updateMs, TimeUnit.MILLISECONDS); + } else { + future = listeningScheduledExecutorService.schedule(command, 0, TimeUnit.MILLISECONDS); } - catch (Exception e) { - if (future != null) { - if (!future.isDone() && !future.cancel(true)) { - log.warn("Could not cancel future for [%s]", namespaceName); - } - } - throw Throwables.propagate(e); - } - Futures.addCallback( - future, new FutureCallback() - { - @Override - public void onSuccess(@Nullable Object result) - { - log.debug("Namespace [%s] successfully updated", namespaceName); - } - @Override - public void onFailure(Throwable t) - { - delete(cacheId); - if (t instanceof CancellationException) { - log.debug(t, "Namespace [%s] cancelled", namespaceName); - } else { - log.error(t, "Failed update namespace [%s]", namespace); - } - } - } - ); final NamespaceImplData me = new NamespaceImplData(future, namespace, namespaceName); final NamespaceImplData other = implData.putIfAbsent(namespaceName, me); if (other != null) { if (!future.isDone() && !future.cancel(true)) { - log.warn("Unable to cancle future for namespace[%s] on race loss", namespaceName); + log.warn("Unable to cancel future for namespace[%s] on race loss", namespaceName); } throw new IAE("Namespace [%s] already exists! Leaving prior running", namespace); } else { if (!me.enabled.compareAndSet(false, true)) { log.wtf("How did someone enable this before ME?"); } - implDatum.set(me); log.debug("I own namespace [%s]", namespaceName); return future; } diff --git a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactoryTest.java b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactoryTest.java index 64e431c09ef..e6a6009231c 100644 --- a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactoryTest.java +++ b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactoryTest.java @@ -308,10 +308,7 @@ public class URIExtractionNamespaceFunctionFactoryTest Assert.assertNull(fnCache.get(namespace.getNamespace())); NamespaceExtractionCacheManagersTest.waitFor(manager.schedule(namespace)); Function fn = fnCache.get(namespace.getNamespace()); - while (fn == null) { - Thread.sleep(1); - fn = fnCache.get(namespace.getNamespace()); - } + Assert.assertNotNull(fn); Assert.assertEquals("bar", fn.apply("foo")); Assert.assertEquals(null, fn.apply("baz")); } @@ -355,10 +352,7 @@ public class URIExtractionNamespaceFunctionFactoryTest for (int i = 0; i < size; ++i) { URIExtractionNamespace namespace = namespaces.get(i); Function fn = fnCache.get(namespace.getNamespace()); - while (fn == null) { - Thread.sleep(1); - fn = fnCache.get(namespace.getNamespace()); - } + Assert.assertNotNull(fn); Assert.assertEquals("bar", fn.apply("foo")); Assert.assertEquals(null, fn.apply("baz")); manager.delete(namespace.getNamespace()); diff --git a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagerExecutorsTest.java b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagerExecutorsTest.java index 52ea9bef959..a6fdceef3a9 100644 --- a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagerExecutorsTest.java +++ b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagerExecutorsTest.java @@ -67,6 +67,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; /** * diff --git a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagersTest.java b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagersTest.java index 402e6e8815a..d9f80a61c00 100644 --- a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagersTest.java +++ b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagersTest.java @@ -46,6 +46,8 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; /** * @@ -145,31 +147,16 @@ public class NamespaceExtractionCacheManagersTest Assert.assertArrayEquals(nsList.toArray(), retvalList.toArray()); } - public static void waitFor(ListenableFuture future) throws InterruptedException + public static void waitFor(Future future) throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); - Futures.addCallback( - future, new FutureCallback() - { - @Override - public void onSuccess(Object result) - { - latch.countDown(); - } - - @Override - public void onFailure(Throwable t) - { - try { - log.error(t, "Error waiting"); - throw Throwables.propagate(t); - } - finally { - latch.countDown(); - } - } - } - ); - latch.await(); + while (!future.isDone()) { + try { + future.get(); + } + catch (ExecutionException e) { + log.error(e.getCause(), "Error waiting"); + throw Throwables.propagate(e.getCause()); + } + } } }