properly kill timed out queries

This commit is contained in:
Xavier Léauté 2014-06-03 11:15:03 -07:00
parent d0f9c438f8
commit 97d5455f3a
2 changed files with 107 additions and 0 deletions

View File

@ -172,6 +172,8 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
throw new QueryInterruptedException("Query cancelled"); throw new QueryInterruptedException("Query cancelled");
} }
catch(TimeoutException e) { catch(TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query timeout"); throw new QueryInterruptedException("Query timeout");
} }
catch (ExecutionException e) { catch (ExecutionException e) {

View File

@ -19,6 +19,7 @@
package io.druid.query; package io.druid.query;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
@ -146,6 +147,110 @@ public class ChainedExecutionQueryRunnerTest
EasyMock.verify(watcher); EasyMock.verify(watcher);
} }
@Test
public void testQueryTimeout() throws Exception
{
ExecutorService exec = PrioritizedExecutorService.create(
new Lifecycle(), new ExecutorServiceConfig()
{
@Override
public String getFormatString()
{
return "test";
}
@Override
public int getNumThreads()
{
return 2;
}
}
);
final CountDownLatch queriesStarted = new CountDownLatch(2);
final CountDownLatch queryIsRegistered = new CountDownLatch(1);
Capture<ListenableFuture> capturedFuture = new Capture<>();
QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class);
watcher.registerQuery(EasyMock.<Query>anyObject(), EasyMock.and(EasyMock.<ListenableFuture>anyObject(), EasyMock.capture(capturedFuture)));
EasyMock.expectLastCall()
.andAnswer(
new IAnswer<Void>()
{
@Override
public Void answer() throws Throwable
{
queryIsRegistered.countDown();
return null;
}
}
)
.once();
EasyMock.replay(watcher);
DyingQueryRunner runner1 = new DyingQueryRunner(queriesStarted);
DyingQueryRunner runner2 = new DyingQueryRunner(queriesStarted);
DyingQueryRunner runner3 = new DyingQueryRunner(queriesStarted);
ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>(
exec,
Ordering.<Integer>natural(),
watcher,
Lists.<QueryRunner<Integer>>newArrayList(
runner1,
runner2,
runner3
)
);
final Sequence seq = chainedRunner.run(
Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.intervals("2014/2015")
.aggregators(Lists.<AggregatorFactory>newArrayList(new CountAggregatorFactory("count")))
.context(ImmutableMap.<String, Object>of("timeout", (100), "queryId", "test"))
.build()
);
Future resultFuture = Executors.newFixedThreadPool(1).submit(
new Runnable()
{
@Override
public void run()
{
Sequences.toList(seq, Lists.newArrayList());
}
}
);
// wait for query to register and start
Assert.assertTrue(queryIsRegistered.await(1, TimeUnit.SECONDS));
Assert.assertTrue(queriesStarted.await(1, TimeUnit.SECONDS));
// cancel the query
Assert.assertTrue(capturedFuture.hasCaptured());
ListenableFuture future = capturedFuture.getValue();
QueryInterruptedException cause = null;
try {
resultFuture.get();
} catch(ExecutionException e) {
Assert.assertTrue(e.getCause() instanceof QueryInterruptedException);
Assert.assertEquals("Query timeout", e.getCause().getMessage());
cause = (QueryInterruptedException)e.getCause();
}
Assert.assertNotNull(cause);
Assert.assertTrue(future.isCancelled());
Assert.assertTrue(runner1.hasStarted);
Assert.assertTrue(runner2.hasStarted);
Assert.assertFalse(runner3.hasStarted);
Assert.assertFalse(runner1.hasCompleted);
Assert.assertFalse(runner2.hasCompleted);
Assert.assertFalse(runner3.hasCompleted);
EasyMock.verify(watcher);
}
private static class DyingQueryRunner implements QueryRunner<Integer> private static class DyingQueryRunner implements QueryRunner<Integer>
{ {
private final CountDownLatch latch; private final CountDownLatch latch;