diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 79ac192229e..6c528792018 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -177,97 +177,98 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker final boolean skipIncrementalSegment = query.getContextValue(CONTEXT_SKIP_INCREMENTAL_SEGMENT, false); final AtomicLong cpuTimeAccumulator = new AtomicLong(0L); + Iterable> perSegmentRunners = Iterables.transform( + specs, + descriptor -> { + final PartitionHolder holder = sinkTimeline.findEntry( + descriptor.getInterval(), + descriptor.getVersion() + ); + if (holder == null) { + return new ReportTimelineMissingSegmentQueryRunner<>(descriptor); + } + + final PartitionChunk chunk = holder.getChunk(descriptor.getPartitionNumber()); + if (chunk == null) { + return new ReportTimelineMissingSegmentQueryRunner<>(descriptor); + } + + final Sink theSink = chunk.getObject(); + final SegmentId sinkSegmentId = theSink.getSegment().getId(); + + Iterable> perHydrantRunners = new SinkQueryRunners<>( + Iterables.transform( + theSink, + hydrant -> { + // Hydrant might swap at any point, but if it's swapped at the start + // then we know it's *definitely* swapped. + final boolean hydrantDefinitelySwapped = hydrant.hasSwapped(); + + if (skipIncrementalSegment && !hydrantDefinitelySwapped) { + return new Pair<>(Intervals.ETERNITY, new NoopQueryRunner<>()); + } + + // Prevent the underlying segment from swapping when its being iterated + final Pair segmentAndCloseable = hydrant.getAndIncrementSegment(); + try { + QueryRunner runner = factory.createRunner(segmentAndCloseable.lhs); + + // 1) Only use caching if data is immutable + // 2) Hydrants are not the same between replicas, make sure cache is local + if (hydrantDefinitelySwapped && cache.isLocal()) { + runner = new CachingQueryRunner<>( + makeHydrantCacheIdentifier(hydrant), + descriptor, + objectMapper, + cache, + toolChest, + runner, + // Always populate in foreground regardless of config + new ForegroundCachePopulator( + objectMapper, + cachePopulatorStats, + cacheConfig.getMaxEntrySize() + ), + cacheConfig + ); + } + // Make it always use Closeable to decrement() + runner = QueryRunnerHelper.makeClosingQueryRunner( + runner, + segmentAndCloseable.rhs + ); + return new Pair<>(segmentAndCloseable.lhs.getDataInterval(), runner); + } + catch (RuntimeException e) { + CloseQuietly.close(segmentAndCloseable.rhs); + throw e; + } + } + ) + ); + return new SpecificSegmentQueryRunner<>( + withPerSinkMetrics( + new BySegmentQueryRunner<>( + sinkSegmentId, + descriptor.getInterval().getStart(), + factory.mergeRunners( + Execs.directExecutor(), + perHydrantRunners + ) + ), + toolChest, + sinkSegmentId, + cpuTimeAccumulator + ), + new SpecificSegmentSpec(descriptor) + ); + } + ); final QueryRunner mergedRunner = toolChest.mergeResults( factory.mergeRunners( queryExecutorService, - FunctionalIterable - .create(specs) - .transform( - descriptor -> { - final PartitionHolder holder = sinkTimeline.findEntry( - descriptor.getInterval(), - descriptor.getVersion() - ); - if (holder == null) { - return new ReportTimelineMissingSegmentQueryRunner<>(descriptor); - } - - final PartitionChunk chunk = holder.getChunk(descriptor.getPartitionNumber()); - if (chunk == null) { - return new ReportTimelineMissingSegmentQueryRunner<>(descriptor); - } - - final Sink theSink = chunk.getObject(); - final SegmentId sinkSegmentId = theSink.getSegment().getId(); - - return new SpecificSegmentQueryRunner<>( - withPerSinkMetrics( - new BySegmentQueryRunner<>( - sinkSegmentId, - descriptor.getInterval().getStart(), - factory.mergeRunners( - Execs.directExecutor(), - new SinkQueryRunners<>( - Iterables.transform( - theSink, - hydrant -> { - // Hydrant might swap at any point, but if it's swapped at the start - // then we know it's *definitely* swapped. - final boolean hydrantDefinitelySwapped = hydrant.hasSwapped(); - - if (skipIncrementalSegment && !hydrantDefinitelySwapped) { - return new Pair<>(Intervals.ETERNITY, new NoopQueryRunner<>()); - } - - // Prevent the underlying segment from swapping when its being iterated - final Pair segment = hydrant.getAndIncrementSegment(); - try { - QueryRunner baseRunner = QueryRunnerHelper.makeClosingQueryRunner( - factory.createRunner(segment.lhs), - segment.rhs - ); - - // 1) Only use caching if data is immutable - // 2) Hydrants are not the same between replicas, make sure cache is local - if (hydrantDefinitelySwapped && cache.isLocal()) { - QueryRunner cachingRunner = new CachingQueryRunner<>( - makeHydrantCacheIdentifier(hydrant), - descriptor, - objectMapper, - cache, - toolChest, - baseRunner, - // Always populate in foreground regardless of config - new ForegroundCachePopulator( - objectMapper, - cachePopulatorStats, - cacheConfig.getMaxEntrySize() - ), - cacheConfig - ); - return new Pair<>(segment.lhs.getDataInterval(), cachingRunner); - } else { - return new Pair<>(segment.lhs.getDataInterval(), baseRunner); - } - } - catch (RuntimeException e) { - CloseQuietly.close(segment.rhs); - throw e; - } - } - ) - ) - ) - ), - toolChest, - sinkSegmentId, - cpuTimeAccumulator - ), - new SpecificSegmentSpec(descriptor) - ); - } - ) + perSegmentRunners ) );