mirror of https://github.com/apache/druid.git
Remove incorrect optimization (#14246)
This commit is contained in:
parent
e833a4700d
commit
47e48ee657
|
@ -24,7 +24,6 @@ import com.google.common.base.Objects;
|
|||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ComparisonChain;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.inject.Inject;
|
||||
|
@ -55,7 +54,6 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
|
@ -64,7 +62,6 @@ import java.util.concurrent.locks.Condition;
|
|||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
/**
|
||||
* Remembers which activeTasks have locked which intervals or which segments. Tasks are permitted to lock an interval
|
||||
|
@ -1155,17 +1152,7 @@ public class TaskLockbox
|
|||
// No locks at all
|
||||
return Collections.emptyList();
|
||||
} else {
|
||||
// Tasks are indexed by locked interval, which are sorted by interval start. Intervals are non-overlapping, so:
|
||||
final NavigableSet<DateTime> dsLockbox = dsRunning.navigableKeySet();
|
||||
final Iterable<DateTime> searchStartTimes = Iterables.concat(
|
||||
// Single interval that starts at or before ours
|
||||
Collections.singletonList(dsLockbox.floor(interval.getStart())),
|
||||
|
||||
// All intervals that start somewhere between our start instant (exclusive) and end instant (exclusive)
|
||||
dsLockbox.subSet(interval.getStart(), false, interval.getEnd(), false)
|
||||
);
|
||||
|
||||
return StreamSupport.stream(searchStartTimes.spliterator(), false)
|
||||
return dsRunning.navigableKeySet().stream()
|
||||
.filter(java.util.Objects::nonNull)
|
||||
.map(dsRunning::get)
|
||||
.filter(java.util.Objects::nonNull)
|
||||
|
|
|
@ -1357,6 +1357,52 @@ public class TaskLockboxTest
|
|||
result.getTasksToFail());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConflictsWithOverlappingSharedLocks() throws Exception
|
||||
{
|
||||
final List<Task> tasks = new ArrayList<>();
|
||||
|
||||
final Task conflictingTask = NoopTask.create(10);
|
||||
tasks.add(conflictingTask);
|
||||
lockbox.add(conflictingTask);
|
||||
taskStorage.insert(conflictingTask, TaskStatus.running(conflictingTask.getId()));
|
||||
TaskLock conflictingLock = tryTimeChunkLock(
|
||||
TaskLockType.SHARED,
|
||||
conflictingTask,
|
||||
Intervals.of("2023-05-01/2023-06-01")
|
||||
).getTaskLock();
|
||||
Assert.assertNotNull(conflictingLock);
|
||||
Assert.assertFalse(conflictingLock.isRevoked());
|
||||
|
||||
final Task floorTask = NoopTask.create(10);
|
||||
tasks.add(floorTask);
|
||||
lockbox.add(floorTask);
|
||||
taskStorage.insert(floorTask, TaskStatus.running(floorTask.getId()));
|
||||
TaskLock floorLock = tryTimeChunkLock(
|
||||
TaskLockType.SHARED,
|
||||
floorTask,
|
||||
Intervals.of("2023-05-26/2023-05-27")
|
||||
).getTaskLock();
|
||||
Assert.assertNotNull(floorLock);
|
||||
Assert.assertFalse(floorLock.isRevoked());
|
||||
|
||||
final Task rightOverlapTask = NoopTask.create(10);
|
||||
tasks.add(rightOverlapTask);
|
||||
lockbox.add(rightOverlapTask);
|
||||
taskStorage.insert(rightOverlapTask, TaskStatus.running(rightOverlapTask.getId()));
|
||||
TaskLock rightOverlapLock = tryTimeChunkLock(
|
||||
TaskLockType.EXCLUSIVE,
|
||||
rightOverlapTask,
|
||||
Intervals.of("2023-05-28/2023-06-03")
|
||||
).getTaskLock();
|
||||
Assert.assertNull(rightOverlapLock);
|
||||
|
||||
Assert.assertEquals(
|
||||
ImmutableSet.of(conflictingLock, floorLock),
|
||||
getAllActiveLocks(tasks)
|
||||
);
|
||||
}
|
||||
|
||||
private Set<TaskLock> getAllActiveLocks(List<Task> tasks)
|
||||
{
|
||||
return tasks.stream()
|
||||
|
|
Loading…
Reference in New Issue