From d6f38827db453f31dd1e082e31dbcaade23b085a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 8 Apr 2014 17:13:22 -0700 Subject: [PATCH 01/19] initial query cancellation commit --- .../DruidDefaultSerializersModule.java | 1 + .../query/ChainedExecutionQueryRunner.java | 131 ++++++++-------- .../query/QueryInterruptedException.java | 37 +++++ .../java/io/druid/query/QueryWatcher.java | 27 ++++ .../search/SearchQueryRunnerFactory.java | 8 +- .../select/SelectQueryRunnerFactory.java | 18 ++- .../TimeBoundaryQueryRunnerFactory.java | 11 +- .../TimeseriesQueryRunnerFactory.java | 22 ++- .../query/topn/TopNQueryRunnerFactory.java | 8 +- .../ChainedExecutionQueryRunnerTest.java | 148 ++++++++++++++++++ .../java/io/druid/query/TestQueryRunners.java | 30 +++- .../query/search/SearchQueryRunnerTest.java | 12 +- .../TimeBoundaryQueryRunnerTest.java | 12 +- .../druid/query/topn/TopNQueryRunnerTest.java | 23 ++- .../druid/guice/QueryRunnerFactoryModule.java | 8 + .../java/io/druid/server/QueryManager.java | 48 ++++++ .../java/io/druid/server/QueryResource.java | 21 ++- 17 files changed, 481 insertions(+), 84 deletions(-) create mode 100644 processing/src/main/java/io/druid/query/QueryInterruptedException.java create mode 100644 processing/src/main/java/io/druid/query/QueryWatcher.java create mode 100644 processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java create mode 100644 server/src/main/java/io/druid/server/QueryManager.java diff --git a/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java index 38c3bc135ef..068bfecd963 100644 --- a/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java @@ -32,6 +32,7 @@ import com.google.common.base.Throwables; import com.metamx.common.Granularity; import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Yielder; import org.joda.time.DateTimeZone; import java.io.IOException; diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 31eac12702e..6c12d37669b 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -25,6 +25,10 @@ import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.ISE; import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.MergeIterable; @@ -35,11 +39,8 @@ import com.metamx.common.logger.Logger; import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; /** * A QueryRunner that combines a list of other QueryRunners and executes them in parallel on an executor. @@ -59,27 +60,33 @@ public class ChainedExecutionQueryRunner implements QueryRunner private static final Logger log = new Logger(ChainedExecutionQueryRunner.class); private final Iterable> queryables; - private final ExecutorService exec; + private final ListeningExecutorService exec; private final Ordering ordering; + private final QueryWatcher queryWatcher; public ChainedExecutionQueryRunner( ExecutorService exec, Ordering ordering, + QueryWatcher queryWatcher, QueryRunner... queryables ) { - this(exec, ordering, Arrays.asList(queryables)); + this(exec, ordering, queryWatcher, Arrays.asList(queryables)); } public ChainedExecutionQueryRunner( ExecutorService exec, Ordering ordering, + QueryWatcher queryWatcher, Iterable> queryables ) { - this.exec = exec; + // listeningDecorator will leave PrioritizedExecutorService unchanged, + // since it already implements ListeningExecutorService + this.exec = MoreExecutors.listeningDecorator(exec); this.ordering = ordering; this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); + this.queryWatcher = queryWatcher; } @Override @@ -94,71 +101,67 @@ public class ChainedExecutionQueryRunner implements QueryRunner public Iterator make() { // Make it a List<> to materialize all of the values (so that it will submit everything to the executor) - List>> futures = Lists.newArrayList( - Iterables.transform( - queryables, - new Function, Future>>() - { - @Override - public Future> apply(final QueryRunner input) - { - return exec.submit( - new AbstractPrioritizedCallable>(priority) - { - @Override - public List call() throws Exception - { - try { - if (input == null) { - throw new ISE("Input is null?! How is this possible?!"); - } + ListenableFuture>> futures = Futures.allAsList( + Lists.newArrayList( + Iterables.transform( + queryables, + new Function, ListenableFuture>>() + { + @Override + public ListenableFuture> apply(final QueryRunner input) + { + return exec.submit( + new AbstractPrioritizedCallable>(priority) + { + @Override + public Iterable call() throws Exception + { + try { + if (input == null) { + throw new ISE("Input is null?! How is this possible?!"); + } - Sequence result = input.run(query); - if (result == null) { - throw new ISE("Got a null result! Segments are missing!"); - } + Sequence result = input.run(query); + if (result == null) { + throw new ISE("Got a null result! Segments are missing!"); + } - List retVal = Sequences.toList(result, Lists.newArrayList()); - if (retVal == null) { - throw new ISE("Got a null list of results! WTF?!"); - } + List retVal = Sequences.toList(result, Lists.newArrayList()); + if (retVal == null) { + throw new ISE("Got a null list of results! WTF?!"); + } - return retVal; + return retVal; + } + catch (Exception e) { + log.error(e, "Exception with one of the sequences!"); + throw Throwables.propagate(e); + } + } } - catch (Exception e) { - log.error(e, "Exception with one of the sequences!"); - throw Throwables.propagate(e); - } - } - } - ); - } - } + ); + } + } + ) ) ); - return new MergeIterable( - ordering.nullsFirst(), - Iterables.transform( - futures, - new Function>, Iterable>() - { - @Override - public Iterable apply(Future> input) - { - try { - return input.get(); - } - catch (InterruptedException e) { - throw new RuntimeException(e); - } - catch (ExecutionException e) { - throw new RuntimeException(e); - } - } - } - ) - ).iterator(); + queryWatcher.registerQuery(query, futures); + + try { + return new MergeIterable<>( + ordering.nullsFirst(), + futures.get() + ).iterator(); + } + catch (InterruptedException e) { + log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); + futures.cancel(true); + throw new QueryInterruptedException(e); + } + catch (ExecutionException e) { + throw Throwables.propagate(e.getCause()); + } } @Override diff --git a/processing/src/main/java/io/druid/query/QueryInterruptedException.java b/processing/src/main/java/io/druid/query/QueryInterruptedException.java new file mode 100644 index 00000000000..7b889ef5f2d --- /dev/null +++ b/processing/src/main/java/io/druid/query/QueryInterruptedException.java @@ -0,0 +1,37 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query; + +public class QueryInterruptedException extends RuntimeException +{ + public QueryInterruptedException() { + super(); + } + + public QueryInterruptedException(String message) + { + super(message); + } + + public QueryInterruptedException(Throwable cause) + { + super(cause); + } +} diff --git a/processing/src/main/java/io/druid/query/QueryWatcher.java b/processing/src/main/java/io/druid/query/QueryWatcher.java new file mode 100644 index 00000000000..0a76a54f23a --- /dev/null +++ b/processing/src/main/java/io/druid/query/QueryWatcher.java @@ -0,0 +1,27 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query; + +import com.google.common.util.concurrent.ListenableFuture; + +public interface QueryWatcher +{ + public void registerQuery(Query query, ListenableFuture future); +} diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/search/SearchQueryRunnerFactory.java index 0c0fdab980d..2cd868e45d8 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryRunnerFactory.java @@ -24,6 +24,7 @@ import io.druid.query.ChainedExecutionQueryRunner; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.query.search.search.SearchQuery; import io.druid.segment.Segment; @@ -35,13 +36,16 @@ import java.util.concurrent.ExecutorService; public class SearchQueryRunnerFactory implements QueryRunnerFactory, SearchQuery> { private final SearchQueryQueryToolChest toolChest; + private final QueryWatcher queryWatcher; @Inject public SearchQueryRunnerFactory( - SearchQueryQueryToolChest toolChest + SearchQueryQueryToolChest toolChest, + QueryWatcher queryWatcher ) { this.toolChest = toolChest; + this.queryWatcher = queryWatcher; } @Override @@ -56,7 +60,7 @@ public class SearchQueryRunnerFactory implements QueryRunnerFactory>( - queryExecutor, toolChest.getOrdering(), queryRunners + queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners ); } diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java index 6e995b15f44..a1fa77dabd5 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java @@ -20,6 +20,7 @@ package io.druid.query.select; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.guava.Sequence; @@ -29,6 +30,7 @@ import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.segment.Segment; @@ -43,21 +45,31 @@ public class SelectQueryRunnerFactory { return new SelectQueryRunnerFactory( new SelectQueryQueryToolChest(new QueryConfig(), jsonMapper), - new SelectQueryEngine() + new SelectQueryEngine(), + new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + } + } ); } private final SelectQueryQueryToolChest toolChest; private final SelectQueryEngine engine; + private final QueryWatcher queryWatcher; @Inject public SelectQueryRunnerFactory( SelectQueryQueryToolChest toolChest, - SelectQueryEngine engine + SelectQueryEngine engine, + QueryWatcher queryWatcher ) { this.toolChest = toolChest; this.engine = engine; + this.queryWatcher = queryWatcher; } @Override @@ -72,7 +84,7 @@ public class SelectQueryRunnerFactory ) { return new ChainedExecutionQueryRunner>( - queryExecutor, toolChest.getOrdering(), queryRunners + queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners ); } diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java index 16e9ae832fa..1f78429ead3 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java @@ -19,6 +19,7 @@ package io.druid.query.timeboundary; +import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.Sequence; @@ -27,6 +28,7 @@ import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.segment.Segment; import io.druid.segment.StorageAdapter; @@ -40,6 +42,13 @@ public class TimeBoundaryQueryRunnerFactory implements QueryRunnerFactory, TimeBoundaryQuery> { private static final TimeBoundaryQueryQueryToolChest toolChest = new TimeBoundaryQueryQueryToolChest(); + private final QueryWatcher queryWatcher; + + @Inject + public TimeBoundaryQueryRunnerFactory(QueryWatcher queryWatcher) + { + this.queryWatcher = queryWatcher; + } @Override public QueryRunner> createRunner(final Segment segment) @@ -53,7 +62,7 @@ public class TimeBoundaryQueryRunnerFactory ) { return new ChainedExecutionQueryRunner>( - queryExecutor, toolChest.getOrdering(), queryRunners + queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners ); } diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java index 740b309e37e..726bc20bb43 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java @@ -19,6 +19,7 @@ package io.druid.query.timeseries; +import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.guava.Sequence; @@ -28,6 +29,7 @@ import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.segment.Segment; import io.druid.segment.StorageAdapter; @@ -39,25 +41,39 @@ import java.util.concurrent.ExecutorService; public class TimeseriesQueryRunnerFactory implements QueryRunnerFactory, TimeseriesQuery> { + /** + * Use only for testing + * @return + */ public static TimeseriesQueryRunnerFactory create() { return new TimeseriesQueryRunnerFactory( new TimeseriesQueryQueryToolChest(new QueryConfig()), - new TimeseriesQueryEngine() + new TimeseriesQueryEngine(), + new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + } + } ); } private final TimeseriesQueryQueryToolChest toolChest; private final TimeseriesQueryEngine engine; + private final QueryWatcher queryWatcher; @Inject public TimeseriesQueryRunnerFactory( TimeseriesQueryQueryToolChest toolChest, - TimeseriesQueryEngine engine + TimeseriesQueryEngine engine, + QueryWatcher queryWatcher ) { this.toolChest = toolChest; this.engine = engine; + this.queryWatcher = queryWatcher; } @Override @@ -72,7 +88,7 @@ public class TimeseriesQueryRunnerFactory ) { return new ChainedExecutionQueryRunner>( - queryExecutor, toolChest.getOrdering(), queryRunners + queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners ); } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java index 044e6a64eb2..524f9ace6a5 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java @@ -30,6 +30,7 @@ import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.segment.Segment; @@ -43,15 +44,18 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory computationBufferPool; private final TopNQueryQueryToolChest toolchest; + private final QueryWatcher queryWatcher; @Inject public TopNQueryRunnerFactory( @Global StupidPool computationBufferPool, - TopNQueryQueryToolChest toolchest + TopNQueryQueryToolChest toolchest, + QueryWatcher queryWatcher ) { this.computationBufferPool = computationBufferPool; this.toolchest = toolchest; + this.queryWatcher = queryWatcher; } @Override @@ -79,7 +83,7 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory>( - queryExecutor, toolchest.getOrdering(), queryRunners + queryExecutor, toolchest.getOrdering(), queryWatcher, queryRunners ); } diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java new file mode 100644 index 00000000000..445bea9cf53 --- /dev/null +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -0,0 +1,148 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; +import com.google.common.util.concurrent.ListenableFuture; +import com.metamx.common.concurrent.ExecutorServiceConfig; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import com.metamx.common.lifecycle.Lifecycle; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +public class ChainedExecutionQueryRunnerTest +{ + @Test @Ignore + public void testQueryCancellation() throws Exception + { + ExecutorService exec = PrioritizedExecutorService.create( + new Lifecycle(), new ExecutorServiceConfig() + { + @Override + public String getFormatString() + { + return "test"; + } + + @Override + public int getNumThreads() + { + return 2; + } + } + ); + + final CountDownLatch queriesStarted = new CountDownLatch(2); + final CountDownLatch queryIsRegistered = new CountDownLatch(1); + + final Map queries = Maps.newHashMap(); + QueryWatcher watcher = new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + queries.put(query, future); + queryIsRegistered.countDown(); + } + }; + + ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>( + exec, + Ordering.natural(), + watcher, + Lists.>newArrayList( + new DyingQueryRunner(1, queriesStarted), + new DyingQueryRunner(2, queriesStarted), + new DyingQueryRunner(3, queriesStarted) + ) + ); + + final Sequence seq = chainedRunner.run( + Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .intervals("2014/2015") + .aggregators(Lists.newArrayList(new CountAggregatorFactory("count"))) + .build() + ); + + Future f = Executors.newFixedThreadPool(1).submit( + new Runnable() + { + @Override + public void run() + { + Sequences.toList(seq, Lists.newArrayList()); + } + } + ); + + // wait for query to register + queryIsRegistered.await(); + queriesStarted.await(); + + // cancel the query + queries.values().iterator().next().cancel(true); + f.get(); + } + + private static class DyingQueryRunner implements QueryRunner + { + private final int id; + private final CountDownLatch latch; + + public DyingQueryRunner(int id, CountDownLatch latch) { + this.id = id; + this.latch = latch; + } + + @Override + public Sequence run(Query query) + { + latch.countDown(); + + int i = 0; + while (i >= 0) { + if(Thread.interrupted()) { + throw new QueryInterruptedException("I got killed"); + } + + // do a lot of work + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new QueryInterruptedException("I got killed"); + } + ++i; + } + return Sequences.simple(Lists.newArrayList(i)); + } + } +} diff --git a/processing/src/test/java/io/druid/query/TestQueryRunners.java b/processing/src/test/java/io/druid/query/TestQueryRunners.java index c4767c1c6f9..a858b5e0cdf 100644 --- a/processing/src/test/java/io/druid/query/TestQueryRunners.java +++ b/processing/src/test/java/io/druid/query/TestQueryRunners.java @@ -1,6 +1,7 @@ package io.druid.query; import com.google.common.base.Supplier; +import com.google.common.util.concurrent.ListenableFuture; import io.druid.collections.StupidPool; import io.druid.query.search.SearchQueryQueryToolChest; import io.druid.query.search.SearchQueryRunnerFactory; @@ -40,7 +41,14 @@ public class TestQueryRunners Segment adapter ) { - QueryRunnerFactory factory = new TopNQueryRunnerFactory(pool, new TopNQueryQueryToolChest(topNConfig)); + QueryRunnerFactory factory = new TopNQueryRunnerFactory(pool, new TopNQueryQueryToolChest(topNConfig), new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + + } + }); return new FinalizeResultsQueryRunner( factory.createRunner(adapter), factory.getToolchest() @@ -62,7 +70,14 @@ public class TestQueryRunners Segment adapter ) { - QueryRunnerFactory factory = new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig())); + QueryRunnerFactory factory = new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()), new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + + } + }); return new FinalizeResultsQueryRunner( factory.createRunner(adapter), factory.getToolchest() @@ -73,11 +88,18 @@ public class TestQueryRunners Segment adapter ) { - QueryRunnerFactory factory = new TimeBoundaryQueryRunnerFactory(); + QueryRunnerFactory factory = new TimeBoundaryQueryRunnerFactory(new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + + } + }); return new FinalizeResultsQueryRunner( factory.createRunner(adapter), factory.getToolchest() ); } -} \ No newline at end of file +} diff --git a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java index 2e5623b4365..0740333eed5 100644 --- a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java @@ -23,10 +23,13 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.guava.Sequences; import io.druid.query.Druids; +import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.query.filter.DimFilter; import io.druid.query.search.search.FragmentSearchQuerySpec; @@ -56,7 +59,14 @@ public class SearchQueryRunnerTest public static Collection constructorFeeder() throws IOException { return QueryRunnerTestHelper.makeQueryRunners( - new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig())) + new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()), new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + + } + }) ); } diff --git a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java index 9b92826ac34..de5ac1281b2 100644 --- a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java @@ -20,10 +20,13 @@ package io.druid.query.timeboundary; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.guava.Sequences; import io.druid.query.Druids; +import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import org.joda.time.DateTime; import org.junit.Assert; @@ -43,7 +46,14 @@ public class TimeBoundaryQueryRunnerTest public static Collection constructorFeeder() throws IOException { return QueryRunnerTestHelper.makeQueryRunners( - new TimeBoundaryQueryRunnerFactory() + new TimeBoundaryQueryRunnerFactory(new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + + } + }) ); } diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java index 22b750faf00..39af4459794 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java @@ -23,12 +23,15 @@ import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.guava.Sequences; import io.druid.collections.StupidPool; import io.druid.query.BySegmentResultValueClass; import io.druid.query.Druids; +import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; @@ -68,7 +71,15 @@ public class TopNQueryRunnerTest QueryRunnerTestHelper.makeQueryRunners( new TopNQueryRunnerFactory( TestQueryRunners.getPool(), - new TopNQueryQueryToolChest(new TopNQueryConfig()) + new TopNQueryQueryToolChest(new TopNQueryConfig()), + new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + + } + } ) ) ); @@ -85,7 +96,15 @@ public class TopNQueryRunnerTest } } ), - new TopNQueryQueryToolChest(new TopNQueryConfig()) + new TopNQueryQueryToolChest(new TopNQueryConfig()), + new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + + } + } ) ) ); diff --git a/server/src/main/java/io/druid/guice/QueryRunnerFactoryModule.java b/server/src/main/java/io/druid/guice/QueryRunnerFactoryModule.java index 6f4dc80b059..6c1bb62cafe 100644 --- a/server/src/main/java/io/druid/guice/QueryRunnerFactoryModule.java +++ b/server/src/main/java/io/druid/guice/QueryRunnerFactoryModule.java @@ -24,6 +24,7 @@ import com.google.inject.Binder; import com.google.inject.multibindings.MapBinder; import io.druid.query.Query; import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryWatcher; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryEngine; import io.druid.query.groupby.GroupByQueryRunnerFactory; @@ -39,6 +40,7 @@ import io.druid.query.timeseries.TimeseriesQuery; import io.druid.query.timeseries.TimeseriesQueryRunnerFactory; import io.druid.query.topn.TopNQuery; import io.druid.query.topn.TopNQueryRunnerFactory; +import io.druid.server.QueryManager; import java.util.Map; @@ -62,6 +64,12 @@ public class QueryRunnerFactoryModule extends QueryToolChestModule { super.configure(binder); + binder.bind(QueryWatcher.class) + .to(QueryManager.class) + .in(LazySingleton.class); + binder.bind(QueryManager.class) + .in(LazySingleton.class); + final MapBinder, QueryRunnerFactory> queryFactoryBinder = DruidBinders.queryRunnerFactoryBinder( binder ); diff --git a/server/src/main/java/io/druid/server/QueryManager.java b/server/src/main/java/io/druid/server/QueryManager.java new file mode 100644 index 00000000000..5910aeda535 --- /dev/null +++ b/server/src/main/java/io/druid/server/QueryManager.java @@ -0,0 +1,48 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server; + +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ListenableFuture; +import io.druid.query.Query; +import io.druid.query.QueryWatcher; + +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; + +public class QueryManager implements QueryWatcher +{ + final ConcurrentMap queries; + + public QueryManager() { + this.queries = Maps.newConcurrentMap(); + } + + public void cancelQuery(String id) { + Future future = queries.get(id); + if(future != null) { + future.cancel(true); + } + } + public void registerQuery(Query query, ListenableFuture future) + { + queries.put(query.getId(), future); + } +} diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 6319a0d4633..e710a6c97c9 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -37,16 +37,20 @@ import io.druid.guice.annotations.Smile; import io.druid.query.DataSourceUtil; import io.druid.query.Query; import io.druid.query.QuerySegmentWalker; +import io.druid.query.QueryWatcher; import io.druid.server.log.RequestLogger; import org.joda.time.DateTime; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.DELETE; import javax.ws.rs.POST; import javax.ws.rs.Path; +import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.core.Context; +import javax.ws.rs.core.Response; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.Charset; @@ -66,6 +70,7 @@ public class QueryResource private final QuerySegmentWalker texasRanger; private final ServiceEmitter emitter; private final RequestLogger requestLogger; + private final QueryManager queryManager; @Inject public QueryResource( @@ -73,7 +78,8 @@ public class QueryResource @Smile ObjectMapper smileMapper, QuerySegmentWalker texasRanger, ServiceEmitter emitter, - RequestLogger requestLogger + RequestLogger requestLogger, + QueryManager queryManager ) { this.jsonMapper = jsonMapper; @@ -81,6 +87,16 @@ public class QueryResource this.texasRanger = texasRanger; this.emitter = emitter; this.requestLogger = requestLogger; + this.queryManager = queryManager; + } + + @DELETE + @Path("{id}") + @Produces("application/json") + public Response getServer(@PathParam("id") String queryId) + { + queryManager.cancelQuery(queryId); + return Response.status(Response.Status.ACCEPTED).build(); } @POST @@ -124,10 +140,13 @@ public class QueryResource resp.setStatus(200); resp.setContentType("application/x-javascript"); + resp.setHeader("X-Druid-Query-Id", query.getId()); out = resp.getOutputStream(); jsonWriter.writeValue(out, results); +// JsonGenerator jgen = jsonWriter.getFactory().createGenerator(out); + long requestTime = System.currentTimeMillis() - start; emitter.emit( From 1be85af32019afd80252ebc2aa746640afa8fb04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Fri, 9 May 2014 22:51:43 -0700 Subject: [PATCH 02/19] handle query interruption at cursor level --- .../io/druid/segment/QueryableIndexStorageAdapter.java | 7 +++++++ .../incremental/IncrementalIndexStorageAdapter.java | 5 +++++ 2 files changed, 12 insertions(+) diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java index a71297b3b89..f624cdfa807 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java @@ -27,6 +27,7 @@ import com.google.common.io.Closeables; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.granularity.QueryGranularity; +import io.druid.query.QueryInterruptedException; import io.druid.query.filter.Filter; import io.druid.segment.column.Column; import io.druid.segment.column.ColumnCapabilities; @@ -224,6 +225,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter @Override public void advance() { + if (Thread.interrupted()) { + throw new QueryInterruptedException(); + } cursorOffset.increment(); } @@ -652,6 +656,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter @Override public void advance() { + if (Thread.interrupted()) { + throw new QueryInterruptedException(); + } ++currRow; } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 0eddf59ac98..3fe807b2761 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -29,6 +29,7 @@ import com.metamx.collections.spatial.search.Bound; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.granularity.QueryGranularity; +import io.druid.query.QueryInterruptedException; import io.druid.query.aggregation.Aggregator; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; @@ -239,6 +240,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter Iterators.advance(baseIter, numAdvanced); } + if (Thread.interrupted()) { + throw new QueryInterruptedException(); + } + boolean foundMatched = false; while (baseIter.hasNext()) { currEntry.set(baseIter.next()); From 32f6243be0a765bb59389671132b9fb9d35f66cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Fri, 9 May 2014 23:28:42 -0700 Subject: [PATCH 03/19] proper closing of resources in case of query cancellation --- .../timeseries/TimeseriesQueryEngine.java | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java index 9f1d0860aa7..ad290536b30 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java @@ -60,27 +60,29 @@ public class TimeseriesQueryEngine { Aggregator[] aggregators = QueryRunnerHelper.makeAggregators(cursor, aggregatorSpecs); - while (!cursor.isDone()) { - for (Aggregator aggregator : aggregators) { - aggregator.aggregate(); + try { + while (!cursor.isDone()) { + for (Aggregator aggregator : aggregators) { + aggregator.aggregate(); + } + cursor.advance(); } - cursor.advance(); + + TimeseriesResultBuilder bob = new TimeseriesResultBuilder(cursor.getTime()); + + for (Aggregator aggregator : aggregators) { + bob.addMetric(aggregator); + } + + Result retVal = bob.build(); + return retVal; } - - TimeseriesResultBuilder bob = new TimeseriesResultBuilder(cursor.getTime()); - - for (Aggregator aggregator : aggregators) { - bob.addMetric(aggregator); + finally { + // cleanup + for (Aggregator agg : aggregators) { + agg.close(); + } } - - Result retVal = bob.build(); - - // cleanup - for (Aggregator agg : aggregators) { - agg.close(); - } - - return retVal; } } ); From 4f1e1576397e08eb26413b76108d0c03eb6bbefd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 14 May 2014 16:58:41 -0700 Subject: [PATCH 04/19] pass through errors from computes in DirectDruidClient --- .../io/druid/query/QueryInterruptedException.java | 13 ++++++++++++- .../java/io/druid/client/DirectDruidClient.java | 8 +++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/io/druid/query/QueryInterruptedException.java b/processing/src/main/java/io/druid/query/QueryInterruptedException.java index 7b889ef5f2d..00676918dcf 100644 --- a/processing/src/main/java/io/druid/query/QueryInterruptedException.java +++ b/processing/src/main/java/io/druid/query/QueryInterruptedException.java @@ -19,13 +19,17 @@ package io.druid.query; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + public class QueryInterruptedException extends RuntimeException { public QueryInterruptedException() { super(); } - public QueryInterruptedException(String message) + @JsonCreator + public QueryInterruptedException(@JsonProperty("error") String message) { super(message); } @@ -34,4 +38,11 @@ public class QueryInterruptedException extends RuntimeException { super(cause); } + + @JsonProperty("error") + @Override + public String getMessage() + { + return super.getMessage(); + } } diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index ae994d1cda7..c08cd9e2bd4 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -45,6 +45,7 @@ import com.metamx.http.client.response.ClientResponse; import com.metamx.http.client.response.InputStreamResponseHandler; import io.druid.query.BySegmentResultValueClass; import io.druid.query.Query; +import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; @@ -283,7 +284,12 @@ public class DirectDruidClient implements QueryRunner if (jp == null) { try { jp = objectMapper.getFactory().createParser(future.get()); - if (jp.nextToken() != JsonToken.START_ARRAY) { + final JsonToken nextToken = jp.nextToken(); + if (nextToken == JsonToken.START_OBJECT) { + QueryInterruptedException e = jp.getCodec().readValue(jp, QueryInterruptedException.class); + throw e; + } + else if (nextToken != JsonToken.START_ARRAY) { throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url); } else { jp.nextToken(); From 1183c68ce4e2c55529be8bcf7fcd7c087af2fdcd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 14 May 2014 16:59:01 -0700 Subject: [PATCH 05/19] formatting --- .../main/java/io/druid/segment/ReferenceCountingSequence.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/io/druid/segment/ReferenceCountingSequence.java b/processing/src/main/java/io/druid/segment/ReferenceCountingSequence.java index 402e63dc31d..4c5cbfc3d3d 100644 --- a/processing/src/main/java/io/druid/segment/ReferenceCountingSequence.java +++ b/processing/src/main/java/io/druid/segment/ReferenceCountingSequence.java @@ -48,4 +48,4 @@ public class ReferenceCountingSequence extends YieldingSequenceBase final Closeable closeable = segment.increment(); return new ResourceClosingYielder(baseSequence.toYielder(initValue, accumulator), closeable); } -} \ No newline at end of file +} From a56a655eae2f4dfb5492f278d5bb0078ec3293bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 14 May 2014 17:00:18 -0700 Subject: [PATCH 06/19] proper query exceptions and add support for query timeout --- .../query/ChainedExecutionQueryRunner.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 6c12d37669b..c7ed29f1ddf 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -39,8 +39,11 @@ import com.metamx.common.logger.Logger; import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * A QueryRunner that combines a list of other QueryRunners and executes them in parallel on an executor. @@ -149,15 +152,26 @@ public class ChainedExecutionQueryRunner implements QueryRunner queryWatcher.registerQuery(query, futures); try { + final Number timeout = query.getContextValue("timeout", (Number)null); return new MergeIterable<>( ordering.nullsFirst(), - futures.get() + timeout == null ? + futures.get() : + futures.get(timeout.longValue(), TimeUnit.MILLISECONDS) ).iterator(); } catch (InterruptedException e) { log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); futures.cancel(true); - throw new QueryInterruptedException(e); + throw new QueryInterruptedException("Query interrupted"); + } + catch(CancellationException e) { + log.warn(e, "Query cancelled, query id [%s]", query.getId()); + throw new QueryInterruptedException("Query cancelled"); + } + catch(TimeoutException e) { + log.warn(e, "Query timeout, query id [%s]", query.getId()); + throw new QueryInterruptedException("Query timeout"); } catch (ExecutionException e) { throw Throwables.propagate(e.getCause()); From def62c74f8e60f799b7118d26554e5f340ca9cc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 14 May 2014 17:01:18 -0700 Subject: [PATCH 07/19] properly remove completed queries from query manager --- .../java/io/druid/server/QueryManager.java | 52 +++++++++++++------ 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/io/druid/server/QueryManager.java b/server/src/main/java/io/druid/server/QueryManager.java index 5910aeda535..84b947414dd 100644 --- a/server/src/main/java/io/druid/server/QueryManager.java +++ b/server/src/main/java/io/druid/server/QueryManager.java @@ -19,30 +19,50 @@ package io.druid.server; -import com.google.common.collect.Maps; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimaps; +import com.google.common.collect.SetMultimap; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import io.druid.query.Query; import io.druid.query.QueryWatcher; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Future; +import java.util.Set; public class QueryManager implements QueryWatcher { - final ConcurrentMap queries; + final SetMultimap queries; - public QueryManager() { - this.queries = Maps.newConcurrentMap(); - } - - public void cancelQuery(String id) { - Future future = queries.get(id); - if(future != null) { - future.cancel(true); - } - } - public void registerQuery(Query query, ListenableFuture future) + public QueryManager() { - queries.put(query.getId(), future); + this.queries = Multimaps.synchronizedSetMultimap( + HashMultimap.create() + ); + } + + public boolean cancelQuery(String id) { + Set futures = queries.removeAll(id); + boolean success = true; + for (ListenableFuture future : futures) { + success = success && future.cancel(true); + } + return success; + } + + public void registerQuery(Query query, final ListenableFuture future) + { + final String id = query.getId(); + queries.put(id, future); + future.addListener( + new Runnable() + { + @Override + public void run() + { + queries.remove(id, future); + } + }, + MoreExecutors.sameThreadExecutor() + ); } } From d2c729adec3b9b82705646c6f006ad55b8860a5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Mon, 12 May 2014 15:07:49 -0700 Subject: [PATCH 08/19] return proper json errors to propagate query cancellation and timeout --- .../io/druid/jackson/DefaultObjectMapper.java | 13 +- .../DruidDefaultSerializersModule.java | 26 ++- .../java/io/druid/server/QueryResource.java | 154 +++++++++++------- 3 files changed, 127 insertions(+), 66 deletions(-) diff --git a/processing/src/main/java/io/druid/jackson/DefaultObjectMapper.java b/processing/src/main/java/io/druid/jackson/DefaultObjectMapper.java index 952b506bd95..e55e2299d2b 100644 --- a/processing/src/main/java/io/druid/jackson/DefaultObjectMapper.java +++ b/processing/src/main/java/io/druid/jackson/DefaultObjectMapper.java @@ -32,7 +32,12 @@ public class DefaultObjectMapper extends ObjectMapper { public DefaultObjectMapper() { - this(null); + this((JsonFactory)null); + } + + public DefaultObjectMapper(DefaultObjectMapper mapper) + { + super(mapper); } public DefaultObjectMapper(JsonFactory factory) @@ -52,4 +57,10 @@ public class DefaultObjectMapper extends ObjectMapper configure(MapperFeature.AUTO_DETECT_SETTERS, false); configure(SerializationFeature.INDENT_OUTPUT, false); } + + @Override + public ObjectMapper copy() + { + return new DefaultObjectMapper(this); + } } diff --git a/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java index 068bfecd963..6184221a1db 100644 --- a/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java @@ -105,7 +105,7 @@ public class DruidDefaultSerializersModule extends SimpleModule jgen.writeStartArray(); value.accumulate( null, - new Accumulator() + new Accumulator() { @Override public Object accumulate(Object o, Object o1) @@ -116,7 +116,7 @@ public class DruidDefaultSerializersModule extends SimpleModule catch (IOException e) { throw Throwables.propagate(e); } - return o; + return null; } } ); @@ -124,6 +124,28 @@ public class DruidDefaultSerializersModule extends SimpleModule } } ); + addSerializer( + Yielder.class, + new JsonSerializer() + { + @Override + public void serialize(Yielder yielder, final JsonGenerator jgen, SerializerProvider provider) + throws IOException, JsonProcessingException + { + jgen.writeStartArray(); + try { + while (!yielder.isDone()) { + final Object o = yielder.get(); + jgen.writeObject(o); + yielder = yielder.next(null); + } + } finally { + yielder.close(); + } + jgen.writeEndArray(); + } + } + ); addSerializer(ByteOrder.class, ToStringSerializer.instance); addDeserializer( ByteOrder.class, diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index e710a6c97c9..1e6ea06607f 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -19,16 +19,23 @@ package io.druid.server; +import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; +import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; import com.google.common.io.Closeables; import com.google.inject.Inject; +import com.metamx.common.guava.Accumulator; +import com.metamx.common.guava.Accumulators; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; +import com.metamx.common.guava.Yielder; +import com.metamx.common.guava.YieldingAccumulator; +import com.metamx.common.guava.YieldingAccumulators; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; @@ -36,21 +43,25 @@ import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; import io.druid.query.DataSourceUtil; import io.druid.query.Query; +import io.druid.query.QueryInterruptedException; import io.druid.query.QuerySegmentWalker; -import io.druid.query.QueryWatcher; import io.druid.server.log.RequestLogger; import org.joda.time.DateTime; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.Charset; @@ -64,6 +75,8 @@ public class QueryResource private static final EmittingLogger log = new EmittingLogger(QueryResource.class); private static final Charset UTF8 = Charset.forName("UTF-8"); private static final Joiner COMMA_JOIN = Joiner.on(","); + public static final String APPLICATION_SMILE = "application/smile"; + public static final String APPLICATION_JSON = "application/json"; private final ObjectMapper jsonMapper; private final ObjectMapper smileMapper; @@ -82,8 +95,12 @@ public class QueryResource QueryManager queryManager ) { - this.jsonMapper = jsonMapper; - this.smileMapper = smileMapper; + this.jsonMapper = jsonMapper.copy(); + this.jsonMapper.getFactory().configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false); + + this.smileMapper = smileMapper.copy(); + this.smileMapper.getFactory().configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false); + this.texasRanger = texasRanger; this.emitter = emitter; this.requestLogger = requestLogger; @@ -97,13 +114,13 @@ public class QueryResource { queryManager.cancelQuery(queryId); return Response.status(Response.Status.ACCEPTED).build(); + } @POST - @Produces("application/json") - public void doPost( + public Response doPost( @Context HttpServletRequest req, - @Context HttpServletResponse resp + @Context final HttpServletResponse resp ) throws ServletException, IOException { final long start = System.currentTimeMillis(); @@ -111,13 +128,12 @@ public class QueryResource byte[] requestQuery = null; String queryId; - final boolean isSmile = "application/smile".equals(req.getContentType()); + final boolean isSmile = APPLICATION_SMILE.equals(req.getContentType()); ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; - ObjectWriter jsonWriter = req.getParameter("pretty") == null + final ObjectWriter jsonWriter = req.getParameter("pretty") == null ? objectMapper.writer() : objectMapper.writerWithDefaultPrettyPrinter(); - OutputStream out = null; try { requestQuery = ByteStreams.toByteArray(req.getInputStream()); @@ -132,48 +148,70 @@ public class QueryResource log.debug("Got query [%s]", query); } - Sequence results = query.run(texasRanger); + Sequence results = query.run(texasRanger); if (results == null) { results = Sequences.empty(); } - resp.setStatus(200); - resp.setContentType("application/x-javascript"); - resp.setHeader("X-Druid-Query-Id", query.getId()); - - out = resp.getOutputStream(); - jsonWriter.writeValue(out, results); - -// JsonGenerator jgen = jsonWriter.getFactory().createGenerator(out); - - long requestTime = System.currentTimeMillis() - start; - - emitter.emit( - new ServiceMetricEvent.Builder() - .setUser2(DataSourceUtil.getMetricName(query.getDataSource())) - .setUser4(query.getType()) - .setUser5(COMMA_JOIN.join(query.getIntervals())) - .setUser6(String.valueOf(query.hasFilters())) - .setUser7(req.getRemoteAddr()) - .setUser8(queryId) - .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString()) - .build("request/time", requestTime) - ); - - requestLogger.log( - new RequestLogLine( - new DateTime(), - req.getRemoteAddr(), - query, - new QueryStats( - ImmutableMap.of( - "request/time", requestTime, - "success", true - ) - ) + try ( + final Yielder yielder = results.toYielder( + null, + new YieldingAccumulator() + { + @Override + public Object accumulate(Object accumulated, Object in) + { + yield(); + return in; + } + } ) - ); + ) { + long requestTime = System.currentTimeMillis() - start; + + emitter.emit( + new ServiceMetricEvent.Builder() + .setUser2(DataSourceUtil.getMetricName(query.getDataSource())) + .setUser4(query.getType()) + .setUser5(COMMA_JOIN.join(query.getIntervals())) + .setUser6(String.valueOf(query.hasFilters())) + .setUser7(req.getRemoteAddr()) + .setUser8(queryId) + .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString()) + .build("request/time", requestTime) + ); + + requestLogger.log( + new RequestLogLine( + new DateTime(), + req.getRemoteAddr(), + query, + new QueryStats( + ImmutableMap.of( + "request/time", requestTime, + "success", true + ) + ) + ) + ); + + return Response + .ok( + new StreamingOutput() + { + @Override + public void write(OutputStream outputStream) throws IOException, WebApplicationException + { + jsonWriter.writeValue(outputStream, yielder); + outputStream.close(); + } + }, + isSmile ? APPLICATION_JSON : APPLICATION_SMILE + ) + .header("X-Druid-Query-Id", queryId) + .build(); + } } catch (Exception e) { final String queryString = @@ -183,20 +221,6 @@ public class QueryResource log.warn(e, "Exception occurred on request [%s]", queryString); - if (!resp.isCommitted()) { - resp.setStatus(500); - resp.resetBuffer(); - - if (out == null) { - out = resp.getOutputStream(); - } - - out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8)); - out.write("\n".getBytes(UTF8)); - } - - resp.flushBuffer(); - try { requestLogger.log( new RequestLogLine( @@ -216,10 +240,14 @@ public class QueryResource .addData("query", queryString) .addData("peer", req.getRemoteAddr()) .emit(); - } - finally { - resp.flushBuffer(); - Closeables.closeQuietly(out); + + return Response.serverError().entity( + jsonWriter.writeValueAsString( + ImmutableMap.of( + "error", (e.getMessage() == null) ? "null Exception" : e.getMessage() + ) + ) + ).build(); } } } From d01f272a7ab71ab7ba565493404f8330af3e774b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Mon, 2 Jun 2014 17:40:35 -0700 Subject: [PATCH 09/19] forward cancellation in direct druid client --- pom.xml | 2 +- .../io/druid/client/BrokerServerView.java | 8 +- .../io/druid/client/DirectDruidClient.java | 46 ++++++++-- .../java/io/druid/server/QueryResource.java | 1 - .../druid/client/DirectDruidClientTest.java | 90 +++++++++++++++++-- 5 files changed, 127 insertions(+), 20 deletions(-) diff --git a/pom.xml b/pom.xml index 2116d7f617f..2a3af54886e 100644 --- a/pom.xml +++ b/pom.xml @@ -78,7 +78,7 @@ com.metamx http-client - 0.9.5 + 0.9.6 com.metamx diff --git a/server/src/main/java/io/druid/client/BrokerServerView.java b/server/src/main/java/io/druid/client/BrokerServerView.java index 57663154156..0070622ac85 100644 --- a/server/src/main/java/io/druid/client/BrokerServerView.java +++ b/server/src/main/java/io/druid/client/BrokerServerView.java @@ -32,10 +32,9 @@ import io.druid.client.selector.TierSelectorStrategy; import io.druid.concurrent.Execs; import io.druid.guice.annotations.Client; import io.druid.query.DataSource; -import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChestWarehouse; -import io.druid.query.TableDataSource; +import io.druid.query.QueryWatcher; import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; import io.druid.timeline.VersionedIntervalTimeline; @@ -60,6 +59,7 @@ public class BrokerServerView implements TimelineServerView private final Map> timelines; private final QueryToolChestWarehouse warehouse; + private final QueryWatcher queryWatcher; private final ObjectMapper smileMapper; private final HttpClient httpClient; private final ServerInventoryView baseView; @@ -68,6 +68,7 @@ public class BrokerServerView implements TimelineServerView @Inject public BrokerServerView( QueryToolChestWarehouse warehouse, + QueryWatcher queryWatcher, ObjectMapper smileMapper, @Client HttpClient httpClient, ServerInventoryView baseView, @@ -75,6 +76,7 @@ public class BrokerServerView implements TimelineServerView ) { this.warehouse = warehouse; + this.queryWatcher = queryWatcher; this.smileMapper = smileMapper; this.httpClient = httpClient; this.baseView = baseView; @@ -154,7 +156,7 @@ public class BrokerServerView implements TimelineServerView private DirectDruidClient makeDirectClient(DruidServer server) { - return new DirectDruidClient(warehouse, smileMapper, httpClient, server.getHost()); + return new DirectDruidClient(warehouse, queryWatcher, smileMapper, httpClient, server.getHost()); } private QueryableDruidServer removeServer(DruidServer server) diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index c08cd9e2bd4..34584ee2395 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.type.TypeFactory; import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.common.base.Charsets; import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.google.common.io.Closeables; @@ -43,12 +44,15 @@ import com.metamx.http.client.HttpClient; import com.metamx.http.client.io.AppendableByteArrayInputStream; import com.metamx.http.client.response.ClientResponse; import com.metamx.http.client.response.InputStreamResponseHandler; +import com.metamx.http.client.response.StatusResponseHandler; +import com.metamx.http.client.response.StatusResponseHolder; import io.druid.query.BySegmentResultValueClass; import io.druid.query.Query; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.query.aggregation.MetricManipulatorFns; import org.jboss.netty.handler.codec.http.HttpChunk; @@ -61,6 +65,7 @@ import java.io.InputStream; import java.net.URL; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; @@ -74,6 +79,7 @@ public class DirectDruidClient implements QueryRunner private static final Map, Pair> typesMap = Maps.newConcurrentMap(); private final QueryToolChestWarehouse warehouse; + private final QueryWatcher queryWatcher; private final ObjectMapper objectMapper; private final HttpClient httpClient; private final String host; @@ -83,12 +89,14 @@ public class DirectDruidClient implements QueryRunner public DirectDruidClient( QueryToolChestWarehouse warehouse, + QueryWatcher queryWatcher, ObjectMapper objectMapper, HttpClient httpClient, String host ) { this.warehouse = warehouse; + this.queryWatcher = queryWatcher; this.objectMapper = objectMapper; this.httpClient = httpClient; this.host = host; @@ -103,7 +111,7 @@ public class DirectDruidClient implements QueryRunner } @Override - public Sequence run(Query query) + public Sequence run(final Query query) { QueryToolChest> toolChest = warehouse.getToolChest(query); boolean isBySegment = query.getContextBySegment(false); @@ -128,6 +136,7 @@ public class DirectDruidClient implements QueryRunner final ListenableFuture future; final String url = String.format("http://%s/druid/v2/", host); + final String cancelUrl = String.format("http://%s/druid/v2/%s", host, query.getId()); try { log.debug("Querying url[%s]", url); @@ -175,6 +184,9 @@ public class DirectDruidClient implements QueryRunner } } ); + + queryWatcher.registerQuery(query, future); + openConnections.getAndIncrement(); Futures.addCallback( future, new FutureCallback() @@ -189,6 +201,27 @@ public class DirectDruidClient implements QueryRunner public void onFailure(Throwable t) { openConnections.getAndDecrement(); + if (future.isCancelled()) { + // forward the cancellation to underlying queriable node + try { + StatusResponseHolder res = httpClient + .delete(new URL(cancelUrl)) + .setContent(objectMapper.writeValueAsBytes(query)) + .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? "application/smile" : "application/json") + .go(new StatusResponseHandler(Charsets.UTF_8)) + .get(); + if (res.getStatus().getCode() >= 500) { + throw new RE( + "Error cancelling query[%s]: queriable node returned status[%d] [%s].", + res.getStatus().getCode(), + res.getStatus().getReasonPhrase() + ); + } + } + catch (IOException | ExecutionException | InterruptedException e) { + Throwables.propagate(e); + } + } } } ); @@ -197,7 +230,7 @@ public class DirectDruidClient implements QueryRunner throw Throwables.propagate(e); } - Sequence retVal = new BaseSequence>( + Sequence retVal = new BaseSequence<>( new BaseSequence.IteratorMaker>() { @Override @@ -296,14 +329,11 @@ public class DirectDruidClient implements QueryRunner objectCodec = jp.getCodec(); } } - catch (IOException e) { + catch (IOException | InterruptedException | ExecutionException e) { throw new RE(e, "Failure getting results from[%s]", url); } - catch (InterruptedException e) { - throw new RE(e, "Failure getting results from[%s]", url); - } - catch (ExecutionException e) { - throw new RE(e, "Failure getting results from[%s]", url); + catch (CancellationException e) { + throw new QueryInterruptedException(); } } } diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 1e6ea06607f..c97657b4e35 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -73,7 +73,6 @@ import java.util.UUID; public class QueryResource { private static final EmittingLogger log = new EmittingLogger(QueryResource.class); - private static final Charset UTF8 = Charset.forName("UTF-8"); private static final Joiner COMMA_JOIN = Joiner.on(","); public static final String APPLICATION_SMILE = "application/smile"; public static final String APPLICATION_JSON = "application/json"; diff --git a/server/src/test/java/io/druid/client/DirectDruidClientTest.java b/server/src/test/java/io/druid/client/DirectDruidClientTest.java index aba91657686..84a80058d35 100644 --- a/server/src/test/java/io/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/io/druid/client/DirectDruidClientTest.java @@ -21,18 +21,24 @@ package io.druid.client; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.http.client.HttpClient; import com.metamx.http.client.Request; import com.metamx.http.client.RequestBuilder; +import com.metamx.http.client.response.StatusResponseHolder; import io.druid.client.selector.ConnectionCountServerSelectorStrategy; import io.druid.client.selector.HighestPriorityTierSelectorStrategy; import io.druid.client.selector.QueryableDruidServer; import io.druid.client.selector.ServerSelector; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.Druids; +import io.druid.query.Query; +import io.druid.query.QueryInterruptedException; +import io.druid.query.QueryWatcher; import io.druid.query.ReflectionQueryToolChestWarehouse; import io.druid.query.Result; import io.druid.query.timeboundary.TimeBoundaryQuery; @@ -41,11 +47,13 @@ import io.druid.timeline.partition.NoneShardSpec; import junit.framework.Assert; import org.easymock.EasyMock; import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.timeout.ReadTimeoutException; import org.joda.time.DateTime; import org.joda.time.Interval; -import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -54,17 +62,22 @@ import java.util.List; public class DirectDruidClientTest { - private HttpClient httpClient; - - @Before - public void setUp() throws Exception + public static final QueryWatcher DUMMY_WATCHER = new QueryWatcher() { - httpClient = EasyMock.createMock(HttpClient.class); - } + @Override + public void registerQuery(Query query, ListenableFuture future) + { + + } + }; + + @Rule + public ExpectedException thrown = ExpectedException.none(); @Test public void testRun() throws Exception { + HttpClient httpClient = EasyMock.createMock(HttpClient.class); RequestBuilder requestBuilder = new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com")); EasyMock.expect(httpClient.post(EasyMock.anyObject())).andReturn(requestBuilder).atLeastOnce(); @@ -93,12 +106,14 @@ public class DirectDruidClientTest DirectDruidClient client1 = new DirectDruidClient( new ReflectionQueryToolChestWarehouse(), + DUMMY_WATCHER, new DefaultObjectMapper(), httpClient, "foo" ); DirectDruidClient client2 = new DirectDruidClient( new ReflectionQueryToolChestWarehouse(), + DUMMY_WATCHER, new DefaultObjectMapper(), httpClient, "foo2" @@ -149,4 +164,65 @@ public class DirectDruidClientTest EasyMock.verify(httpClient); } + + @Test + public void testCancel() throws Exception + { + HttpClient httpClient = EasyMock.createStrictMock(HttpClient.class); + EasyMock.expect(httpClient.post(EasyMock.anyObject())).andReturn( + new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com")) + ).once(); + + ListenableFuture cancelledFuture = Futures.immediateCancelledFuture(); + EasyMock.expect(httpClient.go(EasyMock.anyObject())).andReturn(cancelledFuture).once(); + + EasyMock.expect(httpClient.delete(EasyMock.anyObject())) + .andReturn(new RequestBuilder(httpClient, HttpMethod.DELETE, new URL("http://foo.com/delete"))) + .once(); + SettableFuture cancellationFuture = SettableFuture.create(); + EasyMock.expect(httpClient.go(EasyMock.anyObject())).andReturn(cancellationFuture).once(); + + EasyMock.replay(httpClient); + + final ServerSelector serverSelector = new ServerSelector( + new DataSegment( + "test", + new Interval("2013-01-01/2013-01-02"), + new DateTime("2013-01-01").toString(), + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + new NoneShardSpec(), + 0, + 0L + ), + new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()) + ); + + DirectDruidClient client1 = new DirectDruidClient( + new ReflectionQueryToolChestWarehouse(), + DUMMY_WATCHER, + new DefaultObjectMapper(), + httpClient, + "foo" + ); + + QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer( + new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0), + client1 + ); + serverSelector.addServer(queryableDruidServer1); + + TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); + + cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled"))); + Sequence results = client1.run(query); + Assert.assertEquals(0, client1.getNumOpenConnections()); + + + thrown.expect(QueryInterruptedException.class); + Assert.assertTrue(Sequences.toList(results, Lists.newArrayList()).isEmpty()); + + EasyMock.verify(httpClient); + } } From 855c66c9ad13e0b133597e0324ffaf761c99bbeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Mon, 2 Jun 2014 18:29:42 -0700 Subject: [PATCH 10/19] less stack traces when cancelling queries --- .../query/ChainedExecutionQueryRunner.java | 5 ++-- .../io/druid/client/DirectDruidClient.java | 2 +- .../java/io/druid/server/QueryResource.java | 26 +++++++++++++++++-- 3 files changed, 28 insertions(+), 5 deletions(-) diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index c7ed29f1ddf..256cefa33a0 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -136,6 +136,9 @@ public class ChainedExecutionQueryRunner implements QueryRunner return retVal; } + catch (QueryInterruptedException e) { + throw Throwables.propagate(e); + } catch (Exception e) { log.error(e, "Exception with one of the sequences!"); throw Throwables.propagate(e); @@ -166,11 +169,9 @@ public class ChainedExecutionQueryRunner implements QueryRunner throw new QueryInterruptedException("Query interrupted"); } catch(CancellationException e) { - log.warn(e, "Query cancelled, query id [%s]", query.getId()); throw new QueryInterruptedException("Query cancelled"); } catch(TimeoutException e) { - log.warn(e, "Query timeout, query id [%s]", query.getId()); throw new QueryInterruptedException("Query timeout"); } catch (ExecutionException e) { diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 34584ee2395..b6030f9755b 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -333,7 +333,7 @@ public class DirectDruidClient implements QueryRunner throw new RE(e, "Failure getting results from[%s]", url); } catch (CancellationException e) { - throw new QueryInterruptedException(); + throw new QueryInterruptedException("Query cancelled"); } } } diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index c97657b4e35..33bdd519c83 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -125,7 +125,7 @@ public class QueryResource final long start = System.currentTimeMillis(); Query query = null; byte[] requestQuery = null; - String queryId; + String queryId = null; final boolean isSmile = APPLICATION_SMILE.equals(req.getContentType()); @@ -212,6 +212,28 @@ public class QueryResource .build(); } } + catch (QueryInterruptedException e) { + try { + log.info("%s [%s]", e.getMessage(), queryId); + requestLogger.log( + new RequestLogLine( + new DateTime(), + req.getRemoteAddr(), + query, + new QueryStats(ImmutableMap.of("success", false, "interrupted", true, "reason", e.toString())) + ) + ); + } catch (Exception e2) { + log.error(e2, "Unable to log query [%s]!", query); + } + return Response.serverError().entity( + jsonWriter.writeValueAsString( + ImmutableMap.of( + "error", e.getMessage() + ) + ) + ).build(); + } catch (Exception e) { final String queryString = query == null @@ -243,7 +265,7 @@ public class QueryResource return Response.serverError().entity( jsonWriter.writeValueAsString( ImmutableMap.of( - "error", (e.getMessage() == null) ? "null Exception" : e.getMessage() + "error", e.getMessage() == null ? "null exception" : e.getMessage() ) ) ).build(); From d0f9c438f826f6ebe072bb193a80e8b5757d0c79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Mon, 2 Jun 2014 17:39:08 -0700 Subject: [PATCH 11/19] proper query cancellation tests --- .../ChainedExecutionQueryRunnerTest.java | 115 +++++++++++------- .../io/druid/query/QueryRunnerTestHelper.java | 11 ++ .../java/io/druid/query/TestQueryRunners.java | 13 +- .../druid/query/topn/TopNQueryRunnerTest.java | 18 +-- .../druid/query/topn/TopNUnionQueryTest.java | 9 +- .../druid/client/DirectDruidClientTest.java | 16 +-- 6 files changed, 107 insertions(+), 75 deletions(-) diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java index 445bea9cf53..b5391605d32 100644 --- a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -20,7 +20,6 @@ package io.druid.query; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.concurrent.ExecutorServiceConfig; @@ -29,18 +28,22 @@ import com.metamx.common.guava.Sequences; import com.metamx.common.lifecycle.Lifecycle; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; -import org.junit.Ignore; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.Assert; import org.junit.Test; -import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; public class ChainedExecutionQueryRunnerTest { - @Test @Ignore + @Test public void testQueryCancellation() throws Exception { ExecutorService exec = PrioritizedExecutorService.create( @@ -63,25 +66,36 @@ public class ChainedExecutionQueryRunnerTest final CountDownLatch queriesStarted = new CountDownLatch(2); final CountDownLatch queryIsRegistered = new CountDownLatch(1); - final Map queries = Maps.newHashMap(); - QueryWatcher watcher = new QueryWatcher() - { - @Override - public void registerQuery(Query query, ListenableFuture future) - { - queries.put(query, future); - queryIsRegistered.countDown(); - } - }; + Capture capturedFuture = new Capture<>(); + QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class); + watcher.registerQuery(EasyMock.anyObject(), EasyMock.and(EasyMock.anyObject(), EasyMock.capture(capturedFuture))); + EasyMock.expectLastCall() + .andAnswer( + new IAnswer() + { + @Override + public Void answer() throws Throwable + { + queryIsRegistered.countDown(); + return null; + } + } + ) + .once(); + EasyMock.replay(watcher); + + DyingQueryRunner runner1 = new DyingQueryRunner(queriesStarted); + DyingQueryRunner runner2 = new DyingQueryRunner(queriesStarted); + DyingQueryRunner runner3 = new DyingQueryRunner(queriesStarted); ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>( exec, Ordering.natural(), watcher, Lists.>newArrayList( - new DyingQueryRunner(1, queriesStarted), - new DyingQueryRunner(2, queriesStarted), - new DyingQueryRunner(3, queriesStarted) + runner1, + runner2, + runner3 ) ); @@ -93,7 +107,7 @@ public class ChainedExecutionQueryRunnerTest .build() ); - Future f = Executors.newFixedThreadPool(1).submit( + Future resultFuture = Executors.newFixedThreadPool(1).submit( new Runnable() { @Override @@ -104,45 +118,64 @@ public class ChainedExecutionQueryRunnerTest } ); - // wait for query to register - queryIsRegistered.await(); - queriesStarted.await(); + // wait for query to register and start + Assert.assertTrue(queryIsRegistered.await(1, TimeUnit.SECONDS)); + Assert.assertTrue(queriesStarted.await(1, TimeUnit.SECONDS)); // cancel the query - queries.values().iterator().next().cancel(true); - f.get(); + Assert.assertTrue(capturedFuture.hasCaptured()); + ListenableFuture future = capturedFuture.getValue(); + future.cancel(true); + + QueryInterruptedException cause = null; + try { + resultFuture.get(); + } catch(ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof QueryInterruptedException); + cause = (QueryInterruptedException)e.getCause(); + } + Assert.assertNotNull(cause); + Assert.assertTrue(future.isCancelled()); + Assert.assertTrue(runner1.hasStarted); + Assert.assertTrue(runner2.hasStarted); + Assert.assertFalse(runner3.hasStarted); + Assert.assertFalse(runner1.hasCompleted); + Assert.assertFalse(runner2.hasCompleted); + Assert.assertFalse(runner3.hasCompleted); + + EasyMock.verify(watcher); } private static class DyingQueryRunner implements QueryRunner { - private final int id; private final CountDownLatch latch; + private boolean hasStarted = false; + private boolean hasCompleted = false; - public DyingQueryRunner(int id, CountDownLatch latch) { - this.id = id; + public DyingQueryRunner(CountDownLatch latch) + { this.latch = latch; } @Override public Sequence run(Query query) { + hasStarted = true; latch.countDown(); - - int i = 0; - while (i >= 0) { - if(Thread.interrupted()) { - throw new QueryInterruptedException("I got killed"); - } - - // do a lot of work - try { - Thread.sleep(100); - } catch (InterruptedException e) { - throw new QueryInterruptedException("I got killed"); - } - ++i; + if (Thread.interrupted()) { + throw new QueryInterruptedException("I got killed"); } - return Sequences.simple(Lists.newArrayList(i)); + + // do a lot of work + try { + Thread.sleep(500); + } + catch (InterruptedException e) { + throw new QueryInterruptedException("I got killed"); + } + + hasCompleted = true; + return Sequences.simple(Lists.newArrayList(123)); } } } diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index ffa6b02c236..954cb4fd5ee 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -21,6 +21,7 @@ package io.druid.query; import com.google.common.base.Function; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; @@ -53,6 +54,16 @@ import java.util.List; */ public class QueryRunnerTestHelper { + + public static final QueryWatcher DUMMY_QUERYWATCHER = new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + + } + }; + public static final String segmentId = "testSegment"; public static final String dataSource = "testing"; public static final UnionDataSource unionDataSource = new UnionDataSource( diff --git a/processing/src/test/java/io/druid/query/TestQueryRunners.java b/processing/src/test/java/io/druid/query/TestQueryRunners.java index a858b5e0cdf..90d394f3e3b 100644 --- a/processing/src/test/java/io/druid/query/TestQueryRunners.java +++ b/processing/src/test/java/io/druid/query/TestQueryRunners.java @@ -41,14 +41,11 @@ public class TestQueryRunners Segment adapter ) { - QueryRunnerFactory factory = new TopNQueryRunnerFactory(pool, new TopNQueryQueryToolChest(topNConfig), new QueryWatcher() - { - @Override - public void registerQuery(Query query, ListenableFuture future) - { - - } - }); + QueryRunnerFactory factory = new TopNQueryRunnerFactory( + pool, + new TopNQueryQueryToolChest(topNConfig), + QueryRunnerTestHelper.DUMMY_QUERYWATCHER + ); return new FinalizeResultsQueryRunner( factory.createRunner(adapter), factory.getToolchest() diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java index 39af4459794..f06258a9a09 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java @@ -72,14 +72,7 @@ public class TopNQueryRunnerTest new TopNQueryRunnerFactory( TestQueryRunners.getPool(), new TopNQueryQueryToolChest(new TopNQueryConfig()), - new QueryWatcher() - { - @Override - public void registerQuery(Query query, ListenableFuture future) - { - - } - } + QueryRunnerTestHelper.DUMMY_QUERYWATCHER ) ) ); @@ -97,14 +90,7 @@ public class TopNQueryRunnerTest } ), new TopNQueryQueryToolChest(new TopNQueryConfig()), - new QueryWatcher() - { - @Override - public void registerQuery(Query query, ListenableFuture future) - { - - } - } + QueryRunnerTestHelper.DUMMY_QUERYWATCHER ) ) ); diff --git a/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java b/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java index 1fdc3b11cf5..d2e3e5dea73 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java @@ -23,9 +23,12 @@ import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; import io.druid.collections.StupidPool; +import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; @@ -65,7 +68,8 @@ public class TopNUnionQueryTest QueryRunnerTestHelper.makeUnionQueryRunners( new TopNQueryRunnerFactory( TestQueryRunners.getPool(), - new TopNQueryQueryToolChest(new TopNQueryConfig()) + new TopNQueryQueryToolChest(new TopNQueryConfig()), + QueryRunnerTestHelper.DUMMY_QUERYWATCHER ) ) ); @@ -82,7 +86,8 @@ public class TopNUnionQueryTest } } ), - new TopNQueryQueryToolChest(new TopNQueryConfig()) + new TopNQueryQueryToolChest(new TopNQueryConfig()), + QueryRunnerTestHelper.DUMMY_QUERYWATCHER ) ) ); diff --git a/server/src/test/java/io/druid/client/DirectDruidClientTest.java b/server/src/test/java/io/druid/client/DirectDruidClientTest.java index 84a80058d35..b2396688900 100644 --- a/server/src/test/java/io/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/io/druid/client/DirectDruidClientTest.java @@ -44,16 +44,14 @@ import io.druid.query.Result; import io.druid.query.timeboundary.TimeBoundaryQuery; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; -import junit.framework.Assert; import org.easymock.EasyMock; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.timeout.ReadTimeoutException; import org.joda.time.DateTime; import org.joda.time.Interval; -import org.junit.Rule; +import org.junit.Assert; import org.junit.Test; -import org.junit.rules.ExpectedException; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -71,9 +69,6 @@ public class DirectDruidClientTest } }; - @Rule - public ExpectedException thrown = ExpectedException.none(); - @Test public void testRun() throws Exception { @@ -220,8 +215,13 @@ public class DirectDruidClientTest Assert.assertEquals(0, client1.getNumOpenConnections()); - thrown.expect(QueryInterruptedException.class); - Assert.assertTrue(Sequences.toList(results, Lists.newArrayList()).isEmpty()); + QueryInterruptedException exception = null; + try { + Sequences.toList(results, Lists.newArrayList()); + } catch(QueryInterruptedException e) { + exception = e; + } + Assert.assertNotNull(exception); EasyMock.verify(httpClient); } From 97d5455f3a455fecbb1296f60fdf038ef040f862 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 3 Jun 2014 11:15:03 -0700 Subject: [PATCH 12/19] properly kill timed out queries --- .../query/ChainedExecutionQueryRunner.java | 2 + .../ChainedExecutionQueryRunnerTest.java | 105 ++++++++++++++++++ 2 files changed, 107 insertions(+) diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 256cefa33a0..8a5ed51a4df 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -172,6 +172,8 @@ public class ChainedExecutionQueryRunner implements QueryRunner throw new QueryInterruptedException("Query cancelled"); } catch(TimeoutException e) { + log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); + futures.cancel(true); throw new QueryInterruptedException("Query timeout"); } catch (ExecutionException e) { diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java index b5391605d32..f2555dd7214 100644 --- a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -19,6 +19,7 @@ package io.druid.query; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.common.util.concurrent.ListenableFuture; @@ -146,6 +147,110 @@ public class ChainedExecutionQueryRunnerTest EasyMock.verify(watcher); } + @Test + public void testQueryTimeout() throws Exception + { + ExecutorService exec = PrioritizedExecutorService.create( + new Lifecycle(), new ExecutorServiceConfig() + { + @Override + public String getFormatString() + { + return "test"; + } + + @Override + public int getNumThreads() + { + return 2; + } + } + ); + + final CountDownLatch queriesStarted = new CountDownLatch(2); + final CountDownLatch queryIsRegistered = new CountDownLatch(1); + + Capture capturedFuture = new Capture<>(); + QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class); + watcher.registerQuery(EasyMock.anyObject(), EasyMock.and(EasyMock.anyObject(), EasyMock.capture(capturedFuture))); + EasyMock.expectLastCall() + .andAnswer( + new IAnswer() + { + @Override + public Void answer() throws Throwable + { + queryIsRegistered.countDown(); + return null; + } + } + ) + .once(); + + EasyMock.replay(watcher); + + DyingQueryRunner runner1 = new DyingQueryRunner(queriesStarted); + DyingQueryRunner runner2 = new DyingQueryRunner(queriesStarted); + DyingQueryRunner runner3 = new DyingQueryRunner(queriesStarted); + ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>( + exec, + Ordering.natural(), + watcher, + Lists.>newArrayList( + runner1, + runner2, + runner3 + ) + ); + + final Sequence seq = chainedRunner.run( + Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .intervals("2014/2015") + .aggregators(Lists.newArrayList(new CountAggregatorFactory("count"))) + .context(ImmutableMap.of("timeout", (100), "queryId", "test")) + .build() + ); + + Future resultFuture = Executors.newFixedThreadPool(1).submit( + new Runnable() + { + @Override + public void run() + { + Sequences.toList(seq, Lists.newArrayList()); + } + } + ); + + // wait for query to register and start + Assert.assertTrue(queryIsRegistered.await(1, TimeUnit.SECONDS)); + Assert.assertTrue(queriesStarted.await(1, TimeUnit.SECONDS)); + + // cancel the query + Assert.assertTrue(capturedFuture.hasCaptured()); + ListenableFuture future = capturedFuture.getValue(); + + QueryInterruptedException cause = null; + try { + resultFuture.get(); + } catch(ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof QueryInterruptedException); + Assert.assertEquals("Query timeout", e.getCause().getMessage()); + cause = (QueryInterruptedException)e.getCause(); + } + Assert.assertNotNull(cause); + Assert.assertTrue(future.isCancelled()); + Assert.assertTrue(runner1.hasStarted); + Assert.assertTrue(runner2.hasStarted); + Assert.assertFalse(runner3.hasStarted); + Assert.assertFalse(runner1.hasCompleted); + Assert.assertFalse(runner2.hasCompleted); + Assert.assertFalse(runner3.hasCompleted); + + EasyMock.verify(watcher); + } + private static class DyingQueryRunner implements QueryRunner { private final CountDownLatch latch; From 99c9a2cf05fc7065a333a6b943cfb3e73fcd29e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 3 Jun 2014 11:35:26 -0700 Subject: [PATCH 13/19] make sure to close yielder in MetricsEmittingQueryRunner --- .../query/MetricsEmittingQueryRunner.java | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java index 8425aa97fe2..dbad443cb36 100644 --- a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java @@ -167,18 +167,20 @@ public class MetricsEmittingQueryRunner implements QueryRunner @Override public void close() throws IOException { - if (!isDone() && builder.getUser10() == null) { - builder.setUser10("short"); + try { + if (!isDone() && builder.getUser10() == null) { + builder.setUser10("short"); + } + + long timeTaken = System.currentTimeMillis() - startTime; + emitter.emit(builder.build("query/time", timeTaken)); + + if (creationTime > 0) { + emitter.emit(builder.build("query/wait", startTime - creationTime)); + } + } finally { + yielder.close(); } - - long timeTaken = System.currentTimeMillis() - startTime; - emitter.emit(builder.build("query/time", timeTaken)); - - if(creationTime > 0) { - emitter.emit(builder.build("query/wait", startTime - creationTime)); - } - - yielder.close(); } }; } From c08002aa4d9b2c5892ee090833d5cd33ac827e3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 3 Jun 2014 17:32:14 -0700 Subject: [PATCH 14/19] interrupt queries on incremental indexer --- .../segment/incremental/IncrementalIndexStorageAdapter.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 3fe807b2761..057754e6087 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -201,6 +201,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } while (baseIter.hasNext()) { + if (Thread.interrupted()) { + throw new QueryInterruptedException(); + } + currEntry.set(baseIter.next()); if (filterMatcher.matches()) { From b84884ab7677a34485a7bc6d961cf8d1d29eb043 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Mon, 9 Jun 2014 13:48:01 -0700 Subject: [PATCH 15/19] remove methods used only for testing --- .../select/SelectQueryRunnerFactory.java | 15 ---------- .../TimeseriesQueryRunnerFactory.java | 19 ------------ .../io/druid/query/QueryRunnerTestHelper.java | 2 +- .../java/io/druid/query/TestQueryRunners.java | 30 +++++++------------ .../query/search/SearchQueryRunnerTest.java | 12 +++----- .../query/select/SelectQueryRunnerTest.java | 10 ++++++- .../TimeBoundaryQueryRunnerTest.java | 9 +----- .../TimeSeriesUnionQueryRunnerTest.java | 7 ++++- .../TimeseriesQueryRunnerBonusTest.java | 9 +++++- .../timeseries/TimeseriesQueryRunnerTest.java | 7 ++++- .../druid/query/topn/TopNQueryRunnerTest.java | 7 ++--- .../druid/query/topn/TopNUnionQueryTest.java | 7 ++--- .../filter/SpatialFilterBonusTest.java | 20 +++++++++++-- .../segment/filter/SpatialFilterTest.java | 20 +++++++++++-- .../server/coordination/ServerManager.java | 1 - .../druid/client/DirectDruidClientTest.java | 16 +++------- 16 files changed, 88 insertions(+), 103 deletions(-) diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java index a1fa77dabd5..72cce700a6d 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java @@ -41,21 +41,6 @@ import java.util.concurrent.ExecutorService; public class SelectQueryRunnerFactory implements QueryRunnerFactory, SelectQuery> { - public static SelectQueryRunnerFactory create(ObjectMapper jsonMapper) - { - return new SelectQueryRunnerFactory( - new SelectQueryQueryToolChest(new QueryConfig(), jsonMapper), - new SelectQueryEngine(), - new QueryWatcher() - { - @Override - public void registerQuery(Query query, ListenableFuture future) - { - } - } - ); - } - private final SelectQueryQueryToolChest toolChest; private final SelectQueryEngine engine; private final QueryWatcher queryWatcher; diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java index 726bc20bb43..724d4818226 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java @@ -41,25 +41,6 @@ import java.util.concurrent.ExecutorService; public class TimeseriesQueryRunnerFactory implements QueryRunnerFactory, TimeseriesQuery> { - /** - * Use only for testing - * @return - */ - public static TimeseriesQueryRunnerFactory create() - { - return new TimeseriesQueryRunnerFactory( - new TimeseriesQueryQueryToolChest(new QueryConfig()), - new TimeseriesQueryEngine(), - new QueryWatcher() - { - @Override - public void registerQuery(Query query, ListenableFuture future) - { - } - } - ); - } - private final TimeseriesQueryQueryToolChest toolChest; private final TimeseriesQueryEngine engine; private final QueryWatcher queryWatcher; diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index 954cb4fd5ee..55c29752ac1 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -55,7 +55,7 @@ import java.util.List; public class QueryRunnerTestHelper { - public static final QueryWatcher DUMMY_QUERYWATCHER = new QueryWatcher() + public static final QueryWatcher NOOP_QUERYWATCHER = new QueryWatcher() { @Override public void registerQuery(Query query, ListenableFuture future) diff --git a/processing/src/test/java/io/druid/query/TestQueryRunners.java b/processing/src/test/java/io/druid/query/TestQueryRunners.java index 90d394f3e3b..f50e81d038e 100644 --- a/processing/src/test/java/io/druid/query/TestQueryRunners.java +++ b/processing/src/test/java/io/druid/query/TestQueryRunners.java @@ -7,6 +7,8 @@ import io.druid.query.search.SearchQueryQueryToolChest; import io.druid.query.search.SearchQueryRunnerFactory; import io.druid.query.search.search.SearchQueryConfig; import io.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory; +import io.druid.query.timeseries.TimeseriesQueryEngine; +import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; import io.druid.query.timeseries.TimeseriesQueryRunnerFactory; import io.druid.query.topn.TopNQueryConfig; import io.druid.query.topn.TopNQueryQueryToolChest; @@ -44,7 +46,7 @@ public class TestQueryRunners QueryRunnerFactory factory = new TopNQueryRunnerFactory( pool, new TopNQueryQueryToolChest(topNConfig), - QueryRunnerTestHelper.DUMMY_QUERYWATCHER + QueryRunnerTestHelper.NOOP_QUERYWATCHER ); return new FinalizeResultsQueryRunner( factory.createRunner(adapter), @@ -56,7 +58,12 @@ public class TestQueryRunners Segment adapter ) { - QueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create(); + QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ); + return new FinalizeResultsQueryRunner( factory.createRunner(adapter), factory.getToolchest() @@ -67,14 +74,7 @@ public class TestQueryRunners Segment adapter ) { - QueryRunnerFactory factory = new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()), new QueryWatcher() - { - @Override - public void registerQuery(Query query, ListenableFuture future) - { - - } - }); + QueryRunnerFactory factory = new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()), QueryRunnerTestHelper.NOOP_QUERYWATCHER); return new FinalizeResultsQueryRunner( factory.createRunner(adapter), factory.getToolchest() @@ -85,18 +85,10 @@ public class TestQueryRunners Segment adapter ) { - QueryRunnerFactory factory = new TimeBoundaryQueryRunnerFactory(new QueryWatcher() - { - @Override - public void registerQuery(Query query, ListenableFuture future) - { - - } - }); + QueryRunnerFactory factory = new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER); return new FinalizeResultsQueryRunner( factory.createRunner(adapter), factory.getToolchest() ); } - } diff --git a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java index 0740333eed5..c69ee1c5a27 100644 --- a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java @@ -59,14 +59,10 @@ public class SearchQueryRunnerTest public static Collection constructorFeeder() throws IOException { return QueryRunnerTestHelper.makeQueryRunners( - new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()), new QueryWatcher() - { - @Override - public void registerQuery(Query query, ListenableFuture future) - { - - } - }) + new SearchQueryRunnerFactory( + new SearchQueryQueryToolChest(new SearchQueryConfig()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ) ); } diff --git a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java index 5015239870e..07f99165873 100644 --- a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java @@ -22,11 +22,15 @@ package io.druid.query.select; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.ISE; import com.metamx.common.guava.Sequences; import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.Query; +import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.query.TableDataSource; import io.druid.query.filter.SelectorDimFilter; @@ -54,7 +58,11 @@ public class SelectQueryRunnerTest public static Collection constructorFeeder() throws IOException { return QueryRunnerTestHelper.makeQueryRunners( - SelectQueryRunnerFactory.create(new DefaultObjectMapper()) + new SelectQueryRunnerFactory( + new SelectQueryQueryToolChest(new QueryConfig(), new DefaultObjectMapper()), + new SelectQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ) ); } diff --git a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java index de5ac1281b2..7bc499dca80 100644 --- a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java @@ -46,14 +46,7 @@ public class TimeBoundaryQueryRunnerTest public static Collection constructorFeeder() throws IOException { return QueryRunnerTestHelper.makeQueryRunners( - new TimeBoundaryQueryRunnerFactory(new QueryWatcher() - { - @Override - public void registerQuery(Query query, ListenableFuture future) - { - - } - }) + new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER) ); } diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java index 36e1fc13955..17d61908c3c 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.metamx.common.guava.Sequences; import io.druid.query.Druids; +import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; @@ -46,7 +47,11 @@ public class TimeSeriesUnionQueryRunnerTest public static Collection constructorFeeder() throws IOException { return QueryRunnerTestHelper.makeUnionQueryRunners( - TimeseriesQueryRunnerFactory.create() + new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ) ); } diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java index d1497a19026..67c91b4be40 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java @@ -28,8 +28,10 @@ import io.druid.granularity.QueryGranularity; import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; @@ -87,7 +89,12 @@ public class TimeseriesQueryRunnerBonusTest private static List> runTimeseriesCount(IncrementalIndex index) { - final QueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create(); + final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ); + final QueryRunner> runner = makeQueryRunner( factory, new IncrementalIndexSegment(index, null) diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java index cfc26c4326a..708a7de1054 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java @@ -26,6 +26,7 @@ import com.metamx.common.guava.Sequences; import io.druid.granularity.PeriodGranularity; import io.druid.granularity.QueryGranularity; import io.druid.query.Druids; +import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; @@ -62,7 +63,11 @@ public class TimeseriesQueryRunnerTest public static Collection constructorFeeder() throws IOException { return QueryRunnerTestHelper.makeQueryRunners( - TimeseriesQueryRunnerFactory.create() + new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ) ); } diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java index f06258a9a09..09d383168cf 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java @@ -23,15 +23,12 @@ import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.guava.Sequences; import io.druid.collections.StupidPool; import io.druid.query.BySegmentResultValueClass; import io.druid.query.Druids; -import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; -import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; @@ -72,7 +69,7 @@ public class TopNQueryRunnerTest new TopNQueryRunnerFactory( TestQueryRunners.getPool(), new TopNQueryQueryToolChest(new TopNQueryConfig()), - QueryRunnerTestHelper.DUMMY_QUERYWATCHER + QueryRunnerTestHelper.NOOP_QUERYWATCHER ) ) ); @@ -90,7 +87,7 @@ public class TopNQueryRunnerTest } ), new TopNQueryQueryToolChest(new TopNQueryConfig()), - QueryRunnerTestHelper.DUMMY_QUERYWATCHER + QueryRunnerTestHelper.NOOP_QUERYWATCHER ) ) ); diff --git a/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java b/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java index d2e3e5dea73..7dc7b645cad 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java @@ -23,12 +23,9 @@ import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ListenableFuture; import io.druid.collections.StupidPool; -import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; -import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; @@ -69,7 +66,7 @@ public class TopNUnionQueryTest new TopNQueryRunnerFactory( TestQueryRunners.getPool(), new TopNQueryQueryToolChest(new TopNQueryConfig()), - QueryRunnerTestHelper.DUMMY_QUERYWATCHER + QueryRunnerTestHelper.NOOP_QUERYWATCHER ) ) ); @@ -87,7 +84,7 @@ public class TopNUnionQueryTest } ), new TopNQueryQueryToolChest(new TopNQueryConfig()), - QueryRunnerTestHelper.DUMMY_QUERYWATCHER + QueryRunnerTestHelper.NOOP_QUERYWATCHER ) ) ); diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java index 0eb327972ee..c8155526a89 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java @@ -29,13 +29,17 @@ import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.granularity.QueryGranularity; import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; +import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.filter.SpatialDimFilter; import io.druid.query.timeseries.TimeseriesQuery; +import io.druid.query.timeseries.TimeseriesQueryEngine; +import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; import io.druid.query.timeseries.TimeseriesQueryRunnerFactory; import io.druid.query.timeseries.TimeseriesResultValue; import io.druid.segment.IncrementalIndexSegment; @@ -434,7 +438,12 @@ public class SpatialFilterBonusTest ) ); try { - TimeseriesQueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create(); + TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ); + QueryRunner runner = new FinalizeResultsQueryRunner( factory.createRunner(segment), factory.getToolchest() @@ -516,7 +525,12 @@ public class SpatialFilterBonusTest ) ); try { - TimeseriesQueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create(); + TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ); + QueryRunner runner = new FinalizeResultsQueryRunner( factory.createRunner(segment), factory.getToolchest() @@ -528,4 +542,4 @@ public class SpatialFilterBonusTest throw Throwables.propagate(e); } } -} \ No newline at end of file +} diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java index d342c12c577..84df58a260d 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java @@ -29,13 +29,17 @@ import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.granularity.QueryGranularity; import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; +import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.filter.SpatialDimFilter; import io.druid.query.timeseries.TimeseriesQuery; +import io.druid.query.timeseries.TimeseriesQueryEngine; +import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; import io.druid.query.timeseries.TimeseriesQueryRunnerFactory; import io.druid.query.timeseries.TimeseriesResultValue; import io.druid.segment.IncrementalIndexSegment; @@ -464,7 +468,12 @@ public class SpatialFilterTest ) ); try { - TimeseriesQueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create(); + TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ); + QueryRunner runner = new FinalizeResultsQueryRunner( factory.createRunner(segment), factory.getToolchest() @@ -546,7 +555,12 @@ public class SpatialFilterTest ) ); try { - TimeseriesQueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create(); + TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ); + QueryRunner runner = new FinalizeResultsQueryRunner( factory.createRunner(segment), factory.getToolchest() @@ -558,4 +572,4 @@ public class SpatialFilterTest throw Throwables.propagate(e); } } -} \ No newline at end of file +} diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index 537cc0145fb..6bc703297e5 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -21,7 +21,6 @@ package io.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; -import com.google.common.base.Preconditions; import com.google.common.base.Predicates; import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; diff --git a/server/src/test/java/io/druid/client/DirectDruidClientTest.java b/server/src/test/java/io/druid/client/DirectDruidClientTest.java index b2396688900..4ad8ca5cd51 100644 --- a/server/src/test/java/io/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/io/druid/client/DirectDruidClientTest.java @@ -38,6 +38,7 @@ import io.druid.jackson.DefaultObjectMapper; import io.druid.query.Druids; import io.druid.query.Query; import io.druid.query.QueryInterruptedException; +import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryWatcher; import io.druid.query.ReflectionQueryToolChestWarehouse; import io.druid.query.Result; @@ -60,15 +61,6 @@ import java.util.List; public class DirectDruidClientTest { - public static final QueryWatcher DUMMY_WATCHER = new QueryWatcher() - { - @Override - public void registerQuery(Query query, ListenableFuture future) - { - - } - }; - @Test public void testRun() throws Exception { @@ -101,14 +93,14 @@ public class DirectDruidClientTest DirectDruidClient client1 = new DirectDruidClient( new ReflectionQueryToolChestWarehouse(), - DUMMY_WATCHER, + QueryRunnerTestHelper.NOOP_QUERYWATCHER, new DefaultObjectMapper(), httpClient, "foo" ); DirectDruidClient client2 = new DirectDruidClient( new ReflectionQueryToolChestWarehouse(), - DUMMY_WATCHER, + QueryRunnerTestHelper.NOOP_QUERYWATCHER, new DefaultObjectMapper(), httpClient, "foo2" @@ -196,7 +188,7 @@ public class DirectDruidClientTest DirectDruidClient client1 = new DirectDruidClient( new ReflectionQueryToolChestWarehouse(), - DUMMY_WATCHER, + QueryRunnerTestHelper.NOOP_QUERYWATCHER, new DefaultObjectMapper(), httpClient, "foo" From 8f7fd93491a6eb97661ebc7dbe7bf70683adeed2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Mon, 9 Jun 2014 14:34:44 -0700 Subject: [PATCH 16/19] add comments --- .../java/io/druid/query/QueryWatcher.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/processing/src/main/java/io/druid/query/QueryWatcher.java b/processing/src/main/java/io/druid/query/QueryWatcher.java index 0a76a54f23a..36394b11e34 100644 --- a/processing/src/main/java/io/druid/query/QueryWatcher.java +++ b/processing/src/main/java/io/druid/query/QueryWatcher.java @@ -21,7 +21,27 @@ package io.druid.query; import com.google.common.util.concurrent.ListenableFuture; +/** + * This interface is in a very early stage and should not be considered stable. + * + * The purpose of the QueryWatcher is to give overall visibility into queries running + * or pending at the QueryRunner level. This is currently used to cancel all the + * parts of a pending query, but may be expanded in the future to offer more direct + * visibility into query execution and resource usage. + * + * QueryRunners executing any computation asynchronously must register their queries + * with the QueryWatcher. + * + */ public interface QueryWatcher { + /** + * QueryRunners must use this method to register any pending queries. + * + * The given future may have cancel(true) called at any time, if cancellation of this query has been requested. + * + * @param query a query, which may be a subset of a larger query, as long as the underlying queryId is unchanged + * @param future the future holding the execution status of the query + */ public void registerQuery(Query query, ListenableFuture future); } From 1fb9b21cf0c5206e1f8ccf6ea1d5f771157488e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Mon, 9 Jun 2014 17:22:50 -0700 Subject: [PATCH 17/19] async servlet delete support + cleanup --- .../io/druid/client/RoutingDruidClient.java | 22 +- .../server/AsyncQueryForwardingServlet.java | 529 +++++++++--------- 2 files changed, 280 insertions(+), 271 deletions(-) diff --git a/server/src/main/java/io/druid/client/RoutingDruidClient.java b/server/src/main/java/io/druid/client/RoutingDruidClient.java index 10170fcfb9e..79ae1c16f6d 100644 --- a/server/src/main/java/io/druid/client/RoutingDruidClient.java +++ b/server/src/main/java/io/druid/client/RoutingDruidClient.java @@ -22,6 +22,7 @@ package io.druid.client; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMultimap; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -30,7 +31,9 @@ import com.metamx.http.client.HttpClient; import com.metamx.http.client.response.HttpResponseHandler; import io.druid.guice.annotations.Client; import io.druid.query.Query; +import io.druid.server.QueryResource; import io.druid.server.router.Router; +import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.handler.codec.http.HttpHeaders; import javax.inject.Inject; @@ -68,7 +71,7 @@ public class RoutingDruidClient return openConnections.get(); } - public ListenableFuture post( + public ListenableFuture postQuery( String url, Query query, HttpResponseHandler responseHandler @@ -81,7 +84,7 @@ public class RoutingDruidClient future = httpClient .post(new URL(url)) .setContent(objectMapper.writeValueAsBytes(query)) - .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? "application/smile" : "application/json") + .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? QueryResource.APPLICATION_SMILE : QueryResource.APPLICATION_JSON) .go(responseHandler); openConnections.getAndIncrement(); @@ -125,4 +128,19 @@ public class RoutingDruidClient throw Throwables.propagate(e); } } + + public ListenableFuture delete( + String url, + HttpResponseHandler responseHandler + ) + { + try { + return httpClient + .delete(new URL(url)) + .go(responseHandler); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } } diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index 5360d529ad3..3d82cc013ea 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -21,7 +21,11 @@ package io.druid.server; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.client.repackaged.com.google.common.base.Throwables; +import com.google.common.base.Function; import com.google.common.base.Joiner; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableMap; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; @@ -37,19 +41,20 @@ import io.druid.server.log.RequestLogger; import io.druid.server.router.QueryHostFinder; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.handler.codec.http.HttpChunk; +import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpResponse; import org.joda.time.DateTime; +import javax.annotation.Nullable; import javax.servlet.AsyncContext; import javax.servlet.ServletException; -import javax.servlet.ServletOutputStream; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.OutputStream; -import java.nio.charset.Charset; +import java.util.Map; import java.util.UUID; /** @@ -59,8 +64,6 @@ import java.util.UUID; public class AsyncQueryForwardingServlet extends HttpServlet { private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class); - private static final Charset UTF8 = Charset.forName("UTF-8"); - private static final String DISPATCHED = "dispatched"; private static final Joiner COMMA_JOIN = Joiner.on(","); private final ObjectMapper jsonMapper; @@ -88,275 +91,161 @@ public class AsyncQueryForwardingServlet extends HttpServlet } @Override - protected void doGet(final HttpServletRequest req, final HttpServletResponse resp) + protected void doGet(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException { - OutputStream out = null; - AsyncContext ctx = null; - - try { - ctx = req.startAsync(req, resp); - final AsyncContext asyncContext = ctx; - - if (req.getAttribute(DISPATCHED) != null) { - return; - } - - out = resp.getOutputStream(); - final OutputStream outputStream = out; - - final String host = hostFinder.getDefaultHost(); - - final HttpResponseHandler responseHandler = new HttpResponseHandler() - { - @Override - public ClientResponse handleResponse(HttpResponse response) + final AsyncContext asyncContext = req.startAsync(req, res); + asyncContext.start( + new Runnable() { - resp.setStatus(response.getStatus().getCode()); - resp.setContentType("application/json"); - - try { - ChannelBuffer buf = response.getContent(); - buf.readBytes(outputStream, buf.readableBytes()); - } - catch (Exception e) { - asyncContext.complete(); - throw Throwables.propagate(e); - } - - return ClientResponse.finished(outputStream); - } - - @Override - public ClientResponse handleChunk( - ClientResponse clientResponse, HttpChunk chunk - ) - { - try { - ChannelBuffer buf = chunk.getContent(); - buf.readBytes(outputStream, buf.readableBytes()); - } - catch (Exception e) { - asyncContext.complete(); - throw Throwables.propagate(e); - } - return clientResponse; - } - - @Override - public ClientResponse done(ClientResponse clientResponse) - { - final OutputStream obj = clientResponse.getObj(); - try { - resp.flushBuffer(); - outputStream.close(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - finally { - asyncContext.complete(); - } - - return ClientResponse.finished(obj); - } - - @Override - public void exceptionCaught( - ClientResponse clientResponse, - Throwable e - ) - { - handleException(resp, asyncContext, e); - } - }; - - asyncContext.start( - new Runnable() + @Override + public void run() { - @Override - public void run() - { - routingDruidClient.get(makeUrl(host, req), responseHandler); + try { + final HttpResponseHandler responseHandler = new PassthroughHttpResponseHandler(asyncContext, jsonMapper); + + final String host = hostFinder.getDefaultHost(); + routingDruidClient.get(makeUrl(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler); + } + catch (Exception e) { + handleException(jsonMapper, asyncContext, e); } } - ); - - asyncContext.dispatch(); - req.setAttribute(DISPATCHED, true); - } - catch (Exception e) { - handleException(resp, ctx, e); - } + } + ); } @Override - protected void doPost( - final HttpServletRequest req, final HttpServletResponse resp - ) throws ServletException, IOException + protected void doDelete(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException { - final long start = System.currentTimeMillis(); - Query query = null; - String queryId; - - final boolean isSmile = "application/smile".equals(req.getContentType()); - - final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; - - OutputStream out = null; - AsyncContext ctx = null; - - try { - ctx = req.startAsync(req, resp); - final AsyncContext asyncContext = ctx; - - if (req.getAttribute(DISPATCHED) != null) { - return; - } - - query = objectMapper.readValue(req.getInputStream(), Query.class); - queryId = query.getId(); - if (queryId == null) { - queryId = UUID.randomUUID().toString(); - query = query.withId(queryId); - } - - if (log.isDebugEnabled()) { - log.debug("Got query [%s]", query); - } - - out = resp.getOutputStream(); - final OutputStream outputStream = out; - - final String host = hostFinder.getHost(query); - - final Query theQuery = query; - final String theQueryId = queryId; - - final HttpResponseHandler responseHandler = new HttpResponseHandler() - { - @Override - public ClientResponse handleResponse(HttpResponse response) + final AsyncContext asyncContext = req.startAsync(req, res); + asyncContext.start( + new Runnable() { - resp.setStatus(response.getStatus().getCode()); - resp.setContentType("application/x-javascript"); - - try { - ChannelBuffer buf = response.getContent(); - buf.readBytes(outputStream, buf.readableBytes()); - } - catch (Exception e) { - asyncContext.complete(); - throw Throwables.propagate(e); - } - return ClientResponse.finished(outputStream); - } - - @Override - public ClientResponse handleChunk( - ClientResponse clientResponse, HttpChunk chunk - ) - { - try { - ChannelBuffer buf = chunk.getContent(); - buf.readBytes(outputStream, buf.readableBytes()); - } - catch (Exception e) { - asyncContext.complete(); - throw Throwables.propagate(e); - } - return clientResponse; - } - - @Override - public ClientResponse done(ClientResponse clientResponse) - { - final long requestTime = System.currentTimeMillis() - start; - - log.debug("Request time: %d", requestTime); - - emitter.emit( - new ServiceMetricEvent.Builder() - .setUser2(DataSourceUtil.getMetricName(theQuery.getDataSource())) - .setUser4(theQuery.getType()) - .setUser5(COMMA_JOIN.join(theQuery.getIntervals())) - .setUser6(String.valueOf(theQuery.hasFilters())) - .setUser7(req.getRemoteAddr()) - .setUser8(theQueryId) - .setUser9(theQuery.getDuration().toPeriod().toStandardMinutes().toString()) - .build("request/time", requestTime) - ); - - final OutputStream obj = clientResponse.getObj(); - try { - requestLogger.log( - new RequestLogLine( - new DateTime(), - req.getRemoteAddr(), - theQuery, - new QueryStats(ImmutableMap.of("request/time", requestTime, "success", true)) - ) - ); - - resp.flushBuffer(); - outputStream.close(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - finally { - asyncContext.complete(); - } - - return ClientResponse.finished(obj); - } - - @Override - public void exceptionCaught( - ClientResponse clientResponse, - Throwable e - ) - { - handleException(resp, asyncContext, e); - } - }; - - asyncContext.start( - new Runnable() + @Override + public void run() { - @Override - public void run() - { - routingDruidClient.post(makeUrl(host, req), theQuery, responseHandler); + try { + final HttpResponseHandler responseHandler = new PassthroughHttpResponseHandler(asyncContext, jsonMapper); + + final String host = hostFinder.getDefaultHost(); + routingDruidClient.delete(makeUrl(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler); + } + catch (Exception e) { + handleException(jsonMapper, asyncContext, e); } } - ); + } + ); + } - asyncContext.dispatch(); - req.setAttribute(DISPATCHED, true); - } - catch (Exception e) { - handleException(resp, ctx, e); + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException + { + final long start = System.currentTimeMillis(); + final AsyncContext asyncContext = req.startAsync(req, res); + asyncContext.start( + new Runnable() + { + @Override + public void run() + { + final HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest(); - try { - requestLogger.log( - new RequestLogLine( - new DateTime(), - req.getRemoteAddr(), - query, - new QueryStats(ImmutableMap.of("success", false, "exception", e.toString())) - ) - ); - } - catch (Exception e2) { - log.error(e2, "Unable to log query [%s]!", query); - } + final boolean isSmile = QueryResource.APPLICATION_SMILE.equals(request.getContentType()); + final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; - log.makeAlert(e, "Exception handling request") - .addData("query", query) - .addData("peer", req.getRemoteAddr()) - .emit(); - } + Query inputQuery = null; + try { + inputQuery = objectMapper.readValue(request.getInputStream(), Query.class); + if (inputQuery.getId() == null) { + inputQuery = inputQuery.withId(UUID.randomUUID().toString()); + } + final Query query = inputQuery; + + if (log.isDebugEnabled()) { + log.debug("Got query [%s]", inputQuery); + } + + final HttpResponseHandler responseHandler = new PassthroughHttpResponseHandler( + asyncContext, + objectMapper + ) + { + @Override + public ClientResponse done(ClientResponse clientResponse) + { + final long requestTime = System.currentTimeMillis() - start; + log.debug("Request time: %d", requestTime); + + emitter.emit( + new ServiceMetricEvent.Builder() + .setUser2(DataSourceUtil.getMetricName(query.getDataSource())) + .setUser4(query.getType()) + .setUser5(COMMA_JOIN.join(query.getIntervals())) + .setUser6(String.valueOf(query.hasFilters())) + .setUser7(request.getRemoteAddr()) + .setUser8(query.getId()) + .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString()) + .build("request/time", requestTime) + ); + + try { + requestLogger.log( + new RequestLogLine( + new DateTime(), + request.getRemoteAddr(), + query, + new QueryStats( + ImmutableMap.of( + "request/time", + requestTime, + "success", + true + ) + ) + ) + ); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + + return super.done(clientResponse); + } + }; + + routingDruidClient.postQuery( + makeUrl(hostFinder.getHost(inputQuery), request), + inputQuery, + responseHandler + ); + } + catch (Exception e) { + handleException(objectMapper, asyncContext, e); + + try { + requestLogger.log( + new RequestLogLine( + new DateTime(), + request.getRemoteAddr(), + inputQuery, + new QueryStats(ImmutableMap.of("success", false, "exception", e.toString())) + ) + ); + } + catch (Exception logError) { + log.error(logError, "Unable to log query [%s]!", inputQuery); + } + + log.makeAlert(e, "Exception handling request") + .addData("query", inputQuery) + .addData("peer", request.getRemoteAddr()) + .emit(); + } + } + } + ); } private String makeUrl(final String host, final HttpServletRequest req) @@ -370,24 +259,126 @@ public class AsyncQueryForwardingServlet extends HttpServlet return String.format("http://%s%s?%s", host, requestURI, queryString); } - private static void handleException(HttpServletResponse resp, AsyncContext ctx, Throwable e) + private static void handleException(ObjectMapper objectMapper, AsyncContext asyncContext, Throwable exception) { try { - final ServletOutputStream out = resp.getOutputStream(); - if (!resp.isCommitted()) { - resp.setStatus(500); - resp.resetBuffer(); - out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8)); - out.write("\n".getBytes(UTF8)); + HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse(); + if (!response.isCommitted()) { + final String errorMessage = exception.getMessage() == null ? "null exception" : exception.getMessage(); + + response.resetBuffer(); + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + objectMapper.writeValue( + response.getOutputStream(), + ImmutableMap.of( + "error", errorMessage + ) + ); + } + response.flushBuffer(); + } + catch (IOException e) { + Throwables.propagate(e); + } + finally { + asyncContext.complete(); + } + } + + private static class PassthroughHttpResponseHandler implements HttpResponseHandler + { + private final AsyncContext asyncContext; + private final ObjectMapper objectMapper; + private final OutputStream outputStream; + + public PassthroughHttpResponseHandler(AsyncContext asyncContext, ObjectMapper objectMapper) throws IOException + { + this.asyncContext = asyncContext; + this.objectMapper = objectMapper; + this.outputStream = asyncContext.getResponse().getOutputStream(); + } + + protected void copyStatusHeaders(HttpResponse clientResponse) + { + final HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse(); + response.setStatus(clientResponse.getStatus().getCode()); + response.setContentType(clientResponse.headers().get(HttpHeaders.Names.CONTENT_TYPE)); + + FluentIterable.from(clientResponse.headers().entries()) + .filter(new Predicate>() + { + @Override + public boolean apply(@Nullable Map.Entry input) + { + return input.getKey().startsWith("X-Druid"); + } + } + ) + .transform( + new Function, Object>() + { + @Nullable + @Override + public Object apply(@Nullable Map.Entry input) + { + response.setHeader(input.getKey(), input.getValue()); + return null; + } + } + ) + .allMatch(Predicates.alwaysTrue()); + } + + @Override + public ClientResponse handleResponse(HttpResponse clientResponse) + { + copyStatusHeaders(clientResponse); + + try { + ChannelBuffer buf = clientResponse.getContent(); + buf.readBytes(outputStream, buf.readableBytes()); + } + catch (Exception e) { + throw Throwables.propagate(e); } - if (ctx != null) { - ctx.complete(); - } - resp.flushBuffer(); + return ClientResponse.finished(outputStream); } - catch (IOException e1) { - Throwables.propagate(e1); + + @Override + public ClientResponse handleChunk( + ClientResponse clientResponse, HttpChunk chunk + ) + { + try { + ChannelBuffer buf = chunk.getContent(); + buf.readBytes(outputStream, buf.readableBytes()); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + return clientResponse; + } + + @Override + public ClientResponse done(ClientResponse clientResponse) + { + asyncContext.complete(); + return ClientResponse.finished(clientResponse.getObj()); + } + + @Override + public void exceptionCaught( + ClientResponse clientResponse, + Throwable e + ) + { + // throwing an exception here may cause resource leak + try { + handleException(objectMapper, asyncContext, e); + } catch(Exception err) { + log.error(err, "Unable to handle exception response"); + } } } } From 6550cb1776101232cd17086052d46f00b2e3fd0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 10 Jun 2014 10:15:31 -0700 Subject: [PATCH 18/19] groupBy query cancellation --- .../query/GroupByParallelQueryRunner.java | 108 +++++++++++------- .../groupby/GroupByQueryRunnerFactory.java | 41 +++++-- .../query/groupby/GroupByQueryRunnerTest.java | 1 + .../GroupByTimeseriesQueryRunnerTest.java | 1 + 4 files changed, 105 insertions(+), 46 deletions(-) diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java index 51c663c6a2e..c9b14b6314b 100644 --- a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java @@ -26,6 +26,10 @@ import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.Pair; import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Sequence; @@ -39,37 +43,44 @@ import io.druid.segment.incremental.IncrementalIndex; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class GroupByParallelQueryRunner implements QueryRunner { private static final Logger log = new Logger(GroupByParallelQueryRunner.class); private final Iterable> queryables; - private final ExecutorService exec; + private final ListeningExecutorService exec; private final Ordering ordering; private final Supplier configSupplier; + private final QueryWatcher queryWatcher; + public GroupByParallelQueryRunner( ExecutorService exec, Ordering ordering, Supplier configSupplier, + QueryWatcher queryWatcher, QueryRunner... queryables ) { - this(exec, ordering, configSupplier, Arrays.asList(queryables)); + this(exec, ordering, configSupplier, queryWatcher, Arrays.asList(queryables)); } public GroupByParallelQueryRunner( ExecutorService exec, Ordering ordering, Supplier configSupplier, + QueryWatcher queryWatcher, Iterable> queryables ) { - this.exec = exec; + this.exec = MoreExecutors.listeningDecorator(exec); this.ordering = ordering; + this.queryWatcher = queryWatcher; this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); this.configSupplier = configSupplier; } @@ -88,48 +99,67 @@ public class GroupByParallelQueryRunner implements QueryRunner if (Iterables.isEmpty(queryables)) { log.warn("No queryables found."); } - List> futures = Lists.newArrayList( - Iterables.transform( - queryables, - new Function, Future>() - { - @Override - public Future apply(final QueryRunner input) - { - return exec.submit( - new AbstractPrioritizedCallable(priority) - { - @Override - public Boolean call() throws Exception - { - try { - input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); - return true; + ListenableFuture> futures = Futures.allAsList( + Lists.newArrayList( + Iterables.transform( + queryables, + new Function, ListenableFuture>() + { + @Override + public ListenableFuture apply(final QueryRunner input) + { + return exec.submit( + new AbstractPrioritizedCallable(priority) + { + @Override + public Boolean call() throws Exception + { + try { + input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); + return true; + } + catch (QueryInterruptedException e) { + throw Throwables.propagate(e); + } + catch (Exception e) { + log.error(e, "Exception with one of the sequences!"); + throw Throwables.propagate(e); + } + } } - catch (Exception e) { - log.error(e, "Exception with one of the sequences!"); - throw Throwables.propagate(e); - } - } - } - ); - } - } + ); + } + } + ) ) ); // Let the runners complete - for (Future future : futures) { - try { - future.get(); - } - catch (InterruptedException e) { - throw new RuntimeException(e); - } - catch (ExecutionException e) { - throw new RuntimeException(e); + try { + queryWatcher.registerQuery(query, futures); + final Number timeout = query.getContextValue("timeout", (Number) null); + if(timeout == null) { + futures.get(); + } else { + futures.get(timeout.longValue(), TimeUnit.MILLISECONDS); } } + catch (InterruptedException e) { + log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); + futures.cancel(true); + throw new QueryInterruptedException("Query interrupted"); + } + catch(CancellationException e) { + throw new QueryInterruptedException("Query cancelled"); + } + catch(TimeoutException e) { + log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); + futures.cancel(true); + throw new QueryInterruptedException("Query timeout"); + } + catch (ExecutionException e) { + throw Throwables.propagate(e.getCause()); + } return Sequences.simple(indexAccumulatorPair.lhs.iterableWithPostAggregations(null)); } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java index 714aad37925..e8634089c2f 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java @@ -24,42 +24,55 @@ import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.Ordering; import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.guava.ExecutorExecutingSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; +import com.metamx.common.logger.Logger; import io.druid.data.input.Row; import io.druid.query.ConcatQueryRunner; import io.druid.query.GroupByParallelQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; import io.druid.segment.Segment; import io.druid.segment.StorageAdapter; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** */ public class GroupByQueryRunnerFactory implements QueryRunnerFactory { private final GroupByQueryEngine engine; + private final QueryWatcher queryWatcher; private final Supplier config; private final GroupByQueryQueryToolChest toolChest; + private static final Logger log = new Logger(GroupByQueryRunnerFactory.class); + @Inject public GroupByQueryRunnerFactory( GroupByQueryEngine engine, + QueryWatcher queryWatcher, Supplier config, GroupByQueryQueryToolChest toolChest ) { this.engine = engine; + this.queryWatcher = queryWatcher; this.config = config; this.toolChest = toolChest; } @@ -71,8 +84,10 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory mergeRunners(final ExecutorService queryExecutor, Iterable> queryRunners) + public QueryRunner mergeRunners(final ExecutorService exec, Iterable> queryRunners) { + // mergeRunners should take ListeningExecutorService at some point + final ListeningExecutorService queryExecutor = MoreExecutors.listeningDecorator(exec); if (config.get().isSingleThreaded()) { return new ConcatQueryRunner( Sequences.map( @@ -88,7 +103,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory run(final Query query) { - Future> future = queryExecutor.submit( + ListenableFuture> future = queryExecutor.submit( new Callable>() { @Override @@ -102,13 +117,25 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory Date: Tue, 10 Jun 2014 10:24:18 -0700 Subject: [PATCH 19/19] segmentMetadata query cancellation --- .../SegmentMetadataQueryRunnerFactory.java | 48 +++++++++++++++---- .../query/metadata/SegmentAnalyzerTest.java | 3 +- .../metadata/SegmentMetadataQueryTest.java | 2 +- 3 files changed, 42 insertions(+), 11 deletions(-) diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index 9fa393c5c2f..c8e7208638c 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -22,15 +22,21 @@ package io.druid.query.metadata; import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.Maps; -import com.metamx.common.guava.ExecutorExecutingSequence; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.inject.Inject; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; +import com.metamx.common.logger.Logger; import io.druid.query.AbstractPrioritizedCallable; import io.druid.query.ConcatQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; import io.druid.query.metadata.metadata.ColumnAnalysis; import io.druid.query.metadata.metadata.ColumnIncluderator; import io.druid.query.metadata.metadata.SegmentAnalysis; @@ -40,16 +46,27 @@ import io.druid.segment.Segment; import java.util.Arrays; import java.util.Map; -import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory { private static final SegmentAnalyzer analyzer = new SegmentAnalyzer(); - private static final SegmentMetadataQueryQueryToolChest toolChest = new SegmentMetadataQueryQueryToolChest(); + private static final Logger log = new Logger(SegmentMetadataQueryRunnerFactory.class); + + private final QueryWatcher queryWatcher; + + @Inject + public SegmentMetadataQueryRunnerFactory( + QueryWatcher queryWatcher + ) + { + this.queryWatcher = queryWatcher; + } @Override public QueryRunner createRunner(final Segment segment) @@ -101,9 +118,10 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory mergeRunners( - final ExecutorService queryExecutor, Iterable> queryRunners + ExecutorService exec, Iterable> queryRunners ) { + final ListeningExecutorService queryExecutor = MoreExecutors.listeningDecorator(exec); return new ConcatQueryRunner( Sequences.map( Sequences.simple(queryRunners), @@ -118,7 +136,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory run(final Query query) { final int priority = query.getContextPriority(0); - Future> future = queryExecutor.submit( + ListenableFuture> future = queryExecutor.submit( new AbstractPrioritizedCallable>(priority) { @Override @@ -129,13 +147,25 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory getSegmentAnalysises(Segment index) { final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner( - (QueryRunnerFactory) new SegmentMetadataQueryRunnerFactory(), index + (QueryRunnerFactory) new SegmentMetadataQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER), index ); final SegmentMetadataQuery query = new SegmentMetadataQuery( diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java index 5d596292348..ed1740460f8 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java @@ -45,7 +45,7 @@ public class SegmentMetadataQueryTest { @SuppressWarnings("unchecked") private final QueryRunner runner = makeQueryRunner( - new SegmentMetadataQueryRunnerFactory() + new SegmentMetadataQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER) ); private ObjectMapper mapper = new DefaultObjectMapper();