Lookup on incomplete partition set in SegmentMetadataQuerySegmentWalker (#15496)

Description
With the CentralizedDatasourceSchema feature (#14989) enabled, metadata for appended segments was not being refreshed. This caused numRows to be 0 for the new segments and would likely cause the datasource schema to omit columns from the new segments.

Analysis
The problem turned out to be in the new QuerySegmentWalker implementation in the Coordinator. It first finds the segments to be queried in the Coordinator timeline, then builds a new timeline containing just those segments.
The bug was that it looked up only complete partition sets in this new timeline. Since appended segments by themselves do not form a complete partition set, no SegmentMetadataQuery was executed for them.
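
To make the distinction concrete, here is a minimal sketch (not part of the patch) of the two lookup methods on Druid's VersionedIntervalTimeline; the demo class name, the "payload" value, and the three-partition core set are made up for illustration:

import java.util.Comparator;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NumberedShardSpec;

class IncompletePartitionLookupDemo
{
  public static void main(String[] args)
  {
    VersionedIntervalTimeline<String, String> timeline =
        new VersionedIntervalTimeline<>(Comparator.naturalOrder());

    // Register only partition 0 of a core set of 3, so the partition set is
    // incomplete -- analogous to appended segments seen without their siblings.
    timeline.add(
        Intervals.of("2011-01-01/2011-01-02"),
        "v1",
        new NumberedShardSpec(0, 3).createChunk("payload")
    );

    // lookup() only surfaces complete partition sets, so this prints true;
    // this is why the walker found nothing to query for appended segments.
    System.out.println(timeline.lookup(Intervals.of("2011-01-01/2011-01-02")).isEmpty());

    // lookupWithIncompletePartitions() also returns incomplete sets (prints 1);
    // the fix switches the walker's lookup function to this method.
    System.out.println(timeline.lookupWithIncompletePartitions(Intervals.of("2011-01-01/2011-01-02")).size());
  }
}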
Rishabh Singh committed 2023-12-06 15:25:28 +05:30 (via GitHub)
parent 6f51155ccb
commit 6a64f72c67
2 changed files with 108 additions and 10 deletions

@@ -201,7 +201,7 @@ public class SegmentMetadataQuerySegmentWalker implements QuerySegmentWalker
   )
   {
     final Function<Interval, List<TimelineObjectHolder<String, SegmentLoadInfo>>> lookupFn
-        = timeline::lookup;
+        = timeline::lookupWithIncompletePartitions;
     final List<Interval> intervals = query.getIntervals();
     List<TimelineObjectHolder<String, SegmentLoadInfo>> timelineObjectHolders =
@@ -134,8 +134,8 @@ public class SegmentMetadataQuerySegmentWalkerTest
         timelines,
         queryRunnerMap,
         Lists.newArrayList(
-            Pair.of(Intervals.of("2011-01-01/2011-01-02"), 5),
-            Pair.of(Intervals.of("2011-01-05/2011-01-07"), 1))
+            Pair.of(Intervals.of("2011-01-01/2011-01-02"), Lists.newArrayList(0, 4, 5)),
+            Pair.of(Intervals.of("2011-01-05/2011-01-07"), Lists.newArrayList(0, 1, 1)))
     );
 
     List<SegmentDescriptor> segmentDescriptors =
@@ -148,7 +148,7 @@ public class SegmentMetadataQuerySegmentWalkerTest
             .collect(
                 Collectors.toList());
 
-    final SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery(
+    SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery(
         new TableDataSource(DATASOURCE),
         new MultipleSpecificSegmentSpec(
             segmentDescriptors
@@ -206,10 +206,104 @@ public class SegmentMetadataQuerySegmentWalkerTest
     Assert.assertEquals(expectedSegmentIds, actualSegmentIds);
   }
 
+  @Test
+  public void testQueryAppendedSegments() throws IOException
+  {
+    Map<DataSource, VersionedIntervalTimeline<String, SegmentLoadInfo>> timelines = new HashMap<>();
+    Map<String, QueryRunner> queryRunnerMap = new HashMap<>();
+
+    // populate the core partition set
+    populateTimeline(
+        timelines,
+        queryRunnerMap,
+        Collections.singletonList(
+            Pair.of(Intervals.of("2011-01-01/2011-01-02"), Lists.newArrayList(0, 4, 5))
+        )
+    );
+
+    queryRunnerMap.clear();
+
+    // append 2 new segments
+    Map<String, ServerExpectations> serverExpectationsMap =
+        populateTimeline(
+            timelines,
+            queryRunnerMap,
+            Collections.singletonList(
+                Pair.of(Intervals.of("2011-01-01/2011-01-02"), Lists.newArrayList(5, 6, 5))
+            )
+        );
+
+    List<SegmentDescriptor> segmentDescriptors =
+        serverExpectationsMap.values()
+                             .stream()
+                             .flatMap(serverExpectations -> Lists.newArrayList(serverExpectations.iterator()).stream())
+                             .map(ServerExpectation::getSegment)
+                             .map(segment -> segment.getId().toDescriptor())
+                             .collect(Collectors.toList());
+
+    SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery(
+        new TableDataSource(DATASOURCE),
+        new MultipleSpecificSegmentSpec(
+            segmentDescriptors
+        ),
+        new AllColumnIncluderator(),
+        false,
+        QueryContexts.override(
+            Collections.emptyMap(),
+            QueryContexts.BROKER_PARALLEL_MERGE_KEY,
+            false
+        ),
+        EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
+        false,
+        null,
+        null
+    );
+
+    SegmentMetadataQuerySegmentWalker walker = new SegmentMetadataQuerySegmentWalker(
+        new TestCoordinatorServerView(timelines, queryRunnerMap),
+        httpClientConfig,
+        warehouse,
+        new ServerConfig(),
+        new NoopServiceEmitter()
+    );
+
+    Sequence<SegmentAnalysis> resultSequence = walker.getQueryRunnerForSegments(
+        segmentMetadataQuery,
+        segmentDescriptors
+    ).run(QueryPlus.wrap(segmentMetadataQuery));
+
+    Yielder<SegmentAnalysis> yielder = Yielders.each(resultSequence);
+
+    Set<String> actualSegmentIds = new HashSet<>();
+    try {
+      while (!yielder.isDone()) {
+        final SegmentAnalysis analysis = yielder.get();
+        actualSegmentIds.add(analysis.getId());
+        yielder = yielder.next(null);
+      }
+    }
+    finally {
+      yielder.close();
+    }
+
+    Set<String> expectedSegmentIds =
+        serverExpectationsMap.values()
+                             .stream()
+                             .flatMap(serverExpectations -> Lists.newArrayList(serverExpectations.iterator()).stream())
+                             .map(ServerExpectation::getSegment)
+                             .map(segment -> segment.getId().toString())
+                             .collect(Collectors.toSet());
+
+    Assert.assertEquals(expectedSegmentIds, actualSegmentIds);
+  }
+
   private Map<String, ServerExpectations> populateTimeline(
       final Map<DataSource, VersionedIntervalTimeline<String, SegmentLoadInfo>> timelines,
       final Map<String, QueryRunner> queryRunnerMap,
-      final List<Pair<Interval, Integer>> intervalAndChunks
+      final List<Pair<Interval, List<Integer>>> intervalAndChunks
   )
   {
     VersionedIntervalTimeline<String, SegmentLoadInfo> timeline = new VersionedIntervalTimeline<>(Comparator.naturalOrder());
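
Note on the updated fixtures (my reading of the loop in the next hunk, not stated in the patch): each interval is now paired with a three-element list interpreted as (startPartitionNum, endPartitionNum, corePartitions). For example, the two calls in testQueryAppendedSegments set up:

// Core set: partitions 0..4 of 5 core partitions -- complete on its own.
Pair.of(Intervals.of("2011-01-01/2011-01-02"), Lists.newArrayList(0, 4, 5))

// Appended: partitions 5..6 beyond the 5 core partitions -- never a complete
// partition set by themselves, which is exactly the case the fixed lookup must handle.
Pair.of(Intervals.of("2011-01-01/2011-01-02"), Lists.newArrayList(5, 6, 5))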
@@ -217,17 +311,21 @@ public class SegmentMetadataQuerySegmentWalkerTest
     Map<String, ServerExpectations> serverExpectationsMap = new HashMap<>();
 
-    for (Pair<Interval, Integer> intervalAndChunk : intervalAndChunks) {
-      for (int partitionNum = 0; partitionNum < intervalAndChunk.rhs; partitionNum++) {
+    for (Pair<Interval, List<Integer>> intervalAndChunk : intervalAndChunks) {
+      List<Integer> partitionDetails = intervalAndChunk.rhs;
+      int startNum = partitionDetails.get(0);
+      int endNum = partitionDetails.get(1);
+      int corePartitions = partitionDetails.get(2);
+      for (int partitionNum = startNum; partitionNum <= endNum; partitionNum++) {
         Interval interval = intervalAndChunk.lhs;
-        int numChunks = intervalAndChunk.rhs;
+        int numChunks = endNum - startNum + 1;
         SegmentId segmentId = SegmentId.of(DATASOURCE, interval, "0", partitionNum);
         DataSegment mockSegment = EasyMock.mock(DataSegment.class);
 
         final ShardSpec shardSpec;
-        if (numChunks == 1) {
+        if (corePartitions == 1) {
           shardSpec = new SingleDimensionShardSpec("dimAll", null, null, 0, 1);
         } else {
           String start = null;
@@ -238,7 +336,7 @@ public class SegmentMetadataQuerySegmentWalkerTest
           if (partitionNum + 1 < numChunks) {
             end = String.valueOf(partitionNum + 1);
           }
-          shardSpec = new SingleDimensionShardSpec("dim", start, end, partitionNum, numChunks);
+          shardSpec = new SingleDimensionShardSpec("dim", start, end, partitionNum, corePartitions);
         }
 
         ServerExpectation<Object> expectation = new ServerExpectation<>(