From d9e8448c502f8bb24cef3ae4c9c6124d001ea625 Mon Sep 17 00:00:00 2001 From: AmatyaAvadhanula Date: Wed, 31 Jan 2024 09:11:00 +0530 Subject: [PATCH] Close open segments when a newer segment with higher version is allocated (#15727) --- .../appenderator/BaseAppenderatorDriver.java | 50 +++++++++++++------ .../StreamAppenderatorDriver.java | 8 +-- 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index 73d2d3717ab..e27d3ff68b6 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -38,6 +38,7 @@ import org.apache.druid.data.input.Committer; import org.apache.druid.data.input.InputRow; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; @@ -54,11 +55,10 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.NavigableMap; import java.util.Objects; import java.util.Set; import java.util.TreeMap; @@ -179,18 +179,18 @@ public abstract class BaseAppenderatorDriver implements Closeable // Interval Start millis -> List of Segments for this interval // there might be multiple segments for a start interval, for example one segment // can be in APPENDING state and others might be in PUBLISHING state - private final NavigableMap intervalToSegmentStates; + private final Map intervalToSegmentStates; // most recently allocated segment private String lastSegmentId; SegmentsForSequence() { - this.intervalToSegmentStates = new TreeMap<>(); + this.intervalToSegmentStates = new HashMap<>(); } SegmentsForSequence( - NavigableMap intervalToSegmentStates, + Map intervalToSegmentStates, String lastSegmentId ) { @@ -200,21 +200,40 @@ public abstract class BaseAppenderatorDriver implements Closeable void add(SegmentIdWithShardSpec identifier) { + for (Map.Entry entry : intervalToSegmentStates.entrySet()) { + final SegmentWithState appendingSegment = entry.getValue().getAppendingSegment(); + if (appendingSegment == null) { + continue; + } + if (identifier.getInterval().contains(entry.getKey())) { + if (identifier.getVersion().compareTo(appendingSegment.getSegmentIdentifier().getVersion()) > 0) { + entry.getValue().finishAppendingToCurrentActiveSegment(SegmentWithState::finishAppending); + } + } + } intervalToSegmentStates.computeIfAbsent( - identifier.getInterval().getStartMillis(), + identifier.getInterval(), k -> new SegmentsOfInterval(identifier.getInterval()) ).setAppendingSegment(SegmentWithState.newSegment(identifier)); lastSegmentId = identifier.toString(); } - Entry floor(long timestamp) + SegmentsOfInterval findCandidate(long timestamp) { - return intervalToSegmentStates.floorEntry(timestamp); + Interval interval = Intervals.utc(timestamp, timestamp); + SegmentsOfInterval retVal = null; + for (Map.Entry entry : intervalToSegmentStates.entrySet()) { + if (entry.getKey().contains(interval)) { + interval = entry.getKey(); + retVal = entry.getValue(); + } + } + return retVal; } - SegmentsOfInterval get(long timestamp) + SegmentsOfInterval get(Interval interval) { - return intervalToSegmentStates.get(timestamp); + return intervalToSegmentStates.get(interval); } public Stream allSegmentStateStream() @@ -291,16 +310,15 @@ public abstract class BaseAppenderatorDriver implements Closeable return null; } - final Map.Entry candidateEntry = segmentsForSequence.floor( + final SegmentsOfInterval candidate = segmentsForSequence.findCandidate( timestamp.getMillis() ); - if (candidateEntry != null) { - final SegmentsOfInterval segmentsOfInterval = candidateEntry.getValue(); - if (segmentsOfInterval.interval.contains(timestamp)) { - return segmentsOfInterval.appendingSegment == null ? + if (candidate != null) { + if (candidate.interval.contains(timestamp)) { + return candidate.appendingSegment == null ? null : - segmentsOfInterval.appendingSegment.getSegmentIdentifier(); + candidate.appendingSegment.getSegmentIdentifier(); } else { return null; } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index a645f81971e..b2b41bf44f7 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -43,12 +43,14 @@ import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState; import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -207,7 +209,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver for (final SegmentIdWithShardSpec identifier : identifiers) { log.info("Moving segment[%s] out of active list.", identifier); - final long key = identifier.getInterval().getStartMillis(); + final Interval key = identifier.getInterval(); final SegmentsOfInterval segmentsOfInterval = activeSegmentsForSequence.get(key); if (segmentsOfInterval == null || segmentsOfInterval.getAppendingSegment() == null || @@ -457,11 +459,11 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver SegmentsForSequence build() { - final NavigableMap map = new TreeMap<>(); + final Map map = new HashMap<>(); for (Entry>> entry : intervalToSegments.entrySet()) { map.put( - entry.getKey().getInterval().getStartMillis(), + entry.getKey().getInterval(), new SegmentsOfInterval(entry.getKey().getInterval(), entry.getValue().lhs, entry.getValue().rhs) ); }