mirror of https://github.com/apache/druid.git
Ignore append locks for compaction when using concurrent locks (#16316)
* Ignore append locks for compaction when using concurrent locks
This commit is contained in:
parent
173a206829
commit
08b5a8b88e
|
@ -960,8 +960,19 @@ public class TaskLockbox
|
||||||
}
|
}
|
||||||
|
|
||||||
final int priority = lockFilter.getPriority();
|
final int priority = lockFilter.getPriority();
|
||||||
final boolean ignoreAppendLocks =
|
final boolean isReplaceLock = TaskLockType.REPLACE.name().equals(
|
||||||
TaskLockType.REPLACE.name().equals(lockFilter.getContext().get(Tasks.TASK_LOCK_TYPE));
|
lockFilter.getContext().getOrDefault(
|
||||||
|
Tasks.TASK_LOCK_TYPE,
|
||||||
|
Tasks.DEFAULT_TASK_LOCK_TYPE
|
||||||
|
)
|
||||||
|
);
|
||||||
|
final boolean isUsingConcurrentLocks = Boolean.TRUE.equals(
|
||||||
|
lockFilter.getContext().getOrDefault(
|
||||||
|
Tasks.USE_CONCURRENT_LOCKS,
|
||||||
|
Tasks.DEFAULT_USE_CONCURRENT_LOCKS
|
||||||
|
)
|
||||||
|
);
|
||||||
|
final boolean ignoreAppendLocks = isUsingConcurrentLocks || isReplaceLock;
|
||||||
|
|
||||||
running.get(datasource).forEach(
|
running.get(datasource).forEach(
|
||||||
(startTime, startTimeLocks) -> startTimeLocks.forEach(
|
(startTime, startTimeLocks) -> startTimeLocks.forEach(
|
||||||
|
|
|
@ -1325,6 +1325,34 @@ public class TaskLockboxTest
|
||||||
Assert.assertTrue(conflictingIntervals.isEmpty());
|
Assert.assertTrue(conflictingIntervals.isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetLockedIntervalsForLowerPriorityUseConcurrentLocks()
|
||||||
|
{
|
||||||
|
final Task task = NoopTask.ofPriority(50);
|
||||||
|
lockbox.add(task);
|
||||||
|
taskStorage.insert(task, TaskStatus.running(task.getId()));
|
||||||
|
tryTimeChunkLock(
|
||||||
|
TaskLockType.APPEND,
|
||||||
|
task,
|
||||||
|
Intervals.of("2017/2018")
|
||||||
|
);
|
||||||
|
|
||||||
|
LockFilterPolicy requestForReplaceLowerPriorityLock = new LockFilterPolicy(
|
||||||
|
task.getDataSource(),
|
||||||
|
25,
|
||||||
|
ImmutableMap.of(
|
||||||
|
Tasks.TASK_LOCK_TYPE,
|
||||||
|
TaskLockType.EXCLUSIVE.name(),
|
||||||
|
Tasks.USE_CONCURRENT_LOCKS,
|
||||||
|
true
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
Map<String, List<Interval>> conflictingIntervals =
|
||||||
|
lockbox.getLockedIntervals(ImmutableList.of(requestForReplaceLowerPriorityLock));
|
||||||
|
Assert.assertTrue(conflictingIntervals.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testExclusiveLockCompatibility()
|
public void testExclusiveLockCompatibility()
|
||||||
|
|
Loading…
Reference in New Issue