initial query cancellation commit

This commit is contained in:
Xavier Léauté 2014-04-08 17:13:22 -07:00
parent ad7db018d5
commit d6f38827db
17 changed files with 481 additions and 84 deletions

View File

@ -32,6 +32,7 @@ import com.google.common.base.Throwables;
import com.metamx.common.Granularity;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Yielder;
import org.joda.time.DateTimeZone;
import java.io.IOException;

View File

@ -25,6 +25,10 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.MergeIterable;
@ -35,11 +39,8 @@ import com.metamx.common.logger.Logger;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* A QueryRunner that combines a list of other QueryRunners and executes them in parallel on an executor.
@ -59,27 +60,33 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
private static final Logger log = new Logger(ChainedExecutionQueryRunner.class);
private final Iterable<QueryRunner<T>> queryables;
private final ExecutorService exec;
private final ListeningExecutorService exec;
private final Ordering<T> ordering;
private final QueryWatcher queryWatcher;
public ChainedExecutionQueryRunner(
ExecutorService exec,
Ordering<T> ordering,
QueryWatcher queryWatcher,
QueryRunner<T>... queryables
)
{
this(exec, ordering, Arrays.asList(queryables));
this(exec, ordering, queryWatcher, Arrays.asList(queryables));
}
public ChainedExecutionQueryRunner(
ExecutorService exec,
Ordering<T> ordering,
QueryWatcher queryWatcher,
Iterable<QueryRunner<T>> queryables
)
{
this.exec = exec;
// listeningDecorator will leave PrioritizedExecutorService unchanged,
// since it already implements ListeningExecutorService
this.exec = MoreExecutors.listeningDecorator(exec);
this.ordering = ordering;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
this.queryWatcher = queryWatcher;
}
@Override
@ -94,71 +101,67 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
public Iterator<T> make()
{
// Make it a List<> to materialize all of the values (so that it will submit everything to the executor)
List<Future<List<T>>> futures = Lists.newArrayList(
Iterables.transform(
queryables,
new Function<QueryRunner<T>, Future<List<T>>>()
{
@Override
public Future<List<T>> apply(final QueryRunner<T> input)
{
return exec.submit(
new AbstractPrioritizedCallable<List<T>>(priority)
{
@Override
public List<T> call() throws Exception
{
try {
if (input == null) {
throw new ISE("Input is null?! How is this possible?!");
}
ListenableFuture<List<Iterable<T>>> futures = Futures.allAsList(
Lists.newArrayList(
Iterables.transform(
queryables,
new Function<QueryRunner<T>, ListenableFuture<Iterable<T>>>()
{
@Override
public ListenableFuture<Iterable<T>> apply(final QueryRunner<T> input)
{
return exec.submit(
new AbstractPrioritizedCallable<Iterable<T>>(priority)
{
@Override
public Iterable<T> call() throws Exception
{
try {
if (input == null) {
throw new ISE("Input is null?! How is this possible?!");
}
Sequence<T> result = input.run(query);
if (result == null) {
throw new ISE("Got a null result! Segments are missing!");
}
Sequence<T> result = input.run(query);
if (result == null) {
throw new ISE("Got a null result! Segments are missing!");
}
List<T> retVal = Sequences.toList(result, Lists.<T>newArrayList());
if (retVal == null) {
throw new ISE("Got a null list of results! WTF?!");
}
List<T> retVal = Sequences.toList(result, Lists.<T>newArrayList());
if (retVal == null) {
throw new ISE("Got a null list of results! WTF?!");
}
return retVal;
return retVal;
}
catch (Exception e) {
log.error(e, "Exception with one of the sequences!");
throw Throwables.propagate(e);
}
}
}
catch (Exception e) {
log.error(e, "Exception with one of the sequences!");
throw Throwables.propagate(e);
}
}
}
);
}
}
);
}
}
)
)
);
return new MergeIterable<T>(
ordering.nullsFirst(),
Iterables.transform(
futures,
new Function<Future<List<T>>, Iterable<T>>()
{
@Override
public Iterable<T> apply(Future<List<T>> input)
{
try {
return input.get();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}
)
).iterator();
queryWatcher.registerQuery(query, futures);
try {
return new MergeIterable<>(
ordering.nullsFirst(),
futures.get()
).iterator();
}
catch (InterruptedException e) {
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException(e);
}
catch (ExecutionException e) {
throw Throwables.propagate(e.getCause());
}
}
@Override

View File

@ -0,0 +1,37 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
public class QueryInterruptedException extends RuntimeException
{
public QueryInterruptedException() {
super();
}
public QueryInterruptedException(String message)
{
super(message);
}
public QueryInterruptedException(Throwable cause)
{
super(cause);
}
}

View File

@ -0,0 +1,27 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.google.common.util.concurrent.ListenableFuture;
public interface QueryWatcher
{
public void registerQuery(Query query, ListenableFuture future);
}

View File

@ -24,6 +24,7 @@ import io.druid.query.ChainedExecutionQueryRunner;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.query.search.search.SearchQuery;
import io.druid.segment.Segment;
@ -35,13 +36,16 @@ import java.util.concurrent.ExecutorService;
public class SearchQueryRunnerFactory implements QueryRunnerFactory<Result<SearchResultValue>, SearchQuery>
{
private final SearchQueryQueryToolChest toolChest;
private final QueryWatcher queryWatcher;
@Inject
public SearchQueryRunnerFactory(
SearchQueryQueryToolChest toolChest
SearchQueryQueryToolChest toolChest,
QueryWatcher queryWatcher
)
{
this.toolChest = toolChest;
this.queryWatcher = queryWatcher;
}
@Override
@ -56,7 +60,7 @@ public class SearchQueryRunnerFactory implements QueryRunnerFactory<Result<Searc
)
{
return new ChainedExecutionQueryRunner<Result<SearchResultValue>>(
queryExecutor, toolChest.getOrdering(), queryRunners
queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners
);
}

View File

@ -20,6 +20,7 @@
package io.druid.query.select;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence;
@ -29,6 +30,7 @@ import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.segment.Segment;
@ -43,21 +45,31 @@ public class SelectQueryRunnerFactory
{
return new SelectQueryRunnerFactory(
new SelectQueryQueryToolChest(new QueryConfig(), jsonMapper),
new SelectQueryEngine()
new SelectQueryEngine(),
new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
{
}
}
);
}
private final SelectQueryQueryToolChest toolChest;
private final SelectQueryEngine engine;
private final QueryWatcher queryWatcher;
@Inject
public SelectQueryRunnerFactory(
SelectQueryQueryToolChest toolChest,
SelectQueryEngine engine
SelectQueryEngine engine,
QueryWatcher queryWatcher
)
{
this.toolChest = toolChest;
this.engine = engine;
this.queryWatcher = queryWatcher;
}
@Override
@ -72,7 +84,7 @@ public class SelectQueryRunnerFactory
)
{
return new ChainedExecutionQueryRunner<Result<SelectResultValue>>(
queryExecutor, toolChest.getOrdering(), queryRunners
queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners
);
}

View File

@ -19,6 +19,7 @@
package io.druid.query.timeboundary;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.Sequence;
@ -27,6 +28,7 @@ import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
@ -40,6 +42,13 @@ public class TimeBoundaryQueryRunnerFactory
implements QueryRunnerFactory<Result<TimeBoundaryResultValue>, TimeBoundaryQuery>
{
private static final TimeBoundaryQueryQueryToolChest toolChest = new TimeBoundaryQueryQueryToolChest();
private final QueryWatcher queryWatcher;
@Inject
public TimeBoundaryQueryRunnerFactory(QueryWatcher queryWatcher)
{
this.queryWatcher = queryWatcher;
}
@Override
public QueryRunner<Result<TimeBoundaryResultValue>> createRunner(final Segment segment)
@ -53,7 +62,7 @@ public class TimeBoundaryQueryRunnerFactory
)
{
return new ChainedExecutionQueryRunner<Result<TimeBoundaryResultValue>>(
queryExecutor, toolChest.getOrdering(), queryRunners
queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners
);
}

View File

@ -19,6 +19,7 @@
package io.druid.query.timeseries;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence;
@ -28,6 +29,7 @@ import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
@ -39,25 +41,39 @@ import java.util.concurrent.ExecutorService;
public class TimeseriesQueryRunnerFactory
implements QueryRunnerFactory<Result<TimeseriesResultValue>, TimeseriesQuery>
{
/**
* Use only for testing
* @return
*/
public static TimeseriesQueryRunnerFactory create()
{
return new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine()
new TimeseriesQueryEngine(),
new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
{
}
}
);
}
private final TimeseriesQueryQueryToolChest toolChest;
private final TimeseriesQueryEngine engine;
private final QueryWatcher queryWatcher;
@Inject
public TimeseriesQueryRunnerFactory(
TimeseriesQueryQueryToolChest toolChest,
TimeseriesQueryEngine engine
TimeseriesQueryEngine engine,
QueryWatcher queryWatcher
)
{
this.toolChest = toolChest;
this.engine = engine;
this.queryWatcher = queryWatcher;
}
@Override
@ -72,7 +88,7 @@ public class TimeseriesQueryRunnerFactory
)
{
return new ChainedExecutionQueryRunner<Result<TimeseriesResultValue>>(
queryExecutor, toolChest.getOrdering(), queryRunners
queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners
);
}

View File

@ -30,6 +30,7 @@ import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.segment.Segment;
@ -43,15 +44,18 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory<Result<TopNRes
{
private final StupidPool<ByteBuffer> computationBufferPool;
private final TopNQueryQueryToolChest toolchest;
private final QueryWatcher queryWatcher;
@Inject
public TopNQueryRunnerFactory(
@Global StupidPool<ByteBuffer> computationBufferPool,
TopNQueryQueryToolChest toolchest
TopNQueryQueryToolChest toolchest,
QueryWatcher queryWatcher
)
{
this.computationBufferPool = computationBufferPool;
this.toolchest = toolchest;
this.queryWatcher = queryWatcher;
}
@Override
@ -79,7 +83,7 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory<Result<TopNRes
)
{
return new ChainedExecutionQueryRunner<Result<TopNResultValue>>(
queryExecutor, toolchest.getOrdering(), queryRunners
queryExecutor, toolchest.getOrdering(), queryWatcher, queryRunners
);
}

View File

@ -0,0 +1,148 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.lifecycle.Lifecycle;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import org.junit.Ignore;
import org.junit.Test;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ChainedExecutionQueryRunnerTest
{
@Test @Ignore
public void testQueryCancellation() throws Exception
{
ExecutorService exec = PrioritizedExecutorService.create(
new Lifecycle(), new ExecutorServiceConfig()
{
@Override
public String getFormatString()
{
return "test";
}
@Override
public int getNumThreads()
{
return 2;
}
}
);
final CountDownLatch queriesStarted = new CountDownLatch(2);
final CountDownLatch queryIsRegistered = new CountDownLatch(1);
final Map<Query, ListenableFuture> queries = Maps.newHashMap();
QueryWatcher watcher = new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
{
queries.put(query, future);
queryIsRegistered.countDown();
}
};
ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>(
exec,
Ordering.<Integer>natural(),
watcher,
Lists.<QueryRunner<Integer>>newArrayList(
new DyingQueryRunner(1, queriesStarted),
new DyingQueryRunner(2, queriesStarted),
new DyingQueryRunner(3, queriesStarted)
)
);
final Sequence seq = chainedRunner.run(
Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.intervals("2014/2015")
.aggregators(Lists.<AggregatorFactory>newArrayList(new CountAggregatorFactory("count")))
.build()
);
Future f = Executors.newFixedThreadPool(1).submit(
new Runnable()
{
@Override
public void run()
{
Sequences.toList(seq, Lists.newArrayList());
}
}
);
// wait for query to register
queryIsRegistered.await();
queriesStarted.await();
// cancel the query
queries.values().iterator().next().cancel(true);
f.get();
}
private static class DyingQueryRunner implements QueryRunner<Integer>
{
private final int id;
private final CountDownLatch latch;
public DyingQueryRunner(int id, CountDownLatch latch) {
this.id = id;
this.latch = latch;
}
@Override
public Sequence<Integer> run(Query<Integer> query)
{
latch.countDown();
int i = 0;
while (i >= 0) {
if(Thread.interrupted()) {
throw new QueryInterruptedException("I got killed");
}
// do a lot of work
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new QueryInterruptedException("I got killed");
}
++i;
}
return Sequences.simple(Lists.newArrayList(i));
}
}
}

View File

@ -1,6 +1,7 @@
package io.druid.query;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.collections.StupidPool;
import io.druid.query.search.SearchQueryQueryToolChest;
import io.druid.query.search.SearchQueryRunnerFactory;
@ -40,7 +41,14 @@ public class TestQueryRunners
Segment adapter
)
{
QueryRunnerFactory factory = new TopNQueryRunnerFactory(pool, new TopNQueryQueryToolChest(topNConfig));
QueryRunnerFactory factory = new TopNQueryRunnerFactory(pool, new TopNQueryQueryToolChest(topNConfig), new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
{
}
});
return new FinalizeResultsQueryRunner<T>(
factory.createRunner(adapter),
factory.getToolchest()
@ -62,7 +70,14 @@ public class TestQueryRunners
Segment adapter
)
{
QueryRunnerFactory factory = new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()));
QueryRunnerFactory factory = new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()), new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
{
}
});
return new FinalizeResultsQueryRunner<T>(
factory.createRunner(adapter),
factory.getToolchest()
@ -73,11 +88,18 @@ public class TestQueryRunners
Segment adapter
)
{
QueryRunnerFactory factory = new TimeBoundaryQueryRunnerFactory();
QueryRunnerFactory factory = new TimeBoundaryQueryRunnerFactory(new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
{
}
});
return new FinalizeResultsQueryRunner<T>(
factory.createRunner(adapter),
factory.getToolchest()
);
}
}
}

View File

@ -23,10 +23,13 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.guava.Sequences;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.query.filter.DimFilter;
import io.druid.query.search.search.FragmentSearchQuerySpec;
@ -56,7 +59,14 @@ public class SearchQueryRunnerTest
public static Collection<?> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.makeQueryRunners(
new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()))
new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()), new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
{
}
})
);
}

View File

@ -20,10 +20,13 @@
package io.druid.query.timeboundary;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.guava.Sequences;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import org.joda.time.DateTime;
import org.junit.Assert;
@ -43,7 +46,14 @@ public class TimeBoundaryQueryRunnerTest
public static Collection<?> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.makeQueryRunners(
new TimeBoundaryQueryRunnerFactory()
new TimeBoundaryQueryRunnerFactory(new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
{
}
})
);
}

View File

@ -23,12 +23,15 @@ import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.guava.Sequences;
import io.druid.collections.StupidPool;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
@ -68,7 +71,15 @@ public class TopNQueryRunnerTest
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
TestQueryRunners.getPool(),
new TopNQueryQueryToolChest(new TopNQueryConfig())
new TopNQueryQueryToolChest(new TopNQueryConfig()),
new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
{
}
}
)
)
);
@ -85,7 +96,15 @@ public class TopNQueryRunnerTest
}
}
),
new TopNQueryQueryToolChest(new TopNQueryConfig())
new TopNQueryQueryToolChest(new TopNQueryConfig()),
new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
{
}
}
)
)
);

View File

@ -24,6 +24,7 @@ import com.google.inject.Binder;
import com.google.inject.multibindings.MapBinder;
import io.druid.query.Query;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryWatcher;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryEngine;
import io.druid.query.groupby.GroupByQueryRunnerFactory;
@ -39,6 +40,7 @@ import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNQueryRunnerFactory;
import io.druid.server.QueryManager;
import java.util.Map;
@ -62,6 +64,12 @@ public class QueryRunnerFactoryModule extends QueryToolChestModule
{
super.configure(binder);
binder.bind(QueryWatcher.class)
.to(QueryManager.class)
.in(LazySingleton.class);
binder.bind(QueryManager.class)
.in(LazySingleton.class);
final MapBinder<Class<? extends Query>, QueryRunnerFactory> queryFactoryBinder = DruidBinders.queryRunnerFactoryBinder(
binder
);

View File

@ -0,0 +1,48 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.query.Query;
import io.druid.query.QueryWatcher;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
public class QueryManager implements QueryWatcher
{
final ConcurrentMap<String, ListenableFuture> queries;
public QueryManager() {
this.queries = Maps.newConcurrentMap();
}
public void cancelQuery(String id) {
Future future = queries.get(id);
if(future != null) {
future.cancel(true);
}
}
public void registerQuery(Query query, ListenableFuture future)
{
queries.put(query.getId(), future);
}
}

View File

@ -37,16 +37,20 @@ import io.druid.guice.annotations.Smile;
import io.druid.query.DataSourceUtil;
import io.druid.query.Query;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryWatcher;
import io.druid.server.log.RequestLogger;
import org.joda.time.DateTime;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.DELETE;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
@ -66,6 +70,7 @@ public class QueryResource
private final QuerySegmentWalker texasRanger;
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;
private final QueryManager queryManager;
@Inject
public QueryResource(
@ -73,7 +78,8 @@ public class QueryResource
@Smile ObjectMapper smileMapper,
QuerySegmentWalker texasRanger,
ServiceEmitter emitter,
RequestLogger requestLogger
RequestLogger requestLogger,
QueryManager queryManager
)
{
this.jsonMapper = jsonMapper;
@ -81,6 +87,16 @@ public class QueryResource
this.texasRanger = texasRanger;
this.emitter = emitter;
this.requestLogger = requestLogger;
this.queryManager = queryManager;
}
@DELETE
@Path("{id}")
@Produces("application/json")
public Response getServer(@PathParam("id") String queryId)
{
queryManager.cancelQuery(queryId);
return Response.status(Response.Status.ACCEPTED).build();
}
@POST
@ -124,10 +140,13 @@ public class QueryResource
resp.setStatus(200);
resp.setContentType("application/x-javascript");
resp.setHeader("X-Druid-Query-Id", query.getId());
out = resp.getOutputStream();
jsonWriter.writeValue(out, results);
// JsonGenerator jgen = jsonWriter.getFactory().createGenerator(out);
long requestTime = System.currentTimeMillis() - start;
emitter.emit(