From 3c9392a31d61927c9fcdd3f58086094e61dff0a9 Mon Sep 17 00:00:00 2001 From: Varun Vasudev Date: Wed, 18 May 2016 22:44:33 +0530 Subject: [PATCH] YARN-4913. Yarn logs should take a -out option to write to a directory. Contributed by Xuan Gong. (cherry picked from commit ef1757790d89cc72f88f5330761b1c8901c59e94) --- .../hadoop/yarn/client/cli/LogsCLI.java | 415 ++++++++++-------- .../hadoop/yarn/client/cli/TestLogsCLI.java | 106 ++++- .../yarn/logaggregation/LogCLIHelpers.java | 128 ++++-- 3 files changed, 431 insertions(+), 218 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java index 1ac4d61f5d3..039ba3c6800 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.client.cli; import java.io.IOException; +import java.io.PrintStream; import java.io.StringReader; import java.util.ArrayList; import java.util.Arrays; @@ -81,15 +82,13 @@ public class LogsCLI extends Configured implements Tool { private static final String CONTAINER_LOG_FILES = "logFiles"; private static final String SHOW_META_INFO = "show_meta_info"; private static final String LIST_NODES_OPTION = "list_nodes"; + private static final String OUT_OPTION = "out"; public static final String HELP_CMD = "help"; @Override public int run(String[] args) throws Exception { - Options opts = createCommandOpts(); - Options printOpts = createPrintOpts(opts); - if (args.length < 1) { printHelpMessage(printOpts); return -1; @@ -108,6 +107,7 @@ public class LogsCLI extends Configured implements Tool { boolean nodesList = false; String[] logFiles = null; List amContainersList = new ArrayList(); + String localDir = null; try { CommandLine commandLine = parser.parse(opts, args, true); appIdStr = commandLine.getOptionValue(APPLICATION_ID_OPTION); @@ -117,6 +117,7 @@ public class LogsCLI extends Configured implements Tool { getAMContainerLogs = commandLine.hasOption(AM_CONTAINER_OPTION); showMetaInfo = commandLine.hasOption(SHOW_META_INFO); nodesList = commandLine.hasOption(LIST_NODES_OPTION); + localDir = commandLine.getOptionValue(OUT_OPTION); if (getAMContainerLogs) { try { amContainersList = parseAMContainer(commandLine, printOpts); @@ -151,7 +152,6 @@ public class LogsCLI extends Configured implements Tool { LogCLIHelpers logCliHelper = new LogCLIHelpers(); logCliHelper.setConf(getConf()); - boolean appStateObtainedSuccessfully = true; YarnApplicationState appState = YarnApplicationState.NEW; ApplicationReport appReport = null; try { @@ -164,7 +164,9 @@ public class LogsCLI extends Configured implements Tool { return -1; } } catch (IOException | YarnException e) { - appStateObtainedSuccessfully = false; + // If we can not get appReport from either RM or ATS + // We will assume that this app has already finished. + appState = YarnApplicationState.FINISHED; System.err.println("Unable to get ApplicationState." + " Attempting to fetch logs directly from the filesystem."); } @@ -179,19 +181,22 @@ public class LogsCLI extends Configured implements Tool { } } + ContainerLogsRequest request = new ContainerLogsRequest(appId, + isApplicationFinished(appState), appOwner, + nodeAddress, null, containerIdStr); + if (showMetaInfo) { - return showMetaInfo(appState, appStateObtainedSuccessfully, - logCliHelper, appId, containerIdStr, nodeAddress, appOwner); + return showMetaInfo(request, logCliHelper); } if (nodesList) { - return showNodeLists(appState, appStateObtainedSuccessfully, - logCliHelper, appId, appOwner); + return showNodeLists(request, logCliHelper); } + // To get am logs if (getAMContainerLogs) { - return fetchAMContainerLogs(logFiles, appState, appId, appOwner, - amContainersList, logCliHelper); + return fetchAMContainerLogs(request, amContainersList, + logFiles, logCliHelper, localDir); } int resultCode = 0; @@ -203,16 +208,12 @@ public class LogsCLI extends Configured implements Tool { + " does not have the container:" + containerId); return -1; } - return fetchContainerLogs(appState, appStateObtainedSuccessfully, - logFiles, appOwner, nodeAddress, containerId, logCliHelper); + return fetchContainerLogs(request, logFiles, + logCliHelper, localDir); } else { if (nodeAddress == null) { - resultCode = - logCliHelper.dumpAllContainersLogs(appId, appOwner, System.out); - if (resultCode == -1) { - System.err.println("Can not find the logs for the application: " - + appId + " with the appOwner: " + appOwner); - } + resultCode = fetchApplicationLogs(appId, appOwner, + logCliHelper, localDir); } else { System.err.println("Should at least provide ContainerId!"); printHelpMessage(printOpts); @@ -351,69 +352,74 @@ public class LogsCLI extends Configured implements Tool { } private void printContainerLogsFromRunningApplication(Configuration conf, - ContainerId containerId, String nodeHttpAddress, - String nodeId, String[] logFiles, LogCLIHelpers logCliHelper, - String appOwner) throws IOException { - String appId = containerId.getApplicationAttemptId() - .getApplicationId().toString(); - String containerIdStr = containerId.toString(); + ContainerLogsRequest request, String[] logFiles, + LogCLIHelpers logCliHelper, String localDir) throws IOException { + String appId = request.getAppId().toString(); + String containerIdStr = request.getContainerId().toString(); String[] requestedLogFiles = logFiles; - // fetch all the log files for the container - if (fetchAllLogFiles(logFiles)) { - requestedLogFiles = - getContainerLogFiles(getConf(), containerIdStr, nodeHttpAddress); - } - Client webServiceClient = Client.create(); - String containerString = "\n\nContainer: " + containerIdStr; - System.out.println(containerString); - System.out.println(StringUtils.repeat("=", containerString.length())); - - for (String logFile : requestedLogFiles) { - System.out.println("LogType:" + logFile); - System.out.println("Log Upload Time:" - + Times.format(System.currentTimeMillis())); - System.out.println("Log Contents:"); - try { - WebResource webResource = - webServiceClient.resource(WebAppUtils.getHttpSchemePrefix(conf) - + nodeHttpAddress); - ClientResponse response = - webResource.path("ws").path("v1").path("node") - .path("containerlogs").path(containerIdStr).path(logFile) - .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); - System.out.println(response.getEntity(String.class)); - System.out.println("End of LogType:" + logFile); - } catch (ClientHandlerException | UniformInterfaceException ex) { - System.err.println("Can not find the log file:" + logFile - + " for the container:" + containerIdStr + " in NodeManager:" - + nodeId); + String nodeHttpAddress = request.getNodeHttpAddress(); + String nodeId = request.getNodeId(); + String appOwner = request.getAppOwner(); + PrintStream out = logCliHelper.createPrintStream(localDir, nodeId, + containerIdStr); + try { + // fetch all the log files for the container + if (fetchAllLogFiles(logFiles)) { + requestedLogFiles = + getContainerLogFiles(getConf(), containerIdStr, nodeHttpAddress); } + Client webServiceClient = Client.create(); + String containerString = "\n\nContainer: " + containerIdStr; + out.println(containerString); + out.println(StringUtils.repeat("=", containerString.length())); + + for (String logFile : requestedLogFiles) { + out.println("LogType:" + logFile); + out.println("Log Upload Time:" + + Times.format(System.currentTimeMillis())); + out.println("Log Contents:"); + try { + WebResource webResource = + webServiceClient.resource(WebAppUtils.getHttpSchemePrefix(conf) + + nodeHttpAddress); + ClientResponse response = + webResource.path("ws").path("v1").path("node") + .path("containerlogs").path(containerIdStr).path(logFile) + .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); + out.println(response.getEntity(String.class)); + out.println("End of LogType:" + logFile); + out.flush(); + } catch (ClientHandlerException | UniformInterfaceException ex) { + System.err.println("Can not find the log file:" + logFile + + " for the container:" + containerIdStr + " in NodeManager:" + + nodeId); + } + } + } finally { + logCliHelper.closePrintStream(out); } // for the case, we have already uploaded partial logs in HDFS logCliHelper.dumpAContainersLogsForALogType(appId, containerIdStr, nodeId, - appOwner, Arrays.asList(requestedLogFiles), false); + appOwner, Arrays.asList(requestedLogFiles), false, localDir); } - private void printContainerLogsForFinishedApplication(String appId, - String containerId, String nodeAddress, String[] logFiles, - LogCLIHelpers logCliHelper, String appOwner) throws IOException { - String containerString = "\n\nContainer: " + containerId; - System.out.println(containerString); - System.out.println(StringUtils.repeat("=", containerString.length())); - logCliHelper.dumpAContainersLogsForALogType(appId, containerId, - nodeAddress, appOwner, logFiles != null ? Arrays.asList(logFiles) - : null); + private void printContainerLogsForFinishedApplication( + ContainerLogsRequest request, String[] logFiles, + LogCLIHelpers logCliHelper, String localDir) + throws IOException { + logCliHelper.dumpAContainersLogsForALogType(request.getAppId().toString(), + request.getContainerId().toString(), request.getNodeId(), + request.getAppOwner(), logFiles != null ? Arrays.asList(logFiles) + : null, localDir); } private int printContainerLogsForFinishedApplicationWithoutNodeId( String appId, String containerId, String[] logFiles, - LogCLIHelpers logCliHelper, String appOwner) throws IOException { - String containerString = "\n\nContainer: " + containerId; - System.out.println(containerString); - System.out.println(StringUtils.repeat("=", containerString.length())); + LogCLIHelpers logCliHelper, String appOwner, String localDir) + throws IOException { return logCliHelper.dumpAContainersLogsForALogTypeWithoutNodeId(appId, containerId, appOwner, logFiles != null ? - Arrays.asList(logFiles) : null); + Arrays.asList(logFiles) : null, localDir); } private ContainerReport getContainerReport(String containerIdStr) @@ -433,36 +439,41 @@ public class LogsCLI extends Configured implements Tool { || appState == YarnApplicationState.KILLED; } - private int printAMContainerLogs(Configuration conf, String appId, - List amContainers, String[] logFiles, LogCLIHelpers logCliHelper, - String appOwner, boolean applicationFinished) throws Exception { + private int printAMContainerLogs(Configuration conf, + ContainerLogsRequest request, List amContainers, + String[] logFiles, LogCLIHelpers logCliHelper, String localDir) + throws Exception { List amContainersList = null; - List requests = new ArrayList(); + List requests = + new ArrayList(); boolean getAMContainerLists = false; + String appId = request.getAppId().toString(); String errorMessage = ""; try { amContainersList = getAMContainerInfoForRMWebService(conf, appId); if (amContainersList != null && !amContainersList.isEmpty()) { getAMContainerLists = true; for (JSONObject amContainer : amContainersList) { - AMLogsRequest request = new AMLogsRequest(applicationFinished); - request.setAmContainerId(amContainer.getString("containerId")); - request.setNodeHttpAddress(amContainer.getString("nodeHttpAddress")); - request.setNodeId(amContainer.getString("nodeId")); - requests.add(request); + ContainerLogsRequest amRequest = new ContainerLogsRequest(request); + amRequest.setContainerId(amContainer.getString("containerId")); + amRequest.setNodeHttpAddress( + amContainer.getString("nodeHttpAddress")); + amRequest.setNodeId(amContainer.getString("nodeId")); + requests.add(amRequest); } } } catch (Exception ex) { errorMessage = ex.getMessage(); - if (applicationFinished) { + if (request.isAppFinished()) { try { amContainersList = getAMContainerInfoForAHSWebService(conf, appId); if (amContainersList != null && !amContainersList.isEmpty()) { getAMContainerLists = true; for (JSONObject amContainer : amContainersList) { - AMLogsRequest request = new AMLogsRequest(applicationFinished); - request.setAmContainerId(amContainer.getString("amContainerId")); - requests.add(request); + ContainerLogsRequest amRequest = new ContainerLogsRequest( + request); + amRequest.setContainerId(amContainer.getString("amContainerId")); + requests.add(amRequest); } } } catch (Exception e) { @@ -479,9 +490,9 @@ public class LogsCLI extends Configured implements Tool { } if (amContainers.contains("ALL")) { - for (AMLogsRequest request : requests) { - outputAMContainerLogs(request, conf, appId, logFiles, logCliHelper, - appOwner); + for (ContainerLogsRequest amRequest : requests) { + outputAMContainerLogs(amRequest, conf, logFiles, + logCliHelper, localDir); } System.out.println(); System.out.println("Specified ALL for -am option. " @@ -490,12 +501,12 @@ public class LogsCLI extends Configured implements Tool { for (String amContainer : amContainers) { int amContainerId = Integer.parseInt(amContainer.trim()); if (amContainerId == -1) { - outputAMContainerLogs(requests.get(requests.size() - 1), conf, appId, - logFiles, logCliHelper, appOwner); + outputAMContainerLogs(requests.get(requests.size() - 1), conf, + logFiles, logCliHelper, localDir); } else { if (amContainerId <= requests.size()) { - outputAMContainerLogs(requests.get(amContainerId - 1), conf, appId, - logFiles, logCliHelper, appOwner); + outputAMContainerLogs(requests.get(amContainerId - 1), conf, + logFiles, logCliHelper, localDir); } } } @@ -503,11 +514,11 @@ public class LogsCLI extends Configured implements Tool { return 0; } - private void outputAMContainerLogs(AMLogsRequest request, Configuration conf, - String appId, String[] logFiles, LogCLIHelpers logCliHelper, - String appOwner) throws Exception { + private void outputAMContainerLogs(ContainerLogsRequest request, + Configuration conf, String[] logFiles, + LogCLIHelpers logCliHelper, String localDir) throws Exception { String nodeHttpAddress = request.getNodeHttpAddress(); - String containerId = request.getAmContainerId(); + String containerId = request.getContainerId(); String nodeId = request.getNodeId(); if (request.isAppFinished()) { @@ -516,6 +527,7 @@ public class LogsCLI extends Configured implements Tool { try { nodeId = getContainerReport(containerId).getAssignedNode().toString(); + request.setNodeId(nodeId); } catch (Exception ex) { System.err.println(ex); nodeId = null; @@ -526,8 +538,8 @@ public class LogsCLI extends Configured implements Tool { if(!fetchAllLogFiles(logFiles)) { requestedLogFilesList = logFiles; } - printContainerLogsForFinishedApplication(appId, containerId, nodeId, - requestedLogFilesList, logCliHelper, appOwner); + printContainerLogsForFinishedApplication(request, + requestedLogFilesList, logCliHelper, localDir); } } } else { @@ -540,36 +552,34 @@ public class LogsCLI extends Configured implements Tool { getContainerLogFiles(getConf(), containerId, nodeHttpAddress); } printContainerLogsFromRunningApplication(conf, - ContainerId.fromString(containerId), nodeHttpAddress, nodeId, - requestedLogFiles, logCliHelper, appOwner); + request, requestedLogFiles, logCliHelper, localDir); } } } - private int showMetaInfo(YarnApplicationState appState, - boolean appStateObtainedSuccessfully, LogCLIHelpers logCliHelper, - ApplicationId appId, String containerIdStr, String nodeAddress, - String appOwner) throws IOException { - if (!isApplicationFinished(appState) && appStateObtainedSuccessfully) { + private int showMetaInfo(ContainerLogsRequest request, + LogCLIHelpers logCliHelper) throws IOException { + if (!request.isAppFinished()) { System.err.println("The -show_meta_info command can be only used " + "with finished applications"); return -1; } else { - logCliHelper.printLogMetadata(appId, containerIdStr, nodeAddress, - appOwner, System.out, System.err); + logCliHelper.printLogMetadata(request.getAppId(), + request.getContainerId(), request.getNodeId(), + request.getAppOwner(), System.out, System.err); return 0; } } - private int showNodeLists(YarnApplicationState appState, - boolean appStateObtainedSuccessfully, LogCLIHelpers logCliHelper, - ApplicationId appId, String appOwner) throws IOException { - if (!isApplicationFinished(appState) && appStateObtainedSuccessfully) { + private int showNodeLists(ContainerLogsRequest request, + LogCLIHelpers logCliHelper) throws IOException { + if (!request.isAppFinished()) { System.err.println("The -list_nodes command can be only used with " + "finished applications"); return -1; } else { - logCliHelper.printNodesList(appId, appOwner, System.out, System.err); + logCliHelper.printNodesList(request.getAppId(), request.getAppOwner(), + System.out, System.err); return 0; } } @@ -619,11 +629,15 @@ public class LogsCLI extends Configured implements Tool { opts.addOption(LIST_NODES_OPTION, false, "Show the list of nodes that successfully aggregated logs. " + "This option can only be used with finished applications."); + opts.addOption(OUT_OPTION, true, "Local directory for storing individual " + + "container logs. The container logs will be stored based on the " + + "node the container ran on."); opts.getOption(APPLICATION_ID_OPTION).setArgName("Application ID"); opts.getOption(CONTAINER_ID_OPTION).setArgName("Container ID"); opts.getOption(NODE_ADDRESS_OPTION).setArgName("Node Address"); opts.getOption(APP_OWNER_OPTION).setArgName("Application Owner"); opts.getOption(AM_CONTAINER_OPTION).setArgName("AM Containers"); + opts.getOption(OUT_OPTION).setArgName("Local Directory"); return opts; } @@ -637,6 +651,7 @@ public class LogsCLI extends Configured implements Tool { printOpts.addOption(commandOpts.getOption(CONTAINER_LOG_FILES)); printOpts.addOption(commandOpts.getOption(SHOW_META_INFO)); printOpts.addOption(commandOpts.getOption(LIST_NODES_OPTION)); + printOpts.addOption(commandOpts.getOption(OUT_OPTION)); return printOpts; } @@ -671,10 +686,9 @@ public class LogsCLI extends Configured implements Tool { return amContainersList; } - private int fetchAMContainerLogs(String[] logFiles, - YarnApplicationState appState, ApplicationId appId, - String appOwner, List amContainersList, - LogCLIHelpers logCliHelper) throws Exception { + private int fetchAMContainerLogs(ContainerLogsRequest request, + List amContainersList, String[] logFiles, + LogCLIHelpers logCliHelper, String localDir) throws Exception { // if we do not specify the value for CONTAINER_LOG_FILES option, // we will only output syslog if (logFiles == null || logFiles.length == 0) { @@ -685,10 +699,9 @@ public class LogsCLI extends Configured implements Tool { // and containerId for all the AM Containers. // After that, we will call NodeManager webService to get the // related logs - if (appState == YarnApplicationState.ACCEPTED - || appState == YarnApplicationState.RUNNING) { - return printAMContainerLogs(getConf(), appId.toString(), amContainersList, - logFiles, logCliHelper, appOwner, false); + if (!request.isAppFinished()) { + return printAMContainerLogs(getConf(), request, amContainersList, + logFiles, logCliHelper, localDir); } else { // If the application is in the final state, we will call RM webservice // to get all AppAttempts information first. If we get nothing, @@ -698,9 +711,11 @@ public class LogsCLI extends Configured implements Tool { // to get logs from HDFS directly. if (getConf().getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) { - return printAMContainerLogs(getConf(), appId.toString(), - amContainersList, logFiles, logCliHelper, appOwner, true); + return printAMContainerLogs(getConf(), request, amContainersList, + logFiles, logCliHelper, localDir); } else { + ApplicationId appId = request.getAppId(); + String appOwner = request.getAppOwner(); System.err.println("Can not get AMContainers logs for " + "the application:" + appId + " with the appOwner:" + appOwner); System.err.println("This application:" + appId + " has finished." @@ -713,18 +728,18 @@ public class LogsCLI extends Configured implements Tool { } } - private int fetchContainerLogs(YarnApplicationState appState, - boolean appStateObtainedSuccessfully, String[] logFiles, - String appOwner, String nodeAddress, - ContainerId containerId, LogCLIHelpers logCliHelper) throws IOException { + private int fetchContainerLogs(ContainerLogsRequest request, + String[] logFiles, LogCLIHelpers logCliHelper, String localDir) + throws IOException { int resultCode = 0; - String appIdStr = containerId.getApplicationAttemptId() - .getApplicationId().toString(); - String containerIdStr = containerId.toString(); + String appIdStr = request.getAppId().toString(); + String containerIdStr = request.getContainerId(); + String nodeAddress = request.getNodeId(); + String appOwner = request.getAppOwner(); + boolean isAppFinished = request.isAppFinished(); // if we provide the node address and the application is in the final // state, we could directly get logs from HDFS. - if (nodeAddress != null && (!appStateObtainedSuccessfully || - isApplicationFinished(appState))) { + if (nodeAddress != null && isAppFinished) { // if user specified "ALL" as the logFiles param, pass null // to logCliHelper so that it fetches all the logs List logs; @@ -736,7 +751,7 @@ public class LogsCLI extends Configured implements Tool { logs = Arrays.asList(logFiles); } return logCliHelper.dumpAContainersLogsForALogType(appIdStr, - containerIdStr, nodeAddress, appOwner, logs); + containerIdStr, nodeAddress, appOwner, logs, localDir); } String nodeHttpAddress = null; String nodeId = null; @@ -749,16 +764,18 @@ public class LogsCLI extends Configured implements Tool { report.getNodeHttpAddress().replaceFirst( WebAppUtils.getHttpSchemePrefix(getConf()), ""); nodeId = report.getAssignedNode().toString(); + request.setNodeId(nodeId); + request.setNodeHttpAddress(nodeHttpAddress); } catch (IOException | YarnException ex) { - if (!appStateObtainedSuccessfully || isApplicationFinished(appState)) { + if (isAppFinished) { String[] requestedLogFiles = logFiles; if(fetchAllLogFiles(logFiles)) { requestedLogFiles = null; } return printContainerLogsForFinishedApplicationWithoutNodeId( appIdStr, containerIdStr, requestedLogFiles, logCliHelper, - appOwner); - } else if (!isApplicationFinished(appState)) { + appOwner, localDir); + } else { System.err.println("Unable to get logs for this container:" + containerIdStr + "for the application:" + appIdStr + " with the appOwner: " + appOwner); @@ -772,12 +789,12 @@ public class LogsCLI extends Configured implements Tool { // If the application is not in the final state, // we will provide the NodeHttpAddress and get the container logs // by calling NodeManager webservice. - if (!isApplicationFinished(appState)) { + if (!isAppFinished) { if (logFiles == null || logFiles.length == 0) { logFiles = new String[] {"syslog"}; } - printContainerLogsFromRunningApplication(getConf(), containerId, - nodeHttpAddress, nodeId, logFiles, logCliHelper, appOwner); + printContainerLogsFromRunningApplication(getConf(), request, + logFiles, logCliHelper, localDir); } else { String[] requestedLogFiles = logFiles; if(fetchAllLogFiles(logFiles)) { @@ -785,52 +802,21 @@ public class LogsCLI extends Configured implements Tool { } // If the application is in the final state, we will directly // get the container logs from HDFS. - printContainerLogsForFinishedApplication(appIdStr, containerIdStr, - nodeId, requestedLogFiles, logCliHelper, appOwner); + printContainerLogsForFinishedApplication(request, + requestedLogFiles, logCliHelper, localDir); } return resultCode; } - private static class AMLogsRequest { - private String amContainerId; - private String nodeId; - private String nodeHttpAddress; - private final boolean isAppFinished; - - AMLogsRequest(boolean isAppFinished) { - this.isAppFinished = isAppFinished; - this.setAmContainerId(""); - this.setNodeId(""); - this.setNodeHttpAddress(""); - } - - public String getAmContainerId() { - return amContainerId; - } - - public void setAmContainerId(String amContainerId) { - this.amContainerId = amContainerId; - } - - public String getNodeId() { - return nodeId; - } - - public void setNodeId(String nodeId) { - this.nodeId = nodeId; - } - - public String getNodeHttpAddress() { - return nodeHttpAddress; - } - - public void setNodeHttpAddress(String nodeHttpAddress) { - this.nodeHttpAddress = nodeHttpAddress; - } - - public boolean isAppFinished() { - return isAppFinished; + private int fetchApplicationLogs(ApplicationId appId, String appOwner, + LogCLIHelpers logCliHelper, String localDir) throws IOException { + int resultCode = + logCliHelper.dumpAllContainersLogs(appId, appOwner, localDir); + if (resultCode == -1) { + System.err.println("Can not find the logs for the application: " + + appId + " with the appOwner: " + appOwner); } + return resultCode; } private String guessAppOwner(ApplicationReport appReport, @@ -846,4 +832,81 @@ public class LogsCLI extends Configured implements Tool { } return appOwner; } + + private static class ContainerLogsRequest { + private ApplicationId appId; + private String containerId; + private String nodeId; + private String nodeHttpAddress; + private String appOwner; + private boolean appFinished; + + public ContainerLogsRequest(ContainerLogsRequest request) { + this.setAppId(request.getAppId()); + this.setAppFinished(request.isAppFinished()); + this.setAppOwner(request.getAppOwner()); + this.setNodeId(request.getNodeId()); + this.setNodeHttpAddress(request.getNodeHttpAddress()); + this.setContainerId(request.getContainerId()); + } + + public ContainerLogsRequest(ApplicationId applicationId, + boolean isAppFinished, String owner, + String address, String httpAddress, String container) { + this.setAppId(applicationId); + this.setAppFinished(isAppFinished); + this.setAppOwner(owner); + this.setNodeId(address); + this.setNodeHttpAddress(httpAddress); + this.setContainerId(container); + } + + public ApplicationId getAppId() { + return appId; + } + + public void setAppId(ApplicationId appId) { + this.appId = appId; + } + + public String getContainerId() { + return containerId; + } + + public void setContainerId(String containerId) { + this.containerId = containerId; + } + + public String getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeAddress) { + this.nodeId = nodeAddress; + } + + public String getAppOwner() { + return appOwner; + } + + public void setAppOwner(String appOwner) { + this.appOwner = appOwner; + } + + public String getNodeHttpAddress() { + return nodeHttpAddress; + } + + public void setNodeHttpAddress(String nodeHttpAddress) { + this.nodeHttpAddress = nodeHttpAddress; + } + + public boolean isAppFinished() { + return appFinished; + } + + public void setAppFinished(boolean appFinished) { + this.appFinished = appFinished; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index 4f1c629d16b..d649ce78ec2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -26,11 +26,13 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.io.InputStreamReader; import java.io.PrintStream; import java.io.PrintWriter; import java.io.Writer; @@ -41,8 +43,9 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; - +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; @@ -180,6 +183,10 @@ public class TestLogsCLI { pw.println(" container log files. Use \"ALL\" to fetch"); pw.println(" all the log files for the container."); pw.println(" -nodeAddress NodeAddress in the format nodename:port"); + pw.println(" -out Local directory for storing individual"); + pw.println(" container logs. The container logs will"); + pw.println(" be stored based on the node the container"); + pw.println(" ran on."); pw.println(" -show_meta_info Show the log metadata, including log-file"); pw.println(" names, the size of the log files. You can"); pw.println(" combine this with --containerId to get"); @@ -492,6 +499,103 @@ public class TestLogsCLI { } } + @Test (timeout = 15000) + public void testSaveContainerLogsLocally() throws Exception { + String remoteLogRootDir = "target/logs/"; + String rootLogDir = "target/LocalLogs"; + String localDir = "target/SaveLogs"; + Path localPath = new Path(localDir); + + Configuration configuration = new Configuration(); + configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + configuration + .set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir); + configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); + configuration.set(YarnConfiguration.YARN_ADMIN_ACL, "admin"); + + FileSystem fs = FileSystem.get(configuration); + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + List containerIds = new ArrayList(); + ContainerId containerId1 = ContainerId.newContainerId( + appAttemptId, 1); + ContainerId containerId2 = ContainerId.newContainerId( + appAttemptId, 2); + containerIds.add(containerId1); + containerIds.add(containerId2); + + List nodeIds = new ArrayList(); + NodeId nodeId = NodeId.newInstance("localhost", 1234); + NodeId nodeId2 = NodeId.newInstance("test", 4567); + nodeIds.add(nodeId); + nodeIds.add(nodeId2); + + try { + createContainerLogs(configuration, remoteLogRootDir, rootLogDir, fs, + appId, containerIds, nodeIds); + + YarnClient mockYarnClient = + createMockYarnClient(YarnApplicationState.FINISHED, + UserGroupInformation.getCurrentUser().getShortUserName()); + LogsCLI cli = new LogsCLIForTest(mockYarnClient); + cli.setConf(configuration); + int exitCode = cli.run(new String[] {"-applicationId", + appId.toString(), + "-out" , localPath.toString()}); + assertTrue(exitCode == 0); + + // make sure we created a dir named as node id + FileStatus[] nodeDir = fs.listStatus(localPath); + Arrays.sort(nodeDir); + assertTrue(nodeDir.length == 2); + assertTrue(nodeDir[0].getPath().getName().contains( + LogAggregationUtils.getNodeString(nodeId))); + assertTrue(nodeDir[1].getPath().getName().contains( + LogAggregationUtils.getNodeString(nodeId2))); + + FileStatus[] container1Dir = fs.listStatus(nodeDir[0].getPath()); + assertTrue(container1Dir.length == 1); + assertTrue(container1Dir[0].getPath().getName().equals( + containerId1.toString())); + String container1= readContainerContent(container1Dir[0].getPath(), fs); + assertTrue(container1.contains("Hello " + containerId1 + + " in syslog!")); + + FileStatus[] container2Dir = fs.listStatus(nodeDir[1].getPath()); + assertTrue(container2Dir.length == 1); + assertTrue(container2Dir[0].getPath().getName().equals( + containerId2.toString())); + String container2= readContainerContent(container2Dir[0].getPath(), fs); + assertTrue(container2.contains("Hello " + containerId2 + + " in syslog!")); + } finally { + fs.delete(new Path(remoteLogRootDir), true); + fs.delete(new Path(rootLogDir), true); + fs.delete(localPath, true); + } + } + + private String readContainerContent(Path containerPath, + FileSystem fs) throws IOException { + assertTrue(fs.exists(containerPath)); + StringBuffer inputLine = new StringBuffer(); + BufferedReader reader = null; + try { + reader = new BufferedReader(new InputStreamReader( + fs.open(containerPath))); + String tmp; + while ((tmp = reader.readLine()) != null) { + inputLine.append(tmp); + } + return inputLine.toString(); + } finally { + if (reader != null) { + IOUtils.closeQuietly(reader); + } + } + } + @Test (timeout = 15000) public void testPrintContainerLogMetadata() throws Exception { String remoteLogRootDir = "target/logs/"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java index ba0dd892e6e..339df9d18bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java @@ -24,8 +24,10 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintStream; import java.nio.file.AccessDeniedException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.List; - +import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configurable; @@ -54,7 +56,7 @@ public class LogCLIHelpers implements Configurable { public int dumpAContainersLogs(String appId, String containerId, String nodeId, String jobOwner) throws IOException { return dumpAContainersLogsForALogType(appId, containerId, nodeId, jobOwner, - null); + null, null); } @Private @@ -107,17 +109,17 @@ public class LogCLIHelpers implements Configurable { @Private @VisibleForTesting public int dumpAContainersLogsForALogType(String appId, String containerId, - String nodeId, String jobOwner, List logType) + String nodeId, String jobOwner, List logType, String localDir) throws IOException { - return dumpAContainersLogsForALogType(appId, containerId, nodeId, jobOwner, - logType, true); + return dumpAContainersLogsForALogType(appId, containerId, nodeId, + jobOwner, logType, true, localDir); } @Private @VisibleForTesting public int dumpAContainersLogsForALogType(String appId, String containerId, String nodeId, String jobOwner, List logType, - boolean outputFailure) throws IOException { + boolean outputFailure, String localDir) throws IOException { ApplicationId applicationId = ConverterUtils.toApplicationId(appId); RemoteIterator nodeFiles = getRemoteNodeFileDir( applicationId, jobOwner); @@ -137,17 +139,21 @@ public class LogCLIHelpers implements Configurable { if (fileName.contains(LogAggregationUtils.getNodeString(nodeId)) && !fileName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { AggregatedLogFormat.LogReader reader = null; + PrintStream out = createPrintStream(localDir, fileName, containerId); try { + String containerString = "\n\nContainer: " + containerId; + out.println(containerString); + out.println(StringUtils.repeat("=", containerString.length())); reader = new AggregatedLogFormat.LogReader(getConf(), thisNodeFile.getPath()); if (logType == null) { - if (dumpAContainerLogs(containerId, reader, System.out, + if (dumpAContainerLogs(containerId, reader, out, thisNodeFile.getModificationTime()) > -1) { foundContainerLogs = true; } } else { - if (dumpAContainerLogsForALogType(containerId, reader, System.out, + if (dumpAContainerLogsForALogType(containerId, reader, out, thisNodeFile.getModificationTime(), logType) > -1) { foundContainerLogs = true; } @@ -156,11 +162,14 @@ public class LogCLIHelpers implements Configurable { if (reader != null) { reader.close(); } + closePrintStream(out); } } } - if (!foundContainerLogs && outputFailure) { - containerLogNotFound(containerId); + if (!foundContainerLogs) { + if (outputFailure) { + containerLogNotFound(containerId); + } return -1; } return 0; @@ -168,8 +177,8 @@ public class LogCLIHelpers implements Configurable { @Private public int dumpAContainersLogsForALogTypeWithoutNodeId(String appId, - String containerId, String jobOwner, List logType) - throws IOException { + String containerId, String jobOwner, List logType, + String localDir) throws IOException { ApplicationId applicationId = ConverterUtils.toApplicationId(appId); RemoteIterator nodeFiles = getRemoteNodeFileDir( applicationId, jobOwner); @@ -182,17 +191,28 @@ public class LogCLIHelpers implements Configurable { if (!thisNodeFile.getPath().getName().endsWith( LogAggregationUtils.TMP_FILE_SUFFIX)) { AggregatedLogFormat.LogReader reader = null; + PrintStream out = System.out; try { reader = new AggregatedLogFormat.LogReader(getConf(), thisNodeFile.getPath()); + if (getContainerLogsStream(containerId, reader) == null) { + continue; + } + reader = + new AggregatedLogFormat.LogReader(getConf(), + thisNodeFile.getPath()); + out = createPrintStream(localDir, thisNodeFile.getPath().getName(), + containerId); + out.println(containerId); + out.println(StringUtils.repeat("=", containerId.length())); if (logType == null) { - if (dumpAContainerLogs(containerId, reader, System.out, + if (dumpAContainerLogs(containerId, reader, out, thisNodeFile.getModificationTime()) > -1) { foundContainerLogs = true; } } else { - if (dumpAContainerLogsForALogType(containerId, reader, System.out, + if (dumpAContainerLogsForALogType(containerId, reader, out, thisNodeFile.getModificationTime(), logType) > -1) { foundContainerLogs = true; } @@ -201,6 +221,7 @@ public class LogCLIHelpers implements Configurable { if (reader != null) { reader.close(); } + closePrintStream(out); } } } @@ -210,19 +231,13 @@ public class LogCLIHelpers implements Configurable { } return 0; } + @Private public int dumpAContainerLogs(String containerIdStr, AggregatedLogFormat.LogReader reader, PrintStream out, long logUploadedTime) throws IOException { - DataInputStream valueStream; - LogKey key = new LogKey(); - valueStream = reader.next(key); - - while (valueStream != null && !key.toString().equals(containerIdStr)) { - // Next container - key = new LogKey(); - valueStream = reader.next(key); - } + DataInputStream valueStream = getContainerLogsStream( + containerIdStr, reader); if (valueStream == null) { return -1; @@ -244,10 +259,8 @@ public class LogCLIHelpers implements Configurable { return -1; } - @Private - public int dumpAContainerLogsForALogType(String containerIdStr, - AggregatedLogFormat.LogReader reader, PrintStream out, - long logUploadedTime, List logType) throws IOException { + private DataInputStream getContainerLogsStream(String containerIdStr, + AggregatedLogFormat.LogReader reader) throws IOException { DataInputStream valueStream; LogKey key = new LogKey(); valueStream = reader.next(key); @@ -257,7 +270,15 @@ public class LogCLIHelpers implements Configurable { key = new LogKey(); valueStream = reader.next(key); } + return valueStream; + } + @Private + public int dumpAContainerLogsForALogType(String containerIdStr, + AggregatedLogFormat.LogReader reader, PrintStream out, + long logUploadedTime, List logType) throws IOException { + DataInputStream valueStream = getContainerLogsStream( + containerIdStr, reader); if (valueStream == null) { return -1; } @@ -283,7 +304,7 @@ public class LogCLIHelpers implements Configurable { @Private public int dumpAllContainersLogs(ApplicationId appId, String appOwner, - PrintStream out) throws IOException { + String localDir) throws IOException { RemoteIterator nodeFiles = getRemoteNodeFileDir( appId, appOwner); if (nodeFiles == null) { @@ -310,20 +331,25 @@ public class LogCLIHelpers implements Configurable { valueStream = reader.next(key); while (valueStream != null) { - - String containerString = - "\n\nContainer: " + key + " on " - + thisNodeFile.getPath().getName(); - out.println(containerString); - out.println(StringUtils.repeat("=", containerString.length())); - while (true) { - try { - LogReader.readAContainerLogsForALogType(valueStream, out, - thisNodeFile.getModificationTime()); - foundAnyLogs = true; - } catch (EOFException eof) { - break; + PrintStream out = createPrintStream(localDir, + thisNodeFile.getPath().getName(), key.toString()); + try { + String containerString = + "\n\nContainer: " + key + " on " + + thisNodeFile.getPath().getName(); + out.println(containerString); + out.println(StringUtils.repeat("=", containerString.length())); + while (true) { + try { + LogReader.readAContainerLogsForALogType(valueStream, out, + thisNodeFile.getModificationTime()); + foundAnyLogs = true; + } catch (EOFException eof) { + break; + } } + } finally { + closePrintStream(out); } // Next container @@ -501,4 +527,24 @@ public class LogCLIHelpers implements Configurable { + "have permission to access " + remoteAppLogDir + ". Error message found: " + errorMessage); } + + @Private + public PrintStream createPrintStream(String localDir, String nodeId, + String containerId) throws IOException { + PrintStream out = System.out; + if(localDir != null && !localDir.isEmpty()) { + Path nodePath = new Path(localDir, LogAggregationUtils + .getNodeString(nodeId)); + Files.createDirectories(Paths.get(nodePath.toString())); + Path containerLogPath = new Path(nodePath, containerId); + out = new PrintStream(containerLogPath.toString(), "UTF-8"); + } + return out; + } + + public void closePrintStream(PrintStream out) { + if (out != System.out) { + IOUtils.closeQuietly(out); + } + } }