mirror of https://github.com/apache/druid.git
fix forking task runner task shutdown to be more graceful (#8085)
* fix forking task runner shutdown to be more graceful * javadoc
This commit is contained in:
parent
83514958db
commit
cb82d72547
|
@ -527,16 +527,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
|
||||
synchronized (tasks) {
|
||||
for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
|
||||
if (taskWorkItem.processHolder != null) {
|
||||
log.info("Closing output stream to task[%s].", taskWorkItem.getTask().getId());
|
||||
try {
|
||||
taskWorkItem.processHolder.process.getOutputStream().close();
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn(e, "Failed to close stdout to task[%s]. Destroying task.", taskWorkItem.getTask().getId());
|
||||
taskWorkItem.processHolder.process.destroy();
|
||||
}
|
||||
}
|
||||
shutdownTaskProcess(taskWorkItem);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -593,12 +584,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
}
|
||||
|
||||
taskInfo.shutdown = true;
|
||||
}
|
||||
|
||||
if (taskInfo.processHolder != null) {
|
||||
// Will trigger normal failure mechanisms due to process exit
|
||||
log.info("Killing process for task: %s", taskid);
|
||||
taskInfo.processHolder.process.destroy();
|
||||
shutdownTaskProcess(taskInfo);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -694,8 +681,10 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
);
|
||||
}
|
||||
|
||||
// Save running tasks to a file, so they can potentially be restored on next startup. Suppresses exceptions that
|
||||
// occur while saving.
|
||||
/**
|
||||
* Save running tasks to a file, so they can potentially be restored on next startup. Suppresses exceptions that occur
|
||||
* while saving.
|
||||
*/
|
||||
@GuardedBy("tasks")
|
||||
private void saveRunningTasks()
|
||||
{
|
||||
|
@ -714,6 +703,25 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close task output stream (input stream of process) sending EOF telling process to terminate, destroying the process
|
||||
* if an exception is encountered.
|
||||
*/
|
||||
private void shutdownTaskProcess(ForkingTaskRunnerWorkItem taskInfo)
|
||||
{
|
||||
if (taskInfo.processHolder != null) {
|
||||
// Will trigger normal failure mechanisms due to process exit
|
||||
log.info("Closing output stream to task[%s].", taskInfo.getTask().getId());
|
||||
try {
|
||||
taskInfo.processHolder.process.getOutputStream().close();
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn(e, "Failed to close stdout to task[%s]. Destroying task.", taskInfo.getTask().getId());
|
||||
taskInfo.processHolder.process.destroy();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private File getRestoreFile()
|
||||
{
|
||||
return new File(taskConfig.getBaseTaskDir(), TASK_RESTORE_FILENAME);
|
||||
|
|
Loading…
Reference in New Issue