Improve documentation for tombstone generation and minor improvement (#13907)

* As a follow up to #13893, this PR improves the comments added along with examples for the code, as well as adds handling for an edge case where the generated tombstone boundaries were overshooting the bounds of MIN_TIME (or MAX_TIME).
This commit is contained in:
Laksh Singla 2023-03-10 06:59:51 +05:30 committed by GitHub
parent 5b0b3a9b2c
commit c16d9da35a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 68 additions and 16 deletions

View File

@ -26,6 +26,7 @@ import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.segment.indexing.DataSchema;
@ -93,7 +94,7 @@ public class TombstoneHelper
List<Interval> retVal = new ArrayList<>();
GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
List<Interval> pushedSegmentsIntervals = getCondensedPushedSegmentsIntervals(pushedSegments);
List<Interval> intervalsForUsedSegments = getCondensedUsedIntervals(
List<Interval> intervalsForUsedSegments = getExistingNonEmptyIntervalsOfDatasource(
dataSchema.getGranularitySpec().inputIntervals(),
dataSchema.getDataSource()
);
@ -165,11 +166,12 @@ public class TombstoneHelper
}
/**
* @param intervalsToDrop Empty intervals in the query that need to be dropped. They should be aligned with the
* replaceGranularity
* See the method body for an example and an indepth explanation as to how the replace interval is created
* @param intervalsToDrop Empty intervals in the query that need to be dropped. They should be aligned with the
* replaceGranularity
* @param intervalsToReplace Intervals in the query which are eligible for replacement with new data.
* They should be aligned with the replaceGranularity
* @param dataSource Datasource on which the replace is to be performed
* @param dataSource Datasource on which the replace is to be performed
* @param replaceGranularity Granularity of the replace query
* @return Intervals computed for the tombstones
* @throws IOException
@ -182,7 +184,7 @@ public class TombstoneHelper
) throws IOException
{
Set<Interval> retVal = new HashSet<>();
List<Interval> usedIntervals = getCondensedUsedIntervals(intervalsToReplace, dataSource);
List<Interval> usedIntervals = getExistingNonEmptyIntervalsOfDatasource(intervalsToReplace, dataSource);
for (Interval intervalToDrop : intervalsToDrop) {
for (Interval usedInterval : usedIntervals) {
@ -194,22 +196,29 @@ public class TombstoneHelper
continue;
}
// Overlap might not be aligned with the granularity if the used interval is not aligned with the granularity
// However we align the boundaries manually, in the following code.
// "overlap" might not be aligned with the if the used interval is not aligned with the granularity of
// the REPLACE i.e. datasource's original granularity and replace's granularity are different
// However, we align the boundaries of the overlap with the replaceGranularity manually, in the following code.
// If the start is aligned, then bucketStart is idempotent, else it will return the latest timestamp less than
// overlap.getStart() which aligns with the replace granularity. That extra interval that we are including
// before the overlap should be contained in intervalToDrop because intervalToDrop is aligned by the
// replaceGranularity, and the overlap's beginning would always be later than intervalToDrop (trivially,
// because its the overlap) and if bucketStart floors the overlap beginning, it cannot floor it before
// the intervalToDrop's start
DateTime alignedIntervalStart = replaceGranularity.bucketStart(overlap.getStart());
long alignedIntervalStartMillis = Math.max(alignedIntervalStart.getMillis(), JodaUtils.MIN_INSTANT);
// If the start is aligned, then 'bucketStart()' is unchanged.
// Else 'bucketStart()' will return the latest timestamp less than overlap.getStart() which aligns with the REPLACE granularity.
// That extra interval that we are adding before the overlap should be contained in 'intervalToDrop' because
// intervalToDrop is aligned by the replaceGranularity.
// If the drop's interval is n, then the extra interval would start from n + 1 (where 1 denotes the replaceGranularity)
// The overlap's beginning would always be later than intervalToDrop (trivially,
// because it is the overlap) and if bucketStart floors the overlap beginning, it cannot floor it before
// the intervalToDrop's start
// For example, if the replace granularity is DAY, intervalsToReplace are 20/02/2023 - 24/02/2023 (always
// aligned with the replaceGranularity), intervalsToDrop are 22/02/2023 - 24/02/2023 (they must also be aligned with the replaceGranularity)
// If the relevant usedIntervals for the datasource are from 22/02/2023 01:00:00 - 23/02/2023 02:00:00, then
// the overlap would be 22/02/2023 01:00:00 - 23/02/2023 02:00:00. When iterating over the overlap we will get
// the intervals from 22/02/2023 - 23/02/2023, and 23/02/2023 - 24/02/2023
// the intervals from 22/02/2023 01:00:00 - 23/02/2023 02:00:00. After aligning it would become
// 22/02/2023T00:00:00Z - 23/02/2023T23:59:59Z
// If the end is aligned, then we do not alter it, else we align the end by geting the earliest time later
// than the overlap's end which aligns with the replace granularity. Using the above-mentioned logic for the
@ -220,7 +229,8 @@ public class TombstoneHelper
} else {
alignedIntervalEnd = replaceGranularity.bucketEnd(overlap.getEnd());
}
Interval alignedTombstoneInterval = new Interval(alignedIntervalStart, alignedIntervalEnd);
long alignedIntervalEndMillis = Math.min(alignedIntervalEnd.getMillis(), JodaUtils.MAX_INSTANT);
Interval alignedTombstoneInterval = Intervals.utc(alignedIntervalStartMillis, alignedIntervalEndMillis);
retVal.add(alignedTombstoneInterval);
}
@ -259,13 +269,16 @@ public class TombstoneHelper
/**
* Helper method to prune required tombstones. Only tombstones that cover used intervals will be created
* since those that not cover used intervals will be redundant.
* Example:
* For a datasource having segments for 2020-01-01/2020-12-31 and 2022-01-01/2022-12-31, this method would return
* the segment 2020-01-01/2020-12-31 if the input intervals asked for the segment between 2019 and 2021.
*
* @param inputIntervals Intervals corresponding to the task
* @param dataSource Datasource corresponding to the task
* @return Intervals corresponding to used segments that overlap with any of the spec's input intervals
* @throws IOException If used segments cannot be retrieved
*/
private List<Interval> getCondensedUsedIntervals(
private List<Interval> getExistingNonEmptyIntervalsOfDatasource(
List<Interval> inputIntervals,
String dataSource
) throws IOException

View File

@ -27,6 +27,7 @@ import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.segment.indexing.DataSchema;
@ -369,6 +370,44 @@ public class TombstoneHelperTest
Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30")), tombstoneIntervals);
}
@Test
public void testTombstoneIntervalsCreatedForReplaceWhenReplaceAll() throws IOException
{
Interval usedInterval = Intervals.ETERNITY;
Interval replaceInterval = Intervals.ETERNITY;
List<Interval> intervalsToDrop = ImmutableList.of(
Intervals.utc(JodaUtils.MIN_INSTANT, 10000),
Intervals.utc(100000, JodaUtils.MAX_INSTANT)
);
Granularity replaceGranularity = Granularities.DAY;
DataSegment existingUsedSegment =
DataSegment.builder()
.dataSource("test")
.interval(usedInterval)
.version("oldVersion")
.size(100)
.build();
Assert.assertFalse(existingUsedSegment.isTombstone());
Mockito.when(taskActionClient.submit(any(TaskAction.class)))
.thenReturn(Collections.singletonList(existingUsedSegment));
TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);
Set<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace(
intervalsToDrop,
ImmutableList.of(replaceInterval),
"test",
replaceGranularity
);
Assert.assertEquals(
ImmutableSet.of(
Intervals.of("-146136543-09-08T08:23:32.096Z/1970-01-02T00:00:00.000Z"),
Intervals.of("1970-01-01T00:00:00.000Z/146140482-04-24T15:36:27.903Z")
),
tombstoneIntervals
);
}
@Test
public void testTombstoneSegmentsForReplaceWhenLockRevoked() throws IOException
{