From dc67296e9d0adfb65f31a96d2653e0eb3bd40f1e Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Thu, 9 Mar 2023 11:08:08 +0530 Subject: [PATCH] Fix for OOM in the Tombstone generating logic in MSQ (#13893) fix OOMs using a different logic for generating tombstones --------- Co-authored-by: Paul Rogers --- .../apache/druid/msq/exec/MSQReplaceTest.java | 4 +- .../task/batch/parallel/TombstoneHelper.java | 42 +++--- .../batch/parallel/TombstoneHelperTest.java | 127 +++++++++++++++++- 3 files changed, 148 insertions(+), 25 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index 02c559ee257..271658e0413 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -748,9 +748,7 @@ public class MSQReplaceTest extends MSQTestBase .setExpectedShardSpec(DimensionRangeShardSpec.class) .setExpectedTombstoneIntervals( ImmutableSet.of( - Intervals.of("2001-04-01/P3M"), - Intervals.of("2001-07-01/P3M"), - Intervals.of("2001-10-01/P3M") + Intervals.of("2001-04-01/2002-01-01") ) ) .setExpectedResultRows(expectedResults) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java index 6ad3b22accb..0c915d00253 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java @@ -20,8 +20,6 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Sets; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; @@ -30,13 +28,13 @@ import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.granularity.Granularity; -import org.apache.druid.java.util.common.granularity.IntervalsByGranularity; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.ShardSpec; import org.apache.druid.timeline.partition.TombstoneShardSpec; +import org.joda.time.DateTime; import org.joda.time.Interval; import java.io.IOException; @@ -189,8 +187,6 @@ public class TombstoneHelper for (Interval intervalToDrop : intervalsToDrop) { for (Interval usedInterval : usedIntervals) { - // Overlap will always be finite (not starting from -Inf or ending at +Inf) and lesser than or - // equal to the size of the usedInterval Interval overlap = intervalToDrop.overlap(usedInterval); // No overlap of the dropped segment with the used interval due to which we donot need to generate any tombstone @@ -199,26 +195,34 @@ public class TombstoneHelper } // Overlap might not be aligned with the granularity if the used interval is not aligned with the granularity - // However when fetching from the iterator, the first interval is found using the bucketStart, which - // ensures that the interval is "rounded down" to the first timestamp that aligns with the granularity - // Also, the interval would always be contained inside the "intervalToDrop" because the original REPLACE - // is aligned by the granularity, and by extension all the elements inside the intervals to drop would - // also be aligned by the same granularity (since intervalsToDrop = replaceIntervals - publishIntervals, and - // the right-hand side is always aligned) - // + // However we align the boundaries manually, in the following code. + + // If the start is aligned, then bucketStart is idempotent, else it will return the latest timestamp less than + // overlap.getStart() which aligns with the replace granularity. That extra interval that we are including + // before the overlap should be contained in intervalToDrop because intervalToDrop is aligned by the + // replaceGranularity, and the overlap's beginning would always be later than intervalToDrop (trivially, + // because its the overlap) and if bucketStart floors the overlap beginning, it cannot floor it before + // the intervalToDrop's start + DateTime alignedIntervalStart = replaceGranularity.bucketStart(overlap.getStart()); + // For example, if the replace granularity is DAY, intervalsToReplace are 20/02/2023 - 24/02/2023 (always // aligned with the replaceGranularity), intervalsToDrop are 22/02/2023 - 24/02/2023 (they must also be aligned with the replaceGranularity) // If the relevant usedIntervals for the datasource are from 22/02/2023 01:00:00 - 23/02/2023 02:00:00, then // the overlap would be 22/02/2023 01:00:00 - 23/02/2023 02:00:00. When iterating over the overlap we will get // the intervals from 22/02/2023 - 23/02/2023, and 23/02/2023 - 24/02/2023 - IntervalsByGranularity intervalsToDropByGranularity = new IntervalsByGranularity( - ImmutableList.of(overlap), - replaceGranularity - ); - // Helps in deduplication if required. Since all the intervals are uniformly granular, there should be no - // no overlap post deduplication - retVal.addAll(Sets.newHashSet(intervalsToDropByGranularity.granularityIntervalsIterator())); + // If the end is aligned, then we do not alter it, else we align the end by geting the earliest time later + // than the overlap's end which aligns with the replace granularity. Using the above-mentioned logic for the + // start time, we can also argue that the rounded up end would be contained in the intervalToDrop + DateTime alignedIntervalEnd; + if (replaceGranularity.bucketStart(overlap.getEnd()).equals(overlap.getEnd())) { // Check if the end is aligned + alignedIntervalEnd = overlap.getEnd(); + } else { + alignedIntervalEnd = replaceGranularity.bucketEnd(overlap.getEnd()); + } + Interval alignedTombstoneInterval = new Interval(alignedIntervalStart, alignedIntervalEnd); + + retVal.add(alignedTombstoneInterval); } } return retVal; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java index 3450bf27033..c5977efaa83 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java @@ -147,7 +147,7 @@ public class TombstoneHelperTest replaceGranularity ); Assert.assertEquals( - ImmutableSet.of(Intervals.of("2020-03-05/2020-03-06"), Intervals.of("2020-03-06/2020-03-07")), + ImmutableSet.of(Intervals.of("2020-03-05/2020-03-07")), tombstoneIntervals ); } @@ -183,8 +183,7 @@ public class TombstoneHelperTest Assert.assertEquals( ImmutableSet.of( Intervals.of("2020-03-01/2020-04-01"), - Intervals.of("2020-07-01/2020-08-01"), - Intervals.of("2020-08-01/2020-09-01") + Intervals.of("2020-07-01/2020-09-01") ), tombstoneIntervals ); @@ -248,6 +247,128 @@ public class TombstoneHelperTest Assert.assertEquals(ImmutableSet.of(), tombstoneIntervals); } + @Test + public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnLeft() throws IOException + { + Interval usedInterval = Intervals.of("2020-01-01/2020-12-31"); + Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31"); + Interval intervalToDrop = Intervals.of("2020-02-01/2020-12-31"); + Granularity replaceGranularity = Granularities.DAY; + + DataSegment existingUsedSegment = + DataSegment.builder() + .dataSource("test") + .interval(usedInterval) + .version("oldVersion") + .size(100) + .build(); + Assert.assertFalse(existingUsedSegment.isTombstone()); + Mockito.when(taskActionClient.submit(any(TaskAction.class))) + .thenReturn(Collections.singletonList(existingUsedSegment)); + TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); + + Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( + ImmutableList.of(intervalToDrop), + ImmutableList.of(replaceInterval), + "test", + replaceGranularity + ); + Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-02-01/2020-12-31")), tombstoneIntervals); + } + + @Test + public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnRight() throws IOException + { + Interval usedInterval = Intervals.of("2020-01-01/2020-12-31"); + Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31"); + Interval intervalToDrop = Intervals.of("2020-01-01/2020-11-30"); + Granularity replaceGranularity = Granularities.DAY; + + DataSegment existingUsedSegment = + DataSegment.builder() + .dataSource("test") + .interval(usedInterval) + .version("oldVersion") + .size(100) + .build(); + Assert.assertFalse(existingUsedSegment.isTombstone()); + Mockito.when(taskActionClient.submit(any(TaskAction.class))) + .thenReturn(Collections.singletonList(existingUsedSegment)); + TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); + + Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( + ImmutableList.of(intervalToDrop), + ImmutableList.of(replaceInterval), + "test", + replaceGranularity + ); + Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30")), tombstoneIntervals); + } + + @Test + public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesInMiddle() throws IOException + { + Interval usedInterval = Intervals.of("2020-01-01/2020-12-31"); + Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31"); + List intervalsToDrop = ImmutableList.of( + Intervals.of("2020-01-01/2020-11-30"), + Intervals.of("2020-12-05/2020-12-30") + ); + Granularity replaceGranularity = Granularities.DAY; + + DataSegment existingUsedSegment = + DataSegment.builder() + .dataSource("test") + .interval(usedInterval) + .version("oldVersion") + .size(100) + .build(); + Assert.assertFalse(existingUsedSegment.isTombstone()); + Mockito.when(taskActionClient.submit(any(TaskAction.class))) + .thenReturn(Collections.singletonList(existingUsedSegment)); + TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); + + Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( + intervalsToDrop, + ImmutableList.of(replaceInterval), + "test", + replaceGranularity + ); + Assert.assertEquals( + ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30"), Intervals.of("2020-12-05/2020-12-30")), + tombstoneIntervals + ); + } + + @Test + public void testTombstoneIntervalsCreatedForReplaceWhenExistingGranularityIsEternity() throws IOException + { + Interval usedInterval = Intervals.ETERNITY; + Interval replaceInterval = Intervals.ETERNITY; + List intervalsToDrop = ImmutableList.of(Intervals.of("2020-01-01/2020-11-30")); + Granularity replaceGranularity = Granularities.DAY; + + DataSegment existingUsedSegment = + DataSegment.builder() + .dataSource("test") + .interval(usedInterval) + .version("oldVersion") + .size(100) + .build(); + Assert.assertFalse(existingUsedSegment.isTombstone()); + Mockito.when(taskActionClient.submit(any(TaskAction.class))) + .thenReturn(Collections.singletonList(existingUsedSegment)); + TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient); + + Set tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace( + intervalsToDrop, + ImmutableList.of(replaceInterval), + "test", + replaceGranularity + ); + Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30")), tombstoneIntervals); + } + @Test public void testTombstoneSegmentsForReplaceWhenLockRevoked() throws IOException {