diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 96451b7c570..626f4e33344 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -51,6 +52,7 @@ import java.util.Map; import java.util.NavigableMap; import java.util.NavigableSet; import java.util.Set; +import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; @@ -66,11 +68,14 @@ import java.util.stream.StreamSupport; */ public class TaskLockbox { - // Datasource -> Interval -> list of (Tasks + TaskLock) + // Datasource -> startTime -> Interval -> list of (Tasks + TaskLock) // Multiple shared locks can be acquired for the same dataSource and interval. // Note that revoked locks are also maintained in this map to notify that those locks are revoked to the callers when // they acquire the same locks again. - private final Map>> running = new HashMap<>(); + // Also, the key of the second inner map is the start time to find all intervals properly starting with the same + // startTime. + private final Map>>> running = new HashMap<>(); + private final TaskStorage taskStorage; private final ReentrantLock giant = new ReentrantLock(true); private final Condition lockReleaseCondition = giant.newCondition(); @@ -326,7 +331,14 @@ public class TaskLockbox final TaskLockType lockType ) { - return createOrFindLockPosse(task, interval, null, lockType); + giant.lock(); + + try { + return createOrFindLockPosse(task, interval, null, lockType); + } + finally { + giant.unlock(); + } } /** @@ -584,7 +596,8 @@ public class TaskLockbox final TaskLockPosse posseToUse = new TaskLockPosse( new TaskLock(lockType, groupId, dataSource, interval, version, priority, revoked) ); - running.computeIfAbsent(dataSource, k -> new TreeMap<>(Comparators.intervalsByStartThenEnd())) + running.computeIfAbsent(dataSource, k -> new TreeMap<>()) + .computeIfAbsent(interval.getStart(), k -> new TreeMap<>(Comparators.intervalsByStartThenEnd())) .computeIfAbsent(interval, k -> new ArrayList<>()) .add(posseToUse); @@ -612,7 +625,7 @@ public class TaskLockbox CriticalAction action ) throws Exception { - giant.lockInterruptibly(); + giant.lock(); try { return action.perform(isTaskLocksValid(task, intervals)); @@ -624,13 +637,19 @@ public class TaskLockbox private boolean isTaskLocksValid(Task task, List intervals) { - return intervals - .stream() - .allMatch(interval -> { - final TaskLock lock = getOnlyTaskLockPosseContainingInterval(task, interval).getTaskLock(); - // Tasks cannot enter the critical section with a shared lock - return !lock.isRevoked() && lock.getType() != TaskLockType.SHARED; - }); + giant.lock(); + try { + return intervals + .stream() + .allMatch(interval -> { + final TaskLock lock = getOnlyTaskLockPosseContainingInterval(task, interval).getTaskLock(); + // Tasks cannot enter the critical section with a shared lock + return !lock.isRevoked() && lock.getType() != TaskLockType.SHARED; + }); + } + finally { + giant.unlock(); + } } private void revokeLock(TaskLockPosse lockPosse) @@ -676,7 +695,7 @@ public class TaskLockbox final TaskLock revokedLock = lock.revokedCopy(); taskStorage.replaceLock(taskId, lock, revokedLock); - final List possesHolder = running.get(task.getDataSource()).get(lock.getInterval()); + final List possesHolder = running.get(task.getDataSource()).get(lock.getInterval().getStart()).get(lock.getInterval()); final TaskLockPosse foundPosse = possesHolder.stream() .filter(posse -> posse.getTaskLock().equals(lock)) .findFirst() @@ -733,13 +752,19 @@ public class TaskLockbox try { final String dataSource = task.getDataSource(); - final NavigableMap> dsRunning = running.get(task.getDataSource()); + final NavigableMap>> dsRunning = running.get(task.getDataSource()); if (dsRunning == null || dsRunning.isEmpty()) { return; } - final List possesHolder = dsRunning.get(interval); + final SortedMap> intervalToPosses = dsRunning.get(interval.getStart()); + + if (intervalToPosses == null || intervalToPosses.isEmpty()) { + return; + } + + final List possesHolder = intervalToPosses.get(interval); if (possesHolder == null || possesHolder.isEmpty()) { return; } @@ -760,8 +785,12 @@ public class TaskLockbox possesHolder.remove(taskLockPosse); } - if (possesHolder.size() == 0) { - dsRunning.remove(interval); + if (possesHolder.isEmpty()) { + intervalToPosses.remove(interval); + } + + if (intervalToPosses.isEmpty()) { + dsRunning.remove(interval.getStart()); } if (running.get(dataSource).size() == 0) { @@ -797,6 +826,18 @@ public class TaskLockbox } } + public void add(Task task) + { + giant.lock(); + try { + log.info("Adding task[%s] to activeTasks", task.getId()); + activeTasks.add(task.getId()); + } + finally { + giant.unlock(); + } + } + /** * Release all locks for a task and remove task from set of active tasks. Does nothing if the task is not currently locked or not an active task. * @@ -832,11 +873,12 @@ public class TaskLockbox try { // Scan through all locks for this datasource - final NavigableMap> dsRunning = running.get(task.getDataSource()); + final NavigableMap>> dsRunning = running.get(task.getDataSource()); if (dsRunning == null) { return ImmutableList.of(); } else { return dsRunning.values().stream() + .flatMap(map -> map.values().stream()) .flatMap(Collection::stream) .filter(taskLockPosse -> taskLockPosse.containsTask(task)) .collect(Collectors.toList()); @@ -870,29 +912,28 @@ public class TaskLockbox giant.lock(); try { - final NavigableMap> dsRunning = running.get(dataSource); + final NavigableMap>> dsRunning = running.get(dataSource); if (dsRunning == null) { // No locks at all return Collections.emptyList(); } else { // Tasks are indexed by locked interval, which are sorted by interval start. Intervals are non-overlapping, so: - final NavigableSet dsLockbox = dsRunning.navigableKeySet(); - final Iterable searchIntervals = Iterables.concat( + final NavigableSet dsLockbox = dsRunning.navigableKeySet(); + final Iterable searchStartTimes = Iterables.concat( // Single interval that starts at or before ours - Collections.singletonList(dsLockbox.floor(new Interval(interval.getStart(), DateTimes.MAX))), + Collections.singletonList(dsLockbox.floor(interval.getStart())), // All intervals that start somewhere between our start instant (exclusive) and end instant (exclusive) - dsLockbox.subSet( - new Interval(interval.getStart(), DateTimes.MAX), - false, - new Interval(interval.getEnd(), interval.getEnd()), - false - ) + dsLockbox.subSet(interval.getStart(), false, interval.getEnd(), false) ); - return StreamSupport.stream(searchIntervals.spliterator(), false) - .filter(searchInterval -> searchInterval != null && searchInterval.overlaps(interval)) - .flatMap(searchInterval -> dsRunning.get(searchInterval).stream()) + return StreamSupport.stream(searchStartTimes.spliterator(), false) + .filter(java.util.Objects::nonNull) + .map(dsRunning::get) + .filter(java.util.Objects::nonNull) + .flatMap(sortedMap -> sortedMap.entrySet().stream()) + .filter(entry -> entry.getKey().overlaps(interval)) + .flatMap(entry -> entry.getValue().stream()) .collect(Collectors.toList()); } } @@ -901,12 +942,24 @@ public class TaskLockbox } } - public void add(Task task) + @VisibleForTesting + TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval interval) { giant.lock(); + try { - log.info("Adding task[%s] to activeTasks", task.getId()); - activeTasks.add(task.getId()); + final List filteredPosses = findLockPossesContainingInterval(task.getDataSource(), interval) + .stream() + .filter(lockPosse -> lockPosse.containsTask(task)) + .collect(Collectors.toList()); + + if (filteredPosses.isEmpty()) { + throw new ISE("Cannot find locks for task[%s] and interval[%s]", task.getId(), interval); + } else if (filteredPosses.size() > 1) { + throw new ISE("There are multiple lockPosses for task[%s] and interval[%s]?", task.getId(), interval); + } else { + return filteredPosses.get(0); + } } finally { giant.unlock(); @@ -936,22 +989,6 @@ public class TaskLockbox return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority; } - private TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval interval) - { - final List filteredPosses = findLockPossesContainingInterval(task.getDataSource(), interval) - .stream() - .filter(lockPosse -> lockPosse.containsTask(task)) - .collect(Collectors.toList()); - - if (filteredPosses.isEmpty()) { - throw new ISE("Cannot find locks for task[%s] and interval[%s]", task.getId(), interval); - } else if (filteredPosses.size() > 1) { - throw new ISE("There are multiple lockPosses for task[%s] and interval[%s]?", task.getId(), interval); - } else { - return filteredPosses.get(0); - } - } - @VisibleForTesting Set getActiveTasks() { @@ -959,7 +996,7 @@ public class TaskLockbox } @VisibleForTesting - Map>> getAllLocks() + Map>>> getAllLocks() { return running; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index d598b1f897b..56042551251 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -35,6 +35,7 @@ import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.TaskLockbox.TaskLockPosse; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -624,6 +625,48 @@ public class TaskLockboxTest Assert.assertTrue(lockbox.getAllLocks().isEmpty()); } + @Test + public void testFindLockPosseAfterRevokeWithDifferentLockIntervals() throws EntryExistsException + { + final Task lowPriorityTask = NoopTask.create(0); + final Task highPriorityTask = NoopTask.create(10); + + taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId())); + taskStorage.insert(highPriorityTask, TaskStatus.running(highPriorityTask.getId())); + lockbox.add(lowPriorityTask); + lockbox.add(highPriorityTask); + + Assert.assertTrue( + lockbox.tryLock( + TaskLockType.EXCLUSIVE, + lowPriorityTask, Intervals.of("2018-12-16T09:00:00/2018-12-16T10:00:00") + ).isOk() + ); + + Assert.assertTrue( + lockbox.tryLock( + TaskLockType.EXCLUSIVE, + highPriorityTask, Intervals.of("2018-12-16T09:00:00/2018-12-16T09:30:00") + ).isOk() + ); + + final TaskLockPosse highLockPosse = lockbox.getOnlyTaskLockPosseContainingInterval( + highPriorityTask, + Intervals.of("2018-12-16T09:00:00/2018-12-16T09:30:00") + ); + + Assert.assertTrue(highLockPosse.containsTask(highPriorityTask)); + Assert.assertFalse(highLockPosse.getTaskLock().isRevoked()); + + final TaskLockPosse lowLockPosse = lockbox.getOnlyTaskLockPosseContainingInterval( + lowPriorityTask, + Intervals.of("2018-12-16T09:00:00/2018-12-16T10:00:00") + ); + + Assert.assertTrue(lowLockPosse.containsTask(lowPriorityTask)); + Assert.assertTrue(lowLockPosse.getTaskLock().isRevoked()); + } + private Set getAllLocks(List tasks) { return tasks.stream()