mirror of https://github.com/apache/druid.git
Merge hydrant runners flatly for realtime queries. (#15757)
* Merge hydrant runners flatly for realtime queries.

Prior to this patch, we have two layers of mergeRunners for realtime queries: one for each Sink (a logical segment) and one across all Sinks. This is done to keep metrics and results grouped by Sink, given that each FireHydrant within a Sink has its own separate storage adapter. However, it incurs extra memory usage due to the extra layer of materialization. This is especially pronounced for groupBy queries, which only use their merge buffers at the top layer of merging: the lower layer of merging materializes ResultRows directly onto the heap, which can cause heap exhaustion if there are enough ResultRows.

This patch changes to a single layer of merging when bySegment: false, just like Historicals. To accommodate that, segment metrics like query/segment/time are now per-FireHydrant instead of per-Sink.

Two layers of merging are retained when bySegment: true. This isn't common, because it's typically only used when segment-level caching is enabled on the Broker, which is off by default.

* Use SinkQueryRunners.

* Remove unused method.
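To make the structural change concrete, here is a minimal sketch, assuming simplified stand-ins: a "runner" is modeled as the list of rows it yields, and merge() stands in for factory.mergeRunners(). This is not Druid's actual API, only an illustration of why removing a merge layer removes a materialization point.

```java
import java.util.ArrayList;
import java.util.List;

public class FlatMergeSketch
{
  // Stand-in for mergeRunners(): combines child runners. In Druid the combine
  // is lazy, but each layer is still a place where results can materialize.
  static List<String> merge(List<List<String>> runners)
  {
    final List<String> out = new ArrayList<>();
    runners.forEach(out::addAll);
    return out;
  }

  public static void main(String[] args)
  {
    // Two Sinks, each holding two FireHydrants (sub-segments).
    final List<List<List<String>>> sinks = List.of(
        List.of(List.of("sink1/hydrant0"), List.of("sink1/hydrant1")),
        List.of(List.of("sink2/hydrant0"), List.of("sink2/hydrant1"))
    );

    // Before: two layers. The inner merge materializes per-Sink results,
    // which is the extra heap usage the patch removes.
    final List<List<String>> perSink = new ArrayList<>();
    for (List<List<String>> sink : sinks) {
      perSink.add(merge(sink));
    }
    final List<String> twoLayers = merge(perSink);

    // After (bySegment: false): one flat merge across all hydrants,
    // matching how Historicals merge across segments.
    final List<List<String>> allHydrants = new ArrayList<>();
    sinks.forEach(allHydrants::addAll);
    final List<String> oneLayer = merge(allHydrants);

    System.out.println(twoLayers.equals(oneLayer)); // true: same rows, one fewer layer
  }
}
```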
This commit is contained in:
parent 0ab2781a7f
commit 01e9d963bd
@@ -89,7 +89,6 @@ public class TestAppenderatorsManager implements AppenderatorsManager
         segmentAnnouncer,
         emitter,
         queryProcessingPool,
-        joinableFactory,
         cache,
         cacheConfig,
         cachePopulatorStats,
@@ -32,8 +32,6 @@ import org.apache.druid.segment.IndexMerger;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.join.JoinableFactory;
-import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
@@ -58,7 +56,6 @@ public class Appenderators
       DataSegmentAnnouncer segmentAnnouncer,
       ServiceEmitter emitter,
       QueryProcessingPool queryProcessingPool,
-      JoinableFactory joinableFactory,
       Cache cache,
       CacheConfig cacheConfig,
       CachePopulatorStats cachePopulatorStats,
@@ -86,7 +83,6 @@ public class Appenderators
         emitter,
         conglomerate,
         queryProcessingPool,
-        new JoinableFactoryWrapper(joinableFactory),
         Preconditions.checkNotNull(cache, "cache"),
         cacheConfig,
         cachePopulatorStats
@@ -115,7 +115,6 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory
         segmentAnnouncer,
         emitter,
         queryProcessingPool,
-        joinableFactory,
         cache,
         cacheConfig,
         cachePopulatorStats,
@@ -105,7 +105,6 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
         segmentAnnouncer,
         emitter,
         queryProcessingPool,
-        joinableFactory,
         cache,
         cacheConfig,
         cachePopulatorStats,
@@ -22,7 +22,6 @@ package org.apache.druid.segment.realtime.appenderator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
 import org.apache.druid.client.CachingQueryRunner;
 import org.apache.druid.client.cache.Cache;
 import org.apache.druid.client.cache.CacheConfig;
@@ -59,7 +58,6 @@ import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
 import org.apache.druid.query.spec.SpecificSegmentSpec;
 import org.apache.druid.segment.SegmentReference;
 import org.apache.druid.segment.StorageAdapter;
-import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.segment.realtime.FireHydrant;
 import org.apache.druid.segment.realtime.plumber.Sink;
 import org.apache.druid.segment.realtime.plumber.SinkSegmentReference;
@@ -69,13 +67,18 @@ import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.utils.CloseableUtils;
 import org.joda.time.Interval;
 
-import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * Query handler for indexing tasks.
@@ -92,7 +95,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
   private final ServiceEmitter emitter;
   private final QueryRunnerFactoryConglomerate conglomerate;
   private final QueryProcessingPool queryProcessingPool;
-  private final JoinableFactoryWrapper joinableFactoryWrapper;
   private final Cache cache;
   private final CacheConfig cacheConfig;
   private final CachePopulatorStats cachePopulatorStats;
@@ -106,7 +108,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
       ServiceEmitter emitter,
       QueryRunnerFactoryConglomerate conglomerate,
       QueryProcessingPool queryProcessingPool,
-      JoinableFactoryWrapper joinableFactoryWrapper,
       Cache cache,
       CacheConfig cacheConfig,
       CachePopulatorStats cachePopulatorStats
@@ -118,7 +119,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
     this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate");
    this.queryProcessingPool = Preconditions.checkNotNull(queryProcessingPool, "queryProcessingPool");
-    this.joinableFactoryWrapper = joinableFactoryWrapper;
     this.cache = Preconditions.checkNotNull(cache, "cache");
     this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig");
     this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats");
@@ -186,110 +186,170 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
     // We compute the join cache key here itself so it doesn't need to be re-computed for every segment
     final Optional<byte[]> cacheKeyPrefix = Optional.ofNullable(query.getDataSource().getCacheKey());
 
-    Iterable<QueryRunner<T>> perSegmentRunners = Iterables.transform(
-        specs,
-        newDescriptor -> {
-          final SegmentDescriptor descriptor = newIdToBasePendingSegment.getOrDefault(newDescriptor, newDescriptor);
-          final PartitionChunk<Sink> chunk = sinkTimeline.findChunk(
-              descriptor.getInterval(),
-              descriptor.getVersion(),
-              descriptor.getPartitionNumber()
-          );
-
-          if (chunk == null) {
-            return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
-          }
-
-          final Sink theSink = chunk.getObject();
-          final SegmentId sinkSegmentId = theSink.getSegment().getId();
-          final List<SinkSegmentReference> segmentReferences =
-              theSink.acquireSegmentReferences(segmentMapFn, skipIncrementalSegment);
-
-          if (segmentReferences == null) {
-            // We failed to acquire references for all subsegments. Bail and report the entire sink missing.
-            return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
-          } else if (segmentReferences.isEmpty()) {
-            return new NoopQueryRunner<>();
-          }
-
-          final Closeable releaser = () -> CloseableUtils.closeAll(segmentReferences);
-
-          try {
-            Iterable<QueryRunner<T>> perHydrantRunners = new SinkQueryRunners<>(
-                Iterables.transform(
-                    segmentReferences,
-                    segmentReference -> {
-                      QueryRunner<T> runner = factory.createRunner(segmentReference.getSegment());
-
-                      // 1) Only use caching if data is immutable
-                      // 2) Hydrants are not the same between replicas, make sure cache is local
-                      if (segmentReference.isImmutable() && cache.isLocal()) {
-                        StorageAdapter storageAdapter = segmentReference.getSegment().asStorageAdapter();
-                        long segmentMinTime = storageAdapter.getMinTime().getMillis();
-                        long segmentMaxTime = storageAdapter.getMaxTime().getMillis();
-                        Interval actualDataInterval = Intervals.utc(segmentMinTime, segmentMaxTime + 1);
-                        runner = new CachingQueryRunner<>(
-                            makeHydrantCacheIdentifier(sinkSegmentId, segmentReference.getHydrantNumber()),
-                            cacheKeyPrefix,
-                            descriptor,
-                            actualDataInterval,
-                            objectMapper,
-                            cache,
-                            toolChest,
-                            runner,
-                            // Always populate in foreground regardless of config
-                            new ForegroundCachePopulator(
-                                objectMapper,
-                                cachePopulatorStats,
-                                cacheConfig.getMaxEntrySize()
-                            ),
-                            cacheConfig
-                        );
-                      }
-                      return new Pair<>(segmentReference.getSegment().getDataInterval(), runner);
-                    }
-                )
-            );
-            return QueryRunnerHelper.makeClosingQueryRunner(
-                new SpecificSegmentQueryRunner<>(
-                    withPerSinkMetrics(
-                        new BySegmentQueryRunner<>(
-                            sinkSegmentId,
-                            descriptor.getInterval().getStart(),
-                            factory.mergeRunners(
-                                DirectQueryProcessingPool.INSTANCE,
-                                perHydrantRunners
-                            )
-                        ),
-                        toolChest,
-                        sinkSegmentId,
-                        cpuTimeAccumulator
-                    ),
-                    new SpecificSegmentSpec(descriptor)
-                ),
-                releaser
-            );
-          }
-          catch (Throwable e) {
-            throw CloseableUtils.closeAndWrapInCatch(e, releaser);
-          }
-        }
-    );
-    final QueryRunner<T> mergedRunner =
-        toolChest.mergeResults(
-            factory.mergeRunners(
-                queryProcessingPool,
-                perSegmentRunners
-            )
-        );
-
-    return CPUTimeMetricQueryRunner.safeBuild(
-        new FinalizeResultsQueryRunner<>(mergedRunner, toolChest),
-        toolChest,
-        emitter,
-        cpuTimeAccumulator,
-        true
-    );
+    // We need to report data for each Sink all-or-nothing, which means we need to acquire references for all
+    // subsegments (FireHydrants) of a segment (Sink) at once. To ensure they are properly released even when a
+    // query fails or is canceled, we acquire *all* sink reference upfront, and release them all when the main
+    // QueryRunner returned by this method is closed. (We can't do the acquisition and releasing at the level of
+    // each FireHydrant's runner, since then it wouldn't be properly all-or-nothing on a per-Sink basis.)
+    final List<SinkSegmentReference> allSegmentReferences = new ArrayList<>();
+    final Map<SegmentDescriptor, SegmentId> segmentIdMap = new HashMap<>();
+    final LinkedHashMap<SegmentDescriptor, List<QueryRunner<T>>> allRunners = new LinkedHashMap<>();
+
+    try {
+      for (final SegmentDescriptor newDescriptor : specs) {
+        final SegmentDescriptor descriptor = newIdToBasePendingSegment.getOrDefault(newDescriptor, newDescriptor);
+        final PartitionChunk<Sink> chunk = sinkTimeline.findChunk(
+            descriptor.getInterval(),
+            descriptor.getVersion(),
+            descriptor.getPartitionNumber()
+        );
+
+        if (chunk == null) {
+          allRunners.put(
+              descriptor,
+              Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor))
+          );
+          continue;
+        }
+
+        final Sink theSink = chunk.getObject();
+        final SegmentId sinkSegmentId = theSink.getSegment().getId();
+        segmentIdMap.put(descriptor, sinkSegmentId);
+        final List<SinkSegmentReference> sinkSegmentReferences =
+            theSink.acquireSegmentReferences(segmentMapFn, skipIncrementalSegment);
+
+        if (sinkSegmentReferences == null) {
+          // We failed to acquire references for all subsegments. Bail and report the entire sink missing.
+          allRunners.put(
+              descriptor,
+              Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor))
+          );
+        } else if (sinkSegmentReferences.isEmpty()) {
+          allRunners.put(descriptor, Collections.singletonList(new NoopQueryRunner<>()));
+        } else {
+          allSegmentReferences.addAll(sinkSegmentReferences);
+
+          allRunners.put(
+              descriptor,
+              sinkSegmentReferences.stream().map(
+                  segmentReference -> {
+                    QueryRunner<T> runner = new MetricsEmittingQueryRunner<>(
+                        emitter,
+                        factory.getToolchest(),
+                        factory.createRunner(segmentReference.getSegment()),
+                        QueryMetrics::reportSegmentTime,
+                        queryMetrics -> queryMetrics.segment(sinkSegmentId.toString())
+                    );
+
+                    // 1) Only use caching if data is immutable
+                    // 2) Hydrants are not the same between replicas, make sure cache is local
+                    if (segmentReference.isImmutable() && cache.isLocal()) {
+                      StorageAdapter storageAdapter = segmentReference.getSegment().asStorageAdapter();
+                      long segmentMinTime = storageAdapter.getMinTime().getMillis();
+                      long segmentMaxTime = storageAdapter.getMaxTime().getMillis();
+                      Interval actualDataInterval = Intervals.utc(segmentMinTime, segmentMaxTime + 1);
+                      runner = new CachingQueryRunner<>(
+                          makeHydrantCacheIdentifier(sinkSegmentId, segmentReference.getHydrantNumber()),
+                          cacheKeyPrefix,
+                          descriptor,
+                          actualDataInterval,
+                          objectMapper,
+                          cache,
+                          toolChest,
+                          runner,
+                          // Always populate in foreground regardless of config
+                          new ForegroundCachePopulator(
+                              objectMapper,
+                              cachePopulatorStats,
+                              cacheConfig.getMaxEntrySize()
+                          ),
+                          cacheConfig
+                      );
+                    }
+
+                    // Regardless of whether caching is enabled, do reportSegmentAndCacheTime outside the
+                    // *possible* caching.
+                    runner = new MetricsEmittingQueryRunner<>(
+                        emitter,
+                        factory.getToolchest(),
+                        runner,
+                        QueryMetrics::reportSegmentAndCacheTime,
+                        queryMetrics -> queryMetrics.segment(sinkSegmentId.toString())
+                    ).withWaitMeasuredFromNow();
+
+                    // Emit CPU time metrics.
+                    runner = CPUTimeMetricQueryRunner.safeBuild(
+                        runner,
+                        toolChest,
+                        emitter,
+                        cpuTimeAccumulator,
+                        false
+                    );
+
+                    // Run with specific segment descriptor.
+                    runner = new SpecificSegmentQueryRunner<>(
+                        runner,
+                        new SpecificSegmentSpec(descriptor)
+                    );
+
+                    return runner;
+                  }
+              ).collect(Collectors.toList())
+          );
+        }
+      }
+
+      final QueryRunner<T> mergedRunner;
+
+      if (query.context().isBySegment()) {
+        // bySegment: merge all hydrants for a Sink first, then merge Sinks. Necessary to keep results for the
+        // same segment together, but causes additional memory usage due to the extra layer of materialization,
+        // so we only do this if we need to.
+        mergedRunner = factory.mergeRunners(
+            queryProcessingPool,
+            allRunners.entrySet().stream().map(
+                entry -> new BySegmentQueryRunner<>(
+                    segmentIdMap.get(entry.getKey()),
+                    entry.getKey().getInterval().getStart(),
+                    factory.mergeRunners(
+                        DirectQueryProcessingPool.INSTANCE,
+                        entry.getValue()
+                    )
+                )
+            ).collect(Collectors.toList())
+        );
+      } else {
+        // Not bySegment: merge all hydrants at the same level, rather than grouped by Sink (segment).
+        mergedRunner = factory.mergeRunners(
+            queryProcessingPool,
+            new SinkQueryRunners<>(
+                allRunners.entrySet().stream().flatMap(
+                    entry ->
+                        entry.getValue().stream().map(
+                            runner ->
+                                Pair.of(entry.getKey().getInterval(), runner)
+                        )
+                ).collect(Collectors.toList()))
+        );
+      }
+
+      // 1) Merge results using the toolChest, finalize if necessary.
+      // 2) Measure CPU time of that operation.
+      // 3) Release all sink segment references.
+      return QueryRunnerHelper.makeClosingQueryRunner(
+          CPUTimeMetricQueryRunner.safeBuild(
+              new FinalizeResultsQueryRunner<>(toolChest.mergeResults(mergedRunner), toolChest),
+              toolChest,
+              emitter,
+              cpuTimeAccumulator,
+              true
+          ),
+          () -> CloseableUtils.closeAll(allSegmentReferences)
+      );
+    }
+    catch (Throwable e) {
+      throw CloseableUtils.closeAndWrapInCatch(e, () -> CloseableUtils.closeAll(allSegmentReferences));
+    }
   }
 
   public void registerNewVersionOfPendingSegment(
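The reworked hunk above acquires all of a Sink's references before building runners, and releases them either when the merged runner closes or on any failure. A minimal standalone sketch of that acquire-all-then-release pattern, with illustrative names rather than Druid's SinkSegmentReference API:

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

public class AcquireAllSketch
{
  public static void main(String[] args) throws Exception
  {
    final List<Closeable> allReferences = new ArrayList<>();

    try {
      // Acquire every reference up front so failure handling is all-or-nothing.
      for (String name : List.of("sink1/hydrant0", "sink1/hydrant1")) {
        allReferences.add(() -> System.out.println("released " + name));
      }

      // ... build and merge runners over the acquired references ...

      // On success, hand releasing off to the closer of the merged runner;
      // here we invoke it directly for demonstration.
      final Closeable releaser = () -> closeAll(allReferences);
      releaser.close();
    }
    catch (Throwable e) {
      // On any failure before the runner takes ownership, release everything.
      closeAll(allReferences);
      throw e;
    }
  }

  static void closeAll(List<Closeable> refs)
  {
    for (Closeable ref : refs) {
      try {
        ref.close();
      }
      catch (Exception ignored) {
        // Best-effort release, in the spirit of CloseableUtils.closeAll.
      }
    }
  }
}
```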
@@ -309,43 +369,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
     return dataSource;
   }
 
-  /**
-   * Decorates a Sink's query runner to emit query/segmentAndCache/time, query/segment/time, query/wait/time once
-   * each for the whole Sink. Also adds CPU time to cpuTimeAccumulator.
-   */
-  private <T> QueryRunner<T> withPerSinkMetrics(
-      final QueryRunner<T> sinkRunner,
-      final QueryToolChest<T, ? extends Query<T>> queryToolChest,
-      final SegmentId sinkSegmentId,
-      final AtomicLong cpuTimeAccumulator
-  )
-  {
-    // Note: reportSegmentAndCacheTime and reportSegmentTime are effectively the same here. They don't split apart
-    // cache vs. non-cache due to the fact that Sinks may be partially cached and partially uncached. Making this
-    // better would need to involve another accumulator like the cpuTimeAccumulator that we could share with the
-    // sinkRunner.
-    String sinkSegmentIdString = sinkSegmentId.toString();
-    return CPUTimeMetricQueryRunner.safeBuild(
-        new MetricsEmittingQueryRunner<>(
-            emitter,
-            queryToolChest,
-            new MetricsEmittingQueryRunner<>(
-                emitter,
-                queryToolChest,
-                sinkRunner,
-                QueryMetrics::reportSegmentTime,
-                queryMetrics -> queryMetrics.segment(sinkSegmentIdString)
-            ),
-            QueryMetrics::reportSegmentAndCacheTime,
-            queryMetrics -> queryMetrics.segment(sinkSegmentIdString)
-        ).withWaitMeasuredFromNow(),
-        queryToolChest,
-        emitter,
-        cpuTimeAccumulator,
-        false
-    );
-  }
-
   public VersionedIntervalTimeline<String, Sink> getSinkTimeline()
   {
     return sinkTimeline;
@@ -439,7 +439,6 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
         serviceEmitter,
         queryRunnerFactoryConglomerateProvider.get(),
         queryProcessingPool,
-        joinableFactoryWrapper,
         Preconditions.checkNotNull(cache, "cache"),
         cacheConfig,
         cachePopulatorStats
@@ -91,7 +91,6 @@ public class FlushingPlumber extends RealtimePlumber
         conglomerate,
         segmentAnnouncer,
         queryProcessingPool,
-        joinableFactory,
         null,
         null,
         null,
@@ -64,8 +64,6 @@ import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
-import org.apache.druid.segment.join.JoinableFactory;
-import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
 import org.apache.druid.segment.realtime.FireHydrant;
@@ -142,7 +140,6 @@ public class RealtimePlumber implements Plumber
       QueryRunnerFactoryConglomerate conglomerate,
       DataSegmentAnnouncer segmentAnnouncer,
       QueryProcessingPool queryProcessingPool,
-      JoinableFactory joinableFactory,
       DataSegmentPusher dataSegmentPusher,
       SegmentPublisher segmentPublisher,
       SegmentHandoffNotifier handoffNotifier,
@@ -172,7 +169,6 @@ public class RealtimePlumber implements Plumber
         emitter,
         conglomerate,
         queryProcessingPool,
-        new JoinableFactoryWrapper(joinableFactory),
         cache,
         cacheConfig,
         cachePopulatorStats
@@ -112,7 +112,6 @@ public class RealtimePlumberSchool implements PlumberSchool
         conglomerate,
         segmentAnnouncer,
         queryProcessingPool,
-        joinableFactory,
         dataSegmentPusher,
         segmentPublisher,
         handoffNotifierFactory.createSegmentHandoffNotifier(schema.getDataSource()),
@@ -61,7 +61,6 @@ import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.segment.join.NoopJoinableFactory;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
@@ -247,7 +246,6 @@ public class StreamAppenderatorTester implements AutoCloseable
         announcer,
         emitter,
         new ForwardingQueryProcessingPool(queryExecutor),
-        NoopJoinableFactory.INSTANCE,
         MapCache.create(2048),
         new CacheConfig(),
         new CachePopulatorStats(),
@@ -295,7 +293,6 @@ public class StreamAppenderatorTester implements AutoCloseable
         new NoopDataSegmentAnnouncer(),
         emitter,
         new ForwardingQueryProcessingPool(queryExecutor),
-        NoopJoinableFactory.INSTANCE,
         MapCache.create(2048),
         new CacheConfig(),
         new CachePopulatorStats(),