Fix issue that tasks tables in metadata storage are not cleared (#6592)

* tasks tables in metadata storage are not cleared

* address comments. remove tasklogs and revert obsolete changes

* address comments. change comment and update doc.

* address comments. update doc more detailed

* address comments. remove redundant log and update doc more detailed.

* address comments. update document
This commit is contained in:
seoeun 2018-11-22 12:50:31 +09:00 committed by Mingming Qiu
parent 92cce04165
commit 22a5bf97a2
10 changed files with 173 additions and 5 deletions

View File

@ -130,6 +130,13 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
*/
void removeLock(long lockId);
/**
* Remove the tasks created older than the given timestamp.
*
* @param timestamp timestamp in milliseconds
*/
void removeTasksOlderThan(long timestamp);
/**
* Add a log to the entry with the given id.
*

View File

@ -544,13 +544,13 @@ If you are running the indexing service in remote mode, the task logs must be st
|--------|-----------|-------|
|`druid.indexer.logs.type`|Choices:noop, s3, azure, google, hdfs, file. Where to store task logs|file|
You can also configure the Overlord to automatically retain the task logs only for last x milliseconds by configuring following additional properties.
You can also configure the Overlord to automatically retain the task logs in log directory and entries in task-related metadata storage tables only for last x milliseconds by configuring following additional properties.
Caution: Automatic log file deletion typically works based on log file modification timestamp on the backing store, so large clock skews between druid nodes and backing store nodes might result in un-intended behavior.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs. |false|
|`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. In milliseconds, task logs to be retained created in last x milliseconds. |None|
|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs. If set to true, overlord will submit kill tasks periodically based on `druid.indexer.logs.kill.delay` specified, which will delete task logs from the log directory as well as tasks and tasklogs table entries in metadata storage except for tasks created in the last `druid.indexer.logs.kill.durationToRetain` period. |false|
|`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. In milliseconds, task logs and entries in task-related metadata storage tables to be retained created in last x milliseconds. |None|
|`druid.indexer.logs.kill.initialDelay`| Optional. Number of milliseconds after overlord start when first auto kill is run. |random value less than 300000 (5 mins)|
|`druid.indexer.logs.kill.delay`|Optional. Number of milliseconds of delay between successive executions of auto kill run. |21600000 (6 hours)|

View File

@ -363,6 +363,26 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
}
@Override
public void removeTasksOlderThan(final long timestamp)
{
giant.lock();
try {
List<String> taskIds = tasks.entrySet().stream()
.filter(entry -> entry.getValue().getStatus().isComplete()
&& entry.getValue().getCreatedDate().isBefore(timestamp))
.map(entry -> entry.getKey())
.collect(Collectors.toList());
taskIds.forEach(taskActions::removeAll);
taskIds.forEach(tasks::remove);
}
finally {
giant.unlock();
}
}
@Override
public List<TaskLock> getLocks(final String taskid)
{

View File

@ -276,6 +276,12 @@ public class MetadataTaskStorage implements TaskStorage
}
}
@Override
public void removeTasksOlderThan(long timestamp)
{
handler.removeTasksOlderThan(timestamp);
}
@Override
public List<TaskLock> getLocks(String taskid)
{

View File

@ -75,6 +75,13 @@ public interface TaskStorage
*/
void removeLock(String taskid, TaskLock taskLock);
/**
* Remove the tasks created older than the given timestamp.
*
* @param timestamp timestamp in milliseconds
*/
void removeTasksOlderThan(long timestamp);
/**
* Returns task as stored in the storage facility. If the task ID does not exist, this will return an
* absentee Optional.

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexing.overlord.helpers;
import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.tasklogs.TaskLogKiller;
@ -35,15 +36,18 @@ public class TaskLogAutoCleaner implements OverlordHelper
private final TaskLogKiller taskLogKiller;
private final TaskLogAutoCleanerConfig config;
private final TaskStorage taskStorage;
@Inject
public TaskLogAutoCleaner(
TaskLogKiller taskLogKiller,
TaskLogAutoCleanerConfig config
TaskLogAutoCleanerConfig config,
TaskStorage taskStorage
)
{
this.taskLogKiller = taskLogKiller;
this.config = config;
this.taskStorage = taskStorage;
}
@Override
@ -67,7 +71,9 @@ public class TaskLogAutoCleaner implements OverlordHelper
public void run()
{
try {
taskLogKiller.killOlderThan(System.currentTimeMillis() - config.getDurationToRetain());
long timestamp = System.currentTimeMillis() - config.getDurationToRetain();
taskLogKiller.killOlderThan(timestamp);
taskStorage.removeTasksOlderThan(timestamp);
}
catch (Exception ex) {
log.error(ex, "Failed to clean-up the task logs");

View File

@ -91,4 +91,13 @@ public class DerbyMetadataStorageActionHandler<EntryType, StatusType, LogType, L
}
return sql;
}
@Deprecated
@Override
public String getSqlRemoveLogsOlderThan()
{
return StringUtils.format("DELETE FROM %s WHERE %s_id in ("
+ " SELECT id FROM %s WHERE created_date < :date_time and active = false)",
getLogTable(), getEntryTypeName(), getEntryTable());
}
}

View File

@ -89,4 +89,14 @@ public class PostgreSQLMetadataStorageActionHandler<EntryType, StatusType, LogTy
}
return sql;
}
@Deprecated
@Override
public String getSqlRemoveLogsOlderThan()
{
return StringUtils.format("DELETE FROM %s USING %s "
+ "WHERE %s_id = %s.id AND created_date < :date_time and active = false",
getLogTable(), getEntryTable(), getEntryTypeName(), getEntryTable());
}
}

View File

@ -113,6 +113,16 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
return entryTable;
}
protected String getLogTable()
{
return logTable;
}
protected String getEntryTypeName()
{
return entryTypeName;
}
public TypeReference getEntryType()
{
return entryType;
@ -439,6 +449,29 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
);
}
@Override
public void removeTasksOlderThan(final long timestamp)
{
DateTime dateTime = DateTimes.utc(timestamp);
connector.retryWithHandle(
(HandleCallback<Void>) handle -> {
handle.createStatement(getSqlRemoveLogsOlderThan())
.bind("date_time", dateTime.toString())
.execute();
handle.createStatement(
StringUtils.format(
"DELETE FROM %s WHERE created_date < :date_time AND active = false",
entryTable
)
)
.bind("date_time", dateTime.toString())
.execute();
return null;
}
);
}
private int removeLock(Handle handle, long lockId)
{
return handle.createStatement(StringUtils.format("DELETE FROM %s WHERE id = :id", lockTable))
@ -508,6 +541,15 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
);
}
@Deprecated
public String getSqlRemoveLogsOlderThan()
{
return StringUtils.format("DELETE a FROM %s a INNER JOIN %s b ON a.%s_id = b.id "
+ "WHERE b.created_date < :date_time and b.active = false",
logTable, entryTable, entryTypeName
);
}
@Override
public Map<Long, LockType> getLocks(final String entryId)
{

View File

@ -381,4 +381,65 @@ public class SQLMetadataStorageActionHandlerTest
Assert.assertNotNull(handler.getLockId(entryId, lock1));
Assert.assertNull(handler.getLockId(entryId, lock2));
}
@Test
public void testRemoveTasksOlderThan() throws Exception
{
final String entryId1 = "1234";
Map<String, Integer> entry1 = ImmutableMap.of("numericId", 1234);
Map<String, Integer> status1 = ImmutableMap.of("count", 42, "temp", 1);
handler.insert(entryId1, DateTimes.of("2014-01-01T00:00:00.123"), "testDataSource", entry1, false, status1);
Assert.assertTrue(handler.addLog(entryId1, ImmutableMap.of("logentry", "created")));
final String entryId2 = "ABC123";
Map<String, Integer> entry2 = ImmutableMap.of("a", 1);
Map<String, Integer> status2 = ImmutableMap.of("count", 42);
handler.insert(entryId2, DateTimes.of("2014-01-01T00:00:00.123"), "test", entry2, true, status2);
Assert.assertTrue(handler.addLog(entryId2, ImmutableMap.of("logentry", "created")));
final String entryId3 = "DEF5678";
Map<String, Integer> entry3 = ImmutableMap.of("numericId", 5678);
Map<String, Integer> status3 = ImmutableMap.of("count", 21, "temp", 2);
handler.insert(entryId3, DateTimes.of("2014-01-02T12:00:00.123"), "testDataSource", entry3, false, status3);
Assert.assertTrue(handler.addLog(entryId3, ImmutableMap.of("logentry", "created")));
Assert.assertEquals(Optional.of(entry1), handler.getEntry(entryId1));
Assert.assertEquals(Optional.of(entry2), handler.getEntry(entryId2));
Assert.assertEquals(Optional.of(entry3), handler.getEntry(entryId3));
Assert.assertEquals(
ImmutableList.of(entryId2),
handler.getActiveTaskInfo(null).stream()
.map(taskInfo -> taskInfo.getId())
.collect(Collectors.toList())
);
Assert.assertEquals(
ImmutableList.of(entryId3, entryId1),
handler.getCompletedTaskInfo(DateTimes.of("2014-01-01"), null, null).stream()
.map(taskInfo -> taskInfo.getId())
.collect(Collectors.toList())
);
handler.removeTasksOlderThan(DateTimes.of("2014-01-02").getMillis());
// active task not removed.
Assert.assertEquals(
ImmutableList.of(entryId2),
handler.getActiveTaskInfo(null).stream()
.map(taskInfo -> taskInfo.getId())
.collect(Collectors.toList())
);
Assert.assertEquals(
ImmutableList.of(entryId3),
handler.getCompletedTaskInfo(DateTimes.of("2014-01-01"), null, null).stream()
.map(taskInfo -> taskInfo.getId())
.collect(Collectors.toList())
);
// tasklogs
Assert.assertEquals(0, handler.getLogs(entryId1).size());
Assert.assertEquals(1, handler.getLogs(entryId2).size());
Assert.assertEquals(1, handler.getLogs(entryId3).size());
}
}