Merge pull request #576 from metamx/query-cancellation

Query cancellation & timeout
This commit is contained in:
xvrl 2014-06-11 12:18:43 -07:00
commit ada2d92c16
45 changed files with 1456 additions and 554 deletions

View File

@ -79,7 +79,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
<version>0.9.5</version>
<version>0.9.6</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>

View File

@ -32,7 +32,12 @@ public class DefaultObjectMapper extends ObjectMapper
{
public DefaultObjectMapper()
{
this(null);
this((JsonFactory)null);
}
public DefaultObjectMapper(DefaultObjectMapper mapper)
{
super(mapper);
}
public DefaultObjectMapper(JsonFactory factory)
@ -52,4 +57,10 @@ public class DefaultObjectMapper extends ObjectMapper
configure(MapperFeature.AUTO_DETECT_SETTERS, false);
configure(SerializationFeature.INDENT_OUTPUT, false);
}
@Override
public ObjectMapper copy()
{
return new DefaultObjectMapper(this);
}
}

View File

@ -32,6 +32,7 @@ import com.google.common.base.Throwables;
import com.metamx.common.Granularity;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Yielder;
import org.joda.time.DateTimeZone;
import java.io.IOException;
@ -104,7 +105,7 @@ public class DruidDefaultSerializersModule extends SimpleModule
jgen.writeStartArray();
value.accumulate(
null,
new Accumulator()
new Accumulator<Object, Object>()
{
@Override
public Object accumulate(Object o, Object o1)
@ -115,7 +116,7 @@ public class DruidDefaultSerializersModule extends SimpleModule
catch (IOException e) {
throw Throwables.propagate(e);
}
return o;
return null;
}
}
);
@ -123,6 +124,28 @@ public class DruidDefaultSerializersModule extends SimpleModule
}
}
);
addSerializer(
Yielder.class,
new JsonSerializer<Yielder>()
{
@Override
public void serialize(Yielder yielder, final JsonGenerator jgen, SerializerProvider provider)
throws IOException, JsonProcessingException
{
jgen.writeStartArray();
try {
while (!yielder.isDone()) {
final Object o = yielder.get();
jgen.writeObject(o);
yielder = yielder.next(null);
}
} finally {
yielder.close();
}
jgen.writeEndArray();
}
}
);
addSerializer(ByteOrder.class, ToStringSerializer.instance);
addDeserializer(
ByteOrder.class,

View File

@ -25,6 +25,10 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.MergeIterable;
@ -35,11 +39,11 @@ import com.metamx.common.logger.Logger;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* A QueryRunner that combines a list of other QueryRunners and executes them in parallel on an executor.
@ -59,27 +63,33 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
private static final Logger log = new Logger(ChainedExecutionQueryRunner.class);
private final Iterable<QueryRunner<T>> queryables;
private final ExecutorService exec;
private final ListeningExecutorService exec;
private final Ordering<T> ordering;
private final QueryWatcher queryWatcher;
public ChainedExecutionQueryRunner(
ExecutorService exec,
Ordering<T> ordering,
QueryWatcher queryWatcher,
QueryRunner<T>... queryables
)
{
this(exec, ordering, Arrays.asList(queryables));
this(exec, ordering, queryWatcher, Arrays.asList(queryables));
}
public ChainedExecutionQueryRunner(
ExecutorService exec,
Ordering<T> ordering,
QueryWatcher queryWatcher,
Iterable<QueryRunner<T>> queryables
)
{
this.exec = exec;
// listeningDecorator will leave PrioritizedExecutorService unchanged,
// since it already implements ListeningExecutorService
this.exec = MoreExecutors.listeningDecorator(exec);
this.ordering = ordering;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
this.queryWatcher = queryWatcher;
}
@Override
@ -94,19 +104,20 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
public Iterator<T> make()
{
// Make it a List<> to materialize all of the values (so that it will submit everything to the executor)
List<Future<List<T>>> futures = Lists.newArrayList(
ListenableFuture<List<Iterable<T>>> futures = Futures.allAsList(
Lists.newArrayList(
Iterables.transform(
queryables,
new Function<QueryRunner<T>, Future<List<T>>>()
new Function<QueryRunner<T>, ListenableFuture<Iterable<T>>>()
{
@Override
public Future<List<T>> apply(final QueryRunner<T> input)
public ListenableFuture<Iterable<T>> apply(final QueryRunner<T> input)
{
return exec.submit(
new AbstractPrioritizedCallable<List<T>>(priority)
new AbstractPrioritizedCallable<Iterable<T>>(priority)
{
@Override
public List<T> call() throws Exception
public Iterable<T> call() throws Exception
{
try {
if (input == null) {
@ -125,6 +136,9 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
return retVal;
}
catch (QueryInterruptedException e) {
throw Throwables.propagate(e);
}
catch (Exception e) {
log.error(e, "Exception with one of the sequences!");
throw Throwables.propagate(e);
@ -135,31 +149,37 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
}
}
)
)
);
return new MergeIterable<T>(
ordering.nullsFirst(),
Iterables.transform(
futures,
new Function<Future<List<T>>, Iterable<T>>()
{
@Override
public Iterable<T> apply(Future<List<T>> input)
{
queryWatcher.registerQuery(query, futures);
try {
return input.get();
final Number timeout = query.getContextValue("timeout", (Number)null);
return new MergeIterable<>(
ordering.nullsFirst(),
timeout == null ?
futures.get() :
futures.get(timeout.longValue(), TimeUnit.MILLISECONDS)
).iterator();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query interrupted");
}
catch(CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
}
catch(TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query timeout");
}
catch (ExecutionException e) {
throw new RuntimeException(e);
throw Throwables.propagate(e.getCause());
}
}
}
)
).iterator();
}
@Override
public void cleanup(Iterator<T> tIterator)

View File

@ -26,6 +26,10 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
@ -39,37 +43,44 @@ import io.druid.segment.incremental.IncrementalIndex;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class GroupByParallelQueryRunner implements QueryRunner<Row>
{
private static final Logger log = new Logger(GroupByParallelQueryRunner.class);
private final Iterable<QueryRunner<Row>> queryables;
private final ExecutorService exec;
private final ListeningExecutorService exec;
private final Ordering<Row> ordering;
private final Supplier<GroupByQueryConfig> configSupplier;
private final QueryWatcher queryWatcher;
public GroupByParallelQueryRunner(
ExecutorService exec,
Ordering<Row> ordering,
Supplier<GroupByQueryConfig> configSupplier,
QueryWatcher queryWatcher,
QueryRunner<Row>... queryables
)
{
this(exec, ordering, configSupplier, Arrays.asList(queryables));
this(exec, ordering, configSupplier, queryWatcher, Arrays.asList(queryables));
}
public GroupByParallelQueryRunner(
ExecutorService exec,
Ordering<Row> ordering, Supplier<GroupByQueryConfig> configSupplier,
QueryWatcher queryWatcher,
Iterable<QueryRunner<Row>> queryables
)
{
this.exec = exec;
this.exec = MoreExecutors.listeningDecorator(exec);
this.ordering = ordering;
this.queryWatcher = queryWatcher;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
this.configSupplier = configSupplier;
}
@ -88,13 +99,14 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
if (Iterables.isEmpty(queryables)) {
log.warn("No queryables found.");
}
List<Future<Boolean>> futures = Lists.newArrayList(
ListenableFuture<List<Boolean>> futures = Futures.allAsList(
Lists.newArrayList(
Iterables.transform(
queryables,
new Function<QueryRunner<Row>, Future<Boolean>>()
new Function<QueryRunner<Row>, ListenableFuture<Boolean>>()
{
@Override
public Future<Boolean> apply(final QueryRunner<Row> input)
public ListenableFuture<Boolean> apply(final QueryRunner<Row> input)
{
return exec.submit(
new AbstractPrioritizedCallable<Boolean>(priority)
@ -106,6 +118,9 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
return true;
}
catch (QueryInterruptedException e) {
throw Throwables.propagate(e);
}
catch (Exception e) {
log.error(e, "Exception with one of the sequences!");
throw Throwables.propagate(e);
@ -116,19 +131,34 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
}
}
)
)
);
// Let the runners complete
for (Future<Boolean> future : futures) {
try {
future.get();
queryWatcher.registerQuery(query, futures);
final Number timeout = query.getContextValue("timeout", (Number) null);
if(timeout == null) {
futures.get();
} else {
futures.get(timeout.longValue(), TimeUnit.MILLISECONDS);
}
}
catch (InterruptedException e) {
throw new RuntimeException(e);
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query interrupted");
}
catch(CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
}
catch(TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query timeout");
}
catch (ExecutionException e) {
throw new RuntimeException(e);
}
throw Throwables.propagate(e.getCause());
}
return Sequences.simple(indexAccumulatorPair.lhs.iterableWithPostAggregations(null));

View File

@ -167,6 +167,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
@Override
public void close() throws IOException
{
try {
if (!isDone() && builder.getUser10() == null) {
builder.setUser10("short");
}
@ -177,9 +178,10 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
if (creationTime > 0) {
emitter.emit(builder.build("query/wait", startTime - creationTime));
}
} finally {
yielder.close();
}
}
};
}
};

View File

@ -0,0 +1,48 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class QueryInterruptedException extends RuntimeException
{
public QueryInterruptedException() {
super();
}
@JsonCreator
public QueryInterruptedException(@JsonProperty("error") String message)
{
super(message);
}
public QueryInterruptedException(Throwable cause)
{
super(cause);
}
@JsonProperty("error")
@Override
public String getMessage()
{
return super.getMessage();
}
}

View File

@ -0,0 +1,47 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.google.common.util.concurrent.ListenableFuture;
/**
* This interface is in a very early stage and should not be considered stable.
*
* The purpose of the QueryWatcher is to give overall visibility into queries running
* or pending at the QueryRunner level. This is currently used to cancel all the
* parts of a pending query, but may be expanded in the future to offer more direct
* visibility into query execution and resource usage.
*
* QueryRunners executing any computation asynchronously must register their queries
* with the QueryWatcher.
*
*/
public interface QueryWatcher
{
/**
* QueryRunners must use this method to register any pending queries.
*
* The given future may have cancel(true) called at any time, if cancellation of this query has been requested.
*
* @param query a query, which may be a subset of a larger query, as long as the underlying queryId is unchanged
* @param future the future holding the execution status of the query
*/
public void registerQuery(Query query, ListenableFuture future);
}

View File

@ -24,42 +24,55 @@ import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.ExecutorExecutingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Row;
import io.druid.query.ConcatQueryRunner;
import io.druid.query.GroupByParallelQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
*/
public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupByQuery>
{
private final GroupByQueryEngine engine;
private final QueryWatcher queryWatcher;
private final Supplier<GroupByQueryConfig> config;
private final GroupByQueryQueryToolChest toolChest;
private static final Logger log = new Logger(GroupByQueryRunnerFactory.class);
@Inject
public GroupByQueryRunnerFactory(
GroupByQueryEngine engine,
QueryWatcher queryWatcher,
Supplier<GroupByQueryConfig> config,
GroupByQueryQueryToolChest toolChest
)
{
this.engine = engine;
this.queryWatcher = queryWatcher;
this.config = config;
this.toolChest = toolChest;
}
@ -71,8 +84,10 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
}
@Override
public QueryRunner<Row> mergeRunners(final ExecutorService queryExecutor, Iterable<QueryRunner<Row>> queryRunners)
public QueryRunner<Row> mergeRunners(final ExecutorService exec, Iterable<QueryRunner<Row>> queryRunners)
{
// mergeRunners should take ListeningExecutorService at some point
final ListeningExecutorService queryExecutor = MoreExecutors.listeningDecorator(exec);
if (config.get().isSingleThreaded()) {
return new ConcatQueryRunner<Row>(
Sequences.map(
@ -88,7 +103,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
public Sequence<Row> run(final Query<Row> query)
{
Future<Sequence<Row>> future = queryExecutor.submit(
ListenableFuture<Sequence<Row>> future = queryExecutor.submit(
new Callable<Sequence<Row>>()
{
@Override
@ -102,13 +117,25 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
}
);
try {
return future.get();
queryWatcher.registerQuery(query, future);
final Number timeout = query.getContextValue("timeout", (Number)null);
return timeout == null ? future.get() : future.get(timeout.longValue(), TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
future.cancel(true);
throw new QueryInterruptedException("Query interrupted");
}
catch(CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
}
catch(TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
future.cancel(true);
throw new QueryInterruptedException("Query timeout");
}
catch (ExecutionException e) {
throw Throwables.propagate(e);
throw Throwables.propagate(e.getCause());
}
}
};
@ -117,7 +144,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
)
);
} else {
return new GroupByParallelQueryRunner(queryExecutor, new RowOrdering(), config, queryRunners);
return new GroupByParallelQueryRunner(queryExecutor, new RowOrdering(), config, queryWatcher, queryRunners);
}
}

View File

@ -22,15 +22,21 @@ package io.druid.query.metadata;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.metamx.common.guava.ExecutorExecutingSequence;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import io.druid.query.AbstractPrioritizedCallable;
import io.druid.query.ConcatQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.ColumnIncluderator;
import io.druid.query.metadata.metadata.SegmentAnalysis;
@ -40,16 +46,27 @@ import io.druid.segment.Segment;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<SegmentAnalysis, SegmentMetadataQuery>
{
private static final SegmentAnalyzer analyzer = new SegmentAnalyzer();
private static final SegmentMetadataQueryQueryToolChest toolChest = new SegmentMetadataQueryQueryToolChest();
private static final Logger log = new Logger(SegmentMetadataQueryRunnerFactory.class);
private final QueryWatcher queryWatcher;
@Inject
public SegmentMetadataQueryRunnerFactory(
QueryWatcher queryWatcher
)
{
this.queryWatcher = queryWatcher;
}
@Override
public QueryRunner<SegmentAnalysis> createRunner(final Segment segment)
@ -101,9 +118,10 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
@Override
public QueryRunner<SegmentAnalysis> mergeRunners(
final ExecutorService queryExecutor, Iterable<QueryRunner<SegmentAnalysis>> queryRunners
ExecutorService exec, Iterable<QueryRunner<SegmentAnalysis>> queryRunners
)
{
final ListeningExecutorService queryExecutor = MoreExecutors.listeningDecorator(exec);
return new ConcatQueryRunner<SegmentAnalysis>(
Sequences.map(
Sequences.simple(queryRunners),
@ -118,7 +136,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
public Sequence<SegmentAnalysis> run(final Query<SegmentAnalysis> query)
{
final int priority = query.getContextPriority(0);
Future<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
ListenableFuture<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
new AbstractPrioritizedCallable<Sequence<SegmentAnalysis>>(priority)
{
@Override
@ -129,13 +147,25 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
}
);
try {
return future.get();
queryWatcher.registerQuery(query, future);
final Number timeout = query.getContextValue("timeout", (Number) null);
return timeout == null ? future.get() : future.get(timeout.longValue(), TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
future.cancel(true);
throw new QueryInterruptedException("Query interrupted");
}
catch(CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
}
catch(TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
future.cancel(true);
throw new QueryInterruptedException("Query timeout");
}
catch (ExecutionException e) {
throw Throwables.propagate(e);
throw Throwables.propagate(e.getCause());
}
}
};

View File

@ -24,6 +24,7 @@ import io.druid.query.ChainedExecutionQueryRunner;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.query.search.search.SearchQuery;
import io.druid.segment.Segment;
@ -35,13 +36,16 @@ import java.util.concurrent.ExecutorService;
public class SearchQueryRunnerFactory implements QueryRunnerFactory<Result<SearchResultValue>, SearchQuery>
{
private final SearchQueryQueryToolChest toolChest;
private final QueryWatcher queryWatcher;
@Inject
public SearchQueryRunnerFactory(
SearchQueryQueryToolChest toolChest
SearchQueryQueryToolChest toolChest,
QueryWatcher queryWatcher
)
{
this.toolChest = toolChest;
this.queryWatcher = queryWatcher;
}
@Override
@ -56,7 +60,7 @@ public class SearchQueryRunnerFactory implements QueryRunnerFactory<Result<Searc
)
{
return new ChainedExecutionQueryRunner<Result<SearchResultValue>>(
queryExecutor, toolChest.getOrdering(), queryRunners
queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners
);
}

View File

@ -20,6 +20,7 @@
package io.druid.query.select;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence;
@ -29,6 +30,7 @@ import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.segment.Segment;
@ -39,25 +41,20 @@ import java.util.concurrent.ExecutorService;
public class SelectQueryRunnerFactory
implements QueryRunnerFactory<Result<SelectResultValue>, SelectQuery>
{
public static SelectQueryRunnerFactory create(ObjectMapper jsonMapper)
{
return new SelectQueryRunnerFactory(
new SelectQueryQueryToolChest(new QueryConfig(), jsonMapper),
new SelectQueryEngine()
);
}
private final SelectQueryQueryToolChest toolChest;
private final SelectQueryEngine engine;
private final QueryWatcher queryWatcher;
@Inject
public SelectQueryRunnerFactory(
SelectQueryQueryToolChest toolChest,
SelectQueryEngine engine
SelectQueryEngine engine,
QueryWatcher queryWatcher
)
{
this.toolChest = toolChest;
this.engine = engine;
this.queryWatcher = queryWatcher;
}
@Override
@ -72,7 +69,7 @@ public class SelectQueryRunnerFactory
)
{
return new ChainedExecutionQueryRunner<Result<SelectResultValue>>(
queryExecutor, toolChest.getOrdering(), queryRunners
queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners
);
}

View File

@ -19,6 +19,7 @@
package io.druid.query.timeboundary;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.Sequence;
@ -27,6 +28,7 @@ import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
@ -40,6 +42,13 @@ public class TimeBoundaryQueryRunnerFactory
implements QueryRunnerFactory<Result<TimeBoundaryResultValue>, TimeBoundaryQuery>
{
private static final TimeBoundaryQueryQueryToolChest toolChest = new TimeBoundaryQueryQueryToolChest();
private final QueryWatcher queryWatcher;
@Inject
public TimeBoundaryQueryRunnerFactory(QueryWatcher queryWatcher)
{
this.queryWatcher = queryWatcher;
}
@Override
public QueryRunner<Result<TimeBoundaryResultValue>> createRunner(final Segment segment)
@ -53,7 +62,7 @@ public class TimeBoundaryQueryRunnerFactory
)
{
return new ChainedExecutionQueryRunner<Result<TimeBoundaryResultValue>>(
queryExecutor, toolChest.getOrdering(), queryRunners
queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners
);
}

View File

@ -60,6 +60,7 @@ public class TimeseriesQueryEngine
{
Aggregator[] aggregators = QueryRunnerHelper.makeAggregators(cursor, aggregatorSpecs);
try {
while (!cursor.isDone()) {
for (Aggregator aggregator : aggregators) {
aggregator.aggregate();
@ -74,13 +75,14 @@ public class TimeseriesQueryEngine
}
Result<TimeseriesResultValue> retVal = bob.build();
return retVal;
}
finally {
// cleanup
for (Aggregator agg : aggregators) {
agg.close();
}
return retVal;
}
}
}
);

View File

@ -19,6 +19,7 @@
package io.druid.query.timeseries;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence;
@ -28,6 +29,7 @@ import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
@ -39,25 +41,20 @@ import java.util.concurrent.ExecutorService;
public class TimeseriesQueryRunnerFactory
implements QueryRunnerFactory<Result<TimeseriesResultValue>, TimeseriesQuery>
{
public static TimeseriesQueryRunnerFactory create()
{
return new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine()
);
}
private final TimeseriesQueryQueryToolChest toolChest;
private final TimeseriesQueryEngine engine;
private final QueryWatcher queryWatcher;
@Inject
public TimeseriesQueryRunnerFactory(
TimeseriesQueryQueryToolChest toolChest,
TimeseriesQueryEngine engine
TimeseriesQueryEngine engine,
QueryWatcher queryWatcher
)
{
this.toolChest = toolChest;
this.engine = engine;
this.queryWatcher = queryWatcher;
}
@Override
@ -72,7 +69,7 @@ public class TimeseriesQueryRunnerFactory
)
{
return new ChainedExecutionQueryRunner<Result<TimeseriesResultValue>>(
queryExecutor, toolChest.getOrdering(), queryRunners
queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners
);
}

View File

@ -30,6 +30,7 @@ import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.segment.Segment;
@ -43,15 +44,18 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory<Result<TopNRes
{
private final StupidPool<ByteBuffer> computationBufferPool;
private final TopNQueryQueryToolChest toolchest;
private final QueryWatcher queryWatcher;
@Inject
public TopNQueryRunnerFactory(
@Global StupidPool<ByteBuffer> computationBufferPool,
TopNQueryQueryToolChest toolchest
TopNQueryQueryToolChest toolchest,
QueryWatcher queryWatcher
)
{
this.computationBufferPool = computationBufferPool;
this.toolchest = toolchest;
this.queryWatcher = queryWatcher;
}
@Override
@ -79,7 +83,7 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory<Result<TopNRes
)
{
return new ChainedExecutionQueryRunner<Result<TopNResultValue>>(
queryExecutor, toolchest.getOrdering(), queryRunners
queryExecutor, toolchest.getOrdering(), queryWatcher, queryRunners
);
}

View File

@ -27,6 +27,7 @@ import com.google.common.io.Closeables;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.granularity.QueryGranularity;
import io.druid.query.QueryInterruptedException;
import io.druid.query.filter.Filter;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
@ -224,6 +225,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public void advance()
{
if (Thread.interrupted()) {
throw new QueryInterruptedException();
}
cursorOffset.increment();
}
@ -652,6 +656,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public void advance()
{
if (Thread.interrupted()) {
throw new QueryInterruptedException();
}
++currRow;
}

View File

@ -29,6 +29,7 @@ import com.metamx.collections.spatial.search.Bound;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.granularity.QueryGranularity;
import io.druid.query.QueryInterruptedException;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher;
@ -200,6 +201,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
while (baseIter.hasNext()) {
if (Thread.interrupted()) {
throw new QueryInterruptedException();
}
currEntry.set(baseIter.next());
if (filterMatcher.matches()) {
@ -239,6 +244,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
Iterators.advance(baseIter, numAdvanced);
}
if (Thread.interrupted()) {
throw new QueryInterruptedException();
}
boolean foundMatched = false;
while (baseIter.hasNext()) {
currEntry.set(baseIter.next());

View File

@ -0,0 +1,286 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.concurrent.ExecutorServiceConfig;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.lifecycle.Lifecycle;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class ChainedExecutionQueryRunnerTest
{
@Test
public void testQueryCancellation() throws Exception
{
ExecutorService exec = PrioritizedExecutorService.create(
new Lifecycle(), new ExecutorServiceConfig()
{
@Override
public String getFormatString()
{
return "test";
}
@Override
public int getNumThreads()
{
return 2;
}
}
);
final CountDownLatch queriesStarted = new CountDownLatch(2);
final CountDownLatch queryIsRegistered = new CountDownLatch(1);
Capture<ListenableFuture> capturedFuture = new Capture<>();
QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class);
watcher.registerQuery(EasyMock.<Query>anyObject(), EasyMock.and(EasyMock.<ListenableFuture>anyObject(), EasyMock.capture(capturedFuture)));
EasyMock.expectLastCall()
.andAnswer(
new IAnswer<Void>()
{
@Override
public Void answer() throws Throwable
{
queryIsRegistered.countDown();
return null;
}
}
)
.once();
EasyMock.replay(watcher);
DyingQueryRunner runner1 = new DyingQueryRunner(queriesStarted);
DyingQueryRunner runner2 = new DyingQueryRunner(queriesStarted);
DyingQueryRunner runner3 = new DyingQueryRunner(queriesStarted);
ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>(
exec,
Ordering.<Integer>natural(),
watcher,
Lists.<QueryRunner<Integer>>newArrayList(
runner1,
runner2,
runner3
)
);
final Sequence seq = chainedRunner.run(
Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.intervals("2014/2015")
.aggregators(Lists.<AggregatorFactory>newArrayList(new CountAggregatorFactory("count")))
.build()
);
Future resultFuture = Executors.newFixedThreadPool(1).submit(
new Runnable()
{
@Override
public void run()
{
Sequences.toList(seq, Lists.newArrayList());
}
}
);
// wait for query to register and start
Assert.assertTrue(queryIsRegistered.await(1, TimeUnit.SECONDS));
Assert.assertTrue(queriesStarted.await(1, TimeUnit.SECONDS));
// cancel the query
Assert.assertTrue(capturedFuture.hasCaptured());
ListenableFuture future = capturedFuture.getValue();
future.cancel(true);
QueryInterruptedException cause = null;
try {
resultFuture.get();
} catch(ExecutionException e) {
Assert.assertTrue(e.getCause() instanceof QueryInterruptedException);
cause = (QueryInterruptedException)e.getCause();
}
Assert.assertNotNull(cause);
Assert.assertTrue(future.isCancelled());
Assert.assertTrue(runner1.hasStarted);
Assert.assertTrue(runner2.hasStarted);
Assert.assertFalse(runner3.hasStarted);
Assert.assertFalse(runner1.hasCompleted);
Assert.assertFalse(runner2.hasCompleted);
Assert.assertFalse(runner3.hasCompleted);
EasyMock.verify(watcher);
}
@Test
public void testQueryTimeout() throws Exception
{
ExecutorService exec = PrioritizedExecutorService.create(
new Lifecycle(), new ExecutorServiceConfig()
{
@Override
public String getFormatString()
{
return "test";
}
@Override
public int getNumThreads()
{
return 2;
}
}
);
final CountDownLatch queriesStarted = new CountDownLatch(2);
final CountDownLatch queryIsRegistered = new CountDownLatch(1);
Capture<ListenableFuture> capturedFuture = new Capture<>();
QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class);
watcher.registerQuery(EasyMock.<Query>anyObject(), EasyMock.and(EasyMock.<ListenableFuture>anyObject(), EasyMock.capture(capturedFuture)));
EasyMock.expectLastCall()
.andAnswer(
new IAnswer<Void>()
{
@Override
public Void answer() throws Throwable
{
queryIsRegistered.countDown();
return null;
}
}
)
.once();
EasyMock.replay(watcher);
DyingQueryRunner runner1 = new DyingQueryRunner(queriesStarted);
DyingQueryRunner runner2 = new DyingQueryRunner(queriesStarted);
DyingQueryRunner runner3 = new DyingQueryRunner(queriesStarted);
ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>(
exec,
Ordering.<Integer>natural(),
watcher,
Lists.<QueryRunner<Integer>>newArrayList(
runner1,
runner2,
runner3
)
);
final Sequence seq = chainedRunner.run(
Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.intervals("2014/2015")
.aggregators(Lists.<AggregatorFactory>newArrayList(new CountAggregatorFactory("count")))
.context(ImmutableMap.<String, Object>of("timeout", (100), "queryId", "test"))
.build()
);
Future resultFuture = Executors.newFixedThreadPool(1).submit(
new Runnable()
{
@Override
public void run()
{
Sequences.toList(seq, Lists.newArrayList());
}
}
);
// wait for query to register and start
Assert.assertTrue(queryIsRegistered.await(1, TimeUnit.SECONDS));
Assert.assertTrue(queriesStarted.await(1, TimeUnit.SECONDS));
// cancel the query
Assert.assertTrue(capturedFuture.hasCaptured());
ListenableFuture future = capturedFuture.getValue();
QueryInterruptedException cause = null;
try {
resultFuture.get();
} catch(ExecutionException e) {
Assert.assertTrue(e.getCause() instanceof QueryInterruptedException);
Assert.assertEquals("Query timeout", e.getCause().getMessage());
cause = (QueryInterruptedException)e.getCause();
}
Assert.assertNotNull(cause);
Assert.assertTrue(future.isCancelled());
Assert.assertTrue(runner1.hasStarted);
Assert.assertTrue(runner2.hasStarted);
Assert.assertFalse(runner3.hasStarted);
Assert.assertFalse(runner1.hasCompleted);
Assert.assertFalse(runner2.hasCompleted);
Assert.assertFalse(runner3.hasCompleted);
EasyMock.verify(watcher);
}
private static class DyingQueryRunner implements QueryRunner<Integer>
{
private final CountDownLatch latch;
private boolean hasStarted = false;
private boolean hasCompleted = false;
public DyingQueryRunner(CountDownLatch latch)
{
this.latch = latch;
}
@Override
public Sequence<Integer> run(Query<Integer> query)
{
hasStarted = true;
latch.countDown();
if (Thread.interrupted()) {
throw new QueryInterruptedException("I got killed");
}
// do a lot of work
try {
Thread.sleep(500);
}
catch (InterruptedException e) {
throw new QueryInterruptedException("I got killed");
}
hasCompleted = true;
return Sequences.simple(Lists.newArrayList(123));
}
}
}

View File

@ -21,6 +21,7 @@ package io.druid.query;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
@ -53,6 +54,16 @@ import java.util.List;
*/
public class QueryRunnerTestHelper
{
public static final QueryWatcher NOOP_QUERYWATCHER = new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
{
}
};
public static final String segmentId = "testSegment";
public static final String dataSource = "testing";
public static final UnionDataSource unionDataSource = new UnionDataSource(

View File

@ -1,11 +1,14 @@
package io.druid.query;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.collections.StupidPool;
import io.druid.query.search.SearchQueryQueryToolChest;
import io.druid.query.search.SearchQueryRunnerFactory;
import io.druid.query.search.search.SearchQueryConfig;
import io.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.topn.TopNQueryConfig;
import io.druid.query.topn.TopNQueryQueryToolChest;
@ -40,7 +43,11 @@ public class TestQueryRunners
Segment adapter
)
{
QueryRunnerFactory factory = new TopNQueryRunnerFactory(pool, new TopNQueryQueryToolChest(topNConfig));
QueryRunnerFactory factory = new TopNQueryRunnerFactory(
pool,
new TopNQueryQueryToolChest(topNConfig),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
return new FinalizeResultsQueryRunner<T>(
factory.createRunner(adapter),
factory.getToolchest()
@ -51,7 +58,12 @@ public class TestQueryRunners
Segment adapter
)
{
QueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create();
QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
return new FinalizeResultsQueryRunner<T>(
factory.createRunner(adapter),
factory.getToolchest()
@ -62,7 +74,7 @@ public class TestQueryRunners
Segment adapter
)
{
QueryRunnerFactory factory = new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()));
QueryRunnerFactory factory = new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()), QueryRunnerTestHelper.NOOP_QUERYWATCHER);
return new FinalizeResultsQueryRunner<T>(
factory.createRunner(adapter),
factory.getToolchest()
@ -73,11 +85,10 @@ public class TestQueryRunners
Segment adapter
)
{
QueryRunnerFactory factory = new TimeBoundaryQueryRunnerFactory();
QueryRunnerFactory factory = new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER);
return new FinalizeResultsQueryRunner<T>(
factory.createRunner(adapter),
factory.getToolchest()
);
}
}

View File

@ -113,6 +113,7 @@ public class GroupByQueryRunnerTest
final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
engine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
configSupplier,
new GroupByQueryQueryToolChest(configSupplier, engine)
);

View File

@ -72,6 +72,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
engine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
configSupplier,
new GroupByQueryQueryToolChest(configSupplier, engine)
);

View File

@ -25,6 +25,7 @@ import io.druid.query.LegacyDataSource;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryWatcher;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.SegmentAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
@ -95,7 +96,7 @@ public class SegmentAnalyzerTest
private List<SegmentAnalysis> getSegmentAnalysises(Segment index)
{
final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
(QueryRunnerFactory) new SegmentMetadataQueryRunnerFactory(), index
(QueryRunnerFactory) new SegmentMetadataQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER), index
);
final SegmentMetadataQuery query = new SegmentMetadataQuery(

View File

@ -45,7 +45,7 @@ public class SegmentMetadataQueryTest
{
@SuppressWarnings("unchecked")
private final QueryRunner runner = makeQueryRunner(
new SegmentMetadataQueryRunnerFactory()
new SegmentMetadataQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER)
);
private ObjectMapper mapper = new DefaultObjectMapper();

View File

@ -23,10 +23,13 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.guava.Sequences;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.query.filter.DimFilter;
import io.druid.query.search.search.FragmentSearchQuerySpec;
@ -56,7 +59,10 @@ public class SearchQueryRunnerTest
public static Collection<?> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.makeQueryRunners(
new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()))
new SearchQueryRunnerFactory(
new SearchQueryQueryToolChest(new SearchQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
);
}

View File

@ -22,11 +22,15 @@ package io.druid.query.select;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequences;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.query.TableDataSource;
import io.druid.query.filter.SelectorDimFilter;
@ -54,7 +58,11 @@ public class SelectQueryRunnerTest
public static Collection<?> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.makeQueryRunners(
SelectQueryRunnerFactory.create(new DefaultObjectMapper())
new SelectQueryRunnerFactory(
new SelectQueryQueryToolChest(new QueryConfig(), new DefaultObjectMapper()),
new SelectQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
);
}

View File

@ -20,10 +20,13 @@
package io.druid.query.timeboundary;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.guava.Sequences;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import org.joda.time.DateTime;
import org.junit.Assert;
@ -43,7 +46,7 @@ public class TimeBoundaryQueryRunnerTest
public static Collection<?> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.makeQueryRunners(
new TimeBoundaryQueryRunnerFactory()
new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER)
);
}

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequences;
import io.druid.query.Druids;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
@ -46,7 +47,11 @@ public class TimeSeriesUnionQueryRunnerTest
public static Collection<?> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.makeUnionQueryRunners(
TimeseriesQueryRunnerFactory.create()
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
);
}

View File

@ -28,8 +28,10 @@ import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
@ -87,7 +89,12 @@ public class TimeseriesQueryRunnerBonusTest
private static List<Result<TimeseriesResultValue>> runTimeseriesCount(IncrementalIndex index)
{
final QueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create();
final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
final QueryRunner<Result<TimeseriesResultValue>> runner = makeQueryRunner(
factory,
new IncrementalIndexSegment(index, null)

View File

@ -26,6 +26,7 @@ import com.metamx.common.guava.Sequences;
import io.druid.granularity.PeriodGranularity;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
@ -62,7 +63,11 @@ public class TimeseriesQueryRunnerTest
public static Collection<?> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.makeQueryRunners(
TimeseriesQueryRunnerFactory.create()
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
);
}

View File

@ -68,7 +68,8 @@ public class TopNQueryRunnerTest
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
TestQueryRunners.getPool(),
new TopNQueryQueryToolChest(new TopNQueryConfig())
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
);
@ -85,7 +86,8 @@ public class TopNQueryRunnerTest
}
}
),
new TopNQueryQueryToolChest(new TopNQueryConfig())
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
);

View File

@ -65,7 +65,8 @@ public class TopNUnionQueryTest
QueryRunnerTestHelper.makeUnionQueryRunners(
new TopNQueryRunnerFactory(
TestQueryRunners.getPool(),
new TopNQueryQueryToolChest(new TopNQueryConfig())
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
);
@ -82,7 +83,8 @@ public class TopNUnionQueryTest
}
}
),
new TopNQueryQueryToolChest(new TopNQueryConfig())
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
);

View File

@ -29,13 +29,17 @@ import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.SpatialDimFilter;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.IncrementalIndexSegment;
@ -434,7 +438,12 @@ public class SpatialFilterBonusTest
)
);
try {
TimeseriesQueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create();
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(segment),
factory.getToolchest()
@ -516,7 +525,12 @@ public class SpatialFilterBonusTest
)
);
try {
TimeseriesQueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create();
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(segment),
factory.getToolchest()

View File

@ -29,13 +29,17 @@ import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.SpatialDimFilter;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.IncrementalIndexSegment;
@ -464,7 +468,12 @@ public class SpatialFilterTest
)
);
try {
TimeseriesQueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create();
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(segment),
factory.getToolchest()
@ -546,7 +555,12 @@ public class SpatialFilterTest
)
);
try {
TimeseriesQueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create();
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(segment),
factory.getToolchest()

View File

@ -32,10 +32,9 @@ import io.druid.client.selector.TierSelectorStrategy;
import io.druid.concurrent.Execs;
import io.druid.guice.annotations.Client;
import io.druid.query.DataSource;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.TableDataSource;
import io.druid.query.QueryWatcher;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline;
@ -60,6 +59,7 @@ public class BrokerServerView implements TimelineServerView
private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines;
private final QueryToolChestWarehouse warehouse;
private final QueryWatcher queryWatcher;
private final ObjectMapper smileMapper;
private final HttpClient httpClient;
private final ServerInventoryView baseView;
@ -68,6 +68,7 @@ public class BrokerServerView implements TimelineServerView
@Inject
public BrokerServerView(
QueryToolChestWarehouse warehouse,
QueryWatcher queryWatcher,
ObjectMapper smileMapper,
@Client HttpClient httpClient,
ServerInventoryView baseView,
@ -75,6 +76,7 @@ public class BrokerServerView implements TimelineServerView
)
{
this.warehouse = warehouse;
this.queryWatcher = queryWatcher;
this.smileMapper = smileMapper;
this.httpClient = httpClient;
this.baseView = baseView;
@ -154,7 +156,7 @@ public class BrokerServerView implements TimelineServerView
private DirectDruidClient makeDirectClient(DruidServer server)
{
return new DirectDruidClient(warehouse, smileMapper, httpClient, server.getHost());
return new DirectDruidClient(warehouse, queryWatcher, smileMapper, httpClient, server.getHost());
}
private QueryableDruidServer removeServer(DruidServer server)

View File

@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
@ -43,11 +44,15 @@ import com.metamx.http.client.HttpClient;
import com.metamx.http.client.io.AppendableByteArrayInputStream;
import com.metamx.http.client.response.ClientResponse;
import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.query.aggregation.MetricManipulatorFns;
import org.jboss.netty.handler.codec.http.HttpChunk;
@ -60,6 +65,7 @@ import java.io.InputStream;
import java.net.URL;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
@ -73,6 +79,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
private static final Map<Class<? extends Query>, Pair<JavaType, JavaType>> typesMap = Maps.newConcurrentMap();
private final QueryToolChestWarehouse warehouse;
private final QueryWatcher queryWatcher;
private final ObjectMapper objectMapper;
private final HttpClient httpClient;
private final String host;
@ -82,12 +89,14 @@ public class DirectDruidClient<T> implements QueryRunner<T>
public DirectDruidClient(
QueryToolChestWarehouse warehouse,
QueryWatcher queryWatcher,
ObjectMapper objectMapper,
HttpClient httpClient,
String host
)
{
this.warehouse = warehouse;
this.queryWatcher = queryWatcher;
this.objectMapper = objectMapper;
this.httpClient = httpClient;
this.host = host;
@ -102,7 +111,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(Query<T> query)
public Sequence<T> run(final Query<T> query)
{
QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
boolean isBySegment = query.getContextBySegment(false);
@ -127,6 +136,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
final ListenableFuture<InputStream> future;
final String url = String.format("http://%s/druid/v2/", host);
final String cancelUrl = String.format("http://%s/druid/v2/%s", host, query.getId());
try {
log.debug("Querying url[%s]", url);
@ -174,6 +184,9 @@ public class DirectDruidClient<T> implements QueryRunner<T>
}
}
);
queryWatcher.registerQuery(query, future);
openConnections.getAndIncrement();
Futures.addCallback(
future, new FutureCallback<InputStream>()
@ -188,6 +201,27 @@ public class DirectDruidClient<T> implements QueryRunner<T>
public void onFailure(Throwable t)
{
openConnections.getAndDecrement();
if (future.isCancelled()) {
// forward the cancellation to underlying queriable node
try {
StatusResponseHolder res = httpClient
.delete(new URL(cancelUrl))
.setContent(objectMapper.writeValueAsBytes(query))
.setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? "application/smile" : "application/json")
.go(new StatusResponseHandler(Charsets.UTF_8))
.get();
if (res.getStatus().getCode() >= 500) {
throw new RE(
"Error cancelling query[%s]: queriable node returned status[%d] [%s].",
res.getStatus().getCode(),
res.getStatus().getReasonPhrase()
);
}
}
catch (IOException | ExecutionException | InterruptedException e) {
Throwables.propagate(e);
}
}
}
}
);
@ -196,7 +230,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
throw Throwables.propagate(e);
}
Sequence<T> retVal = new BaseSequence<T, JsonParserIterator<T>>(
Sequence<T> retVal = new BaseSequence<>(
new BaseSequence.IteratorMaker<T, JsonParserIterator<T>>()
{
@Override
@ -283,21 +317,23 @@ public class DirectDruidClient<T> implements QueryRunner<T>
if (jp == null) {
try {
jp = objectMapper.getFactory().createParser(future.get());
if (jp.nextToken() != JsonToken.START_ARRAY) {
final JsonToken nextToken = jp.nextToken();
if (nextToken == JsonToken.START_OBJECT) {
QueryInterruptedException e = jp.getCodec().readValue(jp, QueryInterruptedException.class);
throw e;
}
else if (nextToken != JsonToken.START_ARRAY) {
throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url);
} else {
jp.nextToken();
objectCodec = jp.getCodec();
}
}
catch (IOException e) {
catch (IOException | InterruptedException | ExecutionException e) {
throw new RE(e, "Failure getting results from[%s]", url);
}
catch (InterruptedException e) {
throw new RE(e, "Failure getting results from[%s]", url);
}
catch (ExecutionException e) {
throw new RE(e, "Failure getting results from[%s]", url);
catch (CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
}
}
}

View File

@ -22,6 +22,7 @@ package io.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -30,7 +31,9 @@ import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.HttpResponseHandler;
import io.druid.guice.annotations.Client;
import io.druid.query.Query;
import io.druid.server.QueryResource;
import io.druid.server.router.Router;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import javax.inject.Inject;
@ -68,7 +71,7 @@ public class RoutingDruidClient<IntermediateType, FinalType>
return openConnections.get();
}
public ListenableFuture<FinalType> post(
public ListenableFuture<FinalType> postQuery(
String url,
Query query,
HttpResponseHandler<IntermediateType, FinalType> responseHandler
@ -81,7 +84,7 @@ public class RoutingDruidClient<IntermediateType, FinalType>
future = httpClient
.post(new URL(url))
.setContent(objectMapper.writeValueAsBytes(query))
.setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? "application/smile" : "application/json")
.setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? QueryResource.APPLICATION_SMILE : QueryResource.APPLICATION_JSON)
.go(responseHandler);
openConnections.getAndIncrement();
@ -125,4 +128,19 @@ public class RoutingDruidClient<IntermediateType, FinalType>
throw Throwables.propagate(e);
}
}
public ListenableFuture<FinalType> delete(
String url,
HttpResponseHandler<IntermediateType, FinalType> responseHandler
)
{
try {
return httpClient
.delete(new URL(url))
.go(responseHandler);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -24,6 +24,7 @@ import com.google.inject.Binder;
import com.google.inject.multibindings.MapBinder;
import io.druid.query.Query;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryWatcher;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryEngine;
import io.druid.query.groupby.GroupByQueryRunnerFactory;
@ -39,6 +40,7 @@ import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNQueryRunnerFactory;
import io.druid.server.QueryManager;
import java.util.Map;
@ -62,6 +64,12 @@ public class QueryRunnerFactoryModule extends QueryToolChestModule
{
super.configure(binder);
binder.bind(QueryWatcher.class)
.to(QueryManager.class)
.in(LazySingleton.class);
binder.bind(QueryManager.class)
.in(LazySingleton.class);
final MapBinder<Class<? extends Query>, QueryRunnerFactory> queryFactoryBinder = DruidBinders.queryRunnerFactoryBinder(
binder
);

View File

@ -21,7 +21,11 @@ package io.druid.server;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
@ -37,19 +41,20 @@ import io.druid.server.log.RequestLogger;
import io.druid.server.router.QueryHostFinder;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.UUID;
/**
@ -59,8 +64,6 @@ import java.util.UUID;
public class AsyncQueryForwardingServlet extends HttpServlet
{
private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class);
private static final Charset UTF8 = Charset.forName("UTF-8");
private static final String DISPATCHED = "dispatched";
private static final Joiner COMMA_JOIN = Joiner.on(",");
private final ObjectMapper jsonMapper;
@ -88,276 +91,162 @@ public class AsyncQueryForwardingServlet extends HttpServlet
}
@Override
protected void doGet(final HttpServletRequest req, final HttpServletResponse resp)
protected void doGet(HttpServletRequest req, HttpServletResponse res)
throws ServletException, IOException
{
OutputStream out = null;
AsyncContext ctx = null;
try {
ctx = req.startAsync(req, resp);
final AsyncContext asyncContext = ctx;
if (req.getAttribute(DISPATCHED) != null) {
return;
}
out = resp.getOutputStream();
final OutputStream outputStream = out;
final String host = hostFinder.getDefaultHost();
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new HttpResponseHandler<OutputStream, OutputStream>()
{
@Override
public ClientResponse<OutputStream> handleResponse(HttpResponse response)
{
resp.setStatus(response.getStatus().getCode());
resp.setContentType("application/json");
try {
ChannelBuffer buf = response.getContent();
buf.readBytes(outputStream, buf.readableBytes());
}
catch (Exception e) {
asyncContext.complete();
throw Throwables.propagate(e);
}
return ClientResponse.finished(outputStream);
}
@Override
public ClientResponse<OutputStream> handleChunk(
ClientResponse<OutputStream> clientResponse, HttpChunk chunk
)
{
try {
ChannelBuffer buf = chunk.getContent();
buf.readBytes(outputStream, buf.readableBytes());
}
catch (Exception e) {
asyncContext.complete();
throw Throwables.propagate(e);
}
return clientResponse;
}
@Override
public ClientResponse<OutputStream> done(ClientResponse<OutputStream> clientResponse)
{
final OutputStream obj = clientResponse.getObj();
try {
resp.flushBuffer();
outputStream.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
asyncContext.complete();
}
return ClientResponse.finished(obj);
}
@Override
public void exceptionCaught(
ClientResponse<OutputStream> clientResponse,
Throwable e
)
{
handleException(resp, asyncContext, e);
}
};
final AsyncContext asyncContext = req.startAsync(req, res);
asyncContext.start(
new Runnable()
{
@Override
public void run()
{
routingDruidClient.get(makeUrl(host, req), responseHandler);
try {
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new PassthroughHttpResponseHandler(asyncContext, jsonMapper);
final String host = hostFinder.getDefaultHost();
routingDruidClient.get(makeUrl(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler);
}
catch (Exception e) {
handleException(jsonMapper, asyncContext, e);
}
}
}
);
asyncContext.dispatch();
req.setAttribute(DISPATCHED, true);
}
catch (Exception e) {
handleException(resp, ctx, e);
}
}
@Override
protected void doPost(
final HttpServletRequest req, final HttpServletResponse resp
) throws ServletException, IOException
protected void doDelete(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException
{
final AsyncContext asyncContext = req.startAsync(req, res);
asyncContext.start(
new Runnable()
{
@Override
public void run()
{
try {
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new PassthroughHttpResponseHandler(asyncContext, jsonMapper);
final String host = hostFinder.getDefaultHost();
routingDruidClient.delete(makeUrl(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler);
}
catch (Exception e) {
handleException(jsonMapper, asyncContext, e);
}
}
}
);
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException
{
final long start = System.currentTimeMillis();
Query query = null;
String queryId;
final boolean isSmile = "application/smile".equals(req.getContentType());
final AsyncContext asyncContext = req.startAsync(req, res);
asyncContext.start(
new Runnable()
{
@Override
public void run()
{
final HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest();
final boolean isSmile = QueryResource.APPLICATION_SMILE.equals(request.getContentType());
final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
OutputStream out = null;
AsyncContext ctx = null;
Query inputQuery = null;
try {
ctx = req.startAsync(req, resp);
final AsyncContext asyncContext = ctx;
if (req.getAttribute(DISPATCHED) != null) {
return;
}
query = objectMapper.readValue(req.getInputStream(), Query.class);
queryId = query.getId();
if (queryId == null) {
queryId = UUID.randomUUID().toString();
query = query.withId(queryId);
inputQuery = objectMapper.readValue(request.getInputStream(), Query.class);
if (inputQuery.getId() == null) {
inputQuery = inputQuery.withId(UUID.randomUUID().toString());
}
final Query query = inputQuery;
if (log.isDebugEnabled()) {
log.debug("Got query [%s]", query);
log.debug("Got query [%s]", inputQuery);
}
out = resp.getOutputStream();
final OutputStream outputStream = out;
final String host = hostFinder.getHost(query);
final Query theQuery = query;
final String theQueryId = queryId;
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new HttpResponseHandler<OutputStream, OutputStream>()
{
@Override
public ClientResponse<OutputStream> handleResponse(HttpResponse response)
{
resp.setStatus(response.getStatus().getCode());
resp.setContentType("application/x-javascript");
try {
ChannelBuffer buf = response.getContent();
buf.readBytes(outputStream, buf.readableBytes());
}
catch (Exception e) {
asyncContext.complete();
throw Throwables.propagate(e);
}
return ClientResponse.finished(outputStream);
}
@Override
public ClientResponse<OutputStream> handleChunk(
ClientResponse<OutputStream> clientResponse, HttpChunk chunk
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new PassthroughHttpResponseHandler(
asyncContext,
objectMapper
)
{
try {
ChannelBuffer buf = chunk.getContent();
buf.readBytes(outputStream, buf.readableBytes());
}
catch (Exception e) {
asyncContext.complete();
throw Throwables.propagate(e);
}
return clientResponse;
}
@Override
public ClientResponse<OutputStream> done(ClientResponse<OutputStream> clientResponse)
{
final long requestTime = System.currentTimeMillis() - start;
log.debug("Request time: %d", requestTime);
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(theQuery.getDataSource()))
.setUser4(theQuery.getType())
.setUser5(COMMA_JOIN.join(theQuery.getIntervals()))
.setUser6(String.valueOf(theQuery.hasFilters()))
.setUser7(req.getRemoteAddr())
.setUser8(theQueryId)
.setUser9(theQuery.getDuration().toPeriod().toStandardMinutes().toString())
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser4(query.getType())
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(request.getRemoteAddr())
.setUser8(query.getId())
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
.build("request/time", requestTime)
);
final OutputStream obj = clientResponse.getObj();
try {
requestLogger.log(
new RequestLogLine(
new DateTime(),
req.getRemoteAddr(),
theQuery,
new QueryStats(ImmutableMap.<String, Object>of("request/time", requestTime, "success", true))
request.getRemoteAddr(),
query,
new QueryStats(
ImmutableMap.<String, Object>of(
"request/time",
requestTime,
"success",
true
)
)
)
);
resp.flushBuffer();
outputStream.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
asyncContext.complete();
}
return ClientResponse.finished(obj);
}
@Override
public void exceptionCaught(
ClientResponse<OutputStream> clientResponse,
Throwable e
)
{
handleException(resp, asyncContext, e);
return super.done(clientResponse);
}
};
asyncContext.start(
new Runnable()
{
@Override
public void run()
{
routingDruidClient.post(makeUrl(host, req), theQuery, responseHandler);
}
}
routingDruidClient.postQuery(
makeUrl(hostFinder.getHost(inputQuery), request),
inputQuery,
responseHandler
);
asyncContext.dispatch();
req.setAttribute(DISPATCHED, true);
}
catch (Exception e) {
handleException(resp, ctx, e);
handleException(objectMapper, asyncContext, e);
try {
requestLogger.log(
new RequestLogLine(
new DateTime(),
req.getRemoteAddr(),
query,
request.getRemoteAddr(),
inputQuery,
new QueryStats(ImmutableMap.<String, Object>of("success", false, "exception", e.toString()))
)
);
}
catch (Exception e2) {
log.error(e2, "Unable to log query [%s]!", query);
catch (Exception logError) {
log.error(logError, "Unable to log query [%s]!", inputQuery);
}
log.makeAlert(e, "Exception handling request")
.addData("query", query)
.addData("peer", req.getRemoteAddr())
.addData("query", inputQuery)
.addData("peer", request.getRemoteAddr())
.emit();
}
}
}
);
}
private String makeUrl(final String host, final HttpServletRequest req)
{
@ -370,24 +259,126 @@ public class AsyncQueryForwardingServlet extends HttpServlet
return String.format("http://%s%s?%s", host, requestURI, queryString);
}
private static void handleException(HttpServletResponse resp, AsyncContext ctx, Throwable e)
private static void handleException(ObjectMapper objectMapper, AsyncContext asyncContext, Throwable exception)
{
try {
final ServletOutputStream out = resp.getOutputStream();
if (!resp.isCommitted()) {
resp.setStatus(500);
resp.resetBuffer();
out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8));
out.write("\n".getBytes(UTF8));
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
if (!response.isCommitted()) {
final String errorMessage = exception.getMessage() == null ? "null exception" : exception.getMessage();
response.resetBuffer();
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
objectMapper.writeValue(
response.getOutputStream(),
ImmutableMap.of(
"error", errorMessage
)
);
}
response.flushBuffer();
}
catch (IOException e) {
Throwables.propagate(e);
}
finally {
asyncContext.complete();
}
}
if (ctx != null) {
ctx.complete();
private static class PassthroughHttpResponseHandler implements HttpResponseHandler<OutputStream, OutputStream>
{
private final AsyncContext asyncContext;
private final ObjectMapper objectMapper;
private final OutputStream outputStream;
public PassthroughHttpResponseHandler(AsyncContext asyncContext, ObjectMapper objectMapper) throws IOException
{
this.asyncContext = asyncContext;
this.objectMapper = objectMapper;
this.outputStream = asyncContext.getResponse().getOutputStream();
}
resp.flushBuffer();
protected void copyStatusHeaders(HttpResponse clientResponse)
{
final HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
response.setStatus(clientResponse.getStatus().getCode());
response.setContentType(clientResponse.headers().get(HttpHeaders.Names.CONTENT_TYPE));
FluentIterable.from(clientResponse.headers().entries())
.filter(new Predicate<Map.Entry<String, String>>()
{
@Override
public boolean apply(@Nullable Map.Entry<String, String> input)
{
return input.getKey().startsWith("X-Druid");
}
}
)
.transform(
new Function<Map.Entry<String, String>, Object>()
{
@Nullable
@Override
public Object apply(@Nullable Map.Entry<String, String> input)
{
response.setHeader(input.getKey(), input.getValue());
return null;
}
}
)
.allMatch(Predicates.alwaysTrue());
}
@Override
public ClientResponse<OutputStream> handleResponse(HttpResponse clientResponse)
{
copyStatusHeaders(clientResponse);
try {
ChannelBuffer buf = clientResponse.getContent();
buf.readBytes(outputStream, buf.readableBytes());
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return ClientResponse.finished(outputStream);
}
@Override
public ClientResponse<OutputStream> handleChunk(
ClientResponse<OutputStream> clientResponse, HttpChunk chunk
)
{
try {
ChannelBuffer buf = chunk.getContent();
buf.readBytes(outputStream, buf.readableBytes());
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return clientResponse;
}
@Override
public ClientResponse<OutputStream> done(ClientResponse<OutputStream> clientResponse)
{
asyncContext.complete();
return ClientResponse.finished(clientResponse.getObj());
}
@Override
public void exceptionCaught(
ClientResponse<OutputStream> clientResponse,
Throwable e
)
{
// throwing an exception here may cause resource leak
try {
handleException(objectMapper, asyncContext, e);
} catch(Exception err) {
log.error(err, "Unable to handle exception response");
}
catch (IOException e1) {
Throwables.propagate(e1);
}
}
}

View File

@ -0,0 +1,68 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.query.Query;
import io.druid.query.QueryWatcher;
import java.util.Set;
public class QueryManager implements QueryWatcher
{
final SetMultimap<String, ListenableFuture> queries;
public QueryManager()
{
this.queries = Multimaps.synchronizedSetMultimap(
HashMultimap.<String, ListenableFuture>create()
);
}
public boolean cancelQuery(String id) {
Set<ListenableFuture> futures = queries.removeAll(id);
boolean success = true;
for (ListenableFuture future : futures) {
success = success && future.cancel(true);
}
return success;
}
public void registerQuery(Query query, final ListenableFuture future)
{
final String id = query.getId();
queries.put(id, future);
future.addListener(
new Runnable()
{
@Override
public void run()
{
queries.remove(id, future);
}
},
MoreExecutors.sameThreadExecutor()
);
}
}

View File

@ -19,16 +19,23 @@
package io.druid.server;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.inject.Inject;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Accumulators;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.guava.YieldingAccumulators;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
@ -36,6 +43,7 @@ import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.query.DataSourceUtil;
import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QuerySegmentWalker;
import io.druid.server.log.RequestLogger;
import org.joda.time.DateTime;
@ -43,10 +51,17 @@ import org.joda.time.DateTime;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
@ -58,14 +73,16 @@ import java.util.UUID;
public class QueryResource
{
private static final EmittingLogger log = new EmittingLogger(QueryResource.class);
private static final Charset UTF8 = Charset.forName("UTF-8");
private static final Joiner COMMA_JOIN = Joiner.on(",");
public static final String APPLICATION_SMILE = "application/smile";
public static final String APPLICATION_JSON = "application/json";
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final QuerySegmentWalker texasRanger;
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;
private final QueryManager queryManager;
@Inject
public QueryResource(
@ -73,35 +90,49 @@ public class QueryResource
@Smile ObjectMapper smileMapper,
QuerySegmentWalker texasRanger,
ServiceEmitter emitter,
RequestLogger requestLogger
RequestLogger requestLogger,
QueryManager queryManager
)
{
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.jsonMapper = jsonMapper.copy();
this.jsonMapper.getFactory().configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
this.smileMapper = smileMapper.copy();
this.smileMapper.getFactory().configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
this.texasRanger = texasRanger;
this.emitter = emitter;
this.requestLogger = requestLogger;
this.queryManager = queryManager;
}
@DELETE
@Path("{id}")
@Produces("application/json")
public Response getServer(@PathParam("id") String queryId)
{
queryManager.cancelQuery(queryId);
return Response.status(Response.Status.ACCEPTED).build();
}
@POST
@Produces("application/json")
public void doPost(
public Response doPost(
@Context HttpServletRequest req,
@Context HttpServletResponse resp
@Context final HttpServletResponse resp
) throws ServletException, IOException
{
final long start = System.currentTimeMillis();
Query query = null;
byte[] requestQuery = null;
String queryId;
String queryId = null;
final boolean isSmile = "application/smile".equals(req.getContentType());
final boolean isSmile = APPLICATION_SMILE.equals(req.getContentType());
ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
ObjectWriter jsonWriter = req.getParameter("pretty") == null
final ObjectWriter jsonWriter = req.getParameter("pretty") == null
? objectMapper.writer()
: objectMapper.writerWithDefaultPrettyPrinter();
OutputStream out = null;
try {
requestQuery = ByteStreams.toByteArray(req.getInputStream());
@ -116,18 +147,26 @@ public class QueryResource
log.debug("Got query [%s]", query);
}
Sequence<?> results = query.run(texasRanger);
Sequence results = query.run(texasRanger);
if (results == null) {
results = Sequences.empty();
}
resp.setStatus(200);
resp.setContentType("application/x-javascript");
out = resp.getOutputStream();
jsonWriter.writeValue(out, results);
try (
final Yielder yielder = results.toYielder(
null,
new YieldingAccumulator()
{
@Override
public Object accumulate(Object accumulated, Object in)
{
yield();
return in;
}
}
)
) {
long requestTime = System.currentTimeMillis() - start;
emitter.emit(
@ -155,6 +194,45 @@ public class QueryResource
)
)
);
return Response
.ok(
new StreamingOutput()
{
@Override
public void write(OutputStream outputStream) throws IOException, WebApplicationException
{
jsonWriter.writeValue(outputStream, yielder);
outputStream.close();
}
},
isSmile ? APPLICATION_JSON : APPLICATION_SMILE
)
.header("X-Druid-Query-Id", queryId)
.build();
}
}
catch (QueryInterruptedException e) {
try {
log.info("%s [%s]", e.getMessage(), queryId);
requestLogger.log(
new RequestLogLine(
new DateTime(),
req.getRemoteAddr(),
query,
new QueryStats(ImmutableMap.<String, Object>of("success", false, "interrupted", true, "reason", e.toString()))
)
);
} catch (Exception e2) {
log.error(e2, "Unable to log query [%s]!", query);
}
return Response.serverError().entity(
jsonWriter.writeValueAsString(
ImmutableMap.of(
"error", e.getMessage()
)
)
).build();
}
catch (Exception e) {
final String queryString =
@ -164,20 +242,6 @@ public class QueryResource
log.warn(e, "Exception occurred on request [%s]", queryString);
if (!resp.isCommitted()) {
resp.setStatus(500);
resp.resetBuffer();
if (out == null) {
out = resp.getOutputStream();
}
out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8));
out.write("\n".getBytes(UTF8));
}
resp.flushBuffer();
try {
requestLogger.log(
new RequestLogLine(
@ -197,10 +261,14 @@ public class QueryResource
.addData("query", queryString)
.addData("peer", req.getRemoteAddr())
.emit();
}
finally {
resp.flushBuffer();
Closeables.closeQuietly(out);
return Response.serverError().entity(
jsonWriter.writeValueAsString(
ImmutableMap.of(
"error", e.getMessage() == null ? "null exception" : e.getMessage()
)
)
).build();
}
}
}

View File

@ -21,7 +21,6 @@ package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;

View File

@ -21,30 +21,37 @@ package io.druid.client;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.RequestBuilder;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.client.selector.ConnectionCountServerSelectorStrategy;
import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryWatcher;
import io.druid.query.ReflectionQueryToolChestWarehouse;
import io.druid.query.Result;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import junit.framework.Assert;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Before;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayInputStream;
@ -54,17 +61,10 @@ import java.util.List;
public class DirectDruidClientTest
{
private HttpClient httpClient;
@Before
public void setUp() throws Exception
{
httpClient = EasyMock.createMock(HttpClient.class);
}
@Test
public void testRun() throws Exception
{
HttpClient httpClient = EasyMock.createMock(HttpClient.class);
RequestBuilder requestBuilder = new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com"));
EasyMock.expect(httpClient.post(EasyMock.<URL>anyObject())).andReturn(requestBuilder).atLeastOnce();
@ -93,12 +93,14 @@ public class DirectDruidClientTest
DirectDruidClient client1 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
new DefaultObjectMapper(),
httpClient,
"foo"
);
DirectDruidClient client2 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
new DefaultObjectMapper(),
httpClient,
"foo2"
@ -149,4 +151,70 @@ public class DirectDruidClientTest
EasyMock.verify(httpClient);
}
@Test
public void testCancel() throws Exception
{
HttpClient httpClient = EasyMock.createStrictMock(HttpClient.class);
EasyMock.expect(httpClient.post(EasyMock.<URL>anyObject())).andReturn(
new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com"))
).once();
ListenableFuture<Object> cancelledFuture = Futures.immediateCancelledFuture();
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(cancelledFuture).once();
EasyMock.expect(httpClient.delete(EasyMock.<URL>anyObject()))
.andReturn(new RequestBuilder(httpClient, HttpMethod.DELETE, new URL("http://foo.com/delete")))
.once();
SettableFuture<Object> cancellationFuture = SettableFuture.create();
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(cancellationFuture).once();
EasyMock.replay(httpClient);
final ServerSelector serverSelector = new ServerSelector(
new DataSegment(
"test",
new Interval("2013-01-01/2013-01-02"),
new DateTime("2013-01-01").toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
0,
0L
),
new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy())
);
DirectDruidClient client1 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
new DefaultObjectMapper(),
httpClient,
"foo"
);
QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0),
client1
);
serverSelector.addServer(queryableDruidServer1);
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled")));
Sequence results = client1.run(query);
Assert.assertEquals(0, client1.getNumOpenConnections());
QueryInterruptedException exception = null;
try {
Sequences.toList(results, Lists.newArrayList());
} catch(QueryInterruptedException e) {
exception = e;
}
Assert.assertNotNull(exception);
EasyMock.verify(httpClient);
}
}