diff --git a/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManager.java b/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManager.java index 5ef79187ef6..448ed879043 100644 --- a/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManager.java +++ b/extensions/namespace-lookup/src/main/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManager.java @@ -148,9 +148,9 @@ public abstract class NamespaceExtractionCacheManager ); } - protected void waitForServiceToEnd(long time, TimeUnit unit) throws InterruptedException + protected boolean waitForServiceToEnd(long time, TimeUnit unit) throws InterruptedException { - listeningScheduledExecutorService.awaitTermination(time, unit); + return listeningScheduledExecutorService.awaitTermination(time, unit); } diff --git a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagerExecutorsTest.java b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagerExecutorsTest.java index 90325696139..dca52f7eea9 100644 --- a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagerExecutorsTest.java +++ b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagerExecutorsTest.java @@ -23,15 +23,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.metamx.common.IAE; import com.metamx.common.lifecycle.Lifecycle; -import com.metamx.common.logger.Logger; +import io.druid.concurrent.Execs; import io.druid.data.SearchableVersionedDataFinder; -import io.druid.jackson.DefaultObjectMapper; import io.druid.query.extraction.namespace.ExtractionNamespace; import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory; import io.druid.query.extraction.namespace.URIExtractionNamespace; @@ -39,13 +38,13 @@ import io.druid.query.extraction.namespace.URIExtractionNamespaceTest; import io.druid.segment.loading.LocalFileTimestampVersionFinder; import io.druid.server.metrics.NoopServiceEmitter; import io.druid.server.namespace.URIExtractionNamespaceFunctionFactory; -import org.apache.commons.io.FileUtils; import org.joda.time.Period; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.FileOutputStream; @@ -58,13 +57,14 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Map; -import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; /** @@ -72,73 +72,125 @@ import java.util.concurrent.atomic.AtomicLong; */ public class NamespaceExtractionCacheManagerExecutorsTest { - private static final Logger log = new Logger(NamespaceExtractionCacheManagerExecutorsTest.class); - private static Path tmpDir; + private static final String KEY = "foo"; + private static final String VALUE = "bar"; + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); private Lifecycle lifecycle; private NamespaceExtractionCacheManager manager; private File tmpFile; - private URIExtractionNamespaceFunctionFactory factory; - private final ConcurrentHashMap> fnCache = new ConcurrentHashMap>(); + private final ConcurrentMap> fnCache = new ConcurrentHashMap<>(); + private final ConcurrentMap cacheUpdateAlerts = new ConcurrentHashMap<>(); - @BeforeClass - public static void setUpStatic() throws IOException - { - tmpDir = Files.createTempDirectory("TestNamespaceExtractionCacheManagerExecutors"); - } - - @AfterClass - public static void tearDownStatic() throws IOException - { - FileUtils.deleteDirectory(tmpDir.toFile()); - } + private final AtomicLong numRuns = new AtomicLong(0L); @Before public void setUp() throws IOException { + final Path tmpDir = temporaryFolder.newFolder().toPath(); lifecycle = new Lifecycle(); - manager = new OnHeapNamespaceExtractionCacheManager( - lifecycle, fnCache, new NoopServiceEmitter(), - ImmutableMap., ExtractionNamespaceFunctionFactory>of( - URIExtractionNamespace.class, - new URIExtractionNamespaceFunctionFactory( - ImmutableMap.of("file", new LocalFileTimestampVersionFinder()) - ) - ) - ); - tmpFile = Files.createTempFile(tmpDir, "druidTestURIExtractionNS", ".dat").toFile(); - tmpFile.deleteOnExit(); - final ObjectMapper mapper = new DefaultObjectMapper(); - try (OutputStream ostream = new FileOutputStream(tmpFile)) { - try (OutputStreamWriter out = new OutputStreamWriter(ostream)) { - out.write(mapper.writeValueAsString(ImmutableMap.of("foo", "bar"))); - } - } - factory = new URIExtractionNamespaceFunctionFactory( + final URIExtractionNamespaceFunctionFactory factory = new URIExtractionNamespaceFunctionFactory( ImmutableMap.of("file", new LocalFileTimestampVersionFinder()) ) { + @Override public Callable getCachePopulator( final URIExtractionNamespace extractionNamespace, final String lastVersion, final Map cache ) { - final Callable superCallable = super.getCachePopulator(extractionNamespace, lastVersion, cache); return new Callable() { @Override public String call() throws Exception { - superCallable.call(); - return String.format("%d", System.currentTimeMillis()); + // Don't actually read off disk because TravisCI doesn't like that + cache.put(KEY, VALUE); + Thread.sleep(2);// To make absolutely sure there is a unique currentTimeMillis + return Long.toString(System.currentTimeMillis()); } }; } }; + manager = new OnHeapNamespaceExtractionCacheManager( + lifecycle, fnCache, new NoopServiceEmitter(), + ImmutableMap., ExtractionNamespaceFunctionFactory>of( + URIExtractionNamespace.class, + factory + ) + ) + { + @Override + protected Runnable getPostRunnable( + final T namespace, + final ExtractionNamespaceFunctionFactory factory, + final String cacheId + ) + { + final Runnable runnable = super.getPostRunnable(namespace, factory, cacheId); + cacheUpdateAlerts.putIfAbsent(namespace.getNamespace(), new Object()); + final Object cacheUpdateAlerter = cacheUpdateAlerts.get(namespace.getNamespace()); + return new Runnable() + { + @Override + public void run() + { + synchronized (cacheUpdateAlerter) { + try { + runnable.run(); + numRuns.incrementAndGet(); + } + finally { + cacheUpdateAlerter.notifyAll(); + } + } + } + }; + } + }; + tmpFile = Files.createTempFile(tmpDir, "druidTestURIExtractionNS", ".dat").toFile(); + try (OutputStream ostream = new FileOutputStream(tmpFile)) { + try (OutputStreamWriter out = new OutputStreamWriter(ostream)) { + // Since Travis sucks with disk related stuff, we override the disk reading part above. + // This is safe and should shake out any problem areas that accidentally read the file. + out.write("SHOULDN'T TRY TO PARSE"); + out.flush(); + } + } + } + + @After + public void tearDown() + { + lifecycle.stop(); + } + + @Test(expected = IAE.class) + public void testDoubleSubmission() + { + URIExtractionNamespace namespace = new URIExtractionNamespace( + "ns", + tmpFile.toURI(), + new URIExtractionNamespace.ObjectMapperFlatDataParser( + URIExtractionNamespaceTest.registerTypes(new ObjectMapper()) + ), + new Period(0), + null + ); + final ListenableFuture future = manager.schedule(namespace); + Assert.assertFalse(future.isDone()); + Assert.assertFalse(future.isCancelled()); + try { + manager.schedule(namespace).cancel(true); + } + finally { + future.cancel(true); + } } - @Test(timeout = 50_000) + @Test(timeout = 60_000) public void testSimpleSubmission() throws ExecutionException, InterruptedException { URIExtractionNamespace namespace = new URIExtractionNamespace( @@ -150,23 +202,16 @@ public class NamespaceExtractionCacheManagerExecutorsTest new Period(0), null ); - try { - NamespaceExtractionCacheManagersTest.waitFor(manager.schedule(namespace)); - } - finally { - lifecycle.stop(); - } + NamespaceExtractionCacheManagersTest.waitFor(manager.schedule(namespace)); } - @Test(timeout = 50_000) + @Test(timeout = 60_000) public void testRepeatSubmission() throws ExecutionException, InterruptedException { final int repeatCount = 5; final long delay = 5; - final AtomicLong ranCount = new AtomicLong(0l); final long totalRunCount; final long start; - final CountDownLatch latch = new CountDownLatch(repeatCount); try { final URIExtractionNamespace namespace = new URIExtractionNamespace( "ns", @@ -177,27 +222,28 @@ public class NamespaceExtractionCacheManagerExecutorsTest new Period(delay), null ); - + cacheUpdateAlerts.putIfAbsent(namespace.getNamespace(), new Object()); start = System.currentTimeMillis(); - final String cacheId = UUID.randomUUID().toString(); - ListenableFuture future = manager.schedule( - namespace, factory, new Runnable() - { - @Override - public void run() - { - try { - manager.getPostRunnable(namespace, factory, cacheId).run(); - ranCount.incrementAndGet(); - } - finally { - latch.countDown(); - } - } - }, - cacheId - ); - latch.await(); + ListenableFuture future = manager.schedule(namespace); + + Assert.assertFalse(future.isDone()); + Assert.assertFalse(future.isCancelled()); + + final long preRunCount; + final Object cacheUpdateAlerter = cacheUpdateAlerts.get(namespace.getNamespace()); + synchronized (cacheUpdateAlerter) { + preRunCount = numRuns.get(); + } + for (; ; ) { + synchronized (cacheUpdateAlerter) { + if (numRuns.get() - preRunCount >= repeatCount) { + break; + } else { + cacheUpdateAlerter.wait(); + } + } + } + long minEnd = start + ((repeatCount - 1) * delay); long end = System.currentTimeMillis(); Assert.assertTrue( @@ -205,148 +251,175 @@ public class NamespaceExtractionCacheManagerExecutorsTest "Didn't wait long enough between runs. Expected more than %d was %d", minEnd - start, end - start - ), minEnd < end + ), minEnd <= end ); } finally { lifecycle.stop(); } - totalRunCount = ranCount.get(); - Thread.sleep(50); - Assert.assertEquals(totalRunCount, ranCount.get(), 1); + + totalRunCount = numRuns.get(); + Thread.sleep(delay * 10); + Assert.assertEquals(totalRunCount, numRuns.get(), 1); } - @Test(timeout = 50_000) - public void testConcurrentDelete() throws ExecutionException, InterruptedException + @Test(timeout = 600_000) // This is very fast when run locally. Speed on Travis completely depends on noisy neighbors. + public void testConcurrentAddDelete() throws ExecutionException, InterruptedException, TimeoutException { - final int threads = 5; - ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threads)); + final int threads = 10; + final int deletesPerThread = 5; + ListeningExecutorService executorService = MoreExecutors.listeningDecorator( + Execs.multiThreaded( + threads, + "concurrentTestingPool-%s" + ) + ); final CountDownLatch latch = new CountDownLatch(threads); Collection> futures = new ArrayList<>(); for (int i = 0; i < threads; ++i) { - final int loopNum = i; - ListenableFuture future = executorService.submit( - new Runnable() - { - @Override - public void run() - { - try { - latch.countDown(); - latch.await(); - for (int j = 0; j < 10; ++j) { - testDelete(String.format("ns-%d", loopNum)); + final int ii = i; + futures.add( + executorService.submit( + new Runnable() + { + @Override + public void run() + { + try { + latch.countDown(); + if (!latch.await(5, TimeUnit.SECONDS)) { + throw new RuntimeException(new TimeoutException("Took too long to wait for more tasks")); + } + for (int j = 0; j < deletesPerThread; ++j) { + testDelete(String.format("ns-%d-%d", ii, j)); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } } } - catch (InterruptedException e) { - throw Throwables.propagate(e); - } - } - } + ) ); } - Futures.allAsList(futures).get(); - executorService.shutdown(); - } - - @Test(timeout = 50_000) - public void testDelete() - throws NoSuchFieldException, IllegalAccessException, InterruptedException, ExecutionException - { + // Create an all-encompassing exception if any of them failed + final Collection exceptions = new ArrayList<>(); try { - testDelete("ns"); + for (ListenableFuture future : futures) { + try { + future.get(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw e; + } + catch (Exception e) { + exceptions.add(e); + } + } + if (!exceptions.isEmpty()) { + final RuntimeException e = new RuntimeException("Futures failed"); + for (Exception ex : exceptions) { + e.addSuppressed(ex); + } + } } finally { - lifecycle.stop(); + executorService.shutdownNow(); } + checkNoMoreRunning(); + } + + @Test(timeout = 60_000L) + public void testSimpleDelete() throws InterruptedException + { + testDelete("someNamespace"); } public void testDelete(final String ns) throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(5); - final CountDownLatch latchMore = new CountDownLatch(10); + cacheUpdateAlerts.putIfAbsent(ns, new Object()); + final Object cacheUpdateAlerter = cacheUpdateAlerts.get(ns); - final AtomicLong runs = new AtomicLong(0); - long prior = 0; + final long period = 1_000L;// Give it some time between attempts to update final URIExtractionNamespace namespace = new URIExtractionNamespace( ns, tmpFile.toURI(), new URIExtractionNamespace.ObjectMapperFlatDataParser( URIExtractionNamespaceTest.registerTypes(new ObjectMapper()) ), - new Period(1l), + new Period(period), null ); - final String cacheId = UUID.randomUUID().toString(); - final CountDownLatch latchBeforeMore = new CountDownLatch(1); - ListenableFuture future = - manager.schedule( - namespace, factory, new Runnable() - { - @Override - public void run() - { - try { - if (!Thread.interrupted()) { - manager.getPostRunnable(namespace, factory, cacheId).run(); - } else { - Thread.currentThread().interrupt(); - } - if (!Thread.interrupted()) { - runs.incrementAndGet(); - } else { - Thread.currentThread().interrupt(); - } - } - finally { - latch.countDown(); - try { - if (latch.getCount() == 0) { - latchBeforeMore.await(); - } - } - catch (InterruptedException e) { - log.debug("Interrupted"); - Thread.currentThread().interrupt(); - } - finally { - latchMore.countDown(); - } - } - } - }, - cacheId - ); - latch.await(); - prior = runs.get(); - latchBeforeMore.countDown(); + final ListenableFuture future = manager.schedule(namespace); Assert.assertFalse(future.isCancelled()); Assert.assertFalse(future.isDone()); - Assert.assertTrue(fnCache.containsKey(ns)); - latchMore.await(); - Assert.assertTrue(runs.get() > prior); + long start = 0L; + + final long timeout = 45_000L; + do { + synchronized (cacheUpdateAlerter) { + if (!fnCache.containsKey(ns)) { + cacheUpdateAlerter.wait(10_000); + } + } + if (future.isDone()) { + try { + // Bubble up the exception + Assert.assertNull(future.get()); + Assert.fail("Task finished"); + } + catch (ExecutionException e) { + throw Throwables.propagate(e); + } + } + if (!fnCache.containsKey(ns) && System.currentTimeMillis() - start > timeout) { + throw new RuntimeException( + new TimeoutException( + String.format( + "Namespace took too long to appear in cache for %s", + namespace + ) + ) + ); + } + } while (!fnCache.containsKey(ns)); + + Assert.assertEquals(VALUE, manager.getCacheMap(ns).get(KEY)); + + Assert.assertTrue(fnCache.containsKey(ns)); Assert.assertTrue(manager.implData.containsKey(ns)); - manager.delete("ns"); + Assert.assertTrue(manager.delete(ns)); + + try { + Assert.assertNull(future.get()); + } + catch (CancellationException e) { + // Ignore + } + catch (ExecutionException e) { + if (!future.isCancelled()) { + throw Throwables.propagate(e); + } + } + Assert.assertFalse(manager.implData.containsKey(ns)); Assert.assertFalse(fnCache.containsKey(ns)); Assert.assertTrue(future.isCancelled()); Assert.assertTrue(future.isDone()); - prior = runs.get(); - Thread.sleep(20); - Assert.assertEquals(prior, runs.get()); } - @Test(timeout = 50_000) + @Test(timeout = 60_000) public void testShutdown() throws NoSuchFieldException, IllegalAccessException, InterruptedException, ExecutionException { - final CountDownLatch latch = new CountDownLatch(1); + final long period = 5L; final ListenableFuture future; - final AtomicLong runs = new AtomicLong(0); long prior = 0; try { @@ -356,43 +429,34 @@ public class NamespaceExtractionCacheManagerExecutorsTest new URIExtractionNamespace.ObjectMapperFlatDataParser( URIExtractionNamespaceTest.registerTypes(new ObjectMapper()) ), - new Period(1l), + new Period(period), null ); - final String cacheId = UUID.randomUUID().toString(); - final Runnable runnable = manager.getPostRunnable(namespace, factory, cacheId); - future = - manager.schedule( - namespace, factory, new Runnable() - { - @Override - public void run() - { - runnable.run(); - latch.countDown(); - runs.incrementAndGet(); - } - }, - cacheId - ); + cacheUpdateAlerts.putIfAbsent(namespace.getNamespace(), new Object()); + + future = manager.schedule(namespace); + + final Object cacheUpdateAlerter = cacheUpdateAlerts.get(namespace.getNamespace()); + synchronized (cacheUpdateAlerter) { + cacheUpdateAlerter.wait(); + } - latch.await(); Assert.assertFalse(future.isCancelled()); Assert.assertFalse(future.isDone()); - prior = runs.get(); - while (runs.get() <= prior) { - Thread.sleep(50); + + synchronized (cacheUpdateAlerter) { + prior = numRuns.get(); + cacheUpdateAlerter.wait(); } - Assert.assertTrue(runs.get() > prior); + Assert.assertTrue(numRuns.get() > prior); } finally { lifecycle.stop(); } - manager.waitForServiceToEnd(1_000, TimeUnit.MILLISECONDS); + while (!manager.waitForServiceToEnd(1_000, TimeUnit.MILLISECONDS)) { + } - prior = runs.get(); - Thread.sleep(50); - Assert.assertEquals(prior, runs.get()); + checkNoMoreRunning(); Field execField = NamespaceExtractionCacheManager.class.getDeclaredField("listeningScheduledExecutorService"); execField.setAccessible(true); @@ -400,62 +464,48 @@ public class NamespaceExtractionCacheManagerExecutorsTest Assert.assertTrue(((ListeningScheduledExecutorService) execField.get(manager)).isTerminated()); } - @Test(timeout = 50_000) + @Test(timeout = 60_000) public void testRunCount() throws InterruptedException, ExecutionException { - final Lifecycle lifecycle = new Lifecycle(); - final NamespaceExtractionCacheManager onHeap; - final AtomicLong runCount = new AtomicLong(0); - final CountDownLatch latch = new CountDownLatch(1); + final long numWaits = 5; + final ListenableFuture future; try { - onHeap = new OnHeapNamespaceExtractionCacheManager( - lifecycle, - new ConcurrentHashMap>(), - new NoopServiceEmitter(), - ImmutableMap., ExtractionNamespaceFunctionFactory>of( - URIExtractionNamespace.class, - new URIExtractionNamespaceFunctionFactory( - ImmutableMap.of( - "file", - new LocalFileTimestampVersionFinder() - ) - ) - ) - ); - - final URIExtractionNamespace namespace = new URIExtractionNamespace( "ns", tmpFile.toURI(), new URIExtractionNamespace.ObjectMapperFlatDataParser( URIExtractionNamespaceTest.registerTypes(new ObjectMapper()) ), - new Period(1l), + new Period(5l), null ); - final String cacheId = UUID.randomUUID().toString(); - ListenableFuture future = - onHeap.schedule( - namespace, factory, new Runnable() - { - @Override - public void run() - { - manager.getPostRunnable(namespace, factory, cacheId).run(); - latch.countDown(); - runCount.incrementAndGet(); - } - }, - cacheId - ); - latch.await(); - Thread.sleep(20); + + cacheUpdateAlerts.putIfAbsent(namespace.getNamespace(), new Object()); + future = manager.schedule(namespace); + Assert.assertFalse(future.isDone()); + + final Object cacheUpdateAlerter = cacheUpdateAlerts.get(namespace.getNamespace()); + for (int i = 0; i < numWaits; ++i) { + synchronized (cacheUpdateAlerter) { + cacheUpdateAlerter.wait(); + } + } + Assert.assertFalse(future.isDone()); } finally { lifecycle.stop(); } - onHeap.waitForServiceToEnd(1_000, TimeUnit.MILLISECONDS); - Assert.assertTrue(runCount.get() > 5); + while (!manager.waitForServiceToEnd(1_000, TimeUnit.MILLISECONDS)) { + } + Assert.assertTrue(numRuns.get() >= numWaits); + checkNoMoreRunning(); + } + + private void checkNoMoreRunning() throws InterruptedException + { + final long pre = numRuns.get(); + Thread.sleep(100L); + Assert.assertEquals(pre, numRuns.get(), 1); // since we don't synchronize here we might have an extra increment } }