diff --git a/server/src/main/java/io/druid/server/lookup/cache/LookupCoordinatorManager.java b/server/src/main/java/io/druid/server/lookup/cache/LookupCoordinatorManager.java index f9aaee2f64f..664e338d38d 100644 --- a/server/src/main/java/io/druid/server/lookup/cache/LookupCoordinatorManager.java +++ b/server/src/main/java/io/druid/server/lookup/cache/LookupCoordinatorManager.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Predicates; import com.google.common.base.Throwables; @@ -31,8 +32,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import com.google.common.net.HostAndPort; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableScheduledFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; @@ -71,6 +74,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -121,6 +125,8 @@ public class LookupCoordinatorManager private AtomicReference>>> lookupMapConfigRef; private volatile Map>> prior_update = ImmutableMap.of(); private volatile boolean started = false; + private volatile ListenableScheduledFuture backgroundManagerFuture = null; + private final CountDownLatch backgroundManagerExitedLatch = new CountDownLatch(1); @Inject @@ -535,7 +541,7 @@ public class LookupCoordinatorManager }, null ); - executorService.scheduleWithFixedDelay( + final ListenableScheduledFuture backgroundManagerFuture = this.backgroundManagerFuture = executorService.scheduleWithFixedDelay( new Runnable() { @Override @@ -588,6 +594,27 @@ public class LookupCoordinatorManager lookupCoordinatorManagerConfig.getPeriod(), TimeUnit.MILLISECONDS ); + Futures.addCallback(backgroundManagerFuture, new FutureCallback() + { + @Override + public void onSuccess(@Nullable Object result) + { + backgroundManagerExitedLatch.countDown(); + LOG.debug("Exited background lookup manager"); + } + + @Override + public void onFailure(Throwable t) + { + backgroundManagerExitedLatch.countDown(); + if (backgroundManagerFuture.isCancelled()) { + LOG.info("Background lookup manager exited"); + LOG.trace(t, "Background lookup manager exited with throwable"); + } else { + LOG.makeAlert(t, "Background lookup manager exited with error!").emit(); + } + } + }); started = true; LOG.debug("Started"); } @@ -603,6 +630,11 @@ public class LookupCoordinatorManager } started = false; executorService.shutdownNow(); + final ListenableScheduledFuture backgroundManagerFuture = this.backgroundManagerFuture; + this.backgroundManagerFuture = null; + if (backgroundManagerFuture != null && !backgroundManagerFuture.cancel(true)) { + LOG.warn("Background lookup manager thread could not be cancelled"); + } // NOTE: we can't un-watch the configuration key LOG.debug("Stopped"); } @@ -627,4 +659,17 @@ public class LookupCoordinatorManager { return statusCode == 404; } + + @VisibleForTesting + boolean backgroundManagerIsRunning() + { + ListenableScheduledFuture backgroundManagerFuture = this.backgroundManagerFuture; + return backgroundManagerFuture != null && !backgroundManagerFuture.isDone(); + } + + @VisibleForTesting + boolean waitForBackgroundTermination(long timeout) throws InterruptedException + { + return backgroundManagerExitedLatch.await(timeout, TimeUnit.MILLISECONDS); + } } diff --git a/server/src/test/java/io/druid/server/lookup/cache/LookupCoordinatorManagerTest.java b/server/src/test/java/io/druid/server/lookup/cache/LookupCoordinatorManagerTest.java index 412e6a6b193..ddd8788cc33 100644 --- a/server/src/test/java/io/druid/server/lookup/cache/LookupCoordinatorManagerTest.java +++ b/server/src/test/java/io/druid/server/lookup/cache/LookupCoordinatorManagerTest.java @@ -1190,11 +1190,18 @@ public class LookupCoordinatorManagerTest discoverer, mapper, configManager, - lookupCoordinatorManagerConfig + new LookupCoordinatorManagerConfig(){ + @Override + public long getPeriod(){ + return 1; + } + } ); manager.start(); manager.start(); + Assert.assertTrue(manager.backgroundManagerIsRunning()); Assert.assertNull(manager.getKnownLookups()); + Assert.assertFalse(manager.waitForBackgroundTermination(10)); EasyMock.verify(configManager); } @@ -1219,8 +1226,12 @@ public class LookupCoordinatorManagerTest lookupCoordinatorManagerConfig ); manager.start(); + Assert.assertTrue(manager.backgroundManagerIsRunning()); + Assert.assertFalse(manager.waitForBackgroundTermination(10)); manager.stop(); manager.stop(); + Assert.assertTrue(manager.waitForBackgroundTermination(10)); + Assert.assertFalse(manager.backgroundManagerIsRunning()); EasyMock.verify(configManager); } @@ -1245,6 +1256,8 @@ public class LookupCoordinatorManagerTest lookupCoordinatorManagerConfig ); manager.start(); + Assert.assertTrue(manager.backgroundManagerIsRunning()); + Assert.assertFalse(manager.waitForBackgroundTermination(10)); manager.stop(); expectedException.expect(new BaseMatcher() {