diff --git a/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java b/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java index 1614464d76a..d25829cd0bc 100644 --- a/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java +++ b/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java @@ -130,6 +130,13 @@ public interface MetadataStorageActionHandler taskIds = tasks.entrySet().stream() + .filter(entry -> entry.getValue().getStatus().isComplete() + && entry.getValue().getCreatedDate().isBefore(timestamp)) + .map(entry -> entry.getKey()) + .collect(Collectors.toList()); + + taskIds.forEach(taskActions::removeAll); + taskIds.forEach(tasks::remove); + } + finally { + giant.unlock(); + } + } + @Override public List getLocks(final String taskid) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index f0d9d37167f..8690c68ded0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -276,6 +276,12 @@ public class MetadataTaskStorage implements TaskStorage } } + @Override + public void removeTasksOlderThan(long timestamp) + { + handler.removeTasksOlderThan(timestamp); + } + @Override public List getLocks(String taskid) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java index 1edd52c2c38..d8a4806bb57 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java @@ -75,6 +75,13 @@ public interface TaskStorage */ void removeLock(String taskid, 
TaskLock taskLock); + /** + * Remove the tasks created before the given timestamp. + * + * @param timestamp timestamp in milliseconds + */ + void removeTasksOlderThan(long timestamp); + /** * Returns task as stored in the storage facility. If the task ID does not exist, this will return an * absentee Optional. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java index 18da374fae7..4ab54404925 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.overlord.helpers; import com.google.inject.Inject; +import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.tasklogs.TaskLogKiller; @@ -35,15 +36,18 @@ public class TaskLogAutoCleaner implements OverlordHelper private final TaskLogKiller taskLogKiller; private final TaskLogAutoCleanerConfig config; + private final TaskStorage taskStorage; @Inject public TaskLogAutoCleaner( TaskLogKiller taskLogKiller, - TaskLogAutoCleanerConfig config + TaskLogAutoCleanerConfig config, + TaskStorage taskStorage ) { this.taskLogKiller = taskLogKiller; this.config = config; + this.taskStorage = taskStorage; } @Override @@ -67,7 +71,9 @@ public class TaskLogAutoCleaner implements OverlordHelper public void run() { try { - taskLogKiller.killOlderThan(System.currentTimeMillis() - config.getDurationToRetain()); + long timestamp = System.currentTimeMillis() - config.getDurationToRetain(); + taskLogKiller.killOlderThan(timestamp); + taskStorage.removeTasksOlderThan(timestamp); } catch (Exception ex) { log.error(ex, "Failed to 
clean-up the task logs"); diff --git a/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java index 039a6427f02..bff95785f8f 100644 --- a/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java @@ -91,4 +91,13 @@ public class DerbyMetadataStorageActionHandler) handle -> { + handle.createStatement(getSqlRemoveLogsOlderThan()) + .bind("date_time", dateTime.toString()) + .execute(); + handle.createStatement( + StringUtils.format( + "DELETE FROM %s WHERE created_date < :date_time AND active = false", + entryTable + ) + ) + .bind("date_time", dateTime.toString()) + .execute(); + + return null; + } + ); + } + private int removeLock(Handle handle, long lockId) { return handle.createStatement(StringUtils.format("DELETE FROM %s WHERE id = :id", lockTable)) @@ -508,6 +541,15 @@ public abstract class SQLMetadataStorageActionHandler getLocks(final String entryId) { diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java index 46dc7f49049..c34ff93ebbc 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java @@ -381,4 +381,65 @@ public class SQLMetadataStorageActionHandlerTest Assert.assertNotNull(handler.getLockId(entryId, lock1)); Assert.assertNull(handler.getLockId(entryId, lock2)); } + + @Test + public void testRemoveTasksOlderThan() throws Exception + { + final String entryId1 = "1234"; + Map entry1 = ImmutableMap.of("numericId", 1234); + Map status1 = ImmutableMap.of("count", 42, "temp", 1); + handler.insert(entryId1, DateTimes.of("2014-01-01T00:00:00.123"), 
"testDataSource", entry1, false, status1); + Assert.assertTrue(handler.addLog(entryId1, ImmutableMap.of("logentry", "created"))); + + final String entryId2 = "ABC123"; + Map entry2 = ImmutableMap.of("a", 1); + Map status2 = ImmutableMap.of("count", 42); + handler.insert(entryId2, DateTimes.of("2014-01-01T00:00:00.123"), "test", entry2, true, status2); + Assert.assertTrue(handler.addLog(entryId2, ImmutableMap.of("logentry", "created"))); + + final String entryId3 = "DEF5678"; + Map entry3 = ImmutableMap.of("numericId", 5678); + Map status3 = ImmutableMap.of("count", 21, "temp", 2); + handler.insert(entryId3, DateTimes.of("2014-01-02T12:00:00.123"), "testDataSource", entry3, false, status3); + Assert.assertTrue(handler.addLog(entryId3, ImmutableMap.of("logentry", "created"))); + + Assert.assertEquals(Optional.of(entry1), handler.getEntry(entryId1)); + Assert.assertEquals(Optional.of(entry2), handler.getEntry(entryId2)); + Assert.assertEquals(Optional.of(entry3), handler.getEntry(entryId3)); + + Assert.assertEquals( + ImmutableList.of(entryId2), + handler.getActiveTaskInfo(null).stream() + .map(taskInfo -> taskInfo.getId()) + .collect(Collectors.toList()) + ); + + Assert.assertEquals( + ImmutableList.of(entryId3, entryId1), + handler.getCompletedTaskInfo(DateTimes.of("2014-01-01"), null, null).stream() + .map(taskInfo -> taskInfo.getId()) + .collect(Collectors.toList()) + + ); + + handler.removeTasksOlderThan(DateTimes.of("2014-01-02").getMillis()); + // active task not removed. 
+ Assert.assertEquals( + ImmutableList.of(entryId2), + handler.getActiveTaskInfo(null).stream() + .map(taskInfo -> taskInfo.getId()) + .collect(Collectors.toList()) + ); + Assert.assertEquals( + ImmutableList.of(entryId3), + handler.getCompletedTaskInfo(DateTimes.of("2014-01-01"), null, null).stream() + .map(taskInfo -> taskInfo.getId()) + .collect(Collectors.toList()) + + ); + // tasklogs + Assert.assertEquals(0, handler.getLogs(entryId1).size()); + Assert.assertEquals(1, handler.getLogs(entryId2).size()); + Assert.assertEquals(1, handler.getLogs(entryId3).size()); + } }