diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 2922f2f16c3..8dba7de3d3d 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -175,7 +175,7 @@ public class CachingClusteredClient implements QuerySegmentWalker @Override public Sequence run(final QueryPlus queryPlus, final ResponseContext responseContext) { - return CachingClusteredClient.this.run(queryPlus, responseContext, timeline -> timeline); + return CachingClusteredClient.this.run(queryPlus, responseContext, timeline -> timeline, false); } }; } @@ -187,10 +187,12 @@ public class CachingClusteredClient implements QuerySegmentWalker private Sequence run( final QueryPlus queryPlus, final ResponseContext responseContext, - final UnaryOperator> timelineConverter + final UnaryOperator> timelineConverter, + final boolean specificSegments ) { - final ClusterQueryResult result = new SpecificQueryRunnable<>(queryPlus, responseContext).run(timelineConverter); + final ClusterQueryResult result = new SpecificQueryRunnable<>(queryPlus, responseContext) + .run(timelineConverter, specificSegments); initializeNumRemainingResponsesInResponseContext(queryPlus.getQuery(), responseContext, result.numQueryServers); return result.sequence; } @@ -231,7 +233,8 @@ public class CachingClusteredClient implements QuerySegmentWalker } } return timeline2; - } + }, + true ); } }; @@ -321,7 +324,10 @@ public class CachingClusteredClient implements QuerySegmentWalker * @return a pair of a sequence merging results from remote query servers and the number of remote servers * participating in query processing. */ - ClusterQueryResult run(final UnaryOperator> timelineConverter) + ClusterQueryResult run( + final UnaryOperator> timelineConverter, + final boolean specificSegments + ) { final Optional> maybeTimeline = serverView.getTimeline( dataSourceAnalysis @@ -335,7 +341,7 @@ public class CachingClusteredClient implements QuerySegmentWalker computeUncoveredIntervals(timeline); } - final Set segmentServers = computeSegmentsToQuery(timeline); + final Set segmentServers = computeSegmentsToQuery(timeline, specificSegments); @Nullable final byte[] queryCacheKey = computeQueryCacheKey(); if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) { @@ -401,11 +407,16 @@ public class CachingClusteredClient implements QuerySegmentWalker } } - private Set computeSegmentsToQuery(TimelineLookup timeline) + private Set computeSegmentsToQuery( + TimelineLookup timeline, + boolean specificSegments + ) { + final java.util.function.Function>> lookupFn + = specificSegments ? timeline::lookupWithIncompletePartitions : timeline::lookup; final List> serversLookup = toolChest.filterSegments( query, - intervals.stream().flatMap(i -> timeline.lookup(i).stream()).collect(Collectors.toList()) + intervals.stream().flatMap(i -> lookupFn.apply(i).stream()).collect(Collectors.toList()) ); final Set segments = new LinkedHashSet<>(); diff --git a/server/src/test/java/org/apache/druid/client/TestHttpClient.java b/server/src/test/java/org/apache/druid/client/TestHttpClient.java index 7828b647028..a899cb0747e 100644 --- a/server/src/test/java/org/apache/druid/client/TestHttpClient.java +++ b/server/src/test/java/org/apache/druid/client/TestHttpClient.java @@ -171,7 +171,7 @@ public class TestHttpClient implements HttpClient private QueryRunner getQueryRunner() { if (isSegmentDropped) { - return new ReportTimelineMissingSegmentQueryRunner( + return new ReportTimelineMissingSegmentQueryRunner<>( new SegmentDescriptor(segment.getInterval(), segment.getVersion(), segment.getId().getPartitionNum()) ); } else { diff --git a/server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java b/server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java index 7ea13f22dcd..3cea53b44e3 100644 --- a/server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java +++ b/server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java @@ -20,6 +20,7 @@ package org.apache.druid.query; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.client.CachingClusteredClient; @@ -35,7 +36,9 @@ import org.apache.druid.client.cache.MapCache; import org.apache.druid.guice.http.DruidHttpClientConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; @@ -74,7 +77,7 @@ public class RetryQueryRunnerTest { private static final Closer CLOSER = Closer.create(); private static final String DATASOURCE = "datasource"; - private static final GeneratorSchemaInfo SCHEMA_INFO = GeneratorBasicSchemas.SCHEMA_MAP.get("basic"); + private static final GeneratorSchemaInfo BASE_SCHEMA_INFO = GeneratorBasicSchemas.SCHEMA_MAP.get("basic"); private static final boolean USE_PARALLEL_MERGE_POOL_CONFIGURED = false; @Rule @@ -148,7 +151,7 @@ public class RetryQueryRunnerTest public void testNoRetry() { prepareCluster(10); - final Query> query = timeseriesQuery(SCHEMA_INFO.getDataInterval()); + final Query> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval()); final RetryQueryRunner> queryRunner = createQueryRunner( newRetryQueryRunnerConfig(1, false), query, @@ -165,7 +168,7 @@ public class RetryQueryRunnerTest public void testRetryForMovedSegment() { prepareCluster(10); - final Query> query = timeseriesQuery(SCHEMA_INFO.getDataInterval()); + final Query> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval()); final RetryQueryRunner> queryRunner = createQueryRunner( newRetryQueryRunnerConfig(1, true), query, @@ -189,7 +192,7 @@ public class RetryQueryRunnerTest public void testRetryUntilWeGetFullResult() { prepareCluster(10); - final Query> query = timeseriesQuery(SCHEMA_INFO.getDataInterval()); + final Query> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval()); final RetryQueryRunner> queryRunner = createQueryRunner( newRetryQueryRunnerConfig(100, false), // retry up to 100 query, @@ -209,7 +212,7 @@ public class RetryQueryRunnerTest public void testFailWithPartialResultsAfterRetry() { prepareCluster(10); - final Query> query = timeseriesQuery(SCHEMA_INFO.getDataInterval()); + final Query> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval()); final RetryQueryRunner> queryRunner = createQueryRunner( newRetryQueryRunnerConfig(1, false), query, @@ -229,12 +232,26 @@ public class RetryQueryRunnerTest private void prepareCluster(int numServers) { + Preconditions.checkArgument(numServers < 25, "Cannot be larger than 24"); for (int i = 0; i < numServers; i++) { - final DataSegment segment = newSegment(SCHEMA_INFO.getDataInterval(), i); + final int partitionId = i % 2; + final int intervalIndex = i / 2; + final Interval interval = Intervals.of("2000-01-01T%02d/PT1H", intervalIndex); + final DataSegment segment = newSegment(interval, partitionId, 2); addServer( SimpleServerView.createServer(i + 1), segment, - segmentGenerator.generate(segment, SCHEMA_INFO, Granularities.NONE, 10) + segmentGenerator.generate( + segment, + new GeneratorSchemaInfo( + BASE_SCHEMA_INFO.getColumnSchemas(), + BASE_SCHEMA_INFO.getAggs(), + interval, + BASE_SCHEMA_INFO.isWithRollup() + ), + Granularities.NONE, + 10 + ) ); } } @@ -315,7 +332,7 @@ public class RetryQueryRunnerTest return Druids.newTimeseriesQueryBuilder() .dataSource(DATASOURCE) .intervals(ImmutableList.of(interval)) - .granularity(Granularities.DAY) + .granularity(Granularities.HOUR) .aggregators(new CountAggregatorFactory("rows")) .context( ImmutableMap.of( @@ -338,20 +355,26 @@ public class RetryQueryRunnerTest { return IntStream .range(0, expectedNumResultRows) - .mapToObj(i -> new Result<>(DateTimes.of("2000-01-01"), new TimeseriesResultValue(ImmutableMap.of("rows", 10)))) + .mapToObj( + i -> new Result<>( + DateTimes.of(StringUtils.format("2000-01-01T%02d", i / 2)), + new TimeseriesResultValue(ImmutableMap.of("rows", 10)) + ) + ) .collect(Collectors.toList()); } private static DataSegment newSegment( Interval interval, - int partitionId + int partitionId, + int numCorePartitions ) { return DataSegment.builder() .dataSource(DATASOURCE) .interval(interval) .version("1") - .shardSpec(new NumberedShardSpec(partitionId, 0)) + .shardSpec(new NumberedShardSpec(partitionId, numCorePartitions)) .size(10) .build(); }