diff --git a/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java index 38c3bc135ef..068bfecd963 100644 --- a/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java @@ -32,6 +32,7 @@ import com.google.common.base.Throwables; import com.metamx.common.Granularity; import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Yielder; import org.joda.time.DateTimeZone; import java.io.IOException; diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 31eac12702e..6c12d37669b 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -25,6 +25,10 @@ import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.ISE; import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.MergeIterable; @@ -35,11 +39,8 @@ import com.metamx.common.logger.Logger; import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; /** * A QueryRunner that combines a list of other QueryRunners and executes them in parallel on an executor. @@ -59,27 +60,33 @@ public class ChainedExecutionQueryRunner implements QueryRunner private static final Logger log = new Logger(ChainedExecutionQueryRunner.class); private final Iterable> queryables; - private final ExecutorService exec; + private final ListeningExecutorService exec; private final Ordering ordering; + private final QueryWatcher queryWatcher; public ChainedExecutionQueryRunner( ExecutorService exec, Ordering ordering, + QueryWatcher queryWatcher, QueryRunner... queryables ) { - this(exec, ordering, Arrays.asList(queryables)); + this(exec, ordering, queryWatcher, Arrays.asList(queryables)); } public ChainedExecutionQueryRunner( ExecutorService exec, Ordering ordering, + QueryWatcher queryWatcher, Iterable> queryables ) { - this.exec = exec; + // listeningDecorator will leave PrioritizedExecutorService unchanged, + // since it already implements ListeningExecutorService + this.exec = MoreExecutors.listeningDecorator(exec); this.ordering = ordering; this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); + this.queryWatcher = queryWatcher; } @Override @@ -94,71 +101,67 @@ public class ChainedExecutionQueryRunner implements QueryRunner public Iterator make() { // Make it a List<> to materialize all of the values (so that it will submit everything to the executor) - List>> futures = Lists.newArrayList( - Iterables.transform( - queryables, - new Function, Future>>() - { - @Override - public Future> apply(final QueryRunner input) - { - return exec.submit( - new AbstractPrioritizedCallable>(priority) - { - @Override - public List call() throws Exception - { - try { - if (input == null) { - throw new ISE("Input is null?! How is this possible?!"); - } + ListenableFuture>> futures = Futures.allAsList( + Lists.newArrayList( + Iterables.transform( + queryables, + new Function, ListenableFuture>>() + { + @Override + public ListenableFuture> apply(final QueryRunner input) + { + return exec.submit( + new AbstractPrioritizedCallable>(priority) + { + @Override + public Iterable call() throws Exception + { + try { + if (input == null) { + throw new ISE("Input is null?! How is this possible?!"); + } - Sequence result = input.run(query); - if (result == null) { - throw new ISE("Got a null result! Segments are missing!"); - } + Sequence result = input.run(query); + if (result == null) { + throw new ISE("Got a null result! Segments are missing!"); + } - List retVal = Sequences.toList(result, Lists.newArrayList()); - if (retVal == null) { - throw new ISE("Got a null list of results! WTF?!"); - } + List retVal = Sequences.toList(result, Lists.newArrayList()); + if (retVal == null) { + throw new ISE("Got a null list of results! WTF?!"); + } - return retVal; + return retVal; + } + catch (Exception e) { + log.error(e, "Exception with one of the sequences!"); + throw Throwables.propagate(e); + } + } } - catch (Exception e) { - log.error(e, "Exception with one of the sequences!"); - throw Throwables.propagate(e); - } - } - } - ); - } - } + ); + } + } + ) ) ); - return new MergeIterable( - ordering.nullsFirst(), - Iterables.transform( - futures, - new Function>, Iterable>() - { - @Override - public Iterable apply(Future> input) - { - try { - return input.get(); - } - catch (InterruptedException e) { - throw new RuntimeException(e); - } - catch (ExecutionException e) { - throw new RuntimeException(e); - } - } - } - ) - ).iterator(); + queryWatcher.registerQuery(query, futures); + + try { + return new MergeIterable<>( + ordering.nullsFirst(), + futures.get() + ).iterator(); + } + catch (InterruptedException e) { + log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); + futures.cancel(true); + throw new QueryInterruptedException(e); + } + catch (ExecutionException e) { + throw Throwables.propagate(e.getCause()); + } } @Override diff --git a/processing/src/main/java/io/druid/query/QueryInterruptedException.java b/processing/src/main/java/io/druid/query/QueryInterruptedException.java new file mode 100644 index 00000000000..7b889ef5f2d --- /dev/null +++ b/processing/src/main/java/io/druid/query/QueryInterruptedException.java @@ -0,0 +1,37 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query; + +public class QueryInterruptedException extends RuntimeException +{ + public QueryInterruptedException() { + super(); + } + + public QueryInterruptedException(String message) + { + super(message); + } + + public QueryInterruptedException(Throwable cause) + { + super(cause); + } +} diff --git a/processing/src/main/java/io/druid/query/QueryWatcher.java b/processing/src/main/java/io/druid/query/QueryWatcher.java new file mode 100644 index 00000000000..0a76a54f23a --- /dev/null +++ b/processing/src/main/java/io/druid/query/QueryWatcher.java @@ -0,0 +1,27 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query; + +import com.google.common.util.concurrent.ListenableFuture; + +public interface QueryWatcher +{ + public void registerQuery(Query query, ListenableFuture future); +} diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/search/SearchQueryRunnerFactory.java index 0c0fdab980d..2cd868e45d8 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryRunnerFactory.java @@ -24,6 +24,7 @@ import io.druid.query.ChainedExecutionQueryRunner; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.query.search.search.SearchQuery; import io.druid.segment.Segment; @@ -35,13 +36,16 @@ import java.util.concurrent.ExecutorService; public class SearchQueryRunnerFactory implements QueryRunnerFactory, SearchQuery> { private final SearchQueryQueryToolChest toolChest; + private final QueryWatcher queryWatcher; @Inject public SearchQueryRunnerFactory( - SearchQueryQueryToolChest toolChest + SearchQueryQueryToolChest toolChest, + QueryWatcher queryWatcher ) { this.toolChest = toolChest; + this.queryWatcher = queryWatcher; } @Override @@ -56,7 +60,7 @@ public class SearchQueryRunnerFactory implements QueryRunnerFactory>( - queryExecutor, toolChest.getOrdering(), queryRunners + queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners ); } diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java index 6e995b15f44..a1fa77dabd5 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java @@ -20,6 +20,7 @@ package io.druid.query.select; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.guava.Sequence; @@ -29,6 +30,7 @@ import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.segment.Segment; @@ -43,21 +45,31 @@ public class SelectQueryRunnerFactory { return new SelectQueryRunnerFactory( new SelectQueryQueryToolChest(new QueryConfig(), jsonMapper), - new SelectQueryEngine() + new SelectQueryEngine(), + new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + } + } ); } private final SelectQueryQueryToolChest toolChest; private final SelectQueryEngine engine; + private final QueryWatcher queryWatcher; @Inject public SelectQueryRunnerFactory( SelectQueryQueryToolChest toolChest, - SelectQueryEngine engine + SelectQueryEngine engine, + QueryWatcher queryWatcher ) { this.toolChest = toolChest; this.engine = engine; + this.queryWatcher = queryWatcher; } @Override @@ -72,7 +84,7 @@ public class SelectQueryRunnerFactory ) { return new ChainedExecutionQueryRunner>( - queryExecutor, toolChest.getOrdering(), queryRunners + queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners ); } diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java index 16e9ae832fa..1f78429ead3 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java @@ -19,6 +19,7 @@ package io.druid.query.timeboundary; +import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.Sequence; @@ -27,6 +28,7 @@ import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.segment.Segment; import io.druid.segment.StorageAdapter; @@ -40,6 +42,13 @@ public class TimeBoundaryQueryRunnerFactory implements QueryRunnerFactory, TimeBoundaryQuery> { private static final TimeBoundaryQueryQueryToolChest toolChest = new TimeBoundaryQueryQueryToolChest(); + private final QueryWatcher queryWatcher; + + @Inject + public TimeBoundaryQueryRunnerFactory(QueryWatcher queryWatcher) + { + this.queryWatcher = queryWatcher; + } @Override public QueryRunner> createRunner(final Segment segment) @@ -53,7 +62,7 @@ public class TimeBoundaryQueryRunnerFactory ) { return new ChainedExecutionQueryRunner>( - queryExecutor, toolChest.getOrdering(), queryRunners + queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners ); } diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java index 740b309e37e..726bc20bb43 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java @@ -19,6 +19,7 @@ package io.druid.query.timeseries; +import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.guava.Sequence; @@ -28,6 +29,7 @@ import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.segment.Segment; import io.druid.segment.StorageAdapter; @@ -39,25 +41,39 @@ import java.util.concurrent.ExecutorService; public class TimeseriesQueryRunnerFactory implements QueryRunnerFactory, TimeseriesQuery> { + /** + * Use only for testing + * @return + */ public static TimeseriesQueryRunnerFactory create() { return new TimeseriesQueryRunnerFactory( new TimeseriesQueryQueryToolChest(new QueryConfig()), - new TimeseriesQueryEngine() + new TimeseriesQueryEngine(), + new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + } + } ); } private final TimeseriesQueryQueryToolChest toolChest; private final TimeseriesQueryEngine engine; + private final QueryWatcher queryWatcher; @Inject public TimeseriesQueryRunnerFactory( TimeseriesQueryQueryToolChest toolChest, - TimeseriesQueryEngine engine + TimeseriesQueryEngine engine, + QueryWatcher queryWatcher ) { this.toolChest = toolChest; this.engine = engine; + this.queryWatcher = queryWatcher; } @Override @@ -72,7 +88,7 @@ public class TimeseriesQueryRunnerFactory ) { return new ChainedExecutionQueryRunner>( - queryExecutor, toolChest.getOrdering(), queryRunners + queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners ); } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java index 044e6a64eb2..524f9ace6a5 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java @@ -30,6 +30,7 @@ import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.segment.Segment; @@ -43,15 +44,18 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory computationBufferPool; private final TopNQueryQueryToolChest toolchest; + private final QueryWatcher queryWatcher; @Inject public TopNQueryRunnerFactory( @Global StupidPool computationBufferPool, - TopNQueryQueryToolChest toolchest + TopNQueryQueryToolChest toolchest, + QueryWatcher queryWatcher ) { this.computationBufferPool = computationBufferPool; this.toolchest = toolchest; + this.queryWatcher = queryWatcher; } @Override @@ -79,7 +83,7 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory>( - queryExecutor, toolchest.getOrdering(), queryRunners + queryExecutor, toolchest.getOrdering(), queryWatcher, queryRunners ); } diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java new file mode 100644 index 00000000000..445bea9cf53 --- /dev/null +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -0,0 +1,148 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; +import com.google.common.util.concurrent.ListenableFuture; +import com.metamx.common.concurrent.ExecutorServiceConfig; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import com.metamx.common.lifecycle.Lifecycle; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +public class ChainedExecutionQueryRunnerTest +{ + @Test @Ignore + public void testQueryCancellation() throws Exception + { + ExecutorService exec = PrioritizedExecutorService.create( + new Lifecycle(), new ExecutorServiceConfig() + { + @Override + public String getFormatString() + { + return "test"; + } + + @Override + public int getNumThreads() + { + return 2; + } + } + ); + + final CountDownLatch queriesStarted = new CountDownLatch(2); + final CountDownLatch queryIsRegistered = new CountDownLatch(1); + + final Map queries = Maps.newHashMap(); + QueryWatcher watcher = new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + queries.put(query, future); + queryIsRegistered.countDown(); + } + }; + + ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>( + exec, + Ordering.natural(), + watcher, + Lists.>newArrayList( + new DyingQueryRunner(1, queriesStarted), + new DyingQueryRunner(2, queriesStarted), + new DyingQueryRunner(3, queriesStarted) + ) + ); + + final Sequence seq = chainedRunner.run( + Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .intervals("2014/2015") + .aggregators(Lists.newArrayList(new CountAggregatorFactory("count"))) + .build() + ); + + Future f = Executors.newFixedThreadPool(1).submit( + new Runnable() + { + @Override + public void run() + { + Sequences.toList(seq, Lists.newArrayList()); + } + } + ); + + // wait for query to register + queryIsRegistered.await(); + queriesStarted.await(); + + // cancel the query + queries.values().iterator().next().cancel(true); + f.get(); + } + + private static class DyingQueryRunner implements QueryRunner + { + private final int id; + private final CountDownLatch latch; + + public DyingQueryRunner(int id, CountDownLatch latch) { + this.id = id; + this.latch = latch; + } + + @Override + public Sequence run(Query query) + { + latch.countDown(); + + int i = 0; + while (i >= 0) { + if(Thread.interrupted()) { + throw new QueryInterruptedException("I got killed"); + } + + // do a lot of work + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new QueryInterruptedException("I got killed"); + } + ++i; + } + return Sequences.simple(Lists.newArrayList(i)); + } + } +} diff --git a/processing/src/test/java/io/druid/query/TestQueryRunners.java b/processing/src/test/java/io/druid/query/TestQueryRunners.java index c4767c1c6f9..a858b5e0cdf 100644 --- a/processing/src/test/java/io/druid/query/TestQueryRunners.java +++ b/processing/src/test/java/io/druid/query/TestQueryRunners.java @@ -1,6 +1,7 @@ package io.druid.query; import com.google.common.base.Supplier; +import com.google.common.util.concurrent.ListenableFuture; import io.druid.collections.StupidPool; import io.druid.query.search.SearchQueryQueryToolChest; import io.druid.query.search.SearchQueryRunnerFactory; @@ -40,7 +41,14 @@ public class TestQueryRunners Segment adapter ) { - QueryRunnerFactory factory = new TopNQueryRunnerFactory(pool, new TopNQueryQueryToolChest(topNConfig)); + QueryRunnerFactory factory = new TopNQueryRunnerFactory(pool, new TopNQueryQueryToolChest(topNConfig), new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + + } + }); return new FinalizeResultsQueryRunner( factory.createRunner(adapter), factory.getToolchest() @@ -62,7 +70,14 @@ public class TestQueryRunners Segment adapter ) { - QueryRunnerFactory factory = new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig())); + QueryRunnerFactory factory = new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()), new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + + } + }); return new FinalizeResultsQueryRunner( factory.createRunner(adapter), factory.getToolchest() @@ -73,11 +88,18 @@ public class TestQueryRunners Segment adapter ) { - QueryRunnerFactory factory = new TimeBoundaryQueryRunnerFactory(); + QueryRunnerFactory factory = new TimeBoundaryQueryRunnerFactory(new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + + } + }); return new FinalizeResultsQueryRunner( factory.createRunner(adapter), factory.getToolchest() ); } -} \ No newline at end of file +} diff --git a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java index 2e5623b4365..0740333eed5 100644 --- a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java @@ -23,10 +23,13 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.guava.Sequences; import io.druid.query.Druids; +import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.query.filter.DimFilter; import io.druid.query.search.search.FragmentSearchQuerySpec; @@ -56,7 +59,14 @@ public class SearchQueryRunnerTest public static Collection constructorFeeder() throws IOException { return QueryRunnerTestHelper.makeQueryRunners( - new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig())) + new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()), new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + + } + }) ); } diff --git a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java index 9b92826ac34..de5ac1281b2 100644 --- a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java @@ -20,10 +20,13 @@ package io.druid.query.timeboundary; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.guava.Sequences; import io.druid.query.Druids; +import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import org.joda.time.DateTime; import org.junit.Assert; @@ -43,7 +46,14 @@ public class TimeBoundaryQueryRunnerTest public static Collection constructorFeeder() throws IOException { return QueryRunnerTestHelper.makeQueryRunners( - new TimeBoundaryQueryRunnerFactory() + new TimeBoundaryQueryRunnerFactory(new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + + } + }) ); } diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java index 22b750faf00..39af4459794 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java @@ -23,12 +23,15 @@ import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.guava.Sequences; import io.druid.collections.StupidPool; import io.druid.query.BySegmentResultValueClass; import io.druid.query.Druids; +import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; @@ -68,7 +71,15 @@ public class TopNQueryRunnerTest QueryRunnerTestHelper.makeQueryRunners( new TopNQueryRunnerFactory( TestQueryRunners.getPool(), - new TopNQueryQueryToolChest(new TopNQueryConfig()) + new TopNQueryQueryToolChest(new TopNQueryConfig()), + new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + + } + } ) ) ); @@ -85,7 +96,15 @@ public class TopNQueryRunnerTest } } ), - new TopNQueryQueryToolChest(new TopNQueryConfig()) + new TopNQueryQueryToolChest(new TopNQueryConfig()), + new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + + } + } ) ) ); diff --git a/server/src/main/java/io/druid/guice/QueryRunnerFactoryModule.java b/server/src/main/java/io/druid/guice/QueryRunnerFactoryModule.java index 6f4dc80b059..6c1bb62cafe 100644 --- a/server/src/main/java/io/druid/guice/QueryRunnerFactoryModule.java +++ b/server/src/main/java/io/druid/guice/QueryRunnerFactoryModule.java @@ -24,6 +24,7 @@ import com.google.inject.Binder; import com.google.inject.multibindings.MapBinder; import io.druid.query.Query; import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryWatcher; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryEngine; import io.druid.query.groupby.GroupByQueryRunnerFactory; @@ -39,6 +40,7 @@ import io.druid.query.timeseries.TimeseriesQuery; import io.druid.query.timeseries.TimeseriesQueryRunnerFactory; import io.druid.query.topn.TopNQuery; import io.druid.query.topn.TopNQueryRunnerFactory; +import io.druid.server.QueryManager; import java.util.Map; @@ -62,6 +64,12 @@ public class QueryRunnerFactoryModule extends QueryToolChestModule { super.configure(binder); + binder.bind(QueryWatcher.class) + .to(QueryManager.class) + .in(LazySingleton.class); + binder.bind(QueryManager.class) + .in(LazySingleton.class); + final MapBinder, QueryRunnerFactory> queryFactoryBinder = DruidBinders.queryRunnerFactoryBinder( binder ); diff --git a/server/src/main/java/io/druid/server/QueryManager.java b/server/src/main/java/io/druid/server/QueryManager.java new file mode 100644 index 00000000000..5910aeda535 --- /dev/null +++ b/server/src/main/java/io/druid/server/QueryManager.java @@ -0,0 +1,48 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server; + +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ListenableFuture; +import io.druid.query.Query; +import io.druid.query.QueryWatcher; + +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; + +public class QueryManager implements QueryWatcher +{ + final ConcurrentMap queries; + + public QueryManager() { + this.queries = Maps.newConcurrentMap(); + } + + public void cancelQuery(String id) { + Future future = queries.get(id); + if(future != null) { + future.cancel(true); + } + } + public void registerQuery(Query query, ListenableFuture future) + { + queries.put(query.getId(), future); + } +} diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 6319a0d4633..e710a6c97c9 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -37,16 +37,20 @@ import io.druid.guice.annotations.Smile; import io.druid.query.DataSourceUtil; import io.druid.query.Query; import io.druid.query.QuerySegmentWalker; +import io.druid.query.QueryWatcher; import io.druid.server.log.RequestLogger; import org.joda.time.DateTime; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.DELETE; import javax.ws.rs.POST; import javax.ws.rs.Path; +import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.core.Context; +import javax.ws.rs.core.Response; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.Charset; @@ -66,6 +70,7 @@ public class QueryResource private final QuerySegmentWalker texasRanger; private final ServiceEmitter emitter; private final RequestLogger requestLogger; + private final QueryManager queryManager; @Inject public QueryResource( @@ -73,7 +78,8 @@ public class QueryResource @Smile ObjectMapper smileMapper, QuerySegmentWalker texasRanger, ServiceEmitter emitter, - RequestLogger requestLogger + RequestLogger requestLogger, + QueryManager queryManager ) { this.jsonMapper = jsonMapper; @@ -81,6 +87,16 @@ public class QueryResource this.texasRanger = texasRanger; this.emitter = emitter; this.requestLogger = requestLogger; + this.queryManager = queryManager; + } + + @DELETE + @Path("{id}") + @Produces("application/json") + public Response getServer(@PathParam("id") String queryId) + { + queryManager.cancelQuery(queryId); + return Response.status(Response.Status.ACCEPTED).build(); } @POST @@ -124,10 +140,13 @@ public class QueryResource resp.setStatus(200); resp.setContentType("application/x-javascript"); + resp.setHeader("X-Druid-Query-Id", query.getId()); out = resp.getOutputStream(); jsonWriter.writeValue(out, results); +// JsonGenerator jgen = jsonWriter.getFactory().createGenerator(out); + long requestTime = System.currentTimeMillis() - start; emitter.emit(