diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java index 15664b93141..81c6119b8fa 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java @@ -29,7 +29,7 @@ import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.annotations.Self; -import org.apache.druid.indexing.overlord.helpers.OverlordHelper; +import org.apache.druid.indexing.overlord.duty.OverlordDuty; import org.apache.druid.initialization.DruidModule; import org.apache.druid.msq.indexing.DurableStorageCleaner; import org.apache.druid.msq.indexing.DurableStorageCleanerConfig; @@ -94,7 +94,7 @@ public class MSQDurableStorageModule implements DruidModule DurableStorageCleanerConfig.class ); - Multibinder.newSetBinder(binder, OverlordHelper.class) + Multibinder.newSetBinder(binder, OverlordDuty.class) .addBinding() .to(DurableStorageCleaner.class); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java index 4d7a0760207..f2b5be699b3 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java @@ -27,29 +27,25 @@ import org.apache.druid.frame.util.DurableStorageUtils; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; -import org.apache.druid.indexing.overlord.helpers.OverlordHelper; -import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; +import org.apache.druid.indexing.overlord.duty.DutySchedule; +import org.apache.druid.indexing.overlord.duty.OverlordDuty; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.guice.MultiStageQuery; import org.apache.druid.storage.StorageConnector; -import org.joda.time.Duration; -import java.io.IOException; import java.util.HashSet; import java.util.Iterator; import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; /** - * This method polls the durable storage for any stray directories, i.e. the ones that donot have a controller task + * This duty polls the durable storage for any stray directories, i.e. the ones that donot have a controller task * associated with it and cleans them periodically. * This ensures that the tasks which that have exited abruptly or have failed to clean up the durable storage themselves * donot pollute it with worker outputs and temporary files. See {@link DurableStorageCleanerConfig} for the configs. */ -public class DurableStorageCleaner implements OverlordHelper +public class DurableStorageCleaner implements OverlordDuty { - private static final Logger LOG = new Logger(DurableStorageCleaner.class); private final DurableStorageCleanerConfig config; @@ -75,66 +71,53 @@ public class DurableStorageCleaner implements OverlordHelper } @Override - public void schedule(ScheduledExecutorService exec) + public void run() throws Exception { - LOG.info("Starting the DurableStorageCleaner with the config [%s]", config); + Optional taskRunnerOptional = taskMasterProvider.get().getTaskRunner(); + if (!taskRunnerOptional.isPresent()) { + LOG.info("DurableStorageCleaner not running since the node is not the leader"); + return; + } else { + LOG.info("Running DurableStorageCleaner"); + } - ScheduledExecutors.scheduleWithFixedDelay( - exec, - Duration.standardSeconds(config.getDelaySeconds()), - // Added the initial delay explicitly so that we don't have to manually return the signal in the runnable - Duration.standardSeconds(config.getDelaySeconds()), - () -> { - try { - LOG.info("Starting the run of durable storage cleaner"); - Optional taskRunnerOptional = taskMasterProvider.get().getTaskRunner(); - if (!taskRunnerOptional.isPresent()) { - LOG.info("Durable storage cleaner not running since the node is not the leader"); - return; - } - TaskRunner taskRunner = taskRunnerOptional.get(); - Iterator allFiles = storageConnector.listDir(""); - Set runningTaskIds = taskRunner.getRunningTasks() - .stream() - .map(TaskRunnerWorkItem::getTaskId) - .map(DurableStorageUtils::getControllerDirectory) - .collect(Collectors.toSet()); + TaskRunner taskRunner = taskRunnerOptional.get(); + Iterator allFiles = storageConnector.listDir(""); + Set runningTaskIds = taskRunner.getRunningTasks() + .stream() + .map(TaskRunnerWorkItem::getTaskId) + .map(DurableStorageUtils::getControllerDirectory) + .collect(Collectors.toSet()); - Set filesToRemove = new HashSet<>(); - while (allFiles.hasNext()) { - String currentFile = allFiles.next(); - String taskIdFromPathOrEmpty = DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(currentFile); - if (taskIdFromPathOrEmpty != null && !taskIdFromPathOrEmpty.isEmpty()) { - if (runningTaskIds.contains(taskIdFromPathOrEmpty)) { - // do nothing - } else { - filesToRemove.add(currentFile); - } - } - } - if (filesToRemove.isEmpty()) { - LOG.info("DurableStorageCleaner did not find any left over directories to delete"); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Number of files [%d] that do not have a corresponding MSQ task associated with it. These are:\n[%s]\nT", - filesToRemove.size(), - filesToRemove - ); - } else { - LOG.info( - "Number of files [%d] that do not have a corresponding MSQ task associated with it.", - filesToRemove.size() - ); - } - storageConnector.deleteFiles(filesToRemove); - } - } - catch (IOException e) { - throw new RuntimeException("Error while running the scheduled durable storage cleanup helper", e); - } + Set filesToRemove = new HashSet<>(); + while (allFiles.hasNext()) { + String currentFile = allFiles.next(); + String taskIdFromPathOrEmpty = DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(currentFile); + if (taskIdFromPathOrEmpty != null && !taskIdFromPathOrEmpty.isEmpty()) { + if (runningTaskIds.contains(taskIdFromPathOrEmpty)) { + // do nothing + } else { + filesToRemove.add(currentFile); } - ); + } + } + if (filesToRemove.isEmpty()) { + LOG.info("There are no leftover directories to delete."); + } else { + LOG.info( + "Removing [%d] files which are not associated with any MSQ task.", + filesToRemove.size() + ); + LOG.debug("Files to remove:\n[%s]\n", filesToRemove); + storageConnector.deleteFiles(filesToRemove); + } + } + + @Override + public DutySchedule getSchedule() + { + final long delayMillis = config.getDelaySeconds() * 1000; + return new DutySchedule(delayMillis, delayMillis); } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java index e466acb83fc..0f674739b28 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java @@ -26,18 +26,15 @@ import org.apache.druid.frame.util.DurableStorageUtils; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; +import org.apache.druid.indexing.overlord.duty.DutySchedule; import org.apache.druid.storage.StorageConnector; import org.easymock.Capture; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; -import java.io.IOException; import java.util.Collection; import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; - public class DurableStorageCleanerTest { @@ -50,40 +47,46 @@ public class DurableStorageCleanerTest private static final String STRAY_DIR = "strayDirectory"; @Test - public void testSchedule() throws IOException, InterruptedException + public void testRun() throws Exception { - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - try { - EasyMock.reset(TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR); - DurableStorageCleanerConfig durableStorageCleanerConfig = new DurableStorageCleanerConfig(); - durableStorageCleanerConfig.delaySeconds = 1L; - durableStorageCleanerConfig.enabled = true; - DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner( - durableStorageCleanerConfig, - STORAGE_CONNECTOR, - () -> TASK_MASTER - ); - EasyMock.expect(STORAGE_CONNECTOR.listDir(EasyMock.anyString())) - .andReturn(ImmutableList.of(DurableStorageUtils.getControllerDirectory(TASK_ID), STRAY_DIR) - .stream() - .iterator()) - .anyTimes(); - EasyMock.expect(TASK_RUNNER_WORK_ITEM.getTaskId()).andReturn(TASK_ID) - .anyTimes(); - EasyMock.expect((Collection) TASK_RUNNER.getRunningTasks()) - .andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM)) - .anyTimes(); - EasyMock.expect(TASK_MASTER.getTaskRunner()).andReturn(Optional.of(TASK_RUNNER)).anyTimes(); - Capture> capturedArguments = EasyMock.newCapture(); - STORAGE_CONNECTOR.deleteFiles(EasyMock.capture(capturedArguments)); - EasyMock.expectLastCall().once(); - EasyMock.replay(TASK_MASTER, TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR); - durableStorageCleaner.schedule(executor); - Thread.sleep(8000L); - Assert.assertEquals(Sets.newHashSet(STRAY_DIR), capturedArguments.getValue()); - } - finally { - executor.shutdownNow(); - } + EasyMock.reset(TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR); + DurableStorageCleanerConfig durableStorageCleanerConfig = new DurableStorageCleanerConfig(); + durableStorageCleanerConfig.delaySeconds = 1L; + durableStorageCleanerConfig.enabled = true; + DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner( + durableStorageCleanerConfig, + STORAGE_CONNECTOR, + () -> TASK_MASTER + ); + EasyMock.expect(STORAGE_CONNECTOR.listDir(EasyMock.anyString())) + .andReturn(ImmutableList.of(DurableStorageUtils.getControllerDirectory(TASK_ID), STRAY_DIR) + .stream() + .iterator()) + .anyTimes(); + EasyMock.expect(TASK_RUNNER_WORK_ITEM.getTaskId()).andReturn(TASK_ID) + .anyTimes(); + EasyMock.expect((Collection) TASK_RUNNER.getRunningTasks()) + .andReturn(ImmutableList.of(TASK_RUNNER_WORK_ITEM)) + .anyTimes(); + EasyMock.expect(TASK_MASTER.getTaskRunner()).andReturn(Optional.of(TASK_RUNNER)).anyTimes(); + Capture> capturedArguments = EasyMock.newCapture(); + STORAGE_CONNECTOR.deleteFiles(EasyMock.capture(capturedArguments)); + EasyMock.expectLastCall().once(); + EasyMock.replay(TASK_MASTER, TASK_RUNNER, TASK_RUNNER_WORK_ITEM, STORAGE_CONNECTOR); + durableStorageCleaner.run(); + Assert.assertEquals(Sets.newHashSet(STRAY_DIR), capturedArguments.getValue()); + } + + @Test + public void testGetSchedule() + { + DurableStorageCleanerConfig cleanerConfig = new DurableStorageCleanerConfig(); + cleanerConfig.delaySeconds = 10L; + cleanerConfig.enabled = true; + DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(cleanerConfig, null, null); + + DutySchedule schedule = durableStorageCleaner.getSchedule(); + Assert.assertEquals(cleanerConfig.delaySeconds * 1000, schedule.getPeriodMillis()); + Assert.assertEquals(cleanerConfig.delaySeconds * 1000, schedule.getInitialDelayMillis()); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java index e7b2a031e4a..c10b4963898 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.EnumUtils; +import org.apache.druid.common.config.Configs; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.logger.Logger; @@ -144,33 +145,35 @@ public class TaskConfig @JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask ) { - this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir; + this.baseDir = Configs.valueOrDefault(baseDir, System.getProperty("java.io.tmpdir")); this.baseTaskDir = new File(defaultDir(baseTaskDir, "persistent/task")); // This is usually on HDFS or similar, so we can't use java.io.tmpdir - this.hadoopWorkingPath = hadoopWorkingPath == null ? "/tmp/druid-indexing" : hadoopWorkingPath; - this.defaultRowFlushBoundary = defaultRowFlushBoundary == null ? 75000 : defaultRowFlushBoundary; - this.defaultHadoopCoordinates = defaultHadoopCoordinates == null - ? DEFAULT_DEFAULT_HADOOP_COORDINATES - : defaultHadoopCoordinates; + this.hadoopWorkingPath = Configs.valueOrDefault(hadoopWorkingPath, "/tmp/druid-indexing"); + this.defaultRowFlushBoundary = Configs.valueOrDefault(defaultRowFlushBoundary, 75000); + this.defaultHadoopCoordinates = Configs.valueOrDefault( + defaultHadoopCoordinates, + DEFAULT_DEFAULT_HADOOP_COORDINATES + ); this.restoreTasksOnRestart = restoreTasksOnRestart; - this.gracefulShutdownTimeout = gracefulShutdownTimeout == null - ? DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT - : gracefulShutdownTimeout; - this.directoryLockTimeout = directoryLockTimeout == null - ? DEFAULT_DIRECTORY_LOCK_TIMEOUT - : directoryLockTimeout; - if (shuffleDataLocations == null) { - this.shuffleDataLocations = Collections.singletonList( - new StorageLocationConfig(new File(defaultDir(null, "intermediary-segments")), null, null) - ); - } else { - this.shuffleDataLocations = shuffleDataLocations; - } + this.gracefulShutdownTimeout = Configs.valueOrDefault( + gracefulShutdownTimeout, + DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT + ); + this.directoryLockTimeout = Configs.valueOrDefault( + directoryLockTimeout, + DEFAULT_DIRECTORY_LOCK_TIMEOUT + ); + this.shuffleDataLocations = Configs.valueOrDefault( + shuffleDataLocations, + Collections.singletonList( + new StorageLocationConfig(new File(defaultDir(null, "intermediary-segments")), null, null) + ) + ); + this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource; - this.batchMemoryMappedIndex = batchMemoryMappedIndex; - this.encapsulatedTask = enableTaskLevelLogPush; + // Conflict resolution. Assume that if batchMemoryMappedIndex is set (since false is the default) that // the user changed it intentionally to use legacy, in this case oveeride batchProcessingMode and also // set it to legacy else just use batchProcessingMode and don't pay attention to batchMemoryMappedIndexMode: @@ -181,13 +184,15 @@ public class TaskConfig } else { // batchProcessingMode input string is invalid, log & use the default. this.batchProcessingMode = BatchProcessingMode.CLOSED_SEGMENTS; // Default - log.warn("Batch processing mode argument value is null or not valid:[%s], defaulting to[%s] ", - batchProcessingMode, this.batchProcessingMode + log.warn( + "Batch processing mode argument value is null or not valid:[%s], defaulting to[%s] ", + batchProcessingMode, this.batchProcessingMode ); } log.debug("Batch processing mode:[%s]", this.batchProcessingMode); - this.storeEmptyColumns = storeEmptyColumns == null ? DEFAULT_STORE_EMPTY_COLUMNS : storeEmptyColumns; - this.tmpStorageBytesPerTask = tmpStorageBytesPerTask == null ? DEFAULT_TMP_STORAGE_BYTES_PER_TASK : tmpStorageBytesPerTask; + + this.storeEmptyColumns = Configs.valueOrDefault(storeEmptyColumns, DEFAULT_STORE_EMPTY_COLUMNS); + this.tmpStorageBytesPerTask = Configs.valueOrDefault(tmpStorageBytesPerTask, DEFAULT_TMP_STORAGE_BYTES_PER_TASK); } private TaskConfig( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java index 28c623fdadb..1eab403585f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java @@ -34,7 +34,7 @@ import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.indexing.overlord.config.TaskQueueConfig; -import org.apache.druid.indexing.overlord.helpers.OverlordHelperManager; +import org.apache.druid.indexing.overlord.duty.OverlordDutyExecutor; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; @@ -91,7 +91,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro final CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig, final ServiceEmitter emitter, final SupervisorManager supervisorManager, - final OverlordHelperManager overlordHelperManager, + final OverlordDutyExecutor overlordDutyExecutor, @IndexingService final DruidLeaderSelector overlordLeaderSelector, final SegmentAllocationQueue segmentAllocationQueue ) @@ -137,7 +137,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro leaderLifecycle.addManagedInstance(taskRunner); leaderLifecycle.addManagedInstance(taskQueue); leaderLifecycle.addManagedInstance(supervisorManager); - leaderLifecycle.addManagedInstance(overlordHelperManager); + leaderLifecycle.addManagedInstance(overlordDutyExecutor); leaderLifecycle.addHandler( new Lifecycle.Handler() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskQueueConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskQueueConfig.java index 27f50d4937c..2b03814b3af 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskQueueConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskQueueConfig.java @@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord.config; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.common.config.Configs; import org.joda.time.Duration; import org.joda.time.Period; @@ -46,7 +47,7 @@ public class TaskQueueConfig @JsonProperty("storageSyncRate") final Period storageSyncRate ) { - this.maxSize = maxSize == null ? Integer.MAX_VALUE : maxSize; + this.maxSize = Configs.valueOrDefault(maxSize, Integer.MAX_VALUE); this.startDelay = defaultDuration(startDelay, "PT1M"); this.restartDelay = defaultDuration(restartDelay, "PT30S"); this.storageSyncRate = defaultDuration(storageSyncRate, "PT1M"); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/DutySchedule.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/DutySchedule.java new file mode 100644 index 00000000000..f9a10f601b2 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/DutySchedule.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.duty; + +/** + * Schedule of an {@link OverlordDuty}. + */ +public class DutySchedule +{ + private final long periodMillis; + private final long initialDelayMillis; + + public DutySchedule(long periodMillis, long initialDelayMillis) + { + this.initialDelayMillis = initialDelayMillis; + this.periodMillis = periodMillis; + } + + public long getInitialDelayMillis() + { + return initialDelayMillis; + } + + public long getPeriodMillis() + { + return periodMillis; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/OverlordHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDuty.java similarity index 80% rename from indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/OverlordHelper.java rename to indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDuty.java index b86e02db390..69f7c1f6feb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/OverlordHelper.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDuty.java @@ -17,14 +17,14 @@ * under the License. */ -package org.apache.druid.indexing.overlord.helpers; - -import java.util.concurrent.ScheduledExecutorService; +package org.apache.druid.indexing.overlord.duty; /** + * Represents a duty scheduled to run periodically on the Overlord. */ -public interface OverlordHelper +public interface OverlordDuty { boolean isEnabled(); - void schedule(ScheduledExecutorService exec); + void run() throws Exception; + DutySchedule getSchedule(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/OverlordHelperManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutor.java similarity index 51% rename from indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/OverlordHelperManager.java rename to indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutor.java index e6e08befe44..9ba4e90f0f6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/OverlordHelperManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutor.java @@ -17,37 +17,41 @@ * under the License. */ -package org.apache.druid.indexing.overlord.helpers; +package org.apache.druid.indexing.overlord.duty; import com.google.inject.Inject; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; +import org.joda.time.Duration; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; /** + * Manages the execution of all overlord duties on a single-threaded executor. */ -public class OverlordHelperManager +public class OverlordDutyExecutor { - private static final Logger log = new Logger(OverlordHelperManager.class); + private static final Logger log = new Logger(OverlordDutyExecutor.class); private final ScheduledExecutorFactory execFactory; - private final Set helpers; + private final Set duties; private volatile ScheduledExecutorService exec; private final Object startStopLock = new Object(); private volatile boolean started = false; @Inject - public OverlordHelperManager( + public OverlordDutyExecutor( ScheduledExecutorFactory scheduledExecutorFactory, - Set helpers) + Set duties + ) { this.execFactory = scheduledExecutorFactory; - this.helpers = helpers; + this.duties = duties; } @LifecycleStart @@ -55,19 +59,15 @@ public class OverlordHelperManager { synchronized (startStopLock) { if (!started) { - log.info("OverlordHelperManager is starting."); - - for (OverlordHelper helper : helpers) { - if (helper.isEnabled()) { - if (exec == null) { - exec = execFactory.create(1, "Overlord-Helper-Manager-Exec--%d"); - } - helper.schedule(exec); + log.info("Starting OverlordDutyExecutor."); + for (OverlordDuty duty : duties) { + if (duty.isEnabled()) { + schedule(duty); } } - started = true; - log.info("OverlordHelperManager is started."); + started = true; + log.info("OverlordDutyExecutor is now running."); } } } @@ -77,15 +77,54 @@ public class OverlordHelperManager { synchronized (startStopLock) { if (started) { - log.info("OverlordHelperManager is stopping."); + log.info("Stopping OverlordDutyExecutor."); if (exec != null) { exec.shutdownNow(); exec = null; } started = false; - log.info("OverlordHelperManager is stopped."); + log.info("OverlordDutyExecutor has been stopped."); } } } + + /** + * Schedules execution of the given overlord duty. + */ + private void schedule(OverlordDuty duty) + { + initExecutor(); + + final DutySchedule schedule = duty.getSchedule(); + final String dutyName = duty.getClass().getName(); + + ScheduledExecutors.scheduleWithFixedDelay( + exec, + Duration.millis(schedule.getInitialDelayMillis()), + Duration.millis(schedule.getPeriodMillis()), + () -> { + try { + duty.run(); + } + catch (Exception e) { + log.error(e, "Error while running duty [%s]", dutyName); + } + } + ); + + log.info( + "Scheduled overlord duty [%s] with initial delay [%d], period [%d].", + dutyName, schedule.getInitialDelayMillis(), schedule.getPeriodMillis() + ); + } + + private void initExecutor() + { + if (exec == null) { + final int numThreads = 1; + exec = execFactory.create(numThreads, "Overlord-Duty-Exec--%d"); + log.info("Initialized duty executor with [%d] threads", numThreads); + } + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/TaskLogAutoCleaner.java similarity index 60% rename from indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java rename to indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/TaskLogAutoCleaner.java index 4ab54404925..5f8f3c1c166 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/TaskLogAutoCleaner.java @@ -17,20 +17,19 @@ * under the License. */ -package org.apache.druid.indexing.overlord.helpers; +package org.apache.druid.indexing.overlord.duty; import com.google.inject.Inject; import org.apache.druid.indexing.overlord.TaskStorage; -import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.tasklogs.TaskLogKiller; -import org.joda.time.Duration; - -import java.util.concurrent.ScheduledExecutorService; /** + * Periodically cleans up stale task logs from both metadata store and deep storage. + * + * @see TaskLogAutoCleanerConfig */ -public class TaskLogAutoCleaner implements OverlordHelper +public class TaskLogAutoCleaner implements OverlordDuty { private static final Logger log = new Logger(TaskLogAutoCleaner.class); @@ -57,29 +56,16 @@ public class TaskLogAutoCleaner implements OverlordHelper } @Override - public void schedule(ScheduledExecutorService exec) + public void run() throws Exception { - log.info("Scheduling TaskLogAutoCleaner with config [%s].", config.toString()); + long timestamp = System.currentTimeMillis() - config.getDurationToRetain(); + taskLogKiller.killOlderThan(timestamp); + taskStorage.removeTasksOlderThan(timestamp); + } - ScheduledExecutors.scheduleWithFixedDelay( - exec, - Duration.millis(config.getInitialDelay()), - Duration.millis(config.getDelay()), - new Runnable() - { - @Override - public void run() - { - try { - long timestamp = System.currentTimeMillis() - config.getDurationToRetain(); - taskLogKiller.killOlderThan(timestamp); - taskStorage.removeTasksOlderThan(timestamp); - } - catch (Exception ex) { - log.error(ex, "Failed to clean-up the task logs"); - } - } - } - ); + @Override + public DutySchedule getSchedule() + { + return new DutySchedule(config.getDelay(), config.getInitialDelay()); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/TaskLogAutoCleanerConfig.java similarity index 69% rename from indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfig.java rename to indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/TaskLogAutoCleanerConfig.java index e8cda74fb70..71945ad9230 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/TaskLogAutoCleanerConfig.java @@ -17,15 +17,19 @@ * under the License. */ -package org.apache.druid.indexing.overlord.helpers; +package org.apache.druid.indexing.overlord.duty; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import org.apache.druid.common.config.Configs; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; /** + * Config for periodic cleanup of stale task logs from metadata store and deep + * storage. */ public class TaskLogAutoCleanerConfig { @@ -50,26 +54,25 @@ public class TaskLogAutoCleanerConfig ) { if (enabled) { - Preconditions.checkNotNull(durationToRetain, "durationToRetain must be provided."); + Preconditions.checkNotNull(durationToRetain, "'durationToRetain' must be provided."); } this.enabled = enabled; - if (initialDelay == null) { - this.initialDelay = 60000 + ThreadLocalRandom.current().nextInt(4 * 60000); - } else { - this.initialDelay = initialDelay.longValue(); - } - this.delay = delay == null ? 6 * 60 * 60 * 1000 : delay.longValue(); - this.durationToRetain = durationToRetain == null ? Long.MAX_VALUE : durationToRetain.longValue(); + this.initialDelay = Configs.valueOrDefault( + initialDelay, + 60000 + ThreadLocalRandom.current().nextInt(4 * 60000) + ); + this.delay = Configs.valueOrDefault(delay, TimeUnit.HOURS.toMillis(6)); + this.durationToRetain = Configs.valueOrDefault(durationToRetain, Long.MAX_VALUE); - Preconditions.checkArgument(this.initialDelay > 0, "initialDelay must be > 0."); - Preconditions.checkArgument(this.delay > 0, "delay must be > 0."); - Preconditions.checkArgument(this.durationToRetain > 0, "durationToRetain must be > 0."); + Preconditions.checkArgument(this.initialDelay > 0, "'initialDelay' must be greater than 0."); + Preconditions.checkArgument(this.delay > 0, "'delay' must be greater than 0."); + Preconditions.checkArgument(this.durationToRetain > 0, "'durationToRetain' must be greater than 0."); } public boolean isEnabled() { - return this.enabled; + return enabled; } public long getInitialDelay() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutorTest.java new file mode 100644 index 00000000000..00169b0a955 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutorTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.duty; + +import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +import java.util.Collections; +import java.util.concurrent.ScheduledExecutorService; + +public class OverlordDutyExecutorTest +{ + + @Test + public void testStartAndStop() + { + OverlordDuty testDuty1 = Mockito.mock(OverlordDuty.class); + Mockito.when(testDuty1.isEnabled()).thenReturn(true); + Mockito.when(testDuty1.getSchedule()).thenReturn(new DutySchedule(0, 0)); + + OverlordDuty testDuty2 = Mockito.mock(OverlordDuty.class); + Mockito.when(testDuty2.isEnabled()).thenReturn(true); + Mockito.when(testDuty2.getSchedule()).thenReturn(new DutySchedule(0, 0)); + + ScheduledExecutorFactory executorFactory = Mockito.mock(ScheduledExecutorFactory.class); + ScheduledExecutorService executorService = Mockito.mock(ScheduledExecutorService.class); + Mockito.when(executorFactory.create(ArgumentMatchers.eq(1), ArgumentMatchers.anyString())) + .thenReturn(executorService); + + OverlordDutyExecutor dutyExecutor = + new OverlordDutyExecutor(executorFactory, ImmutableSet.of(testDuty1, testDuty2)); + + // Invoke start multiple times + dutyExecutor.start(); + dutyExecutor.start(); + dutyExecutor.start(); + + // Verify that executor is initialized and each duty is scheduled + Mockito.verify(executorFactory, Mockito.times(1)) + .create(ArgumentMatchers.eq(1), ArgumentMatchers.anyString()); + Mockito.verify(executorService, Mockito.times(2)).schedule( + ArgumentMatchers.any(Runnable.class), + ArgumentMatchers.anyLong(), + ArgumentMatchers.any() + ); + + // Invoke stop multiple times + dutyExecutor.stop(); + dutyExecutor.stop(); + dutyExecutor.stop(); + + // Verify that the executor shutdown is invoked just once + Mockito.verify(executorService, Mockito.times(1)).shutdownNow(); + } + + @Test + public void testStartWithNoEnabledDuty() + { + OverlordDuty testDuty = Mockito.mock(OverlordDuty.class); + Mockito.when(testDuty.isEnabled()).thenReturn(false); + + ScheduledExecutorFactory executorFactory = Mockito.mock(ScheduledExecutorFactory.class); + OverlordDutyExecutor dutyExecutor = + new OverlordDutyExecutor(executorFactory, Collections.singleton(testDuty)); + + dutyExecutor.start(); + + // Verify that executor is not initialized as there is no enabled duty + Mockito.verify(executorFactory, Mockito.never()) + .create(ArgumentMatchers.eq(1), ArgumentMatchers.anyString()); + } + +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/TaskLogAutoCleanerConfigTest.java similarity index 97% rename from indexing-service/src/test/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfigTest.java rename to indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/TaskLogAutoCleanerConfigTest.java index ecbb003618b..1c78a98851e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/TaskLogAutoCleanerConfigTest.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.indexing.overlord.helpers; +package org.apache.druid.indexing.overlord.duty; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.segment.TestHelper; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/TaskLogAutoCleanerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/TaskLogAutoCleanerTest.java new file mode 100644 index 00000000000..edd9df957f1 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/TaskLogAutoCleanerTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.duty; + +import org.apache.druid.indexing.overlord.TaskStorage; +import org.apache.druid.tasklogs.TaskLogKiller; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +import java.util.concurrent.TimeUnit; + +public class TaskLogAutoCleanerTest +{ + @Test + public void testRun() throws Exception + { + // Setup config and mocks + final long durationToRetain = TimeUnit.HOURS.toMillis(2); + TaskLogAutoCleanerConfig cleanerConfig = + new TaskLogAutoCleanerConfig(true, 2000L, 1000L, durationToRetain); + + TaskStorage taskStorage = Mockito.mock(TaskStorage.class); + TaskLogKiller taskLogKiller = Mockito.mock(TaskLogKiller.class); + + TaskLogAutoCleaner taskLogAutoCleaner = + new TaskLogAutoCleaner(taskLogKiller, cleanerConfig, taskStorage); + + long expectedExpiryTime = System.currentTimeMillis() - durationToRetain; + taskLogAutoCleaner.run(); + + // Verify that kill on TaskStorage and TaskLogKiller is invoked with the correct timestamp + Mockito.verify(taskStorage).removeTasksOlderThan( + ArgumentMatchers.longThat(observedExpiryTime -> observedExpiryTime >= expectedExpiryTime) + ); + Mockito.verify(taskLogKiller).killOlderThan( + ArgumentMatchers.longThat(observedExpiryTime -> observedExpiryTime >= expectedExpiryTime) + ); + } + + @Test + public void testGetSchedule() + { + TaskLogAutoCleanerConfig cleanerConfig = new TaskLogAutoCleanerConfig(true, 2000L, 1000L, 60_000L); + TaskLogAutoCleaner taskLogAutoCleaner = new TaskLogAutoCleaner(null, cleanerConfig, null); + Assert.assertTrue(taskLogAutoCleaner.isEnabled()); + + final DutySchedule schedule = taskLogAutoCleaner.getSchedule(); + Assert.assertEquals(cleanerConfig.getDelay(), schedule.getPeriodMillis()); + Assert.assertEquals(cleanerConfig.getInitialDelay(), schedule.getInitialDelayMillis()); + } + + @Test + public void testIsEnabled() + { + TaskLogAutoCleanerConfig cleanerConfig = new TaskLogAutoCleanerConfig(false, 2000L, 1000L, 60_000L); + TaskLogAutoCleaner taskLogAutoCleaner = new TaskLogAutoCleaner(null, cleanerConfig, null); + Assert.assertFalse(taskLogAutoCleaner.isEnabled()); + } + +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index c33917de571..9738f8b9333 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -61,7 +61,7 @@ import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.indexing.overlord.config.TaskQueueConfig; -import org.apache.druid.indexing.overlord.helpers.OverlordHelperManager; +import org.apache.druid.indexing.overlord.duty.OverlordDutyExecutor; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator; import org.apache.druid.java.util.common.Intervals; @@ -226,7 +226,7 @@ public class OverlordTest new CoordinatorOverlordServiceConfig(null, null), serviceEmitter, supervisorManager, - EasyMock.createNiceMock(OverlordHelperManager.class), + EasyMock.createNiceMock(OverlordDutyExecutor.class), new TestDruidLeaderSelector(), EasyMock.createNiceMock(SegmentAllocationQueue.class) ); diff --git a/processing/src/main/java/org/apache/druid/common/config/Configs.java b/processing/src/main/java/org/apache/druid/common/config/Configs.java new file mode 100644 index 00000000000..433a2548a4b --- /dev/null +++ b/processing/src/main/java/org/apache/druid/common/config/Configs.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.common.config; + +/** + * Utility class for common config operations. + */ +public class Configs +{ + /** + * Returns the given {@code value} if it is not null, otherwise returns the + * {@code defaultValue}. + */ + public static long valueOrDefault(Long value, long defaultValue) + { + return value == null ? defaultValue : value; + } + + /** + * Returns the given {@code value} if it is not null, otherwise returns the + * {@code defaultValue}. + */ + public static int valueOrDefault(Integer value, int defaultValue) + { + return value == null ? defaultValue : value; + } + + /** + * Returns the given {@code value} if it is not null, otherwise returns the + * {@code defaultValue}. + */ + public static boolean valueOrDefault(Boolean value, boolean defaultValue) + { + return value == null ? defaultValue : value; + } + + /** + * Returns the given {@code value} if it is not null, otherwise returns the + * {@code defaultValue}. + */ + public static T valueOrDefault(T value, T defaultValue) + { + return value == null ? defaultValue : value; + } + +} diff --git a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogKiller.java b/processing/src/main/java/org/apache/druid/tasklogs/TaskLogKiller.java index b01e783a6d2..d2a3d0e92fd 100644 --- a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogKiller.java +++ b/processing/src/main/java/org/apache/druid/tasklogs/TaskLogKiller.java @@ -24,6 +24,7 @@ import org.apache.druid.guice.annotations.ExtensionPoint; import java.io.IOException; /** + * Cleans up stale task logs from deep storage. */ @ExtensionPoint public interface TaskLogKiller diff --git a/processing/src/test/java/org/apache/druid/common/config/ConfigsTest.java b/processing/src/test/java/org/apache/druid/common/config/ConfigsTest.java new file mode 100644 index 00000000000..edfc9a25164 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/common/config/ConfigsTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.common.config; + +import org.junit.Assert; +import org.junit.Test; + +public class ConfigsTest +{ + @Test + public void testValueOrDefault() + { + Assert.assertEquals(10, Configs.valueOrDefault((Integer) 10, 11)); + Assert.assertEquals(11, Configs.valueOrDefault((Integer) null, 11)); + + Assert.assertEquals(10, Configs.valueOrDefault((Long) 10L, 11L)); + Assert.assertEquals(11, Configs.valueOrDefault(null, 11L)); + + Assert.assertFalse(Configs.valueOrDefault((Boolean) false, true)); + Assert.assertTrue(Configs.valueOrDefault(null, true)); + + Assert.assertEquals("abc", Configs.valueOrDefault("abc", "def")); + Assert.assertEquals("def", Configs.valueOrDefault(null, "def")); + } + +} diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index 436af7ad694..ef083fd36e0 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -793,7 +793,7 @@ public abstract class SQLMetadataStorageActionHandler) handle -> { + handle -> { handle.createStatement(getSqlRemoveLogsOlderThan()) .bind("date_time", dateTime.toString()) .execute(); @@ -883,9 +883,10 @@ public abstract class SQLMetadataStorageActionHandler taskIdentifiers = new ArrayList<>(); connector.retryWithHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) - { - String sql = StringUtils.format( - "SELECT * FROM %1$s WHERE id > '%2$s' AND type IS null ORDER BY id %3$s", - tableName, - id, - connector.limitClause(limit) - ); - Query> query = handle.createQuery(sql); - taskIdentifiers.addAll(query.map(taskIdentifierMapper).list()); - return null; - } + handle -> { + String sql = StringUtils.format( + "SELECT * FROM %1$s WHERE id > '%2$s' AND type IS null ORDER BY id %3$s", + tableName, + id, + connector.limitClause(limit) + ); + Query> query = handle.createQuery(sql); + taskIdentifiers.addAll(query.map(taskIdentifierMapper).list()); + return null; } ); return taskIdentifiers; @@ -993,20 +989,16 @@ public abstract class SQLMetadataStorageActionHandler taskIdentifiers) { connector.retryWithHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) - { - Batch batch = handle.createBatch(); - String sql = "UPDATE %1$s SET type = '%2$s', group_id = '%3$s' WHERE id = '%4$s'"; - for (TaskIdentifier metadata : taskIdentifiers) { - batch.add(StringUtils.format(sql, tasksTable, metadata.getType(), metadata.getGroupId(), metadata.getId()) - ); - } - batch.execute(); - return null; + handle -> { + Batch batch = handle.createBatch(); + String sql = "UPDATE %1$s SET type = '%2$s', group_id = '%3$s' WHERE id = '%4$s'"; + for (TaskIdentifier metadata : taskIdentifiers) { + batch.add( + StringUtils.format(sql, tasksTable, metadata.getType(), metadata.getGroupId(), metadata.getId()) + ); } + batch.execute(); + return null; } ); } @@ -1015,9 +1007,7 @@ public abstract class SQLMetadataStorageActionHandler populateTaskTypeAndGroupId() - ); + taskMigrationCompleteFuture = executorService.submit(this::populateTaskTypeAndGroupId); } /** diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorOverlordServiceConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorOverlordServiceConfig.java index d7e0f534c06..3f9df32c61d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorOverlordServiceConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorOverlordServiceConfig.java @@ -21,6 +21,7 @@ package org.apache.druid.server.coordinator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import org.apache.druid.common.config.Configs; /** */ @@ -37,11 +38,15 @@ public class CoordinatorOverlordServiceConfig @JsonProperty("overlordService") String overlordService ) { - this.enabled = enabled == null ? false : enabled.booleanValue(); + this.enabled = Configs.valueOrDefault(enabled, false); this.overlordService = overlordService; - Preconditions.checkArgument((this.enabled && this.overlordService != null) || !this.enabled, - "coordinator is enabled to be overlord but overlordService is not specified"); + if (this.enabled) { + Preconditions.checkArgument( + this.overlordService != null, + "'overlordService' must be specified when running Coordinator as Overlord." + ); + } } public boolean isEnabled() diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorOverlordServiceConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorOverlordServiceConfigTest.java new file mode 100644 index 00000000000..57013de6408 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorOverlordServiceConfigTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import org.junit.Assert; +import org.junit.Test; + +public class CoordinatorOverlordServiceConfigTest +{ + @Test + public void testOverlordServiceIsRequiredIfEnabled() + { + IllegalArgumentException e = Assert.assertThrows( + IllegalArgumentException.class, + () -> new CoordinatorOverlordServiceConfig(true, null) + ); + Assert.assertEquals( + "'overlordService' must be specified when running Coordinator as Overlord.", + e.getMessage() + ); + } +} diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index 7451855b217..b67d6592e2e 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -85,9 +85,9 @@ import org.apache.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningSt import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.indexing.overlord.config.TaskQueueConfig; -import org.apache.druid.indexing.overlord.helpers.OverlordHelper; -import org.apache.druid.indexing.overlord.helpers.TaskLogAutoCleaner; -import org.apache.druid.indexing.overlord.helpers.TaskLogAutoCleanerConfig; +import org.apache.druid.indexing.overlord.duty.OverlordDuty; +import org.apache.druid.indexing.overlord.duty.TaskLogAutoCleaner; +import org.apache.druid.indexing.overlord.duty.TaskLogAutoCleanerConfig; import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory; import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerResource; import org.apache.druid.indexing.overlord.http.OverlordRedirectInfo; @@ -198,16 +198,11 @@ public class CliOverlord extends ServerRunnable binder.bind(TaskCountStatsProvider.class).to(TaskMaster.class); binder.bind(TaskSlotCountStatsProvider.class).to(TaskMaster.class); - binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class); - binder.bind( - new TypeLiteral>() - { - } - ) - .toProvider( - new ListProvider() - .add(TaskLogs.class) - ) + binder.bind(TaskLogStreamer.class) + .to(SwitchingTaskLogStreamer.class) + .in(LazySingleton.class); + binder.bind(new TypeLiteral>() {}) + .toProvider(new ListProvider().add(TaskLogs.class)) .in(LazySingleton.class); binder.bind(TaskLogStreamer.class) @@ -389,7 +384,7 @@ public class CliOverlord extends ServerRunnable private void configureOverlordHelpers(Binder binder) { JsonConfigProvider.bind(binder, "druid.indexer.logs.kill", TaskLogAutoCleanerConfig.class); - Multibinder.newSetBinder(binder, OverlordHelper.class) + Multibinder.newSetBinder(binder, OverlordDuty.class) .addBinding() .to(TaskLogAutoCleaner.class); }