Fix CachingClusteredClient when querying specific segments (#10125)

* Fix CachingClusteredClient when querying specific segments

* delete useless test

* roll back timeout
Author: Jihoon Son, 2020-07-02 17:50:50 -07:00 (committed by GitHub)
parent ed981ef88e
commit 2b93dc6019
3 changed files with 54 additions and 20 deletions

CachingClusteredClient.java

@@ -175,7 +175,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
       @Override
       public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
       {
-        return CachingClusteredClient.this.run(queryPlus, responseContext, timeline -> timeline);
+        return CachingClusteredClient.this.run(queryPlus, responseContext, timeline -> timeline, false);
       }
     };
   }
@@ -187,10 +187,12 @@ public class CachingClusteredClient implements QuerySegmentWalker
   private <T> Sequence<T> run(
       final QueryPlus<T> queryPlus,
       final ResponseContext responseContext,
-      final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter
+      final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter,
+      final boolean specificSegments
   )
   {
-    final ClusterQueryResult<T> result = new SpecificQueryRunnable<>(queryPlus, responseContext).run(timelineConverter);
+    final ClusterQueryResult<T> result = new SpecificQueryRunnable<>(queryPlus, responseContext)
+        .run(timelineConverter, specificSegments);
     initializeNumRemainingResponsesInResponseContext(queryPlus.getQuery(), responseContext, result.numQueryServers);
     return result.sequence;
   }
@@ -231,7 +233,8 @@ public class CachingClusteredClient implements QuerySegmentWalker
               }
             }
             return timeline2;
-          }
+          },
+          true
         );
       }
     };
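For context, a short standalone sketch, not part of this commit, of how a query ends up on the specific-segments path that now passes specificSegments = true: a broker-side runner such as RetryQueryRunner rewrites the retried query with explicit SegmentDescriptors, and a query carrying such a spec is served through getQueryRunnerForSegments rather than getQueryRunnerForIntervals. The sketch assumes Queries.withSpecificSegments keeps its usual (query, descriptors) signature; the descriptor values are hypothetical.

import java.util.Collections;
import java.util.List;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
import org.apache.druid.query.SegmentDescriptor;

public class SpecificSegmentsSketch
{
  // Narrows a query to one hypothetical segment (hour 00, version "1", partition 0).
  // The rewritten query carries a specific-segments spec, so the broker serves it via
  // getQueryRunnerForSegments, which with this commit calls run(..., specificSegments = true).
  static <T> Query<T> retryOnlyMissingSegment(Query<T> query)
  {
    final List<SegmentDescriptor> missing = Collections.singletonList(
        new SegmentDescriptor(Intervals.of("2000-01-01T00/PT1H"), "1", 0)
    );
    return Queries.withSpecificSegments(query, missing);
  }
}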
@@ -321,7 +324,10 @@ public class CachingClusteredClient implements QuerySegmentWalker
      * @return a pair of a sequence merging results from remote query servers and the number of remote servers
      *         participating in query processing.
      */
-    ClusterQueryResult<T> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
+    ClusterQueryResult<T> run(
+        final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter,
+        final boolean specificSegments
+    )
     {
       final Optional<? extends TimelineLookup<String, ServerSelector>> maybeTimeline = serverView.getTimeline(
           dataSourceAnalysis
@@ -335,7 +341,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
         computeUncoveredIntervals(timeline);
       }

-      final Set<SegmentServerSelector> segmentServers = computeSegmentsToQuery(timeline);
+      final Set<SegmentServerSelector> segmentServers = computeSegmentsToQuery(timeline, specificSegments);
       @Nullable
       final byte[] queryCacheKey = computeQueryCacheKey();
       if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) {
@@ -401,11 +407,16 @@ public class CachingClusteredClient implements QuerySegmentWalker
       }
     }

-    private Set<SegmentServerSelector> computeSegmentsToQuery(TimelineLookup<String, ServerSelector> timeline)
+    private Set<SegmentServerSelector> computeSegmentsToQuery(
+        TimelineLookup<String, ServerSelector> timeline,
+        boolean specificSegments
+    )
     {
+      final java.util.function.Function<Interval, List<TimelineObjectHolder<String, ServerSelector>>> lookupFn
+          = specificSegments ? timeline::lookupWithIncompletePartitions : timeline::lookup;
       final List<TimelineObjectHolder<String, ServerSelector>> serversLookup = toolChest.filterSegments(
           query,
-          intervals.stream().flatMap(i -> timeline.lookup(i).stream()).collect(Collectors.toList())
+          intervals.stream().flatMap(i -> lookupFn.apply(i).stream()).collect(Collectors.toList())
       );

       final Set<SegmentServerSelector> segments = new LinkedHashSet<>();
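The behavioral heart of the fix is the new lookupFn above: for specific-segments queries the timeline is now consulted with lookupWithIncompletePartitions instead of lookup. Below is a hedged, standalone illustration, not part of this commit, of why that matters; it assumes the usual VersionedIntervalTimeline semantics, where lookup only returns intervals whose core partition set is complete.

import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NumberedPartitionChunk;
import org.joda.time.Interval;

public class TimelineLookupSketch
{
  public static void main(String[] args)
  {
    final Interval hour = Intervals.of("2000-01-01T00/PT1H");
    final VersionedIntervalTimeline<String, String> timeline =
        new VersionedIntervalTimeline<>(Ordering.natural());

    // Add only one of the two core partitions for this hour, mimicking the timeline that
    // getQueryRunnerForSegments builds when the query asks for a single partition.
    timeline.add(hour, "1", NumberedPartitionChunk.make(0, 2, "partition-0"));

    // lookup() hides the interval because its partition set is incomplete, which is what
    // previously dropped segments from specific-segment queries;
    // lookupWithIncompletePartitions() still returns the lone chunk.
    System.out.println(timeline.lookup(hour).size());                         // expected: 0
    System.out.println(timeline.lookupWithIncompletePartitions(hour).size()); // expected: 1
  }
}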

TestHttpClient.java

@@ -171,7 +171,7 @@ public class TestHttpClient implements HttpClient
     private QueryRunner getQueryRunner()
     {
       if (isSegmentDropped) {
-        return new ReportTimelineMissingSegmentQueryRunner(
+        return new ReportTimelineMissingSegmentQueryRunner<>(
             new SegmentDescriptor(segment.getInterval(), segment.getVersion(), segment.getId().getPartitionNum())
         );
       } else {

RetryQueryRunnerTest.java

@@ -20,6 +20,7 @@
 package org.apache.druid.query;

 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.client.CachingClusteredClient;
@@ -35,7 +36,9 @@ import org.apache.druid.client.cache.MapCache;
 import org.apache.druid.guice.http.DruidHttpClientConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.io.Closer;
@@ -74,7 +77,7 @@ public class RetryQueryRunnerTest
 {
   private static final Closer CLOSER = Closer.create();
   private static final String DATASOURCE = "datasource";
-  private static final GeneratorSchemaInfo SCHEMA_INFO = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
+  private static final GeneratorSchemaInfo BASE_SCHEMA_INFO = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
   private static final boolean USE_PARALLEL_MERGE_POOL_CONFIGURED = false;

   @Rule
@@ -148,7 +151,7 @@ public class RetryQueryRunnerTest
   public void testNoRetry()
   {
     prepareCluster(10);
-    final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval());
     final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = createQueryRunner(
         newRetryQueryRunnerConfig(1, false),
         query,
@@ -165,7 +168,7 @@ public class RetryQueryRunnerTest
   public void testRetryForMovedSegment()
   {
     prepareCluster(10);
-    final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval());
     final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = createQueryRunner(
         newRetryQueryRunnerConfig(1, true),
         query,
@@ -189,7 +192,7 @@ public class RetryQueryRunnerTest
   public void testRetryUntilWeGetFullResult()
   {
     prepareCluster(10);
-    final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval());
     final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = createQueryRunner(
         newRetryQueryRunnerConfig(100, false), // retry up to 100
         query,
@@ -209,7 +212,7 @@ public class RetryQueryRunnerTest
   public void testFailWithPartialResultsAfterRetry()
   {
     prepareCluster(10);
-    final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval());
     final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = createQueryRunner(
         newRetryQueryRunnerConfig(1, false),
         query,
@@ -229,12 +232,26 @@

   private void prepareCluster(int numServers)
   {
+    Preconditions.checkArgument(numServers < 25, "Cannot be larger than 24");
     for (int i = 0; i < numServers; i++) {
-      final DataSegment segment = newSegment(SCHEMA_INFO.getDataInterval(), i);
+      final int partitionId = i % 2;
+      final int intervalIndex = i / 2;
+      final Interval interval = Intervals.of("2000-01-01T%02d/PT1H", intervalIndex);
+      final DataSegment segment = newSegment(interval, partitionId, 2);
       addServer(
           SimpleServerView.createServer(i + 1),
           segment,
-          segmentGenerator.generate(segment, SCHEMA_INFO, Granularities.NONE, 10)
+          segmentGenerator.generate(
+              segment,
+              new GeneratorSchemaInfo(
+                  BASE_SCHEMA_INFO.getColumnSchemas(),
+                  BASE_SCHEMA_INFO.getAggs(),
+                  interval,
+                  BASE_SCHEMA_INFO.isWithRollup()
+              ),
+              Granularities.NONE,
+              10
+          )
       );
     }
   }
@@ -315,7 +332,7 @@ public class RetryQueryRunnerTest
     return Druids.newTimeseriesQueryBuilder()
                  .dataSource(DATASOURCE)
                  .intervals(ImmutableList.of(interval))
-                 .granularity(Granularities.DAY)
+                 .granularity(Granularities.HOUR)
                  .aggregators(new CountAggregatorFactory("rows"))
                  .context(
                      ImmutableMap.of(
@@ -338,20 +355,26 @@
   {
     return IntStream
        .range(0, expectedNumResultRows)
-       .mapToObj(i -> new Result<>(DateTimes.of("2000-01-01"), new TimeseriesResultValue(ImmutableMap.of("rows", 10))))
+       .mapToObj(
+           i -> new Result<>(
+               DateTimes.of(StringUtils.format("2000-01-01T%02d", i / 2)),
+               new TimeseriesResultValue(ImmutableMap.of("rows", 10))
+           )
+       )
        .collect(Collectors.toList());
   }

   private static DataSegment newSegment(
       Interval interval,
-      int partitionId
+      int partitionId,
+      int numCorePartitions
   )
   {
     return DataSegment.builder()
                       .dataSource(DATASOURCE)
                       .interval(interval)
                       .version("1")
-                      .shardSpec(new NumberedShardSpec(partitionId, 0))
+                      .shardSpec(new NumberedShardSpec(partitionId, numCorePartitions))
                       .size(10)
                       .build();
   }
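For reference, a small standalone sketch, not part of this commit, of the data layout the reworked prepareCluster(10) builds: each pair of servers hosts the two core partitions of one hourly interval, so the expected timeseries results now span five distinct hours (matching the switch to Granularities.HOUR) instead of a single day. The class name is purely illustrative.

public class ClusterLayoutSketch
{
  public static void main(String[] args)
  {
    final int numServers = 10;
    for (int i = 0; i < numServers; i++) {
      final int partitionId = i % 2;    // two core partitions per interval
      final int intervalIndex = i / 2;  // a new hourly interval every two servers
      System.out.printf(
          "server %2d -> interval 2000-01-01T%02d/PT1H, partition %d of 2%n",
          i + 1,
          intervalIndex,
          partitionId
      );
    }
  }
}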