ForkingTaskRunner: Totally ridiculous graceful shutdown mechanism

This commit is contained in:
Gian Merlino 2013-03-19 17:43:29 -07:00
parent c05629e6a4
commit 76f4d12059
5 changed files with 74 additions and 31 deletions

View File

@ -8,6 +8,7 @@ import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
@ -205,13 +206,33 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
@Override
public void shutdown(final String taskid)
{
// TODO shutdown harder after more shutdowns
final Optional<ProcessHolder> processHolder = getProcessHolder(taskid);
if(processHolder.isPresent()) {
processHolder.get().shutdowns.incrementAndGet();
final int shutdowns = processHolder.get().shutdowns.getAndIncrement();
if (shutdowns == 0) {
log.info("Attempting to gracefully shutdown task: %s", taskid);
try {
// TODO this is the WORST
final OutputStream out = processHolder.get().process.getOutputStream();
out.write(
jsonMapper.writeValueAsBytes(
ImmutableMap.of(
"shutdown",
"now"
)
)
);
out.write('\n');
out.flush();
} catch (IOException e) {
throw Throwables.propagate(e);
}
} else {
log.info("Killing process for task: %s", taskid);
processHolder.get().process.destroy();
}
}
}
@Override
public Collection<TaskRunnerWorkItem> getRunningTasks()

View File

@ -144,7 +144,6 @@ public class IndexerCoordinatorNode extends RegisteringNode
private TaskQueue taskQueue = null;
private TaskLockbox taskLockbox = null;
private CuratorFramework curatorFramework = null;
private ScheduledExecutorFactory scheduledExecutorFactory = null;
private IndexerZkConfig indexerZkConfig;
private TaskRunnerFactory taskRunnerFactory = null;
private ResourceManagementSchedulerFactory resourceManagementSchedulerFactory = null;
@ -226,7 +225,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
public void doInit() throws Exception
{
scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
initializeDB();
final ConfigManagerConfig managerConfig = configFactory.build(ConfigManagerConfig.class);

View File

@ -134,6 +134,21 @@ public class IndexerCoordinatorResource
return Response.ok().entity(segments).build();
}
@POST
@Path("/task/{taskid}/shutdown")
@Produces("application/json")
public Response doShutdown(@PathParam("taskid") String taskid)
{
try {
taskMasterLifecycle.getTaskRunner().shutdown(taskid);
}
catch (Exception e) {
return Response.serverError().build();
}
return Response.ok().build();
}
// Legacy endpoint
// TODO Remove
@Deprecated
@ -203,21 +218,6 @@ public class IndexerCoordinatorResource
return Response.ok().entity(retMap).build();
}
@POST
@Path("/shutdown")
@Produces("application/json")
public Response doShutdown(final String taskId)
{
try {
taskMasterLifecycle.getTaskRunner().shutdown(taskId);
}
catch (Exception e) {
return Response.serverError().build();
}
return Response.ok().build();
}
@GET
@Path("/pendingTasks")
@Produces("application/json")

View File

@ -19,15 +19,21 @@
package com.metamx.druid.merger.worker.executor;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.ISE;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -59,7 +65,15 @@ public class ExecutorMain
System.exit(2);
}
// Spawn monitor thread to keep a watch on our parent process
final Task task = node.getJsonMapper().readValue(new File(args[0]), Task.class);
log.info(
"Running with task: %s",
node.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(task)
);
// Spawn monitor thread to keep a watch on parent's stdin
// If a message comes over stdin, we want to handle it
// If stdin reaches eof, the parent is gone, and we should shut down
final ExecutorService parentMonitorExec = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
@ -74,9 +88,25 @@ public class ExecutorMain
@Override
public void run()
{
int b = -1;
try {
b = System.in.read();
final BufferedReader stdinReader = new BufferedReader(new InputStreamReader(System.in));
String messageString;
while ((messageString = stdinReader.readLine()) != null) {
final Map<String, Object> message = node.getJsonMapper()
.readValue(
messageString,
new TypeReference<Map<String, Object>>(){}
);
if (message == null) {
break;
} else if (message.get("shutdown") != null && message.get("shutdown").equals("now")) {
log.info("Shutting down!");
task.shutdown();
} else {
throw new ISE("Unrecognized message from parent: %s", message);
}
}
} catch (Exception e) {
log.error(e, "Failed to read from stdin");
}
@ -88,13 +118,6 @@ public class ExecutorMain
);
try {
final Task task = node.getJsonMapper().readValue(new File(args[0]), Task.class);
log.info(
"Running with task: %s",
node.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(task)
);
final TaskStatus status = node.run(task).get();
log.info(

View File

@ -243,7 +243,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
lifecycle.stop();
}
public synchronized ListenableFuture<TaskStatus> run(Task task)
public ListenableFuture<TaskStatus> run(Task task)
{
return taskRunner.run(task);
}