From f0f68570ec1e897348ab7c2825c7063e1330355e Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Thu, 23 Jan 2020 14:07:14 -0800
Subject: [PATCH] Use DataSourceAnalysis throughout the query stack. (#9239)

Builds on #9235, using the datasource analysis functionality to replace various ad-hoc approaches. The most interesting changes are in ClientQuerySegmentWalker (brokers), ServerManager (historicals), and SinkQuerySegmentWalker (indexing tasks).

Other changes related to improving how we analyze queries:

1) Changes TimelineServerView to return an Optional timeline, which I thought made the analysis changes cleaner to implement.
2) Adds QueryToolChest#canPerformSubquery, which is now used by query entry points to determine whether it is safe to pass a subquery dataSource to the query toolchest. Fixes an issue introduced in #5471 where subqueries under non-groupBy-typed queries were silently ignored, since neither the query entry point nor the toolchest did anything special with them.
3) Removes the QueryPlus.withQuerySegmentSpec method, which was mostly being used in error-prone ways (ignoring any potential subqueries, and not verifying that the underlying data source is actually a table). Replaces it with a new function, Queries.withSpecificSegments, that includes sanity checks.
---
 .../CachingClusteredClientBenchmark.java | 30 ++-
 .../materializedview/DataSourceOptimizer.java | 37 ++--
 .../movingaverage/MovingAverageQueryTest.java | 7 +-
 .../overlord/SingleTaskBackgroundRunner.java | 10 +-
 .../org/apache/druid/query/BaseQuery.java | 15 +-
 .../java/org/apache/druid/query/Queries.java | 48 ++++-
 .../java/org/apache/druid/query/Query.java | 16 +-
 .../org/apache/druid/query/QueryPlus.java | 9 -
 .../druid/query/QuerySegmentWalker.java | 17 +-
 .../apache/druid/query/QueryToolChest.java | 14 ++
 .../apache/druid/query/RetryQueryRunner.java | 7 +-
 .../apache/druid/query/TimewarpOperator.java | 7 +-
 .../druid/query/filter/DimFilterUtils.java | 29 ++-
 .../groupby/GroupByQueryQueryToolChest.java | 20 ++
 .../spec/MultipleSpecificSegmentSpec.java | 16 +-
 .../spec/SpecificSegmentQueryRunner.java | 10 +-
 .../org/apache/druid/query/QueriesTest.java | 121 +++++++++++
 .../druid/query/QueryRunnerTestHelper.java | 14 +-
 .../query/filter/DimFilterUtilsTest.java | 2 +-
 .../GroupByQueryQueryToolChestTest.java | 60 ++++++
 .../query/groupby/GroupByQueryRunnerTest.java | 108 ++++++----
 .../query/search/SearchQueryRunnerTest.java | 13 +-
 .../apache/druid/client/BrokerServerView.java | 18 +-
 .../druid/client/CachingClusteredClient.java | 53 +++--
 .../apache/druid/client/ServerViewUtil.java | 9 +-
 .../druid/client/TimelineServerView.java | 17 +-
 .../appenderator/SinkQuerySegmentWalker.java | 68 +++---
 .../druid/server/ClientInfoResource.java | 17 +-
 .../server/ClientQuerySegmentWalker.java | 29 ++-
 .../apache/druid/server/SegmentManager.java | 28 ++-
 .../server/coordination/ServerManager.java | 192 ++++++------------
 .../druid/client/BrokerServerViewTest.java | 13 +-
 ...chingClusteredClientFunctionalityTest.java | 9 +-
 .../client/CachingClusteredClientTest.java | 7 +-
 .../druid/server/ClientInfoResourceTest.java | 6 +-
 .../druid/server/SegmentManagerTest.java | 13 +-
 .../druid/sql/calcite/rel/QueryMaker.java | 18 +-
 .../calcite/util/TestServerInventoryView.java | 5 +-
 38 files changed, 709 insertions(+), 403 deletions(-)

diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
b/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index d7b8b63bedf..72fefa93946 100644 --- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -25,7 +25,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo; import org.apache.druid.benchmark.datagen.BenchmarkSchemas; @@ -58,7 +57,6 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.BySegmentQueryRunner; -import org.apache.druid.query.DataSource; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Druids; @@ -89,6 +87,7 @@ import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; import org.apache.druid.query.groupby.strategy.GroupByStrategyV1; import org.apache.druid.query.groupby.strategy.GroupByStrategyV2; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.query.timeseries.TimeseriesQuery; @@ -126,7 +125,6 @@ import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; -import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; @@ -134,6 +132,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; @@ -217,8 +216,17 @@ public class CachingClusteredClientBenchmark .size(0) .build(); final SegmentGenerator segmentGenerator = closer.register(new SegmentGenerator()); - LOG.info("Starting benchmark setup using cacheDir[%s], rows[%,d].", segmentGenerator.getCacheDir(), rowsPerSegment); - final QueryableIndex index = segmentGenerator.generate(dataSegment, schemaInfo, Granularities.NONE, rowsPerSegment); + LOG.info( + "Starting benchmark setup using cacheDir[%s], rows[%,d].", + segmentGenerator.getCacheDir(), + rowsPerSegment + ); + final QueryableIndex index = segmentGenerator.generate( + dataSegment, + schemaInfo, + Granularities.NONE, + rowsPerSegment + ); queryableIndexes.put(dataSegment, index); } @@ -518,12 +526,10 @@ public class CachingClusteredClientBenchmark .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector)); } - @Nullable @Override - public TimelineLookup getTimeline(DataSource dataSource) + public Optional> getTimeline(DataSourceAnalysis analysis) { - final String table = Iterables.getOnlyElement(dataSource.getTableNames()); - return timelines.get(table); + return Optional.ofNullable(timelines.get(analysis.getBaseTableDataSource().get().getName())); } @Override @@ -563,7 +569,11 @@ public class CachingClusteredClientBenchmark private final 
QueryRunnerFactoryConglomerate conglomerate; private final QueryableIndexSegment segment; - public SimpleQueryRunner(QueryRunnerFactoryConglomerate conglomerate, SegmentId segmentId, QueryableIndex queryableIndex) + public SimpleQueryRunner( + QueryRunnerFactoryConglomerate conglomerate, + SegmentId segmentId, + QueryableIndex queryableIndex + ) { this.conglomerate = conglomerate; this.segment = new QueryableIndexSegment(queryableIndex, segmentId); diff --git a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizer.java b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizer.java index bbd6aa8a4f2..1653539f3e3 100644 --- a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizer.java +++ b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizer.java @@ -23,9 +23,11 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSortedSet; import com.google.inject.Inject; import org.apache.druid.client.TimelineServerView; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.Query; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.topn.TopNQuery; @@ -54,24 +56,24 @@ public class DataSourceOptimizer private ConcurrentHashMap hitCount = new ConcurrentHashMap<>(); private ConcurrentHashMap costTime = new ConcurrentHashMap<>(); private ConcurrentHashMap, AtomicLong>> missFields = new ConcurrentHashMap<>(); - + @Inject - public DataSourceOptimizer(TimelineServerView serverView) + public DataSourceOptimizer(TimelineServerView serverView) { this.serverView = serverView; } /** * Do main work about materialized view selection: transform user query to one or more sub-queries. - * - * In the sub-query, the dataSource is the derivative of dataSource in user query, and sum of all sub-queries' + * + * In the sub-query, the dataSource is the derivative of dataSource in user query, and sum of all sub-queries' * intervals equals the interval in user query - * + * * Derived dataSource with smallest average data size per segment granularity have highest priority to replace the * datasource in user query - * + * * @param query only TopNQuery/TimeseriesQuery/GroupByQuery can be optimized - * @return a list of queries with specified derived dataSources and intervals + * @return a list of queries with specified derived dataSources and intervals */ public List optimize(Query query) { @@ -86,7 +88,7 @@ public class DataSourceOptimizer // get all derivatives for datasource in query. The derivatives set is sorted by average size of // per segment granularity. 
Set derivatives = DerivativeDataSourceManager.getDerivatives(datasourceName); - + if (derivatives.isEmpty()) { return Collections.singletonList(query); } @@ -96,10 +98,10 @@ public class DataSourceOptimizer hitCount.putIfAbsent(datasourceName, new AtomicLong(0)); costTime.putIfAbsent(datasourceName, new AtomicLong(0)); totalCount.get(datasourceName).incrementAndGet(); - + // get all fields which the query required Set requiredFields = MaterializedViewUtils.getRequiredFields(query); - + Set derivativesWithRequiredFields = new HashSet<>(); for (DerivativeDataSource derivativeDataSource : derivatives) { derivativesHitCount.putIfAbsent(derivativeDataSource.getName(), new AtomicLong(0)); @@ -115,14 +117,15 @@ public class DataSourceOptimizer costTime.get(datasourceName).addAndGet(System.currentTimeMillis() - start); return Collections.singletonList(query); } - + List queries = new ArrayList<>(); List remainingQueryIntervals = (List) query.getIntervals(); - + for (DerivativeDataSource derivativeDataSource : ImmutableSortedSet.copyOf(derivativesWithRequiredFields)) { final List derivativeIntervals = remainingQueryIntervals.stream() .flatMap(interval -> serverView - .getTimeline((new TableDataSource(derivativeDataSource.getName()))) + .getTimeline(DataSourceAnalysis.forDataSource(new TableDataSource(derivativeDataSource.getName()))) + .orElseThrow(() -> new ISE("No timeline for dataSource: %s", derivativeDataSource.getName())) .lookup(interval) .stream() .map(TimelineObjectHolder::getInterval) @@ -133,7 +136,7 @@ public class DataSourceOptimizer if (derivativeIntervals.isEmpty()) { continue; } - + remainingQueryIntervals = MaterializedViewUtils.minus(remainingQueryIntervals, derivativeIntervals); queries.add( query.withDataSource(new TableDataSource(derivativeDataSource.getName())) @@ -158,13 +161,13 @@ public class DataSourceOptimizer hitCount.get(datasourceName).incrementAndGet(); costTime.get(datasourceName).addAndGet(System.currentTimeMillis() - start); return queries; - } + } finally { lock.readLock().unlock(); } } - public List getAndResetStats() + public List getAndResetStats() { ImmutableMap derivativesHitCountSnapshot; ImmutableMap totalCountSnapshot; @@ -183,7 +186,7 @@ public class DataSourceOptimizer hitCount.clear(); costTime.clear(); missFields.clear(); - } + } finally { lock.writeLock().unlock(); } diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index 83881c79cf9..9090bfe168d 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -49,7 +49,6 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.query.DataSource; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; @@ -62,6 +61,7 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.movingaverage.test.TestConfig; +import 
org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesResultValue; import org.apache.druid.server.ClientQuerySegmentWalker; @@ -84,6 +84,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; @@ -305,9 +306,9 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest new TimelineServerView() { @Override - public TimelineLookup getTimeline(DataSource dataSource) + public Optional> getTimeline(DataSourceAnalysis analysis) { - return null; + return Optional.empty(); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java index ecb5d9ab03d..7e7fafac3e8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java @@ -22,7 +22,6 @@ package org.apache.druid.indexing.overlord; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -51,6 +50,7 @@ import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.DruidNode; import org.apache.druid.server.SetAndVerifyContextQueryRunner; import org.apache.druid.server.initialization.ServerConfig; @@ -328,11 +328,13 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke private QueryRunner getQueryRunnerImpl(Query query) { QueryRunner queryRunner = null; - final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getTableNames()); if (runningItem != null) { + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); final Task task = runningItem.getTask(); - if (task.getDataSource().equals(queryDataSource)) { + + if (analysis.getBaseTableDataSource().isPresent() + && task.getDataSource().equals(analysis.getBaseTableDataSource().get().getName())) { final QueryRunner taskQueryRunner = task.getQueryRunner(query); if (taskQueryRunner != null) { @@ -379,7 +381,7 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke { return task.getType(); } - + @Override public String getDataSource() { diff --git a/processing/src/main/java/org/apache/druid/query/BaseQuery.java b/processing/src/main/java/org/apache/druid/query/BaseQuery.java index cc2cd6df6b4..70fbdf99270 100644 --- a/processing/src/main/java/org/apache/druid/query/BaseQuery.java +++ b/processing/src/main/java/org/apache/druid/query/BaseQuery.java @@ -28,6 +28,7 @@ import org.apache.druid.guice.annotations.ExtensionPoint; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import 
org.apache.druid.java.util.common.granularity.PeriodGranularity; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.QuerySegmentSpec; import org.joda.time.DateTimeZone; import org.joda.time.Duration; @@ -117,17 +118,11 @@ public abstract class BaseQuery implements Query } @VisibleForTesting - public static QuerySegmentSpec getQuerySegmentSpecForLookUp(BaseQuery query) + public static QuerySegmentSpec getQuerySegmentSpecForLookUp(BaseQuery query) { - if (query.getDataSource() instanceof QueryDataSource) { - QueryDataSource ds = (QueryDataSource) query.getDataSource(); - Query subquery = ds.getQuery(); - if (subquery instanceof BaseQuery) { - return getQuerySegmentSpecForLookUp((BaseQuery) subquery); - } - throw new IllegalStateException("Invalid subquery type " + subquery.getClass()); - } - return query.getQuerySegmentSpec(); + return DataSourceAnalysis.forDataSource(query.getDataSource()) + .getBaseQuerySegmentSpec() + .orElse(query.getQuerySegmentSpec()); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/Queries.java b/processing/src/main/java/org/apache/druid/query/Queries.java index 37408a4aea3..a2ff0051bf9 100644 --- a/processing/src/main/java/org/apache/druid/query/Queries.java +++ b/processing/src/main/java/org/apache/druid/query/Queries.java @@ -23,8 +23,11 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.druid.guice.annotations.PublicApi; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import java.util.Collections; import java.util.HashMap; @@ -33,9 +36,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -/** - * - */ @PublicApi public class Queries { @@ -131,4 +131,46 @@ public class Queries return postAggs; } + + /** + * Rewrite "query" to refer to some specific segment descriptors. + * + * The dataSource for "query" must be based on a single table for this operation to be valid. Otherwise, this + * function will throw an exception. + * + * Unlike the seemingly-similar {@code query.withQuerySegmentSpec(new MultipleSpecificSegmentSpec(descriptors))}, + * this method will walk down subqueries found within the query datasource, if any, and modify the lowest-level + * subquery. The effect is that + * {@code DataSourceAnalysis.forDataSource(query.getDataSource()).getBaseQuerySegmentSpec()} is guaranteed to return + * either {@code new MultipleSpecificSegmentSpec(descriptors)} or empty. + * + * Because {@link BaseQuery#getRunner} is implemented using {@link DataSourceAnalysis#getBaseQuerySegmentSpec}, this + * method will cause the runner to be a specific-segments runner. + */ + public static Query withSpecificSegments(final Query query, final List descriptors) + { + final Query retVal; + + if (query.getDataSource() instanceof QueryDataSource) { + final Query subQuery = ((QueryDataSource) query.getDataSource()).getQuery(); + retVal = query.withDataSource(new QueryDataSource(withSpecificSegments(subQuery, descriptors))); + } else { + retVal = query.withQuerySegmentSpec(new MultipleSpecificSegmentSpec(descriptors)); + } + + // Verify preconditions and invariants, just in case.
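+    // Specific segments only make sense when the query ultimately reads from a single concrete table; a base
+    // dataSource of any other kind (for example, a lookup) has no table to pin the descriptors to.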
+ final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(retVal.getDataSource()); + + if (!analysis.getBaseTableDataSource().isPresent()) { + throw new ISE("Unable to apply specific segments to non-table-based dataSource[%s]", query.getDataSource()); + } + + if (analysis.getBaseQuerySegmentSpec().isPresent() + && !analysis.getBaseQuerySegmentSpec().get().equals(new MultipleSpecificSegmentSpec(descriptors))) { + // If you see the error message below, it's a bug in either this function or in DataSourceAnalysis. + throw new ISE("Unable to apply specific segments to query with dataSource[%s]", query.getDataSource()); + } + + return retVal; + } } diff --git a/processing/src/main/java/org/apache/druid/query/Query.java b/processing/src/main/java/org/apache/druid/query/Query.java index 387509839bb..37868b4a25d 100644 --- a/processing/src/main/java/org/apache/druid/query/Query.java +++ b/processing/src/main/java/org/apache/druid/query/Query.java @@ -116,6 +116,12 @@ public interface Query Query withOverriddenContext(Map contextOverride); + /** + * Returns a new query, identical to this one, but with a different associated {@link QuerySegmentSpec}. + * + * This often changes the behavior of {@link #getRunner(QuerySegmentWalker)}, since most queries inherit that method + * from {@link BaseQuery}, which implements it by calling {@link QuerySegmentSpec#lookup}. + */ Query withQuerySegmentSpec(QuerySegmentSpec spec); Query withId(String id); @@ -140,14 +146,4 @@ public interface Query { return this; } - - default List getIntervalsOfInnerMostQuery() - { - if (getDataSource() instanceof QueryDataSource) { - //noinspection unchecked - return ((QueryDataSource) getDataSource()).getQuery().getIntervalsOfInnerMostQuery(); - } else { - return getIntervals(); - } - } } diff --git a/processing/src/main/java/org/apache/druid/query/QueryPlus.java b/processing/src/main/java/org/apache/druid/query/QueryPlus.java index f1884d35624..1b18e943909 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryPlus.java +++ b/processing/src/main/java/org/apache/druid/query/QueryPlus.java @@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.context.ResponseContext; -import org.apache.druid.query.spec.QuerySegmentSpec; import javax.annotation.Nullable; @@ -125,14 +124,6 @@ public final class QueryPlus } } - /** - * Equivalent of withQuery(getQuery().withQuerySegmentSpec(spec)). - */ - public QueryPlus withQuerySegmentSpec(QuerySegmentSpec spec) - { - return new QueryPlus<>(query.withQuerySegmentSpec(spec), queryMetrics, identity); - } - /** * Equivalent of withQuery(getQuery().withOverriddenContext(ImmutableMap.of(MAX_QUEUED_BYTES_KEY, maxQueuedBytes))). */ diff --git a/processing/src/main/java/org/apache/druid/query/QuerySegmentWalker.java b/processing/src/main/java/org/apache/druid/query/QuerySegmentWalker.java index 970fb6e28c0..7084a80935d 100644 --- a/processing/src/main/java/org/apache/druid/query/QuerySegmentWalker.java +++ b/processing/src/main/java/org/apache/druid/query/QuerySegmentWalker.java @@ -22,6 +22,7 @@ package org.apache.druid.query; import org.joda.time.Interval; /** + * An interface for query-handling entry points. 
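+ * Implementations include ClientQuerySegmentWalker (brokers), ServerManager (historicals), and
+ * SinkQuerySegmentWalker (indexing tasks).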
*/ public interface QuerySegmentWalker { @@ -29,19 +30,27 @@ public interface QuerySegmentWalker * Gets the Queryable for a given interval, the Queryable returned can be any version(s) or partitionNumber(s) * such that it represents the interval. * - * @param query result type - * @param query the query to find a Queryable for + * @param query result type + * @param query the query to find a Queryable for * @param intervals the intervals to find a Queryable for + * * @return a Queryable object that represents the interval */ QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals); /** - * Gets the Queryable for a given list of SegmentSpecs. + * Gets the Queryable for a given list of SegmentDescriptors. * - * @param the query result type + * The descriptors are expected to apply to the base datasource involved in the query, i.e. the one returned by: + * + *
+   *   DataSourceAnalysis.forDataSource(query.getDataSource()).getBaseDataSource()
+   * 
+ * + * @param the query result type * @param query the query to return a Queryable for * @param specs the list of SegmentSpecs to find a Queryable for + * * @return the Queryable object with the given SegmentSpecs */ QueryRunner getQueryRunnerForSegments(Query query, Iterable specs); diff --git a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java index c271af8e4d5..b72d9d76eb4 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java @@ -271,6 +271,20 @@ public abstract class QueryToolChest subquery) + { + return false; + } + /** * Returns a list of field names in the order that {@link #resultsAsArrays} would return them. The returned list will * be the same length as each array returned by {@link #resultsAsArrays}. diff --git a/processing/src/main/java/org/apache/druid/query/RetryQueryRunner.java b/processing/src/main/java/org/apache/druid/query/RetryQueryRunner.java index 6b991b87057..fa337d04789 100644 --- a/processing/src/main/java/org/apache/druid/query/RetryQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/RetryQueryRunner.java @@ -30,7 +30,6 @@ import org.apache.druid.java.util.common.guava.YieldingAccumulator; import org.apache.druid.java.util.common.guava.YieldingSequenceBase; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.context.ResponseContext; -import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.druid.segment.SegmentMissingException; import java.util.ArrayList; @@ -73,10 +72,8 @@ public class RetryQueryRunner implements QueryRunner log.info("[%,d] missing segments found. 
Retry attempt [%,d]", missingSegments.size(), i); context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); - final QueryPlus retryQueryPlus = queryPlus.withQuerySegmentSpec( - new MultipleSpecificSegmentSpec( - missingSegments - ) + final QueryPlus retryQueryPlus = queryPlus.withQuery( + Queries.withSpecificSegments(queryPlus.getQuery(), missingSegments) ); Sequence retrySequence = baseRunner.run(retryQueryPlus, context); listOfSequences.add(retrySequence); diff --git a/processing/src/main/java/org/apache/druid/query/TimewarpOperator.java b/processing/src/main/java/org/apache/druid/query/TimewarpOperator.java index 0beca6849a4..88c88b32489 100644 --- a/processing/src/main/java/org/apache/druid/query/TimewarpOperator.java +++ b/processing/src/main/java/org/apache/druid/query/TimewarpOperator.java @@ -92,8 +92,11 @@ public class TimewarpOperator implements PostProcessingOperator ); return Sequences.map( baseRunner.run( - queryPlus.withQuerySegmentSpec(new MultipleIntervalSegmentSpec( - Collections.singletonList(modifiedInterval))), + queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(modifiedInterval)) + ) + ), responseContext ), new Function() diff --git a/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java b/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java index 03966e94444..00a84fc92d6 100644 --- a/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java +++ b/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java @@ -20,7 +20,6 @@ package org.apache.druid.query.filter; import com.google.common.base.Function; -import com.google.common.base.Optional; import com.google.common.collect.RangeSet; import org.apache.druid.timeline.partition.ShardSpec; @@ -29,9 +28,11 @@ import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; /** + * */ public class DimFilterUtils { @@ -87,14 +88,15 @@ public class DimFilterUtils * {@link #filterShards(DimFilter, Iterable, Function, Map)} instead with a cached map * * @param dimFilter The filter to use - * @param input The iterable of objects to be filtered + * @param input The iterable of objects to be filtered * @param converter The function to convert T to ShardSpec that can be filtered by - * @param This can be any type, as long as transform function is provided to convert this to ShardSpec + * @param This can be any type, as long as transform function is provided to convert this to ShardSpec + * * @return The set of filtered object, in the same order as input */ public static Set filterShards(DimFilter dimFilter, Iterable input, Function converter) { - return filterShards(dimFilter, input, converter, new HashMap>>()); + return filterShards(dimFilter, input, converter, new HashMap<>()); } /** @@ -106,15 +108,20 @@ public class DimFilterUtils * between calls with the same dimFilter to save redundant calls of {@link DimFilter#getDimensionRangeSet(String)} * on same dimensions. 
* - * @param dimFilter The filter to use - * @param input The iterable of objects to be filtered - * @param converter The function to convert T to ShardSpec that can be filtered by + * @param dimFilter The filter to use + * @param input The iterable of objects to be filtered + * @param converter The function to convert T to ShardSpec that can be filtered by * @param dimensionRangeCache The cache of RangeSets of different dimensions for the dimFilter - * @param This can be any type, as long as transform function is provided to convert this to ShardSpec + * @param This can be any type, as long as transform function is provided to convert this to ShardSpec + * * @return The set of filtered object, in the same order as input */ - public static Set filterShards(DimFilter dimFilter, Iterable input, Function converter, - Map>> dimensionRangeCache) + public static Set filterShards( + final DimFilter dimFilter, + final Iterable input, + final Function converter, + final Map>> dimensionRangeCache + ) { Set retSet = new LinkedHashSet<>(); @@ -127,7 +134,7 @@ public class DimFilterUtils List dimensions = shard.getDomainDimensions(); for (String dimension : dimensions) { Optional> optFilterRangeSet = dimensionRangeCache - .computeIfAbsent(dimension, d -> Optional.fromNullable(dimFilter.getDimensionRangeSet(d))); + .computeIfAbsent(dimension, d -> Optional.ofNullable(dimFilter.getDimensionRangeSet(d))); if (optFilterRangeSet.isPresent()) { filterDomain.put(dimension, optFilterRangeSet.get()); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index acfe77688e6..ebae09f184f 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -663,6 +663,26 @@ public class GroupByQueryQueryToolChest extends QueryToolChest subquery) + { + Query current = subquery; + + while (current != null) { + if (!(current instanceof GroupByQuery)) { + return false; + } + + if (current.getDataSource() instanceof QueryDataSource) { + current = ((QueryDataSource) current.getDataSource()).getQuery(); + } else { + current = null; + } + } + + return true; + } + @Override public List resultArrayFields(final GroupByQuery query) { diff --git a/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java b/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java index 34a458d2aaf..5d0c853823a 100644 --- a/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java +++ b/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java @@ -30,6 +30,7 @@ import org.apache.druid.query.SegmentDescriptor; import org.joda.time.Interval; import java.util.List; +import java.util.Objects; /** */ @@ -93,24 +94,13 @@ public class MultipleSpecificSegmentSpec implements QuerySegmentSpec if (o == null || getClass() != o.getClass()) { return false; } - MultipleSpecificSegmentSpec that = (MultipleSpecificSegmentSpec) o; - - if (descriptors != null ? !descriptors.equals(that.descriptors) : that.descriptors != null) { - return false; - } - if (intervals != null ? 
!intervals.equals(that.intervals) : that.intervals != null) { - return false; - } - - return true; + return Objects.equals(descriptors, that.descriptors); } @Override public int hashCode() { - int result = descriptors != null ? descriptors.hashCode() : 0; - result = 31 * result + (intervals != null ? intervals.hashCode() : 0); - return result; + return Objects.hash(descriptors); } } diff --git a/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java b/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java index 625f0325229..0cea0dbf325 100644 --- a/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.java.util.common.guava.YieldingAccumulator; +import org.apache.druid.query.Queries; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; @@ -38,6 +39,7 @@ import java.io.IOException; import java.util.Collections; /** + * */ public class SpecificSegmentQueryRunner implements QueryRunner { @@ -56,7 +58,13 @@ public class SpecificSegmentQueryRunner implements QueryRunner @Override public Sequence run(final QueryPlus input, final ResponseContext responseContext) { - final QueryPlus queryPlus = input.withQuerySegmentSpec(specificSpec); + final QueryPlus queryPlus = input.withQuery( + Queries.withSpecificSegments( + input.getQuery(), + Collections.singletonList(specificSpec.getDescriptor()) + ) + ); + final Query query = queryPlus.getQuery(); final Thread currThread = Thread.currentThread(); diff --git a/processing/src/test/java/org/apache/druid/query/QueriesTest.java b/processing/src/test/java/org/apache/druid/query/QueriesTest.java index fd5abf23829..16c4783619f 100644 --- a/processing/src/test/java/org/apache/druid/query/QueriesTest.java +++ b/processing/src/test/java/org/apache/druid/query/QueriesTest.java @@ -20,6 +20,8 @@ package org.apache.druid.query; import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; @@ -27,17 +29,26 @@ import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; import org.apache.druid.query.aggregation.post.ConstantPostAggregator; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; +import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.query.timeseries.TimeseriesResultValue; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.util.Arrays; import java.util.Collections; import java.util.List; /** + * */ public class QueriesTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Test public void testVerifyAggregations() { @@ -209,4 +220,114 @@ public class QueriesTest Assert.assertTrue(exceptionOccured); } + + @Test + 
public void testWithSpecificSegmentsBasic() + { + final ImmutableList descriptors = ImmutableList.of( + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 0), + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 1) + ); + + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .intervals( + new MultipleSpecificSegmentSpec( + ImmutableList.of( + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 0), + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 1) + ) + ) + ) + .granularity(Granularities.ALL) + .build(), + Queries.withSpecificSegments( + Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(), + descriptors + ) + ); + } + + @Test + public void testWithSpecificSegmentsSubQueryStack() + { + final ImmutableList descriptors = ImmutableList.of( + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 0), + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 1) + ); + + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .intervals(new MultipleSpecificSegmentSpec(descriptors)) + .granularity(Granularities.ALL) + .build() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(), + Queries.withSpecificSegments( + Druids.newTimeseriesQueryBuilder() + .dataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(), + descriptors + ) + ); + } + + @Test + public void testWithSpecificSegmentsOnUnionIsAnError() + { + final ImmutableList descriptors = ImmutableList.of( + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 0), + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 1) + ); + + final TimeseriesQuery query = + Druids.newTimeseriesQueryBuilder() + .dataSource(new LookupDataSource("lookyloo")) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(); + + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("Unable to apply specific segments to non-table-based dataSource"); + + final Query> ignored = Queries.withSpecificSegments(query, descriptors); + } } diff --git a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java index 04f86d8ad24..474f7963c58 100644 --- a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java @@ -471,12 +471,14 @@ public class QueryRunnerTestHelper segments )) { Segment segment = holder.getObject().getChunk(0).getObject(); - QueryPlus queryPlusRunning = queryPlus.withQuerySegmentSpec( - new SpecificSegmentSpec( - new SegmentDescriptor( - holder.getInterval(), - holder.getVersion(), - 0 + QueryPlus queryPlusRunning = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new SpecificSegmentSpec( + new SegmentDescriptor( + holder.getInterval(), + holder.getVersion(), + 0 + ) ) ) ); diff --git 
a/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java b/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java index b8fa4c2b311..778db21b0a3 100644 --- a/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java +++ b/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java @@ -20,7 +20,6 @@ package org.apache.druid.query.filter; import com.google.common.base.Function; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableRangeSet; import com.google.common.collect.ImmutableSet; @@ -36,6 +35,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; public class DimFilterUtilsTest diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index ef112f408c5..bdb771c0127 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -31,6 +31,8 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.CacheStrategy; +import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryRunnerTestHelper; import org.apache.druid.query.QueryToolChestTestHelper; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -757,6 +759,64 @@ public class GroupByQueryQueryToolChestTest ); } + @Test + public void testCanPerformSubqueryOnGroupBys() + { + Assert.assertTrue( + new GroupByQueryQueryToolChest(null, null).canPerformSubquery( + new GroupByQuery.Builder() + .setDataSource( + new QueryDataSource( + new GroupByQuery.Builder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setInterval(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .setGranularity(Granularities.ALL) + .build() + ) + ) + .setInterval(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .setGranularity(Granularities.ALL) + .build() + ) + ); + } + + @Test + public void testCanPerformSubqueryOnTimeseries() + { + Assert.assertFalse( + new GroupByQueryQueryToolChest(null, null).canPerformSubquery( + Druids.newTimeseriesQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .granularity(Granularities.ALL) + .build() + ) + ); + } + + @Test + public void testCanPerformSubqueryOnGroupByOfTimeseries() + { + Assert.assertFalse( + new GroupByQueryQueryToolChest(null, null).canPerformSubquery( + new GroupByQuery.Builder() + .setDataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .granularity(Granularities.ALL) + .build() + ) + ) + .setInterval(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .setGranularity(Granularities.ALL) + .build() + ) + ); + } + private AggregatorFactory getComplexAggregatorFactoryForValueType(final ValueType valueType) { switch (valueType) { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index b8662e01f1d..037b2b75a97 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -2983,11 +2983,15 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return new MergeSequence( queryPlus.getQuery().getResultOrdering(), @@ -3326,11 +3330,15 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return new MergeSequence( queryPlus.getQuery().getResultOrdering(), @@ -4083,11 +4091,15 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return new MergeSequence( queryPlus.getQuery().getResultOrdering(), @@ -4349,11 +4361,15 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = 
queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return new MergeSequence( queryPlus.getQuery().getResultOrdering(), @@ -4426,11 +4442,15 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return new MergeSequence( queryPlus.getQuery().getResultOrdering(), @@ -9968,11 +9988,15 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return factory.getToolchest().mergeResults( @@ -10034,11 +10058,15 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + 
queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return factory.getToolchest().mergeResults( @@ -10102,11 +10130,15 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return factory.getToolchest().mergeResults( @@ -10183,11 +10215,15 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return factory.getToolchest().mergeResults( diff --git a/processing/src/test/java/org/apache/druid/query/search/SearchQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/search/SearchQueryRunnerTest.java index 6b503a54111..aebcf257b3e 100644 --- a/processing/src/test/java/org/apache/druid/query/search/SearchQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/search/SearchQueryRunnerTest.java @@ -72,6 +72,7 @@ import java.util.Collections; import java.util.List; /** + * */ @RunWith(Parameterized.class) public class SearchQueryRunnerTest extends InitializedNullHandlingTest @@ -167,11 +168,15 @@ public class SearchQueryRunnerTest extends InitializedNullHandlingTest ResponseContext responseContext ) { - final QueryPlus> queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-01-12/2011-02-28"))) + final QueryPlus> queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-01-12/2011-02-28"))) + ) ); - final QueryPlus> queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-03-01/2011-04-15"))) + final QueryPlus> queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new 
MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-03-01/2011-04-15"))) + ) ); return Sequences.concat(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext)); } diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 4b365b7ce89..5bf4ee9c911 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -21,7 +21,6 @@ package org.apache.druid.client; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.inject.Inject; import org.apache.druid.client.selector.QueryableDruidServer; @@ -30,26 +29,28 @@ import org.apache.druid.client.selector.TierSelectorStrategy; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.annotations.EscalatedClient; import org.apache.druid.guice.annotations.Smile; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; -import org.apache.druid.query.DataSource; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.QueryWatcher; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; -import javax.annotation.Nullable; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -289,14 +290,15 @@ public class BrokerServerView implements TimelineServerView } } - - @Nullable @Override - public VersionedIntervalTimeline getTimeline(DataSource dataSource) + public Optional> getTimeline(final DataSourceAnalysis analysis) { - String table = Iterables.getOnlyElement(dataSource.getTableNames()); + final TableDataSource tableDataSource = + analysis.getBaseTableDataSource() + .orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource())); + synchronized (lock) { - return timelines.get(table); + return Optional.ofNullable(timelines.get(tableDataSource.getName())); } } diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 9fbc354109c..ba8b3ec0952 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -22,7 +22,6 @@ package org.apache.druid.client; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; -import com.google.common.base.Optional; 
import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; @@ -53,6 +52,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.BySegmentResultValueClass; import org.apache.druid.query.CacheStrategy; import org.apache.druid.query.DruidProcessingConfig; +import org.apache.druid.query.Queries; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryMetrics; @@ -66,7 +66,8 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.aggregation.MetricManipulatorFns; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.filter.DimFilterUtils; -import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; +import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.server.QueryResource; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.timeline.DataSegment; @@ -88,6 +89,7 @@ import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -97,6 +99,7 @@ import java.util.function.UnaryOperator; import java.util.stream.Collectors; /** + * */ public class CachingClusteredClient implements QuerySegmentWalker { @@ -231,6 +234,7 @@ public class CachingClusteredClient implements QuerySegmentWalker private final int uncoveredIntervalsLimit; private final Query downstreamQuery; private final Map cachePopulatorKeyMap = new HashMap<>(); + private final DataSourceAnalysis dataSourceAnalysis; private final List intervals; SpecificQueryRunnable(final QueryPlus queryPlus, final ResponseContext responseContext) @@ -248,8 +252,11 @@ public class CachingClusteredClient implements QuerySegmentWalker // and might blow up in some cases https://github.com/apache/druid/issues/2108 this.uncoveredIntervalsLimit = QueryContexts.getUncoveredIntervalsLimit(query); this.downstreamQuery = query.withOverriddenContext(makeDownstreamQueryContext()); + this.dataSourceAnalysis = DataSourceAnalysis.forDataSource(query.getDataSource()); // For nested queries, we need to look at the intervals of the inner most query. 
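Editor's note: the hunks that follow replace QueryPlus.withQuerySegmentSpec with the new Queries.withSpecificSegments helper. The patch shows only call sites, so the sketch below is a hedged reconstruction of the helper's shape from the commit message (push the segment spec down to the innermost query, then verify the result is still rooted in a concrete table); it is not the verbatim implementation.

import java.util.List;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;

// Hedged sketch; the real Queries.withSpecificSegments may differ in detail.
public static <T> Query<T> withSpecificSegments(final Query<T> query, final List<SegmentDescriptor> descriptors)
{
  final Query<T> rewritten;

  if (query.getDataSource() instanceof QueryDataSource) {
    // Push the spec down to the subquery instead of silently ignoring it.
    final Query<?> subQuery = ((QueryDataSource) query.getDataSource()).getQuery();
    rewritten = query.withDataSource(new QueryDataSource(withSpecificSegments(subQuery, descriptors)));
  } else {
    rewritten = query.withQuerySegmentSpec(new MultipleSpecificSegmentSpec(descriptors));
  }

  // Sanity check: specific segments only make sense for a concrete table.
  if (!DataSourceAnalysis.forDataSource(rewritten.getDataSource()).getBaseTableDataSource().isPresent()) {
    throw new ISE("Cannot pin specific segments for dataSource: %s", query.getDataSource());
  }

  return rewritten;
}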
- this.intervals = query.getIntervalsOfInnerMostQuery(); + this.intervals = dataSourceAnalysis.getBaseQuerySegmentSpec() + .map(QuerySegmentSpec::getIntervals) + .orElse(query.getIntervals()); } private ImmutableMap makeDownstreamQueryContext() @@ -269,12 +276,14 @@ public class CachingClusteredClient implements QuerySegmentWalker Sequence run(final UnaryOperator> timelineConverter) { - @Nullable - TimelineLookup timeline = serverView.getTimeline(query.getDataSource()); - if (timeline == null) { + final Optional> maybeTimeline = serverView.getTimeline( + dataSourceAnalysis + ); + if (!maybeTimeline.isPresent()) { return Sequences.empty(); } - timeline = timelineConverter.apply(timeline); + + final TimelineLookup timeline = timelineConverter.apply(maybeTimeline.get()); if (uncoveredIntervalsLimit > 0) { computeUncoveredIntervals(timeline); } @@ -598,19 +607,17 @@ public class CachingClusteredClient implements QuerySegmentWalker return; } - final MultipleSpecificSegmentSpec segmentsOfServerSpec = new MultipleSpecificSegmentSpec(segmentsOfServer); - // Divide user-provided maxQueuedBytes by the number of servers, and limit each server to that much. final long maxQueuedBytes = QueryContexts.getMaxQueuedBytes(query, httpClientConfig.getMaxQueuedBytes()); final long maxQueuedBytesPerServer = maxQueuedBytes / segmentsByServer.size(); final Sequence serverResults; if (isBySegment) { - serverResults = getBySegmentServerResults(serverRunner, segmentsOfServerSpec, maxQueuedBytesPerServer); + serverResults = getBySegmentServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer); } else if (!server.segmentReplicatable() || !populateCache) { - serverResults = getSimpleServerResults(serverRunner, segmentsOfServerSpec, maxQueuedBytesPerServer); + serverResults = getSimpleServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer); } else { - serverResults = getAndCacheServerResults(serverRunner, segmentsOfServerSpec, maxQueuedBytesPerServer); + serverResults = getAndCacheServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer); } listOfSequences.add(serverResults); }); @@ -619,13 +626,15 @@ public class CachingClusteredClient implements QuerySegmentWalker @SuppressWarnings("unchecked") private Sequence getBySegmentServerResults( final QueryRunner serverRunner, - final MultipleSpecificSegmentSpec segmentsOfServerSpec, + final List segmentsOfServer, long maxQueuedBytesPerServer ) { Sequence>> resultsBySegments = serverRunner .run( - queryPlus.withQuerySegmentSpec(segmentsOfServerSpec).withMaxQueuedBytes(maxQueuedBytesPerServer), + queryPlus.withQuery( + Queries.withSpecificSegments(queryPlus.getQuery(), segmentsOfServer) + ).withMaxQueuedBytes(maxQueuedBytesPerServer), responseContext ); // bySegment results need to be de-serialized, see DirectDruidClient.run() @@ -640,27 +649,33 @@ public class CachingClusteredClient implements QuerySegmentWalker @SuppressWarnings("unchecked") private Sequence getSimpleServerResults( final QueryRunner serverRunner, - final MultipleSpecificSegmentSpec segmentsOfServerSpec, + final List segmentsOfServer, long maxQueuedBytesPerServer ) { return serverRunner.run( - queryPlus.withQuerySegmentSpec(segmentsOfServerSpec).withMaxQueuedBytes(maxQueuedBytesPerServer), + queryPlus.withQuery( + Queries.withSpecificSegments(queryPlus.getQuery(), segmentsOfServer) + ).withMaxQueuedBytes(maxQueuedBytesPerServer), responseContext ); } private Sequence getAndCacheServerResults( final QueryRunner serverRunner, - final MultipleSpecificSegmentSpec 
segmentsOfServerSpec, + final List segmentsOfServer, long maxQueuedBytesPerServer ) { @SuppressWarnings("unchecked") final Sequence>> resultsBySegments = serverRunner.run( queryPlus - .withQuery((Query>>) downstreamQuery) - .withQuerySegmentSpec(segmentsOfServerSpec) + .withQuery( + Queries.withSpecificSegments( + (Query>>) downstreamQuery, + segmentsOfServer + ) + ) .withMaxQueuedBytes(maxQueuedBytesPerServer), responseContext ); diff --git a/server/src/main/java/org/apache/druid/client/ServerViewUtil.java b/server/src/main/java/org/apache/druid/client/ServerViewUtil.java index 8a52cacf299..b9f1f91a5d3 100644 --- a/server/src/main/java/org/apache/druid/client/ServerViewUtil.java +++ b/server/src/main/java/org/apache/druid/client/ServerViewUtil.java @@ -24,6 +24,7 @@ import org.apache.druid.query.DataSource; import org.apache.druid.query.LocatedSegmentDescriptor; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.timeline.TimelineLookup; import org.apache.druid.timeline.TimelineObjectHolder; @@ -33,6 +34,7 @@ import org.joda.time.Interval; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; /** */ @@ -55,13 +57,14 @@ public class ServerViewUtil int numCandidates ) { - TimelineLookup timeline = serverView.getTimeline(datasource); - if (timeline == null) { + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(datasource); + final Optional> maybeTimeline = serverView.getTimeline(analysis); + if (!maybeTimeline.isPresent()) { return Collections.emptyList(); } List located = new ArrayList<>(); for (Interval interval : intervals) { - for (TimelineObjectHolder holder : timeline.lookup(interval)) { + for (TimelineObjectHolder holder : maybeTimeline.get().lookup(interval)) { for (PartitionChunk chunk : holder.getObject()) { ServerSelector selector = chunk.getObject(); final SegmentDescriptor descriptor = new SegmentDescriptor( diff --git a/server/src/main/java/org/apache/druid/client/TimelineServerView.java b/server/src/main/java/org/apache/druid/client/TimelineServerView.java index ed1d4dfb731..47788234242 100644 --- a/server/src/main/java/org/apache/druid/client/TimelineServerView.java +++ b/server/src/main/java/org/apache/druid/client/TimelineServerView.java @@ -20,22 +20,31 @@ package org.apache.druid.client; import org.apache.druid.client.selector.ServerSelector; -import org.apache.druid.query.DataSource; import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineLookup; -import javax.annotation.Nullable; import java.util.List; +import java.util.Optional; import java.util.concurrent.Executor; /** */ public interface TimelineServerView extends ServerView { - @Nullable - TimelineLookup getTimeline(DataSource dataSource); + /** + * Returns the timeline for a datasource, if it exists. The analysis object passed in must represent a scan-based + * datasource of a single table. 
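Editor's note: as a reading aid for the getTimeline contract documented here (its @param/@return/@throws tags continue just below), a minimal caller might look like the following sketch. The table name and the serverView local are illustrative assumptions; the empty-Optional handling mirrors CachingClusteredClient above.

// "wikipedia" and serverView are hypothetical; only the call pattern matters.
final DataSourceAnalysis analysis =
    DataSourceAnalysis.forDataSource(new TableDataSource("wikipedia"));
final Optional<? extends TimelineLookup<String, ServerSelector>> timeline =
    serverView.getTimeline(analysis);

if (!timeline.isPresent()) {
  // Nothing is served for this table; callers treat that as an empty result.
  return Sequences.empty();
}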
+ * + * @param analysis data source analysis information + * + * @return timeline, if it exists + * + * @throws IllegalStateException if 'analysis' does not represent a scan-based datasource of a single table + */ + Optional<? extends TimelineLookup<String, ServerSelector>> getTimeline(DataSourceAnalysis analysis); /** * Returns a list of {@link ImmutableDruidServer} diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 451e079b26b..fb7b8067008 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -20,7 +20,6 @@ package org.apache.druid.segment.realtime.appenderator; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import org.apache.druid.client.CachingQueryRunner; @@ -42,6 +41,7 @@ import org.apache.druid.query.FinalizeResultsQueryRunner; import org.apache.druid.query.MetricsEmittingQueryRunner; import org.apache.druid.query.NoopQueryRunner; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryMetrics; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; @@ -60,16 +60,19 @@ import org.apache.druid.segment.Segment; import org.apache.druid.segment.realtime.FireHydrant; import org.apache.druid.segment.realtime.plumber.Sink; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionHolder; import org.joda.time.Interval; import java.io.Closeable; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; +/** + * Query handler for indexing tasks.
+ */ public class SinkQuerySegmentWalker implements QuerySegmentWalker { private static final EmittingLogger log = new EmittingLogger(SinkQuerySegmentWalker.class); @@ -118,40 +121,17 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker { final Iterable specs = FunctionalIterable .create(intervals) + .transformCat(sinkTimeline::lookup) .transformCat( - new Function>>() - { - @Override - public Iterable> apply(final Interval interval) - { - return sinkTimeline.lookup(interval); - } - } - ) - .transformCat( - new Function, Iterable>() - { - @Override - public Iterable apply(final TimelineObjectHolder holder) - { - return FunctionalIterable - .create(holder.getObject()) - .transform( - new Function, SegmentDescriptor>() - { - @Override - public SegmentDescriptor apply(final PartitionChunk chunk) - { - return new SegmentDescriptor( - holder.getInterval(), - holder.getVersion(), - chunk.getChunkNumber() - ); - } - } - ); - } - } + holder -> FunctionalIterable + .create(holder.getObject()) + .transform( + chunk -> new SegmentDescriptor( + holder.getInterval(), + holder.getVersion(), + chunk.getChunkNumber() + ) + ) ); return getQueryRunnerForSegments(query, specs); @@ -161,16 +141,15 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker public QueryRunner getQueryRunnerForSegments(final Query query, final Iterable specs) { // We only handle one particular dataSource. Make sure that's what we have, then ignore from here on out. - if (!(query.getDataSource() instanceof TableDataSource) - || !dataSource.equals(((TableDataSource) query.getDataSource()).getName())) { - log.makeAlert("Received query for unknown dataSource") - .addData("dataSource", query.getDataSource()) - .emit(); - return new NoopQueryRunner<>(); + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); + final Optional baseTableDataSource = analysis.getBaseTableDataSource(); + + if (!baseTableDataSource.isPresent() || !dataSource.equals(baseTableDataSource.get().getName())) { + // Report error, since we somehow got a query for a datasource we can't handle. + throw new ISE("Cannot handle datasource: %s", analysis.getDataSource()); } // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. - final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); if (!analysis.getPreJoinableClauses().isEmpty()) { throw new ISE("Cannot handle join dataSource"); } @@ -184,6 +163,11 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker final boolean skipIncrementalSegment = query.getContextValue(CONTEXT_SKIP_INCREMENTAL_SEGMENT, false); final AtomicLong cpuTimeAccumulator = new AtomicLong(0L); + // Make sure this query type can handle the subquery, if present. 
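Editor's note: the guard referenced by the comment just above lands in the next hunk. On the toolchest side, QueryToolChest#canPerformSubquery is new in this patch; its GroupBy override (added in GroupByQueryQueryToolChest, per the file list) plausibly walks the chain of nested queries along the lines of this sketch, which is an assumption rather than verbatim code from the patch:

@Override
public boolean canPerformSubquery(final Query<?> subquery)
{
  // Accept only chains of groupBy queries; anything else is rejected, which
  // assumes the base QueryToolChest default returns false.
  Query<?> current = subquery;

  while (current != null) {
    if (!(current instanceof GroupByQuery)) {
      return false;
    }

    if (current.getDataSource() instanceof QueryDataSource) {
      current = ((QueryDataSource) current.getDataSource()).getQuery();
    } else {
      current = null;
    }
  }

  return true;
}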
+ if (analysis.isQuery() && !toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())) { + throw new ISE("Cannot handle subquery: %s", analysis.getDataSource()); + } + Iterable> perSegmentRunners = Iterables.transform( specs, descriptor -> { diff --git a/server/src/main/java/org/apache/druid/server/ClientInfoResource.java b/server/src/main/java/org/apache/druid/server/ClientInfoResource.java index 24f6c4435d2..08032a5efe3 100644 --- a/server/src/main/java/org/apache/druid/server/ClientInfoResource.java +++ b/server/src/main/java/org/apache/druid/server/ClientInfoResource.java @@ -37,6 +37,7 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.LocatedSegmentDescriptor; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.metadata.SegmentMetadataQueryConfig; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.http.security.DatasourceResourceFilter; import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthorizationUtils; @@ -64,12 +65,14 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.stream.Collectors; import java.util.stream.Stream; /** + * */ @Path("/druid/v2/datasources") public class ClientInfoResource @@ -152,12 +155,12 @@ public class ClientInfoResource theInterval = Intervals.of(interval); } - TimelineLookup timeline = timelineServerView.getTimeline(new TableDataSource(dataSourceName)); - Iterable> serversLookup = timeline != null ? timeline.lookup( - theInterval - ) : null; - if (serversLookup == null || Iterables.isEmpty(serversLookup)) { - return Collections.EMPTY_MAP; + final Optional> maybeTimeline = + timelineServerView.getTimeline(DataSourceAnalysis.forDataSource(new TableDataSource(dataSourceName))); + final Optional>> maybeServersLookup = + maybeTimeline.map(timeline -> timeline.lookup(theInterval)); + if (!maybeServersLookup.isPresent() || Iterables.isEmpty(maybeServersLookup.get())) { + return Collections.emptyMap(); } Map servedIntervals = new TreeMap<>( new Comparator() @@ -174,7 +177,7 @@ public class ClientInfoResource } ); - for (TimelineObjectHolder holder : serversLookup) { + for (TimelineObjectHolder holder : maybeServersLookup.get()) { final Set dimensions = new HashSet<>(); final Set metrics = new HashSet<>(); final PartitionHolder partitionHolder = holder.getObject(); diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index ddc15aedef0..b484486c03b 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -43,7 +43,7 @@ import org.apache.druid.server.initialization.ServerConfig; import org.joda.time.Interval; /** - * + * Query handler for Broker processes (see CliBroker). 
*/ public class ClientQuerySegmentWalker implements QuerySegmentWalker { @@ -56,7 +56,6 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker private final Cache cache; private final CacheConfig cacheConfig; - @Inject public ClientQuerySegmentWalker( ServiceEmitter emitter, @@ -82,25 +81,27 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker @Override public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals) { - // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); - if (!analysis.getPreJoinableClauses().isEmpty()) { - throw new ISE("Cannot handle join dataSource"); - } - return makeRunner(query, baseClient.getQueryRunnerForIntervals(query, intervals)); + if (analysis.isConcreteTableBased()) { + return makeRunner(query, baseClient.getQueryRunnerForIntervals(query, intervals)); + } else { + // In the future, we will check here to see if parts of the query are inlinable, and if that inlining would + // be able to create a concrete table-based query that we can run through the distributed query stack. + throw new ISE("Query dataSource is not table-based, cannot run"); + } } @Override public QueryRunner getQueryRunnerForSegments(Query query, Iterable specs) { - // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); - if (!analysis.getPreJoinableClauses().isEmpty()) { - throw new ISE("Cannot handle join dataSource"); - } - return makeRunner(query, baseClient.getQueryRunnerForSegments(query, specs)); + if (analysis.isConcreteTableBased()) { + return makeRunner(query, baseClient.getQueryRunnerForSegments(query, specs)); + } else { + throw new ISE("Query dataSource is not table-based, cannot run"); + } } private QueryRunner makeRunner(Query query, QueryRunner baseClientRunner) @@ -126,9 +127,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker { PostProcessingOperator postProcessing = objectMapper.convertValue( query.getContextValue("postProcessing"), - new TypeReference>() - { - } + new TypeReference>() {} ); return new FluentQueryRunnerBuilder<>(toolChest) diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index dbb48474def..45b6538bb61 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -23,7 +23,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Ordering; import com.google.inject.Inject; import org.apache.druid.common.guava.SettableSupplier; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.loading.SegmentLoader; @@ -35,8 +38,8 @@ import org.apache.druid.timeline.partition.PartitionHolder; import org.apache.druid.timeline.partition.ShardSpec; import org.apache.druid.utils.CollectionUtils; -import javax.annotation.Nullable; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; /** @@ -134,19 +137,30 @@ 
public class SegmentManager return segmentLoader.isSegmentLoaded(segment); } - @Nullable - public VersionedIntervalTimeline getTimeline(String dataSource) + /** + * Returns the timeline for a datasource, if it exists. The analysis object passed in must represent a scan-based + * datasource of a single table. + * + * @param analysis data source analysis information + * + * @return timeline, if it exists + * + * @throws IllegalStateException if 'analysis' does not represent a scan-based datasource of a single table + */ + public Optional> getTimeline(DataSourceAnalysis analysis) { - final DataSourceState dataSourceState = dataSources.get(dataSource); - return dataSourceState == null ? null : dataSourceState.getTimeline(); + final TableDataSource tableDataSource = + analysis.getBaseTableDataSource() + .orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource())); + + return Optional.ofNullable(dataSources.get(tableDataSource.getName())).map(DataSourceState::getTimeline); } /** * Load a single segment. * * @param segment segment to load - * - * @param lazy whether to lazy load columns metadata + * @param lazy whether to lazy load columns metadata * * @return true if the segment was newly loaded, false if it was already loaded * diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java index 41c71160e5e..d4c672c91f4 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java @@ -20,8 +20,6 @@ package org.apache.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; -import com.google.common.collect.Iterables; import com.google.inject.Inject; import org.apache.druid.client.CachingQueryRunner; import org.apache.druid.client.cache.Cache; @@ -35,7 +33,6 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.BySegmentQueryRunner; import org.apache.druid.query.CPUTimeMetricQueryRunner; -import org.apache.druid.query.DataSource; import org.apache.druid.query.FinalizeResultsQueryRunner; import org.apache.druid.query.MetricsEmittingQueryRunner; import org.apache.druid.query.NoopQueryRunner; @@ -52,7 +49,6 @@ import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.ReferenceCountingSegmentQueryRunner; import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner; import org.apache.druid.query.SegmentDescriptor; -import org.apache.druid.query.TableDataSource; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentSpec; @@ -61,17 +57,19 @@ import org.apache.druid.server.SegmentManager; import org.apache.druid.server.SetAndVerifyContextQueryRunner; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionHolder; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.util.Collections; +import java.util.Optional; import java.util.concurrent.ExecutorService; import 
java.util.concurrent.atomic.AtomicLong; +/** + * Query handler for Historical processes (see CliHistorical). + */ public class ServerManager implements QuerySegmentWalker { private static final EmittingLogger log = new EmittingLogger(ServerManager.class); @@ -111,110 +109,49 @@ public class ServerManager implements QuerySegmentWalker this.serverConfig = serverConfig; } - private DataSource getInnerMostDataSource(DataSource dataSource) - { - if (dataSource instanceof QueryDataSource) { - return getInnerMostDataSource(((QueryDataSource) dataSource).getQuery().getDataSource()); - } - return dataSource; - } - @Override public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals) { - final QueryRunnerFactory> factory = conglomerate.findFactory(query); - if (factory == null) { - throw new ISE("Unknown query type[%s].", query.getClass()); - } - - // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); if (!analysis.getPreJoinableClauses().isEmpty()) { throw new ISE("Cannot handle join dataSource"); } - final QueryToolChest> toolChest = factory.getToolchest(); - final AtomicLong cpuTimeAccumulator = new AtomicLong(0L); + final VersionedIntervalTimeline timeline; + final Optional> maybeTimeline = + segmentManager.getTimeline(analysis); - DataSource dataSource = getInnerMostDataSource(query.getDataSource()); - if (!(dataSource instanceof TableDataSource)) { - throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported"); - } - String dataSourceName = getDataSourceName(dataSource); - - final VersionedIntervalTimeline timeline = segmentManager.getTimeline( - dataSourceName - ); - - if (timeline == null) { - return new NoopQueryRunner(); + if (maybeTimeline.isPresent()) { + timeline = maybeTimeline.get(); + } else { + // Note: this is not correct when there's a right or full outer join going on. + // See https://github.com/apache/druid/issues/9229 for details. 
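Editor's note: several call sites in this file branch on the analysis object, so it is worth spelling out the semantics the patch relies on. The sketch below encodes them as comments for two common shapes (a plain table, and a query datasource wrapping the same table); the expectations reflect how this patch uses the API and are assumptions, not quoted documentation.

import java.util.Collections;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.planning.DataSourceAnalysis;

// Plain table: concrete, no subquery involved.
final DataSourceAnalysis table =
    DataSourceAnalysis.forDataSource(new TableDataSource("foo"));
// table.isConcreteTableBased()   -> true
// table.isQuery()                -> false
// table.getBaseTableDataSource() -> Optional.of(new TableDataSource("foo"))

// Nested query over the same table: the analysis still finds the base table,
// and getBaseQuerySegmentSpec() exposes the innermost intervals.
final Query<?> inner = Druids.newTimeseriesQueryBuilder()
    .dataSource("foo")
    .intervals(Collections.singletonList(Intervals.of("2000/2001")))
    .granularity(Granularities.ALL)
    .aggregators(Collections.singletonList(new CountAggregatorFactory("rows")))
    .build();
final DataSourceAnalysis nested =
    DataSourceAnalysis.forDataSource(new QueryDataSource(inner));
// nested.isQuery()                 -> true
// nested.getBaseTableDataSource()  -> Optional.of(new TableDataSource("foo"))
// nested.getBaseQuerySegmentSpec() -> carries the 2000/2001 interval of the inner query.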
+ return new NoopQueryRunner<>(); } - FunctionalIterable> queryRunners = FunctionalIterable + FunctionalIterable segmentDescriptors = FunctionalIterable .create(intervals) + .transformCat(timeline::lookup) .transformCat( - new Function>>() - { - @Override - public Iterable> apply(Interval input) - { - return timeline.lookup(input); + holder -> { + if (holder == null) { + return null; } - } - ) - .transformCat( - new Function, Iterable>>() - { - @Override - public Iterable> apply( - @Nullable final TimelineObjectHolder holder - ) - { - if (holder == null) { - return null; - } - return FunctionalIterable - .create(holder.getObject()) - .transform( - new Function, QueryRunner>() - { - @Override - public QueryRunner apply(PartitionChunk input) - { - return buildAndDecorateQueryRunner( - factory, - toolChest, - input.getObject(), - new SegmentDescriptor( - holder.getInterval(), - holder.getVersion(), - input.getChunkNumber() - ), - cpuTimeAccumulator - ); - } - } - ); - } + return FunctionalIterable + .create(holder.getObject()) + .transform( + partitionChunk -> + new SegmentDescriptor( + holder.getInterval(), + holder.getVersion(), + partitionChunk.getChunkNumber() + ) + ); } ); - return CPUTimeMetricQueryRunner.safeBuild( - new FinalizeResultsQueryRunner<>( - toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)), - toolChest - ), - toolChest, - emitter, - cpuTimeAccumulator, - true - ); - } - - private String getDataSourceName(DataSource dataSource) - { - return Iterables.getOnlyElement(dataSource.getTableNames()); + return getQueryRunnerForSegments(query, segmentDescriptors); } @Override @@ -225,7 +162,7 @@ public class ServerManager implements QuerySegmentWalker log.makeAlert("Unknown query type, [%s]", query.getClass()) .addData("dataSource", query.getDataSource()) .emit(); - return new NoopQueryRunner(); + return new NoopQueryRunner<>(); } // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. @@ -235,48 +172,53 @@ public class ServerManager implements QuerySegmentWalker } final QueryToolChest> toolChest = factory.getToolchest(); + final AtomicLong cpuTimeAccumulator = new AtomicLong(0L); - String dataSourceName = getDataSourceName(query.getDataSource()); + final VersionedIntervalTimeline timeline; + final Optional> maybeTimeline = + segmentManager.getTimeline(analysis); - final VersionedIntervalTimeline timeline = segmentManager.getTimeline( - dataSourceName - ); - - if (timeline == null) { - return new NoopQueryRunner(); + // Make sure this query type can handle the subquery, if present. + if (analysis.isQuery() && !toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())) { + throw new ISE("Cannot handle subquery: %s", analysis.getDataSource()); } - final AtomicLong cpuTimeAccumulator = new AtomicLong(0L); + if (maybeTimeline.isPresent()) { + timeline = maybeTimeline.get(); + } else { + // Note: this is not correct when there's a right or full outer join going on. + // See https://github.com/apache/druid/issues/9229 for details. 
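Editor's note: in getQueryRunnerForSegments, the per-descriptor lookup in the following hunk degrades gracefully when a segment is gone: it returns ReportTimelineMissingSegmentQueryRunner, which the broker-side RetryQueryRunner (touched earlier in this patch) reacts to. Condensed here for readability, with the buildAndDecorateQueryRunner call elided:

// Condensed restatement of the hunk below, not additional behavior.
final PartitionHolder<ReferenceCountingSegment> entry =
    timeline.findEntry(descriptor.getInterval(), descriptor.getVersion());

if (entry == null) {
  return Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor));
}

final PartitionChunk<ReferenceCountingSegment> chunk = entry.getChunk(descriptor.getPartitionNumber());

if (chunk == null) {
  return Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor));
}

// Otherwise: decorate and run the query over chunk.getObject() (elided).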
+ return new NoopQueryRunner<>(); + } FunctionalIterable> queryRunners = FunctionalIterable .create(specs) .transformCat( - new Function>>() - { - @Override - @SuppressWarnings("unchecked") - public Iterable> apply(SegmentDescriptor input) - { + descriptor -> { + final PartitionHolder entry = timeline.findEntry( + descriptor.getInterval(), + descriptor.getVersion() + ); - final PartitionHolder entry = timeline.findEntry( - input.getInterval(), input.getVersion() - ); - - if (entry == null) { - return Collections.singletonList( - new ReportTimelineMissingSegmentQueryRunner(input)); - } - - final PartitionChunk chunk = entry.getChunk(input.getPartitionNumber()); - if (chunk == null) { - return Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner(input)); - } - - final ReferenceCountingSegment adapter = chunk.getObject(); - return Collections.singletonList( - buildAndDecorateQueryRunner(factory, toolChest, adapter, input, cpuTimeAccumulator) - ); + if (entry == null) { + return Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor)); } + + final PartitionChunk chunk = entry.getChunk(descriptor.getPartitionNumber()); + if (chunk == null) { + return Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor)); + } + + final ReferenceCountingSegment segment = chunk.getObject(); + return Collections.singletonList( + buildAndDecorateQueryRunner( + factory, + toolChest, + segment, + descriptor, + cpuTimeAccumulator + ) + ); } ); diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index b630b8ec6c3..dd3961e04f4 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -39,6 +39,7 @@ import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.QueryWatcher; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.TestHelper; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; @@ -114,7 +115,9 @@ public class BrokerServerViewTest extends CuratorTestBase Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); - TimelineLookup timeline = brokerServerView.getTimeline(new TableDataSource("test_broker_server_view")); + TimelineLookup timeline = brokerServerView.getTimeline( + DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view")) + ).get(); List serverLookupRes = (List) timeline.lookup( Intervals.of( "2014-10-20T00:00:00Z/P1D" @@ -203,7 +206,9 @@ public class BrokerServerViewTest extends CuratorTestBase Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); - TimelineLookup timeline = brokerServerView.getTimeline(new TableDataSource("test_broker_server_view")); + TimelineLookup timeline = brokerServerView.getTimeline( + DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view")) + ).get(); assertValues( Arrays.asList( createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)), @@ -224,7 +229,9 @@ public class BrokerServerViewTest extends CuratorTestBase // 
renew segmentRemovedLatch since we still have 4 segments to unannounce segmentRemovedLatch = new CountDownLatch(4); - timeline = brokerServerView.getTimeline(new TableDataSource("test_broker_server_view")); + timeline = brokerServerView.getTimeline( + DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view")) + ).get(); assertValues( Arrays.asList( createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)), diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java index a75ad64e080..3efe7bc5d75 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java @@ -38,7 +38,6 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; -import org.apache.druid.query.DataSource; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Druids; import org.apache.druid.query.Query; @@ -47,8 +46,10 @@ import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.TimelineLookup; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.NoneShardSpec; import org.apache.druid.timeline.partition.SingleElementPartitionChunk; @@ -64,11 +65,13 @@ import java.io.IOException; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; /** + * */ public class CachingClusteredClientFunctionalityTest { @@ -245,9 +248,9 @@ public class CachingClusteredClientFunctionalityTest } @Override - public VersionedIntervalTimeline getTimeline(DataSource dataSource) + public Optional> getTimeline(DataSourceAnalysis analysis) { - return timeline; + return Optional.of(timeline); } @Nullable diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index bd91104aac4..9c0588837ff 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -68,7 +68,6 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.nary.TrinaryFn; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.BySegmentResultValueClass; -import org.apache.druid.query.DataSource; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Druids; import org.apache.druid.query.FinalizeResultsQueryRunner; @@ -98,6 +97,7 @@ import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; import 
org.apache.druid.query.ordering.StringComparators; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.search.SearchHit; import org.apache.druid.query.search.SearchQuery; import org.apache.druid.query.search.SearchQueryConfig; @@ -148,6 +148,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.TreeMap; import java.util.concurrent.Callable; @@ -2391,9 +2392,9 @@ public class CachingClusteredClientTest } @Override - public VersionedIntervalTimeline getTimeline(DataSource dataSource) + public Optional> getTimeline(DataSourceAnalysis analysis) { - return timeline; + return Optional.of(timeline); } @Override diff --git a/server/src/test/java/org/apache/druid/server/ClientInfoResourceTest.java b/server/src/test/java/org/apache/druid/server/ClientInfoResourceTest.java index 60b633d2262..f6c880c0e20 100644 --- a/server/src/test/java/org/apache/druid/server/ClientInfoResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientInfoResourceTest.java @@ -30,8 +30,8 @@ import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy; import org.apache.druid.client.selector.RandomServerSelectorStrategy; import org.apache.druid.client.selector.ServerSelector; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.query.TableDataSource; import org.apache.druid.query.metadata.SegmentMetadataQueryConfig; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.security.AuthConfig; import org.apache.druid.timeline.DataSegment; @@ -48,6 +48,7 @@ import org.junit.Test; import java.util.List; import java.util.Map; +import java.util.Optional; public class ClientInfoResourceTest { @@ -128,7 +129,8 @@ public class ClientInfoResourceTest EasyMock.expect(serverInventoryView.getInventory()).andReturn(ImmutableList.of(server)).anyTimes(); timelineServerView = EasyMock.createMock(TimelineServerView.class); - EasyMock.expect(timelineServerView.getTimeline(EasyMock.anyObject(TableDataSource.class))).andReturn(timeline); + EasyMock.expect(timelineServerView.getTimeline(EasyMock.anyObject(DataSourceAnalysis.class))) + .andReturn((Optional) Optional.of(timeline)); EasyMock.replay(serverInventoryView, timelineServerView); diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java index b6c9c1671fc..04f796e9918 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java @@ -24,6 +24,8 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Ordering; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.MapUtils; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.AbstractSegment; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.ReferenceCountingSegment; @@ -49,6 +51,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -382,7 +385,10 @@ public class 
SegmentManagerTest @Test public void testGetNonExistingTimeline() { - Assert.assertNull(segmentManager.getTimeline("nonExisting")); + Assert.assertEquals( + Optional.empty(), + segmentManager.getTimeline(DataSourceAnalysis.forDataSource(new TableDataSource("nonExisting"))) + ); } @Test @@ -448,7 +454,10 @@ public class SegmentManagerTest dataSources.forEach( (sourceName, dataSourceState) -> { Assert.assertEquals(expectedDataSourceCounts.get(sourceName).longValue(), dataSourceState.getNumSegments()); - Assert.assertEquals(expectedDataSourceSizes.get(sourceName).longValue(), dataSourceState.getTotalSegmentSize()); + Assert.assertEquals( + expectedDataSourceSizes.get(sourceName).longValue(), + dataSourceState.getTotalSegmentSize() + ); Assert.assertEquals( expectedDataSources.get(sourceName).getAllTimelineEntries(), dataSourceState.getTimeline().getAllTimelineEntries() diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java index 1910bc27b65..5aacb8773f3 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java @@ -37,8 +37,9 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.math.expr.Evals; import org.apache.druid.query.Query; -import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryToolChest; +import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.column.ColumnHolder; @@ -48,6 +49,7 @@ import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.joda.time.DateTime; +import org.joda.time.Interval; import java.io.IOException; import java.util.Collection; @@ -87,8 +89,7 @@ public class QueryMaker final Query query = druidQuery.getQuery(); if (plannerContext.getPlannerConfig().isRequireTimeCondition()) { - final Query innerMostQuery = findInnerMostQuery(query); - if (innerMostQuery.getIntervals().equals(Intervals.ONLY_ETERNITY)) { + if (Intervals.ONLY_ETERNITY.equals(findBaseDataSourceIntervals(query))) { throw new CannotBuildQueryException( "requireTimeCondition is enabled, all queries must include a filter condition on the __time column" ); @@ -121,13 +122,12 @@ public class QueryMaker ); } - private Query findInnerMostQuery(Query outerQuery) + private List findBaseDataSourceIntervals(Query query) { - Query query = outerQuery; - while (query.getDataSource() instanceof QueryDataSource) { - query = ((QueryDataSource) query.getDataSource()).getQuery(); - } - return query; + return DataSourceAnalysis.forDataSource(query.getDataSource()) + .getBaseQuerySegmentSpec() + .map(QuerySegmentSpec::getIntervals) + .orElse(query.getIntervals()); } private Sequence execute(Query query, final List newFields, final List newTypes) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java index 0f36273035a..a8e498beb1f 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java +++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java @@ -26,8 +26,8 @@ import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.selector.ServerSelector; -import org.apache.druid.query.DataSource; import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; @@ -37,6 +37,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.Executor; /** @@ -77,7 +78,7 @@ public class TestServerInventoryView implements TimelineServerView } @Override - public TimelineLookup getTimeline(DataSource dataSource) + public Optional> getTimeline(DataSourceAnalysis analysis) { throw new UnsupportedOperationException(); }
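Editor's note: putting the pieces together, the post-patch broker flow can be summarized in one hedged sketch; query, serverView, and segmentsOfServer are illustrative locals, and error handling is condensed.

// Illustrative composition of the APIs this patch introduces; not verbatim.
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());

// ClientQuerySegmentWalker: only concrete table-based datasources can run
// through the distributed stack for now.
if (!analysis.isConcreteTableBased()) {
  throw new ISE("Query dataSource is not table-based, cannot run");
}

// CachingClusteredClient: resolve the timeline through the analysis.
final Optional<? extends TimelineLookup<String, ServerSelector>> timeline =
    serverView.getTimeline(analysis);

if (!timeline.isPresent()) {
  return Sequences.empty();
}

// Per server: pin the chosen segments explicitly, sanity checks included.
final Query<T> perServerQuery = Queries.withSpecificSegments(query, segmentsOfServer);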