Merge pull request #2229 from metamx/improveTimeoutsJDBC

[WIP] Make timeouts more explicit on what is failing in JDBCExtractionNamespaceTest
This commit is contained in:
Fangjin Yang 2016-01-08 10:36:46 -08:00
commit 0d12e7024b
1 changed files with 101 additions and 78 deletions

View File

@ -42,7 +42,6 @@ import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -62,7 +61,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -111,21 +109,20 @@ public class JDBCExtractionNamespaceTest
private final AtomicLong updates = new AtomicLong(0L);
private final Lock updateLock = new ReentrantLock(true);
private final Closer closer = Closer.create();
private final AtomicReference<Handle> handleRef = new AtomicReference<>(null);
private final ListeningExecutorService setupTeardownService =
MoreExecutors.listeningDecorator(Execs.singleThreaded("JDBCExtractionNamespaceTeardown"));
MoreExecutors.listeningDecorator(Execs.multiThreaded(2, "JDBCExtractionNamespaceTeardown--%s"));
private Handle handleRef = null;
@Before
public void setup() throws Exception
{
final ListenableFuture<?> setupFuture = setupTeardownService.submit(
new Runnable()
final ListenableFuture<Handle> setupFuture = setupTeardownService.submit(
new Callable<Handle>()
{
@Override
public void run()
public Handle call()
{
final Handle handle = derbyConnectorRule.getConnector().getDBI().open();
handleRef.set(handle);
Assert.assertEquals(
0,
handle.createStatement(
@ -146,7 +143,27 @@ public class JDBCExtractionNamespaceTest
public void close() throws IOException
{
handle.createStatement("DROP TABLE " + tableName).setQueryTimeout(1).execute();
handle.close();
final ListenableFuture future = setupTeardownService.submit(new Runnable()
{
@Override
public void run()
{
handle.close();
}
});
try (Closeable closeable = new Closeable()
{
@Override
public void close() throws IOException
{
future.cancel(true);
}
}) {
future.get(10, TimeUnit.SECONDS);
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new IOException("Error closing handle", e);
}
}
});
closer.register(new Closeable()
@ -154,7 +171,9 @@ public class JDBCExtractionNamespaceTest
@Override
public void close() throws IOException
{
// Register first so it gets run last and checks for cleanup
if (extractionCacheManager == null) {
return;
}
final NamespaceExtractionCacheManager.NamespaceImplData implData = extractionCacheManager.implData.get(
namespace);
if (implData != null && implData.future != null) {
@ -165,7 +184,7 @@ public class JDBCExtractionNamespaceTest
});
for (Map.Entry<String, String> entry : renames.entrySet()) {
try {
insertValues(entry.getKey(), entry.getValue(), "2015-01-01 00:00:00");
insertValues(handle, entry.getKey(), entry.getValue(), "2015-01-01 00:00:00");
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
@ -226,99 +245,103 @@ public class JDBCExtractionNamespaceTest
@Override
public void close() throws IOException
{
lifecycle.stop();
}
}
);
closer.register(
new Closeable()
{
@Override
public void close() throws IOException
{
Assert.assertTrue("Delete failed", extractionCacheManager.delete(namespace));
final ListenableFuture future = setupTeardownService.submit(
new Runnable()
{
@Override
public void run()
{
lifecycle.stop();
}
}
);
try (final Closeable closeable = new Closeable()
{
@Override
public void close() throws IOException
{
future.cancel(true);
}
}) {
future.get(30, TimeUnit.SECONDS);
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new IOException("Error stopping lifecycle", e);
}
}
}
);
return handle;
}
}
);
final Closer setupCloser = Closer.create();
setupCloser.register(
new Closeable()
{
@Override
public void close() throws IOException
{
if (!setupFuture.isDone() && !setupFuture.cancel(true) && !setupFuture.isDone()) {
throw new IOException("Unable to stop future");
}
}
}
);
try {
setupFuture.get(10, TimeUnit.SECONDS);
}
catch (Throwable t) {
throw setupCloser.rethrow(t);
}
finally {
setupCloser.close();
try (final Closeable closeable =
new Closeable()
{
@Override
public void close() throws IOException
{
if (!setupFuture.isDone() && !setupFuture.cancel(true) && !setupFuture.isDone()) {
throw new IOException("Unable to stop future");
}
}
}) {
handleRef = setupFuture.get(10, TimeUnit.SECONDS);
}
Assert.assertNotNull(handleRef);
}
@After
public void tearDown() throws InterruptedException, ExecutionException, TimeoutException, IOException
{
final Closer tearDownCloser = Closer.create();
tearDownCloser.register(
new Closeable()
final ListenableFuture<?> tearDownFuture = setupTeardownService.submit(
new Runnable()
{
@Override
public void close() throws IOException
public void run()
{
setupTeardownService.shutdownNow();
try {
setupTeardownService.awaitTermination(60, TimeUnit.SECONDS);
closer.close();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted", e);
catch (IOException e) {
throw Throwables.propagate(e);
}
}
}
);
try {
setupTeardownService.submit(
new Runnable()
{
@Override
public void run()
{
try {
closer.close();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
try (final Closeable closeable = new Closeable()
{
@Override
public void close() throws IOException
{
setupTeardownService.shutdownNow();
try {
if (!setupTeardownService.awaitTermination(60, TimeUnit.SECONDS)) {
log.error("Tear down service didn't finish");
}
).get(60, TimeUnit.SECONDS);
}
catch (Throwable t) {
throw closer.rethrow(t);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted", e);
}
}
}) {
tearDownFuture.get(60, TimeUnit.SECONDS);
}
finally {
closer.close();
if (Thread.interrupted()) {
log.info("Thread was interrupted. Clearing interrupt and continuing.");
}
}
}
private void insertValues(final String key, final String val, final String updateTs) throws InterruptedException
private void insertValues(final Handle handle, final String key, final String val, final String updateTs)
throws InterruptedException
{
final String query;
if (tsColumn == null) {
handleRef.get().createStatement(
handle.createStatement(
String.format("DELETE FROM %s WHERE %s='%s'", tableName, keyName, key)
).setQueryTimeout(1).execute();
query = String.format(
@ -335,8 +358,8 @@ public class JDBCExtractionNamespaceTest
updateTs, key, val
);
}
Assert.assertEquals(1, handleRef.get().createStatement(query).setQueryTimeout(1).execute());
handleRef.get().commit();
Assert.assertEquals(1, handle.createStatement(query).setQueryTimeout(1).execute());
handle.commit();
// Some internals have timing resolution no better than MS. This is to help make sure that checks for timings
// have elapsed at least to the next ms... 2 is for good measure.
Thread.sleep(2);
@ -417,7 +440,7 @@ public class JDBCExtractionNamespaceTest
assertUpdated(extractionNamespace.getNamespace(), "foo", "bar");
if (tsColumn != null) {
insertValues("foo", "baz", "1900-01-01 00:00:00");
insertValues(handleRef, "foo", "baz", "1900-01-01 00:00:00");
}
assertUpdated(extractionNamespace.getNamespace(), "foo", "bar");
@ -431,7 +454,7 @@ public class JDBCExtractionNamespaceTest
assertUpdated(extractionNamespace.getNamespace(), "foo", "bar");
insertValues("foo", "baz", "2900-01-01 00:00:00");
insertValues(handleRef, "foo", "baz", "2900-01-01 00:00:00");
assertUpdated(extractionNamespace.getNamespace(), "foo", "baz");
}