Refactor: Add OverlordDuty to replace OverlordHelper and align with CoordinatorDuty (#14235)

Changes:
- Replace `OverlordHelper` with `OverlordDuty` to align with `CoordinatorDuty`
  - Each duty has a `run()` method and defines a `Schedule` with an initial delay and period.
  - Update existing duties `TaskLogAutoCleaner` and `DurableStorageCleaner`
- Add utility class `Configs`
- Update log, error messages and javadocs
- Other minor style improvements
This commit is contained in:
Kashif Faraz 2023-05-12 22:39:56 +05:30 committed by GitHub
parent 6254658f61
commit ba11b3d462
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 626 additions and 253 deletions

View File

@ -29,7 +29,7 @@ import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.overlord.helpers.OverlordHelper;
import org.apache.druid.indexing.overlord.duty.OverlordDuty;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.msq.indexing.DurableStorageCleaner;
import org.apache.druid.msq.indexing.DurableStorageCleanerConfig;
@ -94,7 +94,7 @@ public class MSQDurableStorageModule implements DruidModule
DurableStorageCleanerConfig.class
);
Multibinder.newSetBinder(binder, OverlordHelper.class)
Multibinder.newSetBinder(binder, OverlordDuty.class)
.addBinding()
.to(DurableStorageCleaner.class);
}

View File

@ -27,29 +27,25 @@ import org.apache.druid.frame.util.DurableStorageUtils;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.helpers.OverlordHelper;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.indexing.overlord.duty.DutySchedule;
import org.apache.druid.indexing.overlord.duty.OverlordDuty;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.guice.MultiStageQuery;
import org.apache.druid.storage.StorageConnector;
import org.joda.time.Duration;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
/**
* This method polls the durable storage for any stray directories, i.e. the ones that donot have a controller task
* This duty polls the durable storage for any stray directories, i.e. the ones that donot have a controller task
* associated with it and cleans them periodically.
* This ensures that the tasks which that have exited abruptly or have failed to clean up the durable storage themselves
* donot pollute it with worker outputs and temporary files. See {@link DurableStorageCleanerConfig} for the configs.
*/
public class DurableStorageCleaner implements OverlordHelper
public class DurableStorageCleaner implements OverlordDuty
{
private static final Logger LOG = new Logger(DurableStorageCleaner.class);
private final DurableStorageCleanerConfig config;
@ -75,66 +71,53 @@ public class DurableStorageCleaner implements OverlordHelper
}
@Override
public void schedule(ScheduledExecutorService exec)
public void run() throws Exception
{
LOG.info("Starting the DurableStorageCleaner with the config [%s]", config);
Optional<TaskRunner> taskRunnerOptional = taskMasterProvider.get().getTaskRunner();
if (!taskRunnerOptional.isPresent()) {
LOG.info("DurableStorageCleaner not running since the node is not the leader");
return;
} else {
LOG.info("Running DurableStorageCleaner");
}
ScheduledExecutors.scheduleWithFixedDelay(
exec,
Duration.standardSeconds(config.getDelaySeconds()),
// Added the initial delay explicitly so that we don't have to manually return the signal in the runnable
Duration.standardSeconds(config.getDelaySeconds()),
() -> {
try {
LOG.info("Starting the run of durable storage cleaner");
Optional<TaskRunner> taskRunnerOptional = taskMasterProvider.get().getTaskRunner();
if (!taskRunnerOptional.isPresent()) {
LOG.info("Durable storage cleaner not running since the node is not the leader");
return;
}
TaskRunner taskRunner = taskRunnerOptional.get();
Iterator<String> allFiles = storageConnector.listDir("");
Set<String> runningTaskIds = taskRunner.getRunningTasks()
.stream()
.map(TaskRunnerWorkItem::getTaskId)
.map(DurableStorageUtils::getControllerDirectory)
.collect(Collectors.toSet());
TaskRunner taskRunner = taskRunnerOptional.get();
Iterator<String> allFiles = storageConnector.listDir("");
Set<String> runningTaskIds = taskRunner.getRunningTasks()
.stream()
.map(TaskRunnerWorkItem::getTaskId)
.map(DurableStorageUtils::getControllerDirectory)
.collect(Collectors.toSet());
Set<String> filesToRemove = new HashSet<>();
while (allFiles.hasNext()) {
String currentFile = allFiles.next();
String taskIdFromPathOrEmpty = DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(currentFile);
if (taskIdFromPathOrEmpty != null && !taskIdFromPathOrEmpty.isEmpty()) {
if (runningTaskIds.contains(taskIdFromPathOrEmpty)) {
// do nothing
} else {
filesToRemove.add(currentFile);
}
}
}
if (filesToRemove.isEmpty()) {
LOG.info("DurableStorageCleaner did not find any left over directories to delete");
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Number of files [%d] that do not have a corresponding MSQ task associated with it. These are:\n[%s]\nT",
filesToRemove.size(),
filesToRemove
);
} else {
LOG.info(
"Number of files [%d] that do not have a corresponding MSQ task associated with it.",
filesToRemove.size()
);
}
storageConnector.deleteFiles(filesToRemove);
}
}
catch (IOException e) {
throw new RuntimeException("Error while running the scheduled durable storage cleanup helper", e);
}
Set<String> filesToRemove = new HashSet<>();
while (allFiles.hasNext()) {
String currentFile = allFiles.next();
String taskIdFromPathOrEmpty = DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(currentFile);
if (taskIdFromPathOrEmpty != null && !taskIdFromPathOrEmpty.isEmpty()) {
if (runningTaskIds.contains(taskIdFromPathOrEmpty)) {
// do nothing
} else {
filesToRemove.add(currentFile);
}
);
}
}
if (filesToRemove.isEmpty()) {
LOG.info("There are no leftover directories to delete.");
} else {
LOG.info(
"Removing [%d] files which are not associated with any MSQ task.",
filesToRemove.size()
);
LOG.debug("Files to remove:\n[%s]\n", filesToRemove);
storageConnector.deleteFiles(filesToRemove);
}
}
@Override
public DutySchedule getSchedule()
{
final long delayMillis = config.getDelaySeconds() * 1000;
return new DutySchedule(delayMillis, delayMillis);
}
}

View File

@ -26,18 +26,15 @@ import org.apache.druid.frame.util.DurableStorageUtils;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.duty.DutySchedule;
import org.apache.druid.storage.StorageConnector;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public class DurableStorageCleanerTest
{
@ -50,40 +47,46 @@ public class DurableStorageCleanerTest
private static final String STRAY_DIR = "strayDirectory";
@Test
public void testSchedule() throws IOException, InterruptedException
public void testRun() throws Exception
{
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
try {
EasyMock.reset(TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
DurableStorageCleanerConfig durableStorageCleanerConfig = new DurableStorageCleanerConfig();
durableStorageCleanerConfig.delaySeconds = 1L;
durableStorageCleanerConfig.enabled = true;
DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(
durableStorageCleanerConfig,
STORAGE_CONNECTOR,
() -> TASK_MASTER
);
EasyMock.expect(STORAGE_CONNECTOR.listDir(EasyMock.anyString()))
.andReturn(ImmutableList.of(DurableStorageUtils.getControllerDirectory(TASK_ID), STRAY_DIR)
.stream()
.iterator())
.anyTimes();
EasyMock.expect(TASK_RUNNER_WORK_ITEM.getTaskId()).andReturn(TASK_ID)
.anyTimes();
EasyMock.expect((Collection<TaskRunnerWorkItem>) TASK_RUNNER.getRunningTasks())
.andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
.anyTimes();
EasyMock.expect(TASK_MASTER.getTaskRunner()).andReturn(Optional.of(TASK_RUNNER)).anyTimes();
Capture<Set<String>> capturedArguments = EasyMock.newCapture();
STORAGE_CONNECTOR.deleteFiles(EasyMock.capture(capturedArguments));
EasyMock.expectLastCall().once();
EasyMock.replay(TASK_MASTER, TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
durableStorageCleaner.schedule(executor);
Thread.sleep(8000L);
Assert.assertEquals(Sets.newHashSet(STRAY_DIR), capturedArguments.getValue());
}
finally {
executor.shutdownNow();
}
EasyMock.reset(TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
DurableStorageCleanerConfig durableStorageCleanerConfig = new DurableStorageCleanerConfig();
durableStorageCleanerConfig.delaySeconds = 1L;
durableStorageCleanerConfig.enabled = true;
DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(
durableStorageCleanerConfig,
STORAGE_CONNECTOR,
() -> TASK_MASTER
);
EasyMock.expect(STORAGE_CONNECTOR.listDir(EasyMock.anyString()))
.andReturn(ImmutableList.of(DurableStorageUtils.getControllerDirectory(TASK_ID), STRAY_DIR)
.stream()
.iterator())
.anyTimes();
EasyMock.expect(TASK_RUNNER_WORK_ITEM.getTaskId()).andReturn(TASK_ID)
.anyTimes();
EasyMock.expect((Collection<TaskRunnerWorkItem>) TASK_RUNNER.getRunningTasks())
.andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM))
.anyTimes();
EasyMock.expect(TASK_MASTER.getTaskRunner()).andReturn(Optional.of(TASK_RUNNER)).anyTimes();
Capture<Set<String>> capturedArguments = EasyMock.newCapture();
STORAGE_CONNECTOR.deleteFiles(EasyMock.capture(capturedArguments));
EasyMock.expectLastCall().once();
EasyMock.replay(TASK_MASTER, TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR);
durableStorageCleaner.run();
Assert.assertEquals(Sets.newHashSet(STRAY_DIR), capturedArguments.getValue());
}
@Test
public void testGetSchedule()
{
DurableStorageCleanerConfig cleanerConfig = new DurableStorageCleanerConfig();
cleanerConfig.delaySeconds = 10L;
cleanerConfig.enabled = true;
DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(cleanerConfig, null, null);
DutySchedule schedule = durableStorageCleaner.getSchedule();
Assert.assertEquals(cleanerConfig.delaySeconds * 1000, schedule.getPeriodMillis());
Assert.assertEquals(cleanerConfig.delaySeconds * 1000, schedule.getInitialDelayMillis());
}
}

View File

@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.EnumUtils;
import org.apache.druid.common.config.Configs;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
@ -144,33 +145,35 @@ public class TaskConfig
@JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask
)
{
this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir;
this.baseDir = Configs.valueOrDefault(baseDir, System.getProperty("java.io.tmpdir"));
this.baseTaskDir = new File(defaultDir(baseTaskDir, "persistent/task"));
// This is usually on HDFS or similar, so we can't use java.io.tmpdir
this.hadoopWorkingPath = hadoopWorkingPath == null ? "/tmp/druid-indexing" : hadoopWorkingPath;
this.defaultRowFlushBoundary = defaultRowFlushBoundary == null ? 75000 : defaultRowFlushBoundary;
this.defaultHadoopCoordinates = defaultHadoopCoordinates == null
? DEFAULT_DEFAULT_HADOOP_COORDINATES
: defaultHadoopCoordinates;
this.hadoopWorkingPath = Configs.valueOrDefault(hadoopWorkingPath, "/tmp/druid-indexing");
this.defaultRowFlushBoundary = Configs.valueOrDefault(defaultRowFlushBoundary, 75000);
this.defaultHadoopCoordinates = Configs.valueOrDefault(
defaultHadoopCoordinates,
DEFAULT_DEFAULT_HADOOP_COORDINATES
);
this.restoreTasksOnRestart = restoreTasksOnRestart;
this.gracefulShutdownTimeout = gracefulShutdownTimeout == null
? DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT
: gracefulShutdownTimeout;
this.directoryLockTimeout = directoryLockTimeout == null
? DEFAULT_DIRECTORY_LOCK_TIMEOUT
: directoryLockTimeout;
if (shuffleDataLocations == null) {
this.shuffleDataLocations = Collections.singletonList(
new StorageLocationConfig(new File(defaultDir(null, "intermediary-segments")), null, null)
);
} else {
this.shuffleDataLocations = shuffleDataLocations;
}
this.gracefulShutdownTimeout = Configs.valueOrDefault(
gracefulShutdownTimeout,
DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT
);
this.directoryLockTimeout = Configs.valueOrDefault(
directoryLockTimeout,
DEFAULT_DIRECTORY_LOCK_TIMEOUT
);
this.shuffleDataLocations = Configs.valueOrDefault(
shuffleDataLocations,
Collections.singletonList(
new StorageLocationConfig(new File(defaultDir(null, "intermediary-segments")), null, null)
)
);
this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource;
this.batchMemoryMappedIndex = batchMemoryMappedIndex;
this.encapsulatedTask = enableTaskLevelLogPush;
// Conflict resolution. Assume that if batchMemoryMappedIndex is set (since false is the default) that
// the user changed it intentionally to use legacy, in this case oveeride batchProcessingMode and also
// set it to legacy else just use batchProcessingMode and don't pay attention to batchMemoryMappedIndexMode:
@ -181,13 +184,15 @@ public class TaskConfig
} else {
// batchProcessingMode input string is invalid, log & use the default.
this.batchProcessingMode = BatchProcessingMode.CLOSED_SEGMENTS; // Default
log.warn("Batch processing mode argument value is null or not valid:[%s], defaulting to[%s] ",
batchProcessingMode, this.batchProcessingMode
log.warn(
"Batch processing mode argument value is null or not valid:[%s], defaulting to[%s] ",
batchProcessingMode, this.batchProcessingMode
);
}
log.debug("Batch processing mode:[%s]", this.batchProcessingMode);
this.storeEmptyColumns = storeEmptyColumns == null ? DEFAULT_STORE_EMPTY_COLUMNS : storeEmptyColumns;
this.tmpStorageBytesPerTask = tmpStorageBytesPerTask == null ? DEFAULT_TMP_STORAGE_BYTES_PER_TASK : tmpStorageBytesPerTask;
this.storeEmptyColumns = Configs.valueOrDefault(storeEmptyColumns, DEFAULT_STORE_EMPTY_COLUMNS);
this.tmpStorageBytesPerTask = Configs.valueOrDefault(tmpStorageBytesPerTask, DEFAULT_TMP_STORAGE_BYTES_PER_TASK);
}
private TaskConfig(

View File

@ -34,7 +34,7 @@ import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.helpers.OverlordHelperManager;
import org.apache.druid.indexing.overlord.duty.OverlordDutyExecutor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@ -91,7 +91,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
final CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig,
final ServiceEmitter emitter,
final SupervisorManager supervisorManager,
final OverlordHelperManager overlordHelperManager,
final OverlordDutyExecutor overlordDutyExecutor,
@IndexingService final DruidLeaderSelector overlordLeaderSelector,
final SegmentAllocationQueue segmentAllocationQueue
)
@ -137,7 +137,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
leaderLifecycle.addManagedInstance(taskRunner);
leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addManagedInstance(supervisorManager);
leaderLifecycle.addManagedInstance(overlordHelperManager);
leaderLifecycle.addManagedInstance(overlordDutyExecutor);
leaderLifecycle.addHandler(
new Lifecycle.Handler()
{

View File

@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord.config;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.config.Configs;
import org.joda.time.Duration;
import org.joda.time.Period;
@ -46,7 +47,7 @@ public class TaskQueueConfig
@JsonProperty("storageSyncRate") final Period storageSyncRate
)
{
this.maxSize = maxSize == null ? Integer.MAX_VALUE : maxSize;
this.maxSize = Configs.valueOrDefault(maxSize, Integer.MAX_VALUE);
this.startDelay = defaultDuration(startDelay, "PT1M");
this.restartDelay = defaultDuration(restartDelay, "PT30S");
this.storageSyncRate = defaultDuration(storageSyncRate, "PT1M");

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.duty;
/**
* Schedule of an {@link OverlordDuty}.
*/
public class DutySchedule
{
private final long periodMillis;
private final long initialDelayMillis;
public DutySchedule(long periodMillis, long initialDelayMillis)
{
this.initialDelayMillis = initialDelayMillis;
this.periodMillis = periodMillis;
}
public long getInitialDelayMillis()
{
return initialDelayMillis;
}
public long getPeriodMillis()
{
return periodMillis;
}
}

View File

@ -17,14 +17,14 @@
* under the License.
*/
package org.apache.druid.indexing.overlord.helpers;
import java.util.concurrent.ScheduledExecutorService;
package org.apache.druid.indexing.overlord.duty;
/**
* Represents a duty scheduled to run periodically on the Overlord.
*/
public interface OverlordHelper
public interface OverlordDuty
{
boolean isEnabled();
void schedule(ScheduledExecutorService exec);
void run() throws Exception;
DutySchedule getSchedule();
}

View File

@ -17,37 +17,41 @@
* under the License.
*/
package org.apache.druid.indexing.overlord.helpers;
package org.apache.druid.indexing.overlord.duty;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.joda.time.Duration;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
/**
* Manages the execution of all overlord duties on a single-threaded executor.
*/
public class OverlordHelperManager
public class OverlordDutyExecutor
{
private static final Logger log = new Logger(OverlordHelperManager.class);
private static final Logger log = new Logger(OverlordDutyExecutor.class);
private final ScheduledExecutorFactory execFactory;
private final Set<OverlordHelper> helpers;
private final Set<OverlordDuty> duties;
private volatile ScheduledExecutorService exec;
private final Object startStopLock = new Object();
private volatile boolean started = false;
@Inject
public OverlordHelperManager(
public OverlordDutyExecutor(
ScheduledExecutorFactory scheduledExecutorFactory,
Set<OverlordHelper> helpers)
Set<OverlordDuty> duties
)
{
this.execFactory = scheduledExecutorFactory;
this.helpers = helpers;
this.duties = duties;
}
@LifecycleStart
@ -55,19 +59,15 @@ public class OverlordHelperManager
{
synchronized (startStopLock) {
if (!started) {
log.info("OverlordHelperManager is starting.");
for (OverlordHelper helper : helpers) {
if (helper.isEnabled()) {
if (exec == null) {
exec = execFactory.create(1, "Overlord-Helper-Manager-Exec--%d");
}
helper.schedule(exec);
log.info("Starting OverlordDutyExecutor.");
for (OverlordDuty duty : duties) {
if (duty.isEnabled()) {
schedule(duty);
}
}
started = true;
log.info("OverlordHelperManager is started.");
started = true;
log.info("OverlordDutyExecutor is now running.");
}
}
}
@ -77,15 +77,54 @@ public class OverlordHelperManager
{
synchronized (startStopLock) {
if (started) {
log.info("OverlordHelperManager is stopping.");
log.info("Stopping OverlordDutyExecutor.");
if (exec != null) {
exec.shutdownNow();
exec = null;
}
started = false;
log.info("OverlordHelperManager is stopped.");
log.info("OverlordDutyExecutor has been stopped.");
}
}
}
/**
* Schedules execution of the given overlord duty.
*/
private void schedule(OverlordDuty duty)
{
initExecutor();
final DutySchedule schedule = duty.getSchedule();
final String dutyName = duty.getClass().getName();
ScheduledExecutors.scheduleWithFixedDelay(
exec,
Duration.millis(schedule.getInitialDelayMillis()),
Duration.millis(schedule.getPeriodMillis()),
() -> {
try {
duty.run();
}
catch (Exception e) {
log.error(e, "Error while running duty [%s]", dutyName);
}
}
);
log.info(
"Scheduled overlord duty [%s] with initial delay [%d], period [%d].",
dutyName, schedule.getInitialDelayMillis(), schedule.getPeriodMillis()
);
}
private void initExecutor()
{
if (exec == null) {
final int numThreads = 1;
exec = execFactory.create(numThreads, "Overlord-Duty-Exec--%d");
log.info("Initialized duty executor with [%d] threads", numThreads);
}
}
}

View File

@ -17,20 +17,19 @@
* under the License.
*/
package org.apache.druid.indexing.overlord.helpers;
package org.apache.druid.indexing.overlord.duty;
import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.tasklogs.TaskLogKiller;
import org.joda.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
/**
* Periodically cleans up stale task logs from both metadata store and deep storage.
*
* @see TaskLogAutoCleanerConfig
*/
public class TaskLogAutoCleaner implements OverlordHelper
public class TaskLogAutoCleaner implements OverlordDuty
{
private static final Logger log = new Logger(TaskLogAutoCleaner.class);
@ -57,29 +56,16 @@ public class TaskLogAutoCleaner implements OverlordHelper
}
@Override
public void schedule(ScheduledExecutorService exec)
public void run() throws Exception
{
log.info("Scheduling TaskLogAutoCleaner with config [%s].", config.toString());
long timestamp = System.currentTimeMillis() - config.getDurationToRetain();
taskLogKiller.killOlderThan(timestamp);
taskStorage.removeTasksOlderThan(timestamp);
}
ScheduledExecutors.scheduleWithFixedDelay(
exec,
Duration.millis(config.getInitialDelay()),
Duration.millis(config.getDelay()),
new Runnable()
{
@Override
public void run()
{
try {
long timestamp = System.currentTimeMillis() - config.getDurationToRetain();
taskLogKiller.killOlderThan(timestamp);
taskStorage.removeTasksOlderThan(timestamp);
}
catch (Exception ex) {
log.error(ex, "Failed to clean-up the task logs");
}
}
}
);
@Override
public DutySchedule getSchedule()
{
return new DutySchedule(config.getDelay(), config.getInitialDelay());
}
}

View File

@ -17,15 +17,19 @@
* under the License.
*/
package org.apache.druid.indexing.overlord.helpers;
package org.apache.druid.indexing.overlord.duty;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.common.config.Configs;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
* Config for periodic cleanup of stale task logs from metadata store and deep
* storage.
*/
public class TaskLogAutoCleanerConfig
{
@ -50,26 +54,25 @@ public class TaskLogAutoCleanerConfig
)
{
if (enabled) {
Preconditions.checkNotNull(durationToRetain, "durationToRetain must be provided.");
Preconditions.checkNotNull(durationToRetain, "'durationToRetain' must be provided.");
}
this.enabled = enabled;
if (initialDelay == null) {
this.initialDelay = 60000 + ThreadLocalRandom.current().nextInt(4 * 60000);
} else {
this.initialDelay = initialDelay.longValue();
}
this.delay = delay == null ? 6 * 60 * 60 * 1000 : delay.longValue();
this.durationToRetain = durationToRetain == null ? Long.MAX_VALUE : durationToRetain.longValue();
this.initialDelay = Configs.valueOrDefault(
initialDelay,
60000 + ThreadLocalRandom.current().nextInt(4 * 60000)
);
this.delay = Configs.valueOrDefault(delay, TimeUnit.HOURS.toMillis(6));
this.durationToRetain = Configs.valueOrDefault(durationToRetain, Long.MAX_VALUE);
Preconditions.checkArgument(this.initialDelay > 0, "initialDelay must be > 0.");
Preconditions.checkArgument(this.delay > 0, "delay must be > 0.");
Preconditions.checkArgument(this.durationToRetain > 0, "durationToRetain must be > 0.");
Preconditions.checkArgument(this.initialDelay > 0, "'initialDelay' must be greater than 0.");
Preconditions.checkArgument(this.delay > 0, "'delay' must be greater than 0.");
Preconditions.checkArgument(this.durationToRetain > 0, "'durationToRetain' must be greater than 0.");
}
public boolean isEnabled()
{
return this.enabled;
return enabled;
}
public long getInitialDelay()

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.duty;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import java.util.Collections;
import java.util.concurrent.ScheduledExecutorService;
public class OverlordDutyExecutorTest
{
@Test
public void testStartAndStop()
{
OverlordDuty testDuty1 = Mockito.mock(OverlordDuty.class);
Mockito.when(testDuty1.isEnabled()).thenReturn(true);
Mockito.when(testDuty1.getSchedule()).thenReturn(new DutySchedule(0, 0));
OverlordDuty testDuty2 = Mockito.mock(OverlordDuty.class);
Mockito.when(testDuty2.isEnabled()).thenReturn(true);
Mockito.when(testDuty2.getSchedule()).thenReturn(new DutySchedule(0, 0));
ScheduledExecutorFactory executorFactory = Mockito.mock(ScheduledExecutorFactory.class);
ScheduledExecutorService executorService = Mockito.mock(ScheduledExecutorService.class);
Mockito.when(executorFactory.create(ArgumentMatchers.eq(1), ArgumentMatchers.anyString()))
.thenReturn(executorService);
OverlordDutyExecutor dutyExecutor =
new OverlordDutyExecutor(executorFactory, ImmutableSet.of(testDuty1, testDuty2));
// Invoke start multiple times
dutyExecutor.start();
dutyExecutor.start();
dutyExecutor.start();
// Verify that executor is initialized and each duty is scheduled
Mockito.verify(executorFactory, Mockito.times(1))
.create(ArgumentMatchers.eq(1), ArgumentMatchers.anyString());
Mockito.verify(executorService, Mockito.times(2)).schedule(
ArgumentMatchers.any(Runnable.class),
ArgumentMatchers.anyLong(),
ArgumentMatchers.any()
);
// Invoke stop multiple times
dutyExecutor.stop();
dutyExecutor.stop();
dutyExecutor.stop();
// Verify that the executor shutdown is invoked just once
Mockito.verify(executorService, Mockito.times(1)).shutdownNow();
}
@Test
public void testStartWithNoEnabledDuty()
{
OverlordDuty testDuty = Mockito.mock(OverlordDuty.class);
Mockito.when(testDuty.isEnabled()).thenReturn(false);
ScheduledExecutorFactory executorFactory = Mockito.mock(ScheduledExecutorFactory.class);
OverlordDutyExecutor dutyExecutor =
new OverlordDutyExecutor(executorFactory, Collections.singleton(testDuty));
dutyExecutor.start();
// Verify that executor is not initialized as there is no enabled duty
Mockito.verify(executorFactory, Mockito.never())
.create(ArgumentMatchers.eq(1), ArgumentMatchers.anyString());
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.indexing.overlord.helpers;
package org.apache.druid.indexing.overlord.duty;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.segment.TestHelper;

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.duty;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.tasklogs.TaskLogKiller;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import java.util.concurrent.TimeUnit;
public class TaskLogAutoCleanerTest
{
@Test
public void testRun() throws Exception
{
// Setup config and mocks
final long durationToRetain = TimeUnit.HOURS.toMillis(2);
TaskLogAutoCleanerConfig cleanerConfig =
new TaskLogAutoCleanerConfig(true, 2000L, 1000L, durationToRetain);
TaskStorage taskStorage = Mockito.mock(TaskStorage.class);
TaskLogKiller taskLogKiller = Mockito.mock(TaskLogKiller.class);
TaskLogAutoCleaner taskLogAutoCleaner =
new TaskLogAutoCleaner(taskLogKiller, cleanerConfig, taskStorage);
long expectedExpiryTime = System.currentTimeMillis() - durationToRetain;
taskLogAutoCleaner.run();
// Verify that kill on TaskStorage and TaskLogKiller is invoked with the correct timestamp
Mockito.verify(taskStorage).removeTasksOlderThan(
ArgumentMatchers.longThat(observedExpiryTime -> observedExpiryTime >= expectedExpiryTime)
);
Mockito.verify(taskLogKiller).killOlderThan(
ArgumentMatchers.longThat(observedExpiryTime -> observedExpiryTime >= expectedExpiryTime)
);
}
@Test
public void testGetSchedule()
{
TaskLogAutoCleanerConfig cleanerConfig = new TaskLogAutoCleanerConfig(true, 2000L, 1000L, 60_000L);
TaskLogAutoCleaner taskLogAutoCleaner = new TaskLogAutoCleaner(null, cleanerConfig, null);
Assert.assertTrue(taskLogAutoCleaner.isEnabled());
final DutySchedule schedule = taskLogAutoCleaner.getSchedule();
Assert.assertEquals(cleanerConfig.getDelay(), schedule.getPeriodMillis());
Assert.assertEquals(cleanerConfig.getInitialDelay(), schedule.getInitialDelayMillis());
}
@Test
public void testIsEnabled()
{
TaskLogAutoCleanerConfig cleanerConfig = new TaskLogAutoCleanerConfig(false, 2000L, 1000L, 60_000L);
TaskLogAutoCleaner taskLogAutoCleaner = new TaskLogAutoCleaner(null, cleanerConfig, null);
Assert.assertFalse(taskLogAutoCleaner.isEnabled());
}
}

View File

@ -61,7 +61,7 @@ import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.helpers.OverlordHelperManager;
import org.apache.druid.indexing.overlord.duty.OverlordDutyExecutor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
import org.apache.druid.java.util.common.Intervals;
@ -226,7 +226,7 @@ public class OverlordTest
new CoordinatorOverlordServiceConfig(null, null),
serviceEmitter,
supervisorManager,
EasyMock.createNiceMock(OverlordHelperManager.class),
EasyMock.createNiceMock(OverlordDutyExecutor.class),
new TestDruidLeaderSelector(),
EasyMock.createNiceMock(SegmentAllocationQueue.class)
);

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.common.config;
/**
* Utility class for common config operations.
*/
public class Configs
{
/**
* Returns the given {@code value} if it is not null, otherwise returns the
* {@code defaultValue}.
*/
public static long valueOrDefault(Long value, long defaultValue)
{
return value == null ? defaultValue : value;
}
/**
* Returns the given {@code value} if it is not null, otherwise returns the
* {@code defaultValue}.
*/
public static int valueOrDefault(Integer value, int defaultValue)
{
return value == null ? defaultValue : value;
}
/**
* Returns the given {@code value} if it is not null, otherwise returns the
* {@code defaultValue}.
*/
public static boolean valueOrDefault(Boolean value, boolean defaultValue)
{
return value == null ? defaultValue : value;
}
/**
* Returns the given {@code value} if it is not null, otherwise returns the
* {@code defaultValue}.
*/
public static <T> T valueOrDefault(T value, T defaultValue)
{
return value == null ? defaultValue : value;
}
}

View File

@ -24,6 +24,7 @@ import org.apache.druid.guice.annotations.ExtensionPoint;
import java.io.IOException;
/**
* Cleans up stale task logs from deep storage.
*/
@ExtensionPoint
public interface TaskLogKiller

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.common.config;
import org.junit.Assert;
import org.junit.Test;
public class ConfigsTest
{
@Test
public void testValueOrDefault()
{
Assert.assertEquals(10, Configs.valueOrDefault((Integer) 10, 11));
Assert.assertEquals(11, Configs.valueOrDefault((Integer) null, 11));
Assert.assertEquals(10, Configs.valueOrDefault((Long) 10L, 11L));
Assert.assertEquals(11, Configs.valueOrDefault(null, 11L));
Assert.assertFalse(Configs.valueOrDefault((Boolean) false, true));
Assert.assertTrue(Configs.valueOrDefault(null, true));
Assert.assertEquals("abc", Configs.valueOrDefault("abc", "def"));
Assert.assertEquals("def", Configs.valueOrDefault(null, "def"));
}
}

View File

@ -793,7 +793,7 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
{
DateTime dateTime = DateTimes.utc(timestamp);
connector.retryWithHandle(
(HandleCallback<Void>) handle -> {
handle -> {
handle.createStatement(getSqlRemoveLogsOlderThan())
.bind("date_time", dateTime.toString())
.execute();
@ -883,9 +883,10 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
@Deprecated
public String getSqlRemoveLogsOlderThan()
{
return StringUtils.format("DELETE a FROM %s a INNER JOIN %s b ON a.%s_id = b.id "
+ "WHERE b.created_date < :date_time and b.active = false",
logTable, entryTable, entryTypeName
return StringUtils.format(
"DELETE a FROM %s a INNER JOIN %s b ON a.%s_id = b.id "
+ "WHERE b.created_date < :date_time and b.active = false",
logTable, entryTable, entryTypeName
);
}
@ -970,21 +971,16 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
{
List<TaskIdentifier> taskIdentifiers = new ArrayList<>();
connector.retryWithHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle)
{
String sql = StringUtils.format(
"SELECT * FROM %1$s WHERE id > '%2$s' AND type IS null ORDER BY id %3$s",
tableName,
id,
connector.limitClause(limit)
);
Query<Map<String, Object>> query = handle.createQuery(sql);
taskIdentifiers.addAll(query.map(taskIdentifierMapper).list());
return null;
}
handle -> {
String sql = StringUtils.format(
"SELECT * FROM %1$s WHERE id > '%2$s' AND type IS null ORDER BY id %3$s",
tableName,
id,
connector.limitClause(limit)
);
Query<Map<String, Object>> query = handle.createQuery(sql);
taskIdentifiers.addAll(query.map(taskIdentifierMapper).list());
return null;
}
);
return taskIdentifiers;
@ -993,20 +989,16 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
private void updateTaskMetadatas(String tasksTable, List<TaskIdentifier> taskIdentifiers)
{
connector.retryWithHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle)
{
Batch batch = handle.createBatch();
String sql = "UPDATE %1$s SET type = '%2$s', group_id = '%3$s' WHERE id = '%4$s'";
for (TaskIdentifier metadata : taskIdentifiers) {
batch.add(StringUtils.format(sql, tasksTable, metadata.getType(), metadata.getGroupId(), metadata.getId())
);
}
batch.execute();
return null;
handle -> {
Batch batch = handle.createBatch();
String sql = "UPDATE %1$s SET type = '%2$s', group_id = '%3$s' WHERE id = '%4$s'";
for (TaskIdentifier metadata : taskIdentifiers) {
batch.add(
StringUtils.format(sql, tasksTable, metadata.getType(), metadata.getGroupId(), metadata.getId())
);
}
batch.execute();
return null;
}
);
}
@ -1015,9 +1007,7 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
public void populateTaskTypeAndGroupIdAsync()
{
ExecutorService executorService = Executors.newSingleThreadExecutor();
taskMigrationCompleteFuture = executorService.submit(
() -> populateTaskTypeAndGroupId()
);
taskMigrationCompleteFuture = executorService.submit(this::populateTaskTypeAndGroupId);
}
/**

View File

@ -21,6 +21,7 @@ package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.common.config.Configs;
/**
*/
@ -37,11 +38,15 @@ public class CoordinatorOverlordServiceConfig
@JsonProperty("overlordService") String overlordService
)
{
this.enabled = enabled == null ? false : enabled.booleanValue();
this.enabled = Configs.valueOrDefault(enabled, false);
this.overlordService = overlordService;
Preconditions.checkArgument((this.enabled && this.overlordService != null) || !this.enabled,
"coordinator is enabled to be overlord but overlordService is not specified");
if (this.enabled) {
Preconditions.checkArgument(
this.overlordService != null,
"'overlordService' must be specified when running Coordinator as Overlord."
);
}
}
public boolean isEnabled()

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;
import org.junit.Assert;
import org.junit.Test;
public class CoordinatorOverlordServiceConfigTest
{
@Test
public void testOverlordServiceIsRequiredIfEnabled()
{
IllegalArgumentException e = Assert.assertThrows(
IllegalArgumentException.class,
() -> new CoordinatorOverlordServiceConfig(true, null)
);
Assert.assertEquals(
"'overlordService' must be specified when running Coordinator as Overlord.",
e.getMessage()
);
}
}

View File

@ -85,9 +85,9 @@ import org.apache.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningSt
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.helpers.OverlordHelper;
import org.apache.druid.indexing.overlord.helpers.TaskLogAutoCleaner;
import org.apache.druid.indexing.overlord.helpers.TaskLogAutoCleanerConfig;
import org.apache.druid.indexing.overlord.duty.OverlordDuty;
import org.apache.druid.indexing.overlord.duty.TaskLogAutoCleaner;
import org.apache.druid.indexing.overlord.duty.TaskLogAutoCleanerConfig;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerResource;
import org.apache.druid.indexing.overlord.http.OverlordRedirectInfo;
@ -198,16 +198,11 @@ public class CliOverlord extends ServerRunnable
binder.bind(TaskCountStatsProvider.class).to(TaskMaster.class);
binder.bind(TaskSlotCountStatsProvider.class).to(TaskMaster.class);
binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class);
binder.bind(
new TypeLiteral<List<TaskLogStreamer>>()
{
}
)
.toProvider(
new ListProvider<TaskLogStreamer>()
.add(TaskLogs.class)
)
binder.bind(TaskLogStreamer.class)
.to(SwitchingTaskLogStreamer.class)
.in(LazySingleton.class);
binder.bind(new TypeLiteral<List<TaskLogStreamer>>() {})
.toProvider(new ListProvider<TaskLogStreamer>().add(TaskLogs.class))
.in(LazySingleton.class);
binder.bind(TaskLogStreamer.class)
@ -389,7 +384,7 @@ public class CliOverlord extends ServerRunnable
private void configureOverlordHelpers(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.indexer.logs.kill", TaskLogAutoCleanerConfig.class);
Multibinder.newSetBinder(binder, OverlordHelper.class)
Multibinder.newSetBinder(binder, OverlordDuty.class)
.addBinding()
.to(TaskLogAutoCleaner.class);
}