druid.indexer.task.restoreTasksOnRestart configuration.

This commit is contained in:
Gian Merlino 2015-12-22 10:31:20 -08:00
parent 7b5fd76058
commit bad270b6c4
10 changed files with 29 additions and 8 deletions

View File

@ -279,6 +279,7 @@ Additional peon configs include:
|`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|/tmp/druid-indexing| |`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|/tmp/druid-indexing|
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|50000| |`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|50000|
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.3.0| |`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.3.0|
|`druid.indexer.task.restoreTasksOnRestart`|If true, middleManagers will attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on middleManager restart for restorable tasks to gracefully exit.|PT5M| |`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on middleManager restart for restorable tasks to gracefully exit.|PT5M|
|`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M| |`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M|

View File

@ -52,6 +52,9 @@ public class TaskConfig
@JsonProperty @JsonProperty
private final List<String> defaultHadoopCoordinates; private final List<String> defaultHadoopCoordinates;
@JsonProperty
private final boolean restoreTasksOnRestart;
@JsonProperty @JsonProperty
private final Period gracefulShutdownTimeout; private final Period gracefulShutdownTimeout;
@ -65,6 +68,7 @@ public class TaskConfig
@JsonProperty("hadoopWorkingPath") String hadoopWorkingPath, @JsonProperty("hadoopWorkingPath") String hadoopWorkingPath,
@JsonProperty("defaultRowFlushBoundary") Integer defaultRowFlushBoundary, @JsonProperty("defaultRowFlushBoundary") Integer defaultRowFlushBoundary,
@JsonProperty("defaultHadoopCoordinates") List<String> defaultHadoopCoordinates, @JsonProperty("defaultHadoopCoordinates") List<String> defaultHadoopCoordinates,
@JsonProperty("restoreTasksOnRestart") boolean restoreTasksOnRestart,
@JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout, @JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout,
@JsonProperty("directoryLockTimeout") Period directoryLockTimeout @JsonProperty("directoryLockTimeout") Period directoryLockTimeout
) )
@ -76,6 +80,7 @@ public class TaskConfig
this.defaultHadoopCoordinates = defaultHadoopCoordinates == null this.defaultHadoopCoordinates = defaultHadoopCoordinates == null
? DEFAULT_DEFAULT_HADOOP_COORDINATES ? DEFAULT_DEFAULT_HADOOP_COORDINATES
: defaultHadoopCoordinates; : defaultHadoopCoordinates;
this.restoreTasksOnRestart = restoreTasksOnRestart;
this.gracefulShutdownTimeout = gracefulShutdownTimeout == null this.gracefulShutdownTimeout = gracefulShutdownTimeout == null
? DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT ? DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT
: gracefulShutdownTimeout; : gracefulShutdownTimeout;
@ -129,6 +134,12 @@ public class TaskConfig
return defaultHadoopCoordinates; return defaultHadoopCoordinates;
} }
@JsonProperty
public boolean isRestoreTasksOnRestart()
{
return restoreTasksOnRestart;
}
@JsonProperty @JsonProperty
public Period getGracefulShutdownTimeout() public Period getGracefulShutdownTimeout()
{ {

View File

@ -149,7 +149,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
throw new ISE("WTF?! Task[%s] restore file had wrong id[%s].", taskId, task.getId()); throw new ISE("WTF?! Task[%s] restore file had wrong id[%s].", taskId, task.getId());
} }
if (task.canRestore()) { if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
log.info("Restoring task[%s].", task.getId()); log.info("Restoring task[%s].", task.getId());
retVal.add(Pair.of(task, run(task))); retVal.add(Pair.of(task, run(task)));
} }

View File

@ -101,7 +101,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
final long elapsed; final long elapsed;
boolean error = false; boolean error = false;
if (task.canRestore()) { if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
// Attempt graceful shutdown. // Attempt graceful shutdown.
graceful = true; graceful = true;
log.info("Starting graceful shutdown of task[%s].", task.getId()); log.info("Starting graceful shutdown of task[%s].", task.getId());

View File

@ -92,7 +92,7 @@ public class TaskToolboxTest
EasyMock.replay(task, mockHandoffNotifierFactory); EasyMock.replay(task, mockHandoffNotifierFactory);
taskToolbox = new TaskToolboxFactory( taskToolbox = new TaskToolboxFactory(
new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, null, null), new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, false, null, null),
mockTaskActionClientFactory, mockTaskActionClientFactory,
mockEmitter, mockEmitter,
mockSegmentPusher, mockSegmentPusher,

View File

@ -455,7 +455,7 @@ public class RealtimeIndexTaskTest
private TaskToolbox makeToolbox(final Task task, final IndexerMetadataStorageCoordinator mdc, final File directory) private TaskToolbox makeToolbox(final Task task, final IndexerMetadataStorageCoordinator mdc, final File directory)
{ {
final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, null, null); final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null);
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage); final TaskLockbox taskLockbox = new TaskLockbox(taskStorage);
final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
taskLockbox, taskLockbox,

View File

@ -204,7 +204,7 @@ public class IngestSegmentFirehoseFactoryTest
EasyMock.replay(notifierFactory); EasyMock.replay(notifierFactory);
final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory(
new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null, null, null), new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null, false, null, null),
tac, tac,
newMockEmitter(), newMockEmitter(),
new DataSegmentPusher() new DataSegmentPusher()

View File

@ -298,7 +298,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class); SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
EasyMock.replay(notifierFactory); EasyMock.replay(notifierFactory);
final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory(
new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null, null, null), new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null, false, null, null),
new TaskActionClientFactory() new TaskActionClientFactory()
{ {
@Override @Override

View File

@ -451,7 +451,7 @@ public class TaskLifecycleTest
private void setUpAndStartTaskQueue(DataSegmentPusher dataSegmentPusher) private void setUpAndStartTaskQueue(DataSegmentPusher dataSegmentPusher)
{ {
final TaskConfig taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, null, null); final TaskConfig taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null);
tsqa = new TaskStorageQueryAdapter(ts); tsqa = new TaskStorageQueryAdapter(ts);
tl = new TaskLockbox(ts); tl = new TaskLockbox(ts);
mdc = newMockMDC(); mdc = newMockMDC();

View File

@ -141,7 +141,16 @@ public class WorkerTaskMonitorTest
private WorkerTaskMonitor createTaskMonitor() private WorkerTaskMonitor createTaskMonitor()
{ {
final TaskConfig taskConfig = new TaskConfig(Files.createTempDir().toString(), null, null, 0, null, null, null); final TaskConfig taskConfig = new TaskConfig(
Files.createTempDir().toString(),
null,
null,
0,
null,
false,
null,
null
);
TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class); TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);
TaskActionClient taskActionClient = EasyMock.createNiceMock(TaskActionClient.class); TaskActionClient taskActionClient = EasyMock.createNiceMock(TaskActionClient.class);
EasyMock.expect(taskActionClientFactory.create(EasyMock.<Task>anyObject())).andReturn(taskActionClient).anyTimes(); EasyMock.expect(taskActionClientFactory.create(EasyMock.<Task>anyObject())).andReturn(taskActionClient).anyTimes();