diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 54191adf05d..da64198dd00 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -1220,6 +1220,14 @@ public class TaskLockbox try { try { log.info("Removing task[%s] from activeTasks", task.getId()); + if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == TaskLockType.REPLACE)) { + final int upgradeSegmentsDeleted = metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId()); + log.info( + "Deleted [%d] entries from upgradeSegments table for task[%s] with REPLACE locks.", + upgradeSegmentsDeleted, + task.getId() + ); + } unlockAll(task); } finally { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 14597bd7270..17ced86cfa3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -1919,6 +1919,36 @@ public class TaskLockboxTest Assert.assertTrue(testLockbox.getAllLocks().isEmpty()); } + @Test + public void testUpgradeSegmentsCleanupOnUnlock() + { + final Task replaceTask = NoopTask.create(); + final Task appendTask = NoopTask.create(); + final IndexerSQLMetadataStorageCoordinator coordinator + = EasyMock.createMock(IndexerSQLMetadataStorageCoordinator.class); + // Only the replaceTask should attempt a delete on the upgradeSegments table + EasyMock.expect(coordinator.deleteUpgradeSegmentsForTask(replaceTask.getId())).andReturn(0).once(); + EasyMock.replay(coordinator); + + final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator); + + taskLockbox.add(replaceTask); + taskLockbox.tryLock( + replaceTask, + new TimeChunkLockRequest(TaskLockType.REPLACE, replaceTask, Intervals.of("2024/2025"), "v0") + ); + + taskLockbox.add(appendTask); + taskLockbox.tryLock( + appendTask, + new TimeChunkLockRequest(TaskLockType.APPEND, appendTask, Intervals.of("2024/2025"), "v0") + ); + + taskLockbox.remove(replaceTask); + taskLockbox.remove(appendTask); + + EasyMock.verify(coordinator); + } private class TaskLockboxValidator { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index bca79a559af..2fc80adceac 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -277,6 +277,12 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto return null; } + @Override + public int deleteUpgradeSegmentsForTask(final String taskId) + { + throw new UnsupportedOperationException(); + } + public Set getPublished() { return ImmutableSet.copyOf(published); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 2a032545d72..dd0f7d8c98a 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -443,4 +443,11 @@ public interface IndexerMetadataStorageCoordinator * @return DataSegment used segment corresponding to given id */ DataSegment retrieveSegmentForId(String id, boolean includeUnused); + + /** + * Delete entries from the upgrade segments table after the corresponding replace task has ended + * @param taskId - id of the task with replace locks + * @return number of deleted entries from the metadata store + */ + int deleteUpgradeSegmentsForTask(String taskId); } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 252386c56bd..69dca46ea1c 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -2707,6 +2707,22 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor ); } + @Override + public int deleteUpgradeSegmentsForTask(final String taskId) + { + return connector.getDBI().inTransaction( + (handle, status) -> handle + .createStatement( + StringUtils.format( + "DELETE FROM %s WHERE task_id = :task_id", + dbTables.getUpgradeSegmentsTable() + ) + ) + .bind("task_id", taskId) + .execute() + ); + } + private static class PendingSegmentsRecord { private final String sequenceName; diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 9132fb23630..4c3534feacd 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -981,6 +981,37 @@ public class IndexerSQLMetadataStorageCoordinatorTest Assert.assertEquals(defaultSegment, coordinator.retrieveSegmentForId(defaultSegment.getId().toString(), true)); } + @Test + public void testCleanUpgradeSegmentsTableForTask() + { + final String taskToClean = "taskToClean"; + final ReplaceTaskLock replaceLockToClean = new ReplaceTaskLock( + taskToClean, + Intervals.of("2023-01-01/2023-02-01"), + "2023-03-01" + ); + DataSegment segmentToClean0 = createSegment( + Intervals.of("2023-01-01/2023-02-01"), + "2023-02-01", + new NumberedShardSpec(0, 0) + ); + DataSegment segmentToClean1 = createSegment( + Intervals.of("2023-01-01/2023-01-02"), + "2023-01-02", + new NumberedShardSpec(0, 0) + ); + insertIntoUpgradeSegmentsTable( + ImmutableMap.of(segmentToClean0, replaceLockToClean, segmentToClean1, replaceLockToClean) + ); + + // Unrelated task should not result in clean up + Assert.assertEquals(0, coordinator.deleteUpgradeSegmentsForTask("someRandomTask")); + // The two segment entries are deleted + Assert.assertEquals(2, coordinator.deleteUpgradeSegmentsForTask(taskToClean)); + // Nothing further to delete + Assert.assertEquals(0, coordinator.deleteUpgradeSegmentsForTask(taskToClean)); + } + @Test public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOException {