diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java index 02a3da0e1d0..fdb7fcd5595 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java @@ -27,6 +27,7 @@ import com.google.common.collect.Iterables; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.SegmentLock; import org.apache.druid.indexing.common.TaskLock; +import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; @@ -34,6 +35,7 @@ import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.PeriodGranularity; @@ -61,11 +63,18 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -122,6 +131,63 @@ public class SegmentAllocateActionTest } } + @Test + public void testManySegmentsSameInterval_noLineageCheck() throws Exception + { + if (lockGranularity == LockGranularity.SEGMENT) { + return; + } + + final Task task = NoopTask.create(); + final int numTasks = 2; + final int numRequests = 200; + + taskActionTestKit.getTaskLockbox().add(task); + + ExecutorService allocatorService = Execs.multiThreaded(4, "allocator-%d"); + + final List> allocateTasks = new ArrayList<>(); + for (int i = 0; i < numRequests; i++) { + final String sequence = "sequence_" + (i % numTasks); + allocateTasks.add(() -> allocateWithoutLineageCheck( + task, + PARTY_TIME, + Granularities.NONE, + Granularities.HOUR, + sequence, + TaskLockType.APPEND + )); + } + + Set allocatedIds = new HashSet<>(); + for (Future future : allocatorService.invokeAll(allocateTasks)) { + allocatedIds.add(future.get()); + } + + Thread.sleep(1_000); + for (Future future : allocatorService.invokeAll(allocateTasks)) { + allocatedIds.add(future.get()); + } + + + final TaskLock lock = Iterables.getOnlyElement( + FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task)) + .filter(input -> input.getInterval().contains(PARTY_TIME)) + ); + Set expectedIds = new HashSet<>(); + for (int i = 0; i < numTasks; i++) { + expectedIds.add( + new SegmentIdWithShardSpec( + DATA_SOURCE, + Granularities.HOUR.bucket(PARTY_TIME), + lock.getVersion(), + new NumberedShardSpec(i, 0) + ) + ); + } + Assert.assertEquals(expectedIds, allocatedIds); + } + @Test public void testManySegmentsSameInterval() { @@ -1122,6 +1188,41 @@ public class SegmentAllocateActionTest ); } + private SegmentIdWithShardSpec allocateWithoutLineageCheck( + final Task task, + final DateTime timestamp, + final Granularity queryGranularity, + final Granularity preferredSegmentGranularity, + final String sequenceName, + final TaskLockType taskLockType + ) + { + final SegmentAllocateAction action = new SegmentAllocateAction( + DATA_SOURCE, + timestamp, + queryGranularity, + preferredSegmentGranularity, + sequenceName, + // prevSegmentId can vary across replicas and isn't deterministic + "random_" + ThreadLocalRandom.current().nextInt(), + true, + NumberedPartialShardSpec.instance(), + lockGranularity, + taskLockType + ); + + try { + if (useBatch) { + return action.performAsync(task, taskActionTestKit.getTaskActionToolbox()).get(); + } else { + return action.perform(task, taskActionTestKit.getTaskActionToolbox()); + } + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + private SegmentIdWithShardSpec allocate( final Task task, final DateTime timestamp, diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 96cfd5dbf04..463232012ed 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1326,7 +1326,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor { this.interval = interval; this.sequenceName = request.getSequenceName(); - this.previousSegmentId = request.getPreviousSegmentId(); + // Even if the previousSegmentId is set, disregard it when skipping lineage check for streaming ingestion + this.previousSegmentId = skipSegmentLineageCheck ? null : request.getPreviousSegmentId(); this.skipSegmentLineageCheck = skipSegmentLineageCheck; this.hashCode = Objects.hash(interval, sequenceName, previousSegmentId, skipSegmentLineageCheck);