mirror of https://github.com/apache/druid.git
Fix for OOM in the Tombstone generating logic in MSQ (#13893)
fix OOMs using a different logic for generating tombstones --------- Co-authored-by: Paul Rogers <paul-rogers@users.noreply.github.com>
This commit is contained in:
parent
c7f4bb5056
commit
dc67296e9d
|
@ -748,9 +748,7 @@ public class MSQReplaceTest extends MSQTestBase
|
|||
.setExpectedShardSpec(DimensionRangeShardSpec.class)
|
||||
.setExpectedTombstoneIntervals(
|
||||
ImmutableSet.of(
|
||||
Intervals.of("2001-04-01/P3M"),
|
||||
Intervals.of("2001-07-01/P3M"),
|
||||
Intervals.of("2001-10-01/P3M")
|
||||
Intervals.of("2001-04-01/2002-01-01")
|
||||
)
|
||||
)
|
||||
.setExpectedResultRows(expectedResults)
|
||||
|
|
|
@ -20,8 +20,6 @@
|
|||
package org.apache.druid.indexing.common.task.batch.parallel;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.druid.indexing.common.TaskLock;
|
||||
import org.apache.druid.indexing.common.actions.LockListAction;
|
||||
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
|
||||
|
@ -30,13 +28,13 @@ import org.apache.druid.indexing.overlord.Segments;
|
|||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.JodaUtils;
|
||||
import org.apache.druid.java.util.common.granularity.Granularity;
|
||||
import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
|
||||
import org.apache.druid.segment.indexing.DataSchema;
|
||||
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
|
||||
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
|
||||
import org.apache.druid.timeline.DataSegment;
|
||||
import org.apache.druid.timeline.partition.ShardSpec;
|
||||
import org.apache.druid.timeline.partition.TombstoneShardSpec;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -189,8 +187,6 @@ public class TombstoneHelper
|
|||
for (Interval intervalToDrop : intervalsToDrop) {
|
||||
for (Interval usedInterval : usedIntervals) {
|
||||
|
||||
// Overlap will always be finite (not starting from -Inf or ending at +Inf) and less than or
|
||||
// equal to the size of the usedInterval
|
||||
Interval overlap = intervalToDrop.overlap(usedInterval);
|
||||
|
||||
// No overlap of the dropped segment with the used interval, so we do not need to generate any tombstone
|
||||
|
@ -199,26 +195,34 @@ public class TombstoneHelper
|
|||
}
|
||||
|
||||
// Overlap might not be aligned with the granularity if the used interval is not aligned with the granularity
|
||||
// However when fetching from the iterator, the first interval is found using the bucketStart, which
|
||||
// ensures that the interval is "rounded down" to the first timestamp that aligns with the granularity
|
||||
// Also, the interval would always be contained inside the "intervalToDrop" because the original REPLACE
|
||||
// is aligned by the granularity, and by extension all the elements inside the intervals to drop would
|
||||
// also be aligned by the same granularity (since intervalsToDrop = replaceIntervals - publishIntervals, and
|
||||
// the right-hand side is always aligned)
|
||||
//
|
||||
// However we align the boundaries manually, in the following code.
|
||||
|
||||
// If the start is aligned, then bucketStart is idempotent, else it will return the latest timestamp less than
|
||||
// overlap.getStart() which aligns with the replace granularity. That extra interval that we are including
|
||||
// before the overlap should be contained in intervalToDrop because intervalToDrop is aligned by the
|
||||
// replaceGranularity, and the overlap's beginning would never be earlier than intervalToDrop's start (trivially,
|
||||
// because it's the overlap) and if bucketStart floors the overlap beginning, it cannot floor it before
|
||||
// the intervalToDrop's start
|
||||
DateTime alignedIntervalStart = replaceGranularity.bucketStart(overlap.getStart());
|
||||
|
||||
// For example, if the replace granularity is DAY, intervalsToReplace are 20/02/2023 - 24/02/2023 (always
|
||||
// aligned with the replaceGranularity), intervalsToDrop are 22/02/2023 - 24/02/2023 (they must also be aligned with the replaceGranularity)
|
||||
// If the relevant usedIntervals for the datasource are from 22/02/2023 01:00:00 - 23/02/2023 02:00:00, then
|
||||
// the overlap would be 22/02/2023 01:00:00 - 23/02/2023 02:00:00. When iterating over the overlap we will get
|
||||
// the intervals from 22/02/2023 - 23/02/2023, and 23/02/2023 - 24/02/2023
|
||||
IntervalsByGranularity intervalsToDropByGranularity = new IntervalsByGranularity(
|
||||
ImmutableList.of(overlap),
|
||||
replaceGranularity
|
||||
);
|
||||
|
||||
// Helps in deduplication if required. Since all the intervals are uniformly granular, there should be no
|
||||
// overlap post deduplication
|
||||
retVal.addAll(Sets.newHashSet(intervalsToDropByGranularity.granularityIntervalsIterator()));
|
||||
// If the end is aligned, then we do not alter it, else we align the end by getting the earliest time later
|
||||
// than the overlap's end which aligns with the replace granularity. Using the above-mentioned logic for the
|
||||
// start time, we can also argue that the rounded up end would be contained in the intervalToDrop
|
||||
DateTime alignedIntervalEnd;
|
||||
if (replaceGranularity.bucketStart(overlap.getEnd()).equals(overlap.getEnd())) { // Check if the end is aligned
|
||||
alignedIntervalEnd = overlap.getEnd();
|
||||
} else {
|
||||
alignedIntervalEnd = replaceGranularity.bucketEnd(overlap.getEnd());
|
||||
}
|
||||
Interval alignedTombstoneInterval = new Interval(alignedIntervalStart, alignedIntervalEnd);
|
||||
|
||||
retVal.add(alignedTombstoneInterval);
|
||||
}
|
||||
}
|
||||
return retVal;
|
||||
|
|
|
@ -147,7 +147,7 @@ public class TombstoneHelperTest
|
|||
replaceGranularity
|
||||
);
|
||||
Assert.assertEquals(
|
||||
ImmutableSet.of(Intervals.of("2020-03-05/2020-03-06"), Intervals.of("2020-03-06/2020-03-07")),
|
||||
ImmutableSet.of(Intervals.of("2020-03-05/2020-03-07")),
|
||||
tombstoneIntervals
|
||||
);
|
||||
}
|
||||
|
@ -183,8 +183,7 @@ public class TombstoneHelperTest
|
|||
Assert.assertEquals(
|
||||
ImmutableSet.of(
|
||||
Intervals.of("2020-03-01/2020-04-01"),
|
||||
Intervals.of("2020-07-01/2020-08-01"),
|
||||
Intervals.of("2020-08-01/2020-09-01")
|
||||
Intervals.of("2020-07-01/2020-09-01")
|
||||
),
|
||||
tombstoneIntervals
|
||||
);
|
||||
|
@ -248,6 +247,128 @@ public class TombstoneHelperTest
|
|||
Assert.assertEquals(ImmutableSet.of(), tombstoneIntervals);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnLeft() throws IOException
|
||||
{
|
||||
Interval usedInterval = Intervals.of("2020-01-01/2020-12-31");
|
||||
Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31");
|
||||
Interval intervalToDrop = Intervals.of("2020-02-01/2020-12-31");
|
||||
Granularity replaceGranularity = Granularities.DAY;
|
||||
|
||||
DataSegment existingUsedSegment =
|
||||
DataSegment.builder()
|
||||
.dataSource("test")
|
||||
.interval(usedInterval)
|
||||
.version("oldVersion")
|
||||
.size(100)
|
||||
.build();
|
||||
Assert.assertFalse(existingUsedSegment.isTombstone());
|
||||
Mockito.when(taskActionClient.submit(any(TaskAction.class)))
|
||||
.thenReturn(Collections.singletonList(existingUsedSegment));
|
||||
TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);
|
||||
|
||||
Set<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace(
|
||||
ImmutableList.of(intervalToDrop),
|
||||
ImmutableList.of(replaceInterval),
|
||||
"test",
|
||||
replaceGranularity
|
||||
);
|
||||
Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-02-01/2020-12-31")), tombstoneIntervals);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnRight() throws IOException
|
||||
{
|
||||
Interval usedInterval = Intervals.of("2020-01-01/2020-12-31");
|
||||
Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31");
|
||||
Interval intervalToDrop = Intervals.of("2020-01-01/2020-11-30");
|
||||
Granularity replaceGranularity = Granularities.DAY;
|
||||
|
||||
DataSegment existingUsedSegment =
|
||||
DataSegment.builder()
|
||||
.dataSource("test")
|
||||
.interval(usedInterval)
|
||||
.version("oldVersion")
|
||||
.size(100)
|
||||
.build();
|
||||
Assert.assertFalse(existingUsedSegment.isTombstone());
|
||||
Mockito.when(taskActionClient.submit(any(TaskAction.class)))
|
||||
.thenReturn(Collections.singletonList(existingUsedSegment));
|
||||
TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);
|
||||
|
||||
Set<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace(
|
||||
ImmutableList.of(intervalToDrop),
|
||||
ImmutableList.of(replaceInterval),
|
||||
"test",
|
||||
replaceGranularity
|
||||
);
|
||||
Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30")), tombstoneIntervals);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesInMiddle() throws IOException
|
||||
{
|
||||
Interval usedInterval = Intervals.of("2020-01-01/2020-12-31");
|
||||
Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31");
|
||||
List<Interval> intervalsToDrop = ImmutableList.of(
|
||||
Intervals.of("2020-01-01/2020-11-30"),
|
||||
Intervals.of("2020-12-05/2020-12-30")
|
||||
);
|
||||
Granularity replaceGranularity = Granularities.DAY;
|
||||
|
||||
DataSegment existingUsedSegment =
|
||||
DataSegment.builder()
|
||||
.dataSource("test")
|
||||
.interval(usedInterval)
|
||||
.version("oldVersion")
|
||||
.size(100)
|
||||
.build();
|
||||
Assert.assertFalse(existingUsedSegment.isTombstone());
|
||||
Mockito.when(taskActionClient.submit(any(TaskAction.class)))
|
||||
.thenReturn(Collections.singletonList(existingUsedSegment));
|
||||
TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);
|
||||
|
||||
Set<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace(
|
||||
intervalsToDrop,
|
||||
ImmutableList.of(replaceInterval),
|
||||
"test",
|
||||
replaceGranularity
|
||||
);
|
||||
Assert.assertEquals(
|
||||
ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30"), Intervals.of("2020-12-05/2020-12-30")),
|
||||
tombstoneIntervals
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTombstoneIntervalsCreatedForReplaceWhenExistingGranularityIsEternity() throws IOException
|
||||
{
|
||||
Interval usedInterval = Intervals.ETERNITY;
|
||||
Interval replaceInterval = Intervals.ETERNITY;
|
||||
List<Interval> intervalsToDrop = ImmutableList.of(Intervals.of("2020-01-01/2020-11-30"));
|
||||
Granularity replaceGranularity = Granularities.DAY;
|
||||
|
||||
DataSegment existingUsedSegment =
|
||||
DataSegment.builder()
|
||||
.dataSource("test")
|
||||
.interval(usedInterval)
|
||||
.version("oldVersion")
|
||||
.size(100)
|
||||
.build();
|
||||
Assert.assertFalse(existingUsedSegment.isTombstone());
|
||||
Mockito.when(taskActionClient.submit(any(TaskAction.class)))
|
||||
.thenReturn(Collections.singletonList(existingUsedSegment));
|
||||
TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);
|
||||
|
||||
Set<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace(
|
||||
intervalsToDrop,
|
||||
ImmutableList.of(replaceInterval),
|
||||
"test",
|
||||
replaceGranularity
|
||||
);
|
||||
Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30")), tombstoneIntervals);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTombstoneSegmentsForReplaceWhenLockRevoked() throws IOException
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue