YARN-4913. Yarn logs should take a -out option to write to a directory. Contributed by Xuan Gong.

This commit is contained in:
Varun Vasudev 2016-05-18 22:44:33 +05:30
parent bad85f3e3a
commit ef1757790d
3 changed files with 431 additions and 218 deletions

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.client.cli;
import java.io.IOException;
import java.io.PrintStream;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
@ -81,15 +82,13 @@ public class LogsCLI extends Configured implements Tool {
private static final String CONTAINER_LOG_FILES = "logFiles";
private static final String SHOW_META_INFO = "show_meta_info";
private static final String LIST_NODES_OPTION = "list_nodes";
private static final String OUT_OPTION = "out";
public static final String HELP_CMD = "help";
@Override
public int run(String[] args) throws Exception {
Options opts = createCommandOpts();
Options printOpts = createPrintOpts(opts);
if (args.length < 1) {
printHelpMessage(printOpts);
return -1;
@ -108,6 +107,7 @@ public class LogsCLI extends Configured implements Tool {
boolean nodesList = false;
String[] logFiles = null;
List<String> amContainersList = new ArrayList<String>();
String localDir = null;
try {
CommandLine commandLine = parser.parse(opts, args, true);
appIdStr = commandLine.getOptionValue(APPLICATION_ID_OPTION);
@ -117,6 +117,7 @@ public class LogsCLI extends Configured implements Tool {
getAMContainerLogs = commandLine.hasOption(AM_CONTAINER_OPTION);
showMetaInfo = commandLine.hasOption(SHOW_META_INFO);
nodesList = commandLine.hasOption(LIST_NODES_OPTION);
localDir = commandLine.getOptionValue(OUT_OPTION);
if (getAMContainerLogs) {
try {
amContainersList = parseAMContainer(commandLine, printOpts);
@ -151,7 +152,6 @@ public class LogsCLI extends Configured implements Tool {
LogCLIHelpers logCliHelper = new LogCLIHelpers();
logCliHelper.setConf(getConf());
boolean appStateObtainedSuccessfully = true;
YarnApplicationState appState = YarnApplicationState.NEW;
ApplicationReport appReport = null;
try {
@ -164,7 +164,9 @@ public class LogsCLI extends Configured implements Tool {
return -1;
}
} catch (IOException | YarnException e) {
appStateObtainedSuccessfully = false;
// If we can not get appReport from either RM or ATS
// We will assume that this app has already finished.
appState = YarnApplicationState.FINISHED;
System.err.println("Unable to get ApplicationState."
+ " Attempting to fetch logs directly from the filesystem.");
}
@ -179,19 +181,22 @@ public class LogsCLI extends Configured implements Tool {
}
}
ContainerLogsRequest request = new ContainerLogsRequest(appId,
isApplicationFinished(appState), appOwner,
nodeAddress, null, containerIdStr);
if (showMetaInfo) {
return showMetaInfo(appState, appStateObtainedSuccessfully,
logCliHelper, appId, containerIdStr, nodeAddress, appOwner);
return showMetaInfo(request, logCliHelper);
}
if (nodesList) {
return showNodeLists(appState, appStateObtainedSuccessfully,
logCliHelper, appId, appOwner);
return showNodeLists(request, logCliHelper);
}
// To get am logs
if (getAMContainerLogs) {
return fetchAMContainerLogs(logFiles, appState, appId, appOwner,
amContainersList, logCliHelper);
return fetchAMContainerLogs(request, amContainersList,
logFiles, logCliHelper, localDir);
}
int resultCode = 0;
@ -203,16 +208,12 @@ public class LogsCLI extends Configured implements Tool {
+ " does not have the container:" + containerId);
return -1;
}
return fetchContainerLogs(appState, appStateObtainedSuccessfully,
logFiles, appOwner, nodeAddress, containerId, logCliHelper);
return fetchContainerLogs(request, logFiles,
logCliHelper, localDir);
} else {
if (nodeAddress == null) {
resultCode =
logCliHelper.dumpAllContainersLogs(appId, appOwner, System.out);
if (resultCode == -1) {
System.err.println("Can not find the logs for the application: "
+ appId + " with the appOwner: " + appOwner);
}
resultCode = fetchApplicationLogs(appId, appOwner,
logCliHelper, localDir);
} else {
System.err.println("Should at least provide ContainerId!");
printHelpMessage(printOpts);
@ -351,69 +352,74 @@ public class LogsCLI extends Configured implements Tool {
}
private void printContainerLogsFromRunningApplication(Configuration conf,
ContainerId containerId, String nodeHttpAddress,
String nodeId, String[] logFiles, LogCLIHelpers logCliHelper,
String appOwner) throws IOException {
String appId = containerId.getApplicationAttemptId()
.getApplicationId().toString();
String containerIdStr = containerId.toString();
ContainerLogsRequest request, String[] logFiles,
LogCLIHelpers logCliHelper, String localDir) throws IOException {
String appId = request.getAppId().toString();
String containerIdStr = request.getContainerId().toString();
String[] requestedLogFiles = logFiles;
// fetch all the log files for the container
if (fetchAllLogFiles(logFiles)) {
requestedLogFiles =
getContainerLogFiles(getConf(), containerIdStr, nodeHttpAddress);
}
Client webServiceClient = Client.create();
String containerString = "\n\nContainer: " + containerIdStr;
System.out.println(containerString);
System.out.println(StringUtils.repeat("=", containerString.length()));
for (String logFile : requestedLogFiles) {
System.out.println("LogType:" + logFile);
System.out.println("Log Upload Time:"
+ Times.format(System.currentTimeMillis()));
System.out.println("Log Contents:");
try {
WebResource webResource =
webServiceClient.resource(WebAppUtils.getHttpSchemePrefix(conf)
+ nodeHttpAddress);
ClientResponse response =
webResource.path("ws").path("v1").path("node")
.path("containerlogs").path(containerIdStr).path(logFile)
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
System.out.println(response.getEntity(String.class));
System.out.println("End of LogType:" + logFile);
} catch (ClientHandlerException | UniformInterfaceException ex) {
System.err.println("Can not find the log file:" + logFile
+ " for the container:" + containerIdStr + " in NodeManager:"
+ nodeId);
String nodeHttpAddress = request.getNodeHttpAddress();
String nodeId = request.getNodeId();
String appOwner = request.getAppOwner();
PrintStream out = logCliHelper.createPrintStream(localDir, nodeId,
containerIdStr);
try {
// fetch all the log files for the container
if (fetchAllLogFiles(logFiles)) {
requestedLogFiles =
getContainerLogFiles(getConf(), containerIdStr, nodeHttpAddress);
}
Client webServiceClient = Client.create();
String containerString = "\n\nContainer: " + containerIdStr;
out.println(containerString);
out.println(StringUtils.repeat("=", containerString.length()));
for (String logFile : requestedLogFiles) {
out.println("LogType:" + logFile);
out.println("Log Upload Time:"
+ Times.format(System.currentTimeMillis()));
out.println("Log Contents:");
try {
WebResource webResource =
webServiceClient.resource(WebAppUtils.getHttpSchemePrefix(conf)
+ nodeHttpAddress);
ClientResponse response =
webResource.path("ws").path("v1").path("node")
.path("containerlogs").path(containerIdStr).path(logFile)
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
out.println(response.getEntity(String.class));
out.println("End of LogType:" + logFile);
out.flush();
} catch (ClientHandlerException | UniformInterfaceException ex) {
System.err.println("Can not find the log file:" + logFile
+ " for the container:" + containerIdStr + " in NodeManager:"
+ nodeId);
}
}
} finally {
logCliHelper.closePrintStream(out);
}
// for the case, we have already uploaded partial logs in HDFS
logCliHelper.dumpAContainersLogsForALogType(appId, containerIdStr, nodeId,
appOwner, Arrays.asList(requestedLogFiles), false);
appOwner, Arrays.asList(requestedLogFiles), false, localDir);
}
private void printContainerLogsForFinishedApplication(String appId,
String containerId, String nodeAddress, String[] logFiles,
LogCLIHelpers logCliHelper, String appOwner) throws IOException {
String containerString = "\n\nContainer: " + containerId;
System.out.println(containerString);
System.out.println(StringUtils.repeat("=", containerString.length()));
logCliHelper.dumpAContainersLogsForALogType(appId, containerId,
nodeAddress, appOwner, logFiles != null ? Arrays.asList(logFiles)
: null);
private void printContainerLogsForFinishedApplication(
ContainerLogsRequest request, String[] logFiles,
LogCLIHelpers logCliHelper, String localDir)
throws IOException {
logCliHelper.dumpAContainersLogsForALogType(request.getAppId().toString(),
request.getContainerId().toString(), request.getNodeId(),
request.getAppOwner(), logFiles != null ? Arrays.asList(logFiles)
: null, localDir);
}
private int printContainerLogsForFinishedApplicationWithoutNodeId(
String appId, String containerId, String[] logFiles,
LogCLIHelpers logCliHelper, String appOwner) throws IOException {
String containerString = "\n\nContainer: " + containerId;
System.out.println(containerString);
System.out.println(StringUtils.repeat("=", containerString.length()));
LogCLIHelpers logCliHelper, String appOwner, String localDir)
throws IOException {
return logCliHelper.dumpAContainersLogsForALogTypeWithoutNodeId(appId,
containerId, appOwner, logFiles != null ?
Arrays.asList(logFiles) : null);
Arrays.asList(logFiles) : null, localDir);
}
private ContainerReport getContainerReport(String containerIdStr)
@ -433,36 +439,41 @@ public class LogsCLI extends Configured implements Tool {
|| appState == YarnApplicationState.KILLED;
}
private int printAMContainerLogs(Configuration conf, String appId,
List<String> amContainers, String[] logFiles, LogCLIHelpers logCliHelper,
String appOwner, boolean applicationFinished) throws Exception {
private int printAMContainerLogs(Configuration conf,
ContainerLogsRequest request, List<String> amContainers,
String[] logFiles, LogCLIHelpers logCliHelper, String localDir)
throws Exception {
List<JSONObject> amContainersList = null;
List<AMLogsRequest> requests = new ArrayList<AMLogsRequest>();
List<ContainerLogsRequest> requests =
new ArrayList<ContainerLogsRequest>();
boolean getAMContainerLists = false;
String appId = request.getAppId().toString();
String errorMessage = "";
try {
amContainersList = getAMContainerInfoForRMWebService(conf, appId);
if (amContainersList != null && !amContainersList.isEmpty()) {
getAMContainerLists = true;
for (JSONObject amContainer : amContainersList) {
AMLogsRequest request = new AMLogsRequest(applicationFinished);
request.setAmContainerId(amContainer.getString("containerId"));
request.setNodeHttpAddress(amContainer.getString("nodeHttpAddress"));
request.setNodeId(amContainer.getString("nodeId"));
requests.add(request);
ContainerLogsRequest amRequest = new ContainerLogsRequest(request);
amRequest.setContainerId(amContainer.getString("containerId"));
amRequest.setNodeHttpAddress(
amContainer.getString("nodeHttpAddress"));
amRequest.setNodeId(amContainer.getString("nodeId"));
requests.add(amRequest);
}
}
} catch (Exception ex) {
errorMessage = ex.getMessage();
if (applicationFinished) {
if (request.isAppFinished()) {
try {
amContainersList = getAMContainerInfoForAHSWebService(conf, appId);
if (amContainersList != null && !amContainersList.isEmpty()) {
getAMContainerLists = true;
for (JSONObject amContainer : amContainersList) {
AMLogsRequest request = new AMLogsRequest(applicationFinished);
request.setAmContainerId(amContainer.getString("amContainerId"));
requests.add(request);
ContainerLogsRequest amRequest = new ContainerLogsRequest(
request);
amRequest.setContainerId(amContainer.getString("amContainerId"));
requests.add(amRequest);
}
}
} catch (Exception e) {
@ -479,9 +490,9 @@ public class LogsCLI extends Configured implements Tool {
}
if (amContainers.contains("ALL")) {
for (AMLogsRequest request : requests) {
outputAMContainerLogs(request, conf, appId, logFiles, logCliHelper,
appOwner);
for (ContainerLogsRequest amRequest : requests) {
outputAMContainerLogs(amRequest, conf, logFiles,
logCliHelper, localDir);
}
System.out.println();
System.out.println("Specified ALL for -am option. "
@ -490,12 +501,12 @@ public class LogsCLI extends Configured implements Tool {
for (String amContainer : amContainers) {
int amContainerId = Integer.parseInt(amContainer.trim());
if (amContainerId == -1) {
outputAMContainerLogs(requests.get(requests.size() - 1), conf, appId,
logFiles, logCliHelper, appOwner);
outputAMContainerLogs(requests.get(requests.size() - 1), conf,
logFiles, logCliHelper, localDir);
} else {
if (amContainerId <= requests.size()) {
outputAMContainerLogs(requests.get(amContainerId - 1), conf, appId,
logFiles, logCliHelper, appOwner);
outputAMContainerLogs(requests.get(amContainerId - 1), conf,
logFiles, logCliHelper, localDir);
}
}
}
@ -503,11 +514,11 @@ public class LogsCLI extends Configured implements Tool {
return 0;
}
private void outputAMContainerLogs(AMLogsRequest request, Configuration conf,
String appId, String[] logFiles, LogCLIHelpers logCliHelper,
String appOwner) throws Exception {
private void outputAMContainerLogs(ContainerLogsRequest request,
Configuration conf, String[] logFiles,
LogCLIHelpers logCliHelper, String localDir) throws Exception {
String nodeHttpAddress = request.getNodeHttpAddress();
String containerId = request.getAmContainerId();
String containerId = request.getContainerId();
String nodeId = request.getNodeId();
if (request.isAppFinished()) {
@ -516,6 +527,7 @@ public class LogsCLI extends Configured implements Tool {
try {
nodeId =
getContainerReport(containerId).getAssignedNode().toString();
request.setNodeId(nodeId);
} catch (Exception ex) {
System.err.println(ex);
nodeId = null;
@ -526,8 +538,8 @@ public class LogsCLI extends Configured implements Tool {
if(!fetchAllLogFiles(logFiles)) {
requestedLogFilesList = logFiles;
}
printContainerLogsForFinishedApplication(appId, containerId, nodeId,
requestedLogFilesList, logCliHelper, appOwner);
printContainerLogsForFinishedApplication(request,
requestedLogFilesList, logCliHelper, localDir);
}
}
} else {
@ -540,36 +552,34 @@ public class LogsCLI extends Configured implements Tool {
getContainerLogFiles(getConf(), containerId, nodeHttpAddress);
}
printContainerLogsFromRunningApplication(conf,
ContainerId.fromString(containerId), nodeHttpAddress, nodeId,
requestedLogFiles, logCliHelper, appOwner);
request, requestedLogFiles, logCliHelper, localDir);
}
}
}
private int showMetaInfo(YarnApplicationState appState,
boolean appStateObtainedSuccessfully, LogCLIHelpers logCliHelper,
ApplicationId appId, String containerIdStr, String nodeAddress,
String appOwner) throws IOException {
if (!isApplicationFinished(appState) && appStateObtainedSuccessfully) {
private int showMetaInfo(ContainerLogsRequest request,
LogCLIHelpers logCliHelper) throws IOException {
if (!request.isAppFinished()) {
System.err.println("The -show_meta_info command can be only used "
+ "with finished applications");
return -1;
} else {
logCliHelper.printLogMetadata(appId, containerIdStr, nodeAddress,
appOwner, System.out, System.err);
logCliHelper.printLogMetadata(request.getAppId(),
request.getContainerId(), request.getNodeId(),
request.getAppOwner(), System.out, System.err);
return 0;
}
}
private int showNodeLists(YarnApplicationState appState,
boolean appStateObtainedSuccessfully, LogCLIHelpers logCliHelper,
ApplicationId appId, String appOwner) throws IOException {
if (!isApplicationFinished(appState) && appStateObtainedSuccessfully) {
private int showNodeLists(ContainerLogsRequest request,
LogCLIHelpers logCliHelper) throws IOException {
if (!request.isAppFinished()) {
System.err.println("The -list_nodes command can be only used with "
+ "finished applications");
return -1;
} else {
logCliHelper.printNodesList(appId, appOwner, System.out, System.err);
logCliHelper.printNodesList(request.getAppId(), request.getAppOwner(),
System.out, System.err);
return 0;
}
}
@ -619,11 +629,15 @@ public class LogsCLI extends Configured implements Tool {
opts.addOption(LIST_NODES_OPTION, false,
"Show the list of nodes that successfully aggregated logs. "
+ "This option can only be used with finished applications.");
opts.addOption(OUT_OPTION, true, "Local directory for storing individual "
+ "container logs. The container logs will be stored based on the "
+ "node the container ran on.");
opts.getOption(APPLICATION_ID_OPTION).setArgName("Application ID");
opts.getOption(CONTAINER_ID_OPTION).setArgName("Container ID");
opts.getOption(NODE_ADDRESS_OPTION).setArgName("Node Address");
opts.getOption(APP_OWNER_OPTION).setArgName("Application Owner");
opts.getOption(AM_CONTAINER_OPTION).setArgName("AM Containers");
opts.getOption(OUT_OPTION).setArgName("Local Directory");
return opts;
}
@ -637,6 +651,7 @@ public class LogsCLI extends Configured implements Tool {
printOpts.addOption(commandOpts.getOption(CONTAINER_LOG_FILES));
printOpts.addOption(commandOpts.getOption(SHOW_META_INFO));
printOpts.addOption(commandOpts.getOption(LIST_NODES_OPTION));
printOpts.addOption(commandOpts.getOption(OUT_OPTION));
return printOpts;
}
@ -671,10 +686,9 @@ public class LogsCLI extends Configured implements Tool {
return amContainersList;
}
private int fetchAMContainerLogs(String[] logFiles,
YarnApplicationState appState, ApplicationId appId,
String appOwner, List<String> amContainersList,
LogCLIHelpers logCliHelper) throws Exception {
private int fetchAMContainerLogs(ContainerLogsRequest request,
List<String> amContainersList, String[] logFiles,
LogCLIHelpers logCliHelper, String localDir) throws Exception {
// if we do not specify the value for CONTAINER_LOG_FILES option,
// we will only output syslog
if (logFiles == null || logFiles.length == 0) {
@ -685,10 +699,9 @@ public class LogsCLI extends Configured implements Tool {
// and containerId for all the AM Containers.
// After that, we will call NodeManager webService to get the
// related logs
if (appState == YarnApplicationState.ACCEPTED
|| appState == YarnApplicationState.RUNNING) {
return printAMContainerLogs(getConf(), appId.toString(), amContainersList,
logFiles, logCliHelper, appOwner, false);
if (!request.isAppFinished()) {
return printAMContainerLogs(getConf(), request, amContainersList,
logFiles, logCliHelper, localDir);
} else {
// If the application is in the final state, we will call RM webservice
// to get all AppAttempts information first. If we get nothing,
@ -698,9 +711,11 @@ public class LogsCLI extends Configured implements Tool {
// to get logs from HDFS directly.
if (getConf().getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
return printAMContainerLogs(getConf(), appId.toString(),
amContainersList, logFiles, logCliHelper, appOwner, true);
return printAMContainerLogs(getConf(), request, amContainersList,
logFiles, logCliHelper, localDir);
} else {
ApplicationId appId = request.getAppId();
String appOwner = request.getAppOwner();
System.err.println("Can not get AMContainers logs for "
+ "the application:" + appId + " with the appOwner:" + appOwner);
System.err.println("This application:" + appId + " has finished."
@ -713,18 +728,18 @@ public class LogsCLI extends Configured implements Tool {
}
}
private int fetchContainerLogs(YarnApplicationState appState,
boolean appStateObtainedSuccessfully, String[] logFiles,
String appOwner, String nodeAddress,
ContainerId containerId, LogCLIHelpers logCliHelper) throws IOException {
private int fetchContainerLogs(ContainerLogsRequest request,
String[] logFiles, LogCLIHelpers logCliHelper, String localDir)
throws IOException {
int resultCode = 0;
String appIdStr = containerId.getApplicationAttemptId()
.getApplicationId().toString();
String containerIdStr = containerId.toString();
String appIdStr = request.getAppId().toString();
String containerIdStr = request.getContainerId();
String nodeAddress = request.getNodeId();
String appOwner = request.getAppOwner();
boolean isAppFinished = request.isAppFinished();
// if we provide the node address and the application is in the final
// state, we could directly get logs from HDFS.
if (nodeAddress != null && (!appStateObtainedSuccessfully ||
isApplicationFinished(appState))) {
if (nodeAddress != null && isAppFinished) {
// if user specified "ALL" as the logFiles param, pass null
// to logCliHelper so that it fetches all the logs
List<String> logs;
@ -736,7 +751,7 @@ public class LogsCLI extends Configured implements Tool {
logs = Arrays.asList(logFiles);
}
return logCliHelper.dumpAContainersLogsForALogType(appIdStr,
containerIdStr, nodeAddress, appOwner, logs);
containerIdStr, nodeAddress, appOwner, logs, localDir);
}
String nodeHttpAddress = null;
String nodeId = null;
@ -749,16 +764,18 @@ public class LogsCLI extends Configured implements Tool {
report.getNodeHttpAddress().replaceFirst(
WebAppUtils.getHttpSchemePrefix(getConf()), "");
nodeId = report.getAssignedNode().toString();
request.setNodeId(nodeId);
request.setNodeHttpAddress(nodeHttpAddress);
} catch (IOException | YarnException ex) {
if (!appStateObtainedSuccessfully || isApplicationFinished(appState)) {
if (isAppFinished) {
String[] requestedLogFiles = logFiles;
if(fetchAllLogFiles(logFiles)) {
requestedLogFiles = null;
}
return printContainerLogsForFinishedApplicationWithoutNodeId(
appIdStr, containerIdStr, requestedLogFiles, logCliHelper,
appOwner);
} else if (!isApplicationFinished(appState)) {
appOwner, localDir);
} else {
System.err.println("Unable to get logs for this container:"
+ containerIdStr + "for the application:" + appIdStr
+ " with the appOwner: " + appOwner);
@ -772,12 +789,12 @@ public class LogsCLI extends Configured implements Tool {
// If the application is not in the final state,
// we will provide the NodeHttpAddress and get the container logs
// by calling NodeManager webservice.
if (!isApplicationFinished(appState)) {
if (!isAppFinished) {
if (logFiles == null || logFiles.length == 0) {
logFiles = new String[] {"syslog"};
}
printContainerLogsFromRunningApplication(getConf(), containerId,
nodeHttpAddress, nodeId, logFiles, logCliHelper, appOwner);
printContainerLogsFromRunningApplication(getConf(), request,
logFiles, logCliHelper, localDir);
} else {
String[] requestedLogFiles = logFiles;
if(fetchAllLogFiles(logFiles)) {
@ -785,52 +802,21 @@ public class LogsCLI extends Configured implements Tool {
}
// If the application is in the final state, we will directly
// get the container logs from HDFS.
printContainerLogsForFinishedApplication(appIdStr, containerIdStr,
nodeId, requestedLogFiles, logCliHelper, appOwner);
printContainerLogsForFinishedApplication(request,
requestedLogFiles, logCliHelper, localDir);
}
return resultCode;
}
private static class AMLogsRequest {
private String amContainerId;
private String nodeId;
private String nodeHttpAddress;
private final boolean isAppFinished;
AMLogsRequest(boolean isAppFinished) {
this.isAppFinished = isAppFinished;
this.setAmContainerId("");
this.setNodeId("");
this.setNodeHttpAddress("");
}
public String getAmContainerId() {
return amContainerId;
}
public void setAmContainerId(String amContainerId) {
this.amContainerId = amContainerId;
}
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
public String getNodeHttpAddress() {
return nodeHttpAddress;
}
public void setNodeHttpAddress(String nodeHttpAddress) {
this.nodeHttpAddress = nodeHttpAddress;
}
public boolean isAppFinished() {
return isAppFinished;
private int fetchApplicationLogs(ApplicationId appId, String appOwner,
LogCLIHelpers logCliHelper, String localDir) throws IOException {
int resultCode =
logCliHelper.dumpAllContainersLogs(appId, appOwner, localDir);
if (resultCode == -1) {
System.err.println("Can not find the logs for the application: "
+ appId + " with the appOwner: " + appOwner);
}
return resultCode;
}
private String guessAppOwner(ApplicationReport appReport,
@ -846,4 +832,81 @@ public class LogsCLI extends Configured implements Tool {
}
return appOwner;
}
private static class ContainerLogsRequest {
private ApplicationId appId;
private String containerId;
private String nodeId;
private String nodeHttpAddress;
private String appOwner;
private boolean appFinished;
public ContainerLogsRequest(ContainerLogsRequest request) {
this.setAppId(request.getAppId());
this.setAppFinished(request.isAppFinished());
this.setAppOwner(request.getAppOwner());
this.setNodeId(request.getNodeId());
this.setNodeHttpAddress(request.getNodeHttpAddress());
this.setContainerId(request.getContainerId());
}
public ContainerLogsRequest(ApplicationId applicationId,
boolean isAppFinished, String owner,
String address, String httpAddress, String container) {
this.setAppId(applicationId);
this.setAppFinished(isAppFinished);
this.setAppOwner(owner);
this.setNodeId(address);
this.setNodeHttpAddress(httpAddress);
this.setContainerId(container);
}
public ApplicationId getAppId() {
return appId;
}
public void setAppId(ApplicationId appId) {
this.appId = appId;
}
public String getContainerId() {
return containerId;
}
public void setContainerId(String containerId) {
this.containerId = containerId;
}
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeAddress) {
this.nodeId = nodeAddress;
}
public String getAppOwner() {
return appOwner;
}
public void setAppOwner(String appOwner) {
this.appOwner = appOwner;
}
public String getNodeHttpAddress() {
return nodeHttpAddress;
}
public void setNodeHttpAddress(String nodeHttpAddress) {
this.nodeHttpAddress = nodeHttpAddress;
}
public boolean isAppFinished() {
return appFinished;
}
public void setAppFinished(boolean appFinished) {
this.appFinished = appFinished;
}
}
}

View File

@ -26,11 +26,13 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.Writer;
@ -41,8 +43,9 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
@ -180,6 +183,10 @@ public class TestLogsCLI {
pw.println(" container log files. Use \"ALL\" to fetch");
pw.println(" all the log files for the container.");
pw.println(" -nodeAddress <Node Address> NodeAddress in the format nodename:port");
pw.println(" -out <Local Directory> Local directory for storing individual");
pw.println(" container logs. The container logs will");
pw.println(" be stored based on the node the container");
pw.println(" ran on.");
pw.println(" -show_meta_info Show the log metadata, including log-file");
pw.println(" names, the size of the log files. You can");
pw.println(" combine this with --containerId to get");
@ -492,6 +499,103 @@ public class TestLogsCLI {
}
}
@Test (timeout = 15000)
public void testSaveContainerLogsLocally() throws Exception {
String remoteLogRootDir = "target/logs/";
String rootLogDir = "target/LocalLogs";
String localDir = "target/SaveLogs";
Path localPath = new Path(localDir);
Configuration configuration = new Configuration();
configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
configuration
.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir);
configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
configuration.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
FileSystem fs = FileSystem.get(configuration);
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
List<ContainerId> containerIds = new ArrayList<ContainerId>();
ContainerId containerId1 = ContainerId.newContainerId(
appAttemptId, 1);
ContainerId containerId2 = ContainerId.newContainerId(
appAttemptId, 2);
containerIds.add(containerId1);
containerIds.add(containerId2);
List<NodeId> nodeIds = new ArrayList<NodeId>();
NodeId nodeId = NodeId.newInstance("localhost", 1234);
NodeId nodeId2 = NodeId.newInstance("test", 4567);
nodeIds.add(nodeId);
nodeIds.add(nodeId2);
try {
createContainerLogs(configuration, remoteLogRootDir, rootLogDir, fs,
appId, containerIds, nodeIds);
YarnClient mockYarnClient =
createMockYarnClient(YarnApplicationState.FINISHED,
UserGroupInformation.getCurrentUser().getShortUserName());
LogsCLI cli = new LogsCLIForTest(mockYarnClient);
cli.setConf(configuration);
int exitCode = cli.run(new String[] {"-applicationId",
appId.toString(),
"-out" , localPath.toString()});
assertTrue(exitCode == 0);
// make sure we created a dir named as node id
FileStatus[] nodeDir = fs.listStatus(localPath);
Arrays.sort(nodeDir);
assertTrue(nodeDir.length == 2);
assertTrue(nodeDir[0].getPath().getName().contains(
LogAggregationUtils.getNodeString(nodeId)));
assertTrue(nodeDir[1].getPath().getName().contains(
LogAggregationUtils.getNodeString(nodeId2)));
FileStatus[] container1Dir = fs.listStatus(nodeDir[0].getPath());
assertTrue(container1Dir.length == 1);
assertTrue(container1Dir[0].getPath().getName().equals(
containerId1.toString()));
String container1= readContainerContent(container1Dir[0].getPath(), fs);
assertTrue(container1.contains("Hello " + containerId1
+ " in syslog!"));
FileStatus[] container2Dir = fs.listStatus(nodeDir[1].getPath());
assertTrue(container2Dir.length == 1);
assertTrue(container2Dir[0].getPath().getName().equals(
containerId2.toString()));
String container2= readContainerContent(container2Dir[0].getPath(), fs);
assertTrue(container2.contains("Hello " + containerId2
+ " in syslog!"));
} finally {
fs.delete(new Path(remoteLogRootDir), true);
fs.delete(new Path(rootLogDir), true);
fs.delete(localPath, true);
}
}
private String readContainerContent(Path containerPath,
FileSystem fs) throws IOException {
assertTrue(fs.exists(containerPath));
StringBuffer inputLine = new StringBuffer();
BufferedReader reader = null;
try {
reader = new BufferedReader(new InputStreamReader(
fs.open(containerPath)));
String tmp;
while ((tmp = reader.readLine()) != null) {
inputLine.append(tmp);
}
return inputLine.toString();
} finally {
if (reader != null) {
IOUtils.closeQuietly(reader);
}
}
}
@Test (timeout = 15000)
public void testPrintContainerLogMetadata() throws Exception {
String remoteLogRootDir = "target/logs/";

View File

@ -24,8 +24,10 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configurable;
@ -54,7 +56,7 @@ public class LogCLIHelpers implements Configurable {
public int dumpAContainersLogs(String appId, String containerId,
String nodeId, String jobOwner) throws IOException {
return dumpAContainersLogsForALogType(appId, containerId, nodeId, jobOwner,
null);
null, null);
}
@Private
@ -107,17 +109,17 @@ public class LogCLIHelpers implements Configurable {
@Private
@VisibleForTesting
public int dumpAContainersLogsForALogType(String appId, String containerId,
String nodeId, String jobOwner, List<String> logType)
String nodeId, String jobOwner, List<String> logType, String localDir)
throws IOException {
return dumpAContainersLogsForALogType(appId, containerId, nodeId, jobOwner,
logType, true);
return dumpAContainersLogsForALogType(appId, containerId, nodeId,
jobOwner, logType, true, localDir);
}
@Private
@VisibleForTesting
public int dumpAContainersLogsForALogType(String appId, String containerId,
String nodeId, String jobOwner, List<String> logType,
boolean outputFailure) throws IOException {
boolean outputFailure, String localDir) throws IOException {
ApplicationId applicationId = ConverterUtils.toApplicationId(appId);
RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
applicationId, jobOwner);
@ -137,17 +139,21 @@ public class LogCLIHelpers implements Configurable {
if (fileName.contains(LogAggregationUtils.getNodeString(nodeId))
&& !fileName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader = null;
PrintStream out = createPrintStream(localDir, fileName, containerId);
try {
String containerString = "\n\nContainer: " + containerId;
out.println(containerString);
out.println(StringUtils.repeat("=", containerString.length()));
reader =
new AggregatedLogFormat.LogReader(getConf(),
thisNodeFile.getPath());
if (logType == null) {
if (dumpAContainerLogs(containerId, reader, System.out,
if (dumpAContainerLogs(containerId, reader, out,
thisNodeFile.getModificationTime()) > -1) {
foundContainerLogs = true;
}
} else {
if (dumpAContainerLogsForALogType(containerId, reader, System.out,
if (dumpAContainerLogsForALogType(containerId, reader, out,
thisNodeFile.getModificationTime(), logType) > -1) {
foundContainerLogs = true;
}
@ -156,11 +162,14 @@ public class LogCLIHelpers implements Configurable {
if (reader != null) {
reader.close();
}
closePrintStream(out);
}
}
}
if (!foundContainerLogs && outputFailure) {
containerLogNotFound(containerId);
if (!foundContainerLogs) {
if (outputFailure) {
containerLogNotFound(containerId);
}
return -1;
}
return 0;
@ -168,8 +177,8 @@ public class LogCLIHelpers implements Configurable {
@Private
public int dumpAContainersLogsForALogTypeWithoutNodeId(String appId,
String containerId, String jobOwner, List<String> logType)
throws IOException {
String containerId, String jobOwner, List<String> logType,
String localDir) throws IOException {
ApplicationId applicationId = ConverterUtils.toApplicationId(appId);
RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
applicationId, jobOwner);
@ -182,17 +191,28 @@ public class LogCLIHelpers implements Configurable {
if (!thisNodeFile.getPath().getName().endsWith(
LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader = null;
PrintStream out = System.out;
try {
reader =
new AggregatedLogFormat.LogReader(getConf(),
thisNodeFile.getPath());
if (getContainerLogsStream(containerId, reader) == null) {
continue;
}
reader =
new AggregatedLogFormat.LogReader(getConf(),
thisNodeFile.getPath());
out = createPrintStream(localDir, thisNodeFile.getPath().getName(),
containerId);
out.println(containerId);
out.println(StringUtils.repeat("=", containerId.length()));
if (logType == null) {
if (dumpAContainerLogs(containerId, reader, System.out,
if (dumpAContainerLogs(containerId, reader, out,
thisNodeFile.getModificationTime()) > -1) {
foundContainerLogs = true;
}
} else {
if (dumpAContainerLogsForALogType(containerId, reader, System.out,
if (dumpAContainerLogsForALogType(containerId, reader, out,
thisNodeFile.getModificationTime(), logType) > -1) {
foundContainerLogs = true;
}
@ -201,6 +221,7 @@ public class LogCLIHelpers implements Configurable {
if (reader != null) {
reader.close();
}
closePrintStream(out);
}
}
}
@ -210,19 +231,13 @@ public class LogCLIHelpers implements Configurable {
}
return 0;
}
@Private
public int dumpAContainerLogs(String containerIdStr,
AggregatedLogFormat.LogReader reader, PrintStream out,
long logUploadedTime) throws IOException {
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
while (valueStream != null && !key.toString().equals(containerIdStr)) {
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
DataInputStream valueStream = getContainerLogsStream(
containerIdStr, reader);
if (valueStream == null) {
return -1;
@ -244,10 +259,8 @@ public class LogCLIHelpers implements Configurable {
return -1;
}
@Private
public int dumpAContainerLogsForALogType(String containerIdStr,
AggregatedLogFormat.LogReader reader, PrintStream out,
long logUploadedTime, List<String> logType) throws IOException {
private DataInputStream getContainerLogsStream(String containerIdStr,
AggregatedLogFormat.LogReader reader) throws IOException {
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
@ -257,7 +270,15 @@ public class LogCLIHelpers implements Configurable {
key = new LogKey();
valueStream = reader.next(key);
}
return valueStream;
}
@Private
public int dumpAContainerLogsForALogType(String containerIdStr,
AggregatedLogFormat.LogReader reader, PrintStream out,
long logUploadedTime, List<String> logType) throws IOException {
DataInputStream valueStream = getContainerLogsStream(
containerIdStr, reader);
if (valueStream == null) {
return -1;
}
@ -283,7 +304,7 @@ public class LogCLIHelpers implements Configurable {
@Private
public int dumpAllContainersLogs(ApplicationId appId, String appOwner,
PrintStream out) throws IOException {
String localDir) throws IOException {
RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
appId, appOwner);
if (nodeFiles == null) {
@ -310,20 +331,25 @@ public class LogCLIHelpers implements Configurable {
valueStream = reader.next(key);
while (valueStream != null) {
String containerString =
"\n\nContainer: " + key + " on "
+ thisNodeFile.getPath().getName();
out.println(containerString);
out.println(StringUtils.repeat("=", containerString.length()));
while (true) {
try {
LogReader.readAContainerLogsForALogType(valueStream, out,
thisNodeFile.getModificationTime());
foundAnyLogs = true;
} catch (EOFException eof) {
break;
PrintStream out = createPrintStream(localDir,
thisNodeFile.getPath().getName(), key.toString());
try {
String containerString =
"\n\nContainer: " + key + " on "
+ thisNodeFile.getPath().getName();
out.println(containerString);
out.println(StringUtils.repeat("=", containerString.length()));
while (true) {
try {
LogReader.readAContainerLogsForALogType(valueStream, out,
thisNodeFile.getModificationTime());
foundAnyLogs = true;
} catch (EOFException eof) {
break;
}
}
} finally {
closePrintStream(out);
}
// Next container
@ -501,4 +527,24 @@ public class LogCLIHelpers implements Configurable {
+ "have permission to access " + remoteAppLogDir
+ ". Error message found: " + errorMessage);
}
@Private
public PrintStream createPrintStream(String localDir, String nodeId,
String containerId) throws IOException {
PrintStream out = System.out;
if(localDir != null && !localDir.isEmpty()) {
Path nodePath = new Path(localDir, LogAggregationUtils
.getNodeString(nodeId));
Files.createDirectories(Paths.get(nodePath.toString()));
Path containerLogPath = new Path(nodePath, containerId);
out = new PrintStream(containerLogPath.toString(), "UTF-8");
}
return out;
}
public void closePrintStream(PrintStream out) {
if (out != System.out) {
IOUtils.closeQuietly(out);
}
}
}