From 97d5455f3a455fecbb1296f60fdf038ef040f862 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 3 Jun 2014 11:15:03 -0700 Subject: [PATCH] properly kill timed out queries --- .../query/ChainedExecutionQueryRunner.java | 2 + .../ChainedExecutionQueryRunnerTest.java | 105 ++++++++++++++++++ 2 files changed, 107 insertions(+) diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 256cefa33a0..8a5ed51a4df 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -172,6 +172,8 @@ public class ChainedExecutionQueryRunner implements QueryRunner throw new QueryInterruptedException("Query cancelled"); } catch(TimeoutException e) { + log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); + futures.cancel(true); throw new QueryInterruptedException("Query timeout"); } catch (ExecutionException e) { diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java index b5391605d32..f2555dd7214 100644 --- a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -19,6 +19,7 @@ package io.druid.query; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.common.util.concurrent.ListenableFuture; @@ -146,6 +147,110 @@ public class ChainedExecutionQueryRunnerTest EasyMock.verify(watcher); } + @Test + public void testQueryTimeout() throws Exception + { + ExecutorService exec = PrioritizedExecutorService.create( + new Lifecycle(), new ExecutorServiceConfig() + { + @Override + public String getFormatString() + { + return "test"; + } + + @Override + public int getNumThreads() + { + return 2; + } + } + ); + + final CountDownLatch queriesStarted = new CountDownLatch(2); + final CountDownLatch queryIsRegistered = new CountDownLatch(1); + + Capture capturedFuture = new Capture<>(); + QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class); + watcher.registerQuery(EasyMock.anyObject(), EasyMock.and(EasyMock.anyObject(), EasyMock.capture(capturedFuture))); + EasyMock.expectLastCall() + .andAnswer( + new IAnswer() + { + @Override + public Void answer() throws Throwable + { + queryIsRegistered.countDown(); + return null; + } + } + ) + .once(); + + EasyMock.replay(watcher); + + DyingQueryRunner runner1 = new DyingQueryRunner(queriesStarted); + DyingQueryRunner runner2 = new DyingQueryRunner(queriesStarted); + DyingQueryRunner runner3 = new DyingQueryRunner(queriesStarted); + ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>( + exec, + Ordering.natural(), + watcher, + Lists.>newArrayList( + runner1, + runner2, + runner3 + ) + ); + + final Sequence seq = chainedRunner.run( + Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .intervals("2014/2015") + .aggregators(Lists.newArrayList(new CountAggregatorFactory("count"))) + .context(ImmutableMap.of("timeout", (100), "queryId", "test")) + .build() + ); + + Future resultFuture = Executors.newFixedThreadPool(1).submit( + new Runnable() + { + @Override + public void run() + { + Sequences.toList(seq, Lists.newArrayList()); + } + } + ); + + // wait for query to register and start + Assert.assertTrue(queryIsRegistered.await(1, TimeUnit.SECONDS)); + Assert.assertTrue(queriesStarted.await(1, TimeUnit.SECONDS)); + + // cancel the query + Assert.assertTrue(capturedFuture.hasCaptured()); + ListenableFuture future = capturedFuture.getValue(); + + QueryInterruptedException cause = null; + try { + resultFuture.get(); + } catch(ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof QueryInterruptedException); + Assert.assertEquals("Query timeout", e.getCause().getMessage()); + cause = (QueryInterruptedException)e.getCause(); + } + Assert.assertNotNull(cause); + Assert.assertTrue(future.isCancelled()); + Assert.assertTrue(runner1.hasStarted); + Assert.assertTrue(runner2.hasStarted); + Assert.assertFalse(runner3.hasStarted); + Assert.assertFalse(runner1.hasCompleted); + Assert.assertFalse(runner2.hasCompleted); + Assert.assertFalse(runner3.hasCompleted); + + EasyMock.verify(watcher); + } + private static class DyingQueryRunner implements QueryRunner { private final CountDownLatch latch;