diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/QueryBenchmarkUtil.java b/benchmarks/src/main/java/io/druid/benchmark/query/QueryBenchmarkUtil.java index 662b0ed71e8..a06bb2ba975 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/QueryBenchmarkUtil.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/QueryBenchmarkUtil.java @@ -25,6 +25,7 @@ import io.druid.query.BySegmentQueryRunner; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -58,9 +59,9 @@ public class QueryBenchmarkUtil QueryToolChest> toolChest) { return new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - return delegate.run(query, responseContext); + return delegate.run(queryPlus, responseContext); } }; } diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/SerializingQueryRunner.java b/benchmarks/src/main/java/io/druid/benchmark/query/SerializingQueryRunner.java index 25655968552..7b70beb75d0 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/SerializingQueryRunner.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/SerializingQueryRunner.java @@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; -import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import java.util.Map; @@ -47,12 +47,12 @@ public class SerializingQueryRunner implements QueryRunner @Override public Sequence run( - final Query query, + final QueryPlus queryPlus, final Map responseContext ) { return Sequences.map( - baseRunner.run(query, responseContext), + baseRunner.run(queryPlus, responseContext), new Function() { @Override diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java index f102d1c3ec6..d2f07a17da3 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java @@ -22,6 +22,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Yielder; import io.druid.java.util.common.guava.YieldingAccumulator; import io.druid.java.util.common.parsers.CloseableIterator; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import java.io.IOException; @@ -36,13 +37,15 @@ public class ScanQueryLimitRowIterator implements CloseableIterator baseRunner, ScanQuery query, + QueryRunner baseRunner, + QueryPlus queryPlus, Map responseContext ) { + ScanQuery query = (ScanQuery) queryPlus.getQuery(); resultFormat = query.getResultFormat(); limit = query.getLimit(); - Sequence baseSequence = baseRunner.run(query, responseContext); + Sequence baseSequence = baseRunner.run(queryPlus, responseContext); yielder = baseSequence.toYielder( null, new YieldingAccumulator() diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java index 8cc1c7e3ffc..26edb06c9fd 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java @@ -25,9 +25,10 @@ import com.google.inject.Inject; import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.Sequence; +import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; import io.druid.query.QueryMetrics; -import io.druid.query.GenericQueryMetricsFactory; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.aggregation.MetricManipulationFn; @@ -55,12 +56,12 @@ public class ScanQueryQueryToolChest extends QueryToolChest run( - final Query query, final Map responseContext + final QueryPlus queryPlus, final Map responseContext ) { - ScanQuery scanQuery = (ScanQuery) query; + ScanQuery scanQuery = (ScanQuery) queryPlus.getQuery(); if (scanQuery.getLimit() == Long.MAX_VALUE) { - return runner.run(query, responseContext); + return runner.run(queryPlus, responseContext); } return new BaseSequence<>( new BaseSequence.IteratorMaker() @@ -68,7 +69,7 @@ public class ScanQueryQueryToolChest extends QueryToolChest run( - Query query, Map responseContext + QueryPlus queryPlus, Map responseContext ) { - ScanQuery scanQuery = (ScanQuery) query; + ScanQuery scanQuery = (ScanQuery) queryPlus.getQuery(); if (scanQuery.getDimensionsFilter() != null) { scanQuery = scanQuery.withDimFilter(scanQuery.getDimensionsFilter().optimize()); + queryPlus = queryPlus.withQuery(scanQuery); } - return runner.run(scanQuery, responseContext); + return runner.run(queryPlus, responseContext); } }; } diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java index fcfe11b4366..0fb7d16a03c 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java @@ -26,6 +26,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.Query; import io.druid.query.QueryContexts; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -70,12 +71,12 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory run( - final Query query, final Map responseContext + final QueryPlus queryPlus, final Map responseContext ) { // Note: this variable is effective only when queryContext has a timeout. // See the comment of CTX_TIMEOUT_AT. - final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(query); + final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery()); responseContext.put(CTX_TIMEOUT_AT, timeoutAt); return Sequences.concat( Sequences.map( @@ -85,7 +86,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory apply(final QueryRunner input) { - return input.run(query, responseContext); + return input.run(queryPlus, responseContext); } } ) @@ -113,9 +114,10 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory run( - Query query, Map responseContext + QueryPlus queryPlus, Map responseContext ) { + Query query = queryPlus.getQuery(); if (!(query instanceof ScanQuery)) { throw new ISE("Got a [%s] which isn't a %s", query.getClass(), ScanQuery.class); } diff --git a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java index 8f088ad8a68..254f4dceda9 100644 --- a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java +++ b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java @@ -28,7 +28,7 @@ import io.druid.java.util.common.guava.MergeSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.DefaultGenericQueryMetricsFactory; -import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; @@ -214,15 +214,15 @@ public class MultiSegmentScanQueryTest new QueryRunner() { @Override public Sequence run( - Query query, Map responseContext + QueryPlus queryPlus, Map responseContext ) { // simulate results back from 2 historicals List> sequences = Lists.newArrayListWithExpectedSize(2); - sequences.add(factory.createRunner(segment0).run(query, new HashMap())); - sequences.add(factory.createRunner(segment1).run(query, new HashMap())); + sequences.add(factory.createRunner(segment0).run(queryPlus, new HashMap())); + sequences.add(factory.createRunner(segment1).run(queryPlus, new HashMap())); return new MergeSequence<>( - query.getResultOrdering(), + queryPlus.getQuery().getResultOrdering(), Sequences.simple(sequences) ); } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 8edda9e52b9..a172530fbf0 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -57,6 +57,7 @@ import io.druid.java.util.common.parsers.ParseException; import io.druid.query.DruidMetrics; import io.druid.query.NoopQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeIOConfig; @@ -626,9 +627,9 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler return new QueryRunner() { @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { - return query.run(appenderator, responseContext); + return queryPlus.run(appenderator, responseContext); } }; } diff --git a/processing/src/main/java/io/druid/query/AsyncQueryRunner.java b/processing/src/main/java/io/druid/query/AsyncQueryRunner.java index ce5f1fc64b1..8de2a2b17d7 100644 --- a/processing/src/main/java/io/druid/query/AsyncQueryRunner.java +++ b/processing/src/main/java/io/druid/query/AsyncQueryRunner.java @@ -47,8 +47,9 @@ public class AsyncQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { + final Query query = queryPlus.getQuery(); final int priority = QueryContexts.getPriority(query); final ListenableFuture> future = executor.submit(new AbstractPrioritizedCallable>(priority) { @@ -57,7 +58,7 @@ public class AsyncQueryRunner implements QueryRunner { //Note: this is assumed that baseRunner does most of the work eagerly on call to the //run() method and resulting sequence accumulate/yield is fast. - return baseRunner.run(query, responseContext); + return baseRunner.run(queryPlus, responseContext); } }); queryWatcher.registerQuery(query, future); diff --git a/processing/src/main/java/io/druid/query/BaseQuery.java b/processing/src/main/java/io/druid/query/BaseQuery.java index 9539a85b377..4a3e4716a59 100644 --- a/processing/src/main/java/io/druid/query/BaseQuery.java +++ b/processing/src/main/java/io/druid/query/BaseQuery.java @@ -92,6 +92,7 @@ public abstract class BaseQuery> implements Query return run(querySegmentSpec.lookup(this, walker), context); } + @Override public Sequence run(QueryRunner runner, Map context) { return runner.run(this, context); diff --git a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java index e57b9471ce3..e3781825aad 100644 --- a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java @@ -49,10 +49,10 @@ public class BySegmentQueryRunner implements QueryRunner @Override @SuppressWarnings("unchecked") - public Sequence run(final Query query, Map responseContext) + public Sequence run(final QueryPlus queryPlus, Map responseContext) { - if (QueryContexts.isBySegment(query)) { - final Sequence baseSequence = base.run(query, responseContext); + if (QueryContexts.isBySegment(queryPlus.getQuery())) { + final Sequence baseSequence = base.run(queryPlus, responseContext); final List results = Sequences.toList(baseSequence, Lists.newArrayList()); return Sequences.simple( Arrays.asList( @@ -61,12 +61,12 @@ public class BySegmentQueryRunner implements QueryRunner new BySegmentResultValueClass( results, segmentIdentifier, - query.getIntervals().get(0) + queryPlus.getQuery().getIntervals().get(0) ) ) ) ); } - return base.run(query, responseContext); + return base.run(queryPlus, responseContext); } } diff --git a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java index 373c2e3b119..5dda6f618a3 100644 --- a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java @@ -38,14 +38,14 @@ public abstract class BySegmentSkippingQueryRunner implements QueryRunner } @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - if (QueryContexts.isBySegment(query)) { - return baseRunner.run(query, responseContext); + if (QueryContexts.isBySegment(queryPlus.getQuery())) { + return baseRunner.run(queryPlus, responseContext); } - return doRun(baseRunner, query, responseContext); + return doRun(baseRunner, queryPlus, responseContext); } - protected abstract Sequence doRun(QueryRunner baseRunner, Query query, Map context); + protected abstract Sequence doRun(QueryRunner baseRunner, QueryPlus queryPlus, Map context); } diff --git a/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java b/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java index 0d42ab983d4..141f2bd075b 100644 --- a/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java +++ b/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java @@ -58,9 +58,11 @@ public class CPUTimeMetricQueryRunner implements QueryRunner @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { - final Sequence baseSequence = delegate.run(query, responseContext); + final QueryPlus queryWithMetrics = + queryPlus.withQueryMetrics((QueryToolChest>) queryToolChest); + final Sequence baseSequence = delegate.run(queryWithMetrics, responseContext); return Sequences.wrap( baseSequence, new SequenceWrapper() @@ -82,13 +84,14 @@ public class CPUTimeMetricQueryRunner implements QueryRunner if (report) { final long cpuTimeNs = cpuTimeAccumulator.get(); if (cpuTimeNs > 0) { - queryToolChest.makeMetrics(query).reportCpuTime(cpuTimeNs).emit(emitter); + queryWithMetrics.getQueryMetrics().reportCpuTime(cpuTimeNs).emit(emitter); } } } } ); } + public static QueryRunner safeBuild( QueryRunner delegate, QueryToolChest> queryToolChest, diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 117d6436082..7ef86f51a4d 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -89,8 +89,9 @@ public class ChainedExecutionQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { + Query query = queryPlus.getQuery(); final int priority = QueryContexts.getPriority(query); final Ordering ordering = query.getResultOrdering(); @@ -121,7 +122,7 @@ public class ChainedExecutionQueryRunner implements QueryRunner public Iterable call() throws Exception { try { - Sequence result = input.run(query, responseContext); + Sequence result = input.run(queryPlus, responseContext); if (result == null) { throw new ISE("Got a null result! Segments are missing!"); } diff --git a/processing/src/main/java/io/druid/query/ConcatQueryRunner.java b/processing/src/main/java/io/druid/query/ConcatQueryRunner.java index e36ec4cdc2d..2237ad8c5b1 100644 --- a/processing/src/main/java/io/druid/query/ConcatQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ConcatQueryRunner.java @@ -38,7 +38,7 @@ public class ConcatQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { return Sequences.concat( Sequences.map( @@ -48,7 +48,7 @@ public class ConcatQueryRunner implements QueryRunner @Override public Sequence apply(final QueryRunner input) { - return input.run(query, responseContext); + return input.run(queryPlus, responseContext); } } ) diff --git a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java index 7fe58ee06b9..acf52b2aea4 100644 --- a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java +++ b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java @@ -47,8 +47,9 @@ public class FinalizeResultsQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, Map responseContext) + public Sequence run(final QueryPlus queryPlus, Map responseContext) { + final Query query = queryPlus.getQuery(); final boolean isBySegment = QueryContexts.isBySegment(query); final boolean shouldFinalize = QueryContexts.isFinalize(query, true); @@ -100,7 +101,7 @@ public class FinalizeResultsQueryRunner implements QueryRunner return Sequences.map( - baseRunner.run(queryToRun, responseContext), + baseRunner.run(queryPlus.withQuery(queryToRun), responseContext), finalizerFn ); diff --git a/processing/src/main/java/io/druid/query/FluentQueryRunnerBuilder.java b/processing/src/main/java/io/druid/query/FluentQueryRunnerBuilder.java index 71bf44451cd..a52da78c9ef 100644 --- a/processing/src/main/java/io/druid/query/FluentQueryRunnerBuilder.java +++ b/processing/src/main/java/io/druid/query/FluentQueryRunnerBuilder.java @@ -49,10 +49,10 @@ public class FluentQueryRunnerBuilder @Override public Sequence run( - Query query, Map responseContext + QueryPlus queryPlus, Map responseContext ) { - return baseRunner.run(query, responseContext); + return baseRunner.run(queryPlus, responseContext); } public FluentQueryRunner from(QueryRunner runner) { diff --git a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java index 77775f295c6..295e79eee23 100644 --- a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java @@ -78,9 +78,9 @@ public class GroupByMergedQueryRunner implements QueryRunner } @Override - public Sequence run(final Query queryParam, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { - final GroupByQuery query = (GroupByQuery) queryParam; + final GroupByQuery query = (GroupByQuery) queryPlus.getQuery(); final GroupByQueryConfig querySpecificConfig = configSupplier.get().withOverrides(query); final boolean isSingleThreaded = querySpecificConfig.isSingleThreaded(); final Pair> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( @@ -114,10 +114,10 @@ public class GroupByMergedQueryRunner implements QueryRunner { try { if (bySegment) { - input.run(queryParam, responseContext) + input.run(queryPlus, responseContext) .accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs); } else { - input.run(queryParam, responseContext) + input.run(queryPlus, responseContext) .accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); } diff --git a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java index 5e8b529b153..a6b26072f6d 100644 --- a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java @@ -63,18 +63,18 @@ public class IntervalChunkingQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { - final Period chunkPeriod = getChunkPeriod(query); + final Period chunkPeriod = getChunkPeriod(queryPlus.getQuery()); // Check for non-empty chunkPeriod, avoiding toStandardDuration since that cannot handle periods like P1M. if (EPOCH.plus(chunkPeriod).getMillis() == EPOCH.getMillis()) { - return baseRunner.run(query, responseContext); + return baseRunner.run(queryPlus, responseContext); } List chunkIntervals = Lists.newArrayList( FunctionalIterable - .create(query.getIntervals()) + .create(queryPlus.getQuery().getIntervals()) .transformCat( new Function>() { @@ -88,7 +88,7 @@ public class IntervalChunkingQueryRunner implements QueryRunner ); if (chunkIntervals.size() <= 1) { - return baseRunner.run(query, responseContext); + return baseRunner.run(queryPlus, responseContext); } return Sequences.concat( @@ -113,7 +113,7 @@ public class IntervalChunkingQueryRunner implements QueryRunner ), executor, queryWatcher ).run( - query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))), + queryPlus.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))), responseContext ); } diff --git a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java index 1f4041973b4..06c563d8971 100644 --- a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java @@ -81,9 +81,10 @@ public class MetricsEmittingQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { - final QueryMetrics> queryMetrics = queryToolChest.makeMetrics(query); + QueryPlus queryWithMetrics = queryPlus.withQueryMetrics((QueryToolChest>) queryToolChest); + final QueryMetrics> queryMetrics = (QueryMetrics>) queryWithMetrics.getQueryMetrics(); applyCustomDimensions.accept(queryMetrics); @@ -91,7 +92,7 @@ public class MetricsEmittingQueryRunner implements QueryRunner // Use LazySequence because want to account execution time of queryRunner.run() (it prepares the underlying // Sequence) as part of the reported query time, i. e. we want to execute queryRunner.run() after // `startTime = System.nanoTime();` (see below). - new LazySequence<>(() -> queryRunner.run(query, responseContext)), + new LazySequence<>(() -> queryRunner.run(queryWithMetrics, responseContext)), new SequenceWrapper() { private long startTimeNs; diff --git a/processing/src/main/java/io/druid/query/NoopQueryRunner.java b/processing/src/main/java/io/druid/query/NoopQueryRunner.java index 3d3aa19b30d..b058a4d8939 100644 --- a/processing/src/main/java/io/druid/query/NoopQueryRunner.java +++ b/processing/src/main/java/io/druid/query/NoopQueryRunner.java @@ -29,7 +29,7 @@ import java.util.Map; public class NoopQueryRunner implements QueryRunner { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { return Sequences.empty(); } diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java index 9ad178161ea..4e6745ca6d4 100644 --- a/processing/src/main/java/io/druid/query/Query.java +++ b/processing/src/main/java/io/druid/query/Query.java @@ -70,8 +70,19 @@ public interface Query String getType(); + /** + * @deprecated use {@link QueryPlus#run(QuerySegmentWalker, Map)} instead. This method is going to be removed in Druid + * 0.11. In the future, a method like getRunner(QuerySegmentWalker, Map) could be added instead of this method, so + * that {@link QueryPlus#run(QuerySegmentWalker, Map)} could be implemented as {@code + * this.query.getRunner(walker, context).run(this, context))}. + */ + @Deprecated Sequence run(QuerySegmentWalker walker, Map context); + /** + * @deprecated use {@link QueryRunner#run(QueryPlus, Map)} instead. This method is going to be removed in Druid 0.11. + */ + @Deprecated Sequence run(QueryRunner runner, Map context); List getIntervals(); diff --git a/processing/src/main/java/io/druid/query/QueryPlus.java b/processing/src/main/java/io/druid/query/QueryPlus.java new file mode 100644 index 00000000000..ae697701177 --- /dev/null +++ b/processing/src/main/java/io/druid/query/QueryPlus.java @@ -0,0 +1,104 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +import com.google.common.base.Preconditions; +import io.druid.java.util.common.guava.Sequence; +import io.druid.query.spec.QuerySegmentSpec; + +import javax.annotation.Nullable; +import java.util.Map; + +/** + * An immutable composite object of {@link Query} + extra stuff needed in {@link QueryRunner}s. This "extra stuff" + * is only {@link QueryMetrics} yet. + */ +public final class QueryPlus +{ + /** + * Returns the minimum bare QueryPlus object with the given query. {@link #getQueryMetrics()} of the QueryPlus object, + * returned from this factory method, returns {@code null}. + */ + public static QueryPlus wrap(Query query) + { + Preconditions.checkNotNull(query); + return new QueryPlus<>(query, null); + } + + private final Query query; + private final QueryMetrics queryMetrics; + + private QueryPlus(Query query, QueryMetrics queryMetrics) + { + this.query = query; + this.queryMetrics = queryMetrics; + } + + public Query getQuery() + { + return query; + } + + @Nullable + public QueryMetrics getQueryMetrics() + { + return queryMetrics; + } + + /** + * Returns the same QueryPlus object, if it already has {@link QueryMetrics} ({@link #getQueryMetrics()} returns not + * null), or returns a new QueryPlus object with {@link Query} from this QueryPlus and QueryMetrics created using the + * given {@link QueryToolChest}, via {@link QueryToolChest#makeMetrics(Query)} method. + */ + public QueryPlus withQueryMetrics(QueryToolChest> queryToolChest) + { + if (queryMetrics != null) { + return this; + } else { + return new QueryPlus<>(query, ((QueryToolChest) queryToolChest).makeMetrics(query)); + } + } + + /** + * Equivalent of withQuery(getQuery().withQuerySegmentSpec(spec)). + */ + public QueryPlus withQuerySegmentSpec(QuerySegmentSpec spec) + { + return new QueryPlus<>(query.withQuerySegmentSpec(spec), queryMetrics); + } + + /** + * Returns a QueryPlus object with {@link QueryMetrics} from this QueryPlus object, and the provided {@link Query}. + */ + public QueryPlus withQuery(Query replacementQuery) + { + return new QueryPlus<>(replacementQuery, queryMetrics); + } + + public Sequence run(QuerySegmentWalker walker, Map context) + { + if (query instanceof BaseQuery) { + return ((BaseQuery) query).getQuerySegmentSpec().lookup(query, walker).run(this, context); + } else { + // fallback + return query.run(walker, context); + } + } +} diff --git a/processing/src/main/java/io/druid/query/QueryRunner.java b/processing/src/main/java/io/druid/query/QueryRunner.java index 58cdf9d7081..a7e17f43d24 100644 --- a/processing/src/main/java/io/druid/query/QueryRunner.java +++ b/processing/src/main/java/io/druid/query/QueryRunner.java @@ -24,14 +24,26 @@ import io.druid.java.util.common.guava.Sequence; import java.util.Map; /** + * This interface has two similar run() methods. {@link #run(Query, Map)} is legacy and {@link #run(QueryPlus, Map)} + * is the new one. Their default implementations delegate to each other. Every implementation of QueryRunner should + * override only one of those methods. New implementations should override the new method: {@link #run(QueryPlus, Map)}. */ public interface QueryRunner { /** - * Runs the given query and returns results in a time-ordered sequence - * @param query - * @param responseContext - * @return + * @deprecated use and override {@link #run(QueryPlus, Map)} instead. This method is going to be removed in Druid 0.11 */ - Sequence run(Query query, Map responseContext); + @Deprecated + default Sequence run(Query query, Map responseContext) + { + return run(QueryPlus.wrap(query), responseContext); + } + + /** + * Runs the given query and returns results in a time-ordered sequence. + */ + default Sequence run(QueryPlus queryPlus, Map responseContext) + { + return run(queryPlus.getQuery(), responseContext); + } } diff --git a/processing/src/main/java/io/druid/query/QueryRunnerHelper.java b/processing/src/main/java/io/druid/query/QueryRunnerHelper.java index 5f9a62ccdf2..ed619f2711c 100644 --- a/processing/src/main/java/io/druid/query/QueryRunnerHelper.java +++ b/processing/src/main/java/io/druid/query/QueryRunnerHelper.java @@ -77,9 +77,9 @@ public class QueryRunnerHelper return new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - return Sequences.withBaggage(runner.run(query, responseContext), closeable); + return Sequences.withBaggage(runner.run(queryPlus, responseContext), closeable); } }; } diff --git a/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java index a85a6f91128..d513e079409 100644 --- a/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java @@ -47,12 +47,12 @@ public class ReferenceCountingSegmentQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, Map responseContext) + public Sequence run(final QueryPlus queryPlus, Map responseContext) { final Closeable closeable = adapter.increment(); if (closeable != null) { try { - final Sequence baseSequence = factory.createRunner(adapter).run(query, responseContext); + final Sequence baseSequence = factory.createRunner(adapter).run(queryPlus, responseContext); return Sequences.withBaggage(baseSequence, closeable); } @@ -62,7 +62,7 @@ public class ReferenceCountingSegmentQueryRunner implements QueryRunner } } else { // Segment was closed before we had a chance to increment the reference count - return new ReportTimelineMissingSegmentQueryRunner(descriptor).run(query, responseContext); + return new ReportTimelineMissingSegmentQueryRunner(descriptor).run(queryPlus, responseContext); } } } diff --git a/processing/src/main/java/io/druid/query/ReportTimelineMissingSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/ReportTimelineMissingSegmentQueryRunner.java index a8b6c5e2ffd..7b60899249b 100644 --- a/processing/src/main/java/io/druid/query/ReportTimelineMissingSegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ReportTimelineMissingSegmentQueryRunner.java @@ -39,7 +39,7 @@ public class ReportTimelineMissingSegmentQueryRunner implements QueryRunner run( - Query query, Map responseContext + QueryPlus queryPlus, Map responseContext ) { List missingSegments = (List) responseContext.get(Result.MISSING_SEGMENTS_KEY); diff --git a/processing/src/main/java/io/druid/query/ResultMergeQueryRunner.java b/processing/src/main/java/io/druid/query/ResultMergeQueryRunner.java index b6276a107f3..e160cbf75b5 100644 --- a/processing/src/main/java/io/druid/query/ResultMergeQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ResultMergeQueryRunner.java @@ -38,9 +38,10 @@ public abstract class ResultMergeQueryRunner extends BySegmentSkippingQueryRu } @Override - public Sequence doRun(QueryRunner baseRunner, Query query, Map context) + public Sequence doRun(QueryRunner baseRunner, QueryPlus queryPlus, Map context) { - return CombiningSequence.create(baseRunner.run(query, context), makeOrdering(query), createMergeFn(query)); + Query query = queryPlus.getQuery(); + return CombiningSequence.create(baseRunner.run(queryPlus, context), makeOrdering(query), createMergeFn(query)); } protected abstract Ordering makeOrdering(Query query); diff --git a/processing/src/main/java/io/druid/query/RetryQueryRunner.java b/processing/src/main/java/io/druid/query/RetryQueryRunner.java index 57a5cde5124..dc0a6df06e8 100644 --- a/processing/src/main/java/io/druid/query/RetryQueryRunner.java +++ b/processing/src/main/java/io/druid/query/RetryQueryRunner.java @@ -61,10 +61,10 @@ public class RetryQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final QueryPlus queryPlus, final Map context) { final List> listOfSequences = Lists.newArrayList(); - listOfSequences.add(baseRunner.run(query, context)); + listOfSequences.add(baseRunner.run(queryPlus, context)); return new YieldingSequenceBase() { @@ -80,12 +80,12 @@ public class RetryQueryRunner implements QueryRunner log.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), i); context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList()); - final Query retryQuery = query.withQuerySegmentSpec( + final QueryPlus retryQueryPlus = queryPlus.withQuerySegmentSpec( new MultipleSpecificSegmentSpec( missingSegments ) ); - Sequence retrySequence = baseRunner.run(retryQuery, context); + Sequence retrySequence = baseRunner.run(retryQueryPlus, context); listOfSequences.add(retrySequence); missingSegments = getMissingSegments(context); if (missingSegments.isEmpty()) { @@ -99,7 +99,7 @@ public class RetryQueryRunner implements QueryRunner } return new MergeSequence<>( - query.getResultOrdering(), + queryPlus.getQuery().getResultOrdering(), Sequences.simple(listOfSequences)).toYielder( initValue, accumulator ); diff --git a/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java b/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java index e3dc7356c24..b2bc013e1dc 100644 --- a/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java +++ b/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java @@ -36,13 +36,13 @@ public class SubqueryQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, Map responseContext) + public Sequence run(final QueryPlus queryPlus, Map responseContext) { - DataSource dataSource = query.getDataSource(); + DataSource dataSource = queryPlus.getQuery().getDataSource(); if (dataSource instanceof QueryDataSource) { - return run((Query) ((QueryDataSource) dataSource).getQuery(), responseContext); + return run(queryPlus.withQuery((Query) ((QueryDataSource) dataSource).getQuery()), responseContext); } else { - return baseRunner.run(query, responseContext); + return baseRunner.run(queryPlus, responseContext); } } } diff --git a/processing/src/main/java/io/druid/query/TimewarpOperator.java b/processing/src/main/java/io/druid/query/TimewarpOperator.java index 0444d95d518..d6a9e8ac459 100644 --- a/processing/src/main/java/io/druid/query/TimewarpOperator.java +++ b/processing/src/main/java/io/druid/query/TimewarpOperator.java @@ -81,18 +81,18 @@ public class TimewarpOperator implements PostProcessingOperator return new QueryRunner() { @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { final long offset = computeOffset(now); - final Interval interval = query.getIntervals().get(0); + final Interval interval = queryPlus.getQuery().getIntervals().get(0); final Interval modifiedInterval = new Interval( Math.min(interval.getStartMillis() + offset, now + offset), Math.min(interval.getEndMillis() + offset, now + offset) ); return Sequences.map( baseRunner.run( - query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(modifiedInterval))), + queryPlus.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(modifiedInterval))), responseContext ), new Function() @@ -113,7 +113,7 @@ public class TimewarpOperator implements PostProcessingOperator final DateTime maxTime = boundary.getMaxTime(); - return (T) ((TimeBoundaryQuery) query).buildResult( + return (T) ((TimeBoundaryQuery) queryPlus.getQuery()).buildResult( new DateTime(Math.min(res.getTimestamp().getMillis() - offset, now)), minTime != null ? minTime.minus(offset) : null, maxTime != null ? new DateTime(Math.min(maxTime.getMillis() - offset, now)) : null diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java index cfb337d9c94..815838416c5 100644 --- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -39,8 +39,9 @@ public class UnionQueryRunner implements QueryRunner } @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { + Query query = queryPlus.getQuery(); DataSource dataSource = query.getDataSource(); if (dataSource instanceof UnionDataSource) { @@ -55,7 +56,7 @@ public class UnionQueryRunner implements QueryRunner public Sequence apply(DataSource singleSource) { return baseRunner.run( - query.withDataSource(singleSource), + queryPlus.withQuery(query.withDataSource(singleSource)), responseContext ); } @@ -64,7 +65,7 @@ public class UnionQueryRunner implements QueryRunner ) ); } else { - return baseRunner.run(query, responseContext); + return baseRunner.run(queryPlus, responseContext); } } diff --git a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java index c137f75c9b1..b60522bd696 100644 --- a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java @@ -25,6 +25,7 @@ import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.query.ChainedExecutionQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -88,15 +89,16 @@ public class DataSourceMetadataQueryRunnerFactory @Override public Sequence> run( - Query> input, + QueryPlus> input, Map responseContext ) { - if (!(input instanceof DataSourceMetadataQuery)) { - throw new ISE("Got a [%s] which isn't a %s", input.getClass().getCanonicalName(), DataSourceMetadataQuery.class); + Query> query = input.getQuery(); + if (!(query instanceof DataSourceMetadataQuery)) { + throw new ISE("Got a [%s] which isn't a %s", query.getClass().getCanonicalName(), DataSourceMetadataQuery.class); } - final DataSourceMetadataQuery legacyQuery = (DataSourceMetadataQuery) input; + final DataSourceMetadataQuery legacyQuery = (DataSourceMetadataQuery) query; return new BaseSequence<>( new BaseSequence.IteratorMaker, Iterator>>() diff --git a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java index 1c5b2eb00ea..03b52599cc3 100644 --- a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java @@ -33,6 +33,7 @@ import io.druid.query.CacheStrategy; import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; import io.druid.query.QueryMetrics; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.Result; @@ -93,15 +94,15 @@ public class DataSourceQueryQueryToolChest @Override protected Sequence> doRun( QueryRunner> baseRunner, - Query> input, + QueryPlus> input, Map context ) { - DataSourceMetadataQuery query = (DataSourceMetadataQuery) input; + DataSourceMetadataQuery query = (DataSourceMetadataQuery) input.getQuery(); return Sequences.simple( query.mergeResults( Sequences.toList( - baseRunner.run(query, context), + baseRunner.run(input, context), Lists.>newArrayList() ) ) diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index ddf9c4277c3..19d8c541d7d 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -41,9 +41,9 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.query.CacheStrategy; import io.druid.query.DataSource; import io.druid.query.IntervalChunkingQueryRunnerDecorator; -import io.druid.query.Query; import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.SubqueryQueryRunner; @@ -113,13 +113,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - if (QueryContexts.isBySegment(query)) { - return runner.run(query, responseContext); + if (QueryContexts.isBySegment(queryPlus.getQuery())) { + return runner.run(queryPlus, responseContext); } - final GroupByQuery groupByQuery = (GroupByQuery) query; + final GroupByQuery groupByQuery = (GroupByQuery) queryPlus.getQuery(); if (strategySelector.strategize(groupByQuery).doMergeResults(groupByQuery)) { return initAndMergeGroupByResults( groupByQuery, @@ -127,7 +127,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - GroupByQuery groupByQuery = (GroupByQuery) query; + GroupByQuery groupByQuery = (GroupByQuery) queryPlus.getQuery(); if (groupByQuery.getDimFilter() != null) { groupByQuery = groupByQuery.withDimFilter(groupByQuery.getDimFilter().optimize()); } @@ -365,7 +365,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - return strategySelector.strategize((GroupByQuery) query).mergeRunners(queryExecutor, queryRunners).run( - query, - responseContext + QueryRunner rowQueryRunner = strategySelector.strategize((GroupByQuery) queryPlus.getQuery()).mergeRunners( + queryExecutor, + queryRunners ); + return rowQueryRunner.run(queryPlus, responseContext); } }; } @@ -96,13 +98,14 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory run(Query input, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - if (!(input instanceof GroupByQuery)) { - throw new ISE("Got a [%s] which isn't a %s", input.getClass(), GroupByQuery.class); + Query query = queryPlus.getQuery(); + if (!(query instanceof GroupByQuery)) { + throw new ISE("Got a [%s] which isn't a %s", query.getClass(), GroupByQuery.class); } - return strategySelector.strategize((GroupByQuery) input).process((GroupByQuery) input, adapter); + return strategySelector.strategize((GroupByQuery) query).process((GroupByQuery) query, adapter); } } } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index de6fee13b21..242915ddf13 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -45,9 +45,9 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.logger.Logger; import io.druid.query.AbstractPrioritizedCallable; import io.druid.query.ChainedExecutionQueryRunner; -import io.druid.query.Query; import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryWatcher; import io.druid.query.ResourceLimitExceededException; @@ -105,9 +105,9 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner } @Override - public Sequence run(final Query queryParam, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { - final GroupByQuery query = (GroupByQuery) queryParam; + final GroupByQuery query = (GroupByQuery) queryPlus.getQuery(); final GroupByQueryConfig querySpecificConfig = config.withOverrides(query); // CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION is here because realtime servers use nested mergeRunners calls @@ -119,12 +119,13 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, false ); - final GroupByQuery queryForRunners = query.withOverriddenContext( - ImmutableMap.of(CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, true) + final QueryPlus queryPlusForRunners = queryPlus.withQuery( + query.withOverriddenContext(ImmutableMap.of(CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, true)) ); if (QueryContexts.isBySegment(query) || forceChainedExecution) { - return new ChainedExecutionQueryRunner(exec, queryWatcher, queryables).run(query, responseContext); + ChainedExecutionQueryRunner runner = new ChainedExecutionQueryRunner<>(exec, queryWatcher, queryables); + return runner.run(queryPlusForRunners, responseContext); } final boolean isSingleThreaded = querySpecificConfig.isSingleThreaded(); @@ -225,7 +226,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner Releaser bufferReleaser = mergeBufferHolder.increment(); Releaser grouperReleaser = grouperHolder.increment() ) { - final AggregateResult retVal = input.run(queryForRunners, responseContext) + final AggregateResult retVal = input.run(queryPlusForRunners, responseContext) .accumulate( AggregateResult.ok(), accumulator diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java index 4ba495356ba..0cf98c31350 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -37,6 +37,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.GroupByMergedQueryRunner; import io.druid.query.IntervalChunkingQueryRunnerDecorator; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryWatcher; import io.druid.query.aggregation.AggregatorFactory; @@ -120,25 +121,27 @@ public class GroupByStrategyV1 implements GroupByStrategy configSupplier.get(), bufferPool, baseRunner.run( - new GroupByQuery.Builder(query) - // Don't do post aggs until the end of this method. - .setPostAggregatorSpecs(ImmutableList.of()) - // Don't do "having" clause until the end of this method. - .setHavingSpec(null) - .setLimitSpec(NoopLimitSpec.instance()) - .overrideContext( - ImmutableMap.of( - "finalize", false, - //setting sort to false avoids unnecessary sorting while merging results. we only need to sort - //in the end when returning results to user. (note this is only respected by groupBy v1) - GroupByQueryHelper.CTX_KEY_SORT_RESULTS, false, - //no merging needed at historicals because GroupByQueryRunnerFactory.mergeRunners(..) would - //return merged results. (note this is only respected by groupBy v1) - GroupByQueryQueryToolChest.GROUP_BY_MERGE_KEY, false, - GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V1 + QueryPlus.wrap( + new GroupByQuery.Builder(query) + // Don't do post aggs until the end of this method. + .setPostAggregatorSpecs(ImmutableList.of()) + // Don't do "having" clause until the end of this method. + .setHavingSpec(null) + .setLimitSpec(NoopLimitSpec.instance()) + .overrideContext( + ImmutableMap.of( + "finalize", false, + //set sort to false avoids unnecessary sorting while merging results. we only need to sort + //in the end when returning results to user. (note this is only respected by groupBy v1) + GroupByQueryHelper.CTX_KEY_SORT_RESULTS, false, + //no merging needed at historicals because GroupByQueryRunnerFactory.mergeRunners(..) would + //return merged results. (note this is only respected by groupBy v1) + GroupByQueryQueryToolChest.GROUP_BY_MERGE_KEY, false, + GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V1 + ) ) - ) - .build(), + .build() + ), responseContext ), true diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index e13b4e7c6fb..b23f3cd50a9 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -48,6 +48,7 @@ import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryWatcher; import io.druid.query.ResourceLimitExceededException; @@ -233,21 +234,23 @@ public class GroupByStrategyV2 implements GroupByStrategy return query.postProcess( Sequences.map( mergingQueryRunner.run( - new GroupByQuery.Builder(query) - // Don't do post aggs until the end of this method. - .setPostAggregatorSpecs(ImmutableList.of()) - // Don't do "having" clause until the end of this method. - .setHavingSpec(null) - .setLimitSpec(NoopLimitSpec.instance()) - .overrideContext( - ImmutableMap.of( - "finalize", false, - GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2, - CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp == null ? "" : String.valueOf(fudgeTimestamp.getMillis()), - CTX_KEY_OUTERMOST, false + QueryPlus.wrap( + new GroupByQuery.Builder(query) + // Don't do post aggs until the end of this method. + .setPostAggregatorSpecs(ImmutableList.of()) + // Don't do "having" clause until the end of this method. + .setHavingSpec(null) + .setLimitSpec(NoopLimitSpec.instance()) + .overrideContext( + ImmutableMap.of( + "finalize", false, + GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2, + CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp == null ? "" : String.valueOf(fudgeTimestamp.getMillis()), + CTX_KEY_OUTERMOST, false + ) ) - ) - .build(), + .build() + ), responseContext ), new Function() @@ -304,7 +307,7 @@ public class GroupByStrategyV2 implements GroupByStrategy return mergeResults(new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { return results; } diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index a13f87ccd39..5e284b01dbb 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -43,6 +43,7 @@ import io.druid.query.DefaultGenericQueryMetricsFactory; import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; import io.druid.query.QueryMetrics; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.ResultMergeQueryRunner; @@ -102,13 +103,14 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest doRun( QueryRunner baseRunner, - Query query, + QueryPlus queryPlus, Map context ) { + Query query = queryPlus.getQuery(); return new MappedSequence<>( CombiningSequence.create( - baseRunner.run(query, context), + baseRunner.run(queryPlus, context), makeOrdering(query), createMergeFn(query) ), diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index 475d98453df..5c4aaaceca3 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -36,6 +36,7 @@ import io.druid.query.ConcatQueryRunner; import io.druid.query.Query; import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -83,9 +84,9 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory() { @Override - public Sequence run(Query inQ, Map responseContext) + public Sequence run(QueryPlus inQ, Map responseContext) { - SegmentMetadataQuery query = (SegmentMetadataQuery) inQ; + SegmentMetadataQuery query = (SegmentMetadataQuery) inQ.getQuery(); final SegmentAnalyzer analyzer = new SegmentAnalyzer(query.getAnalysisTypes()); final Map analyzedColumns = analyzer.analyze(segment); final long numRows = analyzer.numRows(segment); @@ -197,10 +198,11 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory run( - final Query query, + final QueryPlus queryPlus, final Map responseContext ) { + final Query query = queryPlus.getQuery(); final int priority = QueryContexts.getPriority(query); ListenableFuture> future = queryExecutor.submit( new AbstractPrioritizedCallable>(priority) @@ -209,7 +211,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory call() throws Exception { return Sequences.simple( - Sequences.toList(input.run(query, responseContext), new ArrayList<>()) + Sequences.toList(input.run(queryPlus, responseContext), new ArrayList<>()) ); } } diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index d84a7396e3e..592418e0a8c 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -42,6 +42,7 @@ import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; import io.druid.query.QueryContexts; import io.druid.query.QueryMetrics; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.Result; @@ -348,14 +349,15 @@ public class SearchQueryQueryToolChest extends QueryToolChest> run( - Query> query, Map responseContext + QueryPlus> queryPlus, Map responseContext ) { - SearchQuery searchQuery = (SearchQuery) query; + SearchQuery searchQuery = (SearchQuery) queryPlus.getQuery(); if (searchQuery.getDimensionsFilter() != null) { searchQuery = searchQuery.withDimFilter(searchQuery.getDimensionsFilter().optimize()); + queryPlus = queryPlus.withQuery(searchQuery); } - return runner.run(searchQuery, responseContext); + return runner.run(queryPlus, responseContext); } } , this), config @@ -378,23 +380,24 @@ public class SearchQueryQueryToolChest extends QueryToolChest> run( - Query> input, + QueryPlus> queryPlus, Map responseContext ) { + Query> input = queryPlus.getQuery(); if (!(input instanceof SearchQuery)) { throw new ISE("Can only handle [%s], got [%s]", SearchQuery.class, input.getClass()); } final SearchQuery query = (SearchQuery) input; if (query.getLimit() < config.getMaxSearchLimit()) { - return runner.run(query, responseContext); + return runner.run(queryPlus, responseContext); } final boolean isBySegment = QueryContexts.isBySegment(query); return Sequences.map( - runner.run(query.withLimit(config.getMaxSearchLimit()), responseContext), + runner.run(queryPlus.withQuery(query.withLimit(config.getMaxSearchLimit())), responseContext), new Function, Result>() { @Override diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java b/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java index cde2e4567db..9367efb5f08 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java @@ -30,6 +30,7 @@ import io.druid.java.util.common.guava.FunctionalIterable; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.Result; import io.druid.query.dimension.ColumnSelectorStrategy; @@ -186,10 +187,11 @@ public class SearchQueryRunner implements QueryRunner> @Override public Sequence> run( - final Query> input, + final QueryPlus> queryPlus, Map responseContext ) { + Query> input = queryPlus.getQuery(); if (!(input instanceof SearchQuery)) { throw new ISE("Got a [%s] which isn't a %s", input.getClass(), SearchQuery.class); } diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index 45948efbe71..7dabd959ec1 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -43,6 +43,7 @@ import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; import io.druid.query.QueryMetrics; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.Result; @@ -348,14 +349,15 @@ public class SelectQueryQueryToolChest extends QueryToolChest> run( - Query> query, Map responseContext + QueryPlus> queryPlus, Map responseContext ) { - SelectQuery selectQuery = (SelectQuery) query; + SelectQuery selectQuery = (SelectQuery) queryPlus.getQuery(); if (selectQuery.getDimensionsFilter() != null) { selectQuery = selectQuery.withDimFilter(selectQuery.getDimensionsFilter().optimize()); + queryPlus = queryPlus.withQuery(selectQuery); } - return runner.run(selectQuery, responseContext); + return runner.run(queryPlus, responseContext); } }, this); } diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java index 19e4bbd1d2c..2b6d2359d1a 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java @@ -24,6 +24,7 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; import io.druid.query.ChainedExecutionQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -90,10 +91,11 @@ public class SelectQueryRunnerFactory @Override public Sequence> run( - Query> input, + QueryPlus> queryPlus, Map responseContext ) { + Query> input = queryPlus.getQuery(); if (!(input instanceof SelectQuery)) { throw new ISE("Got a [%s] which isn't a %s", input.getClass(), SelectQuery.class); } diff --git a/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java index c331ef7556d..e60c545c5cf 100644 --- a/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java @@ -29,6 +29,7 @@ import io.druid.java.util.common.guava.Yielder; import io.druid.java.util.common.guava.Yielders; import io.druid.java.util.common.guava.YieldingAccumulator; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.Result; import io.druid.query.SegmentDescriptor; @@ -55,9 +56,10 @@ public class SpecificSegmentQueryRunner implements QueryRunner } @Override - public Sequence run(final Query input, final Map responseContext) + public Sequence run(final QueryPlus input, final Map responseContext) { - final Query query = input.withQuerySegmentSpec(specificSpec); + final QueryPlus queryPlus = input.withQuerySegmentSpec(specificSpec); + final Query query = queryPlus.getQuery(); final Thread currThread = Thread.currentThread(); final String currThreadName = currThread.getName(); @@ -69,7 +71,7 @@ public class SpecificSegmentQueryRunner implements QueryRunner @Override public Sequence get() { - return base.run(query, responseContext); + return base.run(queryPlus, responseContext); } } ); diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index a6569baed71..8c7abc576d2 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -32,9 +32,10 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.query.BySegmentSkippingQueryRunner; import io.druid.query.CacheStrategy; import io.druid.query.DefaultGenericQueryMetricsFactory; +import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; import io.druid.query.QueryMetrics; -import io.druid.query.GenericQueryMetricsFactory; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.Result; @@ -109,13 +110,15 @@ public class TimeBoundaryQueryQueryToolChest { @Override protected Sequence> doRun( - QueryRunner> baseRunner, Query> input, Map context + QueryRunner> baseRunner, + QueryPlus> input, + Map context ) { - TimeBoundaryQuery query = (TimeBoundaryQuery) input; + TimeBoundaryQuery query = (TimeBoundaryQuery) input.getQuery(); return Sequences.simple( query.mergeResults( - Sequences.toList(baseRunner.run(query, context), Lists.>newArrayList()) + Sequences.toList(baseRunner.run(input, context), Lists.>newArrayList()) ) ); } diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java index 913933ffd33..231368f2c51 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java @@ -29,6 +29,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.ChainedExecutionQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerHelper; @@ -131,10 +132,11 @@ public class TimeBoundaryQueryRunnerFactory @Override public Sequence> run( - final Query> input, + final QueryPlus> queryPlus, final Map responseContext ) { + Query> input = queryPlus.getQuery(); if (!(input instanceof TimeBoundaryQuery)) { throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TimeBoundaryQuery.class); } diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 11f9209bedd..473775b71bc 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -33,6 +33,7 @@ import io.druid.java.util.common.guava.nary.BinaryFn; import io.druid.query.CacheStrategy; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.Result; @@ -92,14 +93,14 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest> doRun( QueryRunner> baseRunner, - Query> query, + QueryPlus> queryPlus, Map context ) { return super.doRun( baseRunner, // Don't do post aggs until makePostComputeManipulatorFn() is called - ((TimeseriesQuery) query).withPostAggregatorSpecs(ImmutableList.of()), + queryPlus.withQuery(((TimeseriesQuery) queryPlus.getQuery()).withPostAggregatorSpecs(ImmutableList.of())), context ); } @@ -234,14 +235,15 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest> run( - Query> query, Map responseContext + QueryPlus> queryPlus, Map responseContext ) { - TimeseriesQuery timeseriesQuery = (TimeseriesQuery) query; + TimeseriesQuery timeseriesQuery = (TimeseriesQuery) queryPlus.getQuery(); if (timeseriesQuery.getDimensionsFilter() != null) { timeseriesQuery = timeseriesQuery.withDimFilter(timeseriesQuery.getDimensionsFilter().optimize()); + queryPlus = queryPlus.withQuery(timeseriesQuery); } - return runner.run(timeseriesQuery, responseContext); + return runner.run(queryPlus, responseContext); } }, this); } diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java index e6ff92b16e2..79922df83e9 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java @@ -24,6 +24,7 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; import io.druid.query.ChainedExecutionQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -91,10 +92,11 @@ public class TimeseriesQueryRunnerFactory @Override public Sequence> run( - Query> input, + QueryPlus> queryPlus, Map responseContext ) { + Query> input = queryPlus.getQuery(); if (!(input instanceof TimeseriesQuery)) { throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TimeseriesQuery.class); } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 7d9be82c6fb..12e489a85a6 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -37,6 +37,7 @@ import io.druid.query.CacheStrategy; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; import io.druid.query.QueryContexts; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.Result; @@ -416,26 +417,27 @@ public class TopNQueryQueryToolChest extends QueryToolChest> run( - Query> query, Map responseContext + QueryPlus> queryPlus, Map responseContext ) { - TopNQuery topNQuery = (TopNQuery) query; + TopNQuery topNQuery = (TopNQuery) queryPlus.getQuery(); if (topNQuery.getDimensionsFilter() != null) { topNQuery = topNQuery.withDimFilter(topNQuery.getDimensionsFilter().optimize()); } final TopNQuery delegateTopNQuery = topNQuery; if (TopNQueryEngine.canApplyExtractionInPost(delegateTopNQuery)) { final DimensionSpec dimensionSpec = delegateTopNQuery.getDimensionSpec(); - return runner.run( + QueryPlus> delegateQueryPlus = queryPlus.withQuery( delegateTopNQuery.withDimensionSpec( new DefaultDimensionSpec( dimensionSpec.getDimension(), dimensionSpec.getOutputName() ) - ), responseContext + ) ); + return runner.run(delegateQueryPlus, responseContext); } else { - return runner.run(delegateTopNQuery, responseContext); + return runner.run(queryPlus.withQuery(delegateTopNQuery), responseContext); } } } @@ -455,12 +457,12 @@ public class TopNQueryQueryToolChest extends QueryToolChest> run( - final Query> query, final Map responseContext + final QueryPlus> queryPlus, final Map responseContext ) { // thresholdRunner.run throws ISE if query is not TopNQuery - final Sequence> resultSequence = thresholdRunner.run(query, responseContext); - final TopNQuery topNQuery = (TopNQuery) query; + final Sequence> resultSequence = thresholdRunner.run(queryPlus, responseContext); + final TopNQuery topNQuery = (TopNQuery) queryPlus.getQuery(); if (!TopNQueryEngine.canApplyExtractionInPost(topNQuery)) { return resultSequence; } else { @@ -521,10 +523,11 @@ public class TopNQueryQueryToolChest extends QueryToolChest> run( - Query> input, + QueryPlus> queryPlus, Map responseContext ) { + Query> input = queryPlus.getQuery(); if (!(input instanceof TopNQuery)) { throw new ISE("Can only handle [%s], got [%s]", TopNQuery.class, input.getClass()); } @@ -532,13 +535,13 @@ public class TopNQueryQueryToolChest extends QueryToolChest minTopNThreshold) { - return runner.run(query, responseContext); + return runner.run(queryPlus, responseContext); } final boolean isBySegment = QueryContexts.isBySegment(query); return Sequences.map( - runner.run(query.withThreshold(minTopNThreshold), responseContext), + runner.run(queryPlus.withQuery(query.withThreshold(minTopNThreshold)), responseContext), new Function, Result>() { @Override diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java index 35af25e12a6..2e392f0b75e 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java @@ -25,7 +25,7 @@ import io.druid.guice.annotations.Global; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; import io.druid.query.ChainedExecutionQueryRunner; -import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -65,15 +65,15 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory> run( - Query> input, + QueryPlus> input, Map responseContext ) { - if (!(input instanceof TopNQuery)) { + if (!(input.getQuery() instanceof TopNQuery)) { throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TopNQuery.class); } - return queryEngine.query((TopNQuery) input, segment.asStorageAdapter()); + return queryEngine.query((TopNQuery) input.getQuery(), segment.asStorageAdapter()); } }; diff --git a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java index b6c074e7eeb..a5761454f63 100644 --- a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java @@ -61,7 +61,7 @@ public class AsyncQueryRunnerTest QueryRunner baseRunner = new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { try { latch.await(); @@ -85,7 +85,7 @@ public class AsyncQueryRunnerTest QueryRunner baseRunner = new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { try { Thread.sleep(Long.MAX_VALUE); @@ -117,7 +117,7 @@ public class AsyncQueryRunnerTest QueryRunner baseRunner = new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) { return null; } + public Sequence run(QueryPlus queryPlus, Map responseContext) { return null; } }; QueryWatcher mock = EasyMock.createMock(QueryWatcher.class); diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java index 311aaaa798f..935957eb079 100644 --- a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -330,7 +330,7 @@ public class ChainedExecutionQueryRunnerTest } @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { // do a lot of work synchronized (this) { diff --git a/processing/src/test/java/io/druid/query/IntervalChunkingQueryRunnerTest.java b/processing/src/test/java/io/druid/query/IntervalChunkingQueryRunnerTest.java index 194b12e6bdc..6ec306db2fc 100644 --- a/processing/src/test/java/io/druid/query/IntervalChunkingQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/IntervalChunkingQueryRunnerTest.java @@ -60,13 +60,13 @@ public class IntervalChunkingQueryRunnerTest @Test public void testDefaultNoChunking() { - Query query = queryBuilder.intervals("2014/2016").build(); + QueryPlus queryPlus = QueryPlus.wrap(queryBuilder.intervals("2014/2016").build()); - EasyMock.expect(baseRunner.run(query, Collections.EMPTY_MAP)).andReturn(Sequences.empty()); + EasyMock.expect(baseRunner.run(queryPlus, Collections.EMPTY_MAP)).andReturn(Sequences.empty()); EasyMock.replay(baseRunner); QueryRunner runner = decorator.decorate(baseRunner, toolChest); - runner.run(query, Collections.EMPTY_MAP); + runner.run(queryPlus, Collections.EMPTY_MAP); EasyMock.verify(baseRunner); } diff --git a/processing/src/test/java/io/druid/query/LegacyApiQueryRunnerTest.java b/processing/src/test/java/io/druid/query/LegacyApiQueryRunnerTest.java new file mode 100644 index 00000000000..31bf7825fc4 --- /dev/null +++ b/processing/src/test/java/io/druid/query/LegacyApiQueryRunnerTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +import com.google.common.collect.ImmutableList; +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.Sequences; +import io.druid.query.spec.MultipleIntervalSegmentSpec; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +/** + * Tests that if a QueryRunner overrides a legacy {@link QueryRunner#run(Query, Map)} method, it still works. This + * test should be removed when {@link QueryRunner#run(Query, Map)} is removed. + */ +public class LegacyApiQueryRunnerTest +{ + private static class LegacyApiQueryRunner implements QueryRunner + { + /** + * Overrides legacy API. + */ + @Override + public Sequence run(Query query, Map responseContext) + { + return Sequences.empty(); + } + } + + @Test + public void testQueryRunnerLegacyApi() + { + final Query query = new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(ImmutableList.of(new Interval("0/100"))), + false, + new HashMap() + ); + + Map context = new HashMap<>(); + Assert.assertEquals(Sequences.empty(), new LegacyApiQueryRunner<>().run(QueryPlus.wrap(query), context)); + } +} diff --git a/processing/src/test/java/io/druid/query/QueryContextsTest.java b/processing/src/test/java/io/druid/query/QueryContextsTest.java index 2a6945b9d3c..81f23374a21 100644 --- a/processing/src/test/java/io/druid/query/QueryContextsTest.java +++ b/processing/src/test/java/io/druid/query/QueryContextsTest.java @@ -21,67 +21,15 @@ package io.druid.query; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import io.druid.query.filter.DimFilter; import io.druid.query.spec.MultipleIntervalSegmentSpec; -import io.druid.query.spec.QuerySegmentSpec; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; import java.util.HashMap; -import java.util.Map; public class QueryContextsTest { - private static class TestQuery extends BaseQuery - { - - public TestQuery(DataSource dataSource, QuerySegmentSpec querySegmentSpec, boolean descending, Map context) - { - super(dataSource, querySegmentSpec, descending, context); - } - - @Override - public boolean hasFilters() - { - return false; - } - - @Override - public DimFilter getFilter() - { - return null; - } - - @Override - public String getType() - { - return null; - } - - @Override - public Query withQuerySegmentSpec(QuerySegmentSpec spec) - { - return null; - } - - @Override - public Query withDataSource(DataSource dataSource) - { - return null; - } - - @Override - public Query withOverriddenContext(Map contextOverride) - { - return new TestQuery( - getDataSource(), - getQuerySegmentSpec(), - isDescending(), - BaseQuery.computeOverriddenContext(getContext(), contextOverride) - ); - } - } @Test public void testDefaultQueryTimeout() diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index c1206adae34..a24d92b5a4e 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -501,9 +501,9 @@ public class QueryRunnerTestHelper return new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - return qr.run(query, responseContext); + return qr.run(queryPlus, responseContext); } @Override @@ -526,8 +526,9 @@ public class QueryRunnerTestHelper new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { + Query query = queryPlus.getQuery(); List segments = Lists.newArrayList(); for (Interval interval : query.getIntervals()) { segments.addAll(timeline.lookup(interval)); @@ -535,7 +536,7 @@ public class QueryRunnerTestHelper List> sequences = Lists.newArrayList(); for (TimelineObjectHolder holder : toolChest.filterSegments(query, segments)) { Segment segment = holder.getObject().getChunk(0).getObject(); - Query running = query.withQuerySegmentSpec( + QueryPlus queryPlusRunning = queryPlus.withQuerySegmentSpec( new SpecificSegmentSpec( new SegmentDescriptor( holder.getInterval(), @@ -544,7 +545,7 @@ public class QueryRunnerTestHelper ) ) ); - sequences.add(factory.createRunner(segment).run(running, responseContext)); + sequences.add(factory.createRunner(segment).run(queryPlusRunning, responseContext)); } return new MergeSequence<>(query.getResultOrdering(), Sequences.simple(sequences)); } @@ -568,9 +569,9 @@ public class QueryRunnerTestHelper return new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - return delegate.run(query, responseContext); + return delegate.run(queryPlus, responseContext); } }; } diff --git a/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java b/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java index d7ae4b8b7c1..40f1fff4ad1 100644 --- a/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java @@ -71,7 +71,7 @@ public class RetryQueryRunnerTest new QueryRunner>() { @Override - public Sequence> run(Query query, Map context) + public Sequence> run(QueryPlus queryPlus, Map context) { ((List) context.get(Result.MISSING_SEGMENTS_KEY)).add( new SegmentDescriptor( @@ -128,7 +128,7 @@ public class RetryQueryRunnerTest { @Override public Sequence> run( - Query> query, + QueryPlus> queryPlus, Map context ) { @@ -195,7 +195,7 @@ public class RetryQueryRunnerTest { @Override public Sequence> run( - Query> query, + QueryPlus> queryPlus, Map context ) { @@ -261,7 +261,7 @@ public class RetryQueryRunnerTest { @Override public Sequence> run( - Query> query, + QueryPlus> queryPlus, Map context ) { @@ -313,10 +313,11 @@ public class RetryQueryRunnerTest { @Override public Sequence> run( - Query> query, + QueryPlus> queryPlus, Map context ) { + final Query> query = queryPlus.getQuery(); if ((int) context.get("count") == 0) { // assume 2 missing segments at first run ((List) context.get(Result.MISSING_SEGMENTS_KEY)).add( diff --git a/processing/src/test/java/io/druid/query/TestQuery.java b/processing/src/test/java/io/druid/query/TestQuery.java new file mode 100644 index 00000000000..bd478cc622c --- /dev/null +++ b/processing/src/test/java/io/druid/query/TestQuery.java @@ -0,0 +1,75 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +import io.druid.query.filter.DimFilter; +import io.druid.query.spec.QuerySegmentSpec; + +import java.util.Map; + +class TestQuery extends BaseQuery +{ + + public TestQuery(DataSource dataSource, QuerySegmentSpec querySegmentSpec, boolean descending, Map context) + { + super(dataSource, querySegmentSpec, descending, context); + } + + @Override + public boolean hasFilters() + { + return false; + } + + @Override + public DimFilter getFilter() + { + return null; + } + + @Override + public String getType() + { + return null; + } + + @Override + public Query withQuerySegmentSpec(QuerySegmentSpec spec) + { + return null; + } + + @Override + public Query withDataSource(DataSource dataSource) + { + return null; + } + + @Override + public Query withOverriddenContext(Map contextOverride) + { + return new TestQuery( + getDataSource(), + getQuerySegmentSpec(), + isDescending(), + BaseQuery.computeOverriddenContext(getContext(), contextOverride) + ); + } +} diff --git a/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java b/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java index 2f8ae218108..133e435ed97 100644 --- a/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java +++ b/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java @@ -81,7 +81,7 @@ public class TimewarpOperatorTest { @Override public Sequence> run( - Query> query, + QueryPlus> queryPlus, Map responseContext ) { @@ -96,7 +96,7 @@ public class TimewarpOperatorTest new TimeseriesResultValue(ImmutableMap.of("metric", 3)) ), new Result<>( - query.getIntervals().get(0).getEnd(), + queryPlus.getQuery().getIntervals().get(0).getEnd(), new TimeseriesResultValue(ImmutableMap.of("metric", 5)) ) ) @@ -143,7 +143,7 @@ public class TimewarpOperatorTest { @Override public Sequence> run( - Query> query, + QueryPlus> queryPlus, Map responseContext ) { @@ -193,10 +193,11 @@ public class TimewarpOperatorTest { @Override public Sequence> run( - Query> query, + QueryPlus> queryPlus, Map responseContext ) { + final Query> query = queryPlus.getQuery(); return Sequences.simple( ImmutableList.of( new Result<>( diff --git a/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java index b7b914fe5ab..250006458ba 100644 --- a/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java @@ -39,11 +39,11 @@ public class UnionQueryRunnerTest QueryRunner baseRunner = new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { // verify that table datasource is passed to baseQueryRunner - Assert.assertTrue(query.getDataSource() instanceof TableDataSource); - String dsName = Iterables.getOnlyElement(query.getDataSource().getNames()); + Assert.assertTrue(queryPlus.getQuery().getDataSource() instanceof TableDataSource); + String dsName = Iterables.getOnlyElement(queryPlus.getQuery().getDataSource().getNames()); if (dsName.equals("ds1")) { responseContext.put("ds1", "ds1"); return Sequences.simple(Arrays.asList(1, 2, 3)); @@ -70,7 +70,7 @@ public class UnionQueryRunnerTest .aggregators(QueryRunnerTestHelper.commonAggregators) .build(); Map responseContext = Maps.newHashMap(); - Sequence result = runner.run(q, responseContext); + Sequence result = runner.run(q, responseContext); List res = Sequences.toList(result, Lists.newArrayList()); Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), res); diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index df73615e991..a782e39c5b7 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -50,6 +50,7 @@ import io.druid.java.util.common.guava.Yielder; import io.druid.java.util.common.guava.YieldingAccumulator; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; @@ -535,10 +536,10 @@ public class AggregationTestHelper return new QueryRunner() { @Override - public Sequence run(Query query, Map map) + public Sequence run(QueryPlus queryPlus, Map map) { try { - Sequence resultSeq = baseRunner.run(query, Maps.newHashMap()); + Sequence resultSeq = baseRunner.run(queryPlus, Maps.newHashMap()); final Yielder yielder = resultSeq.toYielder( null, new YieldingAccumulator() @@ -559,7 +560,7 @@ public class AggregationTestHelper List resultRows = Lists.transform( readQueryResultArrayFromString(resultStr), toolChest.makePreComputeManipulatorFn( - query, + queryPlus.getQuery(), MetricManipulatorFns.deserializing() ) ); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java index dc7648154da..26df8376601 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java @@ -33,6 +33,7 @@ import io.druid.java.util.common.guava.MergeSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.aggregation.AggregatorFactory; @@ -86,21 +87,22 @@ public class GroupByQueryRunnerFactoryTest new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { return factory.getToolchest().mergeResults( new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { + final Query query = queryPlus.getQuery(); try { return new MergeSequence( query.getResultOrdering(), Sequences.simple( Arrays.asList( - factory.createRunner(createSegment()).run(query, responseContext), - factory.createRunner(createSegment()).run(query, responseContext) + factory.createRunner(createSegment()).run(queryPlus, responseContext), + factory.createRunner(createSegment()).run(queryPlus, responseContext) ) ) ); @@ -110,7 +112,7 @@ public class GroupByQueryRunnerFactoryTest } } } - ).run(query, responseContext); + ).run(queryPlus, responseContext); } } ); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 147c8e1bf64..1d1a0c3290b 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -51,9 +51,9 @@ import io.druid.query.BySegmentResultValueClass; import io.druid.query.DruidProcessingConfig; import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; -import io.druid.query.Query; import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryToolChest; @@ -2353,20 +2353,20 @@ public class GroupByQueryRunnerTest { @Override public Sequence run( - Query query, Map responseContext + QueryPlus queryPlus, Map responseContext ) { // simulate two daily segments - final Query query1 = query.withQuerySegmentSpec( + final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03"))) ); - final Query query2 = query.withQuerySegmentSpec( + final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) ); return new MergeSequence( - query.getResultOrdering(), + queryPlus.getQuery().getResultOrdering(), Sequences.simple( - Arrays.asList(runner.run(query1, responseContext), runner.run(query2, responseContext)) + Arrays.asList(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext)) ) ); } @@ -2649,20 +2649,20 @@ public class GroupByQueryRunnerTest { @Override public Sequence run( - Query query, Map responseContext + QueryPlus queryPlus, Map responseContext ) { // simulate two daily segments - final Query query1 = query.withQuerySegmentSpec( + final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03"))) ); - final Query query2 = query.withQuerySegmentSpec( + final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) ); return new MergeSequence( - query.getResultOrdering(), + queryPlus.getQuery().getResultOrdering(), Sequences.simple( - Arrays.asList(runner.run(query1, responseContext), runner.run(query2, responseContext)) + Arrays.asList(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext)) ) ); } @@ -3437,20 +3437,20 @@ public class GroupByQueryRunnerTest { @Override public Sequence run( - Query query, Map responseContext + QueryPlus queryPlus, Map responseContext ) { // simulate two daily segments - final Query query1 = query.withQuerySegmentSpec( + final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03"))) ); - final Query query2 = query.withQuerySegmentSpec( + final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) ); return new MergeSequence( - query.getResultOrdering(), + queryPlus.getQuery().getResultOrdering(), Sequences.simple( - Arrays.asList(runner.run(query1, responseContext), runner.run(query2, responseContext)) + Arrays.asList(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext)) ) ); } @@ -3770,20 +3770,20 @@ public class GroupByQueryRunnerTest { @Override public Sequence run( - Query query, Map responseContext + QueryPlus queryPlus, Map responseContext ) { // simulate two daily segments - final Query query1 = query.withQuerySegmentSpec( + final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03"))) ); - final Query query2 = query.withQuerySegmentSpec( + final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) ); return new MergeSequence( - query.getResultOrdering(), + queryPlus.getQuery().getResultOrdering(), Sequences.simple( - Arrays.asList(runner.run(query1, responseContext), runner.run(query2, responseContext)) + Arrays.asList(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext)) ) ); } @@ -3879,20 +3879,20 @@ public class GroupByQueryRunnerTest { @Override public Sequence run( - Query query, Map responseContext + QueryPlus queryPlus, Map responseContext ) { // simulate two daily segments - final Query query1 = query.withQuerySegmentSpec( + final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03"))) ); - final Query query2 = query.withQuerySegmentSpec( + final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) ); return new MergeSequence( - query.getResultOrdering(), + queryPlus.getQuery().getResultOrdering(), Sequences.simple( - Arrays.asList(runner.run(query1, responseContext), runner.run(query2, responseContext)) + Arrays.asList(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext)) ) ); } diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java index 4ac25c7499b..09d121cedcd 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java @@ -30,7 +30,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; -import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryToolChest; @@ -76,9 +76,9 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest return new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - TimeseriesQuery tsQuery = (TimeseriesQuery) query; + TimeseriesQuery tsQuery = (TimeseriesQuery) queryPlus.getQuery(); QueryRunner newRunner = factory.mergeRunners( MoreExecutors.sameThreadExecutor(), ImmutableList.>of(input) ); diff --git a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java index 527a3f0fa66..1d23741d327 100644 --- a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java @@ -30,6 +30,7 @@ import io.druid.java.util.common.logger.Logger; import io.druid.js.JavaScriptConfig; import io.druid.query.Druids; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; @@ -168,16 +169,16 @@ public class SearchQueryRunnerTest { @Override public Sequence> run( - Query> query, Map responseContext + QueryPlus> queryPlus, Map responseContext ) { - final Query> query1 = searchQuery.withQuerySegmentSpec( + final QueryPlus> queryPlus1 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-01-12/2011-02-28"))) ); - final Query> query2 = searchQuery.withQuerySegmentSpec( + final QueryPlus> queryPlus2 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-03-01/2011-04-15"))) ); - return Sequences.concat(runner.run(query1, responseContext), runner.run(query2, responseContext)); + return Sequences.concat(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext)); } } ); diff --git a/processing/src/test/java/io/druid/query/spec/SpecificSegmentQueryRunnerTest.java b/processing/src/test/java/io/druid/query/spec/SpecificSegmentQueryRunnerTest.java index 04bf6d66dc8..b4f5bcc92a9 100644 --- a/processing/src/test/java/io/druid/query/spec/SpecificSegmentQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/spec/SpecificSegmentQueryRunnerTest.java @@ -32,7 +32,7 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.guava.Yielder; import io.druid.java.util.common.guava.YieldingAccumulator; import io.druid.query.Druids; -import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.Result; import io.druid.query.SegmentDescriptor; @@ -68,7 +68,7 @@ public class SpecificSegmentQueryRunnerTest new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { return new Sequence() { @@ -151,7 +151,7 @@ public class SpecificSegmentQueryRunnerTest new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { return Sequences.withEffect( Sequences.simple(Arrays.asList(value)), diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java index 69a1ea43515..b69072cd543 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java @@ -25,7 +25,7 @@ import com.google.common.collect.Maps; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.Druids; -import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryToolChest; @@ -186,11 +186,11 @@ public class TimeSeriesUnionQueryRunnerTest { @Override public Sequence> run( - Query> query, + QueryPlus> queryPlus, Map responseContext ) { - if (query.getDataSource().equals(new TableDataSource("ds1"))) { + if (queryPlus.getQuery().getDataSource().equals(new TableDataSource("ds1"))) { return Sequences.simple(descending ? Lists.reverse(ds1) : ds1); } else { return Sequences.simple(descending ? Lists.reverse(ds2) : ds2); diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java index 9e59eb11cde..0a063aeb59a 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java @@ -27,7 +27,7 @@ import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.guava.Sequence; import io.druid.query.CacheStrategy; -import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; @@ -243,11 +243,11 @@ public class TopNQueryQueryToolChestTest @Override public Sequence> run( - Query> query, + QueryPlus> queryPlus, Map responseContext ) { - this.query = (TopNQuery) query; + this.query = (TopNQuery) queryPlus.getQuery(); return query.run(runner, responseContext); } } diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index c86123b0ec8..abacb196b10 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -59,6 +59,7 @@ import io.druid.query.BySegmentResultValueClass; import io.druid.query.CacheStrategy; import io.druid.query.Query; import io.druid.query.QueryContexts; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; @@ -139,8 +140,9 @@ public class CachingClusteredClient implements QueryRunner } @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { + final Query query = queryPlus.getQuery(); final QueryToolChest> toolChest = warehouse.getToolChest(query); final CacheStrategy> strategy = toolChest.getCacheStrategy(query); @@ -429,17 +431,12 @@ public class CachingClusteredClient implements QueryRunner final Sequence resultSeqToAdd; if (!server.isAssignable() || !populateCache || isBySegment) { // Direct server queryable if (!isBySegment) { - resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), responseContext); + resultSeqToAdd = clientQueryable.run(queryPlus.withQuerySegmentSpec(segmentSpec), responseContext); } else { // bySegment queries need to be de-serialized, see DirectDruidClient.run() - - @SuppressWarnings("unchecked") - final Query>> bySegmentQuery = - (Query>>) ((Query) query); - @SuppressWarnings("unchecked") final Sequence>> resultSequence = clientQueryable.run( - bySegmentQuery.withQuerySegmentSpec(segmentSpec), + queryPlus.withQuerySegmentSpec(segmentSpec), responseContext ); @@ -472,7 +469,7 @@ public class CachingClusteredClient implements QueryRunner } else { // Requires some manipulation on broker side @SuppressWarnings("unchecked") final Sequence>> runningSequence = clientQueryable.run( - rewrittenQuery.withQuerySegmentSpec(segmentSpec), + queryPlus.withQuery(rewrittenQuery.withQuerySegmentSpec(segmentSpec)), responseContext ); resultSeqToAdd = new MergeSequence( diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java b/server/src/main/java/io/druid/client/CachingQueryRunner.java index 418098c9d55..17df72a5996 100644 --- a/server/src/main/java/io/druid/client/CachingQueryRunner.java +++ b/server/src/main/java/io/druid/client/CachingQueryRunner.java @@ -38,6 +38,7 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.logger.Logger; import io.druid.query.CacheStrategy; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.SegmentDescriptor; @@ -83,8 +84,9 @@ public class CachingQueryRunner implements QueryRunner } @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { + Query query = queryPlus.getQuery(); final CacheStrategy strategy = toolChest.getCacheStrategy(query); final boolean populateCache = CacheUtil.populateCacheOnDataNodes(query, strategy, cacheConfig); final boolean useCache = CacheUtil.useCacheOnDataNodes(query, strategy, cacheConfig); @@ -145,7 +147,7 @@ public class CachingQueryRunner implements QueryRunner return Sequences.withEffect( Sequences.map( - base.run(query, responseContext), + base.run(queryPlus, responseContext), new Function() { @Override @@ -190,7 +192,7 @@ public class CachingQueryRunner implements QueryRunner backgroundExecutorService ); } else { - return base.run(query, responseContext); + return base.run(queryPlus, responseContext); } } diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index c3311798a5a..98d04bea2a4 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -55,6 +55,7 @@ import io.druid.query.Query; import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryMetrics; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; @@ -131,8 +132,9 @@ public class DirectDruidClient implements QueryRunner } @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final QueryPlus queryPlus, final Map context) { + final Query query = queryPlus.getQuery(); QueryToolChest> toolChest = warehouse.getToolChest(query); boolean isBySegment = QueryContexts.isBySegment(query); diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java index 4f94c4f3c3f..411d7500f93 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java @@ -31,16 +31,16 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.metamx.emitter.EmittingLogger; - import io.druid.common.guava.ThreadRenamingCallable; import io.druid.concurrent.Execs; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; -import io.druid.java.util.common.granularity.Granularity; import io.druid.java.util.common.ISE; import io.druid.java.util.common.concurrent.ScheduledExecutors; +import io.druid.java.util.common.granularity.Granularity; import io.druid.java.util.common.guava.Sequence; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.indexing.DataSchema; @@ -54,7 +54,6 @@ import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; import io.druid.segment.realtime.plumber.VersioningPolicy; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; - import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Interval; @@ -169,9 +168,9 @@ public class AppenderatorPlumber implements Plumber return new QueryRunner() { @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { - return query.run(appenderator, responseContext); + return queryPlus.run(appenderator, responseContext); } }; } diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index f161e9a064b..2cb88853b11 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -43,6 +43,7 @@ import io.druid.query.Query; import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryMetrics; +import io.druid.query.QueryPlus; import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; @@ -227,7 +228,7 @@ public class QueryResource implements QueryCountStatsProvider } final Map responseContext = new MapMaker().makeMap(); - final Sequence res = query.run(texasRanger, responseContext); + final Sequence res = QueryPlus.wrap(query).run(texasRanger, responseContext); if (prevEtag != null && prevEtag.equals(responseContext.get(HDR_ETAG))) { return Response.notModified().build(); diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index b81f1ee1b9a..54486c9c578 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -71,6 +71,7 @@ import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.MapQueryToolChestWarehouse; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryToolChest; @@ -1781,7 +1782,7 @@ public class CachingClusteredClientTest timeline.add(interval2, "v", new StringPartitionChunk<>("d", null, 5, selector5)); timeline.add(interval3, "v", new StringPartitionChunk<>(null, null, 6, selector6)); - final Capture capture = Capture.newInstance(); + final Capture capture = Capture.newInstance(); final Capture> contextCap = Capture.newInstance(); QueryRunner mockRunner = EasyMock.createNiceMock(QueryRunner.class); @@ -1801,12 +1802,9 @@ public class CachingClusteredClientTest descriptors.add(new SegmentDescriptor(interval3, "v", 6)); MultipleSpecificSegmentSpec expected = new MultipleSpecificSegmentSpec(descriptors); - Sequences.toList(runner.run( - query, - context - ), Lists.newArrayList()); + Sequences.toList(runner.run(QueryPlus.wrap(query), context), Lists.newArrayList()); - Assert.assertEquals(expected, capture.getValue().getQuerySegmentSpec()); + Assert.assertEquals(expected, ((TimeseriesQuery) capture.getValue().getQuery()).getQuerySegmentSpec()); } private ServerSelector makeMockSingleDimensionSelector( @@ -1923,7 +1921,7 @@ public class CachingClusteredClientTest .andReturn(expectations.getQueryRunner()) .times(0, 1); - final Capture capture = new Capture(); + final Capture capture = new Capture(); final Capture context = new Capture(); QueryRunner queryable = expectations.getQueryRunner(); @@ -1940,7 +1938,7 @@ public class CachingClusteredClientTest @Override public Sequence answer() throws Throwable { - return toFilteredQueryableTimeseriesResults((TimeseriesQuery)capture.getValue(), segmentIds, queryIntervals, results); + return toFilteredQueryableTimeseriesResults((TimeseriesQuery)capture.getValue().getQuery(), segmentIds, queryIntervals, results); } }) .times(0, 1); @@ -1965,10 +1963,12 @@ public class CachingClusteredClientTest TestHelper.assertExpectedResults( expected, runner.run( - query.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec( - ImmutableList.of( - actualQueryInterval + QueryPlus.wrap( + query.withQuerySegmentSpec( + new MultipleIntervalSegmentSpec( + ImmutableList.of( + actualQueryInterval + ) ) ) ), @@ -2062,7 +2062,7 @@ public class CachingClusteredClientTest .andReturn(expectations.getQueryRunner()) .once(); - final Capture capture = new Capture(); + final Capture capture = new Capture(); final Capture context = new Capture(); queryCaptures.add(capture); QueryRunner queryable = expectations.getQueryRunner(); @@ -2210,7 +2210,8 @@ public class CachingClusteredClientTest // make sure all the queries were sent down as 'bySegment' for (Capture queryCapture : queryCaptures) { - Query capturedQuery = (Query) queryCapture.getValue(); + QueryPlus capturedQueryPlus = (QueryPlus) queryCapture.getValue(); + Query capturedQuery = capturedQueryPlus.getQuery(); if (expectBySegment) { Assert.assertEquals(true, capturedQuery.getContextValue("bySegment")); } else { diff --git a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java index af57a5e97ac..d2e2d365c91 100644 --- a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java +++ b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java @@ -39,6 +39,7 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.query.CacheStrategy; import io.druid.query.Druids; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryToolChest; @@ -269,7 +270,7 @@ public class CachingQueryRunnerTest new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { return resultSeq; } @@ -362,7 +363,7 @@ public class CachingQueryRunnerTest new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { return Sequences.empty(); } diff --git a/server/src/test/java/io/druid/server/QueryResourceTest.java b/server/src/test/java/io/druid/server/QueryResourceTest.java index f8ca466b97a..3a4573f7cc9 100644 --- a/server/src/test/java/io/druid/server/QueryResourceTest.java +++ b/server/src/test/java/io/druid/server/QueryResourceTest.java @@ -32,6 +32,7 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.query.DefaultGenericQueryMetricsFactory; import io.druid.query.MapQueryToolChestWarehouse; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChest; @@ -95,11 +96,9 @@ public class QueryResourceTest return new QueryRunner() { @Override - public Sequence run( - Query query, Map responseContext - ) + public Sequence run(QueryPlus query, Map responseContext) { - return Sequences.empty(); + return Sequences.empty(); } }; } diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index 7656e7583af..502156e5e90 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -47,6 +47,7 @@ import io.druid.query.Druids; import io.druid.query.NoopQueryRunner; import io.druid.query.Query; import io.druid.query.QueryMetrics; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerFactoryConglomerate; @@ -679,9 +680,9 @@ public class ServerManagerTest } @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - return new BlockingSequence(runner.run(query, responseContext), waitLatch, waitYieldLatch, notifyLatch); + return new BlockingSequence<>(runner.run(queryPlus, responseContext), waitLatch, waitYieldLatch, notifyLatch); } } diff --git a/services/src/main/java/io/druid/cli/DumpSegment.java b/services/src/main/java/io/druid/cli/DumpSegment.java index fa14574cfca..8901a440190 100644 --- a/services/src/main/java/io/druid/cli/DumpSegment.java +++ b/services/src/main/java/io/druid/cli/DumpSegment.java @@ -51,6 +51,7 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.logger.Logger; import io.druid.query.DruidProcessingConfig; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerFactoryConglomerate; @@ -471,7 +472,7 @@ public class DumpSegment extends GuiceRunnable final QueryRunner runner = factory.createRunner(new QueryableIndexSegment("segment", index)); final Sequence results = factory.getToolchest().mergeResults( factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.of(runner)) - ).run(query, Maps.newHashMap()); + ).run(QueryPlus.wrap(query), Maps.newHashMap()); return (Sequence) results; } diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java b/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java index 5cbf91d4b18..c0eba66180d 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java @@ -32,6 +32,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.DataSource; import io.druid.query.QueryDataSource; +import io.druid.query.QueryPlus; import io.druid.query.QuerySegmentWalker; import io.druid.query.Result; import io.druid.query.dimension.DimensionSpec; @@ -193,7 +194,7 @@ public class QueryMaker return Sequences.concat( Sequences.map( - queryWithPagination.run(walker, Maps.newHashMap()), + QueryPlus.wrap(queryWithPagination).run(walker, Maps.newHashMap()), new Function, Sequence>() { @Override @@ -264,7 +265,7 @@ public class QueryMaker Hook.QUERY_PLAN.run(query); return Sequences.map( - query.run(walker, Maps.newHashMap()), + QueryPlus.wrap(query).run(walker, Maps.newHashMap()), new Function, Object[]>() { @Override @@ -299,7 +300,7 @@ public class QueryMaker return Sequences.concat( Sequences.map( - query.run(walker, Maps.newHashMap()), + QueryPlus.wrap(query).run(walker, Maps.newHashMap()), new Function, Sequence>() { @Override @@ -335,7 +336,7 @@ public class QueryMaker Hook.QUERY_PLAN.run(query); return Sequences.map( - query.run(walker, Maps.newHashMap()), + QueryPlus.wrap(query).run(walker, Maps.newHashMap()), new Function() { @Override diff --git a/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java index 38f82c8213c..e5cd0da46c1 100644 --- a/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java @@ -42,6 +42,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; +import io.druid.query.QueryPlus; import io.druid.query.QuerySegmentWalker; import io.druid.query.TableDataSource; import io.druid.query.metadata.metadata.ColumnAnalysis; @@ -305,7 +306,7 @@ public class DruidSchema extends AbstractSchema true ); - final Sequence sequence = segmentMetadataQuery.run(walker, Maps.newHashMap()); + final Sequence sequence = QueryPlus.wrap(segmentMetadataQuery).run(walker, Maps.newHashMap()); final List results = Sequences.toList(sequence, Lists.newArrayList()); if (results.isEmpty()) { return null; diff --git a/sql/src/test/java/io/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/io/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java index c9b88af530a..a96596555d6 100644 --- a/sql/src/test/java/io/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java +++ b/sql/src/test/java/io/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java @@ -32,6 +32,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.NoopQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerFactoryConglomerate; @@ -110,8 +111,9 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { + Query query = queryPlus.getQuery(); final VersionedIntervalTimeline timeline = getTimelineForTableDataSource(query); return makeBaseRunner( query, @@ -154,7 +156,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C } } ) - ).run(query, responseContext); + ).run(queryPlus, responseContext); } } )