Ensure ReferenceCountingSegment.decrement() is invoked correctly (#8323)

* fix issue8291. Make sure ReferenceCountingSegment.decrement() is invoked correctly

* add some comments in SinkQuerySegmentWalker.java

* extracting perSegmentRunners and perHydrantRunners to reduce the level of nesting
This commit is contained in:
pphust 2019-08-21 03:01:16 +08:00 committed by Roman Leventov
parent 818bf4990c
commit ffcbd1ecb8
1 changed files with 88 additions and 87 deletions

View File

@ -177,97 +177,98 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
final boolean skipIncrementalSegment = query.getContextValue(CONTEXT_SKIP_INCREMENTAL_SEGMENT, false);
final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
Iterable<QueryRunner<T>> perSegmentRunners = Iterables.transform(
specs,
descriptor -> {
final PartitionHolder<Sink> holder = sinkTimeline.findEntry(
descriptor.getInterval(),
descriptor.getVersion()
);
if (holder == null) {
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
}
final PartitionChunk<Sink> chunk = holder.getChunk(descriptor.getPartitionNumber());
if (chunk == null) {
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
}
final Sink theSink = chunk.getObject();
final SegmentId sinkSegmentId = theSink.getSegment().getId();
Iterable<QueryRunner<T>> perHydrantRunners = new SinkQueryRunners<>(
Iterables.transform(
theSink,
hydrant -> {
// Hydrant might swap at any point, but if it's swapped at the start
// then we know it's *definitely* swapped.
final boolean hydrantDefinitelySwapped = hydrant.hasSwapped();
if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
return new Pair<>(Intervals.ETERNITY, new NoopQueryRunner<>());
}
// Prevent the underlying segment from swapping when its being iterated
final Pair<Segment, Closeable> segmentAndCloseable = hydrant.getAndIncrementSegment();
try {
QueryRunner<T> runner = factory.createRunner(segmentAndCloseable.lhs);
// 1) Only use caching if data is immutable
// 2) Hydrants are not the same between replicas, make sure cache is local
if (hydrantDefinitelySwapped && cache.isLocal()) {
runner = new CachingQueryRunner<>(
makeHydrantCacheIdentifier(hydrant),
descriptor,
objectMapper,
cache,
toolChest,
runner,
// Always populate in foreground regardless of config
new ForegroundCachePopulator(
objectMapper,
cachePopulatorStats,
cacheConfig.getMaxEntrySize()
),
cacheConfig
);
}
// Make it always use Closeable to decrement()
runner = QueryRunnerHelper.makeClosingQueryRunner(
runner,
segmentAndCloseable.rhs
);
return new Pair<>(segmentAndCloseable.lhs.getDataInterval(), runner);
}
catch (RuntimeException e) {
CloseQuietly.close(segmentAndCloseable.rhs);
throw e;
}
}
)
);
return new SpecificSegmentQueryRunner<>(
withPerSinkMetrics(
new BySegmentQueryRunner<>(
sinkSegmentId,
descriptor.getInterval().getStart(),
factory.mergeRunners(
Execs.directExecutor(),
perHydrantRunners
)
),
toolChest,
sinkSegmentId,
cpuTimeAccumulator
),
new SpecificSegmentSpec(descriptor)
);
}
);
final QueryRunner<T> mergedRunner =
toolChest.mergeResults(
factory.mergeRunners(
queryExecutorService,
FunctionalIterable
.create(specs)
.transform(
descriptor -> {
final PartitionHolder<Sink> holder = sinkTimeline.findEntry(
descriptor.getInterval(),
descriptor.getVersion()
);
if (holder == null) {
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
}
final PartitionChunk<Sink> chunk = holder.getChunk(descriptor.getPartitionNumber());
if (chunk == null) {
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
}
final Sink theSink = chunk.getObject();
final SegmentId sinkSegmentId = theSink.getSegment().getId();
return new SpecificSegmentQueryRunner<>(
withPerSinkMetrics(
new BySegmentQueryRunner<>(
sinkSegmentId,
descriptor.getInterval().getStart(),
factory.mergeRunners(
Execs.directExecutor(),
new SinkQueryRunners<>(
Iterables.transform(
theSink,
hydrant -> {
// Hydrant might swap at any point, but if it's swapped at the start
// then we know it's *definitely* swapped.
final boolean hydrantDefinitelySwapped = hydrant.hasSwapped();
if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
return new Pair<>(Intervals.ETERNITY, new NoopQueryRunner<>());
}
// Prevent the underlying segment from swapping when its being iterated
final Pair<Segment, Closeable> segment = hydrant.getAndIncrementSegment();
try {
QueryRunner<T> baseRunner = QueryRunnerHelper.makeClosingQueryRunner(
factory.createRunner(segment.lhs),
segment.rhs
);
// 1) Only use caching if data is immutable
// 2) Hydrants are not the same between replicas, make sure cache is local
if (hydrantDefinitelySwapped && cache.isLocal()) {
QueryRunner<T> cachingRunner = new CachingQueryRunner<>(
makeHydrantCacheIdentifier(hydrant),
descriptor,
objectMapper,
cache,
toolChest,
baseRunner,
// Always populate in foreground regardless of config
new ForegroundCachePopulator(
objectMapper,
cachePopulatorStats,
cacheConfig.getMaxEntrySize()
),
cacheConfig
);
return new Pair<>(segment.lhs.getDataInterval(), cachingRunner);
} else {
return new Pair<>(segment.lhs.getDataInterval(), baseRunner);
}
}
catch (RuntimeException e) {
CloseQuietly.close(segment.rhs);
throw e;
}
}
)
)
)
),
toolChest,
sinkSegmentId,
cpuTimeAccumulator
),
new SpecificSegmentSpec(descriptor)
);
}
)
perSegmentRunners
)
);