Close open segments when a newer segment with higher version is allocated (#15727)

This commit is contained in:
AmatyaAvadhanula 2024-01-31 09:11:00 +05:30 committed by GitHub
parent dbcfb2bb8b
commit d9e8448c50
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 39 additions and 19 deletions

View File

@ -38,6 +38,7 @@ import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRow;
import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
@ -54,11 +55,10 @@ import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
@ -179,18 +179,18 @@ public abstract class BaseAppenderatorDriver implements Closeable
// Interval Start millis -> List of Segments for this interval // Interval Start millis -> List of Segments for this interval
// there might be multiple segments for a start interval, for example one segment // there might be multiple segments for a start interval, for example one segment
// can be in APPENDING state and others might be in PUBLISHING state // can be in APPENDING state and others might be in PUBLISHING state
private final NavigableMap<Long, SegmentsOfInterval> intervalToSegmentStates; private final Map<Interval, SegmentsOfInterval> intervalToSegmentStates;
// most recently allocated segment // most recently allocated segment
private String lastSegmentId; private String lastSegmentId;
SegmentsForSequence() SegmentsForSequence()
{ {
this.intervalToSegmentStates = new TreeMap<>(); this.intervalToSegmentStates = new HashMap<>();
} }
SegmentsForSequence( SegmentsForSequence(
NavigableMap<Long, SegmentsOfInterval> intervalToSegmentStates, Map<Interval, SegmentsOfInterval> intervalToSegmentStates,
String lastSegmentId String lastSegmentId
) )
{ {
@ -200,21 +200,40 @@ public abstract class BaseAppenderatorDriver implements Closeable
void add(SegmentIdWithShardSpec identifier) void add(SegmentIdWithShardSpec identifier)
{ {
for (Map.Entry<Interval, SegmentsOfInterval> entry : intervalToSegmentStates.entrySet()) {
final SegmentWithState appendingSegment = entry.getValue().getAppendingSegment();
if (appendingSegment == null) {
continue;
}
if (identifier.getInterval().contains(entry.getKey())) {
if (identifier.getVersion().compareTo(appendingSegment.getSegmentIdentifier().getVersion()) > 0) {
entry.getValue().finishAppendingToCurrentActiveSegment(SegmentWithState::finishAppending);
}
}
}
intervalToSegmentStates.computeIfAbsent( intervalToSegmentStates.computeIfAbsent(
identifier.getInterval().getStartMillis(), identifier.getInterval(),
k -> new SegmentsOfInterval(identifier.getInterval()) k -> new SegmentsOfInterval(identifier.getInterval())
).setAppendingSegment(SegmentWithState.newSegment(identifier)); ).setAppendingSegment(SegmentWithState.newSegment(identifier));
lastSegmentId = identifier.toString(); lastSegmentId = identifier.toString();
} }
Entry<Long, SegmentsOfInterval> floor(long timestamp) SegmentsOfInterval findCandidate(long timestamp)
{ {
return intervalToSegmentStates.floorEntry(timestamp); Interval interval = Intervals.utc(timestamp, timestamp);
SegmentsOfInterval retVal = null;
for (Map.Entry<Interval, SegmentsOfInterval> entry : intervalToSegmentStates.entrySet()) {
if (entry.getKey().contains(interval)) {
interval = entry.getKey();
retVal = entry.getValue();
}
}
return retVal;
} }
SegmentsOfInterval get(long timestamp) SegmentsOfInterval get(Interval interval)
{ {
return intervalToSegmentStates.get(timestamp); return intervalToSegmentStates.get(interval);
} }
public Stream<SegmentWithState> allSegmentStateStream() public Stream<SegmentWithState> allSegmentStateStream()
@ -291,16 +310,15 @@ public abstract class BaseAppenderatorDriver implements Closeable
return null; return null;
} }
final Map.Entry<Long, SegmentsOfInterval> candidateEntry = segmentsForSequence.floor( final SegmentsOfInterval candidate = segmentsForSequence.findCandidate(
timestamp.getMillis() timestamp.getMillis()
); );
if (candidateEntry != null) { if (candidate != null) {
final SegmentsOfInterval segmentsOfInterval = candidateEntry.getValue(); if (candidate.interval.contains(timestamp)) {
if (segmentsOfInterval.interval.contains(timestamp)) { return candidate.appendingSegment == null ?
return segmentsOfInterval.appendingSegment == null ?
null : null :
segmentsOfInterval.appendingSegment.getSegmentIdentifier(); candidate.appendingSegment.getSegmentIdentifier();
} else { } else {
return null; return null;
} }

View File

@ -43,12 +43,14 @@ import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState; import org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -207,7 +209,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
for (final SegmentIdWithShardSpec identifier : identifiers) { for (final SegmentIdWithShardSpec identifier : identifiers) {
log.info("Moving segment[%s] out of active list.", identifier); log.info("Moving segment[%s] out of active list.", identifier);
final long key = identifier.getInterval().getStartMillis(); final Interval key = identifier.getInterval();
final SegmentsOfInterval segmentsOfInterval = activeSegmentsForSequence.get(key); final SegmentsOfInterval segmentsOfInterval = activeSegmentsForSequence.get(key);
if (segmentsOfInterval == null || if (segmentsOfInterval == null ||
segmentsOfInterval.getAppendingSegment() == null || segmentsOfInterval.getAppendingSegment() == null ||
@ -457,11 +459,11 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
SegmentsForSequence build() SegmentsForSequence build()
{ {
final NavigableMap<Long, SegmentsOfInterval> map = new TreeMap<>(); final Map<Interval, SegmentsOfInterval> map = new HashMap<>();
for (Entry<SegmentIdWithShardSpec, Pair<SegmentWithState, List<SegmentWithState>>> entry : for (Entry<SegmentIdWithShardSpec, Pair<SegmentWithState, List<SegmentWithState>>> entry :
intervalToSegments.entrySet()) { intervalToSegments.entrySet()) {
map.put( map.put(
entry.getKey().getInterval().getStartMillis(), entry.getKey().getInterval(),
new SegmentsOfInterval(entry.getKey().getInterval(), entry.getValue().lhs, entry.getValue().rhs) new SegmentsOfInterval(entry.getKey().getInterval(), entry.getValue().lhs, entry.getValue().rhs)
); );
} }