YARN-6100. Improve YARN webservice to output aggregated container logs. Contributed by Xuan Gong.

This commit is contained in:
Junping Du 2017-02-02 00:41:18 -08:00
parent 2a942eed21
commit 327c9980aa
6 changed files with 333 additions and 235 deletions

View File

@ -64,7 +64,6 @@
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers; import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo; import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONException;
@ -509,17 +508,9 @@ public int printContainerLogsFromRunningApplication(Configuration conf,
newOptions.setLogTypes(matchedFiles); newOptions.setLogTypes(matchedFiles);
Client webServiceClient = Client.create(); Client webServiceClient = Client.create();
String containerString = String.format(
LogCLIHelpers.CONTAINER_ON_NODE_PATTERN, containerIdStr, nodeId);
out.println(containerString);
out.println(StringUtils.repeat("=", containerString.length()));
boolean foundAnyLogs = false; boolean foundAnyLogs = false;
byte[] buffer = new byte[65536]; byte[] buffer = new byte[65536];
for (String logFile : newOptions.getLogTypes()) { for (String logFile : newOptions.getLogTypes()) {
out.println("LogType:" + logFile);
out.println("Log Upload Time:"
+ Times.format(System.currentTimeMillis()));
out.println("Log Contents:");
InputStream is = null; InputStream is = null;
try { try {
ClientResponse response = getResponeFromNMWebService(conf, ClientResponse response = getResponeFromNMWebService(conf,
@ -541,14 +532,6 @@ public int printContainerLogsFromRunningApplication(Configuration conf,
response.getEntity(String.class)); response.getEntity(String.class));
out.println(msg); out.println(msg);
} }
StringBuilder sb = new StringBuilder();
sb.append("End of LogType:" + logFile + ".");
if (request.getContainerState() == ContainerState.RUNNING) {
sb.append(" This log file belongs"
+ " to a running container (" + containerIdStr + ") and so may"
+ " not be complete.");
}
out.println(sb.toString());
out.flush(); out.flush();
foundAnyLogs = true; foundAnyLogs = true;
} catch (ClientHandlerException | UniformInterfaceException ex) { } catch (ClientHandlerException | UniformInterfaceException ex) {

View File

@ -20,11 +20,17 @@
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math3.util.Pair; import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.HarFs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
@ -40,6 +46,9 @@ public final class LogToolUtils {
private LogToolUtils() {} private LogToolUtils() {}
public static final String CONTAINER_ON_NODE_PATTERN =
"Container: %s on %s";
/** /**
* Return a list of {@link ContainerLogMeta} for a container * Return a list of {@link ContainerLogMeta} for a container
* from Remote FileSystem. * from Remote FileSystem.
@ -114,4 +123,153 @@ public static List<ContainerLogMeta> getContainerLogMetaFromRemoteFS(
} }
return containersLogMeta; return containersLogMeta;
} }
/**
 * Stream a single container log file to {@code os}, preceded by a
 * human-readable header (container on node, log type, file name,
 * last-modified time, length, and a "LogContents:" marker).
 *
 * <p>{@code outputSize} selects how much of the file is written:
 * a negative value means the LAST {@code |outputSize|} bytes (the head of
 * the file is skipped before copying), a non-negative value means the
 * FIRST {@code outputSize} bytes (the tail is skipped after copying).
 * Any magnitude &gt;= {@code fileLength} emits the whole file. If the
 * selection is empty, nothing (not even the header) is written.
 *
 * @param containerId the containerId
 * @param nodeId the nodeId
 * @param fileName the log file name
 * @param fileLength the log file length
 * @param outputSize the output size (sign selects head vs. tail; see above)
 * @param lastModifiedTime the log file last modified time
 * @param fis the log file input stream
 * @param os the output stream
 * @param buf the buffer
 * @param logType the log type.
 * @throws IOException if we can not access the log file.
 */
public static void outputContainerLog(String containerId, String nodeId,
String fileName, long fileLength, long outputSize,
String lastModifiedTime, InputStream fis, OutputStream os,
byte[] buf, ContainerLogType logType) throws IOException {
long toSkip = 0;
long totalBytesToRead = fileLength;
long skipAfterRead = 0;
if (outputSize < 0) {
// Negative size: emit the last |outputSize| bytes, so skip the
// leading (fileLength - |outputSize|) bytes up front.
long absBytes = Math.abs(outputSize);
if (absBytes < fileLength) {
toSkip = fileLength - absBytes;
totalBytesToRead = absBytes;
}
org.apache.hadoop.io.IOUtils.skipFully(fis, toSkip);
} else {
// Non-negative size: emit the first outputSize bytes and remember how
// many trailing bytes must be consumed after the copy loop.
if (outputSize < fileLength) {
totalBytesToRead = outputSize;
skipAfterRead = fileLength - outputSize;
}
}
long curRead = 0;
long pendingRead = totalBytesToRead - curRead;
int toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead;
int len = fis.read(buf, 0, toRead);
boolean keepGoing = (len != -1 && curRead < totalBytesToRead);
// Write the header only when at least one byte of log content will
// follow, so an empty selection produces no output at all.
if (keepGoing) {
StringBuilder sb = new StringBuilder();
String containerStr = String.format(
LogToolUtils.CONTAINER_ON_NODE_PATTERN,
containerId, nodeId);
sb.append(containerStr + "\n");
sb.append("LogType: " + logType + "\n");
sb.append(StringUtils.repeat("=", containerStr.length()) + "\n");
sb.append("FileName:" + fileName + "\n");
sb.append("LogLastModifiedTime:" + lastModifiedTime + "\n");
sb.append("LogLength:" + Long.toString(fileLength) + "\n");
sb.append("LogContents:\n");
byte[] b = sb.toString().getBytes(
Charset.forName("UTF-8"));
os.write(b, 0, b.length);
}
// Copy up to totalBytesToRead bytes, at most buf.length at a time;
// toRead is always clamped so we never read past the selection.
while (keepGoing) {
os.write(buf, 0, len);
curRead += len;
pendingRead = totalBytesToRead - curRead;
toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead;
len = fis.read(buf, 0, toRead);
keepGoing = (len != -1 && curRead < totalBytesToRead);
}
// Consume the unread tail so that, when fis carries several files back
// to back (as the aggregated-log value stream does), the next read
// starts at the following file's boundary.
org.apache.hadoop.io.IOUtils.skipFully(fis, skipAfterRead);
os.flush();
}
/**
 * Locate a container's aggregated log file on the remote FileSystem and
 * stream every matching log file (by name, case-insensitive) to
 * {@code os} via {@link #outputContainerLog}, followed by an
 * "End of LogFile" separator.
 *
 * @param conf the configuration used to locate the remote log directory
 * @param appId the application id owning the container
 * @param appOwner the application owner (user) used to resolve the
 *     remote node file dir
 * @param containerId the container id whose logs are requested
 * @param nodeId the node id to match, or null to search all node files
 * @param logFileName the log file name to output
 * @param outputSize the output size passed through to
 *     {@link #outputContainerLog} (negative = tail, non-negative = head)
 * @param os the output stream
 * @param buf the copy buffer
 * @return true if at least one matching log file was output
 * @throws IOException if the remote log directory can not be accessed
 */
public static boolean outputAggregatedContainerLog(Configuration conf,
ApplicationId appId, String appOwner,
String containerId, String nodeId,
String logFileName, long outputSize, OutputStream os,
byte[] buf) throws IOException {
boolean findLogs = false;
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
.getRemoteNodeFileDir(conf, appId, appOwner);
while (nodeFiles != null && nodeFiles.hasNext()) {
final FileStatus thisNodeFile = nodeFiles.next();
String nodeName = thisNodeFile.getPath().getName();
if (nodeName.equals(appId + ".har")) {
// Logs were archived into a Hadoop archive: restart iteration
// inside the .har file instead of the plain directory.
Path p = new Path("har:///"
+ thisNodeFile.getPath().toUri().getRawPath());
nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
continue;
}
// Match the requested node (if any) and skip in-progress
// aggregation files (TMP_FILE_SUFFIX).
if ((nodeId == null || nodeName.contains(LogAggregationUtils
.getNodeString(nodeId))) && !nodeName.endsWith(
LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader = null;
try {
reader = new AggregatedLogFormat.LogReader(conf,
thisNodeFile.getPath());
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
// Scan keys until we reach the requested container's entry.
while (valueStream != null && !key.toString()
.equals(containerId)) {
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
if (valueStream == null) {
// Container not present in this node file; try the next one.
continue;
}
// The value stream holds the container's files back to back:
// each is a (fileType, fileLength, bytes) record; EOF ends it.
while (true) {
try {
String fileType = valueStream.readUTF();
String fileLengthStr = valueStream.readUTF();
long fileLength = Long.parseLong(fileLengthStr);
if (fileType.equalsIgnoreCase(logFileName)) {
LogToolUtils.outputContainerLog(containerId,
nodeId, fileType, fileLength, outputSize,
Times.format(thisNodeFile.getModificationTime()),
valueStream, os, buf, ContainerLogType.AGGREGATED);
StringBuilder sb = new StringBuilder();
String endOfFile = "End of LogFile:" + fileType;
sb.append("\n" + endOfFile + "\n");
sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+ "\n\n");
byte[] b = sb.toString().getBytes(Charset.forName("UTF-8"));
os.write(b, 0, b.length);
findLogs = true;
} else {
// Not the requested file: skip its bytes so the stream is
// positioned at the next record.
// NOTE(review): InputStream.skip() returns 0 (not -1) at
// EOF, so the != -1 guard looks ineffective — confirm the
// EOFException path is what actually terminates here.
long totalSkipped = 0;
long currSkipped = 0;
while (currSkipped != -1 && totalSkipped < fileLength) {
currSkipped = valueStream.skip(
fileLength - totalSkipped);
totalSkipped += currSkipped;
}
}
} catch (EOFException eof) {
// End of this container's records.
break;
}
}
} finally {
if (reader != null) {
reader.close();
}
}
}
}
os.flush();
return findLogs;
}
} }

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp; package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.charset.Charset; import java.nio.charset.Charset;
@ -43,12 +41,10 @@
import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.Response.Status;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.http.JettyUtils; import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -56,13 +52,9 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol; import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout; import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType; import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils; import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.server.webapp.WebServices; import org.apache.hadoop.yarn.server.webapp.WebServices;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo;
@ -71,11 +63,11 @@
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException; import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
@ -360,10 +352,16 @@ public Response getLogs(@Context HttpServletRequest req,
} catch (Exception ex) { } catch (Exception ex) {
// directly find logs from HDFS. // directly find logs from HDFS.
return sendStreamOutputResponse(appId, null, null, containerIdStr, return sendStreamOutputResponse(appId, null, null, containerIdStr,
filename, format, length); filename, format, length, false);
} }
String appOwner = appInfo.getUser(); String appOwner = appInfo.getUser();
if (isFinishedState(appInfo.getAppState())) {
// directly find logs from HDFS.
return sendStreamOutputResponse(appId, appOwner, null, containerIdStr,
filename, format, length, false);
}
if (isRunningState(appInfo.getAppState())) {
ContainerInfo containerInfo; ContainerInfo containerInfo;
try { try {
containerInfo = super.getContainer( containerInfo = super.getContainer(
@ -371,16 +369,10 @@ public Response getLogs(@Context HttpServletRequest req,
containerId.getApplicationAttemptId().toString(), containerId.getApplicationAttemptId().toString(),
containerId.toString()); containerId.toString());
} catch (Exception ex) { } catch (Exception ex) {
if (isFinishedState(appInfo.getAppState())) { // output the aggregated logs
// directly find logs from HDFS.
return sendStreamOutputResponse(appId, appOwner, null, containerIdStr, return sendStreamOutputResponse(appId, appOwner, null, containerIdStr,
filename, format, length); filename, format, length, true);
} }
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
"Can not get ContainerInfo for the container: " + containerId);
}
String nodeId = containerInfo.getNodeId();
if (isRunningState(appInfo.getAppState())) {
String nodeHttpAddress = containerInfo.getNodeHttpAddress(); String nodeHttpAddress = containerInfo.getNodeHttpAddress();
String uri = "/" + containerId.toString() + "/logs/" + filename; String uri = "/" + containerId.toString() + "/logs/" + filename;
String resURI = JOINER.join(nodeHttpAddress, NM_DOWNLOAD_URI_STR, uri); String resURI = JOINER.join(nodeHttpAddress, NM_DOWNLOAD_URI_STR, uri);
@ -392,9 +384,6 @@ public Response getLogs(@Context HttpServletRequest req,
HttpServletResponse.SC_TEMPORARY_REDIRECT); HttpServletResponse.SC_TEMPORARY_REDIRECT);
response.header("Location", resURI); response.header("Location", resURI);
return response.build(); return response.build();
} else if (isFinishedState(appInfo.getAppState())) {
return sendStreamOutputResponse(appId, appOwner, nodeId,
containerIdStr, filename, format, length);
} else { } else {
return createBadResponse(Status.NOT_FOUND, return createBadResponse(Status.NOT_FOUND,
"The application is not at Running or Finished State."); "The application is not at Running or Finished State.");
@ -419,7 +408,8 @@ private Response createBadResponse(Status status, String errMessage) {
private Response sendStreamOutputResponse(ApplicationId appId, private Response sendStreamOutputResponse(ApplicationId appId,
String appOwner, String nodeId, String containerIdStr, String appOwner, String nodeId, String containerIdStr,
String fileName, String format, long bytes) { String fileName, String format, long bytes,
boolean printEmptyLocalContainerLog) {
String contentType = WebAppUtils.getDefaultLogContentType(); String contentType = WebAppUtils.getDefaultLogContentType();
if (format != null && !format.isEmpty()) { if (format != null && !format.isEmpty()) {
contentType = WebAppUtils.getSupportedLogContentType(format); contentType = WebAppUtils.getSupportedLogContentType(format);
@ -433,15 +423,11 @@ private Response sendStreamOutputResponse(ApplicationId appId,
StreamingOutput stream = null; StreamingOutput stream = null;
try { try {
stream = getStreamingOutput(appId, appOwner, nodeId, stream = getStreamingOutput(appId, appOwner, nodeId,
containerIdStr, fileName, bytes); containerIdStr, fileName, bytes, printEmptyLocalContainerLog);
} catch (Exception ex) { } catch (Exception ex) {
return createBadResponse(Status.INTERNAL_SERVER_ERROR, return createBadResponse(Status.INTERNAL_SERVER_ERROR,
ex.getMessage()); ex.getMessage());
} }
if (stream == null) {
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
"Can not get log for container: " + containerIdStr);
}
ResponseBuilder response = Response.ok(stream); ResponseBuilder response = Response.ok(stream);
response.header("Content-Type", contentType); response.header("Content-Type", contentType);
// Sending the X-Content-Type-Options response header with the value // Sending the X-Content-Type-Options response header with the value
@ -451,146 +437,30 @@ private Response sendStreamOutputResponse(ApplicationId appId,
return response.build(); return response.build();
} }
private StreamingOutput getStreamingOutput(ApplicationId appId, private StreamingOutput getStreamingOutput(final ApplicationId appId,
String appOwner, final String nodeId, final String containerIdStr, final String appOwner, final String nodeId, final String containerIdStr,
final String logFile, final long bytes) throws IOException{ final String logFile, final long bytes,
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); final boolean printEmptyLocalContainerLog) throws IOException{
org.apache.hadoop.fs.Path remoteRootLogDir = new org.apache.hadoop.fs.Path(
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir =
FileContext.getFileContext(conf).makeQualified(remoteRootLogDir);
FileContext fc = FileContext.getFileContext(
qualifiedRemoteRootLogDir.toUri(), conf);
org.apache.hadoop.fs.Path remoteAppDir = null;
if (appOwner == null) {
org.apache.hadoop.fs.Path toMatch = LogAggregationUtils
.getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix);
FileStatus[] matching = fc.util().globStatus(toMatch);
if (matching == null || matching.length != 1) {
return null;
}
remoteAppDir = matching[0].getPath();
} else {
remoteAppDir = LogAggregationUtils
.getRemoteAppLogDir(remoteRootLogDir, appId, appOwner, suffix);
}
final RemoteIterator<FileStatus> nodeFiles;
nodeFiles = fc.listStatus(remoteAppDir);
if (!nodeFiles.hasNext()) {
return null;
}
StreamingOutput stream = new StreamingOutput() { StreamingOutput stream = new StreamingOutput() {
@Override @Override
public void write(OutputStream os) throws IOException, public void write(OutputStream os) throws IOException,
WebApplicationException { WebApplicationException {
byte[] buf = new byte[65535]; byte[] buf = new byte[65535];
boolean findLogs = false; boolean findLogs = LogToolUtils.outputAggregatedContainerLog(conf,
while (nodeFiles.hasNext()) { appId, appOwner, containerIdStr, nodeId, logFile, bytes, os, buf);
final FileStatus thisNodeFile = nodeFiles.next();
String nodeName = thisNodeFile.getPath().getName();
if ((nodeId == null || nodeName.contains(LogAggregationUtils
.getNodeString(nodeId))) && !nodeName.endsWith(
LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader = null;
try {
reader = new AggregatedLogFormat.LogReader(conf,
thisNodeFile.getPath());
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
while (valueStream != null && !key.toString()
.equals(containerIdStr)) {
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
if (valueStream == null) {
continue;
}
while (true) {
try {
String fileType = valueStream.readUTF();
String fileLengthStr = valueStream.readUTF();
long fileLength = Long.parseLong(fileLengthStr);
if (fileType.equalsIgnoreCase(logFile)) {
StringBuilder sb = new StringBuilder();
sb.append("LogType:");
sb.append(fileType + "\n");
sb.append("Log Upload Time:");
sb.append(Times.format(System.currentTimeMillis()) + "\n");
sb.append("LogLength:");
sb.append(fileLengthStr + "\n");
sb.append("Log Contents:\n");
byte[] b = sb.toString().getBytes(
Charset.forName("UTF-8"));
os.write(b, 0, b.length);
long toSkip = 0;
long totalBytesToRead = fileLength;
long skipAfterRead = 0;
if (bytes < 0) {
long absBytes = Math.abs(bytes);
if (absBytes < fileLength) {
toSkip = fileLength - absBytes;
totalBytesToRead = absBytes;
}
org.apache.hadoop.io.IOUtils.skipFully(
valueStream, toSkip);
} else {
if (bytes < fileLength) {
totalBytesToRead = bytes;
skipAfterRead = fileLength - bytes;
}
}
long curRead = 0;
long pendingRead = totalBytesToRead - curRead;
int toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead;
int len = valueStream.read(buf, 0, toRead);
while (len != -1 && curRead < totalBytesToRead) {
os.write(buf, 0, len);
curRead += len;
pendingRead = totalBytesToRead - curRead;
toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead;
len = valueStream.read(buf, 0, toRead);
}
org.apache.hadoop.io.IOUtils.skipFully(
valueStream, skipAfterRead);
sb = new StringBuilder();
sb.append("\nEnd of LogType:" + fileType + "\n");
b = sb.toString().getBytes(Charset.forName("UTF-8"));
os.write(b, 0, b.length);
findLogs = true;
} else {
long totalSkipped = 0;
long currSkipped = 0;
while (currSkipped != -1 && totalSkipped < fileLength) {
currSkipped = valueStream.skip(
fileLength - totalSkipped);
totalSkipped += currSkipped;
}
}
} catch (EOFException eof) {
break;
}
}
} finally {
if (reader != null) {
reader.close();
}
}
}
}
os.flush();
if (!findLogs) { if (!findLogs) {
throw new IOException("Can not find logs for container:" throw new IOException("Can not find logs for container:"
+ containerIdStr); + containerIdStr);
} else {
if (printEmptyLocalContainerLog) {
StringBuilder sb = new StringBuilder();
sb.append(containerIdStr + "\n");
sb.append("LogType: " + ContainerLogType.LOCAL + "\n");
sb.append("LogContents:\n");
sb.append(getNoRedirectWarning() + "\n");
os.write(sb.toString().getBytes(Charset.forName("UTF-8")));
}
} }
} }
}; };
@ -640,4 +510,12 @@ private Response getContainerLogMeta(ApplicationId appId, String appOwner,
throw new WebApplicationException(ex); throw new WebApplicationException(ex);
} }
} }
/**
 * Warning message written in place of local container logs when the
 * NodeManager web address is unknown and the request can not be
 * redirected to the NodeManager.
 *
 * @return the fixed warning text.
 */
@Private
@VisibleForTesting
public static String getNoRedirectWarning() {
return "We do not have NodeManager web address, so we can not "
+ "re-direct the request to related NodeManager for local container "
+ "logs.";
}
} }

View File

@ -35,7 +35,7 @@
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -584,7 +584,10 @@ public void testContainerLogsForFinishedApps() throws Exception {
responseText = response.getEntity(String.class); responseText = response.getEntity(String.class);
assertTrue(responseText.contains("Hello." + containerId1ForApp100)); assertTrue(responseText.contains("Hello." + containerId1ForApp100));
int fullTextSize = responseText.getBytes().length; int fullTextSize = responseText.getBytes().length;
int tailTextSize = "\nEnd of LogType:syslog\n".getBytes().length; String tailEndSeparator = StringUtils.repeat("*",
"End of LogFile:syslog".length() + 50) + "\n\n";
int tailTextSize = "\nEnd of LogFile:syslog\n".getBytes().length
+ tailEndSeparator.getBytes().length;
String logMessage = "Hello." + containerId1ForApp100; String logMessage = "Hello." + containerId1ForApp100;
int fileContentSize = logMessage.getBytes().length; int fileContentSize = logMessage.getBytes().length;
@ -685,6 +688,28 @@ public void testContainerLogsForRunningApps() throws Exception {
assertTrue(redirectURL.contains(containerId1.toString())); assertTrue(redirectURL.contains(containerId1.toString()));
assertTrue(redirectURL.contains("/logs/" + fileName)); assertTrue(redirectURL.contains("/logs/" + fileName));
assertTrue(redirectURL.contains("user.name=" + user)); assertTrue(redirectURL.contains("user.name=" + user));
// If we can not get container information from ATS, we would try to
// get aggregated log from remote FileSystem.
ContainerId containerId1000 = ContainerId.newContainerId(
appAttemptId, 1000);
String content = "Hello." + containerId1000;
NodeId nodeId = NodeId.newInstance("test host", 100);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, containerId1000, nodeId, fileName, user, content, true);
r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1000.toString()).path(fileName)
.queryParam("user.name", user)
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
String responseText = response.getEntity(String.class);
assertTrue(responseText.contains(content));
// Also test whether we output the empty local container log, and give
// the warning message.
assertTrue(responseText.contains("LogType: " + ContainerLogType.LOCAL));
assertTrue(responseText.contains(AHSWebServices.getNoRedirectWarning()));
} }
@Test(timeout = 10000) @Test(timeout = 10000)

View File

@ -21,6 +21,7 @@
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -41,6 +42,9 @@
import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo; import javax.ws.rs.core.UriInfo;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.http.JettyUtils; import org.apache.hadoop.http.JettyUtils;
@ -57,6 +61,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppsInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainerInfo;
@ -64,6 +69,7 @@
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainersInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NodeInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException; import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApp;
@ -74,6 +80,7 @@
@Singleton @Singleton
@Path("/ws/v1/node") @Path("/ws/v1/node")
public class NMWebServices { public class NMWebServices {
private static final Log LOG = LogFactory.getLog(NMWebServices.class);
private Context nmContext; private Context nmContext;
private ResourceView rview; private ResourceView rview;
private WebApp webapp; private WebApp webapp;
@ -330,17 +337,32 @@ public Response getContainerLogFile(
@Produces({ MediaType.TEXT_PLAIN + "; " + JettyUtils.UTF_8 }) @Produces({ MediaType.TEXT_PLAIN + "; " + JettyUtils.UTF_8 })
@Public @Public
@Unstable @Unstable
public Response getLogs(@PathParam("containerid") String containerIdStr, public Response getLogs(
@PathParam("containerid") final String containerIdStr,
@PathParam("filename") String filename, @PathParam("filename") String filename,
@QueryParam("format") String format, @QueryParam("format") String format,
@QueryParam("size") String size) { @QueryParam("size") String size) {
ContainerId containerId; ContainerId tempContainerId;
try { try {
containerId = ContainerId.fromString(containerIdStr); tempContainerId = ContainerId.fromString(containerIdStr);
} catch (IllegalArgumentException ex) { } catch (IllegalArgumentException ex) {
return Response.status(Status.BAD_REQUEST).build(); return Response.status(Status.BAD_REQUEST).build();
} }
final ContainerId containerId = tempContainerId;
boolean tempIsRunning = false;
// Check the current status of the container.
try {
Container container = nmContext.getContainers().get(containerId);
tempIsRunning = (container.getContainerState() == ContainerState.RUNNING);
} catch (Exception ex) {
// This NM does not have this container any more. We
// assume the container has already finished.
if (LOG.isDebugEnabled()) {
LOG.debug("Can not find the container:" + containerId
+ " in this node.");
}
}
final boolean isRunning = tempIsRunning;
File logFile = null; File logFile = null;
try { try {
logFile = ContainerLogsUtils.getContainerLogFile( logFile = ContainerLogsUtils.getContainerLogFile(
@ -351,6 +373,8 @@ public Response getLogs(@PathParam("containerid") String containerIdStr,
return Response.serverError().entity(ex.getMessage()).build(); return Response.serverError().entity(ex.getMessage()).build();
} }
final long bytes = parseLongParam(size); final long bytes = parseLongParam(size);
final String lastModifiedTime = Times.format(logFile.lastModified());
final String outputFileName = filename;
String contentType = WebAppUtils.getDefaultLogContentType(); String contentType = WebAppUtils.getDefaultLogContentType();
if (format != null && !format.isEmpty()) { if (format != null && !format.isEmpty()) {
contentType = WebAppUtils.getSupportedLogContentType(format); contentType = WebAppUtils.getSupportedLogContentType(format);
@ -374,39 +398,40 @@ public void write(OutputStream os) throws IOException,
try { try {
int bufferSize = 65536; int bufferSize = 65536;
byte[] buf = new byte[bufferSize]; byte[] buf = new byte[bufferSize];
long toSkip = 0; LogToolUtils.outputContainerLog(containerId.toString(),
long totalBytesToRead = fileLength; nmContext.getNodeId().toString(), outputFileName, fileLength,
long skipAfterRead = 0; bytes, lastModifiedTime, fis, os, buf, ContainerLogType.LOCAL);
if (bytes < 0) { StringBuilder sb = new StringBuilder();
long absBytes = Math.abs(bytes); String endOfFile = "End of LogFile:" + outputFileName;
if (absBytes < fileLength) { sb.append(endOfFile + ".");
toSkip = fileLength - absBytes; if (isRunning) {
totalBytesToRead = absBytes; sb.append("This log file belongs to a running container ("
} + containerIdStr + ") and so may not be complete." + "\n");
org.apache.hadoop.io.IOUtils.skipFully(fis, toSkip);
} else { } else {
if (bytes < fileLength) { sb.append("\n");
totalBytesToRead = bytes; }
skipAfterRead = fileLength - bytes; sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+ "\n\n");
os.write(sb.toString().getBytes(Charset.forName("UTF-8")));
// If we have aggregated logs for this container,
// output the aggregation logs as well.
ApplicationId appId = containerId.getApplicationAttemptId()
.getApplicationId();
Application app = nmContext.getApplications().get(appId);
String appOwner = app == null ? null : app.getUser();
try {
LogToolUtils.outputAggregatedContainerLog(nmContext.getConf(),
appId, appOwner, containerId.toString(),
nmContext.getNodeId().toString(), outputFileName, bytes,
os, buf);
} catch (Exception ex) {
// Something wrong when we try to access the aggregated log.
if (LOG.isDebugEnabled()) {
LOG.debug("Can not access the aggregated log for "
+ "the container:" + containerId);
LOG.debug(ex.getMessage());
} }
} }
long curRead = 0;
long pendingRead = totalBytesToRead - curRead;
int toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead;
int len = fis.read(buf, 0, toRead);
while (len != -1 && curRead < totalBytesToRead) {
os.write(buf, 0, len);
curRead += len;
pendingRead = totalBytesToRead - curRead;
toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead;
len = fis.read(buf, 0, toRead);
}
org.apache.hadoop.io.IOUtils.skipFully(fis, skipAfterRead);
os.flush();
} finally { } finally {
IOUtils.closeQuietly(fis); IOUtils.closeQuietly(fis);
} }

View File

@ -384,8 +384,9 @@ private void testContainerLogs(WebResource r, ContainerId containerId)
ClientResponse response = r.path(filename) ClientResponse response = r.path(filename)
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
String responseText = response.getEntity(String.class); String responseText = response.getEntity(String.class);
assertEquals(logMessage, responseText); String responseLogMessage = getLogContext(responseText);
int fullTextSize = responseText.getBytes().length; assertEquals(logMessage, responseLogMessage);
int fullTextSize = responseLogMessage.getBytes().length;
// specify how many bytes we should get from logs // specify how many bytes we should get from logs
// specify a position number, it would get the first n bytes from // specify a position number, it would get the first n bytes from
@ -394,9 +395,10 @@ private void testContainerLogs(WebResource r, ContainerId containerId)
.queryParam("size", "5") .queryParam("size", "5")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class); responseText = response.getEntity(String.class);
assertEquals(5, responseText.getBytes().length); responseLogMessage = getLogContext(responseText);
assertEquals(new String(logMessage.getBytes(), 0, 5), responseText); assertEquals(5, responseLogMessage.getBytes().length);
assertTrue(fullTextSize >= responseText.getBytes().length); assertEquals(new String(logMessage.getBytes(), 0, 5), responseLogMessage);
assertTrue(fullTextSize >= responseLogMessage.getBytes().length);
// specify the bytes which is larger than the actual file size, // specify the bytes which is larger than the actual file size,
// we would get the full logs // we would get the full logs
@ -404,8 +406,9 @@ private void testContainerLogs(WebResource r, ContainerId containerId)
.queryParam("size", "10000") .queryParam("size", "10000")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class); responseText = response.getEntity(String.class);
assertEquals(fullTextSize, responseText.getBytes().length); responseLogMessage = getLogContext(responseText);
assertEquals(logMessage, responseText); assertEquals(fullTextSize, responseLogMessage.getBytes().length);
assertEquals(logMessage, responseLogMessage);
// specify a negative number, it would get the last n bytes from // specify a negative number, it would get the last n bytes from
// container log // container log
@ -413,25 +416,28 @@ private void testContainerLogs(WebResource r, ContainerId containerId)
.queryParam("size", "-5") .queryParam("size", "-5")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class); responseText = response.getEntity(String.class);
assertEquals(5, responseText.getBytes().length); responseLogMessage = getLogContext(responseText);
assertEquals(5, responseLogMessage.getBytes().length);
assertEquals(new String(logMessage.getBytes(), assertEquals(new String(logMessage.getBytes(),
logMessage.getBytes().length - 5, 5), responseText); logMessage.getBytes().length - 5, 5), responseLogMessage);
assertTrue(fullTextSize >= responseText.getBytes().length); assertTrue(fullTextSize >= responseLogMessage.getBytes().length);
response = r.path(filename) response = r.path(filename)
.queryParam("size", "-10000") .queryParam("size", "-10000")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class); responseText = response.getEntity(String.class);
responseLogMessage = getLogContext(responseText);
assertEquals("text/plain; charset=utf-8", response.getType().toString()); assertEquals("text/plain; charset=utf-8", response.getType().toString());
assertEquals(fullTextSize, responseText.getBytes().length); assertEquals(fullTextSize, responseLogMessage.getBytes().length);
assertEquals(logMessage, responseText); assertEquals(logMessage, responseLogMessage);
// ask and download it // ask and download it
response = r.path(filename) response = r.path(filename)
.queryParam("format", "octet-stream") .queryParam("format", "octet-stream")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class); responseText = response.getEntity(String.class);
assertEquals(logMessage, responseText); responseLogMessage = getLogContext(responseText);
assertEquals(logMessage, responseLogMessage);
assertEquals(200, response.getStatus()); assertEquals(200, response.getStatus());
assertEquals("application/octet-stream; charset=utf-8", assertEquals("application/octet-stream; charset=utf-8",
response.getType().toString()); response.getType().toString());
@ -475,10 +481,11 @@ private void testContainerLogs(WebResource r, ContainerId containerId)
TestNMWebServices.class.getSimpleName() + "temp-log-dir"); TestNMWebServices.class.getSimpleName() + "temp-log-dir");
try { try {
String aggregatedLogFile = filename + "-aggregated"; String aggregatedLogFile = filename + "-aggregated";
String aggregatedLogMessage = "This is aggregated ;og.";
TestContainerLogsUtils.createContainerLogFileInRemoteFS( TestContainerLogsUtils.createContainerLogFileInRemoteFS(
nmContext.getConf(), FileSystem.get(nmContext.getConf()), nmContext.getConf(), FileSystem.get(nmContext.getConf()),
tempLogDir.getAbsolutePath(), containerId, nmContext.getNodeId(), tempLogDir.getAbsolutePath(), containerId, nmContext.getNodeId(),
aggregatedLogFile, "user", logMessage, true); aggregatedLogFile, "user", aggregatedLogMessage, true);
r1 = resource(); r1 = resource();
response = r1.path("ws").path("v1").path("node") response = r1.path("ws").path("v1").path("node")
.path("containers").path(containerIdStr) .path("containers").path(containerIdStr)
@ -501,6 +508,21 @@ private void testContainerLogs(WebResource r, ContainerId containerId)
assertEquals(meta.get(0).getFileName(), filename); assertEquals(meta.get(0).getFileName(), filename);
} }
} }
// Test whether we could get aggregated log as well
TestContainerLogsUtils.createContainerLogFileInRemoteFS(
nmContext.getConf(), FileSystem.get(nmContext.getConf()),
tempLogDir.getAbsolutePath(), containerId, nmContext.getNodeId(),
filename, "user", aggregatedLogMessage, true);
response = r.path(filename)
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertTrue(responseText.contains("LogType: "
+ ContainerLogType.AGGREGATED));
assertTrue(responseText.contains(aggregatedLogMessage));
assertTrue(responseText.contains("LogType: "
+ ContainerLogType.LOCAL));
assertTrue(responseText.contains(logMessage));
} finally { } finally {
FileUtil.fullyDelete(tempLogDir); FileUtil.fullyDelete(tempLogDir);
} }
@ -511,7 +533,7 @@ private void testContainerLogs(WebResource r, ContainerId containerId)
r.path(filename).accept(MediaType.TEXT_PLAIN) r.path(filename).accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class); .get(ClientResponse.class);
responseText = response.getEntity(String.class); responseText = response.getEntity(String.class);
assertEquals(logMessage, responseText); assertTrue(responseText.contains(logMessage));
} }
public void verifyNodesXML(NodeList nodes) throws JSONException, Exception { public void verifyNodesXML(NodeList nodes) throws JSONException, Exception {
@ -601,4 +623,11 @@ public void verifyNodeInfoGeneric(String id, String healthReport,
YarnVersionInfo.getVersion(), resourceManagerVersion); YarnVersionInfo.getVersion(), resourceManagerVersion);
} }
private String getLogContext(String fullMessage) {
String prefix = "LogContents:\n";
String postfix = "End of LogFile:";
int prefixIndex = fullMessage.indexOf(prefix) + prefix.length();
int postfixIndex = fullMessage.indexOf(postfix);
return fullMessage.substring(prefixIndex, postfixIndex);
}
} }