Clean up stale locks if segment allocation fails (#14966)

* Clean up stale locks if segment allocation fails due to an exception
This commit is contained in:
AmatyaAvadhanula 2023-09-14 14:58:02 +05:30 committed by GitHub
parent 7bbefd5741
commit 0e3df2d2e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 167 additions and 3 deletions

View File

@ -494,9 +494,12 @@ public class TaskLockbox
allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending()); allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending());
holderList.getPending().forEach(holder -> acquireTaskLock(holder, false)); holderList.getPending().forEach(holder -> acquireTaskLock(holder, false));
} }
holderList.getPending().forEach(holder -> addTaskAndPersistLocks(holder, isTimeChunkLock)); holderList.getPending().forEach(holder -> addTaskAndPersistLocks(holder, isTimeChunkLock));
} }
catch (Exception e) {
holderList.clearStaleLocks(this);
throw e;
}
finally { finally {
giant.unlock(); giant.unlock();
} }
@ -711,7 +714,8 @@ public class TaskLockbox
* for the given requests. Updates the holder with the allocated segment if * for the given requests. Updates the holder with the allocated segment if
* the allocation succeeds, otherwise marks it as failed. * the allocation succeeds, otherwise marks it as failed.
*/ */
private void allocateSegmentIds( @VisibleForTesting
void allocateSegmentIds(
String dataSource, String dataSource,
Interval interval, Interval interval,
boolean skipSegmentLineageCheck, boolean skipSegmentLineageCheck,
@ -1598,6 +1602,28 @@ public class TaskLockbox
return pending; return pending;
} }
/**
* When task locks are acquired in an attempt to allocate segments, * a new lock posse might be created.
* However, the posse is associated with the task only after all the segment allocations have succeeded.
* If there is an exception, unlock all such unassociated locks.
*/
void clearStaleLocks(TaskLockbox taskLockbox)
{
all
.stream()
.filter(holder -> holder.acquiredLock != null
&& holder.taskLockPosse != null
&& !holder.taskLockPosse.containsTask(holder.task))
.forEach(holder -> {
holder.taskLockPosse.addTask(holder.task);
taskLockbox.unlock(
holder.task,
holder.acquiredLock.getInterval(),
holder.acquiredLock instanceof SegmentLock ? ((SegmentLock) holder.acquiredLock).getPartitionId() : null
);
log.info("Cleared stale lock[%s] for task[%s]", holder.acquiredLock, holder.task.getId());
});
}
List<SegmentAllocateResult> getResults() List<SegmentAllocateResult> getResults()
{ {
@ -1608,7 +1634,8 @@ public class TaskLockbox
/** /**
* Contains the task, request, lock and final result for a segment allocation. * Contains the task, request, lock and final result for a segment allocation.
*/ */
private static class SegmentAllocationHolder @VisibleForTesting
static class SegmentAllocationHolder
{ {
final AllocationHolderList list; final AllocationHolderList list;

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
@ -34,6 +35,8 @@ import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TimeChunkLock; import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateRequest;
import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.config.TaskStorageConfig;
@ -46,6 +49,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory; import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
@ -1727,6 +1731,117 @@ public class TaskLockboxTest
validator.expectActiveLocks(conflictingLock, floorLock); validator.expectActiveLocks(conflictingLock, floorLock);
} }
@Test
public void testDoNotCleanUsedLockAfterSegmentAllocationFailure()
{
final Task task = NoopTask.create();
final Interval theInterval = Intervals.of("2023/2024");
taskStorage.insert(task, TaskStatus.running(task.getId()));
final TaskLockbox testLockbox = new SegmentAllocationFailingTaskLockbox(taskStorage, metadataStorageCoordinator);
testLockbox.add(task);
final LockResult lockResult = testLockbox.tryLock(task, new TimeChunkLockRequest(
TaskLockType.SHARED,
task,
theInterval,
null
));
Assert.assertTrue(lockResult.isOk());
SegmentAllocateRequest request = new SegmentAllocateRequest(
task,
new SegmentAllocateAction(
task.getDataSource(),
DateTimes.of("2023-01-01"),
Granularities.NONE,
Granularities.YEAR,
task.getId(),
null,
false,
null,
null,
TaskLockType.SHARED
),
90
);
try {
testLockbox.allocateSegments(
ImmutableList.of(request),
"DS",
theInterval,
false,
LockGranularity.TIME_CHUNK
);
}
catch (Exception e) {
// do nothing
}
Assert.assertFalse(testLockbox.getAllLocks().isEmpty());
Assert.assertEquals(
lockResult.getTaskLock(),
testLockbox.getOnlyTaskLockPosseContainingInterval(task, theInterval).get(0).getTaskLock()
);
}
@Test
public void testCleanUpLocksAfterSegmentAllocationFailure()
{
final Task task = NoopTask.create();
taskStorage.insert(task, TaskStatus.running(task.getId()));
final TaskLockbox testLockbox = new SegmentAllocationFailingTaskLockbox(taskStorage, metadataStorageCoordinator);
testLockbox.add(task);
SegmentAllocateRequest request0 = new SegmentAllocateRequest(
task,
new SegmentAllocateAction(
task.getDataSource(),
DateTimes.of("2023-01-01"),
Granularities.NONE,
Granularities.YEAR,
task.getId(),
null,
false,
null,
null,
TaskLockType.SHARED
),
90
);
SegmentAllocateRequest request1 = new SegmentAllocateRequest(
task,
new SegmentAllocateAction(
task.getDataSource(),
DateTimes.of("2023-01-01"),
Granularities.NONE,
Granularities.MONTH,
task.getId(),
null,
false,
null,
null,
TaskLockType.SHARED
),
90
);
try {
testLockbox.allocateSegments(
ImmutableList.of(request0, request1),
"DS",
Intervals.of("2023/2024"),
false,
LockGranularity.TIME_CHUNK
);
}
catch (Exception e) {
// do nothing
}
Assert.assertTrue(testLockbox.getAllLocks().isEmpty());
}
private class TaskLockboxValidator private class TaskLockboxValidator
{ {
@ -1953,4 +2068,26 @@ public class TaskLockboxTest
.contains("FailingLockAcquisition") ? null : super.verifyAndCreateOrFindLockPosse(task, taskLock); .contains("FailingLockAcquisition") ? null : super.verifyAndCreateOrFindLockPosse(task, taskLock);
} }
} }
private static class SegmentAllocationFailingTaskLockbox extends TaskLockbox
{
public SegmentAllocationFailingTaskLockbox(
TaskStorage taskStorage,
IndexerMetadataStorageCoordinator metadataStorageCoordinator
)
{
super(taskStorage, metadataStorageCoordinator);
}
@Override
void allocateSegmentIds(
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
Collection<SegmentAllocationHolder> holders
)
{
throw new RuntimeException("This lockbox cannot allocate segemnts.");
}
}
} }