diff --git a/pom.xml b/pom.xml index a78251143a5..6717674b368 100644 --- a/pom.xml +++ b/pom.xml @@ -79,7 +79,7 @@ com.metamx http-client - 0.9.5 + 0.9.6 com.metamx diff --git a/processing/src/main/java/io/druid/jackson/DefaultObjectMapper.java b/processing/src/main/java/io/druid/jackson/DefaultObjectMapper.java index 952b506bd95..e55e2299d2b 100644 --- a/processing/src/main/java/io/druid/jackson/DefaultObjectMapper.java +++ b/processing/src/main/java/io/druid/jackson/DefaultObjectMapper.java @@ -32,7 +32,12 @@ public class DefaultObjectMapper extends ObjectMapper { public DefaultObjectMapper() { - this(null); + this((JsonFactory)null); + } + + public DefaultObjectMapper(DefaultObjectMapper mapper) + { + super(mapper); } public DefaultObjectMapper(JsonFactory factory) @@ -52,4 +57,10 @@ public class DefaultObjectMapper extends ObjectMapper configure(MapperFeature.AUTO_DETECT_SETTERS, false); configure(SerializationFeature.INDENT_OUTPUT, false); } + + @Override + public ObjectMapper copy() + { + return new DefaultObjectMapper(this); + } } diff --git a/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java index 38c3bc135ef..6184221a1db 100644 --- a/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java @@ -32,6 +32,7 @@ import com.google.common.base.Throwables; import com.metamx.common.Granularity; import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Yielder; import org.joda.time.DateTimeZone; import java.io.IOException; @@ -104,7 +105,7 @@ public class DruidDefaultSerializersModule extends SimpleModule jgen.writeStartArray(); value.accumulate( null, - new Accumulator() + new Accumulator() { @Override public Object accumulate(Object o, Object o1) @@ -115,7 +116,7 @@ public class DruidDefaultSerializersModule extends SimpleModule catch (IOException e) { throw Throwables.propagate(e); } - return o; + return null; } } ); @@ -123,6 +124,28 @@ public class DruidDefaultSerializersModule extends SimpleModule } } ); + addSerializer( + Yielder.class, + new JsonSerializer() + { + @Override + public void serialize(Yielder yielder, final JsonGenerator jgen, SerializerProvider provider) + throws IOException, JsonProcessingException + { + jgen.writeStartArray(); + try { + while (!yielder.isDone()) { + final Object o = yielder.get(); + jgen.writeObject(o); + yielder = yielder.next(null); + } + } finally { + yielder.close(); + } + jgen.writeEndArray(); + } + } + ); addSerializer(ByteOrder.class, ToStringSerializer.instance); addDeserializer( ByteOrder.class, diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 31eac12702e..8a5ed51a4df 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -25,6 +25,10 @@ import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.ISE; import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.MergeIterable; @@ -35,11 +39,11 @@ import com.metamx.common.logger.Logger; import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * A QueryRunner that combines a list of other QueryRunners and executes them in parallel on an executor. @@ -59,27 +63,33 @@ public class ChainedExecutionQueryRunner implements QueryRunner private static final Logger log = new Logger(ChainedExecutionQueryRunner.class); private final Iterable> queryables; - private final ExecutorService exec; + private final ListeningExecutorService exec; private final Ordering ordering; + private final QueryWatcher queryWatcher; public ChainedExecutionQueryRunner( ExecutorService exec, Ordering ordering, + QueryWatcher queryWatcher, QueryRunner... queryables ) { - this(exec, ordering, Arrays.asList(queryables)); + this(exec, ordering, queryWatcher, Arrays.asList(queryables)); } public ChainedExecutionQueryRunner( ExecutorService exec, Ordering ordering, + QueryWatcher queryWatcher, Iterable> queryables ) { - this.exec = exec; + // listeningDecorator will leave PrioritizedExecutorService unchanged, + // since it already implements ListeningExecutorService + this.exec = MoreExecutors.listeningDecorator(exec); this.ordering = ordering; this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); + this.queryWatcher = queryWatcher; } @Override @@ -94,71 +104,81 @@ public class ChainedExecutionQueryRunner implements QueryRunner public Iterator make() { // Make it a List<> to materialize all of the values (so that it will submit everything to the executor) - List>> futures = Lists.newArrayList( - Iterables.transform( - queryables, - new Function, Future>>() - { - @Override - public Future> apply(final QueryRunner input) - { - return exec.submit( - new AbstractPrioritizedCallable>(priority) - { - @Override - public List call() throws Exception - { - try { - if (input == null) { - throw new ISE("Input is null?! How is this possible?!"); - } + ListenableFuture>> futures = Futures.allAsList( + Lists.newArrayList( + Iterables.transform( + queryables, + new Function, ListenableFuture>>() + { + @Override + public ListenableFuture> apply(final QueryRunner input) + { + return exec.submit( + new AbstractPrioritizedCallable>(priority) + { + @Override + public Iterable call() throws Exception + { + try { + if (input == null) { + throw new ISE("Input is null?! How is this possible?!"); + } - Sequence result = input.run(query); - if (result == null) { - throw new ISE("Got a null result! Segments are missing!"); - } + Sequence result = input.run(query); + if (result == null) { + throw new ISE("Got a null result! Segments are missing!"); + } - List retVal = Sequences.toList(result, Lists.newArrayList()); - if (retVal == null) { - throw new ISE("Got a null list of results! WTF?!"); - } + List retVal = Sequences.toList(result, Lists.newArrayList()); + if (retVal == null) { + throw new ISE("Got a null list of results! WTF?!"); + } - return retVal; + return retVal; + } + catch (QueryInterruptedException e) { + throw Throwables.propagate(e); + } + catch (Exception e) { + log.error(e, "Exception with one of the sequences!"); + throw Throwables.propagate(e); + } + } } - catch (Exception e) { - log.error(e, "Exception with one of the sequences!"); - throw Throwables.propagate(e); - } - } - } - ); - } - } + ); + } + } + ) ) ); - return new MergeIterable( - ordering.nullsFirst(), - Iterables.transform( - futures, - new Function>, Iterable>() - { - @Override - public Iterable apply(Future> input) - { - try { - return input.get(); - } - catch (InterruptedException e) { - throw new RuntimeException(e); - } - catch (ExecutionException e) { - throw new RuntimeException(e); - } - } - } - ) - ).iterator(); + queryWatcher.registerQuery(query, futures); + + try { + final Number timeout = query.getContextValue("timeout", (Number)null); + return new MergeIterable<>( + ordering.nullsFirst(), + timeout == null ? + futures.get() : + futures.get(timeout.longValue(), TimeUnit.MILLISECONDS) + ).iterator(); + } + catch (InterruptedException e) { + log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); + futures.cancel(true); + throw new QueryInterruptedException("Query interrupted"); + } + catch(CancellationException e) { + throw new QueryInterruptedException("Query cancelled"); + } + catch(TimeoutException e) { + log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); + futures.cancel(true); + throw new QueryInterruptedException("Query timeout"); + } + catch (ExecutionException e) { + throw Throwables.propagate(e.getCause()); + } } @Override diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java index 51c663c6a2e..c9b14b6314b 100644 --- a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java @@ -26,6 +26,10 @@ import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.Pair; import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Sequence; @@ -39,37 +43,44 @@ import io.druid.segment.incremental.IncrementalIndex; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class GroupByParallelQueryRunner implements QueryRunner { private static final Logger log = new Logger(GroupByParallelQueryRunner.class); private final Iterable> queryables; - private final ExecutorService exec; + private final ListeningExecutorService exec; private final Ordering ordering; private final Supplier configSupplier; + private final QueryWatcher queryWatcher; + public GroupByParallelQueryRunner( ExecutorService exec, Ordering ordering, Supplier configSupplier, + QueryWatcher queryWatcher, QueryRunner... queryables ) { - this(exec, ordering, configSupplier, Arrays.asList(queryables)); + this(exec, ordering, configSupplier, queryWatcher, Arrays.asList(queryables)); } public GroupByParallelQueryRunner( ExecutorService exec, Ordering ordering, Supplier configSupplier, + QueryWatcher queryWatcher, Iterable> queryables ) { - this.exec = exec; + this.exec = MoreExecutors.listeningDecorator(exec); this.ordering = ordering; + this.queryWatcher = queryWatcher; this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); this.configSupplier = configSupplier; } @@ -88,48 +99,67 @@ public class GroupByParallelQueryRunner implements QueryRunner if (Iterables.isEmpty(queryables)) { log.warn("No queryables found."); } - List> futures = Lists.newArrayList( - Iterables.transform( - queryables, - new Function, Future>() - { - @Override - public Future apply(final QueryRunner input) - { - return exec.submit( - new AbstractPrioritizedCallable(priority) - { - @Override - public Boolean call() throws Exception - { - try { - input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); - return true; + ListenableFuture> futures = Futures.allAsList( + Lists.newArrayList( + Iterables.transform( + queryables, + new Function, ListenableFuture>() + { + @Override + public ListenableFuture apply(final QueryRunner input) + { + return exec.submit( + new AbstractPrioritizedCallable(priority) + { + @Override + public Boolean call() throws Exception + { + try { + input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); + return true; + } + catch (QueryInterruptedException e) { + throw Throwables.propagate(e); + } + catch (Exception e) { + log.error(e, "Exception with one of the sequences!"); + throw Throwables.propagate(e); + } + } } - catch (Exception e) { - log.error(e, "Exception with one of the sequences!"); - throw Throwables.propagate(e); - } - } - } - ); - } - } + ); + } + } + ) ) ); // Let the runners complete - for (Future future : futures) { - try { - future.get(); - } - catch (InterruptedException e) { - throw new RuntimeException(e); - } - catch (ExecutionException e) { - throw new RuntimeException(e); + try { + queryWatcher.registerQuery(query, futures); + final Number timeout = query.getContextValue("timeout", (Number) null); + if(timeout == null) { + futures.get(); + } else { + futures.get(timeout.longValue(), TimeUnit.MILLISECONDS); } } + catch (InterruptedException e) { + log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); + futures.cancel(true); + throw new QueryInterruptedException("Query interrupted"); + } + catch(CancellationException e) { + throw new QueryInterruptedException("Query cancelled"); + } + catch(TimeoutException e) { + log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); + futures.cancel(true); + throw new QueryInterruptedException("Query timeout"); + } + catch (ExecutionException e) { + throw Throwables.propagate(e.getCause()); + } return Sequences.simple(indexAccumulatorPair.lhs.iterableWithPostAggregations(null)); } diff --git a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java index 8425aa97fe2..dbad443cb36 100644 --- a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java @@ -167,18 +167,20 @@ public class MetricsEmittingQueryRunner implements QueryRunner @Override public void close() throws IOException { - if (!isDone() && builder.getUser10() == null) { - builder.setUser10("short"); + try { + if (!isDone() && builder.getUser10() == null) { + builder.setUser10("short"); + } + + long timeTaken = System.currentTimeMillis() - startTime; + emitter.emit(builder.build("query/time", timeTaken)); + + if (creationTime > 0) { + emitter.emit(builder.build("query/wait", startTime - creationTime)); + } + } finally { + yielder.close(); } - - long timeTaken = System.currentTimeMillis() - startTime; - emitter.emit(builder.build("query/time", timeTaken)); - - if(creationTime > 0) { - emitter.emit(builder.build("query/wait", startTime - creationTime)); - } - - yielder.close(); } }; } diff --git a/processing/src/main/java/io/druid/query/QueryInterruptedException.java b/processing/src/main/java/io/druid/query/QueryInterruptedException.java new file mode 100644 index 00000000000..00676918dcf --- /dev/null +++ b/processing/src/main/java/io/druid/query/QueryInterruptedException.java @@ -0,0 +1,48 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class QueryInterruptedException extends RuntimeException +{ + public QueryInterruptedException() { + super(); + } + + @JsonCreator + public QueryInterruptedException(@JsonProperty("error") String message) + { + super(message); + } + + public QueryInterruptedException(Throwable cause) + { + super(cause); + } + + @JsonProperty("error") + @Override + public String getMessage() + { + return super.getMessage(); + } +} diff --git a/processing/src/main/java/io/druid/query/QueryWatcher.java b/processing/src/main/java/io/druid/query/QueryWatcher.java new file mode 100644 index 00000000000..36394b11e34 --- /dev/null +++ b/processing/src/main/java/io/druid/query/QueryWatcher.java @@ -0,0 +1,47 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query; + +import com.google.common.util.concurrent.ListenableFuture; + +/** + * This interface is in a very early stage and should not be considered stable. + * + * The purpose of the QueryWatcher is to give overall visibility into queries running + * or pending at the QueryRunner level. This is currently used to cancel all the + * parts of a pending query, but may be expanded in the future to offer more direct + * visibility into query execution and resource usage. + * + * QueryRunners executing any computation asynchronously must register their queries + * with the QueryWatcher. + * + */ +public interface QueryWatcher +{ + /** + * QueryRunners must use this method to register any pending queries. + * + * The given future may have cancel(true) called at any time, if cancellation of this query has been requested. + * + * @param query a query, which may be a subset of a larger query, as long as the underlying queryId is unchanged + * @param future the future holding the execution status of the query + */ + public void registerQuery(Query query, ListenableFuture future); +} diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java index 714aad37925..e8634089c2f 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java @@ -24,42 +24,55 @@ import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.Ordering; import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.guava.ExecutorExecutingSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; +import com.metamx.common.logger.Logger; import io.druid.data.input.Row; import io.druid.query.ConcatQueryRunner; import io.druid.query.GroupByParallelQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; import io.druid.segment.Segment; import io.druid.segment.StorageAdapter; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** */ public class GroupByQueryRunnerFactory implements QueryRunnerFactory { private final GroupByQueryEngine engine; + private final QueryWatcher queryWatcher; private final Supplier config; private final GroupByQueryQueryToolChest toolChest; + private static final Logger log = new Logger(GroupByQueryRunnerFactory.class); + @Inject public GroupByQueryRunnerFactory( GroupByQueryEngine engine, + QueryWatcher queryWatcher, Supplier config, GroupByQueryQueryToolChest toolChest ) { this.engine = engine; + this.queryWatcher = queryWatcher; this.config = config; this.toolChest = toolChest; } @@ -71,8 +84,10 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory mergeRunners(final ExecutorService queryExecutor, Iterable> queryRunners) + public QueryRunner mergeRunners(final ExecutorService exec, Iterable> queryRunners) { + // mergeRunners should take ListeningExecutorService at some point + final ListeningExecutorService queryExecutor = MoreExecutors.listeningDecorator(exec); if (config.get().isSingleThreaded()) { return new ConcatQueryRunner( Sequences.map( @@ -88,7 +103,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory run(final Query query) { - Future> future = queryExecutor.submit( + ListenableFuture> future = queryExecutor.submit( new Callable>() { @Override @@ -102,13 +117,25 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory { private static final SegmentAnalyzer analyzer = new SegmentAnalyzer(); - private static final SegmentMetadataQueryQueryToolChest toolChest = new SegmentMetadataQueryQueryToolChest(); + private static final Logger log = new Logger(SegmentMetadataQueryRunnerFactory.class); + + private final QueryWatcher queryWatcher; + + @Inject + public SegmentMetadataQueryRunnerFactory( + QueryWatcher queryWatcher + ) + { + this.queryWatcher = queryWatcher; + } @Override public QueryRunner createRunner(final Segment segment) @@ -101,9 +118,10 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory mergeRunners( - final ExecutorService queryExecutor, Iterable> queryRunners + ExecutorService exec, Iterable> queryRunners ) { + final ListeningExecutorService queryExecutor = MoreExecutors.listeningDecorator(exec); return new ConcatQueryRunner( Sequences.map( Sequences.simple(queryRunners), @@ -118,7 +136,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory run(final Query query) { final int priority = query.getContextPriority(0); - Future> future = queryExecutor.submit( + ListenableFuture> future = queryExecutor.submit( new AbstractPrioritizedCallable>(priority) { @Override @@ -129,13 +147,25 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory, SearchQuery> { private final SearchQueryQueryToolChest toolChest; + private final QueryWatcher queryWatcher; @Inject public SearchQueryRunnerFactory( - SearchQueryQueryToolChest toolChest + SearchQueryQueryToolChest toolChest, + QueryWatcher queryWatcher ) { this.toolChest = toolChest; + this.queryWatcher = queryWatcher; } @Override @@ -56,7 +60,7 @@ public class SearchQueryRunnerFactory implements QueryRunnerFactory>( - queryExecutor, toolChest.getOrdering(), queryRunners + queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners ); } diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java index 6e995b15f44..72cce700a6d 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java @@ -20,6 +20,7 @@ package io.druid.query.select; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.guava.Sequence; @@ -29,6 +30,7 @@ import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.segment.Segment; @@ -39,25 +41,20 @@ import java.util.concurrent.ExecutorService; public class SelectQueryRunnerFactory implements QueryRunnerFactory, SelectQuery> { - public static SelectQueryRunnerFactory create(ObjectMapper jsonMapper) - { - return new SelectQueryRunnerFactory( - new SelectQueryQueryToolChest(new QueryConfig(), jsonMapper), - new SelectQueryEngine() - ); - } - private final SelectQueryQueryToolChest toolChest; private final SelectQueryEngine engine; + private final QueryWatcher queryWatcher; @Inject public SelectQueryRunnerFactory( SelectQueryQueryToolChest toolChest, - SelectQueryEngine engine + SelectQueryEngine engine, + QueryWatcher queryWatcher ) { this.toolChest = toolChest; this.engine = engine; + this.queryWatcher = queryWatcher; } @Override @@ -72,7 +69,7 @@ public class SelectQueryRunnerFactory ) { return new ChainedExecutionQueryRunner>( - queryExecutor, toolChest.getOrdering(), queryRunners + queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners ); } diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java index 16e9ae832fa..1f78429ead3 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java @@ -19,6 +19,7 @@ package io.druid.query.timeboundary; +import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.Sequence; @@ -27,6 +28,7 @@ import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.segment.Segment; import io.druid.segment.StorageAdapter; @@ -40,6 +42,13 @@ public class TimeBoundaryQueryRunnerFactory implements QueryRunnerFactory, TimeBoundaryQuery> { private static final TimeBoundaryQueryQueryToolChest toolChest = new TimeBoundaryQueryQueryToolChest(); + private final QueryWatcher queryWatcher; + + @Inject + public TimeBoundaryQueryRunnerFactory(QueryWatcher queryWatcher) + { + this.queryWatcher = queryWatcher; + } @Override public QueryRunner> createRunner(final Segment segment) @@ -53,7 +62,7 @@ public class TimeBoundaryQueryRunnerFactory ) { return new ChainedExecutionQueryRunner>( - queryExecutor, toolChest.getOrdering(), queryRunners + queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners ); } diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java index 9f1d0860aa7..ad290536b30 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryEngine.java @@ -60,27 +60,29 @@ public class TimeseriesQueryEngine { Aggregator[] aggregators = QueryRunnerHelper.makeAggregators(cursor, aggregatorSpecs); - while (!cursor.isDone()) { - for (Aggregator aggregator : aggregators) { - aggregator.aggregate(); + try { + while (!cursor.isDone()) { + for (Aggregator aggregator : aggregators) { + aggregator.aggregate(); + } + cursor.advance(); } - cursor.advance(); + + TimeseriesResultBuilder bob = new TimeseriesResultBuilder(cursor.getTime()); + + for (Aggregator aggregator : aggregators) { + bob.addMetric(aggregator); + } + + Result retVal = bob.build(); + return retVal; } - - TimeseriesResultBuilder bob = new TimeseriesResultBuilder(cursor.getTime()); - - for (Aggregator aggregator : aggregators) { - bob.addMetric(aggregator); + finally { + // cleanup + for (Aggregator agg : aggregators) { + agg.close(); + } } - - Result retVal = bob.build(); - - // cleanup - for (Aggregator agg : aggregators) { - agg.close(); - } - - return retVal; } } ); diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java index 740b309e37e..724d4818226 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java @@ -19,6 +19,7 @@ package io.druid.query.timeseries; +import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.guava.Sequence; @@ -28,6 +29,7 @@ import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.segment.Segment; import io.druid.segment.StorageAdapter; @@ -39,25 +41,20 @@ import java.util.concurrent.ExecutorService; public class TimeseriesQueryRunnerFactory implements QueryRunnerFactory, TimeseriesQuery> { - public static TimeseriesQueryRunnerFactory create() - { - return new TimeseriesQueryRunnerFactory( - new TimeseriesQueryQueryToolChest(new QueryConfig()), - new TimeseriesQueryEngine() - ); - } - private final TimeseriesQueryQueryToolChest toolChest; private final TimeseriesQueryEngine engine; + private final QueryWatcher queryWatcher; @Inject public TimeseriesQueryRunnerFactory( TimeseriesQueryQueryToolChest toolChest, - TimeseriesQueryEngine engine + TimeseriesQueryEngine engine, + QueryWatcher queryWatcher ) { this.toolChest = toolChest; this.engine = engine; + this.queryWatcher = queryWatcher; } @Override @@ -72,7 +69,7 @@ public class TimeseriesQueryRunnerFactory ) { return new ChainedExecutionQueryRunner>( - queryExecutor, toolChest.getOrdering(), queryRunners + queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners ); } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java index 044e6a64eb2..524f9ace6a5 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java @@ -30,6 +30,7 @@ import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.segment.Segment; @@ -43,15 +44,18 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory computationBufferPool; private final TopNQueryQueryToolChest toolchest; + private final QueryWatcher queryWatcher; @Inject public TopNQueryRunnerFactory( @Global StupidPool computationBufferPool, - TopNQueryQueryToolChest toolchest + TopNQueryQueryToolChest toolchest, + QueryWatcher queryWatcher ) { this.computationBufferPool = computationBufferPool; this.toolchest = toolchest; + this.queryWatcher = queryWatcher; } @Override @@ -79,7 +83,7 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory>( - queryExecutor, toolchest.getOrdering(), queryRunners + queryExecutor, toolchest.getOrdering(), queryWatcher, queryRunners ); } diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java index a71297b3b89..f624cdfa807 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java @@ -27,6 +27,7 @@ import com.google.common.io.Closeables; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.granularity.QueryGranularity; +import io.druid.query.QueryInterruptedException; import io.druid.query.filter.Filter; import io.druid.segment.column.Column; import io.druid.segment.column.ColumnCapabilities; @@ -224,6 +225,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter @Override public void advance() { + if (Thread.interrupted()) { + throw new QueryInterruptedException(); + } cursorOffset.increment(); } @@ -652,6 +656,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter @Override public void advance() { + if (Thread.interrupted()) { + throw new QueryInterruptedException(); + } ++currRow; } diff --git a/processing/src/main/java/io/druid/segment/ReferenceCountingSequence.java b/processing/src/main/java/io/druid/segment/ReferenceCountingSequence.java index 402e63dc31d..4c5cbfc3d3d 100644 --- a/processing/src/main/java/io/druid/segment/ReferenceCountingSequence.java +++ b/processing/src/main/java/io/druid/segment/ReferenceCountingSequence.java @@ -48,4 +48,4 @@ public class ReferenceCountingSequence extends YieldingSequenceBase final Closeable closeable = segment.increment(); return new ResourceClosingYielder(baseSequence.toYielder(initValue, accumulator), closeable); } -} \ No newline at end of file +} diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 0eddf59ac98..057754e6087 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -29,6 +29,7 @@ import com.metamx.collections.spatial.search.Bound; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.granularity.QueryGranularity; +import io.druid.query.QueryInterruptedException; import io.druid.query.aggregation.Aggregator; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; @@ -200,6 +201,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } while (baseIter.hasNext()) { + if (Thread.interrupted()) { + throw new QueryInterruptedException(); + } + currEntry.set(baseIter.next()); if (filterMatcher.matches()) { @@ -239,6 +244,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter Iterators.advance(baseIter, numAdvanced); } + if (Thread.interrupted()) { + throw new QueryInterruptedException(); + } + boolean foundMatched = false; while (baseIter.hasNext()) { currEntry.set(baseIter.next()); diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java new file mode 100644 index 00000000000..f2555dd7214 --- /dev/null +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -0,0 +1,286 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Ordering; +import com.google.common.util.concurrent.ListenableFuture; +import com.metamx.common.concurrent.ExecutorServiceConfig; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import com.metamx.common.lifecycle.Lifecycle; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +public class ChainedExecutionQueryRunnerTest +{ + @Test + public void testQueryCancellation() throws Exception + { + ExecutorService exec = PrioritizedExecutorService.create( + new Lifecycle(), new ExecutorServiceConfig() + { + @Override + public String getFormatString() + { + return "test"; + } + + @Override + public int getNumThreads() + { + return 2; + } + } + ); + + final CountDownLatch queriesStarted = new CountDownLatch(2); + final CountDownLatch queryIsRegistered = new CountDownLatch(1); + + Capture capturedFuture = new Capture<>(); + QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class); + watcher.registerQuery(EasyMock.anyObject(), EasyMock.and(EasyMock.anyObject(), EasyMock.capture(capturedFuture))); + EasyMock.expectLastCall() + .andAnswer( + new IAnswer() + { + @Override + public Void answer() throws Throwable + { + queryIsRegistered.countDown(); + return null; + } + } + ) + .once(); + + EasyMock.replay(watcher); + + DyingQueryRunner runner1 = new DyingQueryRunner(queriesStarted); + DyingQueryRunner runner2 = new DyingQueryRunner(queriesStarted); + DyingQueryRunner runner3 = new DyingQueryRunner(queriesStarted); + ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>( + exec, + Ordering.natural(), + watcher, + Lists.>newArrayList( + runner1, + runner2, + runner3 + ) + ); + + final Sequence seq = chainedRunner.run( + Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .intervals("2014/2015") + .aggregators(Lists.newArrayList(new CountAggregatorFactory("count"))) + .build() + ); + + Future resultFuture = Executors.newFixedThreadPool(1).submit( + new Runnable() + { + @Override + public void run() + { + Sequences.toList(seq, Lists.newArrayList()); + } + } + ); + + // wait for query to register and start + Assert.assertTrue(queryIsRegistered.await(1, TimeUnit.SECONDS)); + Assert.assertTrue(queriesStarted.await(1, TimeUnit.SECONDS)); + + // cancel the query + Assert.assertTrue(capturedFuture.hasCaptured()); + ListenableFuture future = capturedFuture.getValue(); + future.cancel(true); + + QueryInterruptedException cause = null; + try { + resultFuture.get(); + } catch(ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof QueryInterruptedException); + cause = (QueryInterruptedException)e.getCause(); + } + Assert.assertNotNull(cause); + Assert.assertTrue(future.isCancelled()); + Assert.assertTrue(runner1.hasStarted); + Assert.assertTrue(runner2.hasStarted); + Assert.assertFalse(runner3.hasStarted); + Assert.assertFalse(runner1.hasCompleted); + Assert.assertFalse(runner2.hasCompleted); + Assert.assertFalse(runner3.hasCompleted); + + EasyMock.verify(watcher); + } + + @Test + public void testQueryTimeout() throws Exception + { + ExecutorService exec = PrioritizedExecutorService.create( + new Lifecycle(), new ExecutorServiceConfig() + { + @Override + public String getFormatString() + { + return "test"; + } + + @Override + public int getNumThreads() + { + return 2; + } + } + ); + + final CountDownLatch queriesStarted = new CountDownLatch(2); + final CountDownLatch queryIsRegistered = new CountDownLatch(1); + + Capture capturedFuture = new Capture<>(); + QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class); + watcher.registerQuery(EasyMock.anyObject(), EasyMock.and(EasyMock.anyObject(), EasyMock.capture(capturedFuture))); + EasyMock.expectLastCall() + .andAnswer( + new IAnswer() + { + @Override + public Void answer() throws Throwable + { + queryIsRegistered.countDown(); + return null; + } + } + ) + .once(); + + EasyMock.replay(watcher); + + DyingQueryRunner runner1 = new DyingQueryRunner(queriesStarted); + DyingQueryRunner runner2 = new DyingQueryRunner(queriesStarted); + DyingQueryRunner runner3 = new DyingQueryRunner(queriesStarted); + ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>( + exec, + Ordering.natural(), + watcher, + Lists.>newArrayList( + runner1, + runner2, + runner3 + ) + ); + + final Sequence seq = chainedRunner.run( + Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .intervals("2014/2015") + .aggregators(Lists.newArrayList(new CountAggregatorFactory("count"))) + .context(ImmutableMap.of("timeout", (100), "queryId", "test")) + .build() + ); + + Future resultFuture = Executors.newFixedThreadPool(1).submit( + new Runnable() + { + @Override + public void run() + { + Sequences.toList(seq, Lists.newArrayList()); + } + } + ); + + // wait for query to register and start + Assert.assertTrue(queryIsRegistered.await(1, TimeUnit.SECONDS)); + Assert.assertTrue(queriesStarted.await(1, TimeUnit.SECONDS)); + + // cancel the query + Assert.assertTrue(capturedFuture.hasCaptured()); + ListenableFuture future = capturedFuture.getValue(); + + QueryInterruptedException cause = null; + try { + resultFuture.get(); + } catch(ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof QueryInterruptedException); + Assert.assertEquals("Query timeout", e.getCause().getMessage()); + cause = (QueryInterruptedException)e.getCause(); + } + Assert.assertNotNull(cause); + Assert.assertTrue(future.isCancelled()); + Assert.assertTrue(runner1.hasStarted); + Assert.assertTrue(runner2.hasStarted); + Assert.assertFalse(runner3.hasStarted); + Assert.assertFalse(runner1.hasCompleted); + Assert.assertFalse(runner2.hasCompleted); + Assert.assertFalse(runner3.hasCompleted); + + EasyMock.verify(watcher); + } + + private static class DyingQueryRunner implements QueryRunner + { + private final CountDownLatch latch; + private boolean hasStarted = false; + private boolean hasCompleted = false; + + public DyingQueryRunner(CountDownLatch latch) + { + this.latch = latch; + } + + @Override + public Sequence run(Query query) + { + hasStarted = true; + latch.countDown(); + if (Thread.interrupted()) { + throw new QueryInterruptedException("I got killed"); + } + + // do a lot of work + try { + Thread.sleep(500); + } + catch (InterruptedException e) { + throw new QueryInterruptedException("I got killed"); + } + + hasCompleted = true; + return Sequences.simple(Lists.newArrayList(123)); + } + } +} diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index ffa6b02c236..55c29752ac1 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -21,6 +21,7 @@ package io.druid.query; import com.google.common.base.Function; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; @@ -53,6 +54,16 @@ import java.util.List; */ public class QueryRunnerTestHelper { + + public static final QueryWatcher NOOP_QUERYWATCHER = new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + + } + }; + public static final String segmentId = "testSegment"; public static final String dataSource = "testing"; public static final UnionDataSource unionDataSource = new UnionDataSource( diff --git a/processing/src/test/java/io/druid/query/TestQueryRunners.java b/processing/src/test/java/io/druid/query/TestQueryRunners.java index c4767c1c6f9..f50e81d038e 100644 --- a/processing/src/test/java/io/druid/query/TestQueryRunners.java +++ b/processing/src/test/java/io/druid/query/TestQueryRunners.java @@ -1,11 +1,14 @@ package io.druid.query; import com.google.common.base.Supplier; +import com.google.common.util.concurrent.ListenableFuture; import io.druid.collections.StupidPool; import io.druid.query.search.SearchQueryQueryToolChest; import io.druid.query.search.SearchQueryRunnerFactory; import io.druid.query.search.search.SearchQueryConfig; import io.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory; +import io.druid.query.timeseries.TimeseriesQueryEngine; +import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; import io.druid.query.timeseries.TimeseriesQueryRunnerFactory; import io.druid.query.topn.TopNQueryConfig; import io.druid.query.topn.TopNQueryQueryToolChest; @@ -40,7 +43,11 @@ public class TestQueryRunners Segment adapter ) { - QueryRunnerFactory factory = new TopNQueryRunnerFactory(pool, new TopNQueryQueryToolChest(topNConfig)); + QueryRunnerFactory factory = new TopNQueryRunnerFactory( + pool, + new TopNQueryQueryToolChest(topNConfig), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ); return new FinalizeResultsQueryRunner( factory.createRunner(adapter), factory.getToolchest() @@ -51,7 +58,12 @@ public class TestQueryRunners Segment adapter ) { - QueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create(); + QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ); + return new FinalizeResultsQueryRunner( factory.createRunner(adapter), factory.getToolchest() @@ -62,7 +74,7 @@ public class TestQueryRunners Segment adapter ) { - QueryRunnerFactory factory = new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig())); + QueryRunnerFactory factory = new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()), QueryRunnerTestHelper.NOOP_QUERYWATCHER); return new FinalizeResultsQueryRunner( factory.createRunner(adapter), factory.getToolchest() @@ -73,11 +85,10 @@ public class TestQueryRunners Segment adapter ) { - QueryRunnerFactory factory = new TimeBoundaryQueryRunnerFactory(); + QueryRunnerFactory factory = new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER); return new FinalizeResultsQueryRunner( factory.createRunner(adapter), factory.getToolchest() ); } - -} \ No newline at end of file +} diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 97e64a0ec0c..be8fe086d78 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -113,6 +113,7 @@ public class GroupByQueryRunnerTest final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory( engine, + QueryRunnerTestHelper.NOOP_QUERYWATCHER, configSupplier, new GroupByQueryQueryToolChest(configSupplier, engine) ); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java index 2538e91bc76..a9fb506ca0b 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java @@ -72,6 +72,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory( engine, + QueryRunnerTestHelper.NOOP_QUERYWATCHER, configSupplier, new GroupByQueryQueryToolChest(configSupplier, engine) ); diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java index 913894459c2..70c65f8da88 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java @@ -25,6 +25,7 @@ import io.druid.query.LegacyDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryWatcher; import io.druid.query.metadata.metadata.ColumnAnalysis; import io.druid.query.metadata.metadata.SegmentAnalysis; import io.druid.query.metadata.metadata.SegmentMetadataQuery; @@ -95,7 +96,7 @@ public class SegmentAnalyzerTest private List getSegmentAnalysises(Segment index) { final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner( - (QueryRunnerFactory) new SegmentMetadataQueryRunnerFactory(), index + (QueryRunnerFactory) new SegmentMetadataQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER), index ); final SegmentMetadataQuery query = new SegmentMetadataQuery( diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java index 5d596292348..ed1740460f8 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java @@ -45,7 +45,7 @@ public class SegmentMetadataQueryTest { @SuppressWarnings("unchecked") private final QueryRunner runner = makeQueryRunner( - new SegmentMetadataQueryRunnerFactory() + new SegmentMetadataQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER) ); private ObjectMapper mapper = new DefaultObjectMapper(); diff --git a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java index 2e5623b4365..c69ee1c5a27 100644 --- a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java @@ -23,10 +23,13 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.guava.Sequences; import io.druid.query.Druids; +import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.query.filter.DimFilter; import io.druid.query.search.search.FragmentSearchQuerySpec; @@ -56,7 +59,10 @@ public class SearchQueryRunnerTest public static Collection constructorFeeder() throws IOException { return QueryRunnerTestHelper.makeQueryRunners( - new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig())) + new SearchQueryRunnerFactory( + new SearchQueryQueryToolChest(new SearchQueryConfig()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ) ); } diff --git a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java index 5015239870e..07f99165873 100644 --- a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java @@ -22,11 +22,15 @@ package io.druid.query.select; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.ISE; import com.metamx.common.guava.Sequences; import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.Query; +import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.query.TableDataSource; import io.druid.query.filter.SelectorDimFilter; @@ -54,7 +58,11 @@ public class SelectQueryRunnerTest public static Collection constructorFeeder() throws IOException { return QueryRunnerTestHelper.makeQueryRunners( - SelectQueryRunnerFactory.create(new DefaultObjectMapper()) + new SelectQueryRunnerFactory( + new SelectQueryQueryToolChest(new QueryConfig(), new DefaultObjectMapper()), + new SelectQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ) ); } diff --git a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java index 9b92826ac34..7bc499dca80 100644 --- a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java @@ -20,10 +20,13 @@ package io.druid.query.timeboundary; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.guava.Sequences; import io.druid.query.Druids; +import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import org.joda.time.DateTime; import org.junit.Assert; @@ -43,7 +46,7 @@ public class TimeBoundaryQueryRunnerTest public static Collection constructorFeeder() throws IOException { return QueryRunnerTestHelper.makeQueryRunners( - new TimeBoundaryQueryRunnerFactory() + new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER) ); } diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java index 36e1fc13955..17d61908c3c 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.metamx.common.guava.Sequences; import io.druid.query.Druids; +import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; @@ -46,7 +47,11 @@ public class TimeSeriesUnionQueryRunnerTest public static Collection constructorFeeder() throws IOException { return QueryRunnerTestHelper.makeUnionQueryRunners( - TimeseriesQueryRunnerFactory.create() + new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ) ); } diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java index d1497a19026..67c91b4be40 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java @@ -28,8 +28,10 @@ import io.druid.granularity.QueryGranularity; import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; @@ -87,7 +89,12 @@ public class TimeseriesQueryRunnerBonusTest private static List> runTimeseriesCount(IncrementalIndex index) { - final QueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create(); + final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ); + final QueryRunner> runner = makeQueryRunner( factory, new IncrementalIndexSegment(index, null) diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java index cfc26c4326a..708a7de1054 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java @@ -26,6 +26,7 @@ import com.metamx.common.guava.Sequences; import io.druid.granularity.PeriodGranularity; import io.druid.granularity.QueryGranularity; import io.druid.query.Druids; +import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; @@ -62,7 +63,11 @@ public class TimeseriesQueryRunnerTest public static Collection constructorFeeder() throws IOException { return QueryRunnerTestHelper.makeQueryRunners( - TimeseriesQueryRunnerFactory.create() + new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ) ); } diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java index 22b750faf00..09d383168cf 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java @@ -68,7 +68,8 @@ public class TopNQueryRunnerTest QueryRunnerTestHelper.makeQueryRunners( new TopNQueryRunnerFactory( TestQueryRunners.getPool(), - new TopNQueryQueryToolChest(new TopNQueryConfig()) + new TopNQueryQueryToolChest(new TopNQueryConfig()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER ) ) ); @@ -85,7 +86,8 @@ public class TopNQueryRunnerTest } } ), - new TopNQueryQueryToolChest(new TopNQueryConfig()) + new TopNQueryQueryToolChest(new TopNQueryConfig()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER ) ) ); diff --git a/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java b/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java index 1fdc3b11cf5..7dc7b645cad 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java @@ -65,7 +65,8 @@ public class TopNUnionQueryTest QueryRunnerTestHelper.makeUnionQueryRunners( new TopNQueryRunnerFactory( TestQueryRunners.getPool(), - new TopNQueryQueryToolChest(new TopNQueryConfig()) + new TopNQueryQueryToolChest(new TopNQueryConfig()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER ) ) ); @@ -82,7 +83,8 @@ public class TopNUnionQueryTest } } ), - new TopNQueryQueryToolChest(new TopNQueryConfig()) + new TopNQueryQueryToolChest(new TopNQueryConfig()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER ) ) ); diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java index 0eb327972ee..c8155526a89 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java @@ -29,13 +29,17 @@ import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.granularity.QueryGranularity; import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; +import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.filter.SpatialDimFilter; import io.druid.query.timeseries.TimeseriesQuery; +import io.druid.query.timeseries.TimeseriesQueryEngine; +import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; import io.druid.query.timeseries.TimeseriesQueryRunnerFactory; import io.druid.query.timeseries.TimeseriesResultValue; import io.druid.segment.IncrementalIndexSegment; @@ -434,7 +438,12 @@ public class SpatialFilterBonusTest ) ); try { - TimeseriesQueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create(); + TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ); + QueryRunner runner = new FinalizeResultsQueryRunner( factory.createRunner(segment), factory.getToolchest() @@ -516,7 +525,12 @@ public class SpatialFilterBonusTest ) ); try { - TimeseriesQueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create(); + TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ); + QueryRunner runner = new FinalizeResultsQueryRunner( factory.createRunner(segment), factory.getToolchest() @@ -528,4 +542,4 @@ public class SpatialFilterBonusTest throw Throwables.propagate(e); } } -} \ No newline at end of file +} diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java index d342c12c577..84df58a260d 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java @@ -29,13 +29,17 @@ import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.granularity.QueryGranularity; import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; +import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.filter.SpatialDimFilter; import io.druid.query.timeseries.TimeseriesQuery; +import io.druid.query.timeseries.TimeseriesQueryEngine; +import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; import io.druid.query.timeseries.TimeseriesQueryRunnerFactory; import io.druid.query.timeseries.TimeseriesResultValue; import io.druid.segment.IncrementalIndexSegment; @@ -464,7 +468,12 @@ public class SpatialFilterTest ) ); try { - TimeseriesQueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create(); + TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ); + QueryRunner runner = new FinalizeResultsQueryRunner( factory.createRunner(segment), factory.getToolchest() @@ -546,7 +555,12 @@ public class SpatialFilterTest ) ); try { - TimeseriesQueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create(); + TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ); + QueryRunner runner = new FinalizeResultsQueryRunner( factory.createRunner(segment), factory.getToolchest() @@ -558,4 +572,4 @@ public class SpatialFilterTest throw Throwables.propagate(e); } } -} \ No newline at end of file +} diff --git a/server/src/main/java/io/druid/client/BrokerServerView.java b/server/src/main/java/io/druid/client/BrokerServerView.java index 57663154156..0070622ac85 100644 --- a/server/src/main/java/io/druid/client/BrokerServerView.java +++ b/server/src/main/java/io/druid/client/BrokerServerView.java @@ -32,10 +32,9 @@ import io.druid.client.selector.TierSelectorStrategy; import io.druid.concurrent.Execs; import io.druid.guice.annotations.Client; import io.druid.query.DataSource; -import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChestWarehouse; -import io.druid.query.TableDataSource; +import io.druid.query.QueryWatcher; import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; import io.druid.timeline.VersionedIntervalTimeline; @@ -60,6 +59,7 @@ public class BrokerServerView implements TimelineServerView private final Map> timelines; private final QueryToolChestWarehouse warehouse; + private final QueryWatcher queryWatcher; private final ObjectMapper smileMapper; private final HttpClient httpClient; private final ServerInventoryView baseView; @@ -68,6 +68,7 @@ public class BrokerServerView implements TimelineServerView @Inject public BrokerServerView( QueryToolChestWarehouse warehouse, + QueryWatcher queryWatcher, ObjectMapper smileMapper, @Client HttpClient httpClient, ServerInventoryView baseView, @@ -75,6 +76,7 @@ public class BrokerServerView implements TimelineServerView ) { this.warehouse = warehouse; + this.queryWatcher = queryWatcher; this.smileMapper = smileMapper; this.httpClient = httpClient; this.baseView = baseView; @@ -154,7 +156,7 @@ public class BrokerServerView implements TimelineServerView private DirectDruidClient makeDirectClient(DruidServer server) { - return new DirectDruidClient(warehouse, smileMapper, httpClient, server.getHost()); + return new DirectDruidClient(warehouse, queryWatcher, smileMapper, httpClient, server.getHost()); } private QueryableDruidServer removeServer(DruidServer server) diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index ae994d1cda7..b6030f9755b 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.type.TypeFactory; import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.common.base.Charsets; import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.google.common.io.Closeables; @@ -43,11 +44,15 @@ import com.metamx.http.client.HttpClient; import com.metamx.http.client.io.AppendableByteArrayInputStream; import com.metamx.http.client.response.ClientResponse; import com.metamx.http.client.response.InputStreamResponseHandler; +import com.metamx.http.client.response.StatusResponseHandler; +import com.metamx.http.client.response.StatusResponseHolder; import io.druid.query.BySegmentResultValueClass; import io.druid.query.Query; +import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.query.aggregation.MetricManipulatorFns; import org.jboss.netty.handler.codec.http.HttpChunk; @@ -60,6 +65,7 @@ import java.io.InputStream; import java.net.URL; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; @@ -73,6 +79,7 @@ public class DirectDruidClient implements QueryRunner private static final Map, Pair> typesMap = Maps.newConcurrentMap(); private final QueryToolChestWarehouse warehouse; + private final QueryWatcher queryWatcher; private final ObjectMapper objectMapper; private final HttpClient httpClient; private final String host; @@ -82,12 +89,14 @@ public class DirectDruidClient implements QueryRunner public DirectDruidClient( QueryToolChestWarehouse warehouse, + QueryWatcher queryWatcher, ObjectMapper objectMapper, HttpClient httpClient, String host ) { this.warehouse = warehouse; + this.queryWatcher = queryWatcher; this.objectMapper = objectMapper; this.httpClient = httpClient; this.host = host; @@ -102,7 +111,7 @@ public class DirectDruidClient implements QueryRunner } @Override - public Sequence run(Query query) + public Sequence run(final Query query) { QueryToolChest> toolChest = warehouse.getToolChest(query); boolean isBySegment = query.getContextBySegment(false); @@ -127,6 +136,7 @@ public class DirectDruidClient implements QueryRunner final ListenableFuture future; final String url = String.format("http://%s/druid/v2/", host); + final String cancelUrl = String.format("http://%s/druid/v2/%s", host, query.getId()); try { log.debug("Querying url[%s]", url); @@ -174,6 +184,9 @@ public class DirectDruidClient implements QueryRunner } } ); + + queryWatcher.registerQuery(query, future); + openConnections.getAndIncrement(); Futures.addCallback( future, new FutureCallback() @@ -188,6 +201,27 @@ public class DirectDruidClient implements QueryRunner public void onFailure(Throwable t) { openConnections.getAndDecrement(); + if (future.isCancelled()) { + // forward the cancellation to underlying queriable node + try { + StatusResponseHolder res = httpClient + .delete(new URL(cancelUrl)) + .setContent(objectMapper.writeValueAsBytes(query)) + .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? "application/smile" : "application/json") + .go(new StatusResponseHandler(Charsets.UTF_8)) + .get(); + if (res.getStatus().getCode() >= 500) { + throw new RE( + "Error cancelling query[%s]: queriable node returned status[%d] [%s].", + res.getStatus().getCode(), + res.getStatus().getReasonPhrase() + ); + } + } + catch (IOException | ExecutionException | InterruptedException e) { + Throwables.propagate(e); + } + } } } ); @@ -196,7 +230,7 @@ public class DirectDruidClient implements QueryRunner throw Throwables.propagate(e); } - Sequence retVal = new BaseSequence>( + Sequence retVal = new BaseSequence<>( new BaseSequence.IteratorMaker>() { @Override @@ -283,21 +317,23 @@ public class DirectDruidClient implements QueryRunner if (jp == null) { try { jp = objectMapper.getFactory().createParser(future.get()); - if (jp.nextToken() != JsonToken.START_ARRAY) { + final JsonToken nextToken = jp.nextToken(); + if (nextToken == JsonToken.START_OBJECT) { + QueryInterruptedException e = jp.getCodec().readValue(jp, QueryInterruptedException.class); + throw e; + } + else if (nextToken != JsonToken.START_ARRAY) { throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url); } else { jp.nextToken(); objectCodec = jp.getCodec(); } } - catch (IOException e) { + catch (IOException | InterruptedException | ExecutionException e) { throw new RE(e, "Failure getting results from[%s]", url); } - catch (InterruptedException e) { - throw new RE(e, "Failure getting results from[%s]", url); - } - catch (ExecutionException e) { - throw new RE(e, "Failure getting results from[%s]", url); + catch (CancellationException e) { + throw new QueryInterruptedException("Query cancelled"); } } } diff --git a/server/src/main/java/io/druid/client/RoutingDruidClient.java b/server/src/main/java/io/druid/client/RoutingDruidClient.java index 10170fcfb9e..79ae1c16f6d 100644 --- a/server/src/main/java/io/druid/client/RoutingDruidClient.java +++ b/server/src/main/java/io/druid/client/RoutingDruidClient.java @@ -22,6 +22,7 @@ package io.druid.client; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMultimap; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -30,7 +31,9 @@ import com.metamx.http.client.HttpClient; import com.metamx.http.client.response.HttpResponseHandler; import io.druid.guice.annotations.Client; import io.druid.query.Query; +import io.druid.server.QueryResource; import io.druid.server.router.Router; +import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.handler.codec.http.HttpHeaders; import javax.inject.Inject; @@ -68,7 +71,7 @@ public class RoutingDruidClient return openConnections.get(); } - public ListenableFuture post( + public ListenableFuture postQuery( String url, Query query, HttpResponseHandler responseHandler @@ -81,7 +84,7 @@ public class RoutingDruidClient future = httpClient .post(new URL(url)) .setContent(objectMapper.writeValueAsBytes(query)) - .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? "application/smile" : "application/json") + .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? QueryResource.APPLICATION_SMILE : QueryResource.APPLICATION_JSON) .go(responseHandler); openConnections.getAndIncrement(); @@ -125,4 +128,19 @@ public class RoutingDruidClient throw Throwables.propagate(e); } } + + public ListenableFuture delete( + String url, + HttpResponseHandler responseHandler + ) + { + try { + return httpClient + .delete(new URL(url)) + .go(responseHandler); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } } diff --git a/server/src/main/java/io/druid/guice/QueryRunnerFactoryModule.java b/server/src/main/java/io/druid/guice/QueryRunnerFactoryModule.java index 6f4dc80b059..6c1bb62cafe 100644 --- a/server/src/main/java/io/druid/guice/QueryRunnerFactoryModule.java +++ b/server/src/main/java/io/druid/guice/QueryRunnerFactoryModule.java @@ -24,6 +24,7 @@ import com.google.inject.Binder; import com.google.inject.multibindings.MapBinder; import io.druid.query.Query; import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryWatcher; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryEngine; import io.druid.query.groupby.GroupByQueryRunnerFactory; @@ -39,6 +40,7 @@ import io.druid.query.timeseries.TimeseriesQuery; import io.druid.query.timeseries.TimeseriesQueryRunnerFactory; import io.druid.query.topn.TopNQuery; import io.druid.query.topn.TopNQueryRunnerFactory; +import io.druid.server.QueryManager; import java.util.Map; @@ -62,6 +64,12 @@ public class QueryRunnerFactoryModule extends QueryToolChestModule { super.configure(binder); + binder.bind(QueryWatcher.class) + .to(QueryManager.class) + .in(LazySingleton.class); + binder.bind(QueryManager.class) + .in(LazySingleton.class); + final MapBinder, QueryRunnerFactory> queryFactoryBinder = DruidBinders.queryRunnerFactoryBinder( binder ); diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index 5360d529ad3..3d82cc013ea 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -21,7 +21,11 @@ package io.druid.server; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.client.repackaged.com.google.common.base.Throwables; +import com.google.common.base.Function; import com.google.common.base.Joiner; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableMap; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; @@ -37,19 +41,20 @@ import io.druid.server.log.RequestLogger; import io.druid.server.router.QueryHostFinder; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.handler.codec.http.HttpChunk; +import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpResponse; import org.joda.time.DateTime; +import javax.annotation.Nullable; import javax.servlet.AsyncContext; import javax.servlet.ServletException; -import javax.servlet.ServletOutputStream; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.OutputStream; -import java.nio.charset.Charset; +import java.util.Map; import java.util.UUID; /** @@ -59,8 +64,6 @@ import java.util.UUID; public class AsyncQueryForwardingServlet extends HttpServlet { private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class); - private static final Charset UTF8 = Charset.forName("UTF-8"); - private static final String DISPATCHED = "dispatched"; private static final Joiner COMMA_JOIN = Joiner.on(","); private final ObjectMapper jsonMapper; @@ -88,275 +91,161 @@ public class AsyncQueryForwardingServlet extends HttpServlet } @Override - protected void doGet(final HttpServletRequest req, final HttpServletResponse resp) + protected void doGet(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException { - OutputStream out = null; - AsyncContext ctx = null; - - try { - ctx = req.startAsync(req, resp); - final AsyncContext asyncContext = ctx; - - if (req.getAttribute(DISPATCHED) != null) { - return; - } - - out = resp.getOutputStream(); - final OutputStream outputStream = out; - - final String host = hostFinder.getDefaultHost(); - - final HttpResponseHandler responseHandler = new HttpResponseHandler() - { - @Override - public ClientResponse handleResponse(HttpResponse response) + final AsyncContext asyncContext = req.startAsync(req, res); + asyncContext.start( + new Runnable() { - resp.setStatus(response.getStatus().getCode()); - resp.setContentType("application/json"); - - try { - ChannelBuffer buf = response.getContent(); - buf.readBytes(outputStream, buf.readableBytes()); - } - catch (Exception e) { - asyncContext.complete(); - throw Throwables.propagate(e); - } - - return ClientResponse.finished(outputStream); - } - - @Override - public ClientResponse handleChunk( - ClientResponse clientResponse, HttpChunk chunk - ) - { - try { - ChannelBuffer buf = chunk.getContent(); - buf.readBytes(outputStream, buf.readableBytes()); - } - catch (Exception e) { - asyncContext.complete(); - throw Throwables.propagate(e); - } - return clientResponse; - } - - @Override - public ClientResponse done(ClientResponse clientResponse) - { - final OutputStream obj = clientResponse.getObj(); - try { - resp.flushBuffer(); - outputStream.close(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - finally { - asyncContext.complete(); - } - - return ClientResponse.finished(obj); - } - - @Override - public void exceptionCaught( - ClientResponse clientResponse, - Throwable e - ) - { - handleException(resp, asyncContext, e); - } - }; - - asyncContext.start( - new Runnable() + @Override + public void run() { - @Override - public void run() - { - routingDruidClient.get(makeUrl(host, req), responseHandler); + try { + final HttpResponseHandler responseHandler = new PassthroughHttpResponseHandler(asyncContext, jsonMapper); + + final String host = hostFinder.getDefaultHost(); + routingDruidClient.get(makeUrl(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler); + } + catch (Exception e) { + handleException(jsonMapper, asyncContext, e); } } - ); - - asyncContext.dispatch(); - req.setAttribute(DISPATCHED, true); - } - catch (Exception e) { - handleException(resp, ctx, e); - } + } + ); } @Override - protected void doPost( - final HttpServletRequest req, final HttpServletResponse resp - ) throws ServletException, IOException + protected void doDelete(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException { - final long start = System.currentTimeMillis(); - Query query = null; - String queryId; - - final boolean isSmile = "application/smile".equals(req.getContentType()); - - final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; - - OutputStream out = null; - AsyncContext ctx = null; - - try { - ctx = req.startAsync(req, resp); - final AsyncContext asyncContext = ctx; - - if (req.getAttribute(DISPATCHED) != null) { - return; - } - - query = objectMapper.readValue(req.getInputStream(), Query.class); - queryId = query.getId(); - if (queryId == null) { - queryId = UUID.randomUUID().toString(); - query = query.withId(queryId); - } - - if (log.isDebugEnabled()) { - log.debug("Got query [%s]", query); - } - - out = resp.getOutputStream(); - final OutputStream outputStream = out; - - final String host = hostFinder.getHost(query); - - final Query theQuery = query; - final String theQueryId = queryId; - - final HttpResponseHandler responseHandler = new HttpResponseHandler() - { - @Override - public ClientResponse handleResponse(HttpResponse response) + final AsyncContext asyncContext = req.startAsync(req, res); + asyncContext.start( + new Runnable() { - resp.setStatus(response.getStatus().getCode()); - resp.setContentType("application/x-javascript"); - - try { - ChannelBuffer buf = response.getContent(); - buf.readBytes(outputStream, buf.readableBytes()); - } - catch (Exception e) { - asyncContext.complete(); - throw Throwables.propagate(e); - } - return ClientResponse.finished(outputStream); - } - - @Override - public ClientResponse handleChunk( - ClientResponse clientResponse, HttpChunk chunk - ) - { - try { - ChannelBuffer buf = chunk.getContent(); - buf.readBytes(outputStream, buf.readableBytes()); - } - catch (Exception e) { - asyncContext.complete(); - throw Throwables.propagate(e); - } - return clientResponse; - } - - @Override - public ClientResponse done(ClientResponse clientResponse) - { - final long requestTime = System.currentTimeMillis() - start; - - log.debug("Request time: %d", requestTime); - - emitter.emit( - new ServiceMetricEvent.Builder() - .setUser2(DataSourceUtil.getMetricName(theQuery.getDataSource())) - .setUser4(theQuery.getType()) - .setUser5(COMMA_JOIN.join(theQuery.getIntervals())) - .setUser6(String.valueOf(theQuery.hasFilters())) - .setUser7(req.getRemoteAddr()) - .setUser8(theQueryId) - .setUser9(theQuery.getDuration().toPeriod().toStandardMinutes().toString()) - .build("request/time", requestTime) - ); - - final OutputStream obj = clientResponse.getObj(); - try { - requestLogger.log( - new RequestLogLine( - new DateTime(), - req.getRemoteAddr(), - theQuery, - new QueryStats(ImmutableMap.of("request/time", requestTime, "success", true)) - ) - ); - - resp.flushBuffer(); - outputStream.close(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - finally { - asyncContext.complete(); - } - - return ClientResponse.finished(obj); - } - - @Override - public void exceptionCaught( - ClientResponse clientResponse, - Throwable e - ) - { - handleException(resp, asyncContext, e); - } - }; - - asyncContext.start( - new Runnable() + @Override + public void run() { - @Override - public void run() - { - routingDruidClient.post(makeUrl(host, req), theQuery, responseHandler); + try { + final HttpResponseHandler responseHandler = new PassthroughHttpResponseHandler(asyncContext, jsonMapper); + + final String host = hostFinder.getDefaultHost(); + routingDruidClient.delete(makeUrl(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler); + } + catch (Exception e) { + handleException(jsonMapper, asyncContext, e); } } - ); + } + ); + } - asyncContext.dispatch(); - req.setAttribute(DISPATCHED, true); - } - catch (Exception e) { - handleException(resp, ctx, e); + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException + { + final long start = System.currentTimeMillis(); + final AsyncContext asyncContext = req.startAsync(req, res); + asyncContext.start( + new Runnable() + { + @Override + public void run() + { + final HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest(); - try { - requestLogger.log( - new RequestLogLine( - new DateTime(), - req.getRemoteAddr(), - query, - new QueryStats(ImmutableMap.of("success", false, "exception", e.toString())) - ) - ); - } - catch (Exception e2) { - log.error(e2, "Unable to log query [%s]!", query); - } + final boolean isSmile = QueryResource.APPLICATION_SMILE.equals(request.getContentType()); + final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; - log.makeAlert(e, "Exception handling request") - .addData("query", query) - .addData("peer", req.getRemoteAddr()) - .emit(); - } + Query inputQuery = null; + try { + inputQuery = objectMapper.readValue(request.getInputStream(), Query.class); + if (inputQuery.getId() == null) { + inputQuery = inputQuery.withId(UUID.randomUUID().toString()); + } + final Query query = inputQuery; + + if (log.isDebugEnabled()) { + log.debug("Got query [%s]", inputQuery); + } + + final HttpResponseHandler responseHandler = new PassthroughHttpResponseHandler( + asyncContext, + objectMapper + ) + { + @Override + public ClientResponse done(ClientResponse clientResponse) + { + final long requestTime = System.currentTimeMillis() - start; + log.debug("Request time: %d", requestTime); + + emitter.emit( + new ServiceMetricEvent.Builder() + .setUser2(DataSourceUtil.getMetricName(query.getDataSource())) + .setUser4(query.getType()) + .setUser5(COMMA_JOIN.join(query.getIntervals())) + .setUser6(String.valueOf(query.hasFilters())) + .setUser7(request.getRemoteAddr()) + .setUser8(query.getId()) + .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString()) + .build("request/time", requestTime) + ); + + try { + requestLogger.log( + new RequestLogLine( + new DateTime(), + request.getRemoteAddr(), + query, + new QueryStats( + ImmutableMap.of( + "request/time", + requestTime, + "success", + true + ) + ) + ) + ); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + + return super.done(clientResponse); + } + }; + + routingDruidClient.postQuery( + makeUrl(hostFinder.getHost(inputQuery), request), + inputQuery, + responseHandler + ); + } + catch (Exception e) { + handleException(objectMapper, asyncContext, e); + + try { + requestLogger.log( + new RequestLogLine( + new DateTime(), + request.getRemoteAddr(), + inputQuery, + new QueryStats(ImmutableMap.of("success", false, "exception", e.toString())) + ) + ); + } + catch (Exception logError) { + log.error(logError, "Unable to log query [%s]!", inputQuery); + } + + log.makeAlert(e, "Exception handling request") + .addData("query", inputQuery) + .addData("peer", request.getRemoteAddr()) + .emit(); + } + } + } + ); } private String makeUrl(final String host, final HttpServletRequest req) @@ -370,24 +259,126 @@ public class AsyncQueryForwardingServlet extends HttpServlet return String.format("http://%s%s?%s", host, requestURI, queryString); } - private static void handleException(HttpServletResponse resp, AsyncContext ctx, Throwable e) + private static void handleException(ObjectMapper objectMapper, AsyncContext asyncContext, Throwable exception) { try { - final ServletOutputStream out = resp.getOutputStream(); - if (!resp.isCommitted()) { - resp.setStatus(500); - resp.resetBuffer(); - out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8)); - out.write("\n".getBytes(UTF8)); + HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse(); + if (!response.isCommitted()) { + final String errorMessage = exception.getMessage() == null ? "null exception" : exception.getMessage(); + + response.resetBuffer(); + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + objectMapper.writeValue( + response.getOutputStream(), + ImmutableMap.of( + "error", errorMessage + ) + ); + } + response.flushBuffer(); + } + catch (IOException e) { + Throwables.propagate(e); + } + finally { + asyncContext.complete(); + } + } + + private static class PassthroughHttpResponseHandler implements HttpResponseHandler + { + private final AsyncContext asyncContext; + private final ObjectMapper objectMapper; + private final OutputStream outputStream; + + public PassthroughHttpResponseHandler(AsyncContext asyncContext, ObjectMapper objectMapper) throws IOException + { + this.asyncContext = asyncContext; + this.objectMapper = objectMapper; + this.outputStream = asyncContext.getResponse().getOutputStream(); + } + + protected void copyStatusHeaders(HttpResponse clientResponse) + { + final HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse(); + response.setStatus(clientResponse.getStatus().getCode()); + response.setContentType(clientResponse.headers().get(HttpHeaders.Names.CONTENT_TYPE)); + + FluentIterable.from(clientResponse.headers().entries()) + .filter(new Predicate>() + { + @Override + public boolean apply(@Nullable Map.Entry input) + { + return input.getKey().startsWith("X-Druid"); + } + } + ) + .transform( + new Function, Object>() + { + @Nullable + @Override + public Object apply(@Nullable Map.Entry input) + { + response.setHeader(input.getKey(), input.getValue()); + return null; + } + } + ) + .allMatch(Predicates.alwaysTrue()); + } + + @Override + public ClientResponse handleResponse(HttpResponse clientResponse) + { + copyStatusHeaders(clientResponse); + + try { + ChannelBuffer buf = clientResponse.getContent(); + buf.readBytes(outputStream, buf.readableBytes()); + } + catch (Exception e) { + throw Throwables.propagate(e); } - if (ctx != null) { - ctx.complete(); - } - resp.flushBuffer(); + return ClientResponse.finished(outputStream); } - catch (IOException e1) { - Throwables.propagate(e1); + + @Override + public ClientResponse handleChunk( + ClientResponse clientResponse, HttpChunk chunk + ) + { + try { + ChannelBuffer buf = chunk.getContent(); + buf.readBytes(outputStream, buf.readableBytes()); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + return clientResponse; + } + + @Override + public ClientResponse done(ClientResponse clientResponse) + { + asyncContext.complete(); + return ClientResponse.finished(clientResponse.getObj()); + } + + @Override + public void exceptionCaught( + ClientResponse clientResponse, + Throwable e + ) + { + // throwing an exception here may cause resource leak + try { + handleException(objectMapper, asyncContext, e); + } catch(Exception err) { + log.error(err, "Unable to handle exception response"); + } } } } diff --git a/server/src/main/java/io/druid/server/QueryManager.java b/server/src/main/java/io/druid/server/QueryManager.java new file mode 100644 index 00000000000..84b947414dd --- /dev/null +++ b/server/src/main/java/io/druid/server/QueryManager.java @@ -0,0 +1,68 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimaps; +import com.google.common.collect.SetMultimap; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.query.Query; +import io.druid.query.QueryWatcher; + +import java.util.Set; + +public class QueryManager implements QueryWatcher +{ + final SetMultimap queries; + + public QueryManager() + { + this.queries = Multimaps.synchronizedSetMultimap( + HashMultimap.create() + ); + } + + public boolean cancelQuery(String id) { + Set futures = queries.removeAll(id); + boolean success = true; + for (ListenableFuture future : futures) { + success = success && future.cancel(true); + } + return success; + } + + public void registerQuery(Query query, final ListenableFuture future) + { + final String id = query.getId(); + queries.put(id, future); + future.addListener( + new Runnable() + { + @Override + public void run() + { + queries.remove(id, future); + } + }, + MoreExecutors.sameThreadExecutor() + ); + } +} diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 6319a0d4633..33bdd519c83 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -19,16 +19,23 @@ package io.druid.server; +import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; +import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; import com.google.common.io.Closeables; import com.google.inject.Inject; +import com.metamx.common.guava.Accumulator; +import com.metamx.common.guava.Accumulators; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; +import com.metamx.common.guava.Yielder; +import com.metamx.common.guava.YieldingAccumulator; +import com.metamx.common.guava.YieldingAccumulators; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; @@ -36,6 +43,7 @@ import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; import io.druid.query.DataSourceUtil; import io.druid.query.Query; +import io.druid.query.QueryInterruptedException; import io.druid.query.QuerySegmentWalker; import io.druid.server.log.RequestLogger; import org.joda.time.DateTime; @@ -43,10 +51,17 @@ import org.joda.time.DateTime; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; import javax.ws.rs.POST; import javax.ws.rs.Path; +import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.Charset; @@ -58,14 +73,16 @@ import java.util.UUID; public class QueryResource { private static final EmittingLogger log = new EmittingLogger(QueryResource.class); - private static final Charset UTF8 = Charset.forName("UTF-8"); private static final Joiner COMMA_JOIN = Joiner.on(","); + public static final String APPLICATION_SMILE = "application/smile"; + public static final String APPLICATION_JSON = "application/json"; private final ObjectMapper jsonMapper; private final ObjectMapper smileMapper; private final QuerySegmentWalker texasRanger; private final ServiceEmitter emitter; private final RequestLogger requestLogger; + private final QueryManager queryManager; @Inject public QueryResource( @@ -73,35 +90,49 @@ public class QueryResource @Smile ObjectMapper smileMapper, QuerySegmentWalker texasRanger, ServiceEmitter emitter, - RequestLogger requestLogger + RequestLogger requestLogger, + QueryManager queryManager ) { - this.jsonMapper = jsonMapper; - this.smileMapper = smileMapper; + this.jsonMapper = jsonMapper.copy(); + this.jsonMapper.getFactory().configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false); + + this.smileMapper = smileMapper.copy(); + this.smileMapper.getFactory().configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false); + this.texasRanger = texasRanger; this.emitter = emitter; this.requestLogger = requestLogger; + this.queryManager = queryManager; + } + + @DELETE + @Path("{id}") + @Produces("application/json") + public Response getServer(@PathParam("id") String queryId) + { + queryManager.cancelQuery(queryId); + return Response.status(Response.Status.ACCEPTED).build(); + } @POST - @Produces("application/json") - public void doPost( + public Response doPost( @Context HttpServletRequest req, - @Context HttpServletResponse resp + @Context final HttpServletResponse resp ) throws ServletException, IOException { final long start = System.currentTimeMillis(); Query query = null; byte[] requestQuery = null; - String queryId; + String queryId = null; - final boolean isSmile = "application/smile".equals(req.getContentType()); + final boolean isSmile = APPLICATION_SMILE.equals(req.getContentType()); ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; - ObjectWriter jsonWriter = req.getParameter("pretty") == null + final ObjectWriter jsonWriter = req.getParameter("pretty") == null ? objectMapper.writer() : objectMapper.writerWithDefaultPrettyPrinter(); - OutputStream out = null; try { requestQuery = ByteStreams.toByteArray(req.getInputStream()); @@ -116,45 +147,92 @@ public class QueryResource log.debug("Got query [%s]", query); } - Sequence results = query.run(texasRanger); + Sequence results = query.run(texasRanger); if (results == null) { results = Sequences.empty(); } - resp.setStatus(200); - resp.setContentType("application/x-javascript"); + try ( + final Yielder yielder = results.toYielder( + null, + new YieldingAccumulator() + { + @Override + public Object accumulate(Object accumulated, Object in) + { + yield(); + return in; + } + } + ) + ) { + long requestTime = System.currentTimeMillis() - start; - out = resp.getOutputStream(); - jsonWriter.writeValue(out, results); + emitter.emit( + new ServiceMetricEvent.Builder() + .setUser2(DataSourceUtil.getMetricName(query.getDataSource())) + .setUser4(query.getType()) + .setUser5(COMMA_JOIN.join(query.getIntervals())) + .setUser6(String.valueOf(query.hasFilters())) + .setUser7(req.getRemoteAddr()) + .setUser8(queryId) + .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString()) + .build("request/time", requestTime) + ); - long requestTime = System.currentTimeMillis() - start; + requestLogger.log( + new RequestLogLine( + new DateTime(), + req.getRemoteAddr(), + query, + new QueryStats( + ImmutableMap.of( + "request/time", requestTime, + "success", true + ) + ) + ) + ); - emitter.emit( - new ServiceMetricEvent.Builder() - .setUser2(DataSourceUtil.getMetricName(query.getDataSource())) - .setUser4(query.getType()) - .setUser5(COMMA_JOIN.join(query.getIntervals())) - .setUser6(String.valueOf(query.hasFilters())) - .setUser7(req.getRemoteAddr()) - .setUser8(queryId) - .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString()) - .build("request/time", requestTime) - ); - - requestLogger.log( - new RequestLogLine( - new DateTime(), - req.getRemoteAddr(), - query, - new QueryStats( - ImmutableMap.of( - "request/time", requestTime, - "success", true - ) + return Response + .ok( + new StreamingOutput() + { + @Override + public void write(OutputStream outputStream) throws IOException, WebApplicationException + { + jsonWriter.writeValue(outputStream, yielder); + outputStream.close(); + } + }, + isSmile ? APPLICATION_JSON : APPLICATION_SMILE + ) + .header("X-Druid-Query-Id", queryId) + .build(); + } + } + catch (QueryInterruptedException e) { + try { + log.info("%s [%s]", e.getMessage(), queryId); + requestLogger.log( + new RequestLogLine( + new DateTime(), + req.getRemoteAddr(), + query, + new QueryStats(ImmutableMap.of("success", false, "interrupted", true, "reason", e.toString())) + ) + ); + } catch (Exception e2) { + log.error(e2, "Unable to log query [%s]!", query); + } + return Response.serverError().entity( + jsonWriter.writeValueAsString( + ImmutableMap.of( + "error", e.getMessage() ) ) - ); + ).build(); } catch (Exception e) { final String queryString = @@ -164,20 +242,6 @@ public class QueryResource log.warn(e, "Exception occurred on request [%s]", queryString); - if (!resp.isCommitted()) { - resp.setStatus(500); - resp.resetBuffer(); - - if (out == null) { - out = resp.getOutputStream(); - } - - out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8)); - out.write("\n".getBytes(UTF8)); - } - - resp.flushBuffer(); - try { requestLogger.log( new RequestLogLine( @@ -197,10 +261,14 @@ public class QueryResource .addData("query", queryString) .addData("peer", req.getRemoteAddr()) .emit(); - } - finally { - resp.flushBuffer(); - Closeables.closeQuietly(out); + + return Response.serverError().entity( + jsonWriter.writeValueAsString( + ImmutableMap.of( + "error", e.getMessage() == null ? "null exception" : e.getMessage() + ) + ) + ).build(); } } } diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index 537cc0145fb..6bc703297e5 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -21,7 +21,6 @@ package io.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; -import com.google.common.base.Preconditions; import com.google.common.base.Predicates; import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; diff --git a/server/src/test/java/io/druid/client/DirectDruidClientTest.java b/server/src/test/java/io/druid/client/DirectDruidClientTest.java index aba91657686..4ad8ca5cd51 100644 --- a/server/src/test/java/io/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/io/druid/client/DirectDruidClientTest.java @@ -21,30 +21,37 @@ package io.druid.client; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.http.client.HttpClient; import com.metamx.http.client.Request; import com.metamx.http.client.RequestBuilder; +import com.metamx.http.client.response.StatusResponseHolder; import io.druid.client.selector.ConnectionCountServerSelectorStrategy; import io.druid.client.selector.HighestPriorityTierSelectorStrategy; import io.druid.client.selector.QueryableDruidServer; import io.druid.client.selector.ServerSelector; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.Druids; +import io.druid.query.Query; +import io.druid.query.QueryInterruptedException; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryWatcher; import io.druid.query.ReflectionQueryToolChestWarehouse; import io.druid.query.Result; import io.druid.query.timeboundary.TimeBoundaryQuery; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; -import junit.framework.Assert; import org.easymock.EasyMock; import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.timeout.ReadTimeoutException; import org.joda.time.DateTime; import org.joda.time.Interval; -import org.junit.Before; +import org.junit.Assert; import org.junit.Test; import java.io.ByteArrayInputStream; @@ -54,17 +61,10 @@ import java.util.List; public class DirectDruidClientTest { - private HttpClient httpClient; - - @Before - public void setUp() throws Exception - { - httpClient = EasyMock.createMock(HttpClient.class); - } - @Test public void testRun() throws Exception { + HttpClient httpClient = EasyMock.createMock(HttpClient.class); RequestBuilder requestBuilder = new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com")); EasyMock.expect(httpClient.post(EasyMock.anyObject())).andReturn(requestBuilder).atLeastOnce(); @@ -93,12 +93,14 @@ public class DirectDruidClientTest DirectDruidClient client1 = new DirectDruidClient( new ReflectionQueryToolChestWarehouse(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER, new DefaultObjectMapper(), httpClient, "foo" ); DirectDruidClient client2 = new DirectDruidClient( new ReflectionQueryToolChestWarehouse(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER, new DefaultObjectMapper(), httpClient, "foo2" @@ -149,4 +151,70 @@ public class DirectDruidClientTest EasyMock.verify(httpClient); } + + @Test + public void testCancel() throws Exception + { + HttpClient httpClient = EasyMock.createStrictMock(HttpClient.class); + EasyMock.expect(httpClient.post(EasyMock.anyObject())).andReturn( + new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com")) + ).once(); + + ListenableFuture cancelledFuture = Futures.immediateCancelledFuture(); + EasyMock.expect(httpClient.go(EasyMock.anyObject())).andReturn(cancelledFuture).once(); + + EasyMock.expect(httpClient.delete(EasyMock.anyObject())) + .andReturn(new RequestBuilder(httpClient, HttpMethod.DELETE, new URL("http://foo.com/delete"))) + .once(); + SettableFuture cancellationFuture = SettableFuture.create(); + EasyMock.expect(httpClient.go(EasyMock.anyObject())).andReturn(cancellationFuture).once(); + + EasyMock.replay(httpClient); + + final ServerSelector serverSelector = new ServerSelector( + new DataSegment( + "test", + new Interval("2013-01-01/2013-01-02"), + new DateTime("2013-01-01").toString(), + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + new NoneShardSpec(), + 0, + 0L + ), + new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()) + ); + + DirectDruidClient client1 = new DirectDruidClient( + new ReflectionQueryToolChestWarehouse(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER, + new DefaultObjectMapper(), + httpClient, + "foo" + ); + + QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer( + new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0), + client1 + ); + serverSelector.addServer(queryableDruidServer1); + + TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); + + cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled"))); + Sequence results = client1.run(query); + Assert.assertEquals(0, client1.getNumOpenConnections()); + + + QueryInterruptedException exception = null; + try { + Sequences.toList(results, Lists.newArrayList()); + } catch(QueryInterruptedException e) { + exception = e; + } + Assert.assertNotNull(exception); + + EasyMock.verify(httpClient); + } }