diff --git a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/JDBCExtractionNamespaceTest.java b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/JDBCExtractionNamespaceTest.java
index facdb380ffa..380c324e3ab 100644
--- a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/JDBCExtractionNamespaceTest.java
+++ b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/JDBCExtractionNamespaceTest.java
@@ -21,11 +21,17 @@ package io.druid.server.namespace.cache;
 
 import com.google.common.base.Function;
 import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.common.logger.Logger;
+import io.druid.concurrent.Execs;
 import io.druid.metadata.TestDerbyConnector;
 import io.druid.query.extraction.namespace.ExtractionNamespace;
 import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
@@ -42,6 +48,8 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.skife.jdbi.v2.Handle;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -50,8 +58,10 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -99,88 +109,198 @@ public class JDBCExtractionNamespaceTest
   private final Lifecycle lifecycle = new Lifecycle();
   private final AtomicLong updates = new AtomicLong(0L);
   private final Lock updateLock = new ReentrantLock(true);
-  private Handle handle;
+  private final Closer closer = Closer.create();
+  private final AtomicReference<Handle> handleRef = new AtomicReference<>(null);
+  private final ListeningExecutorService setupTeardownService =
+      MoreExecutors.listeningDecorator(Execs.singleThreaded("JDBCExtractionNamespaceTeardown"));
 
   @Before
   public void setup() throws Exception
   {
-    log.info("Setting up");
-    handle = derbyConnectorRule.getConnector().getDBI().open();
-    Assert.assertEquals(
-        0,
-        handle.createStatement(
-            String.format(
-                "CREATE TABLE %s (%s TIMESTAMP, %s VARCHAR(64), %s VARCHAR(64))",
-                tableName,
-                tsColumn_,
-                keyName,
-                valName
-            )
-        ).setQueryTimeout(1).execute()
-    );
-    handle.createStatement(String.format("TRUNCATE TABLE %s", tableName)).setQueryTimeout(1).execute();
-    handle.commit();
-
-    for (Map.Entry<String, String> entry : renames.entrySet()) {
-      insertValues(entry.getKey(), entry.getValue(), "2015-01-01 00:00:00");
-    }
-
-    extractionCacheManager = new OnHeapNamespaceExtractionCacheManager(
-        lifecycle,
-        fnCache,
-        reverseFnCache,
-        new NoopServiceEmitter(),
-        ImmutableMap.<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>>of(
-            JDBCExtractionNamespace.class,
-            new JDBCExtractionNamespaceFunctionFactory()
+    final ListenableFuture<?> setupFuture = setupTeardownService.submit(
+        new Runnable()
+        {
+          @Override
+          public void run()
+          {
+            final Handle handle = derbyConnectorRule.getConnector().getDBI().open();
+            handleRef.set(handle);
+            Assert.assertEquals(
+                0,
+                handle.createStatement(
+                    String.format(
+                        "CREATE TABLE %s (%s TIMESTAMP, %s VARCHAR(64), %s VARCHAR(64))",
+                        tableName,
+                        tsColumn_,
+                        keyName,
+                        valName
+                    )
+                ).setQueryTimeout(1).execute()
+            );
+            handle.createStatement(String.format("TRUNCATE TABLE %s", tableName)).setQueryTimeout(1).execute();
+            handle.commit();
+            closer.register(new Closeable()
             {
               @Override
-              public Callable<String> getCachePopulator(
-                  final JDBCExtractionNamespace namespace,
-                  final String lastVersion,
-                  final Map<String, String> cache
-              )
+              public void close() throws IOException
               {
-                final Callable<String> cachePopulator = super.getCachePopulator(namespace, lastVersion, cache);
-                return new Callable<String>()
-                {
-                  @Override
-                  public String call() throws Exception
-                  {
-                    updateLock.lockInterruptibly();
-                    try {
-                      log.debug("Running cache populator");
-                      try {
-                        return cachePopulator.call();
-                      }
-                      finally {
-                        updates.incrementAndGet();
-                      }
-                    }
-                    finally {
-                      updateLock.unlock();
-                    }
-                  }
-                };
+                // Register first so it gets run last and checks for cleanup
+                final NamespaceExtractionCacheManager.NamespaceImplData implData = extractionCacheManager.implData.get(
+                    namespace);
+                if (implData != null && implData.future != null) {
+                  implData.future.cancel(true);
+                  Assert.assertTrue(implData.future.isDone());
+                }
+              }
+            });
+            closer.register(new Closeable()
+            {
+              @Override
+              public void close() throws IOException
+              {
+                handle.createStatement("DROP TABLE " + tableName).setQueryTimeout(1).execute();
+                handle.close();
+              }
+            });
+            for (Map.Entry<String, String> entry : renames.entrySet()) {
+              try {
+                insertValues(entry.getKey(), entry.getValue(), "2015-01-01 00:00:00");
+              }
+              catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw Throwables.propagate(e);
               }
             }
-        )
+
+            extractionCacheManager = new OnHeapNamespaceExtractionCacheManager(
+                lifecycle,
+                fnCache,
+                reverseFnCache,
+                new NoopServiceEmitter(),
+                ImmutableMap.<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>>of(
+                    JDBCExtractionNamespace.class,
+                    new JDBCExtractionNamespaceFunctionFactory()
+                    {
+                      @Override
+                      public Callable<String> getCachePopulator(
+                          final JDBCExtractionNamespace namespace,
+                          final String lastVersion,
+                          final Map<String, String> cache
+                      )
+                      {
+                        final Callable<String> cachePopulator = super.getCachePopulator(namespace, lastVersion, cache);
+                        return new Callable<String>()
+                        {
+                          @Override
+                          public String call() throws Exception
+                          {
+                            updateLock.lockInterruptibly();
+                            try {
+                              log.debug("Running cache populator");
+                              try {
+                                return cachePopulator.call();
+                              }
+                              finally {
+                                updates.incrementAndGet();
+                              }
+                            }
+                            finally {
+                              updateLock.unlock();
+                            }
+                          }
+                        };
+                      }
+                    }
+                )
+            );
+            try {
+              lifecycle.start();
+            }
+            catch (Exception e) {
+              throw Throwables.propagate(e);
+            }
+            closer.register(new Closeable()
+            {
+              @Override
+              public void close() throws IOException
+              {
+                lifecycle.stop();
+              }
+            });
+            closer.register(new Closeable()
+            {
+              @Override
+              public void close() throws IOException
+              {
+                Assert.assertTrue("Delete failed", extractionCacheManager.delete(namespace));
+              }
+            });
+          }
+        }
     );
-    lifecycle.start();
+    final Closer setupCloser = Closer.create();
+    setupCloser.register(new Closeable()
+    {
+      @Override
+      public void close() throws IOException
+      {
+        if (!setupFuture.isDone() && !setupFuture.cancel(true) && !setupFuture.isDone()) {
+          throw new IOException("Unable to stop future");
+        }
+      }
+    });
+    try {
+      setupFuture.get(10, TimeUnit.SECONDS);
+    }
+    catch (Throwable t) {
+      throw setupCloser.rethrow(t);
+    }
+    finally {
+      setupCloser.close();
+    }
   }
 
   @After
-  public void tearDown() throws InterruptedException
+  public void tearDown() throws InterruptedException, ExecutionException, TimeoutException, IOException
   {
-    log.info("Tearing down");
-    handle.createStatement("DROP TABLE " + tableName).setQueryTimeout(1).execute();
-    handle.close();
-    Assert.assertTrue("Delete failed", extractionCacheManager.delete(namespace));
-    lifecycle.stop();
-    final NamespaceExtractionCacheManager.NamespaceImplData implData = extractionCacheManager.implData.get(namespace);
-    if (implData != null && implData.future != null) {
-      implData.future.cancel(true);
-      Assert.assertTrue(implData.future.isDone());
+    final Closer tearDownCloser = Closer.create();
+    tearDownCloser.register(new Closeable()
+    {
+      @Override
+      public void close() throws IOException
+      {
+        setupTeardownService.shutdownNow();
+        try {
+          setupTeardownService.awaitTermination(10, TimeUnit.SECONDS);
+        }
+        catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted", e);
+        }
+      }
+    });
+    try {
+      setupTeardownService.submit(
+          new Runnable()
+          {
+            @Override
+            public void run()
+            {
+              try {
+                closer.close();
+              }
+              catch (IOException e) {
+                throw Throwables.propagate(e);
+              }
+            }
+          }
+      ).get(10, TimeUnit.SECONDS);
+    }
+    catch (Throwable t) {
+      throw closer.rethrow(t);
+    }
+    finally {
+      closer.close();
     }
   }
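The setup/teardown rewrite above leans on Guava's `Closer`, which closes registered resources in reverse registration order; that is why the "checks for cleanup" `Closeable` is registered before the one that drops the table and closes the `Handle`. A minimal, self-contained sketch of that ordering (illustration only, not part of the patch):

```java
import com.google.common.io.Closer;

import java.io.Closeable;
import java.io.IOException;

// Sketch only: demonstrates that Closer runs registered Closeables in LIFO order.
public class CloserOrderSketch
{
  public static void main(String[] args) throws IOException
  {
    final Closer closer = Closer.create();
    closer.register(new Closeable()
    {
      @Override
      public void close()
      {
        System.out.println("registered first, closed last");
      }
    });
    closer.register(new Closeable()
    {
      @Override
      public void close()
      {
        System.out.println("registered last, closed first");
      }
    });
    // Prints "registered last, closed first", then "registered first, closed last".
    closer.close();
  }
}
```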
@@ -188,7 +308,7 @@ public class JDBCExtractionNamespaceTest
   {
     final String query;
     if (tsColumn == null) {
-      handle.createStatement(
+      handleRef.get().createStatement(
           String.format("DELETE FROM %s WHERE %s='%s'", tableName, keyName, key)
       ).setQueryTimeout(1).execute();
       query = String.format(
@@ -205,8 +325,8 @@ public class JDBCExtractionNamespaceTest
           updateTs, key, val
       );
     }
-    Assert.assertEquals(1, handle.createStatement(query).setQueryTimeout(1).execute());
-    handle.commit();
+    Assert.assertEquals(1, handleRef.get().createStatement(query).setQueryTimeout(1).execute());
+    handleRef.get().commit();
     // Some internals have timing resolution no better than MS. This is to help make sure that checks for timings
     // have elapsed at least to the next ms... 2 is for good measure.
     Thread.sleep(2);
@@ -251,13 +371,31 @@ public class JDBCExtractionNamespaceTest
     );
     NamespaceExtractionCacheManagersTest.waitFor(extractionCacheManager.schedule(extractionNamespace));
     Function<String, List<String>> reverseExtractionFn = reverseFnCache.get(extractionNamespace.getNamespace());
-    Assert.assertEquals("reverse lookup should match", Sets.newHashSet("foo", "bad"), Sets.newHashSet(reverseExtractionFn.apply("bar")));
-    Assert.assertEquals("reverse lookup should match", Sets.newHashSet("how about that"), Sets.newHashSet(reverseExtractionFn.apply("foo")));
-    Assert.assertEquals("reverse lookup should match", Sets.newHashSet("empty string"), Sets.newHashSet(reverseExtractionFn.apply("")));
-    Assert.assertEquals("null is same as empty string", Sets.newHashSet("empty string"), Sets.newHashSet(reverseExtractionFn.apply(null)));
-    Assert.assertEquals("reverse lookup of none existing value should be empty list",
-        Collections.EMPTY_LIST,
-        reverseExtractionFn.apply("does't exist"));
+    Assert.assertEquals(
+        "reverse lookup should match",
+        Sets.newHashSet("foo", "bad"),
+        Sets.newHashSet(reverseExtractionFn.apply("bar"))
+    );
+    Assert.assertEquals(
+        "reverse lookup should match",
+        Sets.newHashSet("how about that"),
+        Sets.newHashSet(reverseExtractionFn.apply("foo"))
+    );
+    Assert.assertEquals(
+        "reverse lookup should match",
+        Sets.newHashSet("empty string"),
+        Sets.newHashSet(reverseExtractionFn.apply(""))
+    );
+    Assert.assertEquals(
+        "null is same as empty string",
+        Sets.newHashSet("empty string"),
+        Sets.newHashSet(reverseExtractionFn.apply(null))
+    );
+    Assert.assertEquals(
+        "reverse lookup of none existing value should be empty list",
+        Collections.EMPTY_LIST,
+        reverseExtractionFn.apply("does't exist")
+    );
   }
 
   @Test(timeout = 10_000L)
diff --git a/server/src/test/java/io/druid/metadata/TestDerbyConnector.java b/server/src/test/java/io/druid/metadata/TestDerbyConnector.java
index 20d069612ae..53016f995a6 100644
--- a/server/src/test/java/io/druid/metadata/TestDerbyConnector.java
+++ b/server/src/test/java/io/druid/metadata/TestDerbyConnector.java
@@ -73,7 +73,7 @@ public class TestDerbyConnector extends DerbyConnector
   {
     return jdbcUri;
   }
- 
+
   public static class DerbyConnectorRule extends ExternalResource
   {
     private TestDerbyConnector connector;
@@ -82,7 +82,14 @@ public class TestDerbyConnector extends DerbyConnector
 
     public DerbyConnectorRule()
     {
-      this(Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase("druidTest")));
+      this("druidTest" + dbSafeUUID());
+    }
+
+    private DerbyConnectorRule(
+        final String defaultBase
+    )
+    {
+      this(Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase(defaultBase)));
     }
 
     public DerbyConnectorRule(
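The `DerbyConnectorRule` change above gives each test run its own metadata table prefix by appending `dbSafeUUID()` to the default base, so repeated or concurrent runs no longer collide on shared Derby tables. The helper itself lives in `TestDerbyConnector` and is not shown in this diff; a rough sketch of what such a DB-safe UUID helper typically looks like (assumed implementation, for illustration only):

```java
import java.util.UUID;

// Assumption for illustration: a "DB-safe" UUID is a random UUID with the
// dashes stripped, so it can be embedded in Derby table name prefixes.
public class DbSafeUuidSketch
{
  static String dbSafeUUID()
  {
    return UUID.randomUUID().toString().replace("-", "");
  }

  public static void main(String[] args)
  {
    // e.g. druidTestb0a7c3f2... — a unique base per DerbyConnectorRule instance
    System.out.println("druidTest" + dbSafeUUID());
  }
}
```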
diff --git a/server/src/test/java/io/druid/server/audit/SQLAuditManagerTest.java b/server/src/test/java/io/druid/server/audit/SQLAuditManagerTest.java
index 7374eac8c29..50974954241 100644
--- a/server/src/test/java/io/druid/server/audit/SQLAuditManagerTest.java
+++ b/server/src/test/java/io/druid/server/audit/SQLAuditManagerTest.java
@@ -63,7 +63,7 @@ public class SQLAuditManagerTest
     );
   }
 
-  @Test
+  @Test(timeout = 10_000L)
   public void testAuditEntrySerde() throws IOException
   {
     AuditEntry entry = new AuditEntry(
@@ -82,7 +82,7 @@ public class SQLAuditManagerTest
     Assert.assertEquals(entry, serde);
   }
 
-  @Test
+  @Test(timeout = 10_000L)
   public void testCreateAuditEntry() throws IOException
   {
     AuditEntry entry = new AuditEntry(
@@ -108,7 +108,7 @@ public class SQLAuditManagerTest
 
   }
 
-  @Test
+  @Test(timeout = 10_000L)
   public void testFetchAuditHistory() throws IOException
   {
     AuditEntry entry = new AuditEntry(
@@ -136,7 +136,7 @@ public class SQLAuditManagerTest
     Assert.assertEquals(entry, auditEntries.get(1));
   }
 
-  @Test
+  @Test(timeout = 10_000L)
   public void testFetchAuditHistoryByKeyAndTypeWithLimit() throws IOException
   {
     AuditEntry entry1 = new AuditEntry(
@@ -172,7 +172,7 @@ public class SQLAuditManagerTest
     Assert.assertEquals(entry1, auditEntries.get(0));
   }
 
-  @Test
+  @Test(timeout = 10_000L)
   public void testFetchAuditHistoryByTypeWithLimit() throws IOException
   {
     AuditEntry entry1 = new AuditEntry(
@@ -220,13 +220,13 @@ public class SQLAuditManagerTest
     Assert.assertEquals(entry2, auditEntries.get(1));
   }
 
-  @Test(expected=IllegalArgumentException.class)
+  @Test(expected=IllegalArgumentException.class, timeout = 10_000L)
   public void testFetchAuditHistoryLimitBelowZero() throws IOException
   {
     auditManager.fetchAuditHistory("testType", -1);
   }
 
-  @Test(expected=IllegalArgumentException.class)
+  @Test(expected=IllegalArgumentException.class, timeout = 10_000L)
   public void testFetchAuditHistoryLimitZero() throws IOException
   {
     auditManager.fetchAuditHistory("testType", 0);
@@ -240,7 +240,7 @@ public class SQLAuditManagerTest
 
   private void dropTable(final String tableName)
   {
-    connector.getDBI().withHandle(
+    Assert.assertNull(connector.getDBI().withHandle(
         new HandleCallback<Void>()
         {
           @Override
@@ -251,6 +251,6 @@ public class SQLAuditManagerTest
             return null;
           }
         }
-    );
+    ));
   }
 }
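Taken together, the patch applies one pattern throughout: do the Derby work on a dedicated single-threaded executor and bound every wait, either with `Future.get(10, TimeUnit.SECONDS)` or with `@Test(timeout = 10_000L)`, so a wedged JDBC call fails the test instead of hanging the build. A simplified, self-contained sketch of that pattern (names are illustrative, not the actual test class; the test itself uses Druid's `Execs.singleThreaded` for the executor):

```java
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch only: run setup work on a single-threaded executor and bound the wait.
public class BoundedSetupSketch
{
  public static void main(String[] args) throws Exception
  {
    final ListeningExecutorService setupTeardownService =
        MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
    try {
      final ListenableFuture<?> setupFuture = setupTeardownService.submit(
          new Runnable()
          {
            @Override
            public void run()
            {
              // open connections, create tables, seed rows ...
            }
          }
      );
      // Surfaces a hang as a TimeoutException rather than blocking the build forever.
      setupFuture.get(10, TimeUnit.SECONDS);
    }
    finally {
      setupTeardownService.shutdownNow();
      setupTeardownService.awaitTermination(10, TimeUnit.SECONDS);
    }
  }
}
```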