YARN-4913. Yarn logs should take a -out option to write to a directory. Contributed by Xuan Gong.

(cherry picked from commit ef1757790d)
This commit is contained in:
Varun Vasudev 2016-05-18 22:44:33 +05:30
parent 42f64b3bcc
commit 3c9392a31d
3 changed files with 431 additions and 218 deletions

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.client.cli; package org.apache.hadoop.yarn.client.cli;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream;
import java.io.StringReader; import java.io.StringReader;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -81,15 +82,13 @@ public class LogsCLI extends Configured implements Tool {
private static final String CONTAINER_LOG_FILES = "logFiles"; private static final String CONTAINER_LOG_FILES = "logFiles";
private static final String SHOW_META_INFO = "show_meta_info"; private static final String SHOW_META_INFO = "show_meta_info";
private static final String LIST_NODES_OPTION = "list_nodes"; private static final String LIST_NODES_OPTION = "list_nodes";
private static final String OUT_OPTION = "out";
public static final String HELP_CMD = "help"; public static final String HELP_CMD = "help";
@Override @Override
public int run(String[] args) throws Exception { public int run(String[] args) throws Exception {
Options opts = createCommandOpts(); Options opts = createCommandOpts();
Options printOpts = createPrintOpts(opts); Options printOpts = createPrintOpts(opts);
if (args.length < 1) { if (args.length < 1) {
printHelpMessage(printOpts); printHelpMessage(printOpts);
return -1; return -1;
@ -108,6 +107,7 @@ public int run(String[] args) throws Exception {
boolean nodesList = false; boolean nodesList = false;
String[] logFiles = null; String[] logFiles = null;
List<String> amContainersList = new ArrayList<String>(); List<String> amContainersList = new ArrayList<String>();
String localDir = null;
try { try {
CommandLine commandLine = parser.parse(opts, args, true); CommandLine commandLine = parser.parse(opts, args, true);
appIdStr = commandLine.getOptionValue(APPLICATION_ID_OPTION); appIdStr = commandLine.getOptionValue(APPLICATION_ID_OPTION);
@ -117,6 +117,7 @@ public int run(String[] args) throws Exception {
getAMContainerLogs = commandLine.hasOption(AM_CONTAINER_OPTION); getAMContainerLogs = commandLine.hasOption(AM_CONTAINER_OPTION);
showMetaInfo = commandLine.hasOption(SHOW_META_INFO); showMetaInfo = commandLine.hasOption(SHOW_META_INFO);
nodesList = commandLine.hasOption(LIST_NODES_OPTION); nodesList = commandLine.hasOption(LIST_NODES_OPTION);
localDir = commandLine.getOptionValue(OUT_OPTION);
if (getAMContainerLogs) { if (getAMContainerLogs) {
try { try {
amContainersList = parseAMContainer(commandLine, printOpts); amContainersList = parseAMContainer(commandLine, printOpts);
@ -151,7 +152,6 @@ public int run(String[] args) throws Exception {
LogCLIHelpers logCliHelper = new LogCLIHelpers(); LogCLIHelpers logCliHelper = new LogCLIHelpers();
logCliHelper.setConf(getConf()); logCliHelper.setConf(getConf());
boolean appStateObtainedSuccessfully = true;
YarnApplicationState appState = YarnApplicationState.NEW; YarnApplicationState appState = YarnApplicationState.NEW;
ApplicationReport appReport = null; ApplicationReport appReport = null;
try { try {
@ -164,7 +164,9 @@ public int run(String[] args) throws Exception {
return -1; return -1;
} }
} catch (IOException | YarnException e) { } catch (IOException | YarnException e) {
appStateObtainedSuccessfully = false; // If we can not get appReport from either RM or ATS
// We will assume that this app has already finished.
appState = YarnApplicationState.FINISHED;
System.err.println("Unable to get ApplicationState." System.err.println("Unable to get ApplicationState."
+ " Attempting to fetch logs directly from the filesystem."); + " Attempting to fetch logs directly from the filesystem.");
} }
@ -179,19 +181,22 @@ public int run(String[] args) throws Exception {
} }
} }
ContainerLogsRequest request = new ContainerLogsRequest(appId,
isApplicationFinished(appState), appOwner,
nodeAddress, null, containerIdStr);
if (showMetaInfo) { if (showMetaInfo) {
return showMetaInfo(appState, appStateObtainedSuccessfully, return showMetaInfo(request, logCliHelper);
logCliHelper, appId, containerIdStr, nodeAddress, appOwner);
} }
if (nodesList) { if (nodesList) {
return showNodeLists(appState, appStateObtainedSuccessfully, return showNodeLists(request, logCliHelper);
logCliHelper, appId, appOwner);
} }
// To get am logs // To get am logs
if (getAMContainerLogs) { if (getAMContainerLogs) {
return fetchAMContainerLogs(logFiles, appState, appId, appOwner, return fetchAMContainerLogs(request, amContainersList,
amContainersList, logCliHelper); logFiles, logCliHelper, localDir);
} }
int resultCode = 0; int resultCode = 0;
@ -203,16 +208,12 @@ public int run(String[] args) throws Exception {
+ " does not have the container:" + containerId); + " does not have the container:" + containerId);
return -1; return -1;
} }
return fetchContainerLogs(appState, appStateObtainedSuccessfully, return fetchContainerLogs(request, logFiles,
logFiles, appOwner, nodeAddress, containerId, logCliHelper); logCliHelper, localDir);
} else { } else {
if (nodeAddress == null) { if (nodeAddress == null) {
resultCode = resultCode = fetchApplicationLogs(appId, appOwner,
logCliHelper.dumpAllContainersLogs(appId, appOwner, System.out); logCliHelper, localDir);
if (resultCode == -1) {
System.err.println("Can not find the logs for the application: "
+ appId + " with the appOwner: " + appOwner);
}
} else { } else {
System.err.println("Should at least provide ContainerId!"); System.err.println("Should at least provide ContainerId!");
printHelpMessage(printOpts); printHelpMessage(printOpts);
@ -351,13 +352,17 @@ private String[] getContainerLogFiles(Configuration conf,
} }
private void printContainerLogsFromRunningApplication(Configuration conf, private void printContainerLogsFromRunningApplication(Configuration conf,
ContainerId containerId, String nodeHttpAddress, ContainerLogsRequest request, String[] logFiles,
String nodeId, String[] logFiles, LogCLIHelpers logCliHelper, LogCLIHelpers logCliHelper, String localDir) throws IOException {
String appOwner) throws IOException { String appId = request.getAppId().toString();
String appId = containerId.getApplicationAttemptId() String containerIdStr = request.getContainerId().toString();
.getApplicationId().toString();
String containerIdStr = containerId.toString();
String[] requestedLogFiles = logFiles; String[] requestedLogFiles = logFiles;
String nodeHttpAddress = request.getNodeHttpAddress();
String nodeId = request.getNodeId();
String appOwner = request.getAppOwner();
PrintStream out = logCliHelper.createPrintStream(localDir, nodeId,
containerIdStr);
try {
// fetch all the log files for the container // fetch all the log files for the container
if (fetchAllLogFiles(logFiles)) { if (fetchAllLogFiles(logFiles)) {
requestedLogFiles = requestedLogFiles =
@ -365,14 +370,14 @@ private void printContainerLogsFromRunningApplication(Configuration conf,
} }
Client webServiceClient = Client.create(); Client webServiceClient = Client.create();
String containerString = "\n\nContainer: " + containerIdStr; String containerString = "\n\nContainer: " + containerIdStr;
System.out.println(containerString); out.println(containerString);
System.out.println(StringUtils.repeat("=", containerString.length())); out.println(StringUtils.repeat("=", containerString.length()));
for (String logFile : requestedLogFiles) { for (String logFile : requestedLogFiles) {
System.out.println("LogType:" + logFile); out.println("LogType:" + logFile);
System.out.println("Log Upload Time:" out.println("Log Upload Time:"
+ Times.format(System.currentTimeMillis())); + Times.format(System.currentTimeMillis()));
System.out.println("Log Contents:"); out.println("Log Contents:");
try { try {
WebResource webResource = WebResource webResource =
webServiceClient.resource(WebAppUtils.getHttpSchemePrefix(conf) webServiceClient.resource(WebAppUtils.getHttpSchemePrefix(conf)
@ -381,39 +386,40 @@ private void printContainerLogsFromRunningApplication(Configuration conf,
webResource.path("ws").path("v1").path("node") webResource.path("ws").path("v1").path("node")
.path("containerlogs").path(containerIdStr).path(logFile) .path("containerlogs").path(containerIdStr).path(logFile)
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
System.out.println(response.getEntity(String.class)); out.println(response.getEntity(String.class));
System.out.println("End of LogType:" + logFile); out.println("End of LogType:" + logFile);
out.flush();
} catch (ClientHandlerException | UniformInterfaceException ex) { } catch (ClientHandlerException | UniformInterfaceException ex) {
System.err.println("Can not find the log file:" + logFile System.err.println("Can not find the log file:" + logFile
+ " for the container:" + containerIdStr + " in NodeManager:" + " for the container:" + containerIdStr + " in NodeManager:"
+ nodeId); + nodeId);
} }
} }
} finally {
logCliHelper.closePrintStream(out);
}
// for the case, we have already uploaded partial logs in HDFS // for the case, we have already uploaded partial logs in HDFS
logCliHelper.dumpAContainersLogsForALogType(appId, containerIdStr, nodeId, logCliHelper.dumpAContainersLogsForALogType(appId, containerIdStr, nodeId,
appOwner, Arrays.asList(requestedLogFiles), false); appOwner, Arrays.asList(requestedLogFiles), false, localDir);
} }
private void printContainerLogsForFinishedApplication(String appId, private void printContainerLogsForFinishedApplication(
String containerId, String nodeAddress, String[] logFiles, ContainerLogsRequest request, String[] logFiles,
LogCLIHelpers logCliHelper, String appOwner) throws IOException { LogCLIHelpers logCliHelper, String localDir)
String containerString = "\n\nContainer: " + containerId; throws IOException {
System.out.println(containerString); logCliHelper.dumpAContainersLogsForALogType(request.getAppId().toString(),
System.out.println(StringUtils.repeat("=", containerString.length())); request.getContainerId().toString(), request.getNodeId(),
logCliHelper.dumpAContainersLogsForALogType(appId, containerId, request.getAppOwner(), logFiles != null ? Arrays.asList(logFiles)
nodeAddress, appOwner, logFiles != null ? Arrays.asList(logFiles) : null, localDir);
: null);
} }
private int printContainerLogsForFinishedApplicationWithoutNodeId( private int printContainerLogsForFinishedApplicationWithoutNodeId(
String appId, String containerId, String[] logFiles, String appId, String containerId, String[] logFiles,
LogCLIHelpers logCliHelper, String appOwner) throws IOException { LogCLIHelpers logCliHelper, String appOwner, String localDir)
String containerString = "\n\nContainer: " + containerId; throws IOException {
System.out.println(containerString);
System.out.println(StringUtils.repeat("=", containerString.length()));
return logCliHelper.dumpAContainersLogsForALogTypeWithoutNodeId(appId, return logCliHelper.dumpAContainersLogsForALogTypeWithoutNodeId(appId,
containerId, appOwner, logFiles != null ? containerId, appOwner, logFiles != null ?
Arrays.asList(logFiles) : null); Arrays.asList(logFiles) : null, localDir);
} }
private ContainerReport getContainerReport(String containerIdStr) private ContainerReport getContainerReport(String containerIdStr)
@ -433,36 +439,41 @@ private boolean isApplicationFinished(YarnApplicationState appState) {
|| appState == YarnApplicationState.KILLED; || appState == YarnApplicationState.KILLED;
} }
private int printAMContainerLogs(Configuration conf, String appId, private int printAMContainerLogs(Configuration conf,
List<String> amContainers, String[] logFiles, LogCLIHelpers logCliHelper, ContainerLogsRequest request, List<String> amContainers,
String appOwner, boolean applicationFinished) throws Exception { String[] logFiles, LogCLIHelpers logCliHelper, String localDir)
throws Exception {
List<JSONObject> amContainersList = null; List<JSONObject> amContainersList = null;
List<AMLogsRequest> requests = new ArrayList<AMLogsRequest>(); List<ContainerLogsRequest> requests =
new ArrayList<ContainerLogsRequest>();
boolean getAMContainerLists = false; boolean getAMContainerLists = false;
String appId = request.getAppId().toString();
String errorMessage = ""; String errorMessage = "";
try { try {
amContainersList = getAMContainerInfoForRMWebService(conf, appId); amContainersList = getAMContainerInfoForRMWebService(conf, appId);
if (amContainersList != null && !amContainersList.isEmpty()) { if (amContainersList != null && !amContainersList.isEmpty()) {
getAMContainerLists = true; getAMContainerLists = true;
for (JSONObject amContainer : amContainersList) { for (JSONObject amContainer : amContainersList) {
AMLogsRequest request = new AMLogsRequest(applicationFinished); ContainerLogsRequest amRequest = new ContainerLogsRequest(request);
request.setAmContainerId(amContainer.getString("containerId")); amRequest.setContainerId(amContainer.getString("containerId"));
request.setNodeHttpAddress(amContainer.getString("nodeHttpAddress")); amRequest.setNodeHttpAddress(
request.setNodeId(amContainer.getString("nodeId")); amContainer.getString("nodeHttpAddress"));
requests.add(request); amRequest.setNodeId(amContainer.getString("nodeId"));
requests.add(amRequest);
} }
} }
} catch (Exception ex) { } catch (Exception ex) {
errorMessage = ex.getMessage(); errorMessage = ex.getMessage();
if (applicationFinished) { if (request.isAppFinished()) {
try { try {
amContainersList = getAMContainerInfoForAHSWebService(conf, appId); amContainersList = getAMContainerInfoForAHSWebService(conf, appId);
if (amContainersList != null && !amContainersList.isEmpty()) { if (amContainersList != null && !amContainersList.isEmpty()) {
getAMContainerLists = true; getAMContainerLists = true;
for (JSONObject amContainer : amContainersList) { for (JSONObject amContainer : amContainersList) {
AMLogsRequest request = new AMLogsRequest(applicationFinished); ContainerLogsRequest amRequest = new ContainerLogsRequest(
request.setAmContainerId(amContainer.getString("amContainerId")); request);
requests.add(request); amRequest.setContainerId(amContainer.getString("amContainerId"));
requests.add(amRequest);
} }
} }
} catch (Exception e) { } catch (Exception e) {
@ -479,9 +490,9 @@ private int printAMContainerLogs(Configuration conf, String appId,
} }
if (amContainers.contains("ALL")) { if (amContainers.contains("ALL")) {
for (AMLogsRequest request : requests) { for (ContainerLogsRequest amRequest : requests) {
outputAMContainerLogs(request, conf, appId, logFiles, logCliHelper, outputAMContainerLogs(amRequest, conf, logFiles,
appOwner); logCliHelper, localDir);
} }
System.out.println(); System.out.println();
System.out.println("Specified ALL for -am option. " System.out.println("Specified ALL for -am option. "
@ -490,12 +501,12 @@ private int printAMContainerLogs(Configuration conf, String appId,
for (String amContainer : amContainers) { for (String amContainer : amContainers) {
int amContainerId = Integer.parseInt(amContainer.trim()); int amContainerId = Integer.parseInt(amContainer.trim());
if (amContainerId == -1) { if (amContainerId == -1) {
outputAMContainerLogs(requests.get(requests.size() - 1), conf, appId, outputAMContainerLogs(requests.get(requests.size() - 1), conf,
logFiles, logCliHelper, appOwner); logFiles, logCliHelper, localDir);
} else { } else {
if (amContainerId <= requests.size()) { if (amContainerId <= requests.size()) {
outputAMContainerLogs(requests.get(amContainerId - 1), conf, appId, outputAMContainerLogs(requests.get(amContainerId - 1), conf,
logFiles, logCliHelper, appOwner); logFiles, logCliHelper, localDir);
} }
} }
} }
@ -503,11 +514,11 @@ private int printAMContainerLogs(Configuration conf, String appId,
return 0; return 0;
} }
private void outputAMContainerLogs(AMLogsRequest request, Configuration conf, private void outputAMContainerLogs(ContainerLogsRequest request,
String appId, String[] logFiles, LogCLIHelpers logCliHelper, Configuration conf, String[] logFiles,
String appOwner) throws Exception { LogCLIHelpers logCliHelper, String localDir) throws Exception {
String nodeHttpAddress = request.getNodeHttpAddress(); String nodeHttpAddress = request.getNodeHttpAddress();
String containerId = request.getAmContainerId(); String containerId = request.getContainerId();
String nodeId = request.getNodeId(); String nodeId = request.getNodeId();
if (request.isAppFinished()) { if (request.isAppFinished()) {
@ -516,6 +527,7 @@ private void outputAMContainerLogs(AMLogsRequest request, Configuration conf,
try { try {
nodeId = nodeId =
getContainerReport(containerId).getAssignedNode().toString(); getContainerReport(containerId).getAssignedNode().toString();
request.setNodeId(nodeId);
} catch (Exception ex) { } catch (Exception ex) {
System.err.println(ex); System.err.println(ex);
nodeId = null; nodeId = null;
@ -526,8 +538,8 @@ private void outputAMContainerLogs(AMLogsRequest request, Configuration conf,
if(!fetchAllLogFiles(logFiles)) { if(!fetchAllLogFiles(logFiles)) {
requestedLogFilesList = logFiles; requestedLogFilesList = logFiles;
} }
printContainerLogsForFinishedApplication(appId, containerId, nodeId, printContainerLogsForFinishedApplication(request,
requestedLogFilesList, logCliHelper, appOwner); requestedLogFilesList, logCliHelper, localDir);
} }
} }
} else { } else {
@ -540,36 +552,34 @@ private void outputAMContainerLogs(AMLogsRequest request, Configuration conf,
getContainerLogFiles(getConf(), containerId, nodeHttpAddress); getContainerLogFiles(getConf(), containerId, nodeHttpAddress);
} }
printContainerLogsFromRunningApplication(conf, printContainerLogsFromRunningApplication(conf,
ContainerId.fromString(containerId), nodeHttpAddress, nodeId, request, requestedLogFiles, logCliHelper, localDir);
requestedLogFiles, logCliHelper, appOwner);
} }
} }
} }
private int showMetaInfo(YarnApplicationState appState, private int showMetaInfo(ContainerLogsRequest request,
boolean appStateObtainedSuccessfully, LogCLIHelpers logCliHelper, LogCLIHelpers logCliHelper) throws IOException {
ApplicationId appId, String containerIdStr, String nodeAddress, if (!request.isAppFinished()) {
String appOwner) throws IOException {
if (!isApplicationFinished(appState) && appStateObtainedSuccessfully) {
System.err.println("The -show_meta_info command can be only used " System.err.println("The -show_meta_info command can be only used "
+ "with finished applications"); + "with finished applications");
return -1; return -1;
} else { } else {
logCliHelper.printLogMetadata(appId, containerIdStr, nodeAddress, logCliHelper.printLogMetadata(request.getAppId(),
appOwner, System.out, System.err); request.getContainerId(), request.getNodeId(),
request.getAppOwner(), System.out, System.err);
return 0; return 0;
} }
} }
private int showNodeLists(YarnApplicationState appState, private int showNodeLists(ContainerLogsRequest request,
boolean appStateObtainedSuccessfully, LogCLIHelpers logCliHelper, LogCLIHelpers logCliHelper) throws IOException {
ApplicationId appId, String appOwner) throws IOException { if (!request.isAppFinished()) {
if (!isApplicationFinished(appState) && appStateObtainedSuccessfully) {
System.err.println("The -list_nodes command can be only used with " System.err.println("The -list_nodes command can be only used with "
+ "finished applications"); + "finished applications");
return -1; return -1;
} else { } else {
logCliHelper.printNodesList(appId, appOwner, System.out, System.err); logCliHelper.printNodesList(request.getAppId(), request.getAppOwner(),
System.out, System.err);
return 0; return 0;
} }
} }
@ -619,11 +629,15 @@ private Options createCommandOpts() {
opts.addOption(LIST_NODES_OPTION, false, opts.addOption(LIST_NODES_OPTION, false,
"Show the list of nodes that successfully aggregated logs. " "Show the list of nodes that successfully aggregated logs. "
+ "This option can only be used with finished applications."); + "This option can only be used with finished applications.");
opts.addOption(OUT_OPTION, true, "Local directory for storing individual "
+ "container logs. The container logs will be stored based on the "
+ "node the container ran on.");
opts.getOption(APPLICATION_ID_OPTION).setArgName("Application ID"); opts.getOption(APPLICATION_ID_OPTION).setArgName("Application ID");
opts.getOption(CONTAINER_ID_OPTION).setArgName("Container ID"); opts.getOption(CONTAINER_ID_OPTION).setArgName("Container ID");
opts.getOption(NODE_ADDRESS_OPTION).setArgName("Node Address"); opts.getOption(NODE_ADDRESS_OPTION).setArgName("Node Address");
opts.getOption(APP_OWNER_OPTION).setArgName("Application Owner"); opts.getOption(APP_OWNER_OPTION).setArgName("Application Owner");
opts.getOption(AM_CONTAINER_OPTION).setArgName("AM Containers"); opts.getOption(AM_CONTAINER_OPTION).setArgName("AM Containers");
opts.getOption(OUT_OPTION).setArgName("Local Directory");
return opts; return opts;
} }
@ -637,6 +651,7 @@ private Options createPrintOpts(Options commandOpts) {
printOpts.addOption(commandOpts.getOption(CONTAINER_LOG_FILES)); printOpts.addOption(commandOpts.getOption(CONTAINER_LOG_FILES));
printOpts.addOption(commandOpts.getOption(SHOW_META_INFO)); printOpts.addOption(commandOpts.getOption(SHOW_META_INFO));
printOpts.addOption(commandOpts.getOption(LIST_NODES_OPTION)); printOpts.addOption(commandOpts.getOption(LIST_NODES_OPTION));
printOpts.addOption(commandOpts.getOption(OUT_OPTION));
return printOpts; return printOpts;
} }
@ -671,10 +686,9 @@ private List<String> parseAMContainer(CommandLine commandLine,
return amContainersList; return amContainersList;
} }
private int fetchAMContainerLogs(String[] logFiles, private int fetchAMContainerLogs(ContainerLogsRequest request,
YarnApplicationState appState, ApplicationId appId, List<String> amContainersList, String[] logFiles,
String appOwner, List<String> amContainersList, LogCLIHelpers logCliHelper, String localDir) throws Exception {
LogCLIHelpers logCliHelper) throws Exception {
// if we do not specify the value for CONTAINER_LOG_FILES option, // if we do not specify the value for CONTAINER_LOG_FILES option,
// we will only output syslog // we will only output syslog
if (logFiles == null || logFiles.length == 0) { if (logFiles == null || logFiles.length == 0) {
@ -685,10 +699,9 @@ private int fetchAMContainerLogs(String[] logFiles,
// and containerId for all the AM Containers. // and containerId for all the AM Containers.
// After that, we will call NodeManager webService to get the // After that, we will call NodeManager webService to get the
// related logs // related logs
if (appState == YarnApplicationState.ACCEPTED if (!request.isAppFinished()) {
|| appState == YarnApplicationState.RUNNING) { return printAMContainerLogs(getConf(), request, amContainersList,
return printAMContainerLogs(getConf(), appId.toString(), amContainersList, logFiles, logCliHelper, localDir);
logFiles, logCliHelper, appOwner, false);
} else { } else {
// If the application is in the final state, we will call RM webservice // If the application is in the final state, we will call RM webservice
// to get all AppAttempts information first. If we get nothing, // to get all AppAttempts information first. If we get nothing,
@ -698,9 +711,11 @@ private int fetchAMContainerLogs(String[] logFiles,
// to get logs from HDFS directly. // to get logs from HDFS directly.
if (getConf().getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, if (getConf().getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) { YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
return printAMContainerLogs(getConf(), appId.toString(), return printAMContainerLogs(getConf(), request, amContainersList,
amContainersList, logFiles, logCliHelper, appOwner, true); logFiles, logCliHelper, localDir);
} else { } else {
ApplicationId appId = request.getAppId();
String appOwner = request.getAppOwner();
System.err.println("Can not get AMContainers logs for " System.err.println("Can not get AMContainers logs for "
+ "the application:" + appId + " with the appOwner:" + appOwner); + "the application:" + appId + " with the appOwner:" + appOwner);
System.err.println("This application:" + appId + " has finished." System.err.println("This application:" + appId + " has finished."
@ -713,18 +728,18 @@ private int fetchAMContainerLogs(String[] logFiles,
} }
} }
private int fetchContainerLogs(YarnApplicationState appState, private int fetchContainerLogs(ContainerLogsRequest request,
boolean appStateObtainedSuccessfully, String[] logFiles, String[] logFiles, LogCLIHelpers logCliHelper, String localDir)
String appOwner, String nodeAddress, throws IOException {
ContainerId containerId, LogCLIHelpers logCliHelper) throws IOException {
int resultCode = 0; int resultCode = 0;
String appIdStr = containerId.getApplicationAttemptId() String appIdStr = request.getAppId().toString();
.getApplicationId().toString(); String containerIdStr = request.getContainerId();
String containerIdStr = containerId.toString(); String nodeAddress = request.getNodeId();
String appOwner = request.getAppOwner();
boolean isAppFinished = request.isAppFinished();
// if we provide the node address and the application is in the final // if we provide the node address and the application is in the final
// state, we could directly get logs from HDFS. // state, we could directly get logs from HDFS.
if (nodeAddress != null && (!appStateObtainedSuccessfully || if (nodeAddress != null && isAppFinished) {
isApplicationFinished(appState))) {
// if user specified "ALL" as the logFiles param, pass null // if user specified "ALL" as the logFiles param, pass null
// to logCliHelper so that it fetches all the logs // to logCliHelper so that it fetches all the logs
List<String> logs; List<String> logs;
@ -736,7 +751,7 @@ private int fetchContainerLogs(YarnApplicationState appState,
logs = Arrays.asList(logFiles); logs = Arrays.asList(logFiles);
} }
return logCliHelper.dumpAContainersLogsForALogType(appIdStr, return logCliHelper.dumpAContainersLogsForALogType(appIdStr,
containerIdStr, nodeAddress, appOwner, logs); containerIdStr, nodeAddress, appOwner, logs, localDir);
} }
String nodeHttpAddress = null; String nodeHttpAddress = null;
String nodeId = null; String nodeId = null;
@ -749,16 +764,18 @@ private int fetchContainerLogs(YarnApplicationState appState,
report.getNodeHttpAddress().replaceFirst( report.getNodeHttpAddress().replaceFirst(
WebAppUtils.getHttpSchemePrefix(getConf()), ""); WebAppUtils.getHttpSchemePrefix(getConf()), "");
nodeId = report.getAssignedNode().toString(); nodeId = report.getAssignedNode().toString();
request.setNodeId(nodeId);
request.setNodeHttpAddress(nodeHttpAddress);
} catch (IOException | YarnException ex) { } catch (IOException | YarnException ex) {
if (!appStateObtainedSuccessfully || isApplicationFinished(appState)) { if (isAppFinished) {
String[] requestedLogFiles = logFiles; String[] requestedLogFiles = logFiles;
if(fetchAllLogFiles(logFiles)) { if(fetchAllLogFiles(logFiles)) {
requestedLogFiles = null; requestedLogFiles = null;
} }
return printContainerLogsForFinishedApplicationWithoutNodeId( return printContainerLogsForFinishedApplicationWithoutNodeId(
appIdStr, containerIdStr, requestedLogFiles, logCliHelper, appIdStr, containerIdStr, requestedLogFiles, logCliHelper,
appOwner); appOwner, localDir);
} else if (!isApplicationFinished(appState)) { } else {
System.err.println("Unable to get logs for this container:" System.err.println("Unable to get logs for this container:"
+ containerIdStr + "for the application:" + appIdStr + containerIdStr + "for the application:" + appIdStr
+ " with the appOwner: " + appOwner); + " with the appOwner: " + appOwner);
@ -772,12 +789,12 @@ private int fetchContainerLogs(YarnApplicationState appState,
// If the application is not in the final state, // If the application is not in the final state,
// we will provide the NodeHttpAddress and get the container logs // we will provide the NodeHttpAddress and get the container logs
// by calling NodeManager webservice. // by calling NodeManager webservice.
if (!isApplicationFinished(appState)) { if (!isAppFinished) {
if (logFiles == null || logFiles.length == 0) { if (logFiles == null || logFiles.length == 0) {
logFiles = new String[] {"syslog"}; logFiles = new String[] {"syslog"};
} }
printContainerLogsFromRunningApplication(getConf(), containerId, printContainerLogsFromRunningApplication(getConf(), request,
nodeHttpAddress, nodeId, logFiles, logCliHelper, appOwner); logFiles, logCliHelper, localDir);
} else { } else {
String[] requestedLogFiles = logFiles; String[] requestedLogFiles = logFiles;
if(fetchAllLogFiles(logFiles)) { if(fetchAllLogFiles(logFiles)) {
@ -785,52 +802,21 @@ private int fetchContainerLogs(YarnApplicationState appState,
} }
// If the application is in the final state, we will directly // If the application is in the final state, we will directly
// get the container logs from HDFS. // get the container logs from HDFS.
printContainerLogsForFinishedApplication(appIdStr, containerIdStr, printContainerLogsForFinishedApplication(request,
nodeId, requestedLogFiles, logCliHelper, appOwner); requestedLogFiles, logCliHelper, localDir);
} }
return resultCode; return resultCode;
} }
private static class AMLogsRequest { private int fetchApplicationLogs(ApplicationId appId, String appOwner,
private String amContainerId; LogCLIHelpers logCliHelper, String localDir) throws IOException {
private String nodeId; int resultCode =
private String nodeHttpAddress; logCliHelper.dumpAllContainersLogs(appId, appOwner, localDir);
private final boolean isAppFinished; if (resultCode == -1) {
System.err.println("Can not find the logs for the application: "
AMLogsRequest(boolean isAppFinished) { + appId + " with the appOwner: " + appOwner);
this.isAppFinished = isAppFinished;
this.setAmContainerId("");
this.setNodeId("");
this.setNodeHttpAddress("");
}
public String getAmContainerId() {
return amContainerId;
}
public void setAmContainerId(String amContainerId) {
this.amContainerId = amContainerId;
}
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
public String getNodeHttpAddress() {
return nodeHttpAddress;
}
public void setNodeHttpAddress(String nodeHttpAddress) {
this.nodeHttpAddress = nodeHttpAddress;
}
public boolean isAppFinished() {
return isAppFinished;
} }
return resultCode;
} }
private String guessAppOwner(ApplicationReport appReport, private String guessAppOwner(ApplicationReport appReport,
@ -846,4 +832,81 @@ private String guessAppOwner(ApplicationReport appReport,
} }
return appOwner; return appOwner;
} }
private static class ContainerLogsRequest {
private ApplicationId appId;
private String containerId;
private String nodeId;
private String nodeHttpAddress;
private String appOwner;
private boolean appFinished;
public ContainerLogsRequest(ContainerLogsRequest request) {
this.setAppId(request.getAppId());
this.setAppFinished(request.isAppFinished());
this.setAppOwner(request.getAppOwner());
this.setNodeId(request.getNodeId());
this.setNodeHttpAddress(request.getNodeHttpAddress());
this.setContainerId(request.getContainerId());
}
public ContainerLogsRequest(ApplicationId applicationId,
boolean isAppFinished, String owner,
String address, String httpAddress, String container) {
this.setAppId(applicationId);
this.setAppFinished(isAppFinished);
this.setAppOwner(owner);
this.setNodeId(address);
this.setNodeHttpAddress(httpAddress);
this.setContainerId(container);
}
public ApplicationId getAppId() {
return appId;
}
public void setAppId(ApplicationId appId) {
this.appId = appId;
}
public String getContainerId() {
return containerId;
}
public void setContainerId(String containerId) {
this.containerId = containerId;
}
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeAddress) {
this.nodeId = nodeAddress;
}
public String getAppOwner() {
return appOwner;
}
public void setAppOwner(String appOwner) {
this.appOwner = appOwner;
}
public String getNodeHttpAddress() {
return nodeHttpAddress;
}
public void setNodeHttpAddress(String nodeHttpAddress) {
this.nodeHttpAddress = nodeHttpAddress;
}
public boolean isAppFinished() {
return appFinished;
}
public void setAppFinished(boolean appFinished) {
this.appFinished = appFinished;
}
}
} }

View File

@ -26,11 +26,13 @@
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.File; import java.io.File;
import java.io.FileWriter; import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream; import java.io.PrintStream;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.Writer; import java.io.Writer;
@ -41,8 +43,9 @@
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -180,6 +183,10 @@ public void testHelpMessage() throws Exception {
pw.println(" container log files. Use \"ALL\" to fetch"); pw.println(" container log files. Use \"ALL\" to fetch");
pw.println(" all the log files for the container."); pw.println(" all the log files for the container.");
pw.println(" -nodeAddress <Node Address> NodeAddress in the format nodename:port"); pw.println(" -nodeAddress <Node Address> NodeAddress in the format nodename:port");
pw.println(" -out <Local Directory> Local directory for storing individual");
pw.println(" container logs. The container logs will");
pw.println(" be stored based on the node the container");
pw.println(" ran on.");
pw.println(" -show_meta_info Show the log metadata, including log-file"); pw.println(" -show_meta_info Show the log metadata, including log-file");
pw.println(" names, the size of the log files. You can"); pw.println(" names, the size of the log files. You can");
pw.println(" combine this with --containerId to get"); pw.println(" combine this with --containerId to get");
@ -492,6 +499,103 @@ public void testFetchApplictionLogsAsAnotherUser() throws Exception {
} }
} }
@Test (timeout = 15000)
public void testSaveContainerLogsLocally() throws Exception {
String remoteLogRootDir = "target/logs/";
String rootLogDir = "target/LocalLogs";
String localDir = "target/SaveLogs";
Path localPath = new Path(localDir);
Configuration configuration = new Configuration();
configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
configuration
.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir);
configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
configuration.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
FileSystem fs = FileSystem.get(configuration);
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
List<ContainerId> containerIds = new ArrayList<ContainerId>();
ContainerId containerId1 = ContainerId.newContainerId(
appAttemptId, 1);
ContainerId containerId2 = ContainerId.newContainerId(
appAttemptId, 2);
containerIds.add(containerId1);
containerIds.add(containerId2);
List<NodeId> nodeIds = new ArrayList<NodeId>();
NodeId nodeId = NodeId.newInstance("localhost", 1234);
NodeId nodeId2 = NodeId.newInstance("test", 4567);
nodeIds.add(nodeId);
nodeIds.add(nodeId2);
try {
createContainerLogs(configuration, remoteLogRootDir, rootLogDir, fs,
appId, containerIds, nodeIds);
YarnClient mockYarnClient =
createMockYarnClient(YarnApplicationState.FINISHED,
UserGroupInformation.getCurrentUser().getShortUserName());
LogsCLI cli = new LogsCLIForTest(mockYarnClient);
cli.setConf(configuration);
int exitCode = cli.run(new String[] {"-applicationId",
appId.toString(),
"-out" , localPath.toString()});
assertTrue(exitCode == 0);
// make sure we created a dir named as node id
FileStatus[] nodeDir = fs.listStatus(localPath);
Arrays.sort(nodeDir);
assertTrue(nodeDir.length == 2);
assertTrue(nodeDir[0].getPath().getName().contains(
LogAggregationUtils.getNodeString(nodeId)));
assertTrue(nodeDir[1].getPath().getName().contains(
LogAggregationUtils.getNodeString(nodeId2)));
FileStatus[] container1Dir = fs.listStatus(nodeDir[0].getPath());
assertTrue(container1Dir.length == 1);
assertTrue(container1Dir[0].getPath().getName().equals(
containerId1.toString()));
String container1= readContainerContent(container1Dir[0].getPath(), fs);
assertTrue(container1.contains("Hello " + containerId1
+ " in syslog!"));
FileStatus[] container2Dir = fs.listStatus(nodeDir[1].getPath());
assertTrue(container2Dir.length == 1);
assertTrue(container2Dir[0].getPath().getName().equals(
containerId2.toString()));
String container2= readContainerContent(container2Dir[0].getPath(), fs);
assertTrue(container2.contains("Hello " + containerId2
+ " in syslog!"));
} finally {
fs.delete(new Path(remoteLogRootDir), true);
fs.delete(new Path(rootLogDir), true);
fs.delete(localPath, true);
}
}
private String readContainerContent(Path containerPath,
FileSystem fs) throws IOException {
assertTrue(fs.exists(containerPath));
StringBuffer inputLine = new StringBuffer();
BufferedReader reader = null;
try {
reader = new BufferedReader(new InputStreamReader(
fs.open(containerPath)));
String tmp;
while ((tmp = reader.readLine()) != null) {
inputLine.append(tmp);
}
return inputLine.toString();
} finally {
if (reader != null) {
IOUtils.closeQuietly(reader);
}
}
}
@Test (timeout = 15000) @Test (timeout = 15000)
public void testPrintContainerLogMetadata() throws Exception { public void testPrintContainerLogMetadata() throws Exception {
String remoteLogRootDir = "target/logs/"; String remoteLogRootDir = "target/logs/";

View File

@ -24,8 +24,10 @@
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import java.nio.file.AccessDeniedException; import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List; import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configurable;
@ -54,7 +56,7 @@ public class LogCLIHelpers implements Configurable {
public int dumpAContainersLogs(String appId, String containerId, public int dumpAContainersLogs(String appId, String containerId,
String nodeId, String jobOwner) throws IOException { String nodeId, String jobOwner) throws IOException {
return dumpAContainersLogsForALogType(appId, containerId, nodeId, jobOwner, return dumpAContainersLogsForALogType(appId, containerId, nodeId, jobOwner,
null); null, null);
} }
@Private @Private
@ -107,17 +109,17 @@ public static String getOwnerForAppIdOrNull(
@Private @Private
@VisibleForTesting @VisibleForTesting
public int dumpAContainersLogsForALogType(String appId, String containerId, public int dumpAContainersLogsForALogType(String appId, String containerId,
String nodeId, String jobOwner, List<String> logType) String nodeId, String jobOwner, List<String> logType, String localDir)
throws IOException { throws IOException {
return dumpAContainersLogsForALogType(appId, containerId, nodeId, jobOwner, return dumpAContainersLogsForALogType(appId, containerId, nodeId,
logType, true); jobOwner, logType, true, localDir);
} }
@Private @Private
@VisibleForTesting @VisibleForTesting
public int dumpAContainersLogsForALogType(String appId, String containerId, public int dumpAContainersLogsForALogType(String appId, String containerId,
String nodeId, String jobOwner, List<String> logType, String nodeId, String jobOwner, List<String> logType,
boolean outputFailure) throws IOException { boolean outputFailure, String localDir) throws IOException {
ApplicationId applicationId = ConverterUtils.toApplicationId(appId); ApplicationId applicationId = ConverterUtils.toApplicationId(appId);
RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir( RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
applicationId, jobOwner); applicationId, jobOwner);
@ -137,17 +139,21 @@ public int dumpAContainersLogsForALogType(String appId, String containerId,
if (fileName.contains(LogAggregationUtils.getNodeString(nodeId)) if (fileName.contains(LogAggregationUtils.getNodeString(nodeId))
&& !fileName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { && !fileName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader = null; AggregatedLogFormat.LogReader reader = null;
PrintStream out = createPrintStream(localDir, fileName, containerId);
try { try {
String containerString = "\n\nContainer: " + containerId;
out.println(containerString);
out.println(StringUtils.repeat("=", containerString.length()));
reader = reader =
new AggregatedLogFormat.LogReader(getConf(), new AggregatedLogFormat.LogReader(getConf(),
thisNodeFile.getPath()); thisNodeFile.getPath());
if (logType == null) { if (logType == null) {
if (dumpAContainerLogs(containerId, reader, System.out, if (dumpAContainerLogs(containerId, reader, out,
thisNodeFile.getModificationTime()) > -1) { thisNodeFile.getModificationTime()) > -1) {
foundContainerLogs = true; foundContainerLogs = true;
} }
} else { } else {
if (dumpAContainerLogsForALogType(containerId, reader, System.out, if (dumpAContainerLogsForALogType(containerId, reader, out,
thisNodeFile.getModificationTime(), logType) > -1) { thisNodeFile.getModificationTime(), logType) > -1) {
foundContainerLogs = true; foundContainerLogs = true;
} }
@ -156,11 +162,14 @@ public int dumpAContainersLogsForALogType(String appId, String containerId,
if (reader != null) { if (reader != null) {
reader.close(); reader.close();
} }
closePrintStream(out);
} }
} }
} }
if (!foundContainerLogs && outputFailure) { if (!foundContainerLogs) {
if (outputFailure) {
containerLogNotFound(containerId); containerLogNotFound(containerId);
}
return -1; return -1;
} }
return 0; return 0;
@ -168,8 +177,8 @@ public int dumpAContainersLogsForALogType(String appId, String containerId,
@Private @Private
public int dumpAContainersLogsForALogTypeWithoutNodeId(String appId, public int dumpAContainersLogsForALogTypeWithoutNodeId(String appId,
String containerId, String jobOwner, List<String> logType) String containerId, String jobOwner, List<String> logType,
throws IOException { String localDir) throws IOException {
ApplicationId applicationId = ConverterUtils.toApplicationId(appId); ApplicationId applicationId = ConverterUtils.toApplicationId(appId);
RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir( RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
applicationId, jobOwner); applicationId, jobOwner);
@ -182,17 +191,28 @@ public int dumpAContainersLogsForALogTypeWithoutNodeId(String appId,
if (!thisNodeFile.getPath().getName().endsWith( if (!thisNodeFile.getPath().getName().endsWith(
LogAggregationUtils.TMP_FILE_SUFFIX)) { LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader = null; AggregatedLogFormat.LogReader reader = null;
PrintStream out = System.out;
try { try {
reader = reader =
new AggregatedLogFormat.LogReader(getConf(), new AggregatedLogFormat.LogReader(getConf(),
thisNodeFile.getPath()); thisNodeFile.getPath());
if (getContainerLogsStream(containerId, reader) == null) {
continue;
}
reader =
new AggregatedLogFormat.LogReader(getConf(),
thisNodeFile.getPath());
out = createPrintStream(localDir, thisNodeFile.getPath().getName(),
containerId);
out.println(containerId);
out.println(StringUtils.repeat("=", containerId.length()));
if (logType == null) { if (logType == null) {
if (dumpAContainerLogs(containerId, reader, System.out, if (dumpAContainerLogs(containerId, reader, out,
thisNodeFile.getModificationTime()) > -1) { thisNodeFile.getModificationTime()) > -1) {
foundContainerLogs = true; foundContainerLogs = true;
} }
} else { } else {
if (dumpAContainerLogsForALogType(containerId, reader, System.out, if (dumpAContainerLogsForALogType(containerId, reader, out,
thisNodeFile.getModificationTime(), logType) > -1) { thisNodeFile.getModificationTime(), logType) > -1) {
foundContainerLogs = true; foundContainerLogs = true;
} }
@ -201,6 +221,7 @@ public int dumpAContainersLogsForALogTypeWithoutNodeId(String appId,
if (reader != null) { if (reader != null) {
reader.close(); reader.close();
} }
closePrintStream(out);
} }
} }
} }
@ -210,19 +231,13 @@ public int dumpAContainersLogsForALogTypeWithoutNodeId(String appId,
} }
return 0; return 0;
} }
@Private @Private
public int dumpAContainerLogs(String containerIdStr, public int dumpAContainerLogs(String containerIdStr,
AggregatedLogFormat.LogReader reader, PrintStream out, AggregatedLogFormat.LogReader reader, PrintStream out,
long logUploadedTime) throws IOException { long logUploadedTime) throws IOException {
DataInputStream valueStream; DataInputStream valueStream = getContainerLogsStream(
LogKey key = new LogKey(); containerIdStr, reader);
valueStream = reader.next(key);
while (valueStream != null && !key.toString().equals(containerIdStr)) {
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
if (valueStream == null) { if (valueStream == null) {
return -1; return -1;
@ -244,10 +259,8 @@ public int dumpAContainerLogs(String containerIdStr,
return -1; return -1;
} }
@Private private DataInputStream getContainerLogsStream(String containerIdStr,
public int dumpAContainerLogsForALogType(String containerIdStr, AggregatedLogFormat.LogReader reader) throws IOException {
AggregatedLogFormat.LogReader reader, PrintStream out,
long logUploadedTime, List<String> logType) throws IOException {
DataInputStream valueStream; DataInputStream valueStream;
LogKey key = new LogKey(); LogKey key = new LogKey();
valueStream = reader.next(key); valueStream = reader.next(key);
@ -257,7 +270,15 @@ public int dumpAContainerLogsForALogType(String containerIdStr,
key = new LogKey(); key = new LogKey();
valueStream = reader.next(key); valueStream = reader.next(key);
} }
return valueStream;
}
@Private
public int dumpAContainerLogsForALogType(String containerIdStr,
AggregatedLogFormat.LogReader reader, PrintStream out,
long logUploadedTime, List<String> logType) throws IOException {
DataInputStream valueStream = getContainerLogsStream(
containerIdStr, reader);
if (valueStream == null) { if (valueStream == null) {
return -1; return -1;
} }
@ -283,7 +304,7 @@ public int dumpAContainerLogsForALogType(String containerIdStr,
@Private @Private
public int dumpAllContainersLogs(ApplicationId appId, String appOwner, public int dumpAllContainersLogs(ApplicationId appId, String appOwner,
PrintStream out) throws IOException { String localDir) throws IOException {
RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir( RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
appId, appOwner); appId, appOwner);
if (nodeFiles == null) { if (nodeFiles == null) {
@ -310,7 +331,9 @@ public int dumpAllContainersLogs(ApplicationId appId, String appOwner,
valueStream = reader.next(key); valueStream = reader.next(key);
while (valueStream != null) { while (valueStream != null) {
PrintStream out = createPrintStream(localDir,
thisNodeFile.getPath().getName(), key.toString());
try {
String containerString = String containerString =
"\n\nContainer: " + key + " on " "\n\nContainer: " + key + " on "
+ thisNodeFile.getPath().getName(); + thisNodeFile.getPath().getName();
@ -325,6 +348,9 @@ public int dumpAllContainersLogs(ApplicationId appId, String appOwner,
break; break;
} }
} }
} finally {
closePrintStream(out);
}
// Next container // Next container
key = new LogKey(); key = new LogKey();
@ -501,4 +527,24 @@ private static void logDirNoAccessPermission(String remoteAppLogDir,
+ "have permission to access " + remoteAppLogDir + "have permission to access " + remoteAppLogDir
+ ". Error message found: " + errorMessage); + ". Error message found: " + errorMessage);
} }
@Private
public PrintStream createPrintStream(String localDir, String nodeId,
String containerId) throws IOException {
PrintStream out = System.out;
if(localDir != null && !localDir.isEmpty()) {
Path nodePath = new Path(localDir, LogAggregationUtils
.getNodeString(nodeId));
Files.createDirectories(Paths.get(nodePath.toString()));
Path containerLogPath = new Path(nodePath, containerId);
out = new PrintStream(containerLogPath.toString(), "UTF-8");
}
return out;
}
public void closePrintStream(PrintStream out) {
if (out != System.out) {
IOUtils.closeQuietly(out);
}
}
} }