Merger: Tweaks to task shutdown

This commit is contained in:
Gian Merlino 2013-04-15 18:51:01 -07:00
parent d879c9a41e
commit cdbdf843c0
1 changed files with 9 additions and 10 deletions

View File

@ -125,11 +125,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
// time to adjust process holders
synchronized (tasks) {
if (Thread.interrupted()) {
throw new InterruptedException();
final TaskInfo taskInfo = tasks.get(task.getId());
if (taskInfo.shutdown) {
throw new IllegalStateException("Task has been shut down!");
}
final TaskInfo taskInfo = tasks.get(task.getId());
if (taskInfo == null) {
throw new ISE("WTF?! TaskInfo disappeared for task: %s", task.getId());
}
@ -227,11 +228,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
return TaskStatus.failure(task.getId());
}
}
catch (InterruptedException e) {
log.info(e, "Interrupted during execution");
return TaskStatus.failure(task.getId());
}
catch (IOException e) {
catch (Exception e) {
log.info(e, "Exception caught during execution");
throw Throwables.propagate(e);
}
finally {
@ -288,9 +286,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
log.info("Ignoring request to cancel unknown task: %s", taskid);
return;
}
}
taskInfo.statusFuture.cancel(true);
taskInfo.shutdown = true;
}
if (taskInfo.processHolder != null) {
final int shutdowns = taskInfo.processHolder.shutdowns.getAndIncrement();
@ -397,6 +395,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
private static class TaskInfo
{
private final ListenableFuture<TaskStatus> statusFuture;
private volatile boolean shutdown = false;
private volatile ProcessHolder processHolder = null;
private TaskInfo(ListenableFuture<TaskStatus> statusFuture)