diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java index 2f42ebb21c4..bb835997801 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java @@ -46,6 +46,7 @@ import java.util.Map.Entry; import java.util.NavigableMap; import java.util.Set; import java.util.TreeMap; +import java.util.stream.Collectors; public class TaskLocks { @@ -93,6 +94,12 @@ public class TaskLocks : DateTimes.nowUtc().toString(); } + /** + * Checks if the segments are covered by a non revoked lock + * @param taskLockMap task locks for a task + * @param segments segments to be checked + * @return true if each of the segments is covered by a non-revoked lock + */ public static boolean isLockCoversSegments( NavigableMap> taskLockMap, Collection segments @@ -105,7 +112,11 @@ public class TaskLocks return false; } - final List locks = entry.getValue(); + // taskLockMap may contain revoked locks which need to be filtered + final List locks = entry.getValue() + .stream() + .filter(lock -> !lock.isRevoked()) + .collect(Collectors.toList()); return locks.stream().anyMatch( lock -> { if (lock.getGranularity() == LockGranularity.TIME_CHUNK) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index aaa563fa4e8..761c0b59160 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -832,7 +832,7 @@ public class TaskLockbox * @param lock lock to be revoked */ @VisibleForTesting - protected void revokeLock(String taskId, TaskLock lock) + public void revokeLock(String taskId, TaskLock lock) { giant.lock(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java index 43a403e59a9..e5d1d2882ea 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java @@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.actions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.SegmentLock; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; @@ -34,6 +35,7 @@ import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage; import org.apache.druid.indexing.overlord.LockResult; import org.apache.druid.indexing.overlord.SpecificSegmentLockRequest; import org.apache.druid.indexing.overlord.TaskLockbox; +import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.TimeChunkLockRequest; import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator; import org.apache.druid.java.util.common.DateTimes; @@ -62,11 +64,13 @@ public class TaskLocksTest @Before public void setup() { + final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); lockbox = new TaskLockbox( - new HeapMemoryTaskStorage(new TaskStorageConfig(null)), + taskStorage, new TestIndexerMetadataStorageCoordinator() ); task = NoopTask.create(); + taskStorage.insert(task, TaskStatus.running(task.getId())); lockbox.add(task); } @@ -296,6 +300,19 @@ public class TaskLocksTest ); } + @Test + public void testRevokedLocksDoNotCoverSegments() + { + final Set segments = createNumberedPartitionedSegments(); + final Interval interval = Intervals.of("2017-01-01/2017-01-02"); + + final TaskLock lock = tryTimeChunkLock(task, interval, TaskLockType.EXCLUSIVE); + Assert.assertTrue(TaskLocks.isLockCoversSegments(task, lockbox, segments)); + + lockbox.revokeLock(task.getId(), lock); + Assert.assertFalse(TaskLocks.isLockCoversSegments(task, lockbox, segments)); + } + @Test public void testFindReplaceLocksCoveringSegments() {