Clean up stale entries from upgradeSegments table (#15637)

* Clean up stale entries from upgradeSegments table
This commit is contained in:
AmatyaAvadhanula 2024-01-17 20:49:52 +05:30 committed by GitHub
parent fc06f2d075
commit a26defd64b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 98 additions and 0 deletions

View File

@ -1220,6 +1220,14 @@ public class TaskLockbox
try { try {
try { try {
log.info("Removing task[%s] from activeTasks", task.getId()); log.info("Removing task[%s] from activeTasks", task.getId());
if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == TaskLockType.REPLACE)) {
final int upgradeSegmentsDeleted = metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
log.info(
"Deleted [%d] entries from upgradeSegments table for task[%s] with REPLACE locks.",
upgradeSegmentsDeleted,
task.getId()
);
}
unlockAll(task); unlockAll(task);
} }
finally { finally {

View File

@ -1919,6 +1919,36 @@ public class TaskLockboxTest
Assert.assertTrue(testLockbox.getAllLocks().isEmpty()); Assert.assertTrue(testLockbox.getAllLocks().isEmpty());
} }
@Test
public void testUpgradeSegmentsCleanupOnUnlock()
{
final Task replaceTask = NoopTask.create();
final Task appendTask = NoopTask.create();
final IndexerSQLMetadataStorageCoordinator coordinator
= EasyMock.createMock(IndexerSQLMetadataStorageCoordinator.class);
// Only the replaceTask should attempt a delete on the upgradeSegments table
EasyMock.expect(coordinator.deleteUpgradeSegmentsForTask(replaceTask.getId())).andReturn(0).once();
EasyMock.replay(coordinator);
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator);
taskLockbox.add(replaceTask);
taskLockbox.tryLock(
replaceTask,
new TimeChunkLockRequest(TaskLockType.REPLACE, replaceTask, Intervals.of("2024/2025"), "v0")
);
taskLockbox.add(appendTask);
taskLockbox.tryLock(
appendTask,
new TimeChunkLockRequest(TaskLockType.APPEND, appendTask, Intervals.of("2024/2025"), "v0")
);
taskLockbox.remove(replaceTask);
taskLockbox.remove(appendTask);
EasyMock.verify(coordinator);
}
private class TaskLockboxValidator private class TaskLockboxValidator
{ {

View File

@ -277,6 +277,12 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
return null; return null;
} }
@Override
public int deleteUpgradeSegmentsForTask(final String taskId)
{
throw new UnsupportedOperationException();
}
public Set<DataSegment> getPublished() public Set<DataSegment> getPublished()
{ {
return ImmutableSet.copyOf(published); return ImmutableSet.copyOf(published);

View File

@ -443,4 +443,11 @@ public interface IndexerMetadataStorageCoordinator
* @return DataSegment used segment corresponding to given id * @return DataSegment used segment corresponding to given id
*/ */
DataSegment retrieveSegmentForId(String id, boolean includeUnused); DataSegment retrieveSegmentForId(String id, boolean includeUnused);
/**
* Delete entries from the upgrade segments table after the corresponding replace task has ended
* @param taskId - id of the task with replace locks
* @return number of deleted entries from the metadata store
*/
int deleteUpgradeSegmentsForTask(String taskId);
} }

View File

@ -2707,6 +2707,22 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
); );
} }
@Override
public int deleteUpgradeSegmentsForTask(final String taskId)
{
return connector.getDBI().inTransaction(
(handle, status) -> handle
.createStatement(
StringUtils.format(
"DELETE FROM %s WHERE task_id = :task_id",
dbTables.getUpgradeSegmentsTable()
)
)
.bind("task_id", taskId)
.execute()
);
}
private static class PendingSegmentsRecord private static class PendingSegmentsRecord
{ {
private final String sequenceName; private final String sequenceName;

View File

@ -981,6 +981,37 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals(defaultSegment, coordinator.retrieveSegmentForId(defaultSegment.getId().toString(), true)); Assert.assertEquals(defaultSegment, coordinator.retrieveSegmentForId(defaultSegment.getId().toString(), true));
} }
@Test
public void testCleanUpgradeSegmentsTableForTask()
{
final String taskToClean = "taskToClean";
final ReplaceTaskLock replaceLockToClean = new ReplaceTaskLock(
taskToClean,
Intervals.of("2023-01-01/2023-02-01"),
"2023-03-01"
);
DataSegment segmentToClean0 = createSegment(
Intervals.of("2023-01-01/2023-02-01"),
"2023-02-01",
new NumberedShardSpec(0, 0)
);
DataSegment segmentToClean1 = createSegment(
Intervals.of("2023-01-01/2023-01-02"),
"2023-01-02",
new NumberedShardSpec(0, 0)
);
insertIntoUpgradeSegmentsTable(
ImmutableMap.of(segmentToClean0, replaceLockToClean, segmentToClean1, replaceLockToClean)
);
// Unrelated task should not result in clean up
Assert.assertEquals(0, coordinator.deleteUpgradeSegmentsForTask("someRandomTask"));
// The two segment entries are deleted
Assert.assertEquals(2, coordinator.deleteUpgradeSegmentsForTask(taskToClean));
// Nothing further to delete
Assert.assertEquals(0, coordinator.deleteUpgradeSegmentsForTask(taskToClean));
}
@Test @Test
public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOException public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOException
{ {