diff --git a/docs/content/Broker-Config.md b/docs/content/Broker-Config.md index 1898c8f5bc2..5bfa0b678d4 100644 --- a/docs/content/Broker-Config.md +++ b/docs/content/Broker-Config.md @@ -41,7 +41,8 @@ Druid uses Jetty to serve HTTP requests. #### Processing -The broker only uses processing configs for nested groupBy queries. +The broker uses processing configs for nested groupBy queries. And, optionally, Long-interval queries (of any type) can be broken into shorter interval queries and processed in parallel inside this thread pool. For more details, see "chunkPeriod" in [Querying](Querying.html) doc. + |Property|Description|Default| |--------|-----------|-------| @@ -52,10 +53,6 @@ The broker only uses processing configs for nested groupBy queries. #### General Query Configuration -|Property|Description|Default| -|--------|-----------|-------| -|`druid.query.chunkPeriod`|Long-interval queries (of any type) may be broken into shorter interval queries, reducing the impact on resources. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. |0 (off)| - ##### GroupBy Query Config |Property|Description|Default| diff --git a/docs/content/Querying.md b/docs/content/Querying.md index dd556b2740f..750f11e4183 100644 --- a/docs/content/Querying.md +++ b/docs/content/Querying.md @@ -145,6 +145,8 @@ Properties shared by all query types |populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. This may be overriden in the broker or historical node configuration | |bySegment | `false` | Return "by segment" results. Pimarily used for debugging, setting it to `true` returns results associated with the data segment they came from | |finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | +|chunkPeriod | `0` (off) | At broker, Long-interval queries (of any type) may be broken into shorter interval queries, reducing the impact on resources. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. All the query chunks will be processed asynchronously inside query processing executor service. Make sure "druid.processing.numThreads" is configured appropriately. | + Query Cancellation ------------------ diff --git a/extensions/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramGroupByQueryTest.java b/extensions/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramGroupByQueryTest.java index f9889c5fbe1..1ab0308ffd8 100644 --- a/extensions/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramGroupByQueryTest.java +++ b/extensions/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramGroupByQueryTest.java @@ -84,7 +84,8 @@ public class ApproximateHistogramGroupByQueryTest engine, QueryRunnerTestHelper.NOOP_QUERYWATCHER, configSupplier, - new GroupByQueryQueryToolChest(configSupplier, mapper, engine, pool), + new GroupByQueryQueryToolChest(configSupplier, mapper, engine, pool, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), pool ); @@ -105,7 +106,8 @@ public class ApproximateHistogramGroupByQueryTest singleThreadEngine, QueryRunnerTestHelper.NOOP_QUERYWATCHER, singleThreadedConfigSupplier, - new GroupByQueryQueryToolChest(singleThreadedConfigSupplier, mapper, singleThreadEngine, pool), + new GroupByQueryQueryToolChest(singleThreadedConfigSupplier, mapper, singleThreadEngine, pool, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), pool ); diff --git a/extensions/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramTopNQueryTest.java b/extensions/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramTopNQueryTest.java index fd6a660e99c..b4f5c212aa6 100644 --- a/extensions/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramTopNQueryTest.java +++ b/extensions/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramTopNQueryTest.java @@ -61,7 +61,7 @@ public class ApproximateHistogramTopNQueryTest QueryRunnerTestHelper.makeQueryRunners( new TopNQueryRunnerFactory( TestQueryRunners.getPool(), - new TopNQueryQueryToolChest(new TopNQueryConfig()), + new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), QueryRunnerTestHelper.NOOP_QUERYWATCHER ) ) @@ -79,7 +79,7 @@ public class ApproximateHistogramTopNQueryTest } } ), - new TopNQueryQueryToolChest(new TopNQueryConfig()), + new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), QueryRunnerTestHelper.NOOP_QUERYWATCHER ) ) diff --git a/processing/src/main/java/io/druid/query/AsyncQueryRunner.java b/processing/src/main/java/io/druid/query/AsyncQueryRunner.java new file mode 100644 index 00000000000..09eee158560 --- /dev/null +++ b/processing/src/main/java/io/druid/query/AsyncQueryRunner.java @@ -0,0 +1,83 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.query; + +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.metamx.common.guava.LazySequence; +import com.metamx.common.guava.Sequence; + +public class AsyncQueryRunner implements QueryRunner +{ + + private final QueryRunner baseRunner; + private final ListeningExecutorService executor; + private final QueryWatcher queryWatcher; + + public AsyncQueryRunner(QueryRunner baseRunner, ExecutorService executor, QueryWatcher queryWatcher) { + this.baseRunner = baseRunner; + this.executor = MoreExecutors.listeningDecorator(executor); + this.queryWatcher = queryWatcher; + } + + @Override + public Sequence run(final Query query, final Map responseContext) + { + final int priority = query.getContextPriority(0); + final ListenableFuture> future = executor.submit(new AbstractPrioritizedCallable>(priority) + { + @Override + public Sequence call() throws Exception + { + //Note: this is assumed that baseRunner does most of the work eagerly on call to the + //run() method and resulting sequence accumulate/yield is fast. + return baseRunner.run(query, responseContext); + } + }); + queryWatcher.registerQuery(query, future); + + return new LazySequence<>(new Supplier>() + { + @Override + public Sequence get() + { + try { + Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT); + if (timeout == null) { + return future.get(); + } else { + return future.get(timeout.longValue(), TimeUnit.MILLISECONDS); + } + } catch (ExecutionException | InterruptedException | TimeoutException ex) { + throw Throwables.propagate(ex); + } + } + }); + } +} diff --git a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java index ce052dbddbf..243c73efc20 100644 --- a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java @@ -22,6 +22,9 @@ import com.google.common.collect.Lists; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceMetricEvent; + import io.druid.granularity.PeriodGranularity; import io.druid.query.spec.MultipleIntervalSegmentSpec; import org.joda.time.Interval; @@ -31,57 +34,90 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; /** */ public class IntervalChunkingQueryRunner implements QueryRunner { private final QueryRunner baseRunner; - private final Period period; - public IntervalChunkingQueryRunner(QueryRunner baseRunner, Period period) + private final QueryToolChest> toolChest; + private final ExecutorService executor; + private final QueryWatcher queryWatcher; + private final ServiceEmitter emitter; + + public IntervalChunkingQueryRunner(QueryRunner baseRunner, QueryToolChest> toolChest, + ExecutorService executor, QueryWatcher queryWatcher, ServiceEmitter emitter) { this.baseRunner = baseRunner; - this.period = period; + this.toolChest = toolChest; + this.executor = executor; + this.queryWatcher = queryWatcher; + this.emitter = emitter; } @Override public Sequence run(final Query query, final Map responseContext) { - if (period.getMillis() == 0) { + final Period chunkPeriod = getChunkPeriod(query); + if (chunkPeriod.toStandardDuration().getMillis() == 0) { return baseRunner.run(query, responseContext); } + List chunkIntervals = Lists.newArrayList(FunctionalIterable + .create(query.getIntervals()) + .transformCat( + new Function>() + { + @Override + public Iterable apply(Interval input) + { + return splitInterval(input, chunkPeriod); + } + } + )); + + if(chunkIntervals.size() <= 1) { + return baseRunner.run(query, responseContext); + } + + final QueryRunner finalQueryRunner = new AsyncQueryRunner( + //Note: it is assumed that toolChest.mergeResults(..) gives a query runner that is + //not lazy i.e. it does most of its work on call to run() method + toolChest.mergeResults( + new MetricsEmittingQueryRunner( + emitter, + new Function, ServiceMetricEvent.Builder>() + { + @Override + public ServiceMetricEvent.Builder apply(Query input) + { + return toolChest.makeMetricBuilder(input); + } + }, + baseRunner + ).withWaitMeasuredFromNow()), + executor, queryWatcher); + return Sequences.concat( - FunctionalIterable - .create(query.getIntervals()) - .transformCat( - new Function>() - { - @Override - public Iterable apply(Interval input) - { - return splitInterval(input); - } - } - ) - .transform( - new Function>() - { - @Override - public Sequence apply(Interval singleInterval) - { - return baseRunner.run( - query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))), - responseContext + Lists.newArrayList(FunctionalIterable.create(chunkIntervals).transform( + new Function>() + { + @Override + public Sequence apply(Interval singleInterval) + { + return finalQueryRunner.run( + query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))), + responseContext ); - } - } - ) - ); + } + } + )) + ); } - private Iterable splitInterval(Interval interval) + private Iterable splitInterval(Interval interval, Period period) { if (interval.getEndMillis() == interval.getStartMillis()) { return Lists.newArrayList(interval); @@ -106,4 +142,9 @@ public class IntervalChunkingQueryRunner implements QueryRunner return intervals; } + + private Period getChunkPeriod(Query query) { + String p = query.getContextValue(QueryContextKeys.CHUNK_PERIOD, "P0D"); + return Period.parse(p); + } } diff --git a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunnerDecorator.java b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunnerDecorator.java new file mode 100644 index 00000000000..31ba68b5c2f --- /dev/null +++ b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunnerDecorator.java @@ -0,0 +1,49 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.query; + +import io.druid.guice.annotations.Processing; + +import java.util.concurrent.ExecutorService; + +import com.google.inject.Inject; +import com.metamx.emitter.service.ServiceEmitter; + +public class IntervalChunkingQueryRunnerDecorator +{ + private final ExecutorService executor; + private final QueryWatcher queryWatcher; + private final ServiceEmitter emitter; + + @Inject + public IntervalChunkingQueryRunnerDecorator(@Processing ExecutorService executor, QueryWatcher queryWatcher, + ServiceEmitter emitter) + { + this.executor = executor; + this.queryWatcher = queryWatcher; + this.emitter = emitter; + } + + public QueryRunner decorate(QueryRunner delegate, QueryToolChest> toolChest) + { + return new IntervalChunkingQueryRunner(delegate, (QueryToolChest>)toolChest, + executor, queryWatcher, emitter); + } +} diff --git a/processing/src/main/java/io/druid/query/QueryConfig.java b/processing/src/main/java/io/druid/query/QueryConfig.java deleted file mode 100644 index 2951bd1442c..00000000000 --- a/processing/src/main/java/io/druid/query/QueryConfig.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.query; - -import com.fasterxml.jackson.annotation.JsonProperty; -import org.joda.time.Period; - -/** - */ -public class QueryConfig -{ - @JsonProperty - private Period chunkPeriod = new Period(); - - public Period getChunkPeriod() - { - return chunkPeriod; - } -} diff --git a/processing/src/main/java/io/druid/query/QueryContextKeys.java b/processing/src/main/java/io/druid/query/QueryContextKeys.java new file mode 100644 index 00000000000..01e8482f258 --- /dev/null +++ b/processing/src/main/java/io/druid/query/QueryContextKeys.java @@ -0,0 +1,27 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.query; + +public class QueryContextKeys +{ + public static final String PRIORITY = "priority"; + public static final String TIMEOUT = "timeout"; + public static final String CHUNK_PERIOD = "chunkPeriod"; +} diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java index e8b26083c48..da483558df8 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java @@ -18,11 +18,10 @@ package io.druid.query.groupby; import com.fasterxml.jackson.annotation.JsonProperty; -import io.druid.query.QueryConfig; /** */ -public class GroupByQueryConfig extends QueryConfig +public class GroupByQueryConfig { @JsonProperty private boolean singleThreaded = false; diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index f8652cb02a8..258abbc389a 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -42,7 +42,7 @@ import io.druid.granularity.QueryGranularity; import io.druid.guice.annotations.Global; import io.druid.query.CacheStrategy; import io.druid.query.DataSource; -import io.druid.query.IntervalChunkingQueryRunner; +import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; import io.druid.query.QueryCacheHelper; import io.druid.query.QueryDataSource; @@ -77,10 +77,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest NO_MERGE_CONTEXT = ImmutableMap.of( - GROUP_BY_MERGE_KEY, - "false" - ); private final Supplier configSupplier; @@ -88,19 +84,22 @@ public class GroupByQueryQueryToolChest extends QueryToolChest configSupplier, ObjectMapper jsonMapper, GroupByQueryEngine engine, - @Global StupidPool bufferPool + @Global StupidPool bufferPool, + IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator ) { this.configSupplier = configSupplier; this.jsonMapper = jsonMapper; this.engine = engine; this.bufferPool = bufferPool; + this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator; } @Override @@ -116,8 +115,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest preMergeQueryDecoration(QueryRunner runner) { return new SubqueryQueryRunner<>( - new IntervalChunkingQueryRunner<>(runner, configSupplier.get().getChunkPeriod()) + intervalChunkingQueryRunnerDecorator.decorate(runner, this) ); } diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index 20fd2a5b892..33dcdb47a04 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -36,7 +36,7 @@ import com.metamx.common.StringUtils; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.collections.OrderedMergeSequence; import io.druid.query.CacheStrategy; -import io.druid.query.IntervalChunkingQueryRunner; +import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; import io.druid.query.QueryMetricUtil; import io.druid.query.QueryRunner; @@ -71,12 +71,16 @@ public class SearchQueryQueryToolChest extends QueryToolChest> preMergeQueryDecoration(QueryRunner> runner) { return new SearchThresholdAdjustingQueryRunner( - new IntervalChunkingQueryRunner>(runner, config.getChunkPeriod()), + intervalChunkingQueryRunnerDecorator.decorate(runner, this), config ); } diff --git a/processing/src/main/java/io/druid/query/search/search/SearchQueryConfig.java b/processing/src/main/java/io/druid/query/search/search/SearchQueryConfig.java index fe444f604d6..6f2f7fb8f96 100644 --- a/processing/src/main/java/io/druid/query/search/search/SearchQueryConfig.java +++ b/processing/src/main/java/io/druid/query/search/search/SearchQueryConfig.java @@ -18,13 +18,12 @@ package io.druid.query.search.search; import com.fasterxml.jackson.annotation.JsonProperty; -import io.druid.query.QueryConfig; import javax.validation.constraints.Min; /** */ -public class SearchQueryConfig extends QueryConfig +public class SearchQueryConfig { @JsonProperty @Min(1) diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index a10355ea274..991ba2d6d04 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -32,9 +32,8 @@ import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.collections.OrderedMergeSequence; import io.druid.granularity.QueryGranularity; import io.druid.query.CacheStrategy; -import io.druid.query.IntervalChunkingQueryRunner; +import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; -import io.druid.query.QueryConfig; import io.druid.query.QueryMetricUtil; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; @@ -66,14 +65,16 @@ public class SelectQueryQueryToolChest extends QueryToolChest> preMergeQueryDecoration(QueryRunner> runner) { - return new IntervalChunkingQueryRunner>( - runner, - config.getChunkPeriod() - - ); + return intervalChunkingQueryRunnerDecorator.decorate(runner, this); } public Ordering> getOrdering() diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 071148218a6..7458f5a53e5 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -17,23 +17,12 @@ package io.druid.query.timeseries; -import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.base.Function; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Ordering; -import com.google.inject.Inject; -import com.metamx.common.guava.MergeSequence; -import com.metamx.common.guava.Sequence; -import com.metamx.common.guava.nary.BinaryFn; -import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.collections.OrderedMergeSequence; import io.druid.granularity.QueryGranularity; import io.druid.query.CacheStrategy; -import io.druid.query.IntervalChunkingQueryRunner; +import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; import io.druid.query.QueryCacheHelper; -import io.druid.query.QueryConfig; import io.druid.query.QueryMetricUtil; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; @@ -44,14 +33,27 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.aggregation.PostAggregator; import io.druid.query.filter.DimFilter; -import org.joda.time.DateTime; -import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; + +import org.joda.time.DateTime; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; +import com.google.inject.Inject; +import com.metamx.common.guava.MergeSequence; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.nary.BinaryFn; +import com.metamx.emitter.service.ServiceMetricEvent; + /** */ public class TimeseriesQueryQueryToolChest extends QueryToolChest, TimeseriesQuery> @@ -66,12 +68,12 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest> preMergeQueryDecoration(QueryRunner> runner) { - return new IntervalChunkingQueryRunner>( - runner, - config.getChunkPeriod() - - ); + return intervalChunkingQueryRunnerDecorator.decorate(runner, this); } public Ordering> getOrdering() diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryConfig.java b/processing/src/main/java/io/druid/query/topn/TopNQueryConfig.java index aeb0e885477..95097a66460 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryConfig.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryConfig.java @@ -18,13 +18,12 @@ package io.druid.query.topn; import com.fasterxml.jackson.annotation.JsonProperty; -import io.druid.query.QueryConfig; import javax.validation.constraints.Min; /** */ -public class TopNQueryConfig extends QueryConfig +public class TopNQueryConfig { @JsonProperty @Min(1) diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index fadd4497f20..d8f1acd67dc 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -35,7 +35,7 @@ import io.druid.collections.OrderedMergeSequence; import io.druid.granularity.QueryGranularity; import io.druid.query.BySegmentResultValue; import io.druid.query.CacheStrategy; -import io.druid.query.IntervalChunkingQueryRunner; +import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; import io.druid.query.QueryCacheHelper; import io.druid.query.QueryMetricUtil; @@ -70,12 +70,16 @@ public class TopNQueryQueryToolChest extends QueryToolChest aggregatorFactories){ @@ -416,7 +420,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest> preMergeQueryDecoration(QueryRunner> runner) { - return new IntervalChunkingQueryRunner<>(runner, config.getChunkPeriod()); + return intervalChunkingQueryRunnerDecorator.decorate(runner, this); } @Override diff --git a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java new file mode 100644 index 00000000000..962d775e7b8 --- /dev/null +++ b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java @@ -0,0 +1,135 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.query; + +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; + +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; + +public class AsyncQueryRunnerTest +{ + + private final static long TEST_TIMEOUT = 60000; + + private final ExecutorService executor; + private final Query query; + + public AsyncQueryRunnerTest() { + this.executor = Executors.newSingleThreadExecutor(); + query = Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .intervals("2014/2015") + .aggregators(Lists.newArrayList(new CountAggregatorFactory("count"))) + .build(); + } + + @Test(timeout = TEST_TIMEOUT) + public void testAsyncNature() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + QueryRunner baseRunner = new QueryRunner() + { + @Override + public Sequence run(Query query, Map responseContext) + { + try { + latch.await(); + return Sequences.simple(Lists.newArrayList(1)); + } catch(InterruptedException ex) { + throw Throwables.propagate(ex); + } + } + }; + + AsyncQueryRunner asyncRunner = new AsyncQueryRunner<>(baseRunner, executor, + QueryRunnerTestHelper.NOOP_QUERYWATCHER); + + Sequence lazy = asyncRunner.run(query, Collections.EMPTY_MAP); + latch.countDown(); + Assert.assertEquals(Lists.newArrayList(1), Sequences.toList(lazy, Lists.newArrayList())); + } + + @Test(timeout = TEST_TIMEOUT) + public void testQueryTimeoutHonored() { + QueryRunner baseRunner = new QueryRunner() + { + @Override + public Sequence run(Query query, Map responseContext) + { + try { + Thread.sleep(Long.MAX_VALUE); + throw new RuntimeException("query should not have completed"); + } catch(InterruptedException ex) { + throw Throwables.propagate(ex); + } + } + }; + + AsyncQueryRunner asyncRunner = new AsyncQueryRunner<>(baseRunner, executor, + QueryRunnerTestHelper.NOOP_QUERYWATCHER); + + Sequence lazy = asyncRunner.run( + query.withOverriddenContext(ImmutableMap.of("timeout", 1)), + Collections.EMPTY_MAP); + + try { + Sequences.toList(lazy, Lists.newArrayList()); + } catch(RuntimeException ex) { + Assert.assertTrue(ex.getCause() instanceof TimeoutException); + return; + } + Assert.fail(); + } + + @Test + public void testQueryRegistration() { + QueryRunner baseRunner = new QueryRunner() + { + @Override + public Sequence run(Query query, Map responseContext) { return null; } + }; + + QueryWatcher mock = EasyMock.createMock(QueryWatcher.class); + mock.registerQuery(EasyMock.eq(query), EasyMock.anyObject(ListenableFuture.class)); + EasyMock.replay(mock); + + AsyncQueryRunner asyncRunner = new AsyncQueryRunner<>(baseRunner, executor, + mock); + + asyncRunner.run(query, Collections.EMPTY_MAP); + EasyMock.verify(mock); + } +} \ No newline at end of file diff --git a/processing/src/test/java/io/druid/query/IntervalChunkingQueryRunnerTest.java b/processing/src/test/java/io/druid/query/IntervalChunkingQueryRunnerTest.java new file mode 100644 index 00000000000..821233e6c8c --- /dev/null +++ b/processing/src/test/java/io/druid/query/IntervalChunkingQueryRunnerTest.java @@ -0,0 +1,91 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.query; + +import io.druid.query.Druids.TimeseriesQueryBuilder; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; + +import java.util.Collections; +import java.util.concurrent.ExecutorService; + +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.metamx.common.guava.Sequences; +import com.metamx.emitter.service.ServiceEmitter; + +public class IntervalChunkingQueryRunnerTest +{ + private IntervalChunkingQueryRunnerDecorator decorator; + private ExecutorService executors; + private QueryRunner baseRunner; + private QueryToolChest toolChest; + + private final TimeseriesQueryBuilder queryBuilder; + + public IntervalChunkingQueryRunnerTest() { + queryBuilder = Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .aggregators(Lists.newArrayList(new CountAggregatorFactory("count"))); + } + + @Before + public void setup() { + executors = EasyMock.createMock(ExecutorService.class); + ServiceEmitter emitter = EasyMock.createNiceMock(ServiceEmitter.class); + decorator = new IntervalChunkingQueryRunnerDecorator(executors, + QueryRunnerTestHelper.NOOP_QUERYWATCHER, emitter); + baseRunner = EasyMock.createMock(QueryRunner.class); + toolChest = EasyMock.createNiceMock(QueryToolChest.class); + } + + @Test + public void testDefaultNoChunking() { + Query query = queryBuilder.intervals("2014/2016").build(); + + EasyMock.expect(baseRunner.run(query, Collections.EMPTY_MAP)).andReturn(Sequences.empty()); + EasyMock.replay(baseRunner); + + QueryRunner runner = decorator.decorate(baseRunner, toolChest); + runner.run(query, Collections.EMPTY_MAP); + + EasyMock.verify(baseRunner); + } + + @Test + public void testChunking() { + Query query = queryBuilder.intervals("2015-01-01T00:00:00.000/2015-01-11T00:00:00.000").context(ImmutableMap.of("chunkPeriod", "P1D")).build(); + + executors.execute(EasyMock.anyObject(Runnable.class)); + EasyMock.expectLastCall().times(10); + + EasyMock.replay(executors); + EasyMock.replay(toolChest); + + QueryRunner runner = decorator.decorate(baseRunner, toolChest); + runner.run(query, Collections.EMPTY_MAP); + + EasyMock.verify(executors); + } +} diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index 31e6b1b17cf..6764130ad50 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -21,6 +21,8 @@ import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; +import com.metamx.common.guava.Sequence; + import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; @@ -49,6 +51,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; /** */ @@ -306,4 +309,21 @@ public class QueryRunnerTestHelper factory.getToolchest() ); } + + public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator() + { + return new IntervalChunkingQueryRunnerDecorator(null, null, null) { + @Override + public QueryRunner decorate(final QueryRunner delegate, + QueryToolChest> toolChest) { + return new QueryRunner() { + @Override + public Sequence run(Query query, Map responseContext) + { + return delegate.run(query, responseContext); + } + }; + } + }; + } } diff --git a/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java b/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java index 1f652419465..a1c9ad2e9be 100644 --- a/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java @@ -82,7 +82,7 @@ public class RetryQueryRunnerTest } }, (QueryToolChest) new TimeseriesQueryQueryToolChest( - new QueryConfig() + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ), new RetryQueryRunnerConfig() { @@ -155,7 +155,7 @@ public class RetryQueryRunnerTest } }, (QueryToolChest) new TimeseriesQueryQueryToolChest( - new QueryConfig() + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ), new RetryQueryRunnerConfig() { @@ -222,7 +222,7 @@ public class RetryQueryRunnerTest } }, (QueryToolChest) new TimeseriesQueryQueryToolChest( - new QueryConfig() + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ), new RetryQueryRunnerConfig() { @@ -274,7 +274,7 @@ public class RetryQueryRunnerTest } }, (QueryToolChest) new TimeseriesQueryQueryToolChest( - new QueryConfig() + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ), new RetryQueryRunnerConfig() { diff --git a/processing/src/test/java/io/druid/query/TestQueryRunners.java b/processing/src/test/java/io/druid/query/TestQueryRunners.java index 20eda4bb6a5..02ee18db30c 100644 --- a/processing/src/test/java/io/druid/query/TestQueryRunners.java +++ b/processing/src/test/java/io/druid/query/TestQueryRunners.java @@ -21,14 +21,17 @@ import com.google.common.base.Supplier; import io.druid.collections.StupidPool; import io.druid.query.search.SearchQueryQueryToolChest; import io.druid.query.search.SearchQueryRunnerFactory; +import io.druid.query.search.SearchResultValue; import io.druid.query.search.search.SearchQueryConfig; import io.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory; import io.druid.query.timeseries.TimeseriesQueryEngine; import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; import io.druid.query.timeseries.TimeseriesQueryRunnerFactory; +import io.druid.query.timeseries.TimeseriesResultValue; import io.druid.query.topn.TopNQueryConfig; import io.druid.query.topn.TopNQueryQueryToolChest; import io.druid.query.topn.TopNQueryRunnerFactory; +import io.druid.query.topn.TopNResultValue; import io.druid.segment.Segment; import java.nio.ByteBuffer; @@ -60,7 +63,8 @@ public class TestQueryRunners { QueryRunnerFactory factory = new TopNQueryRunnerFactory( pool, - new TopNQueryQueryToolChest(topNConfig), + new TopNQueryQueryToolChest(topNConfig, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), QueryRunnerTestHelper.NOOP_QUERYWATCHER ); return new FinalizeResultsQueryRunner( @@ -74,7 +78,8 @@ public class TestQueryRunners ) { QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( - new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), new TimeseriesQueryEngine(), QueryRunnerTestHelper.NOOP_QUERYWATCHER ); @@ -89,7 +94,10 @@ public class TestQueryRunners Segment adapter ) { - QueryRunnerFactory factory = new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()), QueryRunnerTestHelper.NOOP_QUERYWATCHER); + QueryRunnerFactory factory = new SearchQueryRunnerFactory(new SearchQueryQueryToolChest( + new SearchQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER); return new FinalizeResultsQueryRunner( factory.createRunner(adapter), factory.getToolchest() diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 9662f0c77c1..10b20396cde 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -120,7 +120,8 @@ public class GroupByQueryRunnerTest engine, QueryRunnerTestHelper.NOOP_QUERYWATCHER, configSupplier, - new GroupByQueryQueryToolChest(configSupplier, mapper, engine, TestQueryRunners.pool), + new GroupByQueryQueryToolChest(configSupplier, mapper, engine, TestQueryRunners.pool, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), TestQueryRunners.pool ); @@ -141,7 +142,8 @@ public class GroupByQueryRunnerTest singleThreadEngine, QueryRunnerTestHelper.NOOP_QUERYWATCHER, singleThreadedConfigSupplier, - new GroupByQueryQueryToolChest(singleThreadedConfigSupplier, mapper, singleThreadEngine, pool), + new GroupByQueryQueryToolChest(singleThreadedConfigSupplier, mapper, singleThreadEngine, pool, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), pool ); @@ -1245,7 +1247,8 @@ public class GroupByQueryRunnerTest configSupplier, new DefaultObjectMapper(), engine, - TestQueryRunners.pool + TestQueryRunners.pool, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ).mergeResults(runner); Map context = Maps.newHashMap(); TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query, context), "no-limit"); @@ -1301,7 +1304,8 @@ public class GroupByQueryRunnerTest configSupplier, new DefaultObjectMapper(), engine, - TestQueryRunners.pool + TestQueryRunners.pool, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ).mergeResults(runner); TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query, context), "no-limit"); } @@ -1356,7 +1360,8 @@ public class GroupByQueryRunnerTest configSupplier, new DefaultObjectMapper(), engine, - TestQueryRunners.pool + TestQueryRunners.pool, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ).mergeResults(runner); TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query, context), "no-limit"); } diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java index b91efabaf59..4f4400ba2be 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java @@ -75,8 +75,10 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest engine, QueryRunnerTestHelper.NOOP_QUERYWATCHER, configSupplier, - new GroupByQueryQueryToolChest(configSupplier, new DefaultObjectMapper(), engine, TestQueryRunners.pool), - TestQueryRunners.pool + new GroupByQueryQueryToolChest(configSupplier, new DefaultObjectMapper(), + engine, TestQueryRunners.pool, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), + TestQueryRunners.pool ); final Collection objects = QueryRunnerTestHelper.makeQueryRunners(factory); diff --git a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java index 20524624660..8c806076113 100644 --- a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java @@ -56,7 +56,8 @@ public class SearchQueryRunnerTest { return QueryRunnerTestHelper.makeQueryRunners( new SearchQueryRunnerFactory( - new SearchQueryQueryToolChest(new SearchQueryConfig()), + new SearchQueryQueryToolChest(new SearchQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), QueryRunnerTestHelper.NOOP_QUERYWATCHER ) ); diff --git a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java index 2db72128127..8e674b285f1 100644 --- a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java @@ -23,7 +23,6 @@ import com.google.common.collect.Maps; import com.metamx.common.ISE; import com.metamx.common.guava.Sequences; import io.druid.jackson.DefaultObjectMapper; -import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; @@ -57,7 +56,8 @@ public class SelectQueryRunnerTest { return QueryRunnerTestHelper.makeQueryRunners( new SelectQueryRunnerFactory( - new SelectQueryQueryToolChest(new QueryConfig(), new DefaultObjectMapper()), + new SelectQueryQueryToolChest(new DefaultObjectMapper(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), new SelectQueryEngine(), QueryRunnerTestHelper.NOOP_QUERYWATCHER ) diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java index 598aea76798..9e8032caa5a 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java @@ -24,7 +24,6 @@ import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.query.Druids; import io.druid.query.Query; -import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryToolChest; @@ -64,7 +63,7 @@ public class TimeSeriesUnionQueryRunnerTest { return QueryRunnerTestHelper.makeUnionQueryRunners( new TimeseriesQueryRunnerFactory( - new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), new TimeseriesQueryEngine(), QueryRunnerTestHelper.NOOP_QUERYWATCHER ), @@ -138,7 +137,8 @@ public class TimeSeriesUnionQueryRunnerTest ) ) .build(); - QueryToolChest toolChest = new TimeseriesQueryQueryToolChest(new QueryConfig()); + QueryToolChest toolChest = new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()); QueryRunner mergingrunner = toolChest.mergeResults( new UnionQueryRunner>( (Iterable) Arrays.asList( diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java index 4ca4f9a7565..b056c9f2f38 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java @@ -26,7 +26,6 @@ import io.druid.granularity.QueryGranularity; import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.Query; -import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; @@ -90,7 +89,8 @@ public class TimeseriesQueryRunnerBonusTest private static List> runTimeseriesCount(IncrementalIndex index) { final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( - new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), new TimeseriesQueryEngine(), QueryRunnerTestHelper.NOOP_QUERYWATCHER ); diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java index 31d2d9bf174..023476c155d 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java @@ -25,7 +25,6 @@ import com.metamx.common.guava.Sequences; import io.druid.granularity.PeriodGranularity; import io.druid.granularity.QueryGranularity; import io.druid.query.Druids; -import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; @@ -72,7 +71,8 @@ public class TimeseriesQueryRunnerTest { return QueryRunnerTestHelper.makeQueryRunners( new TimeseriesQueryRunnerFactory( - new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), new TimeseriesQueryEngine(), QueryRunnerTestHelper.NOOP_QUERYWATCHER ) diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerBenchmark.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerBenchmark.java index 90329300651..93ebb9da6ed 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerBenchmark.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerBenchmark.java @@ -97,7 +97,7 @@ public class TopNQueryRunnerBenchmark extends AbstractBenchmark } } ), - new TopNQueryQueryToolChest(new TopNQueryConfig()), + new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), QueryRunnerTestHelper.NOOP_QUERYWATCHER ); testCaseMap.put( diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java index cb133c14443..ca74163ed3c 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java @@ -78,7 +78,8 @@ public class TopNQueryRunnerTest QueryRunnerTestHelper.makeQueryRunners( new TopNQueryRunnerFactory( TestQueryRunners.getPool(), - new TopNQueryQueryToolChest(new TopNQueryConfig()), + new TopNQueryQueryToolChest(new TopNQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), QueryRunnerTestHelper.NOOP_QUERYWATCHER ) ) @@ -96,7 +97,8 @@ public class TopNQueryRunnerTest } } ), - new TopNQueryQueryToolChest(new TopNQueryConfig()), + new TopNQueryQueryToolChest(new TopNQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), QueryRunnerTestHelper.NOOP_QUERYWATCHER ) ) @@ -398,7 +400,10 @@ public class TopNQueryRunnerTest ) ) ); - Sequence> results = new TopNQueryQueryToolChest(new TopNQueryConfig()).postMergeQueryDecoration( + Sequence> results = new TopNQueryQueryToolChest( + new TopNQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ).postMergeQueryDecoration( runner ).run( query, diff --git a/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java b/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java index 3616b026679..7dcd16313b4 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java @@ -64,7 +64,7 @@ public class TopNUnionQueryTest QueryRunnerTestHelper.makeUnionQueryRunners( new TopNQueryRunnerFactory( TestQueryRunners.getPool(), - new TopNQueryQueryToolChest(new TopNQueryConfig()), + new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), QueryRunnerTestHelper.NOOP_QUERYWATCHER ), QueryRunnerTestHelper.unionDataSource @@ -83,7 +83,7 @@ public class TopNUnionQueryTest } } ), - new TopNQueryQueryToolChest(new TopNQueryConfig()), + new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), QueryRunnerTestHelper.NOOP_QUERYWATCHER ), QueryRunnerTestHelper.unionDataSource diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java index 2c777318942..ae727b8f980 100644 --- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java @@ -31,7 +31,6 @@ import io.druid.data.input.Row; import io.druid.granularity.QueryGranularity; import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; -import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; @@ -283,7 +282,7 @@ public class IncrementalIndexTest final List> queryFutures = new LinkedList<>(); final Segment incrementalIndexSegment = new IncrementalIndexSegment(index, null); final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( - new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), new TimeseriesQueryEngine(), QueryRunnerTestHelper.NOOP_QUERYWATCHER ); diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java index 26e4f10760a..d0730f73bea 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java @@ -28,7 +28,6 @@ import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.granularity.QueryGranularity; import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; -import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; @@ -464,7 +463,8 @@ public class SpatialFilterBonusTest ); try { TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( - new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), new TimeseriesQueryEngine(), QueryRunnerTestHelper.NOOP_QUERYWATCHER ); @@ -551,7 +551,8 @@ public class SpatialFilterBonusTest ); try { TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( - new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), new TimeseriesQueryEngine(), QueryRunnerTestHelper.NOOP_QUERYWATCHER ); diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java index 3a5722d20f4..887ebdb5b56 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java @@ -29,7 +29,6 @@ import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.granularity.QueryGranularity; import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; -import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; @@ -540,7 +539,8 @@ public class SpatialFilterTest ); try { TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( - new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), new TimeseriesQueryEngine(), QueryRunnerTestHelper.NOOP_QUERYWATCHER ); @@ -592,7 +592,8 @@ public class SpatialFilterTest ); try { TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( - new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), new TimeseriesQueryEngine(), QueryRunnerTestHelper.NOOP_QUERYWATCHER ); @@ -679,7 +680,8 @@ public class SpatialFilterTest ); try { TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( - new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), new TimeseriesQueryEngine(), QueryRunnerTestHelper.NOOP_QUERYWATCHER ); diff --git a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java index ed835c4cb29..21c38701b43 100644 --- a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java +++ b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java @@ -34,7 +34,6 @@ import io.druid.data.input.MapBasedInputRow; import io.druid.granularity.QueryGranularity; import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; -import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; @@ -280,7 +279,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark final List> queryFutures = new LinkedList<>(); final Segment incrementalIndexSegment = new IncrementalIndexSegment(incrementalIndex, null); final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( - new TimeseriesQueryQueryToolChest(new QueryConfig()), + new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()), new TimeseriesQueryEngine(), QueryRunnerTestHelper.NOOP_QUERYWATCHER ); diff --git a/server/src/main/java/io/druid/guice/QueryToolChestModule.java b/server/src/main/java/io/druid/guice/QueryToolChestModule.java index 2c58177b6f4..a26c42b0dfe 100644 --- a/server/src/main/java/io/druid/guice/QueryToolChestModule.java +++ b/server/src/main/java/io/druid/guice/QueryToolChestModule.java @@ -22,7 +22,6 @@ import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.multibindings.MapBinder; import io.druid.query.Query; -import io.druid.query.QueryConfig; import io.druid.query.QueryToolChest; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; @@ -72,7 +71,6 @@ public class QueryToolChestModule implements Module binder.bind(entry.getValue()).in(LazySingleton.class); } - JsonConfigProvider.bind(binder, "druid.query", QueryConfig.class); JsonConfigProvider.bind(binder, "druid.query.groupBy", GroupByQueryConfig.class); JsonConfigProvider.bind(binder, "druid.query.search", SearchQueryConfig.class); JsonConfigProvider.bind(binder, "druid.query.topN", TopNQueryConfig.class); diff --git a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java index 25a3b42075f..4fced547fee 100644 --- a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java @@ -17,15 +17,8 @@ package io.druid.server; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; -import com.google.inject.Inject; -import com.metamx.emitter.service.ServiceEmitter; -import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.client.CachingClusteredClient; import io.druid.query.FinalizeResultsQueryRunner; -import io.druid.query.MetricsEmittingQueryRunner; import io.druid.query.PostProcessingOperator; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -35,10 +28,15 @@ import io.druid.query.QueryToolChestWarehouse; import io.druid.query.RetryQueryRunner; import io.druid.query.RetryQueryRunnerConfig; import io.druid.query.SegmentDescriptor; + +import java.util.Map; + import org.joda.time.Interval; -import javax.annotation.Nullable; -import java.util.Map; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import com.metamx.emitter.service.ServiceEmitter; /** */ @@ -84,25 +82,13 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker final FinalizeResultsQueryRunner baseRunner = new FinalizeResultsQueryRunner( toolChest.postMergeQueryDecoration( toolChest.mergeResults( - new MetricsEmittingQueryRunner( - emitter, - new Function, ServiceMetricEvent.Builder>() - { - @Override - public ServiceMetricEvent.Builder apply(@Nullable Query input) - { - return toolChest.makeMetricBuilder(query); - } - }, - toolChest.preMergeQueryDecoration( - new RetryQueryRunner( + toolChest.preMergeQueryDecoration( + new RetryQueryRunner( baseClient, toolChest, retryConfig, - objectMapper - ) - ) - ).withWaitMeasuredFromNow() + objectMapper) + ) ) ), toolChest diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index c2073ccdde2..47d92e63261 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -47,6 +47,7 @@ import com.metamx.common.guava.MergeIterable; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.common.guava.nary.TrinaryFn; + import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.client.cache.MapCache; @@ -66,8 +67,8 @@ import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.MapQueryToolChestWarehouse; import io.druid.query.Query; -import io.druid.query.QueryConfig; import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryToolChest; import io.druid.query.Result; import io.druid.query.SegmentDescriptor; @@ -118,6 +119,7 @@ import io.druid.timeline.partition.PartitionChunk; import io.druid.timeline.partition.ShardSpec; import io.druid.timeline.partition.SingleElementPartitionChunk; import io.druid.timeline.partition.StringPartitionChunk; + import org.easymock.Capture; import org.easymock.EasyMock; import org.joda.time.DateTime; @@ -367,7 +369,8 @@ public class CachingClusteredClientTest .postAggregators(POST_AGGS) .context(CONTEXT); - QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig())); + QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator())); testQueryCaching( runner, @@ -402,7 +405,8 @@ public class CachingClusteredClientTest .postAggregators(POST_AGGS) .context(CONTEXT); - QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig())); + QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator())); testQueryCaching( runner, @@ -470,7 +474,8 @@ public class CachingClusteredClientTest .postAggregators(POST_AGGS) .context(CONTEXT); - QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig())); + QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator())); testQueryCaching( runner, @@ -529,7 +534,9 @@ public class CachingClusteredClientTest .postAggregators(POST_AGGS) .context(CONTEXT); - QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig())); + QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()) + ); testQueryCaching( runner, @@ -571,7 +578,8 @@ public class CachingClusteredClientTest .aggregators(AGGS) .postAggregators(POST_AGGS) .context(CONTEXT); - QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig())); + QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator())); testQueryCaching( runner, 1, @@ -642,7 +650,8 @@ public class CachingClusteredClientTest .postAggregators(POST_AGGS) .context(CONTEXT); - QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig())); + QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator())); testQueryCaching( runner, @@ -714,7 +723,8 @@ public class CachingClusteredClientTest .postAggregators(POST_AGGS) .context(CONTEXT); - QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig())); + QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator())); testQueryCaching( runner, @@ -791,7 +801,8 @@ public class CachingClusteredClientTest ), client.mergeCachedAndUncachedSequences( sequences, - new TopNQueryQueryToolChest(new TopNQueryConfig()) + new TopNQueryQueryToolChest(new TopNQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()) ) ); } @@ -813,7 +824,8 @@ public class CachingClusteredClientTest .postAggregators(POST_AGGS) .context(CONTEXT); - QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig())); + QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator())); testQueryCaching( runner, builder.build(), @@ -882,7 +894,8 @@ public class CachingClusteredClientTest .postAggregators(POST_AGGS) .context(CONTEXT); - QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig())); + QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator())); testQueryCaching( runner, builder.build(), @@ -977,7 +990,10 @@ public class CachingClusteredClientTest ) ); - QueryRunner runner = new FinalizeResultsQueryRunner(client, new SearchQueryQueryToolChest(new SearchQueryConfig())); + QueryRunner runner = new FinalizeResultsQueryRunner(client, new SearchQueryQueryToolChest( + new SearchQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()) + ); HashMap context = new HashMap(); TestHelper.assertExpectedResults( makeSearchResults( @@ -1046,8 +1062,8 @@ public class CachingClusteredClientTest QueryRunner runner = new FinalizeResultsQueryRunner( client, new SelectQueryQueryToolChest( - new QueryConfig(), - jsonMapper + jsonMapper, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ) ); HashMap context = new HashMap(); @@ -1150,7 +1166,8 @@ public class CachingClusteredClientTest } ) ), - TestQueryRunners.pool + TestQueryRunners.pool, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ) ); HashMap context = new HashMap(); @@ -1992,13 +2009,17 @@ public class CachingClusteredClientTest ImmutableMap., QueryToolChest>builder() .put( TimeseriesQuery.class, - new TimeseriesQueryQueryToolChest(new QueryConfig()) + new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()) ) - .put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig())) - .put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig())) + .put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator())) + .put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator())) .put( SelectQuery.class, - new SelectQueryQueryToolChest(new QueryConfig(), jsonMapper) + new SelectQueryQueryToolChest(jsonMapper, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()) ) .put( GroupByQuery.class, @@ -2018,7 +2039,8 @@ public class CachingClusteredClientTest } ) ), - TestQueryRunners.pool + TestQueryRunners.pool, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ) ) .put(TimeBoundaryQuery.class, new TimeBoundaryQueryQueryToolChest()) diff --git a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java index d2fac19f4eb..e5cef961e12 100644 --- a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java +++ b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java @@ -37,6 +37,7 @@ import io.druid.jackson.DefaultObjectMapper; import io.druid.query.CacheStrategy; import io.druid.query.Query; import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; import io.druid.query.SegmentDescriptor; import io.druid.query.aggregation.AggregatorFactory; @@ -112,7 +113,8 @@ public class CachingQueryRunnerTest String segmentIdentifier = "segment"; SegmentDescriptor segmentDescriptor = new SegmentDescriptor(new Interval("2011/2012"), "version", 0); - TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig()); + TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()); DefaultObjectMapper objectMapper = new DefaultObjectMapper(); CachingQueryRunner runner = new CachingQueryRunner( segmentIdentifier, @@ -188,7 +190,8 @@ public class CachingQueryRunnerTest Iterable> expectedResults = makeTopNResults(true, objects); String segmentIdentifier = "segment"; SegmentDescriptor segmentDescriptor = new SegmentDescriptor(new Interval("2011/2012"), "version", 0); - TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig()); + TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig(), + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()); final TopNQueryBuilder builder = new TopNQueryBuilder() .dataSource("ds")