diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
index be03fafd357..f54b0ecd2af 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
@@ -89,7 +89,6 @@ public class TestAppenderatorsManager implements AppenderatorsManager
         segmentAnnouncer,
         emitter,
         queryProcessingPool,
-        joinableFactory,
         cache,
         cacheConfig,
         cachePopulatorStats,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
index 47cd058b04d..51115c48bae 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
@@ -32,8 +32,6 @@ import org.apache.druid.segment.IndexMerger;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.join.JoinableFactory;
-import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
@@ -58,7 +56,6 @@ public class Appenderators
       DataSegmentAnnouncer segmentAnnouncer,
       ServiceEmitter emitter,
       QueryProcessingPool queryProcessingPool,
-      JoinableFactory joinableFactory,
       Cache cache,
       CacheConfig cacheConfig,
       CachePopulatorStats cachePopulatorStats,
@@ -86,7 +83,6 @@ public class Appenderators
         emitter,
         conglomerate,
         queryProcessingPool,
-        new JoinableFactoryWrapper(joinableFactory),
         Preconditions.checkNotNull(cache, "cache"),
         cacheConfig,
         cachePopulatorStats
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
index 960779fbf16..e64c315484d 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java
@@ -115,7 +115,6 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory
         segmentAnnouncer,
         emitter,
         queryProcessingPool,
-        joinableFactory,
         cache,
         cacheConfig,
         cachePopulatorStats,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
index 070ac62568a..dba96acc66a 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
@@ -105,7 +105,6 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
         segmentAnnouncer,
         emitter,
         queryProcessingPool,
-        joinableFactory,
         cache,
         cacheConfig,
         cachePopulatorStats,
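Note (illustrative, not part of the patch): these joinableFactory / JoinableFactoryWrapper arguments can be dropped because SinkQuerySegmentWalker no longer does any join handling of its own -- join-aware segment mapping reaches it pre-built as the segmentMapFn handed to Sink.acquireSegmentReferences(...) in the next file, presumably constructed upstream from the query's DataSource. A minimal sketch of that division of labor, with String standing in for SegmentReference:

    import java.util.function.Function;

    // Illustrative only: the walker merely *applies* a segment-mapping function; it never
    // needs the factory that built it. "String" is a stand-in for SegmentReference.
    public class SegmentMapFnSketch
    {
      public static void main(String[] args)
      {
        // Built upstream (e.g. from the query's DataSource); may wrap a segment in join logic.
        Function<String, String> segmentMapFn = segment -> "join(" + segment + ")";

        // All the walker does per hydrant: apply the mapping before creating a runner.
        System.out.println(segmentMapFn.apply("hydrant-0")); // prints join(hydrant-0)
      }
    }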
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 7c81b60feab..1bf07fa4146 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -22,7 +22,6 @@ package org.apache.druid.segment.realtime.appenderator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
 import org.apache.druid.client.CachingQueryRunner;
 import org.apache.druid.client.cache.Cache;
 import org.apache.druid.client.cache.CacheConfig;
@@ -59,7 +58,6 @@ import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
 import org.apache.druid.query.spec.SpecificSegmentSpec;
 import org.apache.druid.segment.SegmentReference;
 import org.apache.druid.segment.StorageAdapter;
-import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.segment.realtime.FireHydrant;
 import org.apache.druid.segment.realtime.plumber.Sink;
 import org.apache.druid.segment.realtime.plumber.SinkSegmentReference;
@@ -69,13 +67,18 @@ import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.utils.CloseableUtils;
 import org.joda.time.Interval;
 
-import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * Query handler for indexing tasks.
@@ -92,7 +95,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
   private final ServiceEmitter emitter;
   private final QueryRunnerFactoryConglomerate conglomerate;
   private final QueryProcessingPool queryProcessingPool;
-  private final JoinableFactoryWrapper joinableFactoryWrapper;
   private final Cache cache;
   private final CacheConfig cacheConfig;
   private final CachePopulatorStats cachePopulatorStats;
@@ -106,7 +108,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
       ServiceEmitter emitter,
       QueryRunnerFactoryConglomerate conglomerate,
       QueryProcessingPool queryProcessingPool,
-      JoinableFactoryWrapper joinableFactoryWrapper,
       Cache cache,
       CacheConfig cacheConfig,
       CachePopulatorStats cachePopulatorStats
@@ -118,7 +119,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
     this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate");
     this.queryProcessingPool = Preconditions.checkNotNull(queryProcessingPool, "queryProcessingPool");
-    this.joinableFactoryWrapper = joinableFactoryWrapper;
     this.cache = Preconditions.checkNotNull(cache, "cache");
     this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig");
     this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");
@@ -186,110 +186,170 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
     // We compute the join cache key here itself so it doesn't need to be re-computed for every segment
     final Optional<byte[]> cacheKeyPrefix = Optional.ofNullable(query.getDataSource().getCacheKey());
 
-    Iterable<QueryRunner<T>> perSegmentRunners = Iterables.transform(
-        specs,
-        newDescriptor -> {
-          final SegmentDescriptor descriptor = newIdToBasePendingSegment.getOrDefault(newDescriptor, newDescriptor);
-          final PartitionChunk<Sink> chunk = sinkTimeline.findChunk(
-              descriptor.getInterval(),
-              descriptor.getVersion(),
-              descriptor.getPartitionNumber()
-          );
+    // We need to report data for each Sink all-or-nothing, which means we need to acquire references for all
+    // subsegments (FireHydrants) of a segment (Sink) at once. To ensure they are properly released even when a
+    // query fails or is canceled, we acquire *all* sink references upfront, and release them all when the main
+    // QueryRunner returned by this method is closed. (We can't do the acquisition and releasing at the level of
+    // each FireHydrant's runner, since then it wouldn't be properly all-or-nothing on a per-Sink basis.)
+    final List<SinkSegmentReference> allSegmentReferences = new ArrayList<>();
+    final Map<SegmentDescriptor, SegmentId> segmentIdMap = new HashMap<>();
+    final LinkedHashMap<SegmentDescriptor, List<QueryRunner<T>>> allRunners = new LinkedHashMap<>();
 
-          if (chunk == null) {
-            return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
-          }
-
-          final Sink theSink = chunk.getObject();
-          final SegmentId sinkSegmentId = theSink.getSegment().getId();
-          final List<SinkSegmentReference> segmentReferences =
-              theSink.acquireSegmentReferences(segmentMapFn, skipIncrementalSegment);
-
-          if (segmentReferences == null) {
-            // We failed to acquire references for all subsegments. Bail and report the entire sink missing.
-            return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
-          } else if (segmentReferences.isEmpty()) {
-            return new NoopQueryRunner<>();
-          }
-
-          final Closeable releaser = () -> CloseableUtils.closeAll(segmentReferences);
-
-          try {
-            Iterable<QueryRunner<T>> perHydrantRunners = new SinkQueryRunners<>(
-                Iterables.transform(
-                    segmentReferences,
-                    segmentReference -> {
-                      QueryRunner<T> runner = factory.createRunner(segmentReference.getSegment());
-
-                      // 1) Only use caching if data is immutable
-                      // 2) Hydrants are not the same between replicas, make sure cache is local
-                      if (segmentReference.isImmutable() && cache.isLocal()) {
-                        StorageAdapter storageAdapter = segmentReference.getSegment().asStorageAdapter();
-                        long segmentMinTime = storageAdapter.getMinTime().getMillis();
-                        long segmentMaxTime = storageAdapter.getMaxTime().getMillis();
-                        Interval actualDataInterval = Intervals.utc(segmentMinTime, segmentMaxTime + 1);
-                        runner = new CachingQueryRunner<>(
-                            makeHydrantCacheIdentifier(sinkSegmentId, segmentReference.getHydrantNumber()),
-                            cacheKeyPrefix,
-                            descriptor,
-                            actualDataInterval,
-                            objectMapper,
-                            cache,
-                            toolChest,
-                            runner,
-                            // Always populate in foreground regardless of config
-                            new ForegroundCachePopulator(
-                                objectMapper,
-                                cachePopulatorStats,
-                                cacheConfig.getMaxEntrySize()
-                            ),
-                            cacheConfig
-                        );
-                      }
-                      return new Pair<>(segmentReference.getSegment().getDataInterval(), runner);
-                    }
-                )
-            );
-            return QueryRunnerHelper.makeClosingQueryRunner(
-                new SpecificSegmentQueryRunner<>(
-                    withPerSinkMetrics(
-                        new BySegmentQueryRunner<>(
-                            sinkSegmentId,
-                            descriptor.getInterval().getStart(),
-                            factory.mergeRunners(
-                                DirectQueryProcessingPool.INSTANCE,
-                                perHydrantRunners
-                            )
-                        ),
-                        toolChest,
-                        sinkSegmentId,
-                        cpuTimeAccumulator
-                    ),
-                    new SpecificSegmentSpec(descriptor)
-                ),
-                releaser
-            );
-          }
-          catch (Throwable e) {
-            throw CloseableUtils.closeAndWrapInCatch(e, releaser);
-          }
-        }
-    );
-    final QueryRunner<T> mergedRunner =
-        toolChest.mergeResults(
-            factory.mergeRunners(
-                queryProcessingPool,
-                perSegmentRunners
-            )
+    try {
+      for (final SegmentDescriptor newDescriptor : specs) {
+        final SegmentDescriptor descriptor = newIdToBasePendingSegment.getOrDefault(newDescriptor, newDescriptor);
+        final PartitionChunk<Sink> chunk = sinkTimeline.findChunk(
+            descriptor.getInterval(),
+            descriptor.getVersion(),
+            descriptor.getPartitionNumber()
         );
-    return CPUTimeMetricQueryRunner.safeBuild(
-        new FinalizeResultsQueryRunner<>(mergedRunner, toolChest),
-        toolChest,
-        emitter,
-        cpuTimeAccumulator,
-        true
-    );
+        if (chunk == null) {
+          allRunners.put(
+              descriptor,
+              Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor))
+          );
+          continue;
+        }
+
+        final Sink theSink = chunk.getObject();
+        final SegmentId sinkSegmentId = theSink.getSegment().getId();
+        segmentIdMap.put(descriptor, sinkSegmentId);
+        final List<SinkSegmentReference> sinkSegmentReferences =
+            theSink.acquireSegmentReferences(segmentMapFn, skipIncrementalSegment);
+
+        if (sinkSegmentReferences == null) {
+          // We failed to acquire references for all subsegments. Bail and report the entire sink missing.
+          allRunners.put(
+              descriptor,
+              Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor))
+          );
+        } else if (sinkSegmentReferences.isEmpty()) {
+          allRunners.put(descriptor, Collections.singletonList(new NoopQueryRunner<>()));
+        } else {
+          allSegmentReferences.addAll(sinkSegmentReferences);
+
+          allRunners.put(
+              descriptor,
+              sinkSegmentReferences.stream().map(
+                  segmentReference -> {
+                    QueryRunner<T> runner = new MetricsEmittingQueryRunner<>(
+                        emitter,
+                        factory.getToolchest(),
+                        factory.createRunner(segmentReference.getSegment()),
+                        QueryMetrics::reportSegmentTime,
+                        queryMetrics -> queryMetrics.segment(sinkSegmentId.toString())
+                    );
+
+                    // 1) Only use caching if data is immutable
+                    // 2) Hydrants are not the same between replicas, make sure cache is local
+                    if (segmentReference.isImmutable() && cache.isLocal()) {
+                      StorageAdapter storageAdapter = segmentReference.getSegment().asStorageAdapter();
+                      long segmentMinTime = storageAdapter.getMinTime().getMillis();
+                      long segmentMaxTime = storageAdapter.getMaxTime().getMillis();
+                      Interval actualDataInterval = Intervals.utc(segmentMinTime, segmentMaxTime + 1);
+                      runner = new CachingQueryRunner<>(
+                          makeHydrantCacheIdentifier(sinkSegmentId, segmentReference.getHydrantNumber()),
+                          cacheKeyPrefix,
+                          descriptor,
+                          actualDataInterval,
+                          objectMapper,
+                          cache,
+                          toolChest,
+                          runner,
+                          // Always populate in foreground regardless of config
+                          new ForegroundCachePopulator(
+                              objectMapper,
+                              cachePopulatorStats,
+                              cacheConfig.getMaxEntrySize()
+                          ),
+                          cacheConfig
+                      );
+                    }
+
+                    // Regardless of whether caching is enabled, do reportSegmentAndCacheTime outside the
+                    // *possible* caching.
+                    runner = new MetricsEmittingQueryRunner<>(
+                        emitter,
+                        factory.getToolchest(),
+                        runner,
+                        QueryMetrics::reportSegmentAndCacheTime,
+                        queryMetrics -> queryMetrics.segment(sinkSegmentId.toString())
+                    ).withWaitMeasuredFromNow();
+
+                    // Emit CPU time metrics.
+                    runner = CPUTimeMetricQueryRunner.safeBuild(
+                        runner,
+                        toolChest,
+                        emitter,
+                        cpuTimeAccumulator,
+                        false
+                    );
+
+                    // Run with specific segment descriptor.
+                    runner = new SpecificSegmentQueryRunner<>(
+                        runner,
+                        new SpecificSegmentSpec(descriptor)
+                    );
+
+                    return runner;
+                  }
+              ).collect(Collectors.toList())
+          );
+        }
+      }
+
+      final QueryRunner<T> mergedRunner;
+
+      if (query.context().isBySegment()) {
+        // bySegment: merge all hydrants for a Sink first, then merge Sinks. Necessary to keep results for the
+        // same segment together, but causes additional memory usage due to the extra layer of materialization,
+        // so we only do this if we need to.
+        mergedRunner = factory.mergeRunners(
+            queryProcessingPool,
+            allRunners.entrySet().stream().map(
+                entry -> new BySegmentQueryRunner<>(
+                    segmentIdMap.get(entry.getKey()),
+                    entry.getKey().getInterval().getStart(),
+                    factory.mergeRunners(
+                        DirectQueryProcessingPool.INSTANCE,
+                        entry.getValue()
+                    )
+                )
+            ).collect(Collectors.toList())
+        );
+      } else {
+        // Not bySegment: merge all hydrants at the same level, rather than grouped by Sink (segment).
+        mergedRunner = factory.mergeRunners(
+            queryProcessingPool,
+            new SinkQueryRunners<>(
+                allRunners.entrySet().stream().flatMap(
+                    entry ->
+                        entry.getValue().stream().map(
+                            runner ->
+                                Pair.of(entry.getKey().getInterval(), runner)
+                        )
+                ).collect(Collectors.toList()))
+        );
+      }
+
+      // 1) Merge results using the toolChest, finalize if necessary.
+      // 2) Measure CPU time of that operation.
+      // 3) Release all sink segment references.
+      return QueryRunnerHelper.makeClosingQueryRunner(
+          CPUTimeMetricQueryRunner.safeBuild(
+              new FinalizeResultsQueryRunner<>(toolChest.mergeResults(mergedRunner), toolChest),
+              toolChest,
+              emitter,
+              cpuTimeAccumulator,
+              true
+          ),
+          () -> CloseableUtils.closeAll(allSegmentReferences)
+      );
+    }
+    catch (Throwable e) {
+      throw CloseableUtils.closeAndWrapInCatch(e, () -> CloseableUtils.closeAll(allSegmentReferences));
+    }
   }
 
   public void registerNewVersionOfPendingSegment(
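Note (illustrative, not part of the patch): the comment at the top of this hunk is the key invariant -- references for every FireHydrant of every Sink are acquired before any runner executes, and a single closer (passed to QueryRunnerHelper.makeClosingQueryRunner, or invoked in the catch block on failure) releases the whole batch. A self-contained sketch of that acquire-all/release-all shape, with hypothetical Ref suppliers standing in for Sink references:

    import java.io.Closeable;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Supplier;

    // Illustrative only. Acquire everything up front; tie release of the whole batch to the
    // returned object's close(); roll back partial acquisition if anything throws.
    public class AllOrNothingExample
    {
      interface Ref extends Closeable
      {
        @Override
        void close(); // no checked exception, mirroring reference-counted releases
      }

      static Closeable acquireAll(List<Supplier<Ref>> acquirers)
      {
        final List<Ref> acquired = new ArrayList<>();
        try {
          for (Supplier<Ref> acquirer : acquirers) {
            acquired.add(acquirer.get()); // may throw partway through
          }
          // One releaser for the batch, like the closer given to makeClosingQueryRunner.
          return () -> acquired.forEach(Ref::close);
        }
        catch (Throwable t) {
          acquired.forEach(Ref::close); // undo partial acquisition, then rethrow
          throw t;
        }
      }
    }

Because there is exactly one releaser, none of the wrapping layers (SpecificSegmentQueryRunner, the metrics runners, the merge tree) needs its own cleanup logic.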
@@ -309,43 +369,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
     return dataSource;
   }
 
-  /**
-   * Decorates a Sink's query runner to emit query/segmentAndCache/time, query/segment/time, query/wait/time once
-   * each for the whole Sink. Also adds CPU time to cpuTimeAccumulator.
-   */
-  private <T> QueryRunner<T> withPerSinkMetrics(
-      final QueryRunner<T> sinkRunner,
-      final QueryToolChest<T, ? extends Query<T>> queryToolChest,
-      final SegmentId sinkSegmentId,
-      final AtomicLong cpuTimeAccumulator
-  )
-  {
-    // Note: reportSegmentAndCacheTime and reportSegmentTime are effectively the same here. They don't split apart
-    // cache vs. non-cache due to the fact that Sinks may be partially cached and partially uncached. Making this
-    // better would need to involve another accumulator like the cpuTimeAccumulator that we could share with the
-    // sinkRunner.
-    String sinkSegmentIdString = sinkSegmentId.toString();
-    return CPUTimeMetricQueryRunner.safeBuild(
-        new MetricsEmittingQueryRunner<>(
-            emitter,
-            queryToolChest,
-            new MetricsEmittingQueryRunner<>(
-                emitter,
-                queryToolChest,
-                sinkRunner,
-                QueryMetrics::reportSegmentTime,
-                queryMetrics -> queryMetrics.segment(sinkSegmentIdString)
-            ),
-            QueryMetrics::reportSegmentAndCacheTime,
-            queryMetrics -> queryMetrics.segment(sinkSegmentIdString)
-        ).withWaitMeasuredFromNow(),
-        queryToolChest,
-        emitter,
-        cpuTimeAccumulator,
-        false
-    );
-  }
-
   public VersionedIntervalTimeline<String, Sink> getSinkTimeline()
   {
     return sinkTimeline;
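Note (illustrative, not part of the patch): with withPerSinkMetrics gone, metric decoration happens per FireHydrant inside the loop above, and the nesting order resolves the caveat in the removed Note: query/segment/time is measured innermost (so it excludes cache work), while query/segmentAndCache/time wraps the optional CachingQueryRunner. A runnable toy that makes the layering visible (plain java.util.function, not Druid API):

    import java.util.function.UnaryOperator;

    // Each layer corresponds to one wrapper from the hunk above:
    // MetricsEmittingQueryRunner(reportSegmentTime) innermost, CachingQueryRunner only when
    // the hydrant is immutable and the cache is local, then
    // MetricsEmittingQueryRunner(reportSegmentAndCacheTime) outside the possible cache layer.
    public class DecorationOrder
    {
      static UnaryOperator<Runnable> layer(String name)
      {
        return inner -> () -> {
          System.out.println("enter " + name);
          inner.run();
          System.out.println("exit  " + name);
        };
      }

      public static void main(String[] args)
      {
        boolean useCache = true; // segmentReference.isImmutable() && cache.isLocal()

        Runnable runner = () -> System.out.println("scan hydrant");
        runner = layer("segmentTime").apply(runner);         // excludes any cache work
        if (useCache) {
          runner = layer("cache").apply(runner);             // optional layer
        }
        runner = layer("segmentAndCacheTime").apply(runner); // always includes cache work
        runner.run();
      }
    }

Unlike the removed per-Sink version, the two timers can now genuinely diverge whenever a hydrant is served from cache.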
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index bd829ccfa15..8b5b1482357 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -439,7 +439,6 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
         serviceEmitter,
         queryRunnerFactoryConglomerateProvider.get(),
         queryProcessingPool,
-        joinableFactoryWrapper,
         Preconditions.checkNotNull(cache, "cache"),
         cacheConfig,
         cachePopulatorStats
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java
index 2194ff70401..a271c4540c5 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java
@@ -91,7 +91,6 @@ public class FlushingPlumber extends RealtimePlumber
         conglomerate,
         segmentAnnouncer,
         queryProcessingPool,
-        joinableFactory,
         null,
         null,
         null,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
index efabdcc1685..0380abf9f22 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -64,8 +64,6 @@ import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
-import org.apache.druid.segment.join.JoinableFactory;
-import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
 import org.apache.druid.segment.realtime.FireHydrant;
@@ -142,7 +140,6 @@ public class RealtimePlumber implements Plumber
       QueryRunnerFactoryConglomerate conglomerate,
       DataSegmentAnnouncer segmentAnnouncer,
       QueryProcessingPool queryProcessingPool,
-      JoinableFactory joinableFactory,
       DataSegmentPusher dataSegmentPusher,
       SegmentPublisher segmentPublisher,
       SegmentHandoffNotifier handoffNotifier,
@@ -172,7 +169,6 @@ public class RealtimePlumber implements Plumber
         emitter,
         conglomerate,
         queryProcessingPool,
-        new JoinableFactoryWrapper(joinableFactory),
         cache,
         cacheConfig,
         cachePopulatorStats
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java
index e2ba02cbc0e..8b19153a9de 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java
@@ -112,7 +112,6 @@ public class RealtimePlumberSchool implements PlumberSchool
         conglomerate,
         segmentAnnouncer,
         queryProcessingPool,
-        joinableFactory,
         dataSegmentPusher,
         segmentPublisher,
         handoffNotifierFactory.createSegmentHandoffNotifier(schema.getDataSource()),
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
index bcf2f8a2216..e81fd9795d8 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
@@ -61,7 +61,6 @@ import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.segment.join.NoopJoinableFactory;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
@@ -247,7 +246,6 @@ public class StreamAppenderatorTester implements AutoCloseable
           announcer,
           emitter,
           new ForwardingQueryProcessingPool(queryExecutor),
-          NoopJoinableFactory.INSTANCE,
           MapCache.create(2048),
           new CacheConfig(),
           new CachePopulatorStats(),
@@ -295,7 +293,6 @@ public class StreamAppenderatorTester implements AutoCloseable
           new NoopDataSegmentAnnouncer(),
           emitter,
           new ForwardingQueryProcessingPool(queryExecutor),
-          NoopJoinableFactory.INSTANCE,
           MapCache.create(2048),
           new CacheConfig(),
           new CachePopulatorStats(),
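Note (illustrative, not part of the patch): the merge-shape choice in the getQueryRunnerForSegments hunk above is the behavioral heart of this change. bySegment queries get a two-level merge -- hydrants within a Sink under a BySegmentQueryRunner, then Sinks -- so one segment's results stay grouped; everything else gets a single flat merge over all hydrant runners. A toy model of the two shapes, with sorted integer lists in place of runners (hypothetical, not Druid API):

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class MergeShapes
    {
      public static void main(String[] args)
      {
        Map<String, List<Integer>> allRunners = new LinkedHashMap<>();
        allRunners.put("seg1", Arrays.asList(3, 1));
        allRunners.put("seg2", Arrays.asList(2, 4));

        // bySegment: inner merge per segment (cf. DirectQueryProcessingPool.INSTANCE),
        // then an outer merge across segments; each segment's results stay together.
        List<List<Integer>> bySegment = allRunners.values().stream()
            .map(hydrants -> hydrants.stream().sorted().collect(Collectors.toList()))
            .collect(Collectors.toList());
        System.out.println(bySegment); // [[1, 3], [2, 4]]

        // Not bySegment: flatten all hydrant runners and merge at one level.
        List<Integer> flat = allRunners.values().stream()
            .flatMap(List::stream)
            .sorted()
            .collect(Collectors.toList());
        System.out.println(flat); // [1, 2, 3, 4]
      }
    }

The flat shape avoids the extra materialization the bySegment comment warns about, which is why it is the default.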