YARN-6313. YARN logs cli should provide logs for a completed container even when application is still running. Contributed by Xuan Gong.

(cherry picked from commit b88f5e0f7858d1d89b79dfd325b767c34416052d)
This commit is contained in:
Junping Du 2017-03-14 12:56:54 -07:00
parent f254002f1d
commit 0e7879052a
3 changed files with 160 additions and 54 deletions

View File

@ -44,6 +44,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException; import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Evolving;
@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers; import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo; import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils;
import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
@ -409,10 +411,11 @@ public class LogsCLI extends Configured implements Tool {
return false; return false;
} }
private List<PerContainerLogFileInfo> getContainerLogFiles( private List<Pair<PerContainerLogFileInfo, String>> getContainerLogFiles(
Configuration conf, String containerIdStr, String nodeHttpAddress) Configuration conf, String containerIdStr, String nodeHttpAddress)
throws IOException { throws IOException {
List<PerContainerLogFileInfo> logFileInfos = new ArrayList<>(); List<Pair<PerContainerLogFileInfo, String>> logFileInfos
= new ArrayList<>();
Client webServiceClient = Client.create(); Client webServiceClient = Client.create();
try { try {
WebResource webResource = webServiceClient WebResource webResource = webServiceClient
@ -439,16 +442,20 @@ public class LogsCLI extends Configured implements Tool {
} }
for (int i = 0; i < array.length(); i++) { for (int i = 0; i < array.length(); i++) {
JSONObject log = array.getJSONObject(i); JSONObject log = array.getJSONObject(i);
String aggregateType = log.has("logAggregationType") ?
log.getString("logAggregationType") : "N/A";
Object ob = log.get("containerLogInfo"); Object ob = log.get("containerLogInfo");
if (ob instanceof JSONArray) { if (ob instanceof JSONArray) {
JSONArray obArray = (JSONArray)ob; JSONArray obArray = (JSONArray)ob;
for (int j = 0; j < obArray.length(); j++) { for (int j = 0; j < obArray.length(); j++) {
logFileInfos.add(generatePerContainerLogFileInfoFromJSON( logFileInfos.add(new Pair<PerContainerLogFileInfo, String>(
obArray.getJSONObject(j))); generatePerContainerLogFileInfoFromJSON(
obArray.getJSONObject(j)), aggregateType));
} }
} else if (ob instanceof JSONObject) { } else if (ob instanceof JSONObject) {
logFileInfos.add(generatePerContainerLogFileInfoFromJSON( logFileInfos.add(new Pair<PerContainerLogFileInfo, String>(
(JSONObject)ob)); generatePerContainerLogFileInfoFromJSON(
(JSONObject)ob), aggregateType));
} }
} }
} catch (Exception e) { } catch (Exception e) {
@ -543,10 +550,8 @@ public class LogsCLI extends Configured implements Tool {
IOUtils.closeQuietly(is); IOUtils.closeQuietly(is);
} }
} }
// for the case, we have already uploaded partial logs in HDFS
int result = logCliHelper.dumpAContainerLogsForLogType( if (foundAnyLogs) {
newOptions, false);
if (result == 0 || foundAnyLogs) {
return 0; return 0;
} else { } else {
return -1; return -1;
@ -587,6 +592,19 @@ public class LogsCLI extends Configured implements Tool {
newOptions); newOptions);
} }
private int printAggregatedContainerLogs(ContainerLogsRequest request,
LogCLIHelpers logCliHelper, boolean useRegex) throws IOException {
return printContainerLogsForFinishedApplication(request,
logCliHelper, useRegex);
}
private int printAggregatedContainerLogsWithoutNodeId(
ContainerLogsRequest request, LogCLIHelpers logCliHelper,
boolean useRegex) throws IOException {
return printContainerLogsForFinishedApplicationWithoutNodeId(request,
logCliHelper, useRegex);
}
@Private @Private
@VisibleForTesting @VisibleForTesting
public ContainerReport getContainerReport(String containerIdStr) public ContainerReport getContainerReport(String containerIdStr)
@ -724,9 +742,10 @@ public class LogsCLI extends Configured implements Tool {
} }
private int showContainerLogInfo(ContainerLogsRequest request, private int showContainerLogInfo(ContainerLogsRequest request,
LogCLIHelpers logCliHelper) throws IOException, YarnException { LogCLIHelpers logCliHelper) throws IOException, YarnException,
ClientHandlerException, UniformInterfaceException, JSONException {
if (!request.isAppFinished()) { if (!request.isAppFinished()) {
return printContainerInfoFromRunningApplication(request); return printContainerInfoFromRunningApplication(request, logCliHelper);
} else { } else {
return logCliHelper.printAContainerLogMetadata( return logCliHelper.printAContainerLogMetadata(
request, System.out, System.err); request, System.out, System.err);
@ -901,7 +920,8 @@ public class LogsCLI extends Configured implements Tool {
} }
private int fetchContainerLogs(ContainerLogsRequest request, private int fetchContainerLogs(ContainerLogsRequest request,
LogCLIHelpers logCliHelper, boolean useRegex) throws IOException { LogCLIHelpers logCliHelper, boolean useRegex) throws IOException,
ClientHandlerException, UniformInterfaceException, JSONException {
int resultCode = 0; int resultCode = 0;
String appIdStr = request.getAppId().toString(); String appIdStr = request.getAppId().toString();
String containerIdStr = request.getContainerId(); String containerIdStr = request.getContainerId();
@ -942,14 +962,30 @@ public class LogsCLI extends Configured implements Tool {
return printContainerLogsForFinishedApplicationWithoutNodeId( return printContainerLogsForFinishedApplicationWithoutNodeId(
request, logCliHelper, useRegex); request, logCliHelper, useRegex);
} else { } else {
nodeHttpAddress = getNodeHttpAddressFromRMWebString(request);
if (nodeHttpAddress != null && !nodeHttpAddress.isEmpty()) {
request.setNodeHttpAddress(nodeHttpAddress);
} else {
// for the case, we have already uploaded partial logs in HDFS
int result = -1;
if (nodeAddress != null && !nodeAddress.isEmpty()) {
result = printAggregatedContainerLogs(
request, logCliHelper, useRegex);
} else {
result = printAggregatedContainerLogsWithoutNodeId(
request, logCliHelper, useRegex);
}
if (result == -1) {
System.err.println("Unable to get logs for this container:" System.err.println("Unable to get logs for this container:"
+ containerIdStr + "for the application:" + appIdStr + containerIdStr + " for the application:" + appIdStr
+ " with the appOwner: " + appOwner); + " with the appOwner: " + appOwner);
System.err.println("The application: " + appIdStr System.err.println("The application: " + appIdStr
+ " is still running, and we can not get Container report " + " is still running, and we can not get Container report "
+ "for the container: " + containerIdStr +". Please try later " + "for the container: " + containerIdStr +". Please try later "
+ "or after the application finishes."); + "or after the application finishes.");
return -1; }
return result;
}
} }
} }
// If the application is not in the final state, // If the application is not in the final state,
@ -1145,7 +1181,9 @@ public class LogsCLI extends Configured implements Tool {
} }
private int printContainerInfoFromRunningApplication( private int printContainerInfoFromRunningApplication(
ContainerLogsRequest options) throws YarnException, IOException { ContainerLogsRequest options, LogCLIHelpers logCliHelper)
throws YarnException, IOException, ClientHandlerException,
UniformInterfaceException, JSONException {
String containerIdStr = options.getContainerId(); String containerIdStr = options.getContainerId();
String nodeIdStr = options.getNodeId(); String nodeIdStr = options.getNodeId();
List<ContainerReport> reports = List<ContainerReport> reports =
@ -1153,6 +1191,20 @@ public class LogsCLI extends Configured implements Tool {
List<ContainerReport> filteredReports = filterContainersInfo( List<ContainerReport> filteredReports = filterContainersInfo(
options, reports); options, reports);
if (filteredReports.isEmpty()) { if (filteredReports.isEmpty()) {
// if we specify the containerId as well as NodeAddress
String nodeHttpAddress = null;
if (options.getContainerId() != null
&& !options.getContainerId().isEmpty()) {
nodeHttpAddress = getNodeHttpAddressFromRMWebString(options);
}
if (nodeHttpAddress != null) {
outputContainerLogMeta(options.getContainerId(), options.getNodeId(),
nodeHttpAddress);
return 0;
} else {
int result = logCliHelper.printAContainerLogMetadata(
options, System.out, System.err);
if (result == -1) {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
if (containerIdStr != null && !containerIdStr.isEmpty()) { if (containerIdStr != null && !containerIdStr.isEmpty()) {
sb.append("Trying to get container with ContainerId: " sb.append("Trying to get container with ContainerId: "
@ -1165,42 +1217,49 @@ public class LogsCLI extends Configured implements Tool {
sb.append("Can not find any matched containers for the application: " sb.append("Can not find any matched containers for the application: "
+ options.getAppId()); + options.getAppId());
System.err.println(sb.toString()); System.err.println(sb.toString());
return -1; }
return result;
}
} }
for (ContainerReport report : filteredReports) { for (ContainerReport report : filteredReports) {
String nodeId = report.getAssignedNode().toString(); String nodeId = report.getAssignedNode().toString();
String nodeHttpAddress = report.getNodeHttpAddress().replaceFirst( String nodeHttpAddress = report.getNodeHttpAddress().replaceFirst(
WebAppUtils.getHttpSchemePrefix(getConf()), ""); WebAppUtils.getHttpSchemePrefix(getConf()), "");
String containerId = report.getContainerId().toString(); String containerId = report.getContainerId().toString();
outputContainerLogMeta(containerId, nodeId, nodeHttpAddress);
}
return 0;
}
private void outputContainerLogMeta(String containerId, String nodeId,
String nodeHttpAddress) throws IOException {
String containerString = String.format( String containerString = String.format(
LogCLIHelpers.CONTAINER_ON_NODE_PATTERN, containerId, nodeId); LogCLIHelpers.CONTAINER_ON_NODE_PATTERN, containerId, nodeId);
outStream.println(containerString); outStream.println(containerString);
outStream.println(StringUtils.repeat("=", containerString.length())); outStream.println(StringUtils.repeat("=", containerString.length()));
outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN, outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN,
"LogFile", "LogLength", "LastModificationTime"); "LogFile", "LogLength", "LastModificationTime", "LogAggregationType");
outStream.println(StringUtils.repeat("=", containerString.length())); outStream.println(StringUtils.repeat("=", containerString.length() * 2));
List<PerContainerLogFileInfo> infos = getContainerLogFiles( List<Pair<PerContainerLogFileInfo, String>> infos = getContainerLogFiles(
getConf(), containerId, nodeHttpAddress); getConf(), containerId, nodeHttpAddress);
for (PerContainerLogFileInfo info : infos) { for (Pair<PerContainerLogFileInfo, String> info : infos) {
outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN, outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN,
info.getFileName(), info.getFileSize(), info.getKey().getFileName(), info.getKey().getFileSize(),
info.getLastModifiedTime()); info.getKey().getLastModifiedTime(), info.getValue());
} }
} }
return 0;
}
@VisibleForTesting @VisibleForTesting
public Set<String> getMatchedContainerLogFiles(ContainerLogsRequest request, public Set<String> getMatchedContainerLogFiles(ContainerLogsRequest request,
boolean useRegex) throws IOException { boolean useRegex) throws IOException {
// fetch all the log files for the container // fetch all the log files for the container
// filter the log files based on the given -log_files pattern // filter the log files based on the given -log_files pattern
List<PerContainerLogFileInfo> allLogFileInfos= List<Pair<PerContainerLogFileInfo, String>> allLogFileInfos=
getContainerLogFiles(getConf(), request.getContainerId(), getContainerLogFiles(getConf(), request.getContainerId(),
request.getNodeHttpAddress()); request.getNodeHttpAddress());
List<String> fileNames = new ArrayList<String>(); List<String> fileNames = new ArrayList<String>();
for (PerContainerLogFileInfo fileInfo : allLogFileInfos) { for (Pair<PerContainerLogFileInfo, String> fileInfo : allLogFileInfos) {
fileNames.add(fileInfo.getFileName()); fileNames.add(fileInfo.getKey().getFileName());
} }
return getMatchedLogFiles(request, fileNames, return getMatchedLogFiles(request, fileNames,
useRegex); useRegex);
@ -1218,4 +1277,17 @@ public class LogsCLI extends Configured implements Tool {
.queryParam("size", Long.toString(request.getBytes())) .queryParam("size", Long.toString(request.getBytes()))
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
} }
@VisibleForTesting
public String getNodeHttpAddressFromRMWebString(ContainerLogsRequest request)
throws ClientHandlerException, UniformInterfaceException, JSONException {
if (request.getNodeId() == null || request.getNodeId().isEmpty()) {
return null;
}
JSONObject nodeInfo = YarnWebServiceUtils
.getNodeInfoFromRMWebService(getConf(), request.getNodeId())
.getJSONObject("node");
return nodeInfo.has("nodeHTTPAddress") ?
nodeInfo.getString("nodeHTTPAddress") : null;
}
} }

View File

@ -755,6 +755,23 @@ public class TestLogsCLI {
Set<String> logTypes1 = capturedRequests.get(1).getLogTypes(); Set<String> logTypes1 = capturedRequests.get(1).getLogTypes();
Assert.assertTrue(logTypes0.contains("ALL") && (logTypes0.size() == 1)); Assert.assertTrue(logTypes0.contains("ALL") && (logTypes0.size() == 1));
Assert.assertTrue(logTypes1.contains("ALL") && (logTypes1.size() == 1)); Assert.assertTrue(logTypes1.contains("ALL") && (logTypes1.size() == 1));
mockYarnClient = createMockYarnClientWithException(
YarnApplicationState.RUNNING, ugi.getShortUserName());
LogsCLI cli2 = spy(new LogsCLIForTest(mockYarnClient));
doReturn(0).when(cli2).printContainerLogsFromRunningApplication(
any(Configuration.class), any(ContainerLogsRequest.class),
any(LogCLIHelpers.class), anyBoolean());
doReturn("123").when(cli2).getNodeHttpAddressFromRMWebString(
any(ContainerLogsRequest.class));
cli2.setConf(new YarnConfiguration());
ContainerId containerId100 = ContainerId.newContainerId(appAttemptId, 100);
exitCode = cli2.run(new String[] {"-applicationId", appId.toString(),
"-containerId", containerId100.toString(), "-nodeAddress", "NM:1234"});
assertTrue(exitCode == 0);
verify(cli2, times(1)).printContainerLogsFromRunningApplication(
any(Configuration.class), logsRequestCaptor.capture(),
any(LogCLIHelpers.class), anyBoolean());
} }
@Test (timeout = 15000) @Test (timeout = 15000)
@ -1391,6 +1408,20 @@ public class TestLogsCLI {
return mockClient; return mockClient;
} }
private YarnClient createMockYarnClientWithException(
YarnApplicationState appState, String user)
throws YarnException, IOException {
YarnClient mockClient = mock(YarnClient.class);
ApplicationReport mockAppReport = mock(ApplicationReport.class);
doReturn(user).when(mockAppReport).getUser();
doReturn(appState).when(mockAppReport).getYarnApplicationState();
doReturn(mockAppReport).when(mockClient).getApplicationReport(
any(ApplicationId.class));
doThrow(new YarnException()).when(mockClient).getContainerReport(
any(ContainerId.class));
return mockClient;
}
private YarnClient createMockYarnClientWithException() private YarnClient createMockYarnClientWithException()
throws YarnException, IOException { throws YarnException, IOException {
YarnClient mockClient = mock(YarnClient.class); YarnClient mockClient = mock(YarnClient.class);

View File

@ -51,7 +51,7 @@ import com.google.common.annotations.VisibleForTesting;
public class LogCLIHelpers implements Configurable { public class LogCLIHelpers implements Configurable {
public static final String PER_LOG_FILE_INFO_PATTERN = public static final String PER_LOG_FILE_INFO_PATTERN =
"%30s\t%30s\t%30s" + System.getProperty("line.separator"); "%30s\t%30s\t%30s\t%30s" + System.getProperty("line.separator");
public static final String CONTAINER_ON_NODE_PATTERN = public static final String CONTAINER_ON_NODE_PATTERN =
"Container: %s on %s"; "Container: %s on %s";
@ -164,6 +164,7 @@ public class LogCLIHelpers implements Configurable {
String containerString = String.format(CONTAINER_ON_NODE_PATTERN, String containerString = String.format(CONTAINER_ON_NODE_PATTERN,
containerId, thisNodeFile.getPath().getName()); containerId, thisNodeFile.getPath().getName());
out.println(containerString); out.println(containerString);
out.println("LogAggregationType: AGGREGATED");
out.println(StringUtils.repeat("=", containerString.length())); out.println(StringUtils.repeat("=", containerString.length()));
// We have to re-create reader object to reset the stream index // We have to re-create reader object to reset the stream index
// after calling getContainerLogsStream which would move the stream // after calling getContainerLogsStream which would move the stream
@ -238,6 +239,7 @@ public class LogCLIHelpers implements Configurable {
String containerString = String.format(CONTAINER_ON_NODE_PATTERN, String containerString = String.format(CONTAINER_ON_NODE_PATTERN,
containerId, thisNodeFile.getPath().getName()); containerId, thisNodeFile.getPath().getName());
out.println(containerString); out.println(containerString);
out.println("LogAggregationType: AGGREGATED");
out.println(StringUtils.repeat("=", containerString.length())); out.println(StringUtils.repeat("=", containerString.length()));
if (logType == null || logType.isEmpty()) { if (logType == null || logType.isEmpty()) {
if (dumpAContainerLogs(containerId, reader, out, if (dumpAContainerLogs(containerId, reader, out,
@ -377,6 +379,7 @@ public class LogCLIHelpers implements Configurable {
CONTAINER_ON_NODE_PATTERN, key, CONTAINER_ON_NODE_PATTERN, key,
thisNodeFile.getPath().getName()); thisNodeFile.getPath().getName());
out.println(containerString); out.println(containerString);
out.println("LogAggregationType: AGGREGATED");
out.println(StringUtils.repeat("=", containerString.length())); out.println(StringUtils.repeat("=", containerString.length()));
while (true) { while (true) {
try { try {
@ -454,12 +457,12 @@ public class LogCLIHelpers implements Configurable {
out.println(containerString); out.println(containerString);
out.println(StringUtils.repeat("=", containerString.length())); out.println(StringUtils.repeat("=", containerString.length()));
out.printf(PER_LOG_FILE_INFO_PATTERN, "LogFile", "LogLength", out.printf(PER_LOG_FILE_INFO_PATTERN, "LogFile", "LogLength",
"LastModificationTime"); "LastModificationTime", "LogAggregationType");
out.println(StringUtils.repeat("=", containerString.length())); out.println(StringUtils.repeat("=", containerString.length() * 2));
for (PerContainerLogFileInfo logMeta : containerLogMeta for (PerContainerLogFileInfo logMeta : containerLogMeta
.getContainerLogMeta()) { .getContainerLogMeta()) {
out.printf(PER_LOG_FILE_INFO_PATTERN, logMeta.getFileName(), out.printf(PER_LOG_FILE_INFO_PATTERN, logMeta.getFileName(),
logMeta.getFileSize(), logMeta.getLastModifiedTime()); logMeta.getFileSize(), logMeta.getLastModifiedTime(), "AGGREGATED");
} }
} }
return 0; return 0;