interval chunk query runner now processes individual chunk in a thread pool and prints metrics query/time per chunk

This commit is contained in:
Himanshu Gupta 2015-02-22 19:40:47 -06:00
parent 7c02212584
commit 29039fd541
40 changed files with 658 additions and 216 deletions

View File

@ -41,7 +41,8 @@ Druid uses Jetty to serve HTTP requests.
#### Processing
The broker only uses processing configs for nested groupBy queries.
The broker uses processing configs for nested groupBy queries. And, optionally, Long-interval queries (of any type) can be broken into shorter interval queries and processed in parallel inside this thread pool. For more details, see "chunkPeriod" in [Querying](Querying.html) doc.
|Property|Description|Default|
|--------|-----------|-------|
@ -52,10 +53,6 @@ The broker only uses processing configs for nested groupBy queries.
#### General Query Configuration
|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.chunkPeriod`|Long-interval queries (of any type) may be broken into shorter interval queries, reducing the impact on resources. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. |0 (off)|
##### GroupBy Query Config
|Property|Description|Default|

View File

@ -145,6 +145,8 @@ Properties shared by all query types
|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. This may be overriden in the broker or historical node configuration |
|bySegment | `false` | Return "by segment" results. Pimarily used for debugging, setting it to `true` returns results associated with the data segment they came from |
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|chunkPeriod | `0` (off) | At broker, Long-interval queries (of any type) may be broken into shorter interval queries, reducing the impact on resources. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. All the query chunks will be processed asynchronously inside query processing executor service. Make sure "druid.processing.numThreads" is configured appropriately. |
Query Cancellation
------------------

View File

@ -84,7 +84,8 @@ public class ApproximateHistogramGroupByQueryTest
engine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
configSupplier,
new GroupByQueryQueryToolChest(configSupplier, mapper, engine, pool),
new GroupByQueryQueryToolChest(configSupplier, mapper, engine, pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
pool
);
@ -105,7 +106,8 @@ public class ApproximateHistogramGroupByQueryTest
singleThreadEngine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
singleThreadedConfigSupplier,
new GroupByQueryQueryToolChest(singleThreadedConfigSupplier, mapper, singleThreadEngine, pool),
new GroupByQueryQueryToolChest(singleThreadedConfigSupplier, mapper, singleThreadEngine, pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
pool
);

View File

@ -61,7 +61,7 @@ public class ApproximateHistogramTopNQueryTest
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
TestQueryRunners.getPool(),
new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
@ -79,7 +79,7 @@ public class ApproximateHistogramTopNQueryTest
}
}
),
new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)

View File

@ -0,0 +1,83 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.guava.LazySequence;
import com.metamx.common.guava.Sequence;
public class AsyncQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> baseRunner;
private final ListeningExecutorService executor;
private final QueryWatcher queryWatcher;
public AsyncQueryRunner(QueryRunner<T> baseRunner, ExecutorService executor, QueryWatcher queryWatcher) {
this.baseRunner = baseRunner;
this.executor = MoreExecutors.listeningDecorator(executor);
this.queryWatcher = queryWatcher;
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
final int priority = query.getContextPriority(0);
final ListenableFuture<Sequence<T>> future = executor.submit(new AbstractPrioritizedCallable<Sequence<T>>(priority)
{
@Override
public Sequence<T> call() throws Exception
{
//Note: this is assumed that baseRunner does most of the work eagerly on call to the
//run() method and resulting sequence accumulate/yield is fast.
return baseRunner.run(query, responseContext);
}
});
queryWatcher.registerQuery(query, future);
return new LazySequence<>(new Supplier<Sequence<T>>()
{
@Override
public Sequence<T> get()
{
try {
Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT);
if (timeout == null) {
return future.get();
} else {
return future.get(timeout.longValue(), TimeUnit.MILLISECONDS);
}
} catch (ExecutionException | InterruptedException | TimeoutException ex) {
throw Throwables.propagate(ex);
}
}
});
}
}

View File

@ -22,6 +22,9 @@ import com.google.common.collect.Lists;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.granularity.PeriodGranularity;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import org.joda.time.Interval;
@ -31,29 +34,38 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
*/
public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> baseRunner;
private final Period period;
public IntervalChunkingQueryRunner(QueryRunner<T> baseRunner, Period period)
private final QueryToolChest<T, Query<T>> toolChest;
private final ExecutorService executor;
private final QueryWatcher queryWatcher;
private final ServiceEmitter emitter;
public IntervalChunkingQueryRunner(QueryRunner<T> baseRunner, QueryToolChest<T, Query<T>> toolChest,
ExecutorService executor, QueryWatcher queryWatcher, ServiceEmitter emitter)
{
this.baseRunner = baseRunner;
this.period = period;
this.toolChest = toolChest;
this.executor = executor;
this.queryWatcher = queryWatcher;
this.emitter = emitter;
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
if (period.getMillis() == 0) {
final Period chunkPeriod = getChunkPeriod(query);
if (chunkPeriod.toStandardDuration().getMillis() == 0) {
return baseRunner.run(query, responseContext);
}
return Sequences.concat(
FunctionalIterable
List<Interval> chunkIntervals = Lists.newArrayList(FunctionalIterable
.create(query.getIntervals())
.transformCat(
new Function<Interval, Iterable<Interval>>()
@ -61,27 +73,51 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
@Override
public Iterable<Interval> apply(Interval input)
{
return splitInterval(input);
return splitInterval(input, chunkPeriod);
}
}
)
.transform(
));
if(chunkIntervals.size() <= 1) {
return baseRunner.run(query, responseContext);
}
final QueryRunner<T> finalQueryRunner = new AsyncQueryRunner<T>(
//Note: it is assumed that toolChest.mergeResults(..) gives a query runner that is
//not lazy i.e. it does most of its work on call to run() method
toolChest.mergeResults(
new MetricsEmittingQueryRunner<T>(
emitter,
new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Override
public ServiceMetricEvent.Builder apply(Query<T> input)
{
return toolChest.makeMetricBuilder(input);
}
},
baseRunner
).withWaitMeasuredFromNow()),
executor, queryWatcher);
return Sequences.concat(
Lists.newArrayList(FunctionalIterable.create(chunkIntervals).transform(
new Function<Interval, Sequence<T>>()
{
@Override
public Sequence<T> apply(Interval singleInterval)
{
return baseRunner.run(
return finalQueryRunner.run(
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))),
responseContext
);
}
}
)
))
);
}
private Iterable<Interval> splitInterval(Interval interval)
private Iterable<Interval> splitInterval(Interval interval, Period period)
{
if (interval.getEndMillis() == interval.getStartMillis()) {
return Lists.newArrayList(interval);
@ -106,4 +142,9 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
return intervals;
}
private Period getChunkPeriod(Query<T> query) {
String p = query.getContextValue(QueryContextKeys.CHUNK_PERIOD, "P0D");
return Period.parse(p);
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
import io.druid.guice.annotations.Processing;
import java.util.concurrent.ExecutorService;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
public class IntervalChunkingQueryRunnerDecorator
{
private final ExecutorService executor;
private final QueryWatcher queryWatcher;
private final ServiceEmitter emitter;
@Inject
public IntervalChunkingQueryRunnerDecorator(@Processing ExecutorService executor, QueryWatcher queryWatcher,
ServiceEmitter emitter)
{
this.executor = executor;
this.queryWatcher = queryWatcher;
this.emitter = emitter;
}
public <T> QueryRunner<T> decorate(QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest)
{
return new IntervalChunkingQueryRunner<T>(delegate, (QueryToolChest<T, Query<T>>)toolChest,
executor, queryWatcher, emitter);
}
}

View File

@ -1,34 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.query;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Period;
/**
*/
public class QueryConfig
{
@JsonProperty
private Period chunkPeriod = new Period();
public Period getChunkPeriod()
{
return chunkPeriod;
}
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
public class QueryContextKeys
{
public static final String PRIORITY = "priority";
public static final String TIMEOUT = "timeout";
public static final String CHUNK_PERIOD = "chunkPeriod";
}

View File

@ -18,11 +18,10 @@
package io.druid.query.groupby;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.query.QueryConfig;
/**
*/
public class GroupByQueryConfig extends QueryConfig
public class GroupByQueryConfig
{
@JsonProperty
private boolean singleThreaded = false;

View File

@ -42,7 +42,7 @@ import io.druid.granularity.QueryGranularity;
import io.druid.guice.annotations.Global;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSource;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryCacheHelper;
import io.druid.query.QueryDataSource;
@ -77,10 +77,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
{
};
private static final String GROUP_BY_MERGE_KEY = "groupByMerge";
private static final Map<String, Object> NO_MERGE_CONTEXT = ImmutableMap.<String, Object>of(
GROUP_BY_MERGE_KEY,
"false"
);
private final Supplier<GroupByQueryConfig> configSupplier;
@ -88,19 +84,22 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
private final ObjectMapper jsonMapper;
private GroupByQueryEngine engine; // For running the outer query around a subquery
private final IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator;
@Inject
public GroupByQueryQueryToolChest(
Supplier<GroupByQueryConfig> configSupplier,
ObjectMapper jsonMapper,
GroupByQueryEngine engine,
@Global StupidPool<ByteBuffer> bufferPool
@Global StupidPool<ByteBuffer> bufferPool,
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator
)
{
this.configSupplier = configSupplier;
this.jsonMapper = jsonMapper;
this.engine = engine;
this.bufferPool = bufferPool;
this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator;
}
@Override
@ -116,8 +115,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
}
if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner,
responseContext
return mergeGroupByResults(((GroupByQuery) input), runner, responseContext
);
}
return runner.run(input, responseContext);
@ -266,7 +264,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
public QueryRunner<Row> preMergeQueryDecoration(QueryRunner<Row> runner)
{
return new SubqueryQueryRunner<>(
new IntervalChunkingQueryRunner<>(runner, configSupplier.get().getChunkPeriod())
intervalChunkingQueryRunnerDecorator.decorate(runner, this)
);
}

View File

@ -36,7 +36,7 @@ import com.metamx.common.StringUtils;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.query.CacheStrategy;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryMetricUtil;
import io.druid.query.QueryRunner;
@ -71,12 +71,16 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
private final SearchQueryConfig config;
private final IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator;
@Inject
public SearchQueryQueryToolChest(
SearchQueryConfig config
SearchQueryConfig config,
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator
)
{
this.config = config;
this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator;
}
@Override
@ -251,7 +255,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
public QueryRunner<Result<SearchResultValue>> preMergeQueryDecoration(QueryRunner<Result<SearchResultValue>> runner)
{
return new SearchThresholdAdjustingQueryRunner(
new IntervalChunkingQueryRunner<Result<SearchResultValue>>(runner, config.getChunkPeriod()),
intervalChunkingQueryRunnerDecorator.decorate(runner, this),
config
);
}

View File

@ -18,13 +18,12 @@
package io.druid.query.search.search;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.query.QueryConfig;
import javax.validation.constraints.Min;
/**
*/
public class SearchQueryConfig extends QueryConfig
public class SearchQueryConfig
{
@JsonProperty
@Min(1)

View File

@ -32,9 +32,8 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.granularity.QueryGranularity;
import io.druid.query.CacheStrategy;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryMetricUtil;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
@ -66,14 +65,16 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
{
};
private final QueryConfig config;
private final ObjectMapper jsonMapper;
private final IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator;
@Inject
public SelectQueryQueryToolChest(QueryConfig config, ObjectMapper jsonMapper)
public SelectQueryQueryToolChest(ObjectMapper jsonMapper,
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator)
{
this.config = config;
this.jsonMapper = jsonMapper;
this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator;
}
@Override
@ -270,11 +271,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
@Override
public QueryRunner<Result<SelectResultValue>> preMergeQueryDecoration(QueryRunner<Result<SelectResultValue>> runner)
{
return new IntervalChunkingQueryRunner<Result<SelectResultValue>>(
runner,
config.getChunkPeriod()
);
return intervalChunkingQueryRunnerDecorator.decorate(runner, this);
}
public Ordering<Result<SelectResultValue>> getOrdering()

View File

@ -17,23 +17,12 @@
package io.druid.query.timeseries;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.granularity.QueryGranularity;
import io.druid.query.CacheStrategy;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryCacheHelper;
import io.druid.query.QueryConfig;
import io.druid.query.QueryMetricUtil;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
@ -44,14 +33,27 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.filter.DimFilter;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.joda.time.DateTime;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent;
/**
*/
public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery>
@ -66,12 +68,12 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
{
};
private final QueryConfig config;
private final IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator;
@Inject
public TimeseriesQueryQueryToolChest(QueryConfig config)
public TimeseriesQueryQueryToolChest(IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator)
{
this.config = config;
this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator;
}
@Override
@ -221,11 +223,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
@Override
public QueryRunner<Result<TimeseriesResultValue>> preMergeQueryDecoration(QueryRunner<Result<TimeseriesResultValue>> runner)
{
return new IntervalChunkingQueryRunner<Result<TimeseriesResultValue>>(
runner,
config.getChunkPeriod()
);
return intervalChunkingQueryRunnerDecorator.decorate(runner, this);
}
public Ordering<Result<TimeseriesResultValue>> getOrdering()

View File

@ -18,13 +18,12 @@
package io.druid.query.topn;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.query.QueryConfig;
import javax.validation.constraints.Min;
/**
*/
public class TopNQueryConfig extends QueryConfig
public class TopNQueryConfig
{
@JsonProperty
@Min(1)

View File

@ -35,7 +35,7 @@ import io.druid.collections.OrderedMergeSequence;
import io.druid.granularity.QueryGranularity;
import io.druid.query.BySegmentResultValue;
import io.druid.query.CacheStrategy;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryCacheHelper;
import io.druid.query.QueryMetricUtil;
@ -70,12 +70,16 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
};
private final TopNQueryConfig config;
private final IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator;
@Inject
public TopNQueryQueryToolChest(
TopNQueryConfig config
TopNQueryConfig config,
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator
)
{
this.config = config;
this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator;
}
protected static String[] extractFactoryName(final List<AggregatorFactory> aggregatorFactories){
@ -416,7 +420,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
@Override
public QueryRunner<Result<TopNResultValue>> preMergeQueryDecoration(QueryRunner<Result<TopNResultValue>> runner)
{
return new IntervalChunkingQueryRunner<>(runner, config.getChunkPeriod());
return intervalChunkingQueryRunnerDecorator.decorate(runner, this);
}
@Override

View File

@ -0,0 +1,135 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
public class AsyncQueryRunnerTest
{
private final static long TEST_TIMEOUT = 60000;
private final ExecutorService executor;
private final Query query;
public AsyncQueryRunnerTest() {
this.executor = Executors.newSingleThreadExecutor();
query = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.intervals("2014/2015")
.aggregators(Lists.<AggregatorFactory>newArrayList(new CountAggregatorFactory("count")))
.build();
}
@Test(timeout = TEST_TIMEOUT)
public void testAsyncNature() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
QueryRunner baseRunner = new QueryRunner()
{
@Override
public Sequence run(Query query, Map responseContext)
{
try {
latch.await();
return Sequences.simple(Lists.newArrayList(1));
} catch(InterruptedException ex) {
throw Throwables.propagate(ex);
}
}
};
AsyncQueryRunner asyncRunner = new AsyncQueryRunner<>(baseRunner, executor,
QueryRunnerTestHelper.NOOP_QUERYWATCHER);
Sequence lazy = asyncRunner.run(query, Collections.EMPTY_MAP);
latch.countDown();
Assert.assertEquals(Lists.newArrayList(1), Sequences.toList(lazy, Lists.newArrayList()));
}
@Test(timeout = TEST_TIMEOUT)
public void testQueryTimeoutHonored() {
QueryRunner baseRunner = new QueryRunner()
{
@Override
public Sequence run(Query query, Map responseContext)
{
try {
Thread.sleep(Long.MAX_VALUE);
throw new RuntimeException("query should not have completed");
} catch(InterruptedException ex) {
throw Throwables.propagate(ex);
}
}
};
AsyncQueryRunner asyncRunner = new AsyncQueryRunner<>(baseRunner, executor,
QueryRunnerTestHelper.NOOP_QUERYWATCHER);
Sequence lazy = asyncRunner.run(
query.withOverriddenContext(ImmutableMap.<String,Object>of("timeout", 1)),
Collections.EMPTY_MAP);
try {
Sequences.toList(lazy, Lists.newArrayList());
} catch(RuntimeException ex) {
Assert.assertTrue(ex.getCause() instanceof TimeoutException);
return;
}
Assert.fail();
}
@Test
public void testQueryRegistration() {
QueryRunner baseRunner = new QueryRunner()
{
@Override
public Sequence run(Query query, Map responseContext) { return null; }
};
QueryWatcher mock = EasyMock.createMock(QueryWatcher.class);
mock.registerQuery(EasyMock.eq(query), EasyMock.anyObject(ListenableFuture.class));
EasyMock.replay(mock);
AsyncQueryRunner asyncRunner = new AsyncQueryRunner<>(baseRunner, executor,
mock);
asyncRunner.run(query, Collections.EMPTY_MAP);
EasyMock.verify(mock);
}
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
import io.druid.query.Druids.TimeseriesQueryBuilder;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.service.ServiceEmitter;
public class IntervalChunkingQueryRunnerTest
{
private IntervalChunkingQueryRunnerDecorator decorator;
private ExecutorService executors;
private QueryRunner baseRunner;
private QueryToolChest toolChest;
private final TimeseriesQueryBuilder queryBuilder;
public IntervalChunkingQueryRunnerTest() {
queryBuilder = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.aggregators(Lists.<AggregatorFactory>newArrayList(new CountAggregatorFactory("count")));
}
@Before
public void setup() {
executors = EasyMock.createMock(ExecutorService.class);
ServiceEmitter emitter = EasyMock.createNiceMock(ServiceEmitter.class);
decorator = new IntervalChunkingQueryRunnerDecorator(executors,
QueryRunnerTestHelper.NOOP_QUERYWATCHER, emitter);
baseRunner = EasyMock.createMock(QueryRunner.class);
toolChest = EasyMock.createNiceMock(QueryToolChest.class);
}
@Test
public void testDefaultNoChunking() {
Query query = queryBuilder.intervals("2014/2016").build();
EasyMock.expect(baseRunner.run(query, Collections.EMPTY_MAP)).andReturn(Sequences.empty());
EasyMock.replay(baseRunner);
QueryRunner runner = decorator.decorate(baseRunner, toolChest);
runner.run(query, Collections.EMPTY_MAP);
EasyMock.verify(baseRunner);
}
@Test
public void testChunking() {
Query query = queryBuilder.intervals("2015-01-01T00:00:00.000/2015-01-11T00:00:00.000").context(ImmutableMap.<String, Object>of("chunkPeriod", "P1D")).build();
executors.execute(EasyMock.anyObject(Runnable.class));
EasyMock.expectLastCall().times(10);
EasyMock.replay(executors);
EasyMock.replay(toolChest);
QueryRunner runner = decorator.decorate(baseRunner, toolChest);
runner.run(query, Collections.EMPTY_MAP);
EasyMock.verify(executors);
}
}

View File

@ -21,6 +21,8 @@ import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.guava.Sequence;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
@ -49,6 +51,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
*/
@ -306,4 +309,21 @@ public class QueryRunnerTestHelper
factory.getToolchest()
);
}
public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
{
return new IntervalChunkingQueryRunnerDecorator(null, null, null) {
@Override
public <T> QueryRunner<T> decorate(final QueryRunner<T> delegate,
QueryToolChest<T, ? extends Query<T>> toolChest) {
return new QueryRunner<T>() {
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
return delegate.run(query, responseContext);
}
};
}
};
}
}

View File

@ -82,7 +82,7 @@ public class RetryQueryRunnerTest
}
},
(QueryToolChest) new TimeseriesQueryQueryToolChest(
new QueryConfig()
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
),
new RetryQueryRunnerConfig()
{
@ -155,7 +155,7 @@ public class RetryQueryRunnerTest
}
},
(QueryToolChest) new TimeseriesQueryQueryToolChest(
new QueryConfig()
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
),
new RetryQueryRunnerConfig()
{
@ -222,7 +222,7 @@ public class RetryQueryRunnerTest
}
},
(QueryToolChest) new TimeseriesQueryQueryToolChest(
new QueryConfig()
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
),
new RetryQueryRunnerConfig()
{
@ -274,7 +274,7 @@ public class RetryQueryRunnerTest
}
},
(QueryToolChest) new TimeseriesQueryQueryToolChest(
new QueryConfig()
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
),
new RetryQueryRunnerConfig()
{

View File

@ -21,14 +21,17 @@ import com.google.common.base.Supplier;
import io.druid.collections.StupidPool;
import io.druid.query.search.SearchQueryQueryToolChest;
import io.druid.query.search.SearchQueryRunnerFactory;
import io.druid.query.search.SearchResultValue;
import io.druid.query.search.search.SearchQueryConfig;
import io.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.query.topn.TopNQueryConfig;
import io.druid.query.topn.TopNQueryQueryToolChest;
import io.druid.query.topn.TopNQueryRunnerFactory;
import io.druid.query.topn.TopNResultValue;
import io.druid.segment.Segment;
import java.nio.ByteBuffer;
@ -60,7 +63,8 @@ public class TestQueryRunners
{
QueryRunnerFactory factory = new TopNQueryRunnerFactory(
pool,
new TopNQueryQueryToolChest(topNConfig),
new TopNQueryQueryToolChest(topNConfig,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
return new FinalizeResultsQueryRunner<T>(
@ -74,7 +78,8 @@ public class TestQueryRunners
)
{
QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
@ -89,7 +94,10 @@ public class TestQueryRunners
Segment adapter
)
{
QueryRunnerFactory factory = new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()), QueryRunnerTestHelper.NOOP_QUERYWATCHER);
QueryRunnerFactory factory = new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(
new SearchQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER);
return new FinalizeResultsQueryRunner<T>(
factory.createRunner(adapter),
factory.getToolchest()

View File

@ -120,7 +120,8 @@ public class GroupByQueryRunnerTest
engine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
configSupplier,
new GroupByQueryQueryToolChest(configSupplier, mapper, engine, TestQueryRunners.pool),
new GroupByQueryQueryToolChest(configSupplier, mapper, engine, TestQueryRunners.pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
TestQueryRunners.pool
);
@ -141,7 +142,8 @@ public class GroupByQueryRunnerTest
singleThreadEngine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
singleThreadedConfigSupplier,
new GroupByQueryQueryToolChest(singleThreadedConfigSupplier, mapper, singleThreadEngine, pool),
new GroupByQueryQueryToolChest(singleThreadedConfigSupplier, mapper, singleThreadEngine, pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
pool
);
@ -1245,7 +1247,8 @@ public class GroupByQueryRunnerTest
configSupplier,
new DefaultObjectMapper(),
engine,
TestQueryRunners.pool
TestQueryRunners.pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
).mergeResults(runner);
Map<String, Object> context = Maps.newHashMap();
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query, context), "no-limit");
@ -1301,7 +1304,8 @@ public class GroupByQueryRunnerTest
configSupplier,
new DefaultObjectMapper(),
engine,
TestQueryRunners.pool
TestQueryRunners.pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
).mergeResults(runner);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query, context), "no-limit");
}
@ -1356,7 +1360,8 @@ public class GroupByQueryRunnerTest
configSupplier,
new DefaultObjectMapper(),
engine,
TestQueryRunners.pool
TestQueryRunners.pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
).mergeResults(runner);
TestHelper.assertExpectedObjects(expectedResults, mergeRunner.run(query, context), "no-limit");
}

View File

@ -75,7 +75,9 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
engine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
configSupplier,
new GroupByQueryQueryToolChest(configSupplier, new DefaultObjectMapper(), engine, TestQueryRunners.pool),
new GroupByQueryQueryToolChest(configSupplier, new DefaultObjectMapper(),
engine, TestQueryRunners.pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
TestQueryRunners.pool
);

View File

@ -56,7 +56,8 @@ public class SearchQueryRunnerTest
{
return QueryRunnerTestHelper.makeQueryRunners(
new SearchQueryRunnerFactory(
new SearchQueryQueryToolChest(new SearchQueryConfig()),
new SearchQueryQueryToolChest(new SearchQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
);

View File

@ -23,7 +23,6 @@ import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequences;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
@ -57,7 +56,8 @@ public class SelectQueryRunnerTest
{
return QueryRunnerTestHelper.makeQueryRunners(
new SelectQueryRunnerFactory(
new SelectQueryQueryToolChest(new QueryConfig(), new DefaultObjectMapper()),
new SelectQueryQueryToolChest(new DefaultObjectMapper(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
new SelectQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)

View File

@ -24,7 +24,6 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryToolChest;
@ -64,7 +63,7 @@ public class TimeSeriesUnionQueryRunnerTest
{
return QueryRunnerTestHelper.makeUnionQueryRunners(
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
),
@ -138,7 +137,8 @@ public class TimeSeriesUnionQueryRunnerTest
)
)
.build();
QueryToolChest toolChest = new TimeseriesQueryQueryToolChest(new QueryConfig());
QueryToolChest toolChest = new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator());
QueryRunner mergingrunner = toolChest.mergeResults(
new UnionQueryRunner<Result<TimeseriesResultValue>>(
(Iterable) Arrays.asList(

View File

@ -26,7 +26,6 @@ import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
@ -90,7 +89,8 @@ public class TimeseriesQueryRunnerBonusTest
private static List<Result<TimeseriesResultValue>> runTimeseriesCount(IncrementalIndex index)
{
final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);

View File

@ -25,7 +25,6 @@ import com.metamx.common.guava.Sequences;
import io.druid.granularity.PeriodGranularity;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
@ -72,7 +71,8 @@ public class TimeseriesQueryRunnerTest
{
return QueryRunnerTestHelper.makeQueryRunners(
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)

View File

@ -97,7 +97,7 @@ public class TopNQueryRunnerBenchmark extends AbstractBenchmark
}
}
),
new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
testCaseMap.put(

View File

@ -78,7 +78,8 @@ public class TopNQueryRunnerTest
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
TestQueryRunners.getPool(),
new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryQueryToolChest(new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
@ -96,7 +97,8 @@ public class TopNQueryRunnerTest
}
}
),
new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryQueryToolChest(new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
@ -398,7 +400,10 @@ public class TopNQueryRunnerTest
)
)
);
Sequence<Result<TopNResultValue>> results = new TopNQueryQueryToolChest(new TopNQueryConfig()).postMergeQueryDecoration(
Sequence<Result<TopNResultValue>> results = new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
).postMergeQueryDecoration(
runner
).run(
query,

View File

@ -64,7 +64,7 @@ public class TopNUnionQueryTest
QueryRunnerTestHelper.makeUnionQueryRunners(
new TopNQueryRunnerFactory(
TestQueryRunners.getPool(),
new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
),
QueryRunnerTestHelper.unionDataSource
@ -83,7 +83,7 @@ public class TopNUnionQueryTest
}
}
),
new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
),
QueryRunnerTestHelper.unionDataSource

View File

@ -31,7 +31,6 @@ import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
@ -283,7 +282,7 @@ public class IncrementalIndexTest
final List<ListenableFuture<?>> queryFutures = new LinkedList<>();
final Segment incrementalIndexSegment = new IncrementalIndexSegment(index, null);
final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);

View File

@ -28,7 +28,6 @@ import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
@ -464,7 +463,8 @@ public class SpatialFilterBonusTest
);
try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
@ -551,7 +551,8 @@ public class SpatialFilterBonusTest
);
try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);

View File

@ -29,7 +29,6 @@ import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
@ -540,7 +539,8 @@ public class SpatialFilterTest
);
try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
@ -592,7 +592,8 @@ public class SpatialFilterTest
);
try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
@ -679,7 +680,8 @@ public class SpatialFilterTest
);
try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);

View File

@ -34,7 +34,6 @@ import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
@ -280,7 +279,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
final List<ListenableFuture<?>> queryFutures = new LinkedList<>();
final Segment incrementalIndexSegment = new IncrementalIndexSegment(incrementalIndex, null);
final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);

View File

@ -22,7 +22,6 @@ import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryToolChest;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
@ -72,7 +71,6 @@ public class QueryToolChestModule implements Module
binder.bind(entry.getValue()).in(LazySingleton.class);
}
JsonConfigProvider.bind(binder, "druid.query", QueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.groupBy", GroupByQueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.search", SearchQueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.topN", TopNQueryConfig.class);

View File

@ -17,15 +17,8 @@
package io.druid.server;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.CachingClusteredClient;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.PostProcessingOperator;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@ -35,10 +28,15 @@ import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.RetryQueryRunner;
import io.druid.query.RetryQueryRunnerConfig;
import io.druid.query.SegmentDescriptor;
import java.util.Map;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Map;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
/**
*/
@ -84,26 +82,14 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
final FinalizeResultsQueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<T>(
toolChest.postMergeQueryDecoration(
toolChest.mergeResults(
new MetricsEmittingQueryRunner<T>(
emitter,
new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Override
public ServiceMetricEvent.Builder apply(@Nullable Query<T> input)
{
return toolChest.makeMetricBuilder(query);
}
},
toolChest.preMergeQueryDecoration(
new RetryQueryRunner<T>(
baseClient,
toolChest,
retryConfig,
objectMapper
objectMapper)
)
)
).withWaitMeasuredFromNow()
)
),
toolChest
);

View File

@ -47,6 +47,7 @@ import com.metamx.common.guava.MergeIterable;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.nary.TrinaryFn;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.MapCache;
@ -66,8 +67,8 @@ import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.query.SegmentDescriptor;
@ -118,6 +119,7 @@ import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.SingleElementPartitionChunk;
import io.druid.timeline.partition.StringPartitionChunk;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
@ -367,7 +369,8 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig()));
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()));
testQueryCaching(
runner,
@ -402,7 +405,8 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig()));
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()));
testQueryCaching(
runner,
@ -470,7 +474,8 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig()));
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()));
testQueryCaching(
runner,
@ -529,7 +534,9 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig()));
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator())
);
testQueryCaching(
runner,
@ -571,7 +578,8 @@ public class CachingClusteredClientTest
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig()));
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()));
testQueryCaching(
runner,
1,
@ -642,7 +650,8 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig()));
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()));
testQueryCaching(
runner,
@ -714,7 +723,8 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig()));
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()));
testQueryCaching(
runner,
@ -791,7 +801,8 @@ public class CachingClusteredClientTest
),
client.mergeCachedAndUncachedSequences(
sequences,
new TopNQueryQueryToolChest(new TopNQueryConfig())
new TopNQueryQueryToolChest(new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator())
)
);
}
@ -813,7 +824,8 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig()));
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()));
testQueryCaching(
runner,
builder.build(),
@ -882,7 +894,8 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig()));
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()));
testQueryCaching(
runner,
builder.build(),
@ -977,7 +990,10 @@ public class CachingClusteredClientTest
)
);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new SearchQueryQueryToolChest(new SearchQueryConfig()));
QueryRunner runner = new FinalizeResultsQueryRunner(client, new SearchQueryQueryToolChest(
new SearchQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator())
);
HashMap<String,Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(
makeSearchResults(
@ -1046,8 +1062,8 @@ public class CachingClusteredClientTest
QueryRunner runner = new FinalizeResultsQueryRunner(
client,
new SelectQueryQueryToolChest(
new QueryConfig(),
jsonMapper
jsonMapper,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
@ -1150,7 +1166,8 @@ public class CachingClusteredClientTest
}
)
),
TestQueryRunners.pool
TestQueryRunners.pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
@ -1992,13 +2009,17 @@ public class CachingClusteredClientTest
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
.put(
TimeseriesQuery.class,
new TimeseriesQueryQueryToolChest(new QueryConfig())
new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator())
)
.put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig()))
.put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig()))
.put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()))
.put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()))
.put(
SelectQuery.class,
new SelectQueryQueryToolChest(new QueryConfig(), jsonMapper)
new SelectQueryQueryToolChest(jsonMapper,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator())
)
.put(
GroupByQuery.class,
@ -2018,7 +2039,8 @@ public class CachingClusteredClientTest
}
)
),
TestQueryRunners.pool
TestQueryRunners.pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
)
.put(TimeBoundaryQuery.class, new TimeBoundaryQueryQueryToolChest())

View File

@ -37,6 +37,7 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.SegmentDescriptor;
import io.druid.query.aggregation.AggregatorFactory;
@ -112,7 +113,8 @@ public class CachingQueryRunnerTest
String segmentIdentifier = "segment";
SegmentDescriptor segmentDescriptor = new SegmentDescriptor(new Interval("2011/2012"), "version", 0);
TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig());
TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator());
DefaultObjectMapper objectMapper = new DefaultObjectMapper();
CachingQueryRunner runner = new CachingQueryRunner(
segmentIdentifier,
@ -188,7 +190,8 @@ public class CachingQueryRunnerTest
Iterable<Result<TopNResultValue>> expectedResults = makeTopNResults(true, objects);
String segmentIdentifier = "segment";
SegmentDescriptor segmentDescriptor = new SegmentDescriptor(new Interval("2011/2012"), "version", 0);
TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig());
TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator());
final TopNQueryBuilder builder = new TopNQueryBuilder()
.dataSource("ds")