mirror of https://github.com/apache/druid.git
Co-authored-by: AmatyaAvadhanula <amatya.avadhanula@imply.io>
This commit is contained in:
parent
b30eab36b9
commit
f43964a808
|
@ -103,6 +103,9 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
|
||||||
private static final Interval YEAR_23 = Intervals.of("2023/2024");
|
private static final Interval YEAR_23 = Intervals.of("2023/2024");
|
||||||
private static final Interval JAN_23 = Intervals.of("2023-01/2023-02");
|
private static final Interval JAN_23 = Intervals.of("2023-01/2023-02");
|
||||||
private static final Interval DEC_23 = Intervals.of("2023-12/2024-01");
|
private static final Interval DEC_23 = Intervals.of("2023-12/2024-01");
|
||||||
|
private static final Interval JAN_FEB_MAR_23 = Intervals.of("2023-01-01/2023-04-01");
|
||||||
|
private static final Interval APR_MAY_JUN_23 = Intervals.of("2023-04-01/2023-07-01");
|
||||||
|
private static final Interval JUL_AUG_SEP_23 = Intervals.of("2023-07-01/2023-10-01");
|
||||||
private static final Interval OCT_NOV_DEC_23 = Intervals.of("2023-10-01/2024-01-01");
|
private static final Interval OCT_NOV_DEC_23 = Intervals.of("2023-10-01/2024-01-01");
|
||||||
private static final Interval FIRST_OF_JAN_23 = Intervals.of("2023-01-01/2023-01-02");
|
private static final Interval FIRST_OF_JAN_23 = Intervals.of("2023-01-01/2023-01-02");
|
||||||
|
|
||||||
|
@ -599,6 +602,185 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
|
||||||
verifyIntervalHasVisibleSegments(JAN_23, segmentV01);
|
verifyIntervalHasVisibleSegments(JAN_23, segmentV01);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLockReplaceQuarterAllocateAppendYear()
|
||||||
|
{
|
||||||
|
final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23);
|
||||||
|
Assert.assertNotNull(replaceLock);
|
||||||
|
|
||||||
|
final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23, replaceLock.getVersion());
|
||||||
|
final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23, replaceLock.getVersion());
|
||||||
|
final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23, replaceLock.getVersion());
|
||||||
|
final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23, replaceLock.getVersion());
|
||||||
|
|
||||||
|
Assert.assertTrue(
|
||||||
|
replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4)
|
||||||
|
.isSuccess()
|
||||||
|
);
|
||||||
|
verifyIntervalHasUsedSegments(YEAR_23, segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4);
|
||||||
|
verifyIntervalHasVisibleSegments(YEAR_23, segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4);
|
||||||
|
|
||||||
|
final SegmentIdWithShardSpec pendingSegment
|
||||||
|
= appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), Granularities.YEAR);
|
||||||
|
Assert.assertEquals(JAN_FEB_MAR_23, pendingSegment.getInterval());
|
||||||
|
Assert.assertEquals(replaceLock.getVersion(), pendingSegment.getVersion());
|
||||||
|
|
||||||
|
final DataSegment appendedSegment = asSegment(pendingSegment);
|
||||||
|
appendTask.commitAppendSegments(appendedSegment);
|
||||||
|
|
||||||
|
verifyIntervalHasUsedSegments(YEAR_23, appendedSegment, segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4);
|
||||||
|
verifyIntervalHasVisibleSegments(YEAR_23, appendedSegment, segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLockAllocateAppendYearReplaceQuarter()
|
||||||
|
{
|
||||||
|
final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23);
|
||||||
|
Assert.assertNotNull(replaceLock);
|
||||||
|
|
||||||
|
final SegmentIdWithShardSpec pendingSegment
|
||||||
|
= appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), Granularities.YEAR);
|
||||||
|
Assert.assertEquals(YEAR_23, pendingSegment.getInterval());
|
||||||
|
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
|
||||||
|
|
||||||
|
final DataSegment segmentV01 = asSegment(pendingSegment);
|
||||||
|
appendTask.commitAppendSegments(segmentV01);
|
||||||
|
|
||||||
|
verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
|
||||||
|
verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
|
||||||
|
|
||||||
|
final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23, replaceLock.getVersion());
|
||||||
|
final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23, replaceLock.getVersion());
|
||||||
|
final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23, replaceLock.getVersion());
|
||||||
|
final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23, replaceLock.getVersion());
|
||||||
|
|
||||||
|
Assert.assertFalse(
|
||||||
|
replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4)
|
||||||
|
.isSuccess()
|
||||||
|
);
|
||||||
|
|
||||||
|
verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
|
||||||
|
verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLockAllocateReplaceQuarterAppendYear()
|
||||||
|
{
|
||||||
|
final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23);
|
||||||
|
Assert.assertNotNull(replaceLock);
|
||||||
|
|
||||||
|
final SegmentIdWithShardSpec pendingSegment
|
||||||
|
= appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), Granularities.YEAR);
|
||||||
|
Assert.assertEquals(YEAR_23, pendingSegment.getInterval());
|
||||||
|
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
|
||||||
|
|
||||||
|
final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23, replaceLock.getVersion());
|
||||||
|
final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23, replaceLock.getVersion());
|
||||||
|
final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23, replaceLock.getVersion());
|
||||||
|
final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23, replaceLock.getVersion());
|
||||||
|
|
||||||
|
Assert.assertFalse(
|
||||||
|
replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4)
|
||||||
|
.isSuccess()
|
||||||
|
);
|
||||||
|
|
||||||
|
final DataSegment segmentV01 = asSegment(pendingSegment);
|
||||||
|
appendTask.commitAppendSegments(segmentV01);
|
||||||
|
|
||||||
|
verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
|
||||||
|
verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAllocateLockReplaceQuarterAppendYear()
|
||||||
|
{
|
||||||
|
final SegmentIdWithShardSpec pendingSegment
|
||||||
|
= appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), Granularities.YEAR);
|
||||||
|
Assert.assertEquals(YEAR_23, pendingSegment.getInterval());
|
||||||
|
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
|
||||||
|
|
||||||
|
final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23);
|
||||||
|
Assert.assertNotNull(replaceLock);
|
||||||
|
|
||||||
|
final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23, replaceLock.getVersion());
|
||||||
|
final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23, replaceLock.getVersion());
|
||||||
|
final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23, replaceLock.getVersion());
|
||||||
|
final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23, replaceLock.getVersion());
|
||||||
|
|
||||||
|
Assert.assertFalse(
|
||||||
|
replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4)
|
||||||
|
.isSuccess()
|
||||||
|
);
|
||||||
|
|
||||||
|
final DataSegment segmentV01 = asSegment(pendingSegment);
|
||||||
|
appendTask.commitAppendSegments(segmentV01);
|
||||||
|
|
||||||
|
verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
|
||||||
|
verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAllocateLockAppendYearReplaceQuarter()
|
||||||
|
{
|
||||||
|
final SegmentIdWithShardSpec pendingSegment
|
||||||
|
= appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), Granularities.YEAR);
|
||||||
|
Assert.assertEquals(YEAR_23, pendingSegment.getInterval());
|
||||||
|
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
|
||||||
|
|
||||||
|
final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23);
|
||||||
|
Assert.assertNotNull(replaceLock);
|
||||||
|
|
||||||
|
final DataSegment segmentV01 = asSegment(pendingSegment);
|
||||||
|
appendTask.commitAppendSegments(segmentV01);
|
||||||
|
|
||||||
|
verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
|
||||||
|
verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
|
||||||
|
|
||||||
|
final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23, replaceLock.getVersion());
|
||||||
|
final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23, replaceLock.getVersion());
|
||||||
|
final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23, replaceLock.getVersion());
|
||||||
|
final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23, replaceLock.getVersion());
|
||||||
|
|
||||||
|
Assert.assertFalse(
|
||||||
|
replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4)
|
||||||
|
.isSuccess()
|
||||||
|
);
|
||||||
|
|
||||||
|
verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
|
||||||
|
verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAllocateAppendLockYearReplaceQuarter()
|
||||||
|
{
|
||||||
|
final SegmentIdWithShardSpec pendingSegment
|
||||||
|
= appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), Granularities.YEAR);
|
||||||
|
Assert.assertEquals(YEAR_23, pendingSegment.getInterval());
|
||||||
|
Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
|
||||||
|
|
||||||
|
final DataSegment segmentV01 = asSegment(pendingSegment);
|
||||||
|
appendTask.commitAppendSegments(segmentV01);
|
||||||
|
|
||||||
|
verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
|
||||||
|
verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
|
||||||
|
|
||||||
|
final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23);
|
||||||
|
Assert.assertNotNull(replaceLock);
|
||||||
|
|
||||||
|
final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23, replaceLock.getVersion());
|
||||||
|
final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23, replaceLock.getVersion());
|
||||||
|
final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23, replaceLock.getVersion());
|
||||||
|
final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23, replaceLock.getVersion());
|
||||||
|
|
||||||
|
Assert.assertTrue(
|
||||||
|
replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4)
|
||||||
|
.isSuccess()
|
||||||
|
);
|
||||||
|
|
||||||
|
verifyIntervalHasUsedSegments(YEAR_23, segmentV01, segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4);
|
||||||
|
verifyIntervalHasVisibleSegments(YEAR_23, segmentV1Q1, segmentV1Q2, segmentV1Q3, segmentV1Q4);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAllocateAppendMonthLockReplaceDay()
|
public void testAllocateAppendMonthLockReplaceDay()
|
||||||
{
|
{
|
||||||
|
|
|
@ -32,6 +32,7 @@ import com.google.common.hash.Hashing;
|
||||||
import com.google.common.io.BaseEncoding;
|
import com.google.common.io.BaseEncoding;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import org.apache.commons.lang.StringEscapeUtils;
|
import org.apache.commons.lang.StringEscapeUtils;
|
||||||
|
import org.apache.druid.error.DruidException;
|
||||||
import org.apache.druid.error.InvalidInput;
|
import org.apache.druid.error.InvalidInput;
|
||||||
import org.apache.druid.indexing.overlord.DataSourceMetadata;
|
import org.apache.druid.indexing.overlord.DataSourceMetadata;
|
||||||
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
|
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
|
||||||
|
@ -901,7 +902,15 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
||||||
} else if (pendingSegment.getId().getVersion().compareTo(replaceVersion) >= 0) {
|
} else if (pendingSegment.getId().getVersion().compareTo(replaceVersion) >= 0) {
|
||||||
return false;
|
return false;
|
||||||
} else if (!replaceInterval.contains(pendingSegment.getId().getInterval())) {
|
} else if (!replaceInterval.contains(pendingSegment.getId().getInterval())) {
|
||||||
return false;
|
final SegmentId pendingSegmentId = pendingSegment.getId().asSegmentId();
|
||||||
|
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
|
||||||
|
.ofCategory(DruidException.Category.UNSUPPORTED)
|
||||||
|
.build(
|
||||||
|
"Replacing with a finer segment granularity than a concurrent append is unsupported."
|
||||||
|
+ " Cannot upgrade pendingSegment[%s] to version[%s] as the replace interval[%s]"
|
||||||
|
+ " does not fully contain the pendingSegment interval[%s].",
|
||||||
|
pendingSegmentId, replaceVersion, replaceInterval, pendingSegmentId.getInterval()
|
||||||
|
);
|
||||||
} else {
|
} else {
|
||||||
// Do not upgrade already upgraded pending segment
|
// Do not upgrade already upgraded pending segment
|
||||||
return pendingSegment.getSequenceName() == null
|
return pendingSegment.getSequenceName() == null
|
||||||
|
@ -2201,10 +2210,16 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
||||||
newInterval = replaceInterval;
|
newInterval = replaceInterval;
|
||||||
break;
|
break;
|
||||||
} else if (replaceInterval.overlaps(oldInterval)) {
|
} else if (replaceInterval.overlaps(oldInterval)) {
|
||||||
throw new ISE(
|
final String conflictingSegmentId = oldSegment.getId().toString();
|
||||||
"Incompatible segment intervals for commit: [%s] and [%s].",
|
final String upgradeVersion = upgradeSegmentToLockVersion.get(conflictingSegmentId);
|
||||||
oldInterval, replaceInterval
|
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
|
||||||
);
|
.ofCategory(DruidException.Category.UNSUPPORTED)
|
||||||
|
.build(
|
||||||
|
"Replacing with a finer segment granularity than a concurrent append is unsupported."
|
||||||
|
+ " Cannot upgrade segment[%s] to version[%s] as the replace interval[%s]"
|
||||||
|
+ " does not fully contain the pending segment interval[%s].",
|
||||||
|
conflictingSegmentId, upgradeVersion, replaceInterval, oldInterval
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -319,6 +319,73 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
|
||||||
Assert.assertEquals(replaceLock.getVersion(), Iterables.getOnlyElement(observedLockVersions));
|
Assert.assertEquals(replaceLock.getVersion(), Iterables.getOnlyElement(observedLockVersions));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCommitReplaceSegments_partiallyOverlappingPendingSegmentUnsupported()
|
||||||
|
{
|
||||||
|
final ReplaceTaskLock replaceLock = new ReplaceTaskLock("g1", Intervals.of("2023-01-01/2023-02-01"), "2023-02-01");
|
||||||
|
final Set<DataSegment> segmentsAppendedWithReplaceLock = new HashSet<>();
|
||||||
|
final Map<DataSegment, ReplaceTaskLock> appendedSegmentToReplaceLockMap = new HashMap<>();
|
||||||
|
final PendingSegmentRecord pendingSegmentForInterval = new PendingSegmentRecord(
|
||||||
|
new SegmentIdWithShardSpec(
|
||||||
|
"foo",
|
||||||
|
Intervals.of("2023-01-01/2024-01-01"),
|
||||||
|
"2023-01-02",
|
||||||
|
new NumberedShardSpec(100, 0)
|
||||||
|
),
|
||||||
|
"",
|
||||||
|
"",
|
||||||
|
null,
|
||||||
|
"append"
|
||||||
|
);
|
||||||
|
for (int i = 1; i < 9; i++) {
|
||||||
|
final DataSegment segment = new DataSegment(
|
||||||
|
"foo",
|
||||||
|
Intervals.of("2023-01-0" + i + "/2023-01-0" + (i + 1)),
|
||||||
|
"2023-01-0" + i,
|
||||||
|
ImmutableMap.of("path", "a-" + i),
|
||||||
|
ImmutableList.of("dim1"),
|
||||||
|
ImmutableList.of("m1"),
|
||||||
|
new LinearShardSpec(0),
|
||||||
|
9,
|
||||||
|
100
|
||||||
|
);
|
||||||
|
segmentsAppendedWithReplaceLock.add(segment);
|
||||||
|
appendedSegmentToReplaceLockMap.put(segment, replaceLock);
|
||||||
|
}
|
||||||
|
|
||||||
|
segmentSchemaTestUtils.insertUsedSegments(segmentsAppendedWithReplaceLock, Collections.emptyMap());
|
||||||
|
derbyConnector.retryWithHandle(
|
||||||
|
handle -> coordinator.insertPendingSegmentsIntoMetastore(
|
||||||
|
handle,
|
||||||
|
ImmutableList.of(pendingSegmentForInterval),
|
||||||
|
"foo",
|
||||||
|
true
|
||||||
|
)
|
||||||
|
);
|
||||||
|
insertIntoUpgradeSegmentsTable(appendedSegmentToReplaceLockMap, derbyConnectorRule.metadataTablesConfigSupplier().get());
|
||||||
|
|
||||||
|
final Set<DataSegment> replacingSegments = new HashSet<>();
|
||||||
|
for (int i = 1; i < 9; i++) {
|
||||||
|
final DataSegment segment = new DataSegment(
|
||||||
|
"foo",
|
||||||
|
Intervals.of("2023-01-01/2023-02-01"),
|
||||||
|
"2023-02-01",
|
||||||
|
ImmutableMap.of("path", "b-" + i),
|
||||||
|
ImmutableList.of("dim1"),
|
||||||
|
ImmutableList.of("m1"),
|
||||||
|
new NumberedShardSpec(i, 9),
|
||||||
|
9,
|
||||||
|
100
|
||||||
|
);
|
||||||
|
replacingSegments.add(segment);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertFalse(
|
||||||
|
coordinator.commitReplaceSegments(replacingSegments, ImmutableSet.of(replaceLock), null)
|
||||||
|
.isSuccess()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCommitReplaceSegments()
|
public void testCommitReplaceSegments()
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue