mirror of https://github.com/apache/druid.git
ForkingTaskRunner: Slight tweaks to setup and teardown
This commit is contained in:
parent
a24274029f
commit
c21d7f6ee2
|
@ -142,14 +142,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
final int childPort = findUnusedPort();
|
final int childPort = findUnusedPort();
|
||||||
final String childHost = String.format(config.getHostPattern(), childPort);
|
final String childHost = String.format(config.getHostPattern(), childPort);
|
||||||
|
|
||||||
Iterables.addAll(
|
command.add(config.getJavaCommand());
|
||||||
command,
|
command.add("-cp");
|
||||||
ImmutableList.of(
|
command.add(config.getJavaClasspath());
|
||||||
config.getJavaCommand(),
|
|
||||||
"-cp",
|
|
||||||
config.getJavaClasspath()
|
|
||||||
)
|
|
||||||
);
|
|
||||||
|
|
||||||
Iterables.addAll(
|
Iterables.addAll(
|
||||||
command,
|
command,
|
||||||
|
@ -195,25 +190,27 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
final InputStream fromProc = processHolder.process.getInputStream();
|
final InputStream fromProc = processHolder.process.getInputStream();
|
||||||
final OutputStream toLogfile = Files.newOutputStreamSupplier(logFile).getOutput();
|
final OutputStream toLogfile = Files.newOutputStreamSupplier(logFile).getOutput();
|
||||||
|
|
||||||
boolean copyFailed = false;
|
boolean runFailed = false;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
ByteStreams.copy(fromProc, toLogfile);
|
ByteStreams.copy(fromProc, toLogfile);
|
||||||
|
final int statusCode = processHolder.process.waitFor();
|
||||||
|
log.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
|
||||||
|
|
||||||
|
if (statusCode != 0) {
|
||||||
|
runFailed = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
log.warn(e, "Failed to read from process for task: %s", task.getId());
|
log.warn(e, "Failed to read from process for task: %s", task.getId());
|
||||||
copyFailed = true;
|
runFailed = true;
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
Closeables.closeQuietly(fromProc);
|
Closeables.closeQuietly(fromProc);
|
||||||
Closeables.closeQuietly(toLogfile);
|
Closeables.closeQuietly(toLogfile);
|
||||||
|
Closeables.closeQuietly(toProc);
|
||||||
}
|
}
|
||||||
|
|
||||||
final int statusCode = processHolder.process.waitFor();
|
|
||||||
|
|
||||||
log.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
|
|
||||||
Closeables.closeQuietly(toProc);
|
|
||||||
|
|
||||||
// Upload task logs
|
// Upload task logs
|
||||||
|
|
||||||
// XXX: Consider uploading periodically for very long-lived tasks to prevent
|
// XXX: Consider uploading periodically for very long-lived tasks to prevent
|
||||||
|
@ -222,7 +219,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
|
|
||||||
taskLogPusher.pushTaskLog(task.getId(), logFile);
|
taskLogPusher.pushTaskLog(task.getId(), logFile);
|
||||||
|
|
||||||
if (!copyFailed && statusCode == 0) {
|
if (!runFailed) {
|
||||||
// Process exited successfully
|
// Process exited successfully
|
||||||
return jsonMapper.readValue(statusFile, TaskStatus.class);
|
return jsonMapper.readValue(statusFile, TaskStatus.class);
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue