From 488d376209f5090c3cf062495c14e44983e28be2 Mon Sep 17 00:00:00 2001 From: AmatyaAvadhanula Date: Wed, 20 Mar 2024 19:30:00 +0530 Subject: [PATCH] Optimize isOvershadowed when there is a unique minor version for an interval (#15952) * Optimize isOvershadowed for intervals with timechunk locking --- .../VersionedIntervalTimelineBenchmark.java | 17 +++++++++++ .../timeline/VersionedIntervalTimeline.java | 20 +++++++++++-- .../timeline/partition/PartitionHolder.java | 30 ++++++++++++++++--- .../metadata/SqlSegmentsMetadataManager.java | 12 ++++++-- 4 files changed, 70 insertions(+), 9 deletions(-) diff --git a/benchmarks/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineBenchmark.java b/benchmarks/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineBenchmark.java index a8a4414614c..52fb11c8870 100644 --- a/benchmarks/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineBenchmark.java @@ -252,6 +252,23 @@ public class VersionedIntervalTimelineBenchmark blackhole.consume(timeline.isOvershadowed(segment.getInterval(), segment.getVersion(), segment)); } + @Benchmark + public void benchIsOvershadowedTotal(Blackhole blackhole) + { + blackhole.consume(isOvershadowedTotal()); + } + + private int isOvershadowedTotal() + { + int overshadowedCount = 0; + for (DataSegment segment : segments) { + if (timeline.isOvershadowed(segment.getInterval(), segment.getVersion(), segment)) { + overshadowedCount++; + } + } + return overshadowedCount; + } + @Benchmark public void benchFindFullyOvershadowed(Blackhole blackhole) { diff --git a/processing/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java b/processing/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java index 7d012e928dd..586f445fc0f 100644 --- a/processing/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java +++ b/processing/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java @@ -458,9 +458,14 @@ public class VersionedIntervalTimeline chunk : entry.partitionHolder) { - if (chunk.getObject().overshadows(object)) { - return true; + // If the major versions of the timeline entry and target segment are equal, and + // the maximum minor version among the segments is not greater than the minor version of the target segment, + // none of the segments in the interval can overshadow it. + if (entry.getMaxMinorVersion() > object.getMinorVersion()) { + for (PartitionChunk chunk : entry.partitionHolder) { + if (chunk.getObject().overshadows(object)) { + return true; + } } } return false; @@ -815,6 +820,15 @@ public class VersionedIntervalTimeline> implements Iterable overshadowableManager; + private short maxMinorVersion; + public static > PartitionHolder copyWithOnlyVisibleChunks( PartitionHolder partitionHolder ) { - return new PartitionHolder<>(OvershadowableManager.copyVisible(partitionHolder.overshadowableManager)); + return new PartitionHolder<>( + OvershadowableManager.copyVisible(partitionHolder.overshadowableManager), + partitionHolder.maxMinorVersion + ); } public static > PartitionHolder deepCopy(PartitionHolder partitionHolder) { - return new PartitionHolder<>(OvershadowableManager.deepCopy(partitionHolder.overshadowableManager)); + return new PartitionHolder<>( + OvershadowableManager.deepCopy(partitionHolder.overshadowableManager), + partitionHolder.maxMinorVersion + ); } public PartitionHolder(PartitionChunk initialChunk) @@ -60,14 +68,28 @@ public class PartitionHolder> implements Iterable overshadowableManager) + protected PartitionHolder(OvershadowableManager overshadowableManager, short maxMinorVersion) { this.overshadowableManager = overshadowableManager; + this.maxMinorVersion = maxMinorVersion; } public boolean add(PartitionChunk chunk) { - return overshadowableManager.addChunk(chunk); + boolean added = overshadowableManager.addChunk(chunk); + if (added && chunk.getObject().getMinorVersion() > maxMinorVersion) { + maxMinorVersion = chunk.getObject().getMinorVersion(); + } + return added; + } + + /** + * Returns the maximum minor version across all the added segments. + * We do not handle updates of this variable when segments are removed for the sake of simplicity. + */ + public short getMaxMinorVersion() + { + return maxMinorVersion; } @Nullable diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 66a60e072c0..1f36280eee0 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -43,6 +43,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.MapUtils; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; @@ -1014,7 +1015,8 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager @GuardedBy("pollLock") private void doPoll() { - log.debug("Starting polling of segment table"); + final Stopwatch stopwatch = Stopwatch.createStarted(); + log.info("Starting polling of segment table"); // some databases such as PostgreSQL require auto-commit turned off // to stream results back, enabling transactions disables auto-commit @@ -1072,12 +1074,18 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager if (segments.isEmpty()) { log.info("No segments found in the database!"); } else { - log.info("Polled and found %,d segments in the database", segments.size()); + log.info("Polled and found [%,d] segments in the database in [%,d] ms.", segments.size(), stopwatch.millisElapsed()); } + stopwatch.restart(); + dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegments( Iterables.filter(segments, Objects::nonNull), // Filter corrupted entries (see above in this method). dataSourceProperties ); + log.info( + "Successfully created snapshot from polled segments in [%d] ms. Found [%d] overshadowed segments.", + stopwatch.millisElapsed(), dataSourcesSnapshot.getOvershadowedSegments().size() + ); } private static ImmutableMap createDefaultDataSourceProperties()