Fix TaskLockbox when there are multiple intervals of the same start but differerent end (#6822)

* Fix TaskLockbox when there are multiple intervals of the same start but differernt end

* fix build

* fix npe
This commit is contained in:
Jihoon Son 2019-01-09 19:38:27 -08:00 committed by Jonathan Wei
parent ea973fee6b
commit 934c83bca6
2 changed files with 131 additions and 51 deletions

View File

@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -51,6 +52,7 @@ import java.util.Map;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.NavigableSet; import java.util.NavigableSet;
import java.util.Set; import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
@ -66,11 +68,14 @@ import java.util.stream.StreamSupport;
*/ */
public class TaskLockbox public class TaskLockbox
{ {
// Datasource -> Interval -> list of (Tasks + TaskLock) // Datasource -> startTime -> Interval -> list of (Tasks + TaskLock)
// Multiple shared locks can be acquired for the same dataSource and interval. // Multiple shared locks can be acquired for the same dataSource and interval.
// Note that revoked locks are also maintained in this map to notify that those locks are revoked to the callers when // Note that revoked locks are also maintained in this map to notify that those locks are revoked to the callers when
// they acquire the same locks again. // they acquire the same locks again.
private final Map<String, NavigableMap<Interval, List<TaskLockPosse>>> running = new HashMap<>(); // Also, the key of the second inner map is the start time to find all intervals properly starting with the same
// startTime.
private final Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> running = new HashMap<>();
private final TaskStorage taskStorage; private final TaskStorage taskStorage;
private final ReentrantLock giant = new ReentrantLock(true); private final ReentrantLock giant = new ReentrantLock(true);
private final Condition lockReleaseCondition = giant.newCondition(); private final Condition lockReleaseCondition = giant.newCondition();
@ -326,8 +331,15 @@ public class TaskLockbox
final TaskLockType lockType final TaskLockType lockType
) )
{ {
giant.lock();
try {
return createOrFindLockPosse(task, interval, null, lockType); return createOrFindLockPosse(task, interval, null, lockType);
} }
finally {
giant.unlock();
}
}
/** /**
* Create a new {@link TaskLockPosse} or find an existing one for the given task and interval. Note that the returned * Create a new {@link TaskLockPosse} or find an existing one for the given task and interval. Note that the returned
@ -584,7 +596,8 @@ public class TaskLockbox
final TaskLockPosse posseToUse = new TaskLockPosse( final TaskLockPosse posseToUse = new TaskLockPosse(
new TaskLock(lockType, groupId, dataSource, interval, version, priority, revoked) new TaskLock(lockType, groupId, dataSource, interval, version, priority, revoked)
); );
running.computeIfAbsent(dataSource, k -> new TreeMap<>(Comparators.intervalsByStartThenEnd())) running.computeIfAbsent(dataSource, k -> new TreeMap<>())
.computeIfAbsent(interval.getStart(), k -> new TreeMap<>(Comparators.intervalsByStartThenEnd()))
.computeIfAbsent(interval, k -> new ArrayList<>()) .computeIfAbsent(interval, k -> new ArrayList<>())
.add(posseToUse); .add(posseToUse);
@ -612,7 +625,7 @@ public class TaskLockbox
CriticalAction<T> action CriticalAction<T> action
) throws Exception ) throws Exception
{ {
giant.lockInterruptibly(); giant.lock();
try { try {
return action.perform(isTaskLocksValid(task, intervals)); return action.perform(isTaskLocksValid(task, intervals));
@ -624,6 +637,8 @@ public class TaskLockbox
private boolean isTaskLocksValid(Task task, List<Interval> intervals) private boolean isTaskLocksValid(Task task, List<Interval> intervals)
{ {
giant.lock();
try {
return intervals return intervals
.stream() .stream()
.allMatch(interval -> { .allMatch(interval -> {
@ -632,6 +647,10 @@ public class TaskLockbox
return !lock.isRevoked() && lock.getType() != TaskLockType.SHARED; return !lock.isRevoked() && lock.getType() != TaskLockType.SHARED;
}); });
} }
finally {
giant.unlock();
}
}
private void revokeLock(TaskLockPosse lockPosse) private void revokeLock(TaskLockPosse lockPosse)
{ {
@ -676,7 +695,7 @@ public class TaskLockbox
final TaskLock revokedLock = lock.revokedCopy(); final TaskLock revokedLock = lock.revokedCopy();
taskStorage.replaceLock(taskId, lock, revokedLock); taskStorage.replaceLock(taskId, lock, revokedLock);
final List<TaskLockPosse> possesHolder = running.get(task.getDataSource()).get(lock.getInterval()); final List<TaskLockPosse> possesHolder = running.get(task.getDataSource()).get(lock.getInterval().getStart()).get(lock.getInterval());
final TaskLockPosse foundPosse = possesHolder.stream() final TaskLockPosse foundPosse = possesHolder.stream()
.filter(posse -> posse.getTaskLock().equals(lock)) .filter(posse -> posse.getTaskLock().equals(lock))
.findFirst() .findFirst()
@ -733,13 +752,19 @@ public class TaskLockbox
try { try {
final String dataSource = task.getDataSource(); final String dataSource = task.getDataSource();
final NavigableMap<Interval, List<TaskLockPosse>> dsRunning = running.get(task.getDataSource()); final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> dsRunning = running.get(task.getDataSource());
if (dsRunning == null || dsRunning.isEmpty()) { if (dsRunning == null || dsRunning.isEmpty()) {
return; return;
} }
final List<TaskLockPosse> possesHolder = dsRunning.get(interval); final SortedMap<Interval, List<TaskLockPosse>> intervalToPosses = dsRunning.get(interval.getStart());
if (intervalToPosses == null || intervalToPosses.isEmpty()) {
return;
}
final List<TaskLockPosse> possesHolder = intervalToPosses.get(interval);
if (possesHolder == null || possesHolder.isEmpty()) { if (possesHolder == null || possesHolder.isEmpty()) {
return; return;
} }
@ -760,8 +785,12 @@ public class TaskLockbox
possesHolder.remove(taskLockPosse); possesHolder.remove(taskLockPosse);
} }
if (possesHolder.size() == 0) { if (possesHolder.isEmpty()) {
dsRunning.remove(interval); intervalToPosses.remove(interval);
}
if (intervalToPosses.isEmpty()) {
dsRunning.remove(interval.getStart());
} }
if (running.get(dataSource).size() == 0) { if (running.get(dataSource).size() == 0) {
@ -797,6 +826,18 @@ public class TaskLockbox
} }
} }
public void add(Task task)
{
giant.lock();
try {
log.info("Adding task[%s] to activeTasks", task.getId());
activeTasks.add(task.getId());
}
finally {
giant.unlock();
}
}
/** /**
* Release all locks for a task and remove task from set of active tasks. Does nothing if the task is not currently locked or not an active task. * Release all locks for a task and remove task from set of active tasks. Does nothing if the task is not currently locked or not an active task.
* *
@ -832,11 +873,12 @@ public class TaskLockbox
try { try {
// Scan through all locks for this datasource // Scan through all locks for this datasource
final NavigableMap<Interval, List<TaskLockPosse>> dsRunning = running.get(task.getDataSource()); final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> dsRunning = running.get(task.getDataSource());
if (dsRunning == null) { if (dsRunning == null) {
return ImmutableList.of(); return ImmutableList.of();
} else { } else {
return dsRunning.values().stream() return dsRunning.values().stream()
.flatMap(map -> map.values().stream())
.flatMap(Collection::stream) .flatMap(Collection::stream)
.filter(taskLockPosse -> taskLockPosse.containsTask(task)) .filter(taskLockPosse -> taskLockPosse.containsTask(task))
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -870,29 +912,28 @@ public class TaskLockbox
giant.lock(); giant.lock();
try { try {
final NavigableMap<Interval, List<TaskLockPosse>> dsRunning = running.get(dataSource); final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> dsRunning = running.get(dataSource);
if (dsRunning == null) { if (dsRunning == null) {
// No locks at all // No locks at all
return Collections.emptyList(); return Collections.emptyList();
} else { } else {
// Tasks are indexed by locked interval, which are sorted by interval start. Intervals are non-overlapping, so: // Tasks are indexed by locked interval, which are sorted by interval start. Intervals are non-overlapping, so:
final NavigableSet<Interval> dsLockbox = dsRunning.navigableKeySet(); final NavigableSet<DateTime> dsLockbox = dsRunning.navigableKeySet();
final Iterable<Interval> searchIntervals = Iterables.concat( final Iterable<DateTime> searchStartTimes = Iterables.concat(
// Single interval that starts at or before ours // Single interval that starts at or before ours
Collections.singletonList(dsLockbox.floor(new Interval(interval.getStart(), DateTimes.MAX))), Collections.singletonList(dsLockbox.floor(interval.getStart())),
// All intervals that start somewhere between our start instant (exclusive) and end instant (exclusive) // All intervals that start somewhere between our start instant (exclusive) and end instant (exclusive)
dsLockbox.subSet( dsLockbox.subSet(interval.getStart(), false, interval.getEnd(), false)
new Interval(interval.getStart(), DateTimes.MAX),
false,
new Interval(interval.getEnd(), interval.getEnd()),
false
)
); );
return StreamSupport.stream(searchIntervals.spliterator(), false) return StreamSupport.stream(searchStartTimes.spliterator(), false)
.filter(searchInterval -> searchInterval != null && searchInterval.overlaps(interval)) .filter(java.util.Objects::nonNull)
.flatMap(searchInterval -> dsRunning.get(searchInterval).stream()) .map(dsRunning::get)
.filter(java.util.Objects::nonNull)
.flatMap(sortedMap -> sortedMap.entrySet().stream())
.filter(entry -> entry.getKey().overlaps(interval))
.flatMap(entry -> entry.getValue().stream())
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
} }
@ -901,12 +942,24 @@ public class TaskLockbox
} }
} }
public void add(Task task) @VisibleForTesting
TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval interval)
{ {
giant.lock(); giant.lock();
try { try {
log.info("Adding task[%s] to activeTasks", task.getId()); final List<TaskLockPosse> filteredPosses = findLockPossesContainingInterval(task.getDataSource(), interval)
activeTasks.add(task.getId()); .stream()
.filter(lockPosse -> lockPosse.containsTask(task))
.collect(Collectors.toList());
if (filteredPosses.isEmpty()) {
throw new ISE("Cannot find locks for task[%s] and interval[%s]", task.getId(), interval);
} else if (filteredPosses.size() > 1) {
throw new ISE("There are multiple lockPosses for task[%s] and interval[%s]?", task.getId(), interval);
} else {
return filteredPosses.get(0);
}
} }
finally { finally {
giant.unlock(); giant.unlock();
@ -936,22 +989,6 @@ public class TaskLockbox
return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority; return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
} }
private TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval interval)
{
final List<TaskLockPosse> filteredPosses = findLockPossesContainingInterval(task.getDataSource(), interval)
.stream()
.filter(lockPosse -> lockPosse.containsTask(task))
.collect(Collectors.toList());
if (filteredPosses.isEmpty()) {
throw new ISE("Cannot find locks for task[%s] and interval[%s]", task.getId(), interval);
} else if (filteredPosses.size() > 1) {
throw new ISE("There are multiple lockPosses for task[%s] and interval[%s]?", task.getId(), interval);
} else {
return filteredPosses.get(0);
}
}
@VisibleForTesting @VisibleForTesting
Set<String> getActiveTasks() Set<String> getActiveTasks()
{ {
@ -959,7 +996,7 @@ public class TaskLockbox
} }
@VisibleForTesting @VisibleForTesting
Map<String, NavigableMap<Interval, List<TaskLockPosse>>> getAllLocks() Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> getAllLocks()
{ {
return running; return running;
} }

View File

@ -35,6 +35,7 @@ import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskLockbox.TaskLockPosse;
import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
@ -624,6 +625,48 @@ public class TaskLockboxTest
Assert.assertTrue(lockbox.getAllLocks().isEmpty()); Assert.assertTrue(lockbox.getAllLocks().isEmpty());
} }
@Test
public void testFindLockPosseAfterRevokeWithDifferentLockIntervals() throws EntryExistsException
{
final Task lowPriorityTask = NoopTask.create(0);
final Task highPriorityTask = NoopTask.create(10);
taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
taskStorage.insert(highPriorityTask, TaskStatus.running(highPriorityTask.getId()));
lockbox.add(lowPriorityTask);
lockbox.add(highPriorityTask);
Assert.assertTrue(
lockbox.tryLock(
TaskLockType.EXCLUSIVE,
lowPriorityTask, Intervals.of("2018-12-16T09:00:00/2018-12-16T10:00:00")
).isOk()
);
Assert.assertTrue(
lockbox.tryLock(
TaskLockType.EXCLUSIVE,
highPriorityTask, Intervals.of("2018-12-16T09:00:00/2018-12-16T09:30:00")
).isOk()
);
final TaskLockPosse highLockPosse = lockbox.getOnlyTaskLockPosseContainingInterval(
highPriorityTask,
Intervals.of("2018-12-16T09:00:00/2018-12-16T09:30:00")
);
Assert.assertTrue(highLockPosse.containsTask(highPriorityTask));
Assert.assertFalse(highLockPosse.getTaskLock().isRevoked());
final TaskLockPosse lowLockPosse = lockbox.getOnlyTaskLockPosseContainingInterval(
lowPriorityTask,
Intervals.of("2018-12-16T09:00:00/2018-12-16T10:00:00")
);
Assert.assertTrue(lowLockPosse.containsTask(lowPriorityTask));
Assert.assertTrue(lowLockPosse.getTaskLock().isRevoked());
}
private Set<TaskLock> getAllLocks(List<Task> tasks) private Set<TaskLock> getAllLocks(List<Task> tasks)
{ {
return tasks.stream() return tasks.stream()