Remove unused task action SegmentLockReleaseAction (#16422)

Changes:
- Remove `SegmentLockReleaseAction` as it is not used anywhere.
It is not even registered as a known sub-type of `TaskAction`.
- Minor refactor in `TaskLockbox`. No functional change.
- Remove `ExpectedException` from `TaskLockboxTest`
This commit is contained in:
Kashif Faraz 2024-05-10 06:38:29 +05:30 committed by GitHub
parent d0f3fdab37
commit 3b84751233
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 107 additions and 257 deletions

View File

@ -1,85 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.indexing.common.task.Task;
import org.joda.time.Interval;
/**
* TaskAction to release a {@link org.apache.druid.indexing.common.SegmentLock}.
* Used by batch tasks when they fail to acquire all necessary locks.
*/
public class SegmentLockReleaseAction implements TaskAction<Void>
{
private final Interval interval;
private final int partitionId;
@JsonCreator
public SegmentLockReleaseAction(@JsonProperty Interval interval, @JsonProperty int partitionId)
{
this.interval = interval;
this.partitionId = partitionId;
}
@JsonProperty
public Interval getInterval()
{
return interval;
}
@JsonProperty
public int getPartitionId()
{
return partitionId;
}
@Override
public TypeReference<Void> getReturnTypeReference()
{
return new TypeReference<Void>()
{
};
}
@Override
public Void perform(Task task, TaskActionToolbox toolbox)
{
toolbox.getTaskLockbox().unlock(task, interval, partitionId);
return null;
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{
return "SegmentLockReleaseAction{" +
"interval=" + interval +
", partitionId=" + partitionId +
'}';
}
}

View File

@ -24,7 +24,6 @@ import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
@ -63,13 +62,13 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
@ -766,30 +765,12 @@ public class TaskLockbox
giant.lock();
try {
return action.perform(isTaskLocksValid(task, intervals));
}
finally {
giant.unlock();
}
}
/**
* Check all locks task acquired are still valid.
* It doesn't check other semantics like acquired locks are enough to overwrite existing segments.
* This kind of semantic should be checked in each caller of {@link #doInCriticalSection}.
*/
private boolean isTaskLocksValid(Task task, Set<Interval> intervals)
{
giant.lock();
try {
return intervals
.stream()
.allMatch(interval -> {
final List<TaskLockPosse> lockPosses = getOnlyTaskLockPosseContainingInterval(task, interval);
return lockPosses.stream().map(TaskLockPosse::getTaskLock).noneMatch(
TaskLock::isRevoked
);
});
// Check if any of the locks held by this task have been revoked
final boolean areTaskLocksValid = intervals.stream().noneMatch(interval -> {
Optional<TaskLockPosse> lockPosse = getOnlyTaskLockPosseContainingInterval(task, interval);
return lockPosse.isPresent() && lockPosse.get().getTaskLock().isRevoked();
});
return action.perform(areTaskLocksValid);
}
finally {
giant.unlock();
@ -801,7 +782,7 @@ public class TaskLockbox
giant.lock();
try {
lockPosse.forEachTask(taskId -> revokeLock(taskId, lockPosse.getTaskLock()));
lockPosse.taskIds.forEach(taskId -> revokeLock(taskId, lockPosse.getTaskLock()));
}
finally {
giant.unlock();
@ -1083,22 +1064,20 @@ public class TaskLockbox
* @param task task to unlock
* @param interval interval to unlock
*/
public void unlock(final Task task, final Interval interval, @Nullable Integer partitionId)
private void unlock(final Task task, final Interval interval, @Nullable Integer partitionId)
{
giant.lock();
try {
final String dataSource = task.getDataSource();
final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> dsRunning = running.get(
task.getDataSource()
);
if (dsRunning == null || dsRunning.isEmpty()) {
final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> locksForDatasource
= running.get(task.getDataSource());
if (locksForDatasource == null || locksForDatasource.isEmpty()) {
return;
}
final SortedMap<Interval, List<TaskLockPosse>> intervalToPosses = dsRunning.get(interval.getStart());
final SortedMap<Interval, List<TaskLockPosse>> intervalToPosses
= locksForDatasource.get(interval.getStart());
if (intervalToPosses == null || intervalToPosses.isEmpty()) {
return;
}
@ -1126,18 +1105,15 @@ public class TaskLockbox
final boolean removed = taskLockPosse.removeTask(task);
if (taskLockPosse.isTasksEmpty()) {
log.info("TaskLock is now empty: %s", taskLock);
log.info("TaskLock[%s] is now empty.", taskLock);
possesHolder.remove(taskLockPosse);
}
if (possesHolder.isEmpty()) {
intervalToPosses.remove(interval);
}
if (intervalToPosses.isEmpty()) {
dsRunning.remove(interval.getStart());
locksForDatasource.remove(interval.getStart());
}
if (running.get(dataSource).isEmpty()) {
running.remove(dataSource);
}
@ -1227,39 +1203,7 @@ public class TaskLockbox
try {
try {
log.info("Removing task[%s] from activeTasks", task.getId());
try {
// Clean upgrade segments table for entries associated with replacing task
if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == TaskLockType.REPLACE)) {
final int upgradeSegmentsDeleted = metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
log.info(
"Deleted [%d] entries from upgradeSegments table for task[%s] with REPLACE locks.",
upgradeSegmentsDeleted, task.getId()
);
}
// Clean pending segments associated with the appending task
if (task instanceof PendingSegmentAllocatingTask) {
final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId();
if (activeAllocatorIdToTaskIds.containsKey(taskAllocatorId)) {
final Set<String> idsInSameGroup = activeAllocatorIdToTaskIds.get(taskAllocatorId);
idsInSameGroup.remove(task.getId());
if (idsInSameGroup.isEmpty()) {
final int pendingSegmentsDeleted
= metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId(
task.getDataSource(),
taskAllocatorId
);
log.info(
"Deleted [%d] entries from pendingSegments table for pending segments group [%s] with APPEND locks.",
pendingSegmentsDeleted, taskAllocatorId
);
}
activeAllocatorIdToTaskIds.remove(taskAllocatorId);
}
}
}
catch (Exception e) {
log.warn(e, "Failure cleaning up upgradeSegments or pendingSegments tables.");
}
cleanupUpgradeAndPendingSegments(task);
unlockAll(task);
}
finally {
@ -1271,33 +1215,63 @@ public class TaskLockbox
}
}
/**
* Return the currently-active lock posses for some task.
*
* @param task task for which to locate locks
*/
private List<TaskLockPosse> findLockPossesForTask(final Task task)
@GuardedBy("giant")
private void cleanupUpgradeAndPendingSegments(Task task)
{
giant.lock();
try {
// Scan through all locks for this datasource
final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> dsRunning = running.get(task.getDataSource());
if (dsRunning == null) {
return ImmutableList.of();
} else {
return dsRunning.values().stream()
.flatMap(map -> map.values().stream())
.flatMap(Collection::stream)
.filter(taskLockPosse -> taskLockPosse.containsTask(task))
.collect(Collectors.toList());
// Clean up upgrade segment entries associated with a REPLACE task
if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == TaskLockType.REPLACE)) {
final int upgradeSegmentsDeleted = metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
log.info(
"Deleted [%d] entries from upgradeSegments table for task[%s] with REPLACE locks.",
upgradeSegmentsDeleted, task.getId()
);
}
// Clean up pending segments associated with an APPEND task
if (task instanceof PendingSegmentAllocatingTask) {
final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId();
if (activeAllocatorIdToTaskIds.containsKey(taskAllocatorId)) {
final Set<String> taskIdsForSameAllocator = activeAllocatorIdToTaskIds.get(taskAllocatorId);
taskIdsForSameAllocator.remove(task.getId());
if (taskIdsForSameAllocator.isEmpty()) {
final int pendingSegmentsDeleted = metadataStorageCoordinator
.deletePendingSegmentsForTaskAllocatorId(task.getDataSource(), taskAllocatorId);
log.info(
"Deleted [%d] entries from pendingSegments table for taskAllocatorId[%s].",
pendingSegmentsDeleted, taskAllocatorId
);
}
activeAllocatorIdToTaskIds.remove(taskAllocatorId);
}
}
}
finally {
giant.unlock();
catch (Exception e) {
log.warn(e, "Failure cleaning up upgradeSegments or pendingSegments tables.");
}
}
/**
* Finds all the lock posses for the given task.
*/
@GuardedBy("giant")
private List<TaskLockPosse> findLockPossesForTask(final Task task)
{
// Scan through all locks for this datasource
final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> locksForDatasource
= running.get(task.getDataSource());
if (locksForDatasource == null) {
return Collections.emptyList();
}
return locksForDatasource.values().stream()
.flatMap(map -> map.values().stream())
.flatMap(Collection::stream)
.filter(taskLockPosse -> taskLockPosse.containsTask(task))
.collect(Collectors.toList());
}
private List<TaskLockPosse> findLockPossesContainingInterval(final String dataSource, final Interval interval)
{
giant.lock();
@ -1342,19 +1316,7 @@ public class TaskLockbox
}
@VisibleForTesting
List<TaskLockPosse> getOnlyTaskLockPosseContainingInterval(Task task, Interval interval)
{
giant.lock();
try {
return getOnlyTaskLockPosseContainingInterval(task, interval, Collections.emptySet());
}
finally {
giant.unlock();
}
}
@VisibleForTesting
List<TaskLockPosse> getOnlyTaskLockPosseContainingInterval(Task task, Interval interval, Set<Integer> partitionIds)
Optional<TaskLockPosse> getOnlyTaskLockPosseContainingInterval(Task task, Interval interval)
{
giant.lock();
try {
@ -1364,35 +1326,20 @@ public class TaskLockbox
.collect(Collectors.toList());
if (filteredPosses.isEmpty()) {
throw new ISE("Cannot find locks for task[%s] and interval[%s]", task.getId(), interval);
} else if (filteredPosses.size() > 1) {
if (filteredPosses.stream()
.anyMatch(posse -> posse.getTaskLock().getGranularity() == LockGranularity.TIME_CHUNK)) {
throw new ISE(
"There are multiple timeChunk lockPosses for task[%s] and interval[%s]?",
task.getId(),
interval
);
} else {
final Map<Integer, TaskLockPosse> partitionIdsOfLocks = new HashMap<>();
for (TaskLockPosse posse : filteredPosses) {
final SegmentLock segmentLock = (SegmentLock) posse.getTaskLock();
partitionIdsOfLocks.put(segmentLock.getPartitionId(), posse);
}
if (partitionIds.stream().allMatch(partitionIdsOfLocks::containsKey)) {
return partitionIds.stream().map(partitionIdsOfLocks::get).collect(Collectors.toList());
} else {
throw new ISE(
"Task[%s] doesn't have locks for interval[%s] partitions[%]",
task.getId(),
interval,
partitionIds.stream().filter(pid -> !partitionIdsOfLocks.containsKey(pid)).collect(Collectors.toList())
);
}
}
throw new ISE("Cannot find any lock for task[%s] and interval[%s]", task.getId(), interval);
} else if (filteredPosses.size() == 1) {
return Optional.of(filteredPosses.get(0));
} else if (
filteredPosses.stream().anyMatch(
posse -> posse.taskLock.getGranularity() == LockGranularity.TIME_CHUNK
)
) {
throw new ISE(
"There are multiple timechunk lockPosses for task[%s] and interval[%s]",
task.getId(), interval
);
} else {
return filteredPosses;
return Optional.empty();
}
}
finally {
@ -1635,17 +1582,13 @@ public class TaskLockbox
Preconditions.checkArgument(
taskLock.getGroupId().equals(task.getGroupId()),
"groupId[%s] of task[%s] is different from the existing lockPosse's groupId[%s]",
task.getGroupId(),
task.getId(),
taskLock.getGroupId()
task.getGroupId(), task.getId(), taskLock.getGroupId()
);
}
Preconditions.checkArgument(
taskLock.getNonNullPriority() == task.getPriority(),
"priority[%s] of task[%s] is different from the existing lockPosse's priority[%s]",
task.getPriority(),
task.getId(),
taskLock.getNonNullPriority()
task.getPriority(), task.getId(), taskLock.getNonNullPriority()
);
return taskIds.add(task.getId());
}
@ -1724,12 +1667,6 @@ public class TaskLockbox
return false;
}
void forEachTask(Consumer<String> action)
{
Preconditions.checkNotNull(action, "action");
taskIds.forEach(action);
}
@Override
public boolean equals(Object o)
{
@ -1801,8 +1738,7 @@ public class TaskLockbox
/**
* Contains the task, request, lock and final result for a segment allocation.
*/
@VisibleForTesting
static class SegmentAllocationHolder
private static class SegmentAllocationHolder
{
final AllocationHolderList list;

View File

@ -73,7 +73,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.ArrayList;
import java.util.Arrays;
@ -83,6 +82,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -91,9 +91,6 @@ public class TaskLockboxTest
@Rule
public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule();
@Rule
public ExpectedException expectedException = ExpectedException.none();
private ObjectMapper objectMapper;
private TaskStorage taskStorage;
private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
@ -105,9 +102,6 @@ public class TaskLockboxTest
private final int MEDIUM_PRIORITY = 10;
private final int LOW_PRIORITY = 5;
@Rule
public final ExpectedException exception = ExpectedException.none();
@Before
public void setup()
{
@ -186,14 +180,16 @@ public class TaskLockboxTest
}
@Test
public void testLockAfterTaskComplete() throws InterruptedException
public void testLockAfterTaskComplete()
{
Task task = NoopTask.create();
exception.expect(ISE.class);
exception.expectMessage("Unable to grant lock to inactive Task");
final Task task = NoopTask.create();
lockbox.add(task);
lockbox.remove(task);
acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, Intervals.of("2015-01-01/2015-01-02"));
ISE exception = Assert.assertThrows(
ISE.class,
() -> acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, Intervals.of("2015-01-01/2015-01-02"))
);
Assert.assertTrue(exception.getMessage().contains("Unable to grant lock to inactive Task"));
}
@Test
@ -311,12 +307,15 @@ public class TaskLockboxTest
@Test
public void testTryLockAfterTaskComplete()
{
Task task = NoopTask.create();
exception.expect(ISE.class);
exception.expectMessage("Unable to grant lock to inactive Task");
final Task task = NoopTask.create();
lockbox.add(task);
lockbox.remove(task);
Assert.assertFalse(tryTimeChunkLock(TaskLockType.EXCLUSIVE, task, Intervals.of("2015-01-01/2015-01-02")).isOk());
ISE exception = Assert.assertThrows(
ISE.class,
() -> tryTimeChunkLock(TaskLockType.EXCLUSIVE, task, Intervals.of("2015-01-01/2015-01-02"))
);
Assert.assertTrue(exception.getMessage().contains("Unable to grant lock to inactive Task"));
}
@Test
@ -759,23 +758,23 @@ public class TaskLockboxTest
).isOk()
);
final List<TaskLockPosse> highLockPosses = lockbox.getOnlyTaskLockPosseContainingInterval(
final Optional<TaskLockPosse> highLockPosse = lockbox.getOnlyTaskLockPosseContainingInterval(
highPriorityTask,
Intervals.of("2018-12-16T09:00:00/2018-12-16T09:30:00")
);
Assert.assertEquals(1, highLockPosses.size());
Assert.assertTrue(highLockPosses.get(0).containsTask(highPriorityTask));
Assert.assertFalse(highLockPosses.get(0).getTaskLock().isRevoked());
Assert.assertTrue(highLockPosse.isPresent());
Assert.assertTrue(highLockPosse.get().containsTask(highPriorityTask));
Assert.assertFalse(highLockPosse.get().getTaskLock().isRevoked());
final List<TaskLockPosse> lowLockPosses = lockbox.getOnlyTaskLockPosseContainingInterval(
final Optional<TaskLockPosse> lowLockPosse = lockbox.getOnlyTaskLockPosseContainingInterval(
lowPriorityTask,
Intervals.of("2018-12-16T09:00:00/2018-12-16T10:00:00")
);
Assert.assertEquals(1, lowLockPosses.size());
Assert.assertTrue(lowLockPosses.get(0).containsTask(lowPriorityTask));
Assert.assertTrue(lowLockPosses.get(0).getTaskLock().isRevoked());
Assert.assertTrue(lowLockPosse.isPresent());
Assert.assertTrue(lowLockPosse.get().containsTask(lowPriorityTask));
Assert.assertTrue(lowLockPosse.get().getTaskLock().isRevoked());
}
@Test