- Ability to cancel pending tasks in the ForkingTaskRunner.

- Other various changes from code review.
This commit is contained in:
Gian Merlino 2013-04-10 09:51:51 -07:00
parent fce3b8c20f
commit eaea8ae163
7 changed files with 227 additions and 179 deletions

View File

@ -74,7 +74,6 @@ public class ExecutorServiceTaskRunner implements TaskRunner, QuerySegmentWalker
@LifecycleStop
public void stop()
{
// TODO is this right
exec.shutdownNow();
}

View File

@ -23,13 +23,13 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.CharMatcher;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
@ -40,8 +40,8 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.tasklogs.TaskLogProvider;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.common.tasklogs.TaskLogProvider;
import com.metamx.druid.merger.common.tasklogs.TaskLogPusher;
import com.metamx.druid.merger.coordinator.config.ForkingTaskRunnerConfig;
import com.metamx.druid.merger.worker.executor.ExecutorMain;
@ -56,6 +56,8 @@ import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@ -69,21 +71,24 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
private static final EmittingLogger log = new EmittingLogger(ForkingTaskRunner.class);
private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property.";
private final Object processLock = new Object();
private final ForkingTaskRunnerConfig config;
private final Properties props;
private final TaskLogPusher taskLogPusher;
private final ListeningExecutorService exec;
private final ObjectMapper jsonMapper;
private final List<ProcessHolder> processes = Lists.newArrayList();
private final Map<String, TaskInfo> tasks = Maps.newHashMap();
public ForkingTaskRunner(
ForkingTaskRunnerConfig config,
Properties props,
TaskLogPusher taskLogPusher,
ExecutorService exec,
ObjectMapper jsonMapper
)
{
this.config = config;
this.props = props;
this.taskLogPusher = taskLogPusher;
this.exec = MoreExecutors.listeningDecorator(exec);
this.jsonMapper = jsonMapper;
@ -92,155 +97,184 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
@Override
public ListenableFuture<TaskStatus> run(final Task task)
{
return exec.submit(
new Callable<TaskStatus>()
{
@Override
public TaskStatus call()
{
final String attemptUUID = UUID.randomUUID().toString();
final File taskDir = new File(config.getBaseTaskDir(), task.getId());
final File attemptDir = new File(taskDir, attemptUUID);
synchronized (tasks) {
if (!tasks.containsKey(task.getId())) {
tasks.put(
task.getId(),
new TaskInfo(
exec.submit(
new Callable<TaskStatus>()
{
@Override
public TaskStatus call()
{
final String attemptUUID = UUID.randomUUID().toString();
final File taskDir = new File(config.getBaseTaskDir(), task.getId());
final File attemptDir = new File(taskDir, attemptUUID);
ProcessHolder processHolder = null;
final ProcessHolder processHolder;
try {
if (!attemptDir.mkdirs()) {
throw new IOException(String.format("Could not create directories: %s", attemptDir));
}
try {
if (!attemptDir.mkdirs()) {
throw new IOException(String.format("Could not create directories: %s", attemptDir));
}
final File taskFile = new File(attemptDir, "task.json");
final File statusFile = new File(attemptDir, "status.json");
final File logFile = new File(attemptDir, "log");
final File taskFile = new File(attemptDir, "task.json");
final File statusFile = new File(attemptDir, "status.json");
final File logFile = new File(attemptDir, "log");
// locked so we can safely assign port = findUnusedPort
synchronized (processLock) {
if (getProcessHolder(task.getId()).isPresent()) {
throw new ISE("Task already running: %s", task.getId());
}
// time to adjust process holders
synchronized (tasks) {
if (Thread.interrupted()) {
throw new InterruptedException();
}
final List<String> command = Lists.newArrayList();
final int childPort = findUnusedPort();
final String childHost = String.format(config.getHostPattern(), childPort);
final TaskInfo taskInfo = tasks.get(task.getId());
if (taskInfo == null) {
throw new ISE("WTF?! TaskInfo disappeared for task: %s", task.getId());
}
Iterables.addAll(
command,
ImmutableList.of(
config.getJavaCommand(),
"-cp",
config.getJavaClasspath()
)
);
if (taskInfo.processHolder != null) {
throw new ISE("WTF?! TaskInfo already has a process holder for task: %s", task.getId());
}
Iterables.addAll(
command,
Splitter.on(CharMatcher.WHITESPACE)
.omitEmptyStrings()
.split(config.getJavaOptions())
);
final List<String> command = Lists.newArrayList();
final int childPort = findUnusedPort();
final String childHost = String.format(config.getHostPattern(), childPort);
for (String propName : System.getProperties().stringPropertyNames()) {
if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
command.add(
String.format(
"-D%s=%s",
propName.substring(CHILD_PROPERTY_PREFIX.length()),
System.getProperty(propName)
)
);
}
}
Iterables.addAll(
command,
ImmutableList.of(
config.getJavaCommand(),
"-cp",
config.getJavaClasspath()
)
);
command.add(String.format("-Ddruid.host=%s", childHost));
command.add(String.format("-Ddruid.port=%d", childPort));
Iterables.addAll(
command,
Splitter.on(CharMatcher.WHITESPACE)
.omitEmptyStrings()
.split(config.getJavaOptions())
);
command.add(config.getMainClass());
command.add(taskFile.toString());
command.add(statusFile.toString());
for (String propName : props.stringPropertyNames()) {
if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
command.add(
String.format(
"-D%s=%s",
propName.substring(CHILD_PROPERTY_PREFIX.length()),
System.getProperty(propName)
)
);
}
}
jsonMapper.writeValue(taskFile, task);
command.add(String.format("-Ddruid.host=%s", childHost));
command.add(String.format("-Ddruid.port=%d", childPort));
log.info("Running command: %s", Joiner.on(" ").join(command));
processHolder = new ProcessHolder(
task,
new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
logFile,
childPort
);
command.add(config.getMainClass());
command.add(taskFile.toString());
command.add(statusFile.toString());
processes.add(processHolder);
}
jsonMapper.writeValue(taskFile, task);
log.info("Logging task %s output to: %s", task.getId(), logFile);
log.info("Running command: %s", Joiner.on(" ").join(command));
taskInfo.processHolder = new ProcessHolder(
new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
logFile,
childPort
);
final OutputStream toLogfile = Files.newOutputStreamSupplier(logFile).getOutput();
final InputStream fromProc = processHolder.process.getInputStream();
processHolder = taskInfo.processHolder;
}
boolean copyFailed = false;
log.info("Logging task %s output to: %s", task.getId(), logFile);
try {
ByteStreams.copy(fromProc, toLogfile);
} catch (Exception e) {
log.warn(e, "Failed to read from process for task: %s", task.getId());
copyFailed = true;
} finally {
Closeables.closeQuietly(fromProc);
Closeables.closeQuietly(toLogfile);
}
final OutputStream toProc = processHolder.process.getOutputStream();
final InputStream fromProc = processHolder.process.getInputStream();
final OutputStream toLogfile = Files.newOutputStreamSupplier(logFile).getOutput();
final int statusCode = processHolder.process.waitFor();
boolean copyFailed = false;
log.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
try {
ByteStreams.copy(fromProc, toLogfile);
}
catch (Exception e) {
log.warn(e, "Failed to read from process for task: %s", task.getId());
copyFailed = true;
}
finally {
Closeables.closeQuietly(fromProc);
Closeables.closeQuietly(toLogfile);
}
// Upload task logs
// TODO: For very long-lived tasks, upload periodically? Truncated?
// TODO: Store task logs for each attempt separately?
taskLogPusher.pushTaskLog(task.getId(), logFile);
final int statusCode = processHolder.process.waitFor();
if (!copyFailed && statusCode == 0) {
// Process exited successfully
return jsonMapper.readValue(statusFile, TaskStatus.class);
} else {
// Process exited unsuccessfully
return TaskStatus.failure(task.getId());
}
}
catch (InterruptedException e) {
log.info(e, "Interrupted while waiting for process!");
return TaskStatus.failure(task.getId());
}
catch (IOException e) {
throw Throwables.propagate(e);
}
finally {
try {
if (processHolder != null) {
synchronized (processLock) {
processes.remove(processHolder);
}
}
log.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
Closeables.closeQuietly(toProc);
log.info("Removing temporary directory: %s", attemptDir);
FileUtils.deleteDirectory(attemptDir);
}
catch (Exception e) {
log.error(e, "Failed to delete temporary directory");
}
}
}
}
);
// Upload task logs
// XXX: Consider uploading periodically for very long-lived tasks to prevent
// XXX: bottlenecks at the end or the possibility of losing a lot of logs all
// XXX: at once.
taskLogPusher.pushTaskLog(task.getId(), logFile);
if (!copyFailed && statusCode == 0) {
// Process exited successfully
return jsonMapper.readValue(statusFile, TaskStatus.class);
} else {
// Process exited unsuccessfully
return TaskStatus.failure(task.getId());
}
}
catch (InterruptedException e) {
log.info(e, "Interrupted during execution");
return TaskStatus.failure(task.getId());
}
catch (IOException e) {
throw Throwables.propagate(e);
}
finally {
try {
synchronized (tasks) {
final TaskInfo taskInfo = tasks.remove(task.getId());
if (taskInfo != null && taskInfo.processHolder != null) {
taskInfo.processHolder.process.destroy();
}
}
log.info("Removing temporary directory: %s", attemptDir);
FileUtils.deleteDirectory(attemptDir);
}
catch (Exception e) {
log.error(e, "Suppressing exception caught while cleaning up task");
}
}
}
}
)
)
);
}
return tasks.get(task.getId()).statusFuture;
}
}
@LifecycleStop
public void stop()
{
synchronized (processLock) {
synchronized (tasks) {
exec.shutdown();
for (ProcessHolder processHolder : processes) {
log.info("Destroying process: %s", processHolder.process);
processHolder.process.destroy();
for (TaskInfo taskInfo : tasks.values()) {
if (taskInfo.processHolder != null) {
log.info("Destroying process: %s", taskInfo.processHolder.process);
taskInfo.processHolder.process.destroy();
}
}
}
}
@ -248,14 +282,26 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
@Override
public void shutdown(final String taskid)
{
final Optional<ProcessHolder> processHolder = getProcessHolder(taskid);
if (processHolder.isPresent()) {
final int shutdowns = processHolder.get().shutdowns.getAndIncrement();
final TaskInfo taskInfo;
synchronized (tasks) {
taskInfo = tasks.get(taskid);
if (taskInfo == null) {
log.info("Ignoring request to cancel unknown task: %s", taskid);
return;
}
}
taskInfo.statusFuture.cancel(true);
if (taskInfo.processHolder != null) {
final int shutdowns = taskInfo.processHolder.shutdowns.getAndIncrement();
if (shutdowns == 0) {
log.info("Attempting to gracefully shutdown task: %s", taskid);
try {
// This is gross, but it may still be nicer than talking to the forked JVM via HTTP.
final OutputStream out = processHolder.get().process.getOutputStream();
final OutputStream out = taskInfo.processHolder.process.getOutputStream();
out.write(
jsonMapper.writeValueAsBytes(
ImmutableMap.of(
@ -273,7 +319,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
} else {
// Will trigger normal failure mechanisms due to process exit
log.info("Killing process for task: %s", taskid);
processHolder.get().process.destroy();
taskInfo.processHolder.process.destroy();
}
}
}
@ -299,44 +345,51 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
@Override
public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskid, final long offset)
{
final Optional<ProcessHolder> processHolder = getProcessHolder(taskid);
final ProcessHolder processHolder;
if (processHolder.isPresent()) {
return Optional.<InputSupplier<InputStream>>of(
new InputSupplier<InputStream>()
{
@Override
public InputStream getInput() throws IOException
{
final RandomAccessFile raf = new RandomAccessFile(processHolder.get().logFile, "r");
final long rafLength = raf.length();
if (offset > 0) {
raf.seek(offset);
} else if (offset < 0 && offset < rafLength) {
raf.seek(rafLength + offset);
}
return Channels.newInputStream(raf.getChannel());
}
}
);
} else {
return Optional.absent();
synchronized (tasks) {
final TaskInfo taskInfo = tasks.get(taskid);
if (taskInfo != null && taskInfo.processHolder != null) {
processHolder = taskInfo.processHolder;
} else {
return Optional.absent();
}
}
return Optional.<InputSupplier<InputStream>>of(
new InputSupplier<InputStream>()
{
@Override
public InputStream getInput() throws IOException
{
final RandomAccessFile raf = new RandomAccessFile(processHolder.logFile, "r");
final long rafLength = raf.length();
if (offset > 0) {
raf.seek(offset);
} else if (offset < 0 && offset < rafLength) {
raf.seek(rafLength + offset);
}
return Channels.newInputStream(raf.getChannel());
}
}
);
}
private int findUnusedPort()
{
synchronized (processLock) {
synchronized (tasks) {
int port = config.getStartPort();
int maxPortSoFar = -1;
for (ProcessHolder processHolder : processes) {
if (processHolder.port > maxPortSoFar) {
maxPortSoFar = processHolder.port;
}
for (TaskInfo taskInfo : tasks.values()) {
if (taskInfo.processHolder != null) {
if (taskInfo.processHolder.port > maxPortSoFar) {
maxPortSoFar = taskInfo.processHolder.port;
}
if (processHolder.port == port) {
port = maxPortSoFar + 1;
if (taskInfo.processHolder.port == port) {
port = maxPortSoFar + 1;
}
}
}
@ -344,34 +397,26 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
}
}
private Optional<ProcessHolder> getProcessHolder(final String taskid)
private static class TaskInfo
{
synchronized (processLock) {
return Iterables.tryFind(
processes, new Predicate<ProcessHolder>()
{
@Override
public boolean apply(ProcessHolder processHolder)
{
return processHolder.task.getId().equals(taskid);
}
}
);
private final ListenableFuture<TaskStatus> statusFuture;
private volatile ProcessHolder processHolder = null;
private TaskInfo(ListenableFuture<TaskStatus> statusFuture)
{
this.statusFuture = statusFuture;
}
}
private static class ProcessHolder
{
private final Task task;
private final Process process;
private final File logFile;
private final int port;
private final AtomicInteger shutdowns = new AtomicInteger(0);
private AtomicInteger shutdowns = new AtomicInteger(0);
private ProcessHolder(Task task, Process process, File logFile, int port)
private ProcessHolder(Process process, File logFile, int port)
{
this.task = task;
this.process = process;
this.logFile = logFile;
this.port = port;

View File

@ -259,7 +259,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
final ZkWorker zkWorker = findWorkerRunningTask(taskId);
if (zkWorker == null) {
// TODO Ability to shut down pending tasks
// Would be nice to have an ability to shut down pending tasks
log.info("Can't shutdown! No worker running task %s", taskId);
return;
}

View File

@ -80,9 +80,10 @@ public class TaskQueue
*/
public void bootstrap()
{
// TODO: Periodically fixup the database to refer to what we think is happening so bootstraps don't resurrect
// TODO: bogus stuff caused by leader races or whatevs. Also so that bogus stuff is detect by clients in a
// TODO: timely manner.
// NOTE: Bootstraps can resurrect bogus stuff caused by leader races or whatevs.
// We may want to periodically fixup the database to refer to what we think is happening, to prevent
// this from occurring and also so that bogus stuff is detected by clients in a timely manner.
giant.lock();

View File

@ -655,6 +655,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
final ExecutorService runnerExec = Executors.newFixedThreadPool(config.getNumLocalThreads());
return new ForkingTaskRunner(
configFactory.build(ForkingTaskRunnerConfig.class),
props,
persistentTaskLogs,
runnerExec,
getJsonMapper()

View File

@ -394,6 +394,7 @@ public class WorkerNode extends RegisteringNode
if (forkingTaskRunner == null) {
forkingTaskRunner = new ForkingTaskRunner(
configFactory.build(ForkingTaskRunnerConfig.class),
props,
persistentTaskLogs,
Executors.newFixedThreadPool(workerConfig.getCapacity()),
getJsonMapper()

View File

@ -69,6 +69,7 @@ public class WorkerResource
taskRunner.shutdown(taskid);
}
catch (Exception e) {
log.error(e, "Failed to issue shutdown for task: %s", taskid);
return Response.serverError().build();
}
return Response.ok(ImmutableMap.of("task", taskid)).build();