fix timeout test race condition

This commit is contained in:
Xavier Léauté 2014-06-17 09:00:07 -07:00
parent 964f12b7d6
commit 9b03f5b13a
1 changed files with 22 additions and 12 deletions

View File

@ -65,6 +65,7 @@ public class ChainedExecutionQueryRunnerTest
);
final CountDownLatch queriesStarted = new CountDownLatch(2);
final CountDownLatch queriesInterrupted = new CountDownLatch(2);
final CountDownLatch queryIsRegistered = new CountDownLatch(1);
Capture<ListenableFuture> capturedFuture = new Capture<>();
@ -86,9 +87,9 @@ public class ChainedExecutionQueryRunnerTest
EasyMock.replay(watcher);
DyingQueryRunner runner1 = new DyingQueryRunner(queriesStarted);
DyingQueryRunner runner2 = new DyingQueryRunner(queriesStarted);
DyingQueryRunner runner3 = new DyingQueryRunner(queriesStarted);
DyingQueryRunner runner1 = new DyingQueryRunner(queriesStarted, queriesInterrupted);
DyingQueryRunner runner2 = new DyingQueryRunner(queriesStarted, queriesInterrupted);
DyingQueryRunner runner3 = new DyingQueryRunner(queriesStarted, queriesInterrupted);
ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>(
exec,
Ordering.<Integer>natural(),
@ -135,6 +136,7 @@ public class ChainedExecutionQueryRunnerTest
Assert.assertTrue(e.getCause() instanceof QueryInterruptedException);
cause = (QueryInterruptedException)e.getCause();
}
Assert.assertTrue(queriesInterrupted.await(500, TimeUnit.MILLISECONDS));
Assert.assertNotNull(cause);
Assert.assertTrue(future.isCancelled());
Assert.assertTrue(runner1.hasStarted);
@ -170,6 +172,7 @@ public class ChainedExecutionQueryRunnerTest
);
final CountDownLatch queriesStarted = new CountDownLatch(2);
final CountDownLatch queriesInterrupted = new CountDownLatch(2);
final CountDownLatch queryIsRegistered = new CountDownLatch(1);
Capture<ListenableFuture> capturedFuture = new Capture<>();
@ -191,9 +194,9 @@ public class ChainedExecutionQueryRunnerTest
EasyMock.replay(watcher);
DyingQueryRunner runner1 = new DyingQueryRunner(queriesStarted);
DyingQueryRunner runner2 = new DyingQueryRunner(queriesStarted);
DyingQueryRunner runner3 = new DyingQueryRunner(queriesStarted);
DyingQueryRunner runner1 = new DyingQueryRunner(queriesStarted, queriesInterrupted);
DyingQueryRunner runner2 = new DyingQueryRunner(queriesStarted, queriesInterrupted);
DyingQueryRunner runner3 = new DyingQueryRunner(queriesStarted, queriesInterrupted);
ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>(
exec,
Ordering.<Integer>natural(),
@ -210,7 +213,7 @@ public class ChainedExecutionQueryRunnerTest
.dataSource("test")
.intervals("2014/2015")
.aggregators(Lists.<AggregatorFactory>newArrayList(new CountAggregatorFactory("count")))
.context(ImmutableMap.<String, Object>of("timeout", (100), "queryId", "test"))
.context(ImmutableMap.<String, Object>of("timeout", 100, "queryId", "test"))
.build()
);
@ -229,10 +232,10 @@ public class ChainedExecutionQueryRunnerTest
Assert.assertTrue(queryIsRegistered.await(1, TimeUnit.SECONDS));
Assert.assertTrue(queriesStarted.await(1, TimeUnit.SECONDS));
// cancel the query
Assert.assertTrue(capturedFuture.hasCaptured());
ListenableFuture future = capturedFuture.getValue();
// wait for query to time out
QueryInterruptedException cause = null;
try {
resultFuture.get();
@ -241,6 +244,7 @@ public class ChainedExecutionQueryRunnerTest
Assert.assertEquals("Query timeout", e.getCause().getMessage());
cause = (QueryInterruptedException)e.getCause();
}
Assert.assertTrue(queriesInterrupted.await(500, TimeUnit.MILLISECONDS));
Assert.assertNotNull(cause);
Assert.assertTrue(future.isCancelled());
Assert.assertTrue(runner1.hasStarted);
@ -257,23 +261,27 @@ public class ChainedExecutionQueryRunnerTest
private static class DyingQueryRunner implements QueryRunner<Integer>
{
private final CountDownLatch latch;
private final CountDownLatch start;
private final CountDownLatch stop;
private boolean hasStarted = false;
private boolean hasCompleted = false;
private boolean interrupted = false;
public DyingQueryRunner(CountDownLatch latch)
public DyingQueryRunner(CountDownLatch start, CountDownLatch stop)
{
this.latch = latch;
this.start = start;
this.stop = stop;
}
@Override
public Sequence<Integer> run(Query<Integer> query)
{
hasStarted = true;
latch.countDown();
start.countDown();
if (Thread.interrupted()) {
interrupted = true;
stop.countDown();
throw new QueryInterruptedException("I got killed");
}
@ -283,10 +291,12 @@ public class ChainedExecutionQueryRunnerTest
}
catch (InterruptedException e) {
interrupted = true;
stop.countDown();
throw new QueryInterruptedException("I got killed");
}
hasCompleted = true;
stop.countDown();
return Sequences.simple(Lists.newArrayList(123));
}
}