diff --git a/docs/content/configuration/indexing-service.md b/docs/content/configuration/indexing-service.md index d6747d62b46..e69f7f3d643 100644 --- a/docs/content/configuration/indexing-service.md +++ b/docs/content/configuration/indexing-service.md @@ -279,6 +279,7 @@ Additional peon configs include: |`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|/tmp/druid-indexing| |`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|50000| |`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.3.0| +|`druid.indexer.task.restoreTasksOnRestart`|If true, middleManagers will attempt to stop tasks gracefully on shutdown and restore them on restart.|false| |`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on middleManager restart for restorable tasks to gracefully exit.|PT5M| |`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M| diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java index 06f7c00d68c..fefc6903922 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java @@ -52,6 +52,9 @@ public class TaskConfig @JsonProperty private final List defaultHadoopCoordinates; + @JsonProperty + private final boolean restoreTasksOnRestart; + @JsonProperty private final Period gracefulShutdownTimeout; @@ -65,6 +68,7 @@ public class TaskConfig @JsonProperty("hadoopWorkingPath") String hadoopWorkingPath, @JsonProperty("defaultRowFlushBoundary") Integer defaultRowFlushBoundary, @JsonProperty("defaultHadoopCoordinates") List defaultHadoopCoordinates, + @JsonProperty("restoreTasksOnRestart") boolean restoreTasksOnRestart, @JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout, @JsonProperty("directoryLockTimeout") Period directoryLockTimeout ) @@ -76,6 +80,7 @@ public class TaskConfig this.defaultHadoopCoordinates = defaultHadoopCoordinates == null ? DEFAULT_DEFAULT_HADOOP_COORDINATES : defaultHadoopCoordinates; + this.restoreTasksOnRestart = restoreTasksOnRestart; this.gracefulShutdownTimeout = gracefulShutdownTimeout == null ? DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT : gracefulShutdownTimeout; @@ -129,6 +134,12 @@ public class TaskConfig return defaultHadoopCoordinates; } + @JsonProperty + public boolean isRestoreTasksOnRestart() + { + return restoreTasksOnRestart; + } + @JsonProperty public Period getGracefulShutdownTimeout() { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index 6a2042bb291..83afb91b029 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -149,7 +149,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer throw new ISE("WTF?! Task[%s] restore file had wrong id[%s].", taskId, task.getId()); } - if (task.canRestore()) { + if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) { log.info("Restoring task[%s].", task.getId()); retVal.add(Pair.of(task, run(task))); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index 92a5843217b..97d3961cef9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -101,7 +101,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker final long elapsed; boolean error = false; - if (task.canRestore()) { + if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) { // Attempt graceful shutdown. graceful = true; log.info("Starting graceful shutdown of task[%s].", task.getId()); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java index 75e16371378..5a6da202126 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java @@ -92,7 +92,7 @@ public class TaskToolboxTest EasyMock.replay(task, mockHandoffNotifierFactory); taskToolbox = new TaskToolboxFactory( - new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, null, null), + new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, false, null, null), mockTaskActionClientFactory, mockEmitter, mockSegmentPusher, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 71ab5107d46..f1b54df3edb 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -455,7 +455,7 @@ public class RealtimeIndexTaskTest private TaskToolbox makeToolbox(final Task task, final IndexerMetadataStorageCoordinator mdc, final File directory) { final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); - final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, null, null); + final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null); final TaskLockbox taskLockbox = new TaskLockbox(taskStorage); final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( taskLockbox, diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 53f409af36a..0210067b170 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -204,7 +204,7 @@ public class IngestSegmentFirehoseFactoryTest EasyMock.replay(notifierFactory); final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( - new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null, null, null), + new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null, false, null, null), tac, newMockEmitter(), new DataSegmentPusher() diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 23c391719cf..7b28bd90a54 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -298,7 +298,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class); EasyMock.replay(notifierFactory); final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( - new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null, null, null), + new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null, false, null, null), new TaskActionClientFactory() { @Override diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 26c3d168c7e..fbed169867b 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -451,7 +451,7 @@ public class TaskLifecycleTest private void setUpAndStartTaskQueue(DataSegmentPusher dataSegmentPusher) { - final TaskConfig taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, null, null); + final TaskConfig taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null); tsqa = new TaskStorageQueryAdapter(ts); tl = new TaskLockbox(ts); mdc = newMockMDC(); diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index aa29337f4e4..1b1ccbbf71b 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -141,7 +141,16 @@ public class WorkerTaskMonitorTest private WorkerTaskMonitor createTaskMonitor() { - final TaskConfig taskConfig = new TaskConfig(Files.createTempDir().toString(), null, null, 0, null, null, null); + final TaskConfig taskConfig = new TaskConfig( + Files.createTempDir().toString(), + null, + null, + 0, + null, + false, + null, + null + ); TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class); TaskActionClient taskActionClient = EasyMock.createNiceMock(TaskActionClient.class); EasyMock.expect(taskActionClientFactory.create(EasyMock.anyObject())).andReturn(taskActionClient).anyTimes();