ForkingTaskRunner: Use guava Closer to closer stuff

This commit is contained in:
Gian Merlino 2013-07-25 14:37:45 -07:00
parent 952b8ce06b
commit 5d44f0f15b
1 changed files with 102 additions and 104 deletions

View File

@ -32,6 +32,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import com.google.common.io.Closer;
import com.google.common.io.Files; import com.google.common.io.Files;
import com.google.common.io.InputSupplier; import com.google.common.io.InputSupplier;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
@ -116,137 +117,135 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
final ProcessHolder processHolder; final ProcessHolder processHolder;
try { try {
if (!attemptDir.mkdirs()) { final Closer closer = Closer.create();
throw new IOException(String.format("Could not create directories: %s", attemptDir)); try {
} if (!attemptDir.mkdirs()) {
throw new IOException(String.format("Could not create directories: %s", attemptDir));
final File taskFile = new File(attemptDir, "task.json");
final File statusFile = new File(attemptDir, "status.json");
final File logFile = new File(attemptDir, "log");
// time to adjust process holders
synchronized (tasks) {
final TaskInfo taskInfo = tasks.get(task.getId());
if (taskInfo.shutdown) {
throw new IllegalStateException("Task has been shut down!");
} }
if (taskInfo == null) { final File taskFile = new File(attemptDir, "task.json");
throw new ISE("WTF?! TaskInfo disappeared for task: %s", task.getId()); final File statusFile = new File(attemptDir, "status.json");
} final File logFile = new File(attemptDir, "log");
if (taskInfo.processHolder != null) { // time to adjust process holders
throw new ISE("WTF?! TaskInfo already has a process holder for task: %s", task.getId()); synchronized (tasks) {
} final TaskInfo taskInfo = tasks.get(task.getId());
final List<String> command = Lists.newArrayList(); if (taskInfo.shutdown) {
final int childPort = findUnusedPort(); throw new IllegalStateException("Task has been shut down!");
final String childHost = String.format(config.getHostPattern(), childPort); }
command.add(config.getJavaCommand()); if (taskInfo == null) {
command.add("-cp"); throw new ISE("WTF?! TaskInfo disappeared for task: %s", task.getId());
command.add(config.getJavaClasspath()); }
Iterables.addAll( if (taskInfo.processHolder != null) {
command, throw new ISE("WTF?! TaskInfo already has a process holder for task: %s", task.getId());
Splitter.on(CharMatcher.WHITESPACE) }
.omitEmptyStrings()
.split(config.getJavaOptions())
);
for (String propName : props.stringPropertyNames()) { final List<String> command = Lists.newArrayList();
for (String allowedPrefix : config.getAllowedPrefixes()) { final int childPort = findUnusedPort();
if (propName.startsWith(allowedPrefix)) { final String childHost = String.format(config.getHostPattern(), childPort);
command.add(config.getJavaCommand());
command.add("-cp");
command.add(config.getJavaClasspath());
Iterables.addAll(
command,
Splitter.on(CharMatcher.WHITESPACE)
.omitEmptyStrings()
.split(config.getJavaOptions())
);
for (String propName : props.stringPropertyNames()) {
for (String allowedPrefix : config.getAllowedPrefixes()) {
if (propName.startsWith(allowedPrefix)) {
command.add(
String.format(
"-D%s=%s",
propName,
props.getProperty(propName)
)
);
}
}
}
// Override child JVM specific properties
for (String propName : props.stringPropertyNames()) {
if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
command.add( command.add(
String.format( String.format(
"-D%s=%s", "-D%s=%s",
propName, propName.substring(CHILD_PROPERTY_PREFIX.length()),
props.getProperty(propName) props.getProperty(propName)
) )
); );
} }
} }
}
// Override child JVM specific properties String nodeType = task.getNodeType();
for (String propName : props.stringPropertyNames()) { if (nodeType != null) {
if (propName.startsWith(CHILD_PROPERTY_PREFIX)) { command.add(String.format("-Ddruid.executor.nodeType=%s", nodeType));
command.add(
String.format(
"-D%s=%s",
propName.substring(CHILD_PROPERTY_PREFIX.length()),
props.getProperty(propName)
)
);
} }
command.add(String.format("-Ddruid.host=%s", childHost));
command.add(String.format("-Ddruid.port=%d", childPort));
command.add(config.getMainClass());
command.add(taskFile.toString());
command.add(statusFile.toString());
jsonMapper.writeValue(taskFile, task);
log.info("Running command: %s", Joiner.on(" ").join(command));
taskInfo.processHolder = new ProcessHolder(
new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
logFile,
childPort
);
processHolder = taskInfo.processHolder;
processHolder.registerWithCloser(closer);
} }
String nodeType = task.getNodeType(); log.info("Logging task %s output to: %s", task.getId(), logFile);
if (nodeType != null) {
command.add(String.format("-Ddruid.executor.nodeType=%s", nodeType));
}
command.add(String.format("-Ddruid.host=%s", childHost)); final InputStream fromProc = processHolder.process.getInputStream();
command.add(String.format("-Ddruid.port=%d", childPort)); final OutputStream toLogfile = closer.register(
Files.newOutputStreamSupplier(logFile).getOutput()
command.add(config.getMainClass());
command.add(taskFile.toString());
command.add(statusFile.toString());
jsonMapper.writeValue(taskFile, task);
log.info("Running command: %s", Joiner.on(" ").join(command));
taskInfo.processHolder = new ProcessHolder(
new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
logFile,
childPort
); );
processHolder = taskInfo.processHolder; boolean runFailed = true;
}
log.info("Logging task %s output to: %s", task.getId(), logFile);
final InputStream fromProc = processHolder.process.getInputStream();
final OutputStream toLogfile = Files.newOutputStreamSupplier(logFile).getOutput();
boolean runFailed = false;
try {
ByteStreams.copy(fromProc, toLogfile); ByteStreams.copy(fromProc, toLogfile);
final int statusCode = processHolder.process.waitFor(); final int statusCode = processHolder.process.waitFor();
log.info("Process exited with status[%d] for task: %s", statusCode, task.getId()); log.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
if (statusCode != 0) { if (statusCode == 0) {
runFailed = true; runFailed = false;
} }
toLogfile.close(); // Upload task logs
}
catch (Exception e) {
log.warn(e, "Failed to log process output for task: %s", task.getId());
runFailed = true;
Closeables.close(toLogfile, true);
}
finally {
Closeables.close(processHolder, true);
}
// Upload task logs // XXX: Consider uploading periodically for very long-lived tasks to prevent
// XXX: bottlenecks at the end or the possibility of losing a lot of logs all
// XXX: at once.
// XXX: Consider uploading periodically for very long-lived tasks to prevent taskLogPusher.pushTaskLog(task.getId(), logFile);
// XXX: bottlenecks at the end or the possibility of losing a lot of logs all
// XXX: at once.
taskLogPusher.pushTaskLog(task.getId(), logFile); if (!runFailed) {
// Process exited successfully
if (!runFailed) { return jsonMapper.readValue(statusFile, TaskStatus.class);
// Process exited successfully } else {
return jsonMapper.readValue(statusFile, TaskStatus.class); // Process exited unsuccessfully
} else { return TaskStatus.failure(task.getId());
// Process exited unsuccessfully }
return TaskStatus.failure(task.getId()); } catch (Throwable t) {
throw closer.rethrow(t);
} finally {
closer.close();
} }
} }
catch (Exception e) { catch (Exception e) {
@ -403,7 +402,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
} }
} }
private static class ProcessHolder implements Closeable private static class ProcessHolder
{ {
private final Process process; private final Process process;
private final File logFile; private final File logFile;
@ -416,11 +415,10 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
this.port = port; this.port = port;
} }
@Override private void registerWithCloser(Closer closer)
public void close() throws IOException
{ {
process.getInputStream().close(); closer.register(process.getInputStream());
process.getOutputStream().close(); closer.register(process.getOutputStream());
} }
} }
} }