diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 35ec79d74ec..7248fcab865 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -960,8 +960,19 @@ public class TaskLockbox } final int priority = lockFilter.getPriority(); - final boolean ignoreAppendLocks = - TaskLockType.REPLACE.name().equals(lockFilter.getContext().get(Tasks.TASK_LOCK_TYPE)); + final boolean isReplaceLock = TaskLockType.REPLACE.name().equals( + lockFilter.getContext().getOrDefault( + Tasks.TASK_LOCK_TYPE, + Tasks.DEFAULT_TASK_LOCK_TYPE + ) + ); + final boolean isUsingConcurrentLocks = Boolean.TRUE.equals( + lockFilter.getContext().getOrDefault( + Tasks.USE_CONCURRENT_LOCKS, + Tasks.DEFAULT_USE_CONCURRENT_LOCKS + ) + ); + final boolean ignoreAppendLocks = isUsingConcurrentLocks || isReplaceLock; running.get(datasource).forEach( (startTime, startTimeLocks) -> startTimeLocks.forEach( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 999d4d0abb2..ab4bf3a504f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -1325,6 +1325,34 @@ public class TaskLockboxTest Assert.assertTrue(conflictingIntervals.isEmpty()); } + @Test + public void testGetLockedIntervalsForLowerPriorityUseConcurrentLocks() + { + final Task task = NoopTask.ofPriority(50); + lockbox.add(task); + taskStorage.insert(task, TaskStatus.running(task.getId())); + tryTimeChunkLock( + TaskLockType.APPEND, + task, + Intervals.of("2017/2018") + ); + + LockFilterPolicy requestForReplaceLowerPriorityLock = new LockFilterPolicy( + task.getDataSource(), + 25, + ImmutableMap.of( + Tasks.TASK_LOCK_TYPE, + TaskLockType.EXCLUSIVE.name(), + Tasks.USE_CONCURRENT_LOCKS, + true + ) + ); + + Map> conflictingIntervals = + lockbox.getLockedIntervals(ImmutableList.of(requestForReplaceLowerPriorityLock)); + Assert.assertTrue(conflictingIntervals.isEmpty()); + } + @Test public void testExclusiveLockCompatibility()