YARN-6313. YARN logs cli should provide logs for a completed container even when application is still running. Contributed by Xuan Gong.

(cherry picked from commit b88f5e0f7858d1d89b79dfd325b767c34416052d)
This commit is contained in:
Junping Du 2017-03-14 12:56:54 -07:00
parent 0a3aa40fe7
commit 871dc420f8
3 changed files with 160 additions and 54 deletions

View File

@ -44,6 +44,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@ -409,10 +411,11 @@ public class LogsCLI extends Configured implements Tool {
return false;
}
private List<PerContainerLogFileInfo> getContainerLogFiles(
private List<Pair<PerContainerLogFileInfo, String>> getContainerLogFiles(
Configuration conf, String containerIdStr, String nodeHttpAddress)
throws IOException {
List<PerContainerLogFileInfo> logFileInfos = new ArrayList<>();
List<Pair<PerContainerLogFileInfo, String>> logFileInfos
= new ArrayList<>();
Client webServiceClient = Client.create();
try {
WebResource webResource = webServiceClient
@ -438,16 +441,20 @@ public class LogsCLI extends Configured implements Tool {
}
for (int i = 0; i < array.length(); i++) {
JSONObject log = array.getJSONObject(i);
String aggregateType = log.has("logAggregationType") ?
log.getString("logAggregationType") : "N/A";
Object ob = log.get("containerLogInfo");
if (ob instanceof JSONArray) {
JSONArray obArray = (JSONArray)ob;
for (int j = 0; j < obArray.length(); j++) {
logFileInfos.add(generatePerContainerLogFileInfoFromJSON(
obArray.getJSONObject(j)));
logFileInfos.add(new Pair<PerContainerLogFileInfo, String>(
generatePerContainerLogFileInfoFromJSON(
obArray.getJSONObject(j)), aggregateType));
}
} else if (ob instanceof JSONObject) {
logFileInfos.add(generatePerContainerLogFileInfoFromJSON(
(JSONObject)ob));
logFileInfos.add(new Pair<PerContainerLogFileInfo, String>(
generatePerContainerLogFileInfoFromJSON(
(JSONObject)ob), aggregateType));
}
}
} catch (Exception e) {
@ -542,10 +549,8 @@ public class LogsCLI extends Configured implements Tool {
IOUtils.closeQuietly(is);
}
}
// for the case, we have already uploaded partial logs in HDFS
int result = logCliHelper.dumpAContainerLogsForLogType(
newOptions, false);
if (result == 0 || foundAnyLogs) {
if (foundAnyLogs) {
return 0;
} else {
return -1;
@ -586,6 +591,19 @@ public class LogsCLI extends Configured implements Tool {
newOptions);
}
private int printAggregatedContainerLogs(ContainerLogsRequest request,
LogCLIHelpers logCliHelper, boolean useRegex) throws IOException {
return printContainerLogsForFinishedApplication(request,
logCliHelper, useRegex);
}
private int printAggregatedContainerLogsWithoutNodeId(
ContainerLogsRequest request, LogCLIHelpers logCliHelper,
boolean useRegex) throws IOException {
return printContainerLogsForFinishedApplicationWithoutNodeId(request,
logCliHelper, useRegex);
}
@Private
@VisibleForTesting
public ContainerReport getContainerReport(String containerIdStr)
@ -723,9 +741,10 @@ public class LogsCLI extends Configured implements Tool {
}
private int showContainerLogInfo(ContainerLogsRequest request,
LogCLIHelpers logCliHelper) throws IOException, YarnException {
LogCLIHelpers logCliHelper) throws IOException, YarnException,
ClientHandlerException, UniformInterfaceException, JSONException {
if (!request.isAppFinished()) {
return printContainerInfoFromRunningApplication(request);
return printContainerInfoFromRunningApplication(request, logCliHelper);
} else {
return logCliHelper.printAContainerLogMetadata(
request, System.out, System.err);
@ -900,7 +919,8 @@ public class LogsCLI extends Configured implements Tool {
}
private int fetchContainerLogs(ContainerLogsRequest request,
LogCLIHelpers logCliHelper, boolean useRegex) throws IOException {
LogCLIHelpers logCliHelper, boolean useRegex) throws IOException,
ClientHandlerException, UniformInterfaceException, JSONException {
int resultCode = 0;
String appIdStr = request.getAppId().toString();
String containerIdStr = request.getContainerId();
@ -941,14 +961,30 @@ public class LogsCLI extends Configured implements Tool {
return printContainerLogsForFinishedApplicationWithoutNodeId(
request, logCliHelper, useRegex);
} else {
System.err.println("Unable to get logs for this container:"
+ containerIdStr + "for the application:" + appIdStr
+ " with the appOwner: " + appOwner);
System.err.println("The application: " + appIdStr
+ " is still running, and we can not get Container report "
+ "for the container: " + containerIdStr +". Please try later "
+ "or after the application finishes.");
return -1;
nodeHttpAddress = getNodeHttpAddressFromRMWebString(request);
if (nodeHttpAddress != null && !nodeHttpAddress.isEmpty()) {
request.setNodeHttpAddress(nodeHttpAddress);
} else {
// for the case, we have already uploaded partial logs in HDFS
int result = -1;
if (nodeAddress != null && !nodeAddress.isEmpty()) {
result = printAggregatedContainerLogs(
request, logCliHelper, useRegex);
} else {
result = printAggregatedContainerLogsWithoutNodeId(
request, logCliHelper, useRegex);
}
if (result == -1) {
System.err.println("Unable to get logs for this container:"
+ containerIdStr + " for the application:" + appIdStr
+ " with the appOwner: " + appOwner);
System.err.println("The application: " + appIdStr
+ " is still running, and we can not get Container report "
+ "for the container: " + containerIdStr +". Please try later "
+ "or after the application finishes.");
}
return result;
}
}
}
// If the application is not in the final state,
@ -1144,7 +1180,9 @@ public class LogsCLI extends Configured implements Tool {
}
private int printContainerInfoFromRunningApplication(
ContainerLogsRequest options) throws YarnException, IOException {
ContainerLogsRequest options, LogCLIHelpers logCliHelper)
throws YarnException, IOException, ClientHandlerException,
UniformInterfaceException, JSONException {
String containerIdStr = options.getContainerId();
String nodeIdStr = options.getNodeId();
List<ContainerReport> reports =
@ -1152,54 +1190,75 @@ public class LogsCLI extends Configured implements Tool {
List<ContainerReport> filteredReports = filterContainersInfo(
options, reports);
if (filteredReports.isEmpty()) {
StringBuilder sb = new StringBuilder();
if (containerIdStr != null && !containerIdStr.isEmpty()) {
sb.append("Trying to get container with ContainerId: "
+ containerIdStr + "\n");
// if we specify the containerId as well as NodeAddress
String nodeHttpAddress = null;
if (options.getContainerId() != null
&& !options.getContainerId().isEmpty()) {
nodeHttpAddress = getNodeHttpAddressFromRMWebString(options);
}
if (nodeIdStr != null && !nodeIdStr.isEmpty()) {
sb.append("Trying to get container from NodeManager: "
+ nodeIdStr + "\n");
if (nodeHttpAddress != null) {
outputContainerLogMeta(options.getContainerId(), options.getNodeId(),
nodeHttpAddress);
return 0;
} else {
int result = logCliHelper.printAContainerLogMetadata(
options, System.out, System.err);
if (result == -1) {
StringBuilder sb = new StringBuilder();
if (containerIdStr != null && !containerIdStr.isEmpty()) {
sb.append("Trying to get container with ContainerId: "
+ containerIdStr + "\n");
}
if (nodeIdStr != null && !nodeIdStr.isEmpty()) {
sb.append("Trying to get container from NodeManager: "
+ nodeIdStr + "\n");
}
sb.append("Can not find any matched containers for the application: "
+ options.getAppId());
System.err.println(sb.toString());
}
return result;
}
sb.append("Can not find any matched containers for the application: "
+ options.getAppId());
System.err.println(sb.toString());
return -1;
}
for (ContainerReport report : filteredReports) {
String nodeId = report.getAssignedNode().toString();
String nodeHttpAddress = report.getNodeHttpAddress().replaceFirst(
WebAppUtils.getHttpSchemePrefix(getConf()), "");
String containerId = report.getContainerId().toString();
String containerString = String.format(
LogCLIHelpers.CONTAINER_ON_NODE_PATTERN, containerId, nodeId);
outStream.println(containerString);
outStream.println(StringUtils.repeat("=", containerString.length()));
outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN,
"LogFile", "LogLength", "LastModificationTime");
outStream.println(StringUtils.repeat("=", containerString.length()));
List<PerContainerLogFileInfo> infos = getContainerLogFiles(
getConf(), containerId, nodeHttpAddress);
for (PerContainerLogFileInfo info : infos) {
outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN,
info.getFileName(), info.getFileSize(),
info.getLastModifiedTime());
}
outputContainerLogMeta(containerId, nodeId, nodeHttpAddress);
}
return 0;
}
private void outputContainerLogMeta(String containerId, String nodeId,
String nodeHttpAddress) throws IOException {
String containerString = String.format(
LogCLIHelpers.CONTAINER_ON_NODE_PATTERN, containerId, nodeId);
outStream.println(containerString);
outStream.println(StringUtils.repeat("=", containerString.length()));
outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN,
"LogFile", "LogLength", "LastModificationTime", "LogAggregationType");
outStream.println(StringUtils.repeat("=", containerString.length() * 2));
List<Pair<PerContainerLogFileInfo, String>> infos = getContainerLogFiles(
getConf(), containerId, nodeHttpAddress);
for (Pair<PerContainerLogFileInfo, String> info : infos) {
outStream.printf(LogCLIHelpers.PER_LOG_FILE_INFO_PATTERN,
info.getKey().getFileName(), info.getKey().getFileSize(),
info.getKey().getLastModifiedTime(), info.getValue());
}
}
@VisibleForTesting
public Set<String> getMatchedContainerLogFiles(ContainerLogsRequest request,
boolean useRegex) throws IOException {
// fetch all the log files for the container
// filter the log files based on the given -log_files pattern
List<PerContainerLogFileInfo> allLogFileInfos=
List<Pair<PerContainerLogFileInfo, String>> allLogFileInfos=
getContainerLogFiles(getConf(), request.getContainerId(),
request.getNodeHttpAddress());
List<String> fileNames = new ArrayList<String>();
for (PerContainerLogFileInfo fileInfo : allLogFileInfos) {
fileNames.add(fileInfo.getFileName());
for (Pair<PerContainerLogFileInfo, String> fileInfo : allLogFileInfos) {
fileNames.add(fileInfo.getKey().getFileName());
}
return getMatchedLogFiles(request, fileNames,
useRegex);
@ -1217,4 +1276,17 @@ public class LogsCLI extends Configured implements Tool {
.queryParam("size", Long.toString(request.getBytes()))
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
}
@VisibleForTesting
public String getNodeHttpAddressFromRMWebString(ContainerLogsRequest request)
throws ClientHandlerException, UniformInterfaceException, JSONException {
if (request.getNodeId() == null || request.getNodeId().isEmpty()) {
return null;
}
JSONObject nodeInfo = YarnWebServiceUtils
.getNodeInfoFromRMWebService(getConf(), request.getNodeId())
.getJSONObject("node");
return nodeInfo.has("nodeHTTPAddress") ?
nodeInfo.getString("nodeHTTPAddress") : null;
}
}

View File

@ -755,6 +755,23 @@ public class TestLogsCLI {
Set<String> logTypes1 = capturedRequests.get(1).getLogTypes();
Assert.assertTrue(logTypes0.contains("ALL") && (logTypes0.size() == 1));
Assert.assertTrue(logTypes1.contains("ALL") && (logTypes1.size() == 1));
mockYarnClient = createMockYarnClientWithException(
YarnApplicationState.RUNNING, ugi.getShortUserName());
LogsCLI cli2 = spy(new LogsCLIForTest(mockYarnClient));
doReturn(0).when(cli2).printContainerLogsFromRunningApplication(
any(Configuration.class), any(ContainerLogsRequest.class),
any(LogCLIHelpers.class), anyBoolean());
doReturn("123").when(cli2).getNodeHttpAddressFromRMWebString(
any(ContainerLogsRequest.class));
cli2.setConf(new YarnConfiguration());
ContainerId containerId100 = ContainerId.newContainerId(appAttemptId, 100);
exitCode = cli2.run(new String[] {"-applicationId", appId.toString(),
"-containerId", containerId100.toString(), "-nodeAddress", "NM:1234"});
assertTrue(exitCode == 0);
verify(cli2, times(1)).printContainerLogsFromRunningApplication(
any(Configuration.class), logsRequestCaptor.capture(),
any(LogCLIHelpers.class), anyBoolean());
}
@Test (timeout = 15000)
@ -1391,6 +1408,20 @@ public class TestLogsCLI {
return mockClient;
}
private YarnClient createMockYarnClientWithException(
YarnApplicationState appState, String user)
throws YarnException, IOException {
YarnClient mockClient = mock(YarnClient.class);
ApplicationReport mockAppReport = mock(ApplicationReport.class);
doReturn(user).when(mockAppReport).getUser();
doReturn(appState).when(mockAppReport).getYarnApplicationState();
doReturn(mockAppReport).when(mockClient).getApplicationReport(
any(ApplicationId.class));
doThrow(new YarnException()).when(mockClient).getContainerReport(
any(ContainerId.class));
return mockClient;
}
private YarnClient createMockYarnClientWithException()
throws YarnException, IOException {
YarnClient mockClient = mock(YarnClient.class);

View File

@ -51,7 +51,7 @@ import com.google.common.annotations.VisibleForTesting;
public class LogCLIHelpers implements Configurable {
public static final String PER_LOG_FILE_INFO_PATTERN =
"%30s\t%30s\t%30s" + System.getProperty("line.separator");
"%30s\t%30s\t%30s\t%30s" + System.getProperty("line.separator");
public static final String CONTAINER_ON_NODE_PATTERN =
"Container: %s on %s";
@ -164,6 +164,7 @@ public class LogCLIHelpers implements Configurable {
String containerString = String.format(CONTAINER_ON_NODE_PATTERN,
containerId, thisNodeFile.getPath().getName());
out.println(containerString);
out.println("LogAggregationType: AGGREGATED");
out.println(StringUtils.repeat("=", containerString.length()));
// We have to re-create reader object to reset the stream index
// after calling getContainerLogsStream which would move the stream
@ -238,6 +239,7 @@ public class LogCLIHelpers implements Configurable {
String containerString = String.format(CONTAINER_ON_NODE_PATTERN,
containerId, thisNodeFile.getPath().getName());
out.println(containerString);
out.println("LogAggregationType: AGGREGATED");
out.println(StringUtils.repeat("=", containerString.length()));
if (logType == null || logType.isEmpty()) {
if (dumpAContainerLogs(containerId, reader, out,
@ -377,6 +379,7 @@ public class LogCLIHelpers implements Configurable {
CONTAINER_ON_NODE_PATTERN, key,
thisNodeFile.getPath().getName());
out.println(containerString);
out.println("LogAggregationType: AGGREGATED");
out.println(StringUtils.repeat("=", containerString.length()));
while (true) {
try {
@ -454,12 +457,12 @@ public class LogCLIHelpers implements Configurable {
out.println(containerString);
out.println(StringUtils.repeat("=", containerString.length()));
out.printf(PER_LOG_FILE_INFO_PATTERN, "LogFile", "LogLength",
"LastModificationTime");
out.println(StringUtils.repeat("=", containerString.length()));
"LastModificationTime", "LogAggregationType");
out.println(StringUtils.repeat("=", containerString.length() * 2));
for (PerContainerLogFileInfo logMeta : containerLogMeta
.getContainerLogMeta()) {
out.printf(PER_LOG_FILE_INFO_PATTERN, logMeta.getFileName(),
logMeta.getFileSize(), logMeta.getLastModifiedTime());
logMeta.getFileSize(), logMeta.getLastModifiedTime(), "AGGREGATED");
}
}
return 0;