From 3b847512332dc1608f9ca13459fc24d97d58d050 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 10 May 2024 06:38:29 +0530 Subject: [PATCH] Remove unused task action SegmentLockReleaseAction (#16422) Changes: - Remove `SegmentLockReleaseAction` as it is not used anywhere. It is not even registered as a known sub-type of `TaskAction`. - Minor refactor in `TaskLockbox`. No functional change. - Remove `ExpectedException` from `TaskLockboxTest` --- .../actions/SegmentLockReleaseAction.java | 85 ------- .../druid/indexing/overlord/TaskLockbox.java | 232 +++++++----------- .../indexing/overlord/TaskLockboxTest.java | 47 ++-- 3 files changed, 107 insertions(+), 257 deletions(-) delete mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentLockReleaseAction.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentLockReleaseAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentLockReleaseAction.java deleted file mode 100644 index 6a47091710b..00000000000 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentLockReleaseAction.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.common.actions; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.type.TypeReference; -import org.apache.druid.indexing.common.task.Task; -import org.joda.time.Interval; - -/** - * TaskAction to release a {@link org.apache.druid.indexing.common.SegmentLock}. - * Used by batch tasks when they fail to acquire all necessary locks. - */ -public class SegmentLockReleaseAction implements TaskAction -{ - private final Interval interval; - private final int partitionId; - - @JsonCreator - public SegmentLockReleaseAction(@JsonProperty Interval interval, @JsonProperty int partitionId) - { - this.interval = interval; - this.partitionId = partitionId; - } - - @JsonProperty - public Interval getInterval() - { - return interval; - } - - @JsonProperty - public int getPartitionId() - { - return partitionId; - } - - @Override - public TypeReference getReturnTypeReference() - { - return new TypeReference() - { - }; - } - - @Override - public Void perform(Task task, TaskActionToolbox toolbox) - { - toolbox.getTaskLockbox().unlock(task, interval, partitionId); - return null; - } - - @Override - public boolean isAudited() - { - return false; - } - - @Override - public String toString() - { - return "SegmentLockReleaseAction{" + - "interval=" + interval + - ", partitionId=" + partitionId + - '}'; - } -} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 1155592f325..7776663330b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -24,7 +24,6 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.ComparisonChain; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; @@ -63,13 +62,13 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Consumer; import java.util.stream.Collectors; /** @@ -766,30 +765,12 @@ public class TaskLockbox giant.lock(); try { - return action.perform(isTaskLocksValid(task, intervals)); - } - finally { - giant.unlock(); - } - } - - /** - * Check all locks task acquired are still valid. - * It doesn't check other semantics like acquired locks are enough to overwrite existing segments. - * This kind of semantic should be checked in each caller of {@link #doInCriticalSection}. - */ - private boolean isTaskLocksValid(Task task, Set intervals) - { - giant.lock(); - try { - return intervals - .stream() - .allMatch(interval -> { - final List lockPosses = getOnlyTaskLockPosseContainingInterval(task, interval); - return lockPosses.stream().map(TaskLockPosse::getTaskLock).noneMatch( - TaskLock::isRevoked - ); - }); + // Check if any of the locks held by this task have been revoked + final boolean areTaskLocksValid = intervals.stream().noneMatch(interval -> { + Optional lockPosse = getOnlyTaskLockPosseContainingInterval(task, interval); + return lockPosse.isPresent() && lockPosse.get().getTaskLock().isRevoked(); + }); + return action.perform(areTaskLocksValid); } finally { giant.unlock(); @@ -801,7 +782,7 @@ public class TaskLockbox giant.lock(); try { - lockPosse.forEachTask(taskId -> revokeLock(taskId, lockPosse.getTaskLock())); + lockPosse.taskIds.forEach(taskId -> revokeLock(taskId, lockPosse.getTaskLock())); } finally { giant.unlock(); @@ -1083,22 +1064,20 @@ public class TaskLockbox * @param task task to unlock * @param interval interval to unlock */ - public void unlock(final Task task, final Interval interval, @Nullable Integer partitionId) + private void unlock(final Task task, final Interval interval, @Nullable Integer partitionId) { giant.lock(); try { final String dataSource = task.getDataSource(); - final NavigableMap>> dsRunning = running.get( - task.getDataSource() - ); - - if (dsRunning == null || dsRunning.isEmpty()) { + final NavigableMap>> locksForDatasource + = running.get(task.getDataSource()); + if (locksForDatasource == null || locksForDatasource.isEmpty()) { return; } - final SortedMap> intervalToPosses = dsRunning.get(interval.getStart()); - + final SortedMap> intervalToPosses + = locksForDatasource.get(interval.getStart()); if (intervalToPosses == null || intervalToPosses.isEmpty()) { return; } @@ -1126,18 +1105,15 @@ public class TaskLockbox final boolean removed = taskLockPosse.removeTask(task); if (taskLockPosse.isTasksEmpty()) { - log.info("TaskLock is now empty: %s", taskLock); + log.info("TaskLock[%s] is now empty.", taskLock); possesHolder.remove(taskLockPosse); } - if (possesHolder.isEmpty()) { intervalToPosses.remove(interval); } - if (intervalToPosses.isEmpty()) { - dsRunning.remove(interval.getStart()); + locksForDatasource.remove(interval.getStart()); } - if (running.get(dataSource).isEmpty()) { running.remove(dataSource); } @@ -1227,39 +1203,7 @@ public class TaskLockbox try { try { log.info("Removing task[%s] from activeTasks", task.getId()); - try { - // Clean upgrade segments table for entries associated with replacing task - if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == TaskLockType.REPLACE)) { - final int upgradeSegmentsDeleted = metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId()); - log.info( - "Deleted [%d] entries from upgradeSegments table for task[%s] with REPLACE locks.", - upgradeSegmentsDeleted, task.getId() - ); - } - // Clean pending segments associated with the appending task - if (task instanceof PendingSegmentAllocatingTask) { - final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId(); - if (activeAllocatorIdToTaskIds.containsKey(taskAllocatorId)) { - final Set idsInSameGroup = activeAllocatorIdToTaskIds.get(taskAllocatorId); - idsInSameGroup.remove(task.getId()); - if (idsInSameGroup.isEmpty()) { - final int pendingSegmentsDeleted - = metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId( - task.getDataSource(), - taskAllocatorId - ); - log.info( - "Deleted [%d] entries from pendingSegments table for pending segments group [%s] with APPEND locks.", - pendingSegmentsDeleted, taskAllocatorId - ); - } - activeAllocatorIdToTaskIds.remove(taskAllocatorId); - } - } - } - catch (Exception e) { - log.warn(e, "Failure cleaning up upgradeSegments or pendingSegments tables."); - } + cleanupUpgradeAndPendingSegments(task); unlockAll(task); } finally { @@ -1271,33 +1215,63 @@ public class TaskLockbox } } - /** - * Return the currently-active lock posses for some task. - * - * @param task task for which to locate locks - */ - private List findLockPossesForTask(final Task task) + @GuardedBy("giant") + private void cleanupUpgradeAndPendingSegments(Task task) { - giant.lock(); - try { - // Scan through all locks for this datasource - final NavigableMap>> dsRunning = running.get(task.getDataSource()); - if (dsRunning == null) { - return ImmutableList.of(); - } else { - return dsRunning.values().stream() - .flatMap(map -> map.values().stream()) - .flatMap(Collection::stream) - .filter(taskLockPosse -> taskLockPosse.containsTask(task)) - .collect(Collectors.toList()); + // Clean up upgrade segment entries associated with a REPLACE task + if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == TaskLockType.REPLACE)) { + final int upgradeSegmentsDeleted = metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId()); + log.info( + "Deleted [%d] entries from upgradeSegments table for task[%s] with REPLACE locks.", + upgradeSegmentsDeleted, task.getId() + ); + } + + // Clean up pending segments associated with an APPEND task + if (task instanceof PendingSegmentAllocatingTask) { + final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId(); + if (activeAllocatorIdToTaskIds.containsKey(taskAllocatorId)) { + final Set taskIdsForSameAllocator = activeAllocatorIdToTaskIds.get(taskAllocatorId); + taskIdsForSameAllocator.remove(task.getId()); + + if (taskIdsForSameAllocator.isEmpty()) { + final int pendingSegmentsDeleted = metadataStorageCoordinator + .deletePendingSegmentsForTaskAllocatorId(task.getDataSource(), taskAllocatorId); + log.info( + "Deleted [%d] entries from pendingSegments table for taskAllocatorId[%s].", + pendingSegmentsDeleted, taskAllocatorId + ); + } + activeAllocatorIdToTaskIds.remove(taskAllocatorId); + } } } - finally { - giant.unlock(); + catch (Exception e) { + log.warn(e, "Failure cleaning up upgradeSegments or pendingSegments tables."); } } + /** + * Finds all the lock posses for the given task. + */ + @GuardedBy("giant") + private List findLockPossesForTask(final Task task) + { + // Scan through all locks for this datasource + final NavigableMap>> locksForDatasource + = running.get(task.getDataSource()); + if (locksForDatasource == null) { + return Collections.emptyList(); + } + + return locksForDatasource.values().stream() + .flatMap(map -> map.values().stream()) + .flatMap(Collection::stream) + .filter(taskLockPosse -> taskLockPosse.containsTask(task)) + .collect(Collectors.toList()); + } + private List findLockPossesContainingInterval(final String dataSource, final Interval interval) { giant.lock(); @@ -1342,19 +1316,7 @@ public class TaskLockbox } @VisibleForTesting - List getOnlyTaskLockPosseContainingInterval(Task task, Interval interval) - { - giant.lock(); - try { - return getOnlyTaskLockPosseContainingInterval(task, interval, Collections.emptySet()); - } - finally { - giant.unlock(); - } - } - - @VisibleForTesting - List getOnlyTaskLockPosseContainingInterval(Task task, Interval interval, Set partitionIds) + Optional getOnlyTaskLockPosseContainingInterval(Task task, Interval interval) { giant.lock(); try { @@ -1364,35 +1326,20 @@ public class TaskLockbox .collect(Collectors.toList()); if (filteredPosses.isEmpty()) { - throw new ISE("Cannot find locks for task[%s] and interval[%s]", task.getId(), interval); - } else if (filteredPosses.size() > 1) { - if (filteredPosses.stream() - .anyMatch(posse -> posse.getTaskLock().getGranularity() == LockGranularity.TIME_CHUNK)) { - throw new ISE( - "There are multiple timeChunk lockPosses for task[%s] and interval[%s]?", - task.getId(), - interval - ); - } else { - final Map partitionIdsOfLocks = new HashMap<>(); - for (TaskLockPosse posse : filteredPosses) { - final SegmentLock segmentLock = (SegmentLock) posse.getTaskLock(); - partitionIdsOfLocks.put(segmentLock.getPartitionId(), posse); - } - - if (partitionIds.stream().allMatch(partitionIdsOfLocks::containsKey)) { - return partitionIds.stream().map(partitionIdsOfLocks::get).collect(Collectors.toList()); - } else { - throw new ISE( - "Task[%s] doesn't have locks for interval[%s] partitions[%]", - task.getId(), - interval, - partitionIds.stream().filter(pid -> !partitionIdsOfLocks.containsKey(pid)).collect(Collectors.toList()) - ); - } - } + throw new ISE("Cannot find any lock for task[%s] and interval[%s]", task.getId(), interval); + } else if (filteredPosses.size() == 1) { + return Optional.of(filteredPosses.get(0)); + } else if ( + filteredPosses.stream().anyMatch( + posse -> posse.taskLock.getGranularity() == LockGranularity.TIME_CHUNK + ) + ) { + throw new ISE( + "There are multiple timechunk lockPosses for task[%s] and interval[%s]", + task.getId(), interval + ); } else { - return filteredPosses; + return Optional.empty(); } } finally { @@ -1635,17 +1582,13 @@ public class TaskLockbox Preconditions.checkArgument( taskLock.getGroupId().equals(task.getGroupId()), "groupId[%s] of task[%s] is different from the existing lockPosse's groupId[%s]", - task.getGroupId(), - task.getId(), - taskLock.getGroupId() + task.getGroupId(), task.getId(), taskLock.getGroupId() ); } Preconditions.checkArgument( taskLock.getNonNullPriority() == task.getPriority(), "priority[%s] of task[%s] is different from the existing lockPosse's priority[%s]", - task.getPriority(), - task.getId(), - taskLock.getNonNullPriority() + task.getPriority(), task.getId(), taskLock.getNonNullPriority() ); return taskIds.add(task.getId()); } @@ -1724,12 +1667,6 @@ public class TaskLockbox return false; } - void forEachTask(Consumer action) - { - Preconditions.checkNotNull(action, "action"); - taskIds.forEach(action); - } - @Override public boolean equals(Object o) { @@ -1801,8 +1738,7 @@ public class TaskLockbox /** * Contains the task, request, lock and final result for a segment allocation. */ - @VisibleForTesting - static class SegmentAllocationHolder + private static class SegmentAllocationHolder { final AllocationHolderList list; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index e6b20dd073e..a02b5108767 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -73,7 +73,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import java.util.ArrayList; import java.util.Arrays; @@ -83,6 +82,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -91,9 +91,6 @@ public class TaskLockboxTest @Rule public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule(); - @Rule - public ExpectedException expectedException = ExpectedException.none(); - private ObjectMapper objectMapper; private TaskStorage taskStorage; private IndexerMetadataStorageCoordinator metadataStorageCoordinator; @@ -105,9 +102,6 @@ public class TaskLockboxTest private final int MEDIUM_PRIORITY = 10; private final int LOW_PRIORITY = 5; - @Rule - public final ExpectedException exception = ExpectedException.none(); - @Before public void setup() { @@ -186,14 +180,16 @@ public class TaskLockboxTest } @Test - public void testLockAfterTaskComplete() throws InterruptedException + public void testLockAfterTaskComplete() { - Task task = NoopTask.create(); - exception.expect(ISE.class); - exception.expectMessage("Unable to grant lock to inactive Task"); + final Task task = NoopTask.create(); lockbox.add(task); lockbox.remove(task); - acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, Intervals.of("2015-01-01/2015-01-02")); + ISE exception = Assert.assertThrows( + ISE.class, + () -> acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, Intervals.of("2015-01-01/2015-01-02")) + ); + Assert.assertTrue(exception.getMessage().contains("Unable to grant lock to inactive Task")); } @Test @@ -311,12 +307,15 @@ public class TaskLockboxTest @Test public void testTryLockAfterTaskComplete() { - Task task = NoopTask.create(); - exception.expect(ISE.class); - exception.expectMessage("Unable to grant lock to inactive Task"); + final Task task = NoopTask.create(); lockbox.add(task); lockbox.remove(task); - Assert.assertFalse(tryTimeChunkLock(TaskLockType.EXCLUSIVE, task, Intervals.of("2015-01-01/2015-01-02")).isOk()); + + ISE exception = Assert.assertThrows( + ISE.class, + () -> tryTimeChunkLock(TaskLockType.EXCLUSIVE, task, Intervals.of("2015-01-01/2015-01-02")) + ); + Assert.assertTrue(exception.getMessage().contains("Unable to grant lock to inactive Task")); } @Test @@ -759,23 +758,23 @@ public class TaskLockboxTest ).isOk() ); - final List highLockPosses = lockbox.getOnlyTaskLockPosseContainingInterval( + final Optional highLockPosse = lockbox.getOnlyTaskLockPosseContainingInterval( highPriorityTask, Intervals.of("2018-12-16T09:00:00/2018-12-16T09:30:00") ); - Assert.assertEquals(1, highLockPosses.size()); - Assert.assertTrue(highLockPosses.get(0).containsTask(highPriorityTask)); - Assert.assertFalse(highLockPosses.get(0).getTaskLock().isRevoked()); + Assert.assertTrue(highLockPosse.isPresent()); + Assert.assertTrue(highLockPosse.get().containsTask(highPriorityTask)); + Assert.assertFalse(highLockPosse.get().getTaskLock().isRevoked()); - final List lowLockPosses = lockbox.getOnlyTaskLockPosseContainingInterval( + final Optional lowLockPosse = lockbox.getOnlyTaskLockPosseContainingInterval( lowPriorityTask, Intervals.of("2018-12-16T09:00:00/2018-12-16T10:00:00") ); - Assert.assertEquals(1, lowLockPosses.size()); - Assert.assertTrue(lowLockPosses.get(0).containsTask(lowPriorityTask)); - Assert.assertTrue(lowLockPosses.get(0).getTaskLock().isRevoked()); + Assert.assertTrue(lowLockPosse.isPresent()); + Assert.assertTrue(lowLockPosse.get().containsTask(lowPriorityTask)); + Assert.assertTrue(lowLockPosse.get().getTaskLock().isRevoked()); } @Test