Refactor QueryRunner to accept QueryPlus: Query + QueryMetrics (part of #3798) (#4184)

* Add QueryPlus. Add QueryRunner.run(QueryPlus, Map) method with default implementation, to replace QueryRunner.run(Query, Map).

* Fix GroupByMergingQueryRunnerV2

* Fix QueryResourceTest

* Expand the comment to Query.run(walker, context)

* Remove legacy version of BySegmentSkippingQueryRunner.doRun()

* Add LegacyApiQueryRunnerTest and be more specific about legacy API removal plans in Druid 0.11 in Javadocs
This commit is contained in:
Roman Leventov 2017-05-10 14:25:00 -05:00 committed by Charles Allen
parent 11538e2ece
commit e09e892477
82 changed files with 668 additions and 380 deletions

View File

@ -25,6 +25,7 @@ import io.druid.query.BySegmentQueryRunner;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
@ -58,9 +59,9 @@ public class QueryBenchmarkUtil
QueryToolChest<T, ? extends Query<T>> toolChest) {
return new QueryRunner<T>() {
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
return delegate.run(query, responseContext);
return delegate.run(queryPlus, responseContext);
}
};
}

View File

@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import java.util.Map;
@ -47,12 +47,12 @@ public class SerializingQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(
final Query<T> query,
final QueryPlus<T> queryPlus,
final Map<String, Object> responseContext
)
{
return Sequences.map(
baseRunner.run(query, responseContext),
baseRunner.run(queryPlus, responseContext),
new Function<T, T>()
{
@Override

View File

@ -22,6 +22,7 @@ import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Yielder;
import io.druid.java.util.common.guava.YieldingAccumulator;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import java.io.IOException;
@ -36,13 +37,15 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultVa
private long count = 0;
public ScanQueryLimitRowIterator(
QueryRunner<ScanResultValue> baseRunner, ScanQuery query,
QueryRunner<ScanResultValue> baseRunner,
QueryPlus<ScanResultValue> queryPlus,
Map<String, Object> responseContext
)
{
ScanQuery query = (ScanQuery) queryPlus.getQuery();
resultFormat = query.getResultFormat();
limit = query.getLimit();
Sequence<ScanResultValue> baseSequence = baseRunner.run(query, responseContext);
Sequence<ScanResultValue> baseSequence = baseRunner.run(queryPlus, responseContext);
yielder = baseSequence.toYielder(
null,
new YieldingAccumulator<ScanResultValue, ScanResultValue>()

View File

@ -25,9 +25,10 @@ import com.google.inject.Inject;
import io.druid.java.util.common.guava.BaseSequence;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.aggregation.MetricManipulationFn;
@ -55,12 +56,12 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
{
@Override
public Sequence<ScanResultValue> run(
final Query<ScanResultValue> query, final Map<String, Object> responseContext
final QueryPlus<ScanResultValue> queryPlus, final Map<String, Object> responseContext
)
{
ScanQuery scanQuery = (ScanQuery) query;
ScanQuery scanQuery = (ScanQuery) queryPlus.getQuery();
if (scanQuery.getLimit() == Long.MAX_VALUE) {
return runner.run(query, responseContext);
return runner.run(queryPlus, responseContext);
}
return new BaseSequence<>(
new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryLimitRowIterator>()
@ -68,7 +69,7 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
@Override
public ScanQueryLimitRowIterator make()
{
return new ScanQueryLimitRowIterator(runner, (ScanQuery) query, responseContext);
return new ScanQueryLimitRowIterator(runner, queryPlus, responseContext);
}
@Override
@ -109,14 +110,15 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
{
@Override
public Sequence<ScanResultValue> run(
Query<ScanResultValue> query, Map<String, Object> responseContext
QueryPlus<ScanResultValue> queryPlus, Map<String, Object> responseContext
)
{
ScanQuery scanQuery = (ScanQuery) query;
ScanQuery scanQuery = (ScanQuery) queryPlus.getQuery();
if (scanQuery.getDimensionsFilter() != null) {
scanQuery = scanQuery.withDimFilter(scanQuery.getDimensionsFilter().optimize());
queryPlus = queryPlus.withQuery(scanQuery);
}
return runner.run(scanQuery, responseContext);
return runner.run(queryPlus, responseContext);
}
};
}

View File

@ -26,6 +26,7 @@ import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
@ -70,12 +71,12 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
{
@Override
public Sequence<ScanResultValue> run(
final Query<ScanResultValue> query, final Map<String, Object> responseContext
final QueryPlus<ScanResultValue> queryPlus, final Map<String, Object> responseContext
)
{
// Note: this variable is effective only when queryContext has a timeout.
// See the comment of CTX_TIMEOUT_AT.
final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(query);
final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
responseContext.put(CTX_TIMEOUT_AT, timeoutAt);
return Sequences.concat(
Sequences.map(
@ -85,7 +86,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
@Override
public Sequence<ScanResultValue> apply(final QueryRunner<ScanResultValue> input)
{
return input.run(query, responseContext);
return input.run(queryPlus, responseContext);
}
}
)
@ -113,9 +114,10 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
@Override
public Sequence<ScanResultValue> run(
Query<ScanResultValue> query, Map<String, Object> responseContext
QueryPlus<ScanResultValue> queryPlus, Map<String, Object> responseContext
)
{
Query<ScanResultValue> query = queryPlus.getQuery();
if (!(query instanceof ScanQuery)) {
throw new ISE("Got a [%s] which isn't a %s", query.getClass(), ScanQuery.class);
}

View File

@ -28,7 +28,7 @@ import io.druid.java.util.common.guava.MergeSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
@ -214,15 +214,15 @@ public class MultiSegmentScanQueryTest
new QueryRunner<ScanResultValue>() {
@Override
public Sequence<ScanResultValue> run(
Query<ScanResultValue> query, Map<String, Object> responseContext
QueryPlus<ScanResultValue> queryPlus, Map<String, Object> responseContext
)
{
// simulate results back from 2 historicals
List<Sequence<ScanResultValue>> sequences = Lists.newArrayListWithExpectedSize(2);
sequences.add(factory.createRunner(segment0).run(query, new HashMap<String, Object>()));
sequences.add(factory.createRunner(segment1).run(query, new HashMap<String, Object>()));
sequences.add(factory.createRunner(segment0).run(queryPlus, new HashMap<String, Object>()));
sequences.add(factory.createRunner(segment1).run(queryPlus, new HashMap<String, Object>()));
return new MergeSequence<>(
query.getResultOrdering(),
queryPlus.getQuery().getResultOrdering(),
Sequences.simple(sequences)
);
}

View File

@ -57,6 +57,7 @@ import io.druid.java.util.common.parsers.ParseException;
import io.druid.query.DruidMetrics;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
@ -626,9 +627,9 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
return query.run(appenderator, responseContext);
return queryPlus.run(appenderator, responseContext);
}
};
}

View File

@ -47,8 +47,9 @@ public class AsyncQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
final Query<T> query = queryPlus.getQuery();
final int priority = QueryContexts.getPriority(query);
final ListenableFuture<Sequence<T>> future = executor.submit(new AbstractPrioritizedCallable<Sequence<T>>(priority)
{
@ -57,7 +58,7 @@ public class AsyncQueryRunner<T> implements QueryRunner<T>
{
//Note: this is assumed that baseRunner does most of the work eagerly on call to the
//run() method and resulting sequence accumulate/yield is fast.
return baseRunner.run(query, responseContext);
return baseRunner.run(queryPlus, responseContext);
}
});
queryWatcher.registerQuery(query, future);

View File

@ -92,6 +92,7 @@ public abstract class BaseQuery<T extends Comparable<T>> implements Query<T>
return run(querySegmentSpec.lookup(this, walker), context);
}
@Override
public Sequence<T> run(QueryRunner<T> runner, Map<String, Object> context)
{
return runner.run(this, context);

View File

@ -49,10 +49,10 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
@Override
@SuppressWarnings("unchecked")
public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
if (QueryContexts.isBySegment(query)) {
final Sequence<T> baseSequence = base.run(query, responseContext);
if (QueryContexts.isBySegment(queryPlus.getQuery())) {
final Sequence<T> baseSequence = base.run(queryPlus, responseContext);
final List<T> results = Sequences.toList(baseSequence, Lists.<T>newArrayList());
return Sequences.simple(
Arrays.asList(
@ -61,12 +61,12 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
new BySegmentResultValueClass<T>(
results,
segmentIdentifier,
query.getIntervals().get(0)
queryPlus.getQuery().getIntervals().get(0)
)
)
)
);
}
return base.run(query, responseContext);
return base.run(queryPlus, responseContext);
}
}

View File

@ -38,14 +38,14 @@ public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
if (QueryContexts.isBySegment(query)) {
return baseRunner.run(query, responseContext);
if (QueryContexts.isBySegment(queryPlus.getQuery())) {
return baseRunner.run(queryPlus, responseContext);
}
return doRun(baseRunner, query, responseContext);
return doRun(baseRunner, queryPlus, responseContext);
}
protected abstract Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query, Map<String, Object> context);
protected abstract Sequence<T> doRun(QueryRunner<T> baseRunner, QueryPlus<T> queryPlus, Map<String, Object> context);
}

View File

@ -58,9 +58,11 @@ public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
final Sequence<T> baseSequence = delegate.run(query, responseContext);
final QueryPlus<T> queryWithMetrics =
queryPlus.withQueryMetrics((QueryToolChest<T, ? extends Query<T>>) queryToolChest);
final Sequence<T> baseSequence = delegate.run(queryWithMetrics, responseContext);
return Sequences.wrap(
baseSequence,
new SequenceWrapper()
@ -82,13 +84,14 @@ public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
if (report) {
final long cpuTimeNs = cpuTimeAccumulator.get();
if (cpuTimeNs > 0) {
queryToolChest.makeMetrics(query).reportCpuTime(cpuTimeNs).emit(emitter);
queryWithMetrics.getQueryMetrics().reportCpuTime(cpuTimeNs).emit(emitter);
}
}
}
}
);
}
public static <T> QueryRunner<T> safeBuild(
QueryRunner<T> delegate,
QueryToolChest<?, ? super Query<T>> queryToolChest,

View File

@ -89,8 +89,9 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
Query<T> query = queryPlus.getQuery();
final int priority = QueryContexts.getPriority(query);
final Ordering ordering = query.getResultOrdering();
@ -121,7 +122,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
public Iterable<T> call() throws Exception
{
try {
Sequence<T> result = input.run(query, responseContext);
Sequence<T> result = input.run(queryPlus, responseContext);
if (result == null) {
throw new ISE("Got a null result! Segments are missing!");
}

View File

@ -38,7 +38,7 @@ public class ConcatQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
return Sequences.concat(
Sequences.map(
@ -48,7 +48,7 @@ public class ConcatQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> apply(final QueryRunner<T> input)
{
return input.run(query, responseContext);
return input.run(queryPlus, responseContext);
}
}
)

View File

@ -47,8 +47,9 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
final Query<T> query = queryPlus.getQuery();
final boolean isBySegment = QueryContexts.isBySegment(query);
final boolean shouldFinalize = QueryContexts.isFinalize(query, true);
@ -100,7 +101,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
return Sequences.map(
baseRunner.run(queryToRun, responseContext),
baseRunner.run(queryPlus.withQuery(queryToRun), responseContext),
finalizerFn
);

View File

@ -49,10 +49,10 @@ public class FluentQueryRunnerBuilder<T>
@Override
public Sequence<T> run(
Query<T> query, Map<String, Object> responseContext
QueryPlus<T> queryPlus, Map<String, Object> responseContext
)
{
return baseRunner.run(query, responseContext);
return baseRunner.run(queryPlus, responseContext);
}
public FluentQueryRunner from(QueryRunner<T> runner) {

View File

@ -78,9 +78,9 @@ public class GroupByMergedQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> queryParam, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
final GroupByQuery query = (GroupByQuery) queryParam;
final GroupByQuery query = (GroupByQuery) queryPlus.getQuery();
final GroupByQueryConfig querySpecificConfig = configSupplier.get().withOverrides(query);
final boolean isSingleThreaded = querySpecificConfig.isSingleThreaded();
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
@ -114,10 +114,10 @@ public class GroupByMergedQueryRunner<T> implements QueryRunner<T>
{
try {
if (bySegment) {
input.run(queryParam, responseContext)
input.run(queryPlus, responseContext)
.accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs);
} else {
input.run(queryParam, responseContext)
input.run(queryPlus, responseContext)
.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
}

View File

@ -63,18 +63,18 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
final Period chunkPeriod = getChunkPeriod(query);
final Period chunkPeriod = getChunkPeriod(queryPlus.getQuery());
// Check for non-empty chunkPeriod, avoiding toStandardDuration since that cannot handle periods like P1M.
if (EPOCH.plus(chunkPeriod).getMillis() == EPOCH.getMillis()) {
return baseRunner.run(query, responseContext);
return baseRunner.run(queryPlus, responseContext);
}
List<Interval> chunkIntervals = Lists.newArrayList(
FunctionalIterable
.create(query.getIntervals())
.create(queryPlus.getQuery().getIntervals())
.transformCat(
new Function<Interval, Iterable<Interval>>()
{
@ -88,7 +88,7 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
);
if (chunkIntervals.size() <= 1) {
return baseRunner.run(query, responseContext);
return baseRunner.run(queryPlus, responseContext);
}
return Sequences.concat(
@ -113,7 +113,7 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
),
executor, queryWatcher
).run(
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))),
queryPlus.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))),
responseContext
);
}

View File

@ -81,9 +81,10 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
final QueryMetrics<? super Query<T>> queryMetrics = queryToolChest.makeMetrics(query);
QueryPlus<T> queryWithMetrics = queryPlus.withQueryMetrics((QueryToolChest<T, ? extends Query<T>>) queryToolChest);
final QueryMetrics<? super Query<T>> queryMetrics = (QueryMetrics<? super Query<T>>) queryWithMetrics.getQueryMetrics();
applyCustomDimensions.accept(queryMetrics);
@ -91,7 +92,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
// Use LazySequence because want to account execution time of queryRunner.run() (it prepares the underlying
// Sequence) as part of the reported query time, i. e. we want to execute queryRunner.run() after
// `startTime = System.nanoTime();` (see below).
new LazySequence<>(() -> queryRunner.run(query, responseContext)),
new LazySequence<>(() -> queryRunner.run(queryWithMetrics, responseContext)),
new SequenceWrapper()
{
private long startTimeNs;

View File

@ -29,7 +29,7 @@ import java.util.Map;
public class NoopQueryRunner<T> implements QueryRunner<T>
{
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
return Sequences.empty();
}

View File

@ -70,8 +70,19 @@ public interface Query<T>
String getType();
/**
* @deprecated use {@link QueryPlus#run(QuerySegmentWalker, Map)} instead. This method is going to be removed in Druid
* 0.11. In the future, a method like getRunner(QuerySegmentWalker, Map) could be added instead of this method, so
* that {@link QueryPlus#run(QuerySegmentWalker, Map)} could be implemented as {@code
* this.query.getRunner(walker, context).run(this, context))}.
*/
@Deprecated
Sequence<T> run(QuerySegmentWalker walker, Map<String, Object> context);
/**
* @deprecated use {@link QueryRunner#run(QueryPlus, Map)} instead. This method is going to be removed in Druid 0.11.
*/
@Deprecated
Sequence<T> run(QueryRunner<T> runner, Map<String, Object> context);
List<Interval> getIntervals();

View File

@ -0,0 +1,104 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
import com.google.common.base.Preconditions;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.spec.QuerySegmentSpec;
import javax.annotation.Nullable;
import java.util.Map;
/**
* An immutable composite object of {@link Query} + extra stuff needed in {@link QueryRunner}s. This "extra stuff"
* is only {@link QueryMetrics} yet.
*/
public final class QueryPlus<T>
{
/**
* Returns the minimum bare QueryPlus object with the given query. {@link #getQueryMetrics()} of the QueryPlus object,
* returned from this factory method, returns {@code null}.
*/
public static <T> QueryPlus<T> wrap(Query<T> query)
{
Preconditions.checkNotNull(query);
return new QueryPlus<>(query, null);
}
private final Query<T> query;
private final QueryMetrics<?> queryMetrics;
private QueryPlus(Query<T> query, QueryMetrics<?> queryMetrics)
{
this.query = query;
this.queryMetrics = queryMetrics;
}
public Query<T> getQuery()
{
return query;
}
@Nullable
public QueryMetrics<?> getQueryMetrics()
{
return queryMetrics;
}
/**
* Returns the same QueryPlus object, if it already has {@link QueryMetrics} ({@link #getQueryMetrics()} returns not
* null), or returns a new QueryPlus object with {@link Query} from this QueryPlus and QueryMetrics created using the
* given {@link QueryToolChest}, via {@link QueryToolChest#makeMetrics(Query)} method.
*/
public QueryPlus<T> withQueryMetrics(QueryToolChest<T, ? extends Query<T>> queryToolChest)
{
if (queryMetrics != null) {
return this;
} else {
return new QueryPlus<>(query, ((QueryToolChest) queryToolChest).makeMetrics(query));
}
}
/**
* Equivalent of withQuery(getQuery().withQuerySegmentSpec(spec)).
*/
public QueryPlus<T> withQuerySegmentSpec(QuerySegmentSpec spec)
{
return new QueryPlus<>(query.withQuerySegmentSpec(spec), queryMetrics);
}
/**
* Returns a QueryPlus object with {@link QueryMetrics} from this QueryPlus object, and the provided {@link Query}.
*/
public QueryPlus<T> withQuery(Query<T> replacementQuery)
{
return new QueryPlus<>(replacementQuery, queryMetrics);
}
public Sequence<T> run(QuerySegmentWalker walker, Map<String, Object> context)
{
if (query instanceof BaseQuery) {
return ((BaseQuery) query).getQuerySegmentSpec().lookup(query, walker).run(this, context);
} else {
// fallback
return query.run(walker, context);
}
}
}

View File

@ -24,14 +24,26 @@ import io.druid.java.util.common.guava.Sequence;
import java.util.Map;
/**
* This interface has two similar run() methods. {@link #run(Query, Map)} is legacy and {@link #run(QueryPlus, Map)}
* is the new one. Their default implementations delegate to each other. Every implementation of QueryRunner should
* override only one of those methods. New implementations should override the new method: {@link #run(QueryPlus, Map)}.
*/
public interface QueryRunner<T>
{
/**
* Runs the given query and returns results in a time-ordered sequence
* @param query
* @param responseContext
* @return
* @deprecated use and override {@link #run(QueryPlus, Map)} instead. This method is going to be removed in Druid 0.11
*/
Sequence<T> run(Query<T> query, Map<String, Object> responseContext);
@Deprecated
default Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
return run(QueryPlus.wrap(query), responseContext);
}
/**
* Runs the given query and returns results in a time-ordered sequence.
*/
default Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
return run(queryPlus.getQuery(), responseContext);
}
}

View File

@ -77,9 +77,9 @@ public class QueryRunnerHelper
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
return Sequences.withBaggage(runner.run(query, responseContext), closeable);
return Sequences.withBaggage(runner.run(queryPlus, responseContext), closeable);
}
};
}

View File

@ -47,12 +47,12 @@ public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
final Closeable closeable = adapter.increment();
if (closeable != null) {
try {
final Sequence<T> baseSequence = factory.createRunner(adapter).run(query, responseContext);
final Sequence<T> baseSequence = factory.createRunner(adapter).run(queryPlus, responseContext);
return Sequences.withBaggage(baseSequence, closeable);
}
@ -62,7 +62,7 @@ public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
}
} else {
// Segment was closed before we had a chance to increment the reference count
return new ReportTimelineMissingSegmentQueryRunner<T>(descriptor).run(query, responseContext);
return new ReportTimelineMissingSegmentQueryRunner<T>(descriptor).run(queryPlus, responseContext);
}
}
}

View File

@ -39,7 +39,7 @@ public class ReportTimelineMissingSegmentQueryRunner<T> implements QueryRunner<T
@Override
public Sequence<T> run(
Query<T> query, Map<String, Object> responseContext
QueryPlus<T> queryPlus, Map<String, Object> responseContext
)
{
List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) responseContext.get(Result.MISSING_SEGMENTS_KEY);

View File

@ -38,9 +38,10 @@ public abstract class ResultMergeQueryRunner<T> extends BySegmentSkippingQueryRu
}
@Override
public Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query, Map<String, Object> context)
public Sequence<T> doRun(QueryRunner<T> baseRunner, QueryPlus<T> queryPlus, Map<String, Object> context)
{
return CombiningSequence.create(baseRunner.run(query, context), makeOrdering(query), createMergeFn(query));
Query<T> query = queryPlus.getQuery();
return CombiningSequence.create(baseRunner.run(queryPlus, context), makeOrdering(query), createMergeFn(query));
}
protected abstract Ordering<T> makeOrdering(Query<T> query);

View File

@ -61,10 +61,10 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> context)
{
final List<Sequence<T>> listOfSequences = Lists.newArrayList();
listOfSequences.add(baseRunner.run(query, context));
listOfSequences.add(baseRunner.run(queryPlus, context));
return new YieldingSequenceBase<T>()
{
@ -80,12 +80,12 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
log.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), i);
context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList());
final Query<T> retryQuery = query.withQuerySegmentSpec(
final QueryPlus<T> retryQueryPlus = queryPlus.withQuerySegmentSpec(
new MultipleSpecificSegmentSpec(
missingSegments
)
);
Sequence<T> retrySequence = baseRunner.run(retryQuery, context);
Sequence<T> retrySequence = baseRunner.run(retryQueryPlus, context);
listOfSequences.add(retrySequence);
missingSegments = getMissingSegments(context);
if (missingSegments.isEmpty()) {
@ -99,7 +99,7 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
}
return new MergeSequence<>(
query.getResultOrdering(),
queryPlus.getQuery().getResultOrdering(),
Sequences.simple(listOfSequences)).toYielder(
initValue, accumulator
);

View File

@ -36,13 +36,13 @@ public class SubqueryQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
DataSource dataSource = query.getDataSource();
DataSource dataSource = queryPlus.getQuery().getDataSource();
if (dataSource instanceof QueryDataSource) {
return run((Query<T>) ((QueryDataSource) dataSource).getQuery(), responseContext);
return run(queryPlus.withQuery((Query<T>) ((QueryDataSource) dataSource).getQuery()), responseContext);
} else {
return baseRunner.run(query, responseContext);
return baseRunner.run(queryPlus, responseContext);
}
}
}

View File

@ -81,18 +81,18 @@ public class TimewarpOperator<T> implements PostProcessingOperator<T>
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
final long offset = computeOffset(now);
final Interval interval = query.getIntervals().get(0);
final Interval interval = queryPlus.getQuery().getIntervals().get(0);
final Interval modifiedInterval = new Interval(
Math.min(interval.getStartMillis() + offset, now + offset),
Math.min(interval.getEndMillis() + offset, now + offset)
);
return Sequences.map(
baseRunner.run(
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(modifiedInterval))),
queryPlus.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(modifiedInterval))),
responseContext
),
new Function<T, T>()
@ -113,7 +113,7 @@ public class TimewarpOperator<T> implements PostProcessingOperator<T>
final DateTime maxTime = boundary.getMaxTime();
return (T) ((TimeBoundaryQuery) query).buildResult(
return (T) ((TimeBoundaryQuery) queryPlus.getQuery()).buildResult(
new DateTime(Math.min(res.getTimestamp().getMillis() - offset, now)),
minTime != null ? minTime.minus(offset) : null,
maxTime != null ? new DateTime(Math.min(maxTime.getMillis() - offset, now)) : null

View File

@ -39,8 +39,9 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
Query<T> query = queryPlus.getQuery();
DataSource dataSource = query.getDataSource();
if (dataSource instanceof UnionDataSource) {
@ -55,7 +56,7 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
public Sequence<T> apply(DataSource singleSource)
{
return baseRunner.run(
query.withDataSource(singleSource),
queryPlus.withQuery(query.withDataSource(singleSource)),
responseContext
);
}
@ -64,7 +65,7 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
)
);
} else {
return baseRunner.run(query, responseContext);
return baseRunner.run(queryPlus, responseContext);
}
}

View File

@ -25,6 +25,7 @@ import io.druid.java.util.common.guava.BaseSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.ChainedExecutionQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
@ -88,15 +89,16 @@ public class DataSourceMetadataQueryRunnerFactory
@Override
public Sequence<Result<DataSourceMetadataResultValue>> run(
Query<Result<DataSourceMetadataResultValue>> input,
QueryPlus<Result<DataSourceMetadataResultValue>> input,
Map<String, Object> responseContext
)
{
if (!(input instanceof DataSourceMetadataQuery)) {
throw new ISE("Got a [%s] which isn't a %s", input.getClass().getCanonicalName(), DataSourceMetadataQuery.class);
Query<Result<DataSourceMetadataResultValue>> query = input.getQuery();
if (!(query instanceof DataSourceMetadataQuery)) {
throw new ISE("Got a [%s] which isn't a %s", query.getClass().getCanonicalName(), DataSourceMetadataQuery.class);
}
final DataSourceMetadataQuery legacyQuery = (DataSourceMetadataQuery) input;
final DataSourceMetadataQuery legacyQuery = (DataSourceMetadataQuery) query;
return new BaseSequence<>(
new BaseSequence.IteratorMaker<Result<DataSourceMetadataResultValue>, Iterator<Result<DataSourceMetadataResultValue>>>()

View File

@ -33,6 +33,7 @@ import io.druid.query.CacheStrategy;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
@ -93,15 +94,15 @@ public class DataSourceQueryQueryToolChest
@Override
protected Sequence<Result<DataSourceMetadataResultValue>> doRun(
QueryRunner<Result<DataSourceMetadataResultValue>> baseRunner,
Query<Result<DataSourceMetadataResultValue>> input,
QueryPlus<Result<DataSourceMetadataResultValue>> input,
Map<String, Object> context
)
{
DataSourceMetadataQuery query = (DataSourceMetadataQuery) input;
DataSourceMetadataQuery query = (DataSourceMetadataQuery) input.getQuery();
return Sequences.simple(
query.mergeResults(
Sequences.toList(
baseRunner.run(query, context),
baseRunner.run(input, context),
Lists.<Result<DataSourceMetadataResultValue>>newArrayList()
)
)

View File

@ -41,9 +41,9 @@ import io.druid.java.util.common.guava.Sequences;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSource;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.SubqueryQueryRunner;
@ -113,13 +113,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
return new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(Query<Row> query, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
{
if (QueryContexts.isBySegment(query)) {
return runner.run(query, responseContext);
if (QueryContexts.isBySegment(queryPlus.getQuery())) {
return runner.run(queryPlus, responseContext);
}
final GroupByQuery groupByQuery = (GroupByQuery) query;
final GroupByQuery groupByQuery = (GroupByQuery) queryPlus.getQuery();
if (strategySelector.strategize(groupByQuery).doMergeResults(groupByQuery)) {
return initAndMergeGroupByResults(
groupByQuery,
@ -127,7 +127,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
responseContext
);
}
return runner.run(query, responseContext);
return runner.run(queryPlus, responseContext);
}
};
}
@ -327,9 +327,9 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(Query<Row> query, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
{
GroupByQuery groupByQuery = (GroupByQuery) query;
GroupByQuery groupByQuery = (GroupByQuery) queryPlus.getQuery();
if (groupByQuery.getDimFilter() != null) {
groupByQuery = groupByQuery.withDimFilter(groupByQuery.getDimFilter().optimize());
}
@ -365,7 +365,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
GroupByQueryQueryToolChest.this
)
.run(
delegateGroupByQuery.withDimensionSpecs(dimensionSpecs),
queryPlus.withQuery(delegateGroupByQuery.withDimensionSpecs(dimensionSpecs)),
responseContext
);
}

View File

@ -26,6 +26,7 @@ import io.druid.data.input.Row;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
@ -68,12 +69,13 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
return new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(Query<Row> query, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
{
return strategySelector.strategize((GroupByQuery) query).mergeRunners(queryExecutor, queryRunners).run(
query,
responseContext
QueryRunner<Row> rowQueryRunner = strategySelector.strategize((GroupByQuery) queryPlus.getQuery()).mergeRunners(
queryExecutor,
queryRunners
);
return rowQueryRunner.run(queryPlus, responseContext);
}
};
}
@ -96,13 +98,14 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
}
@Override
public Sequence<Row> run(Query<Row> input, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
{
if (!(input instanceof GroupByQuery)) {
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), GroupByQuery.class);
Query<Row> query = queryPlus.getQuery();
if (!(query instanceof GroupByQuery)) {
throw new ISE("Got a [%s] which isn't a %s", query.getClass(), GroupByQuery.class);
}
return strategySelector.strategize((GroupByQuery) input).process((GroupByQuery) input, adapter);
return strategySelector.strategize((GroupByQuery) query).process((GroupByQuery) query, adapter);
}
}
}

View File

@ -45,9 +45,9 @@ import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.AbstractPrioritizedCallable;
import io.druid.query.ChainedExecutionQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryWatcher;
import io.druid.query.ResourceLimitExceededException;
@ -105,9 +105,9 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
}
@Override
public Sequence<Row> run(final Query<Row> queryParam, final Map<String, Object> responseContext)
public Sequence<Row> run(final QueryPlus<Row> queryPlus, final Map<String, Object> responseContext)
{
final GroupByQuery query = (GroupByQuery) queryParam;
final GroupByQuery query = (GroupByQuery) queryPlus.getQuery();
final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
// CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION is here because realtime servers use nested mergeRunners calls
@ -119,12 +119,13 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION,
false
);
final GroupByQuery queryForRunners = query.withOverriddenContext(
ImmutableMap.<String, Object>of(CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, true)
final QueryPlus<Row> queryPlusForRunners = queryPlus.withQuery(
query.withOverriddenContext(ImmutableMap.<String, Object>of(CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, true))
);
if (QueryContexts.isBySegment(query) || forceChainedExecution) {
return new ChainedExecutionQueryRunner(exec, queryWatcher, queryables).run(query, responseContext);
ChainedExecutionQueryRunner<Row> runner = new ChainedExecutionQueryRunner<>(exec, queryWatcher, queryables);
return runner.run(queryPlusForRunners, responseContext);
}
final boolean isSingleThreaded = querySpecificConfig.isSingleThreaded();
@ -225,7 +226,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
Releaser bufferReleaser = mergeBufferHolder.increment();
Releaser grouperReleaser = grouperHolder.increment()
) {
final AggregateResult retVal = input.run(queryForRunners, responseContext)
final AggregateResult retVal = input.run(queryPlusForRunners, responseContext)
.accumulate(
AggregateResult.ok(),
accumulator

View File

@ -37,6 +37,7 @@ import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.GroupByMergedQueryRunner;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryWatcher;
import io.druid.query.aggregation.AggregatorFactory;
@ -120,25 +121,27 @@ public class GroupByStrategyV1 implements GroupByStrategy
configSupplier.get(),
bufferPool,
baseRunner.run(
new GroupByQuery.Builder(query)
// Don't do post aggs until the end of this method.
.setPostAggregatorSpecs(ImmutableList.of())
// Don't do "having" clause until the end of this method.
.setHavingSpec(null)
.setLimitSpec(NoopLimitSpec.instance())
.overrideContext(
ImmutableMap.of(
"finalize", false,
//setting sort to false avoids unnecessary sorting while merging results. we only need to sort
//in the end when returning results to user. (note this is only respected by groupBy v1)
GroupByQueryHelper.CTX_KEY_SORT_RESULTS, false,
//no merging needed at historicals because GroupByQueryRunnerFactory.mergeRunners(..) would
//return merged results. (note this is only respected by groupBy v1)
GroupByQueryQueryToolChest.GROUP_BY_MERGE_KEY, false,
GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V1
QueryPlus.wrap(
new GroupByQuery.Builder(query)
// Don't do post aggs until the end of this method.
.setPostAggregatorSpecs(ImmutableList.of())
// Don't do "having" clause until the end of this method.
.setHavingSpec(null)
.setLimitSpec(NoopLimitSpec.instance())
.overrideContext(
ImmutableMap.of(
"finalize", false,
//set sort to false avoids unnecessary sorting while merging results. we only need to sort
//in the end when returning results to user. (note this is only respected by groupBy v1)
GroupByQueryHelper.CTX_KEY_SORT_RESULTS, false,
//no merging needed at historicals because GroupByQueryRunnerFactory.mergeRunners(..) would
//return merged results. (note this is only respected by groupBy v1)
GroupByQueryQueryToolChest.GROUP_BY_MERGE_KEY, false,
GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V1
)
)
)
.build(),
.build()
),
responseContext
),
true

View File

@ -48,6 +48,7 @@ import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryWatcher;
import io.druid.query.ResourceLimitExceededException;
@ -233,21 +234,23 @@ public class GroupByStrategyV2 implements GroupByStrategy
return query.postProcess(
Sequences.map(
mergingQueryRunner.run(
new GroupByQuery.Builder(query)
// Don't do post aggs until the end of this method.
.setPostAggregatorSpecs(ImmutableList.of())
// Don't do "having" clause until the end of this method.
.setHavingSpec(null)
.setLimitSpec(NoopLimitSpec.instance())
.overrideContext(
ImmutableMap.of(
"finalize", false,
GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2,
CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp == null ? "" : String.valueOf(fudgeTimestamp.getMillis()),
CTX_KEY_OUTERMOST, false
QueryPlus.wrap(
new GroupByQuery.Builder(query)
// Don't do post aggs until the end of this method.
.setPostAggregatorSpecs(ImmutableList.of())
// Don't do "having" clause until the end of this method.
.setHavingSpec(null)
.setLimitSpec(NoopLimitSpec.instance())
.overrideContext(
ImmutableMap.of(
"finalize", false,
GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2,
CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp == null ? "" : String.valueOf(fudgeTimestamp.getMillis()),
CTX_KEY_OUTERMOST, false
)
)
)
.build(),
.build()
),
responseContext
),
new Function<Row, Row>()
@ -304,7 +307,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
return mergeResults(new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(Query<Row> query, Map<String, Object> responseContext)
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
{
return results;
}

View File

@ -43,6 +43,7 @@ import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.ResultMergeQueryRunner;
@ -102,13 +103,14 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
@Override
public Sequence<SegmentAnalysis> doRun(
QueryRunner<SegmentAnalysis> baseRunner,
Query<SegmentAnalysis> query,
QueryPlus<SegmentAnalysis> queryPlus,
Map<String, Object> context
)
{
Query<SegmentAnalysis> query = queryPlus.getQuery();
return new MappedSequence<>(
CombiningSequence.create(
baseRunner.run(query, context),
baseRunner.run(queryPlus, context),
makeOrdering(query),
createMergeFn(query)
),

View File

@ -36,6 +36,7 @@ import io.druid.query.ConcatQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
@ -83,9 +84,9 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
return new QueryRunner<SegmentAnalysis>()
{
@Override
public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Object> responseContext)
public Sequence<SegmentAnalysis> run(QueryPlus<SegmentAnalysis> inQ, Map<String, Object> responseContext)
{
SegmentMetadataQuery query = (SegmentMetadataQuery) inQ;
SegmentMetadataQuery query = (SegmentMetadataQuery) inQ.getQuery();
final SegmentAnalyzer analyzer = new SegmentAnalyzer(query.getAnalysisTypes());
final Map<String, ColumnAnalysis> analyzedColumns = analyzer.analyze(segment);
final long numRows = analyzer.numRows(segment);
@ -197,10 +198,11 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
{
@Override
public Sequence<SegmentAnalysis> run(
final Query<SegmentAnalysis> query,
final QueryPlus<SegmentAnalysis> queryPlus,
final Map<String, Object> responseContext
)
{
final Query<SegmentAnalysis> query = queryPlus.getQuery();
final int priority = QueryContexts.getPriority(query);
ListenableFuture<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
new AbstractPrioritizedCallable<Sequence<SegmentAnalysis>>(priority)
@ -209,7 +211,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
public Sequence<SegmentAnalysis> call() throws Exception
{
return Sequences.simple(
Sequences.toList(input.run(query, responseContext), new ArrayList<>())
Sequences.toList(input.run(queryPlus, responseContext), new ArrayList<>())
);
}
}

View File

@ -42,6 +42,7 @@ import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
@ -348,14 +349,15 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
{
@Override
public Sequence<Result<SearchResultValue>> run(
Query<Result<SearchResultValue>> query, Map<String, Object> responseContext
QueryPlus<Result<SearchResultValue>> queryPlus, Map<String, Object> responseContext
)
{
SearchQuery searchQuery = (SearchQuery) query;
SearchQuery searchQuery = (SearchQuery) queryPlus.getQuery();
if (searchQuery.getDimensionsFilter() != null) {
searchQuery = searchQuery.withDimFilter(searchQuery.getDimensionsFilter().optimize());
queryPlus = queryPlus.withQuery(searchQuery);
}
return runner.run(searchQuery, responseContext);
return runner.run(queryPlus, responseContext);
}
} , this),
config
@ -378,23 +380,24 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
@Override
public Sequence<Result<SearchResultValue>> run(
Query<Result<SearchResultValue>> input,
QueryPlus<Result<SearchResultValue>> queryPlus,
Map<String, Object> responseContext
)
{
Query<Result<SearchResultValue>> input = queryPlus.getQuery();
if (!(input instanceof SearchQuery)) {
throw new ISE("Can only handle [%s], got [%s]", SearchQuery.class, input.getClass());
}
final SearchQuery query = (SearchQuery) input;
if (query.getLimit() < config.getMaxSearchLimit()) {
return runner.run(query, responseContext);
return runner.run(queryPlus, responseContext);
}
final boolean isBySegment = QueryContexts.isBySegment(query);
return Sequences.map(
runner.run(query.withLimit(config.getMaxSearchLimit()), responseContext),
runner.run(queryPlus.withQuery(query.withLimit(config.getMaxSearchLimit())), responseContext),
new Function<Result<SearchResultValue>, Result<SearchResultValue>>()
{
@Override

View File

@ -30,6 +30,7 @@ import io.druid.java.util.common.guava.FunctionalIterable;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.Result;
import io.druid.query.dimension.ColumnSelectorStrategy;
@ -186,10 +187,11 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
@Override
public Sequence<Result<SearchResultValue>> run(
final Query<Result<SearchResultValue>> input,
final QueryPlus<Result<SearchResultValue>> queryPlus,
Map<String, Object> responseContext
)
{
Query<Result<SearchResultValue>> input = queryPlus.getQuery();
if (!(input instanceof SearchQuery)) {
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), SearchQuery.class);
}

View File

@ -43,6 +43,7 @@ import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
@ -348,14 +349,15 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
{
@Override
public Sequence<Result<SelectResultValue>> run(
Query<Result<SelectResultValue>> query, Map<String, Object> responseContext
QueryPlus<Result<SelectResultValue>> queryPlus, Map<String, Object> responseContext
)
{
SelectQuery selectQuery = (SelectQuery) query;
SelectQuery selectQuery = (SelectQuery) queryPlus.getQuery();
if (selectQuery.getDimensionsFilter() != null) {
selectQuery = selectQuery.withDimFilter(selectQuery.getDimensionsFilter().optimize());
queryPlus = queryPlus.withQuery(selectQuery);
}
return runner.run(selectQuery, responseContext);
return runner.run(queryPlus, responseContext);
}
}, this);
}

View File

@ -24,6 +24,7 @@ import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.ChainedExecutionQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
@ -90,10 +91,11 @@ public class SelectQueryRunnerFactory
@Override
public Sequence<Result<SelectResultValue>> run(
Query<Result<SelectResultValue>> input,
QueryPlus<Result<SelectResultValue>> queryPlus,
Map<String, Object> responseContext
)
{
Query<Result<SelectResultValue>> input = queryPlus.getQuery();
if (!(input instanceof SelectQuery)) {
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), SelectQuery.class);
}

View File

@ -29,6 +29,7 @@ import io.druid.java.util.common.guava.Yielder;
import io.druid.java.util.common.guava.Yielders;
import io.druid.java.util.common.guava.YieldingAccumulator;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.Result;
import io.druid.query.SegmentDescriptor;
@ -55,9 +56,10 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> input, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> input, final Map<String, Object> responseContext)
{
final Query<T> query = input.withQuerySegmentSpec(specificSpec);
final QueryPlus<T> queryPlus = input.withQuerySegmentSpec(specificSpec);
final Query<T> query = queryPlus.getQuery();
final Thread currThread = Thread.currentThread();
final String currThreadName = currThread.getName();
@ -69,7 +71,7 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> get()
{
return base.run(query, responseContext);
return base.run(queryPlus, responseContext);
}
}
);

View File

@ -32,9 +32,10 @@ import io.druid.java.util.common.guava.Sequences;
import io.druid.query.BySegmentSkippingQueryRunner;
import io.druid.query.CacheStrategy;
import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
@ -109,13 +110,15 @@ public class TimeBoundaryQueryQueryToolChest
{
@Override
protected Sequence<Result<TimeBoundaryResultValue>> doRun(
QueryRunner<Result<TimeBoundaryResultValue>> baseRunner, Query<Result<TimeBoundaryResultValue>> input, Map<String, Object> context
QueryRunner<Result<TimeBoundaryResultValue>> baseRunner,
QueryPlus<Result<TimeBoundaryResultValue>> input,
Map<String, Object> context
)
{
TimeBoundaryQuery query = (TimeBoundaryQuery) input;
TimeBoundaryQuery query = (TimeBoundaryQuery) input.getQuery();
return Sequences.simple(
query.mergeResults(
Sequences.toList(baseRunner.run(query, context), Lists.<Result<TimeBoundaryResultValue>>newArrayList())
Sequences.toList(baseRunner.run(input, context), Lists.<Result<TimeBoundaryResultValue>>newArrayList())
)
);
}

View File

@ -29,6 +29,7 @@ import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.ChainedExecutionQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerHelper;
@ -131,10 +132,11 @@ public class TimeBoundaryQueryRunnerFactory
@Override
public Sequence<Result<TimeBoundaryResultValue>> run(
final Query<Result<TimeBoundaryResultValue>> input,
final QueryPlus<Result<TimeBoundaryResultValue>> queryPlus,
final Map<String, Object> responseContext
)
{
Query<Result<TimeBoundaryResultValue>> input = queryPlus.getQuery();
if (!(input instanceof TimeBoundaryQuery)) {
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TimeBoundaryQuery.class);
}

View File

@ -33,6 +33,7 @@ import io.druid.java.util.common.guava.nary.BinaryFn;
import io.druid.query.CacheStrategy;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
@ -92,14 +93,14 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
@Override
public Sequence<Result<TimeseriesResultValue>> doRun(
QueryRunner<Result<TimeseriesResultValue>> baseRunner,
Query<Result<TimeseriesResultValue>> query,
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
Map<String, Object> context
)
{
return super.doRun(
baseRunner,
// Don't do post aggs until makePostComputeManipulatorFn() is called
((TimeseriesQuery) query).withPostAggregatorSpecs(ImmutableList.<PostAggregator>of()),
queryPlus.withQuery(((TimeseriesQuery) queryPlus.getQuery()).withPostAggregatorSpecs(ImmutableList.of())),
context
);
}
@ -234,14 +235,15 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query, Map<String, Object> responseContext
QueryPlus<Result<TimeseriesResultValue>> queryPlus, Map<String, Object> responseContext
)
{
TimeseriesQuery timeseriesQuery = (TimeseriesQuery) query;
TimeseriesQuery timeseriesQuery = (TimeseriesQuery) queryPlus.getQuery();
if (timeseriesQuery.getDimensionsFilter() != null) {
timeseriesQuery = timeseriesQuery.withDimFilter(timeseriesQuery.getDimensionsFilter().optimize());
queryPlus = queryPlus.withQuery(timeseriesQuery);
}
return runner.run(timeseriesQuery, responseContext);
return runner.run(queryPlus, responseContext);
}
}, this);
}

View File

@ -24,6 +24,7 @@ import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.ChainedExecutionQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
@ -91,10 +92,11 @@ public class TimeseriesQueryRunnerFactory
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> input,
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
Map<String, Object> responseContext
)
{
Query<Result<TimeseriesResultValue>> input = queryPlus.getQuery();
if (!(input instanceof TimeseriesQuery)) {
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TimeseriesQuery.class);
}

View File

@ -37,6 +37,7 @@ import io.druid.query.CacheStrategy;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
@ -416,26 +417,27 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
{
@Override
public Sequence<Result<TopNResultValue>> run(
Query<Result<TopNResultValue>> query, Map<String, Object> responseContext
QueryPlus<Result<TopNResultValue>> queryPlus, Map<String, Object> responseContext
)
{
TopNQuery topNQuery = (TopNQuery) query;
TopNQuery topNQuery = (TopNQuery) queryPlus.getQuery();
if (topNQuery.getDimensionsFilter() != null) {
topNQuery = topNQuery.withDimFilter(topNQuery.getDimensionsFilter().optimize());
}
final TopNQuery delegateTopNQuery = topNQuery;
if (TopNQueryEngine.canApplyExtractionInPost(delegateTopNQuery)) {
final DimensionSpec dimensionSpec = delegateTopNQuery.getDimensionSpec();
return runner.run(
QueryPlus<Result<TopNResultValue>> delegateQueryPlus = queryPlus.withQuery(
delegateTopNQuery.withDimensionSpec(
new DefaultDimensionSpec(
dimensionSpec.getDimension(),
dimensionSpec.getOutputName()
)
), responseContext
)
);
return runner.run(delegateQueryPlus, responseContext);
} else {
return runner.run(delegateTopNQuery, responseContext);
return runner.run(queryPlus.withQuery(delegateTopNQuery), responseContext);
}
}
}
@ -455,12 +457,12 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
@Override
public Sequence<Result<TopNResultValue>> run(
final Query<Result<TopNResultValue>> query, final Map<String, Object> responseContext
final QueryPlus<Result<TopNResultValue>> queryPlus, final Map<String, Object> responseContext
)
{
// thresholdRunner.run throws ISE if query is not TopNQuery
final Sequence<Result<TopNResultValue>> resultSequence = thresholdRunner.run(query, responseContext);
final TopNQuery topNQuery = (TopNQuery) query;
final Sequence<Result<TopNResultValue>> resultSequence = thresholdRunner.run(queryPlus, responseContext);
final TopNQuery topNQuery = (TopNQuery) queryPlus.getQuery();
if (!TopNQueryEngine.canApplyExtractionInPost(topNQuery)) {
return resultSequence;
} else {
@ -521,10 +523,11 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
@Override
public Sequence<Result<TopNResultValue>> run(
Query<Result<TopNResultValue>> input,
QueryPlus<Result<TopNResultValue>> queryPlus,
Map<String, Object> responseContext
)
{
Query<Result<TopNResultValue>> input = queryPlus.getQuery();
if (!(input instanceof TopNQuery)) {
throw new ISE("Can only handle [%s], got [%s]", TopNQuery.class, input.getClass());
}
@ -532,13 +535,13 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
final TopNQuery query = (TopNQuery) input;
final int minTopNThreshold = query.getContextValue("minTopNThreshold", config.getMinTopNThreshold());
if (query.getThreshold() > minTopNThreshold) {
return runner.run(query, responseContext);
return runner.run(queryPlus, responseContext);
}
final boolean isBySegment = QueryContexts.isBySegment(query);
return Sequences.map(
runner.run(query.withThreshold(minTopNThreshold), responseContext),
runner.run(queryPlus.withQuery(query.withThreshold(minTopNThreshold)), responseContext),
new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{
@Override

View File

@ -25,7 +25,7 @@ import io.druid.guice.annotations.Global;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.ChainedExecutionQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
@ -65,15 +65,15 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory<Result<TopNRes
{
@Override
public Sequence<Result<TopNResultValue>> run(
Query<Result<TopNResultValue>> input,
QueryPlus<Result<TopNResultValue>> input,
Map<String, Object> responseContext
)
{
if (!(input instanceof TopNQuery)) {
if (!(input.getQuery() instanceof TopNQuery)) {
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TopNQuery.class);
}
return queryEngine.query((TopNQuery) input, segment.asStorageAdapter());
return queryEngine.query((TopNQuery) input.getQuery(), segment.asStorageAdapter());
}
};

View File

@ -61,7 +61,7 @@ public class AsyncQueryRunnerTest
QueryRunner baseRunner = new QueryRunner()
{
@Override
public Sequence run(Query query, Map responseContext)
public Sequence run(QueryPlus queryPlus, Map responseContext)
{
try {
latch.await();
@ -85,7 +85,7 @@ public class AsyncQueryRunnerTest
QueryRunner baseRunner = new QueryRunner()
{
@Override
public Sequence run(Query query, Map responseContext)
public Sequence run(QueryPlus queryPlus, Map responseContext)
{
try {
Thread.sleep(Long.MAX_VALUE);
@ -117,7 +117,7 @@ public class AsyncQueryRunnerTest
QueryRunner baseRunner = new QueryRunner()
{
@Override
public Sequence run(Query query, Map responseContext) { return null; }
public Sequence run(QueryPlus queryPlus, Map responseContext) { return null; }
};
QueryWatcher mock = EasyMock.createMock(QueryWatcher.class);

View File

@ -330,7 +330,7 @@ public class ChainedExecutionQueryRunnerTest
}
@Override
public Sequence<Integer> run(Query<Integer> query, Map<String, Object> responseContext)
public Sequence<Integer> run(QueryPlus<Integer> queryPlus, Map<String, Object> responseContext)
{
// do a lot of work
synchronized (this) {

View File

@ -60,13 +60,13 @@ public class IntervalChunkingQueryRunnerTest
@Test
public void testDefaultNoChunking() {
Query query = queryBuilder.intervals("2014/2016").build();
QueryPlus queryPlus = QueryPlus.wrap(queryBuilder.intervals("2014/2016").build());
EasyMock.expect(baseRunner.run(query, Collections.EMPTY_MAP)).andReturn(Sequences.empty());
EasyMock.expect(baseRunner.run(queryPlus, Collections.EMPTY_MAP)).andReturn(Sequences.empty());
EasyMock.replay(baseRunner);
QueryRunner runner = decorator.decorate(baseRunner, toolChest);
runner.run(query, Collections.EMPTY_MAP);
runner.run(queryPlus, Collections.EMPTY_MAP);
EasyMock.verify(baseRunner);
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
import com.google.common.collect.ImmutableList;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
/**
* Tests that if a QueryRunner overrides a legacy {@link QueryRunner#run(Query, Map)} method, it still works. This
* test should be removed when {@link QueryRunner#run(Query, Map)} is removed.
*/
public class LegacyApiQueryRunnerTest
{
private static class LegacyApiQueryRunner<T> implements QueryRunner<T>
{
/**
* Overrides legacy API.
*/
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
return Sequences.empty();
}
}
@Test
public void testQueryRunnerLegacyApi()
{
final Query query = new TestQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(ImmutableList.of(new Interval("0/100"))),
false,
new HashMap()
);
Map<String, Object> context = new HashMap<>();
Assert.assertEquals(Sequences.empty(), new LegacyApiQueryRunner<>().run(QueryPlus.wrap(query), context));
}
}

View File

@ -21,67 +21,15 @@ package io.druid.query;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.query.filter.DimFilter;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class QueryContextsTest
{
private static class TestQuery extends BaseQuery
{
public TestQuery(DataSource dataSource, QuerySegmentSpec querySegmentSpec, boolean descending, Map context)
{
super(dataSource, querySegmentSpec, descending, context);
}
@Override
public boolean hasFilters()
{
return false;
}
@Override
public DimFilter getFilter()
{
return null;
}
@Override
public String getType()
{
return null;
}
@Override
public Query withQuerySegmentSpec(QuerySegmentSpec spec)
{
return null;
}
@Override
public Query withDataSource(DataSource dataSource)
{
return null;
}
@Override
public Query withOverriddenContext(Map contextOverride)
{
return new TestQuery(
getDataSource(),
getQuerySegmentSpec(),
isDescending(),
BaseQuery.computeOverriddenContext(getContext(), contextOverride)
);
}
}
@Test
public void testDefaultQueryTimeout()

View File

@ -501,9 +501,9 @@ public class QueryRunnerTestHelper
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
return qr.run(query, responseContext);
return qr.run(queryPlus, responseContext);
}
@Override
@ -526,8 +526,9 @@ public class QueryRunnerTestHelper
new QueryRunner<T>()
{
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
Query<T> query = queryPlus.getQuery();
List<TimelineObjectHolder> segments = Lists.newArrayList();
for (Interval interval : query.getIntervals()) {
segments.addAll(timeline.lookup(interval));
@ -535,7 +536,7 @@ public class QueryRunnerTestHelper
List<Sequence<T>> sequences = Lists.newArrayList();
for (TimelineObjectHolder<String, Segment> holder : toolChest.filterSegments(query, segments)) {
Segment segment = holder.getObject().getChunk(0).getObject();
Query running = query.withQuerySegmentSpec(
QueryPlus queryPlusRunning = queryPlus.withQuerySegmentSpec(
new SpecificSegmentSpec(
new SegmentDescriptor(
holder.getInterval(),
@ -544,7 +545,7 @@ public class QueryRunnerTestHelper
)
)
);
sequences.add(factory.createRunner(segment).run(running, responseContext));
sequences.add(factory.createRunner(segment).run(queryPlusRunning, responseContext));
}
return new MergeSequence<>(query.getResultOrdering(), Sequences.simple(sequences));
}
@ -568,9 +569,9 @@ public class QueryRunnerTestHelper
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
return delegate.run(query, responseContext);
return delegate.run(queryPlus, responseContext);
}
};
}

View File

@ -71,7 +71,7 @@ public class RetryQueryRunnerTest
new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(Query query, Map context)
public Sequence<Result<TimeseriesResultValue>> run(QueryPlus queryPlus, Map context)
{
((List) context.get(Result.MISSING_SEGMENTS_KEY)).add(
new SegmentDescriptor(
@ -128,7 +128,7 @@ public class RetryQueryRunnerTest
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query,
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
Map<String, Object> context
)
{
@ -195,7 +195,7 @@ public class RetryQueryRunnerTest
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query,
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
Map<String, Object> context
)
{
@ -261,7 +261,7 @@ public class RetryQueryRunnerTest
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query,
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
Map<String, Object> context
)
{
@ -313,10 +313,11 @@ public class RetryQueryRunnerTest
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query,
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
Map<String, Object> context
)
{
final Query<Result<TimeseriesResultValue>> query = queryPlus.getQuery();
if ((int) context.get("count") == 0) {
// assume 2 missing segments at first run
((List) context.get(Result.MISSING_SEGMENTS_KEY)).add(

View File

@ -0,0 +1,75 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
import io.druid.query.filter.DimFilter;
import io.druid.query.spec.QuerySegmentSpec;
import java.util.Map;
class TestQuery extends BaseQuery
{
public TestQuery(DataSource dataSource, QuerySegmentSpec querySegmentSpec, boolean descending, Map context)
{
super(dataSource, querySegmentSpec, descending, context);
}
@Override
public boolean hasFilters()
{
return false;
}
@Override
public DimFilter getFilter()
{
return null;
}
@Override
public String getType()
{
return null;
}
@Override
public Query withQuerySegmentSpec(QuerySegmentSpec spec)
{
return null;
}
@Override
public Query withDataSource(DataSource dataSource)
{
return null;
}
@Override
public Query withOverriddenContext(Map contextOverride)
{
return new TestQuery(
getDataSource(),
getQuerySegmentSpec(),
isDescending(),
BaseQuery.computeOverriddenContext(getContext(), contextOverride)
);
}
}

View File

@ -81,7 +81,7 @@ public class TimewarpOperatorTest
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query,
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
Map<String, Object> responseContext
)
{
@ -96,7 +96,7 @@ public class TimewarpOperatorTest
new TimeseriesResultValue(ImmutableMap.<String, Object>of("metric", 3))
),
new Result<>(
query.getIntervals().get(0).getEnd(),
queryPlus.getQuery().getIntervals().get(0).getEnd(),
new TimeseriesResultValue(ImmutableMap.<String, Object>of("metric", 5))
)
)
@ -143,7 +143,7 @@ public class TimewarpOperatorTest
{
@Override
public Sequence<Result<TimeBoundaryResultValue>> run(
Query<Result<TimeBoundaryResultValue>> query,
QueryPlus<Result<TimeBoundaryResultValue>> queryPlus,
Map<String, Object> responseContext
)
{
@ -193,10 +193,11 @@ public class TimewarpOperatorTest
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query,
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
Map<String, Object> responseContext
)
{
final Query<Result<TimeseriesResultValue>> query = queryPlus.getQuery();
return Sequences.simple(
ImmutableList.of(
new Result<>(

View File

@ -39,11 +39,11 @@ public class UnionQueryRunnerTest
QueryRunner baseRunner = new QueryRunner()
{
@Override
public Sequence run(Query query, Map responseContext)
public Sequence run(QueryPlus queryPlus, Map responseContext)
{
// verify that table datasource is passed to baseQueryRunner
Assert.assertTrue(query.getDataSource() instanceof TableDataSource);
String dsName = Iterables.getOnlyElement(query.getDataSource().getNames());
Assert.assertTrue(queryPlus.getQuery().getDataSource() instanceof TableDataSource);
String dsName = Iterables.getOnlyElement(queryPlus.getQuery().getDataSource().getNames());
if (dsName.equals("ds1")) {
responseContext.put("ds1", "ds1");
return Sequences.simple(Arrays.asList(1, 2, 3));
@ -70,7 +70,7 @@ public class UnionQueryRunnerTest
.aggregators(QueryRunnerTestHelper.commonAggregators)
.build();
Map<String, Object> responseContext = Maps.newHashMap();
Sequence result = runner.run(q, responseContext);
Sequence<?> result = runner.run(q, responseContext);
List res = Sequences.toList(result, Lists.newArrayList());
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), res);

View File

@ -50,6 +50,7 @@ import io.druid.java.util.common.guava.Yielder;
import io.druid.java.util.common.guava.YieldingAccumulator;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
@ -535,10 +536,10 @@ public class AggregationTestHelper
return new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(Query<Row> query, Map<String, Object> map)
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> map)
{
try {
Sequence<Row> resultSeq = baseRunner.run(query, Maps.<String, Object>newHashMap());
Sequence<Row> resultSeq = baseRunner.run(queryPlus, Maps.<String, Object>newHashMap());
final Yielder yielder = resultSeq.toYielder(
null,
new YieldingAccumulator()
@ -559,7 +560,7 @@ public class AggregationTestHelper
List resultRows = Lists.transform(
readQueryResultArrayFromString(resultStr),
toolChest.makePreComputeManipulatorFn(
query,
queryPlus.getQuery(),
MetricManipulatorFns.deserializing()
)
);

View File

@ -33,6 +33,7 @@ import io.druid.java.util.common.guava.MergeSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.aggregation.AggregatorFactory;
@ -86,21 +87,22 @@ public class GroupByQueryRunnerFactoryTest
new QueryRunner()
{
@Override
public Sequence run(Query query, Map responseContext)
public Sequence run(QueryPlus queryPlus, Map responseContext)
{
return factory.getToolchest().mergeResults(
new QueryRunner()
{
@Override
public Sequence run(Query query, Map responseContext)
public Sequence run(QueryPlus queryPlus, Map responseContext)
{
final Query query = queryPlus.getQuery();
try {
return new MergeSequence(
query.getResultOrdering(),
Sequences.simple(
Arrays.asList(
factory.createRunner(createSegment()).run(query, responseContext),
factory.createRunner(createSegment()).run(query, responseContext)
factory.createRunner(createSegment()).run(queryPlus, responseContext),
factory.createRunner(createSegment()).run(queryPlus, responseContext)
)
)
);
@ -110,7 +112,7 @@ public class GroupByQueryRunnerFactoryTest
}
}
}
).run(query, responseContext);
).run(queryPlus, responseContext);
}
}
);

View File

@ -51,9 +51,9 @@ import io.druid.query.BySegmentResultValueClass;
import io.druid.query.DruidProcessingConfig;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryToolChest;
@ -2353,20 +2353,20 @@ public class GroupByQueryRunnerTest
{
@Override
public Sequence<Row> run(
Query<Row> query, Map<String, Object> responseContext
QueryPlus<Row> queryPlus, Map<String, Object> responseContext
)
{
// simulate two daily segments
final Query query1 = query.withQuerySegmentSpec(
final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03")))
);
final Query query2 = query.withQuerySegmentSpec(
final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
);
return new MergeSequence(
query.getResultOrdering(),
queryPlus.getQuery().getResultOrdering(),
Sequences.simple(
Arrays.asList(runner.run(query1, responseContext), runner.run(query2, responseContext))
Arrays.asList(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext))
)
);
}
@ -2649,20 +2649,20 @@ public class GroupByQueryRunnerTest
{
@Override
public Sequence<Row> run(
Query<Row> query, Map<String, Object> responseContext
QueryPlus<Row> queryPlus, Map<String, Object> responseContext
)
{
// simulate two daily segments
final Query query1 = query.withQuerySegmentSpec(
final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03")))
);
final Query query2 = query.withQuerySegmentSpec(
final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
);
return new MergeSequence(
query.getResultOrdering(),
queryPlus.getQuery().getResultOrdering(),
Sequences.simple(
Arrays.asList(runner.run(query1, responseContext), runner.run(query2, responseContext))
Arrays.asList(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext))
)
);
}
@ -3437,20 +3437,20 @@ public class GroupByQueryRunnerTest
{
@Override
public Sequence<Row> run(
Query<Row> query, Map<String, Object> responseContext
QueryPlus<Row> queryPlus, Map<String, Object> responseContext
)
{
// simulate two daily segments
final Query query1 = query.withQuerySegmentSpec(
final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03")))
);
final Query query2 = query.withQuerySegmentSpec(
final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
);
return new MergeSequence(
query.getResultOrdering(),
queryPlus.getQuery().getResultOrdering(),
Sequences.simple(
Arrays.asList(runner.run(query1, responseContext), runner.run(query2, responseContext))
Arrays.asList(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext))
)
);
}
@ -3770,20 +3770,20 @@ public class GroupByQueryRunnerTest
{
@Override
public Sequence<Row> run(
Query<Row> query, Map<String, Object> responseContext
QueryPlus<Row> queryPlus, Map<String, Object> responseContext
)
{
// simulate two daily segments
final Query query1 = query.withQuerySegmentSpec(
final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03")))
);
final Query query2 = query.withQuerySegmentSpec(
final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
);
return new MergeSequence(
query.getResultOrdering(),
queryPlus.getQuery().getResultOrdering(),
Sequences.simple(
Arrays.asList(runner.run(query1, responseContext), runner.run(query2, responseContext))
Arrays.asList(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext))
)
);
}
@ -3879,20 +3879,20 @@ public class GroupByQueryRunnerTest
{
@Override
public Sequence<Row> run(
Query<Row> query, Map<String, Object> responseContext
QueryPlus<Row> queryPlus, Map<String, Object> responseContext
)
{
// simulate two daily segments
final Query query1 = query.withQuerySegmentSpec(
final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03")))
);
final Query query2 = query.withQuerySegmentSpec(
final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
);
return new MergeSequence(
query.getResultOrdering(),
queryPlus.getQuery().getResultOrdering(),
Sequences.simple(
Arrays.asList(runner.run(query1, responseContext), runner.run(query2, responseContext))
Arrays.asList(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext))
)
);
}

View File

@ -30,7 +30,7 @@ import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryToolChest;
@ -76,9 +76,9 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
return new QueryRunner()
{
@Override
public Sequence run(Query query, Map responseContext)
public Sequence run(QueryPlus queryPlus, Map responseContext)
{
TimeseriesQuery tsQuery = (TimeseriesQuery) query;
TimeseriesQuery tsQuery = (TimeseriesQuery) queryPlus.getQuery();
QueryRunner<Row> newRunner = factory.mergeRunners(
MoreExecutors.sameThreadExecutor(), ImmutableList.<QueryRunner<Row>>of(input)
);

View File

@ -30,6 +30,7 @@ import io.druid.java.util.common.logger.Logger;
import io.druid.js.JavaScriptConfig;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
@ -168,16 +169,16 @@ public class SearchQueryRunnerTest
{
@Override
public Sequence<Result<SearchResultValue>> run(
Query<Result<SearchResultValue>> query, Map<String, Object> responseContext
QueryPlus<Result<SearchResultValue>> queryPlus, Map<String, Object> responseContext
)
{
final Query<Result<SearchResultValue>> query1 = searchQuery.withQuerySegmentSpec(
final QueryPlus<Result<SearchResultValue>> queryPlus1 = queryPlus.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-01-12/2011-02-28")))
);
final Query<Result<SearchResultValue>> query2 = searchQuery.withQuerySegmentSpec(
final QueryPlus<Result<SearchResultValue>> queryPlus2 = queryPlus.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-03-01/2011-04-15")))
);
return Sequences.concat(runner.run(query1, responseContext), runner.run(query2, responseContext));
return Sequences.concat(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext));
}
}
);

View File

@ -32,7 +32,7 @@ import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.guava.Yielder;
import io.druid.java.util.common.guava.YieldingAccumulator;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.Result;
import io.druid.query.SegmentDescriptor;
@ -68,7 +68,7 @@ public class SpecificSegmentQueryRunnerTest
new QueryRunner()
{
@Override
public Sequence run(Query query, Map responseContext)
public Sequence run(QueryPlus queryPlus, Map responseContext)
{
return new Sequence()
{
@ -151,7 +151,7 @@ public class SpecificSegmentQueryRunnerTest
new QueryRunner()
{
@Override
public Sequence run(Query query, Map responseContext)
public Sequence run(QueryPlus queryPlus, Map responseContext)
{
return Sequences.withEffect(
Sequences.simple(Arrays.asList(value)),

View File

@ -25,7 +25,7 @@ import com.google.common.collect.Maps;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryToolChest;
@ -186,11 +186,11 @@ public class TimeSeriesUnionQueryRunnerTest
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
Query<Result<TimeseriesResultValue>> query,
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
Map<String, Object> responseContext
)
{
if (query.getDataSource().equals(new TableDataSource("ds1"))) {
if (queryPlus.getQuery().getDataSource().equals(new TableDataSource("ds1"))) {
return Sequences.simple(descending ? Lists.reverse(ds1) : ds1);
} else {
return Sequences.simple(descending ? Lists.reverse(ds2) : ds2);

View File

@ -27,7 +27,7 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
@ -243,11 +243,11 @@ public class TopNQueryQueryToolChestTest
@Override
public Sequence<Result<TopNResultValue>> run(
Query<Result<TopNResultValue>> query,
QueryPlus<Result<TopNResultValue>> queryPlus,
Map<String, Object> responseContext
)
{
this.query = (TopNQuery) query;
this.query = (TopNQuery) queryPlus.getQuery();
return query.run(runner, responseContext);
}
}

View File

@ -59,6 +59,7 @@ import io.druid.query.BySegmentResultValueClass;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
@ -139,8 +140,9 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
final Query<T> query = queryPlus.getQuery();
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
final CacheStrategy<T, Object, Query<T>> strategy = toolChest.getCacheStrategy(query);
@ -429,17 +431,12 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final Sequence<T> resultSeqToAdd;
if (!server.isAssignable() || !populateCache || isBySegment) { // Direct server queryable
if (!isBySegment) {
resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), responseContext);
resultSeqToAdd = clientQueryable.run(queryPlus.withQuerySegmentSpec(segmentSpec), responseContext);
} else {
// bySegment queries need to be de-serialized, see DirectDruidClient.run()
@SuppressWarnings("unchecked")
final Query<Result<BySegmentResultValueClass<T>>> bySegmentQuery =
(Query<Result<BySegmentResultValueClass<T>>>) ((Query) query);
@SuppressWarnings("unchecked")
final Sequence<Result<BySegmentResultValueClass<T>>> resultSequence = clientQueryable.run(
bySegmentQuery.withQuerySegmentSpec(segmentSpec),
queryPlus.withQuerySegmentSpec(segmentSpec),
responseContext
);
@ -472,7 +469,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
} else { // Requires some manipulation on broker side
@SuppressWarnings("unchecked")
final Sequence<Result<BySegmentResultValueClass<T>>> runningSequence = clientQueryable.run(
rewrittenQuery.withQuerySegmentSpec(segmentSpec),
queryPlus.withQuery(rewrittenQuery.withQuerySegmentSpec(segmentSpec)),
responseContext
);
resultSeqToAdd = new MergeSequence(

View File

@ -38,6 +38,7 @@ import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;
@ -83,8 +84,9 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
Query<T> query = queryPlus.getQuery();
final CacheStrategy strategy = toolChest.getCacheStrategy(query);
final boolean populateCache = CacheUtil.populateCacheOnDataNodes(query, strategy, cacheConfig);
final boolean useCache = CacheUtil.useCacheOnDataNodes(query, strategy, cacheConfig);
@ -145,7 +147,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
return Sequences.withEffect(
Sequences.map(
base.run(query, responseContext),
base.run(queryPlus, responseContext),
new Function<T, T>()
{
@Override
@ -190,7 +192,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
backgroundExecutorService
);
} else {
return base.run(query, responseContext);
return base.run(queryPlus, responseContext);
}
}

View File

@ -55,6 +55,7 @@ import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
@ -131,8 +132,9 @@ public class DirectDruidClient<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> context)
{
final Query<T> query = queryPlus.getQuery();
QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
boolean isBySegment = QueryContexts.isBySegment(query);

View File

@ -31,16 +31,16 @@ import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.metamx.emitter.EmittingLogger;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.concurrent.Execs;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
@ -54,7 +54,6 @@ import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.VersioningPolicy;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
@ -169,9 +168,9 @@ public class AppenderatorPlumber implements Plumber
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
return query.run(appenderator, responseContext);
return queryPlus.run(appenderator, responseContext);
}
};
}

View File

@ -43,6 +43,7 @@ import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryPlus;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
@ -227,7 +228,7 @@ public class QueryResource implements QueryCountStatsProvider
}
final Map<String, Object> responseContext = new MapMaker().makeMap();
final Sequence res = query.run(texasRanger, responseContext);
final Sequence res = QueryPlus.wrap(query).run(texasRanger, responseContext);
if (prevEtag != null && prevEtag.equals(responseContext.get(HDR_ETAG))) {
return Response.notModified().build();

View File

@ -71,6 +71,7 @@ import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryToolChest;
@ -1781,7 +1782,7 @@ public class CachingClusteredClientTest
timeline.add(interval2, "v", new StringPartitionChunk<>("d", null, 5, selector5));
timeline.add(interval3, "v", new StringPartitionChunk<>(null, null, 6, selector6));
final Capture<TimeseriesQuery> capture = Capture.newInstance();
final Capture<QueryPlus> capture = Capture.newInstance();
final Capture<Map<String, List>> contextCap = Capture.newInstance();
QueryRunner mockRunner = EasyMock.createNiceMock(QueryRunner.class);
@ -1801,12 +1802,9 @@ public class CachingClusteredClientTest
descriptors.add(new SegmentDescriptor(interval3, "v", 6));
MultipleSpecificSegmentSpec expected = new MultipleSpecificSegmentSpec(descriptors);
Sequences.toList(runner.run(
query,
context
), Lists.newArrayList());
Sequences.toList(runner.run(QueryPlus.wrap(query), context), Lists.newArrayList());
Assert.assertEquals(expected, capture.getValue().getQuerySegmentSpec());
Assert.assertEquals(expected, ((TimeseriesQuery) capture.getValue().getQuery()).getQuerySegmentSpec());
}
private ServerSelector makeMockSingleDimensionSelector(
@ -1923,7 +1921,7 @@ public class CachingClusteredClientTest
.andReturn(expectations.getQueryRunner())
.times(0, 1);
final Capture<? extends Query> capture = new Capture();
final Capture<? extends QueryPlus> capture = new Capture();
final Capture<? extends Map> context = new Capture();
QueryRunner queryable = expectations.getQueryRunner();
@ -1940,7 +1938,7 @@ public class CachingClusteredClientTest
@Override
public Sequence answer() throws Throwable
{
return toFilteredQueryableTimeseriesResults((TimeseriesQuery)capture.getValue(), segmentIds, queryIntervals, results);
return toFilteredQueryableTimeseriesResults((TimeseriesQuery)capture.getValue().getQuery(), segmentIds, queryIntervals, results);
}
})
.times(0, 1);
@ -1965,10 +1963,12 @@ public class CachingClusteredClientTest
TestHelper.assertExpectedResults(
expected,
runner.run(
query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(
ImmutableList.of(
actualQueryInterval
QueryPlus.wrap(
query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(
ImmutableList.of(
actualQueryInterval
)
)
)
),
@ -2062,7 +2062,7 @@ public class CachingClusteredClientTest
.andReturn(expectations.getQueryRunner())
.once();
final Capture<? extends Query> capture = new Capture();
final Capture<? extends QueryPlus> capture = new Capture();
final Capture<? extends Map> context = new Capture();
queryCaptures.add(capture);
QueryRunner queryable = expectations.getQueryRunner();
@ -2210,7 +2210,8 @@ public class CachingClusteredClientTest
// make sure all the queries were sent down as 'bySegment'
for (Capture queryCapture : queryCaptures) {
Query capturedQuery = (Query) queryCapture.getValue();
QueryPlus capturedQueryPlus = (QueryPlus) queryCapture.getValue();
Query capturedQuery = capturedQueryPlus.getQuery();
if (expectBySegment) {
Assert.assertEquals(true, capturedQuery.getContextValue("bySegment"));
} else {

View File

@ -39,6 +39,7 @@ import io.druid.java.util.common.guava.Sequences;
import io.druid.query.CacheStrategy;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QueryToolChest;
@ -269,7 +270,7 @@ public class CachingQueryRunnerTest
new QueryRunner()
{
@Override
public Sequence run(Query query, Map responseContext)
public Sequence run(QueryPlus queryPlus, Map responseContext)
{
return resultSeq;
}
@ -362,7 +363,7 @@ public class CachingQueryRunnerTest
new QueryRunner()
{
@Override
public Sequence run(Query query, Map responseContext)
public Sequence run(QueryPlus queryPlus, Map responseContext)
{
return Sequences.empty();
}

View File

@ -32,6 +32,7 @@ import io.druid.java.util.common.guava.Sequences;
import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
@ -95,11 +96,9 @@ public class QueryResourceTest
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(
Query<T> query, Map<String, Object> responseContext
)
public Sequence<T> run(QueryPlus<T> query, Map<String, Object> responseContext)
{
return Sequences.<T>empty();
return Sequences.empty();
}
};
}

View File

@ -47,6 +47,7 @@ import io.druid.query.Druids;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
@ -679,9 +680,9 @@ public class ServerManagerTest
}
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
return new BlockingSequence<T>(runner.run(query, responseContext), waitLatch, waitYieldLatch, notifyLatch);
return new BlockingSequence<>(runner.run(queryPlus, responseContext), waitLatch, waitYieldLatch, notifyLatch);
}
}

View File

@ -51,6 +51,7 @@ import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.DruidProcessingConfig;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
@ -471,7 +472,7 @@ public class DumpSegment extends GuiceRunnable
final QueryRunner<T> runner = factory.createRunner(new QueryableIndexSegment("segment", index));
final Sequence results = factory.getToolchest().mergeResults(
factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.<QueryRunner>of(runner))
).run(query, Maps.<String, Object>newHashMap());
).run(QueryPlus.wrap(query), Maps.<String, Object>newHashMap());
return (Sequence<T>) results;
}

View File

@ -32,6 +32,7 @@ import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.DataSource;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryPlus;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.Result;
import io.druid.query.dimension.DimensionSpec;
@ -193,7 +194,7 @@ public class QueryMaker
return Sequences.concat(
Sequences.map(
queryWithPagination.run(walker, Maps.<String, Object>newHashMap()),
QueryPlus.wrap(queryWithPagination).run(walker, Maps.<String, Object>newHashMap()),
new Function<Result<SelectResultValue>, Sequence<Object[]>>()
{
@Override
@ -264,7 +265,7 @@ public class QueryMaker
Hook.QUERY_PLAN.run(query);
return Sequences.map(
query.run(walker, Maps.<String, Object>newHashMap()),
QueryPlus.wrap(query).run(walker, Maps.<String, Object>newHashMap()),
new Function<Result<TimeseriesResultValue>, Object[]>()
{
@Override
@ -299,7 +300,7 @@ public class QueryMaker
return Sequences.concat(
Sequences.map(
query.run(walker, Maps.<String, Object>newHashMap()),
QueryPlus.wrap(query).run(walker, Maps.<String, Object>newHashMap()),
new Function<Result<TopNResultValue>, Sequence<Object[]>>()
{
@Override
@ -335,7 +336,7 @@ public class QueryMaker
Hook.QUERY_PLAN.run(query);
return Sequences.map(
query.run(walker, Maps.<String, Object>newHashMap()),
QueryPlus.wrap(query).run(walker, Maps.<String, Object>newHashMap()),
new Function<io.druid.data.input.Row, Object[]>()
{
@Override

View File

@ -42,6 +42,7 @@ import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.query.QueryPlus;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.TableDataSource;
import io.druid.query.metadata.metadata.ColumnAnalysis;
@ -305,7 +306,7 @@ public class DruidSchema extends AbstractSchema
true
);
final Sequence<SegmentAnalysis> sequence = segmentMetadataQuery.run(walker, Maps.<String, Object>newHashMap());
final Sequence<SegmentAnalysis> sequence = QueryPlus.wrap(segmentMetadataQuery).run(walker, Maps.newHashMap());
final List<SegmentAnalysis> results = Sequences.toList(sequence, Lists.<SegmentAnalysis>newArrayList());
if (results.isEmpty()) {
return null;

View File

@ -32,6 +32,7 @@ import io.druid.java.util.common.guava.Sequence;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
@ -110,8 +111,9 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
new QueryRunner<T>()
{
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
Query<T> query = queryPlus.getQuery();
final VersionedIntervalTimeline<String, Segment> timeline = getTimelineForTableDataSource(query);
return makeBaseRunner(
query,
@ -154,7 +156,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
}
}
)
).run(query, responseContext);
).run(queryPlus, responseContext);
}
}
)