mirror of https://github.com/apache/druid.git
Fix shared lock acquisition criteria (#13390)
Currently, a shared lock is acquired only when all other locks are also shared locks. This commit updates the behaviour and acquires a shared lock only if all locks of equal or higher priority are either shared locks or are already revoked. The lock type of locks with lower priority does not matter as they can be revoked.
This commit is contained in:
parent
6c3f688a66
commit
de566eb0db
|
@ -474,8 +474,12 @@ public class TaskLockbox
|
|||
if (reusablePosses.size() == 0) {
|
||||
// case 1) this task doesn't have any lock, but others do
|
||||
|
||||
if (request.getType().equals(TaskLockType.SHARED) && isAllSharedLocks(conflictPosses)) {
|
||||
// Any number of shared locks can be acquired for the same dataSource and interval.
|
||||
if (request.getType().equals(TaskLockType.SHARED)
|
||||
&& areAllEqualOrHigherPriorityLocksSharedOrRevoked(conflictPosses, request.getPriority())) {
|
||||
// Any number of shared locks can be acquired for the same dataSource and interval
|
||||
// Exclusive locks of equal or greater priority, if present, must already be revoked
|
||||
// Exclusive locks of lower priority can be revoked
|
||||
revokeAllLowerPriorityNonSharedLocks(conflictPosses, request.getPriority());
|
||||
return createNewTaskLockPosse(request);
|
||||
} else {
|
||||
// During a rolling update, tasks of mixed versions can be run at the same time. Old tasks would request
|
||||
|
@ -1070,10 +1074,33 @@ public class TaskLockbox
|
|||
return running;
|
||||
}
|
||||
|
||||
private static boolean isAllSharedLocks(List<TaskLockPosse> lockPosses)
|
||||
/**
|
||||
* Check if all lockPosses are either shared
|
||||
* OR of lower priority
|
||||
* OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
|
||||
* @param lockPosses conflicting task lock posses to be checked
|
||||
* @param priority priority of the lock to be acquired
|
||||
* @return true if the condititons are met
|
||||
*/
|
||||
private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
|
||||
{
|
||||
return lockPosses.stream()
|
||||
.allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED));
|
||||
.filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
|
||||
.allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
|
||||
|| taskLockPosse.getTaskLock().isRevoked());
|
||||
}
|
||||
|
||||
/**
|
||||
* Revokes all non-shared locks with priorities lower than the provided priority
|
||||
* @param lockPosses conflicting task lock posses which may be revoked
|
||||
* @param priority priority of the lock to be acquired
|
||||
*/
|
||||
private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
|
||||
{
|
||||
lockPosses.stream()
|
||||
.filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
|
||||
.filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
|
||||
.forEach(this::revokeLock);
|
||||
}
|
||||
|
||||
private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
|
||||
|
|
|
@ -169,24 +169,81 @@ public class TaskLockboxTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testTrySharedLock()
|
||||
public void testTrySharedLock() throws EntryExistsException
|
||||
{
|
||||
final Interval interval = Intervals.of("2017-01/2017-02");
|
||||
final List<Task> tasks = new ArrayList<>();
|
||||
final Set<TaskLock> actualLocks = new HashSet<>();
|
||||
final Set<TaskLock> activeLocks = new HashSet<>();
|
||||
|
||||
// test creating new locks
|
||||
for (int i = 0; i < 5; i++) {
|
||||
final Task task = NoopTask.create(Math.min(0, (i - 1) * 10)); // the first two tasks have the same priority
|
||||
// Add an exclusive lock entry of the highest priority
|
||||
Task exclusiveHigherPriorityRevokedLockTask = NoopTask.create(100);
|
||||
tasks.add(exclusiveHigherPriorityRevokedLockTask);
|
||||
taskStorage.insert(
|
||||
exclusiveHigherPriorityRevokedLockTask,
|
||||
TaskStatus.running(exclusiveHigherPriorityRevokedLockTask.getId())
|
||||
);
|
||||
lockbox.add(exclusiveHigherPriorityRevokedLockTask);
|
||||
final TaskLock exclusiveRevokedLock = tryTimeChunkLock(
|
||||
TaskLockType.EXCLUSIVE,
|
||||
exclusiveHigherPriorityRevokedLockTask,
|
||||
interval
|
||||
).getTaskLock();
|
||||
|
||||
// Any equal or lower priority shared lock must fail
|
||||
final Task sharedLockTask = NoopTask.create(100);
|
||||
lockbox.add(sharedLockTask);
|
||||
Assert.assertFalse(tryTimeChunkLock(TaskLockType.SHARED, sharedLockTask, interval).isOk());
|
||||
|
||||
// Revoke existing active exclusive lock
|
||||
lockbox.revokeLock(exclusiveHigherPriorityRevokedLockTask.getId(), exclusiveRevokedLock);
|
||||
Assert.assertEquals(1, getAllLocks(tasks).size());
|
||||
Assert.assertEquals(0, getAllActiveLocks(tasks).size());
|
||||
Assert.assertEquals(activeLocks, getAllActiveLocks(tasks));
|
||||
|
||||
// test creating new shared locks
|
||||
for (int i = 0; i < 3; i++) {
|
||||
final Task task = NoopTask.create(Math.max(0, (i - 1) * 10)); // the first two tasks have the same priority
|
||||
tasks.add(task);
|
||||
taskStorage.insert(task, TaskStatus.running(task.getId()));
|
||||
lockbox.add(task);
|
||||
final TaskLock lock = tryTimeChunkLock(TaskLockType.SHARED, task, interval).getTaskLock();
|
||||
Assert.assertNotNull(lock);
|
||||
actualLocks.add(lock);
|
||||
activeLocks.add(lock);
|
||||
}
|
||||
Assert.assertEquals(4, getAllLocks(tasks).size());
|
||||
Assert.assertEquals(3, getAllActiveLocks(tasks).size());
|
||||
Assert.assertEquals(activeLocks, getAllActiveLocks(tasks));
|
||||
|
||||
// Adding an exclusive task lock of priority 15 should revoke all existing active locks
|
||||
Task exclusiveLowerPriorityLockTask = NoopTask.create(15);
|
||||
tasks.add(exclusiveLowerPriorityLockTask);
|
||||
taskStorage.insert(exclusiveLowerPriorityLockTask, TaskStatus.running(exclusiveLowerPriorityLockTask.getId()));
|
||||
lockbox.add(exclusiveLowerPriorityLockTask);
|
||||
final TaskLock lowerPriorityExclusiveLock = tryTimeChunkLock(
|
||||
TaskLockType.EXCLUSIVE,
|
||||
exclusiveLowerPriorityLockTask,
|
||||
interval
|
||||
).getTaskLock();
|
||||
activeLocks.clear();
|
||||
activeLocks.add(lowerPriorityExclusiveLock);
|
||||
Assert.assertEquals(5, getAllLocks(tasks).size());
|
||||
Assert.assertEquals(getAllLocks(tasks), actualLocks);
|
||||
Assert.assertEquals(1, getAllActiveLocks(tasks).size());
|
||||
Assert.assertEquals(activeLocks, getAllActiveLocks(tasks));
|
||||
|
||||
// Add new shared locks which revoke the active exclusive task lock
|
||||
activeLocks.clear();
|
||||
for (int i = 3; i < 5; i++) {
|
||||
final Task task = NoopTask.create(Math.max(0, (i - 1) * 10)); // the first two tasks have the same priority
|
||||
tasks.add(task);
|
||||
taskStorage.insert(task, TaskStatus.running(task.getId()));
|
||||
lockbox.add(task);
|
||||
final TaskLock lock = tryTimeChunkLock(TaskLockType.SHARED, task, interval).getTaskLock();
|
||||
Assert.assertNotNull(lock);
|
||||
activeLocks.add(lock);
|
||||
}
|
||||
Assert.assertEquals(7, getAllLocks(tasks).size());
|
||||
Assert.assertEquals(2, getAllActiveLocks(tasks).size());
|
||||
Assert.assertEquals(activeLocks, getAllActiveLocks(tasks));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1300,6 +1357,14 @@ public class TaskLockboxTest
|
|||
result.getTasksToFail());
|
||||
}
|
||||
|
||||
private Set<TaskLock> getAllActiveLocks(List<Task> tasks)
|
||||
{
|
||||
return tasks.stream()
|
||||
.flatMap(task -> taskStorage.getLocks(task.getId()).stream())
|
||||
.filter(taskLock -> !taskLock.isRevoked())
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
private Set<TaskLock> getAllLocks(List<Task> tasks)
|
||||
{
|
||||
return tasks.stream()
|
||||
|
|
Loading…
Reference in New Issue