diff --git a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/JDBCExtractionNamespaceTest.java b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/JDBCExtractionNamespaceTest.java index 8b05a77d1a2..cdab411e10f 100644 --- a/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/JDBCExtractionNamespaceTest.java +++ b/extensions/namespace-lookup/src/test/java/io/druid/server/namespace/cache/JDBCExtractionNamespaceTest.java @@ -42,7 +42,6 @@ import org.joda.time.Period; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -62,7 +61,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -111,21 +109,20 @@ public class JDBCExtractionNamespaceTest private final AtomicLong updates = new AtomicLong(0L); private final Lock updateLock = new ReentrantLock(true); private final Closer closer = Closer.create(); - private final AtomicReference handleRef = new AtomicReference<>(null); private final ListeningExecutorService setupTeardownService = - MoreExecutors.listeningDecorator(Execs.singleThreaded("JDBCExtractionNamespaceTeardown")); + MoreExecutors.listeningDecorator(Execs.multiThreaded(2, "JDBCExtractionNamespaceTeardown--%s")); + private Handle handleRef = null; @Before public void setup() throws Exception { - final ListenableFuture setupFuture = setupTeardownService.submit( - new Runnable() + final ListenableFuture setupFuture = setupTeardownService.submit( + new Callable() { @Override - public void run() + public Handle call() { final Handle handle = derbyConnectorRule.getConnector().getDBI().open(); - handleRef.set(handle); Assert.assertEquals( 0, handle.createStatement( @@ -146,7 +143,27 @@ public class JDBCExtractionNamespaceTest public void close() throws IOException { handle.createStatement("DROP TABLE " + tableName).setQueryTimeout(1).execute(); - handle.close(); + final ListenableFuture future = setupTeardownService.submit(new Runnable() + { + @Override + public void run() + { + handle.close(); + } + }); + try (Closeable closeable = new Closeable() + { + @Override + public void close() throws IOException + { + future.cancel(true); + } + }) { + future.get(10, TimeUnit.SECONDS); + } + catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new IOException("Error closing handle", e); + } } }); closer.register(new Closeable() @@ -154,7 +171,9 @@ public class JDBCExtractionNamespaceTest @Override public void close() throws IOException { - // Register first so it gets run last and checks for cleanup + if (extractionCacheManager == null) { + return; + } final NamespaceExtractionCacheManager.NamespaceImplData implData = extractionCacheManager.implData.get( namespace); if (implData != null && implData.future != null) { @@ -165,7 +184,7 @@ public class JDBCExtractionNamespaceTest }); for (Map.Entry entry : renames.entrySet()) { try { - insertValues(entry.getKey(), entry.getValue(), "2015-01-01 00:00:00"); + insertValues(handle, entry.getKey(), entry.getValue(), "2015-01-01 00:00:00"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -226,99 +245,103 @@ public class JDBCExtractionNamespaceTest @Override public void close() throws IOException { - lifecycle.stop(); - } - } - ); - closer.register( - new Closeable() - { - @Override - public void close() throws IOException - { - Assert.assertTrue("Delete failed", extractionCacheManager.delete(namespace)); + final ListenableFuture future = setupTeardownService.submit( + new Runnable() + { + @Override + public void run() + { + lifecycle.stop(); + } + } + ); + try (final Closeable closeable = new Closeable() + { + @Override + public void close() throws IOException + { + future.cancel(true); + } + }) { + future.get(30, TimeUnit.SECONDS); + } + catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new IOException("Error stopping lifecycle", e); + } } } ); + return handle; } } ); - final Closer setupCloser = Closer.create(); - setupCloser.register( - new Closeable() - { - @Override - public void close() throws IOException - { - if (!setupFuture.isDone() && !setupFuture.cancel(true) && !setupFuture.isDone()) { - throw new IOException("Unable to stop future"); - } - } - } - ); - try { - setupFuture.get(10, TimeUnit.SECONDS); - } - catch (Throwable t) { - throw setupCloser.rethrow(t); - } - finally { - setupCloser.close(); + + try (final Closeable closeable = + new Closeable() + { + @Override + public void close() throws IOException + { + if (!setupFuture.isDone() && !setupFuture.cancel(true) && !setupFuture.isDone()) { + throw new IOException("Unable to stop future"); + } + } + }) { + handleRef = setupFuture.get(10, TimeUnit.SECONDS); } + Assert.assertNotNull(handleRef); } @After public void tearDown() throws InterruptedException, ExecutionException, TimeoutException, IOException { - final Closer tearDownCloser = Closer.create(); - tearDownCloser.register( - new Closeable() + final ListenableFuture tearDownFuture = setupTeardownService.submit( + new Runnable() { @Override - public void close() throws IOException + public void run() { - setupTeardownService.shutdownNow(); try { - setupTeardownService.awaitTermination(60, TimeUnit.SECONDS); + closer.close(); } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted", e); + catch (IOException e) { + throw Throwables.propagate(e); } } } ); - - try { - setupTeardownService.submit( - new Runnable() - { - @Override - public void run() - { - try { - closer.close(); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - } + try (final Closeable closeable = new Closeable() + { + @Override + public void close() throws IOException + { + setupTeardownService.shutdownNow(); + try { + if (!setupTeardownService.awaitTermination(60, TimeUnit.SECONDS)) { + log.error("Tear down service didn't finish"); } - ).get(60, TimeUnit.SECONDS); - } - catch (Throwable t) { - throw closer.rethrow(t); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted", e); + } + } + }) { + tearDownFuture.get(60, TimeUnit.SECONDS); } finally { - closer.close(); + if (Thread.interrupted()) { + log.info("Thread was interrupted. Clearing interrupt and continuing."); + } } } - private void insertValues(final String key, final String val, final String updateTs) throws InterruptedException + private void insertValues(final Handle handle, final String key, final String val, final String updateTs) + throws InterruptedException { final String query; if (tsColumn == null) { - handleRef.get().createStatement( + handle.createStatement( String.format("DELETE FROM %s WHERE %s='%s'", tableName, keyName, key) ).setQueryTimeout(1).execute(); query = String.format( @@ -335,8 +358,8 @@ public class JDBCExtractionNamespaceTest updateTs, key, val ); } - Assert.assertEquals(1, handleRef.get().createStatement(query).setQueryTimeout(1).execute()); - handleRef.get().commit(); + Assert.assertEquals(1, handle.createStatement(query).setQueryTimeout(1).execute()); + handle.commit(); // Some internals have timing resolution no better than MS. This is to help make sure that checks for timings // have elapsed at least to the next ms... 2 is for good measure. Thread.sleep(2); @@ -417,7 +440,7 @@ public class JDBCExtractionNamespaceTest assertUpdated(extractionNamespace.getNamespace(), "foo", "bar"); if (tsColumn != null) { - insertValues("foo", "baz", "1900-01-01 00:00:00"); + insertValues(handleRef, "foo", "baz", "1900-01-01 00:00:00"); } assertUpdated(extractionNamespace.getNamespace(), "foo", "bar"); @@ -431,7 +454,7 @@ public class JDBCExtractionNamespaceTest assertUpdated(extractionNamespace.getNamespace(), "foo", "bar"); - insertValues("foo", "baz", "2900-01-01 00:00:00"); + insertValues(handleRef, "foo", "baz", "2900-01-01 00:00:00"); assertUpdated(extractionNamespace.getNamespace(), "foo", "baz"); }