mirror of https://github.com/apache/druid.git
Optimize isOvershadowed when there is a unique minor version for an interval (#15952)
* Optimize isOvershadowed for intervals with timechunk locking
This commit is contained in:
parent
0cc54e0836
commit
488d376209
|
@ -252,6 +252,23 @@ public class VersionedIntervalTimelineBenchmark
|
|||
blackhole.consume(timeline.isOvershadowed(segment.getInterval(), segment.getVersion(), segment));
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public void benchIsOvershadowedTotal(Blackhole blackhole)
|
||||
{
|
||||
blackhole.consume(isOvershadowedTotal());
|
||||
}
|
||||
|
||||
private int isOvershadowedTotal()
|
||||
{
|
||||
int overshadowedCount = 0;
|
||||
for (DataSegment segment : segments) {
|
||||
if (timeline.isOvershadowed(segment.getInterval(), segment.getVersion(), segment)) {
|
||||
overshadowedCount++;
|
||||
}
|
||||
}
|
||||
return overshadowedCount;
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public void benchFindFullyOvershadowed(Blackhole blackhole)
|
||||
{
|
||||
|
|
|
@ -458,9 +458,14 @@ public class VersionedIntervalTimeline<VersionType, ObjectType extends Overshado
|
|||
if (entry != null) {
|
||||
final int majorVersionCompare = versionComparator.compare(version, entry.getVersion());
|
||||
if (majorVersionCompare == 0) {
|
||||
for (PartitionChunk<ObjectType> chunk : entry.partitionHolder) {
|
||||
if (chunk.getObject().overshadows(object)) {
|
||||
return true;
|
||||
// If the major versions of the timeline entry and target segment are equal, and
|
||||
// the maximum minor version among the segments is not greater than the minor version of the target segment,
|
||||
// none of the segments in the interval can overshadow it.
|
||||
if (entry.getMaxMinorVersion() > object.getMinorVersion()) {
|
||||
for (PartitionChunk<ObjectType> chunk : entry.partitionHolder) {
|
||||
if (chunk.getObject().overshadows(object)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
|
@ -815,6 +820,15 @@ public class VersionedIntervalTimeline<VersionType, ObjectType extends Overshado
|
|||
return partitionHolder;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the maximum minor version across all the added segments.
|
||||
* We do not handle updates of this variable when segments are removed for the sake of simplicity.
|
||||
*/
|
||||
private short getMaxMinorVersion()
|
||||
{
|
||||
return partitionHolder.getMaxMinorVersion();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
|
|
|
@ -34,16 +34,24 @@ public class PartitionHolder<T extends Overshadowable<T>> implements Iterable<Pa
|
|||
{
|
||||
private final OvershadowableManager<T> overshadowableManager;
|
||||
|
||||
private short maxMinorVersion;
|
||||
|
||||
public static <T extends Overshadowable<T>> PartitionHolder<T> copyWithOnlyVisibleChunks(
|
||||
PartitionHolder<T> partitionHolder
|
||||
)
|
||||
{
|
||||
return new PartitionHolder<>(OvershadowableManager.copyVisible(partitionHolder.overshadowableManager));
|
||||
return new PartitionHolder<>(
|
||||
OvershadowableManager.copyVisible(partitionHolder.overshadowableManager),
|
||||
partitionHolder.maxMinorVersion
|
||||
);
|
||||
}
|
||||
|
||||
public static <T extends Overshadowable<T>> PartitionHolder<T> deepCopy(PartitionHolder<T> partitionHolder)
|
||||
{
|
||||
return new PartitionHolder<>(OvershadowableManager.deepCopy(partitionHolder.overshadowableManager));
|
||||
return new PartitionHolder<>(
|
||||
OvershadowableManager.deepCopy(partitionHolder.overshadowableManager),
|
||||
partitionHolder.maxMinorVersion
|
||||
);
|
||||
}
|
||||
|
||||
public PartitionHolder(PartitionChunk<T> initialChunk)
|
||||
|
@ -60,14 +68,28 @@ public class PartitionHolder<T extends Overshadowable<T>> implements Iterable<Pa
|
|||
}
|
||||
}
|
||||
|
||||
protected PartitionHolder(OvershadowableManager<T> overshadowableManager)
|
||||
protected PartitionHolder(OvershadowableManager<T> overshadowableManager, short maxMinorVersion)
|
||||
{
|
||||
this.overshadowableManager = overshadowableManager;
|
||||
this.maxMinorVersion = maxMinorVersion;
|
||||
}
|
||||
|
||||
public boolean add(PartitionChunk<T> chunk)
|
||||
{
|
||||
return overshadowableManager.addChunk(chunk);
|
||||
boolean added = overshadowableManager.addChunk(chunk);
|
||||
if (added && chunk.getObject().getMinorVersion() > maxMinorVersion) {
|
||||
maxMinorVersion = chunk.getObject().getMinorVersion();
|
||||
}
|
||||
return added;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the maximum minor version across all the added segments.
|
||||
* We do not handle updates of this variable when segments are removed for the sake of simplicity.
|
||||
*/
|
||||
public short getMaxMinorVersion()
|
||||
{
|
||||
return maxMinorVersion;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.druid.java.util.common.Intervals;
|
|||
import org.apache.druid.java.util.common.JodaUtils;
|
||||
import org.apache.druid.java.util.common.MapUtils;
|
||||
import org.apache.druid.java.util.common.Pair;
|
||||
import org.apache.druid.java.util.common.Stopwatch;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
|
||||
|
@ -1014,7 +1015,8 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
|
|||
@GuardedBy("pollLock")
|
||||
private void doPoll()
|
||||
{
|
||||
log.debug("Starting polling of segment table");
|
||||
final Stopwatch stopwatch = Stopwatch.createStarted();
|
||||
log.info("Starting polling of segment table");
|
||||
|
||||
// some databases such as PostgreSQL require auto-commit turned off
|
||||
// to stream results back, enabling transactions disables auto-commit
|
||||
|
@ -1072,12 +1074,18 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
|
|||
if (segments.isEmpty()) {
|
||||
log.info("No segments found in the database!");
|
||||
} else {
|
||||
log.info("Polled and found %,d segments in the database", segments.size());
|
||||
log.info("Polled and found [%,d] segments in the database in [%,d] ms.", segments.size(), stopwatch.millisElapsed());
|
||||
}
|
||||
stopwatch.restart();
|
||||
|
||||
dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegments(
|
||||
Iterables.filter(segments, Objects::nonNull), // Filter corrupted entries (see above in this method).
|
||||
dataSourceProperties
|
||||
);
|
||||
log.info(
|
||||
"Successfully created snapshot from polled segments in [%d] ms. Found [%d] overshadowed segments.",
|
||||
stopwatch.millisElapsed(), dataSourcesSnapshot.getOvershadowedSegments().size()
|
||||
);
|
||||
}
|
||||
|
||||
private static ImmutableMap<String, String> createDefaultDataSourceProperties()
|
||||
|
|
Loading…
Reference in New Issue