YARN-6100. Improve YARN webservice to output aggregated container logs. Contributed by Xuan Gong.

Committed by Junping Du on 2017-02-02 00:45:27 -08:00
parent b64d9093e1
commit c699ce70ea
6 changed files with 334 additions and 235 deletions
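
For orientation: the patch moves the container-log streaming logic into LogToolUtils and teaches both the AHS and NodeManager web services to serve logs that were aggregated to the remote FileSystem. A minimal sketch of exercising the AHS endpoint with the Jersey 1.x client the patch itself uses; the host, port, and IDs are hypothetical, and the path mirrors TestAHSWebServices below:

import javax.ws.rs.core.MediaType;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;

Client client = Client.create();
ClientResponse response = client
    .resource("http://ahs-host:8188/ws/v1/applicationhistory/containerlogs/"
        + "container_1485938400000_0001_01_000002/syslog")
    .queryParam("user.name", "hadoop")
    .accept(MediaType.TEXT_PLAIN)
    .get(ClientResponse.class);
// A finished application streams its aggregated logs back directly; a
// running container is normally answered with a 307 redirect to its
// NodeManager.
System.out.println(response.getEntity(String.class));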

File: LogsCLI.java

@@ -64,7 +64,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
@@ -510,17 +509,9 @@ public class LogsCLI extends Configured implements Tool {
newOptions.setLogTypes(matchedFiles);
Client webServiceClient = Client.create();
String containerString = String.format(
LogCLIHelpers.CONTAINER_ON_NODE_PATTERN, containerIdStr, nodeId);
out.println(containerString);
out.println(StringUtils.repeat("=", containerString.length()));
boolean foundAnyLogs = false;
byte[] buffer = new byte[65536];
for (String logFile : newOptions.getLogTypes()) {
out.println("LogType:" + logFile);
out.println("Log Upload Time:"
+ Times.format(System.currentTimeMillis()));
out.println("Log Contents:");
InputStream is = null;
try {
ClientResponse response = getResponeFromNMWebService(conf,
@@ -542,14 +533,6 @@ public class LogsCLI extends Configured implements Tool {
response.getEntity(String.class));
out.println(msg);
}
StringBuilder sb = new StringBuilder();
sb.append("End of LogType:" + logFile + ".");
if (request.getContainerState() == ContainerState.RUNNING) {
sb.append(" This log file belongs"
+ " to a running container (" + containerIdStr + ") and so may"
+ " not be complete.");
}
out.println(sb.toString());
out.flush();
foundAnyLogs = true;
} catch (ClientHandlerException | UniformInterfaceException ex) {

File: LogToolUtils.java

@@ -20,11 +20,17 @@ package org.apache.hadoop.yarn.logaggregation;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.HarFs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
@@ -38,6 +44,9 @@ import org.apache.hadoop.yarn.util.Times;
*/
public final class LogToolUtils {
public static final String CONTAINER_ON_NODE_PATTERN =
"Container: %s on %s";
private LogToolUtils() {}
/**
@@ -114,4 +123,153 @@ public final class LogToolUtils {
}
return containersLogMeta;
}
/**
* Output a container log to the given stream.
* @param containerId the containerId
* @param nodeId the nodeId
* @param fileName the log file name
* @param fileLength the log file length
* @param outputSize how many bytes to output: a non-negative value means
* the first outputSize bytes, a negative value means the last
* |outputSize| bytes
* @param lastModifiedTime the log file last modified time
* @param fis the log file input stream
* @param os the output stream
* @param buf the buffer
* @param logType the log type.
* @throws IOException if we cannot access the log file.
*/
public static void outputContainerLog(String containerId, String nodeId,
String fileName, long fileLength, long outputSize,
String lastModifiedTime, InputStream fis, OutputStream os,
byte[] buf, ContainerLogType logType) throws IOException {
long toSkip = 0;
long totalBytesToRead = fileLength;
long skipAfterRead = 0;
if (outputSize < 0) {
long absBytes = Math.abs(outputSize);
if (absBytes < fileLength) {
toSkip = fileLength - absBytes;
totalBytesToRead = absBytes;
}
org.apache.hadoop.io.IOUtils.skipFully(fis, toSkip);
} else {
if (outputSize < fileLength) {
totalBytesToRead = outputSize;
skipAfterRead = fileLength - outputSize;
}
}
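// Worked example (editorial note): for a 100-byte file, outputSize == -5
// gives toSkip = 95 and totalBytesToRead = 5 (the last five bytes), while
// outputSize == 5 gives totalBytesToRead = 5 and skipAfterRead = 95 (the
// first five bytes).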
long curRead = 0;
long pendingRead = totalBytesToRead - curRead;
int toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead;
int len = fis.read(buf, 0, toRead);
boolean keepGoing = (len != -1 && curRead < totalBytesToRead);
if (keepGoing) {
StringBuilder sb = new StringBuilder();
String containerStr = String.format(
LogToolUtils.CONTAINER_ON_NODE_PATTERN,
containerId, nodeId);
sb.append(containerStr + "\n");
sb.append("LogType: " + logType + "\n");
sb.append(StringUtils.repeat("=", containerStr.length()) + "\n");
sb.append("FileName:" + fileName + "\n");
sb.append("LogLastModifiedTime:" + lastModifiedTime + "\n");
sb.append("LogLength:" + Long.toString(fileLength) + "\n");
sb.append("LogContents:\n");
byte[] b = sb.toString().getBytes(
Charset.forName("UTF-8"));
os.write(b, 0, b.length);
}
while (keepGoing) {
os.write(buf, 0, len);
curRead += len;
pendingRead = totalBytesToRead - curRead;
toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead;
len = fis.read(buf, 0, toRead);
keepGoing = (len != -1 && curRead < totalBytesToRead);
}
org.apache.hadoop.io.IOUtils.skipFully(fis, skipAfterRead);
os.flush();
}
public static boolean outputAggregatedContainerLog(Configuration conf,
ApplicationId appId, String appOwner,
String containerId, String nodeId,
String logFileName, long outputSize, OutputStream os,
byte[] buf) throws IOException {
boolean findLogs = false;
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
.getRemoteNodeFileDir(conf, appId, appOwner);
while (nodeFiles != null && nodeFiles.hasNext()) {
final FileStatus thisNodeFile = nodeFiles.next();
String nodeName = thisNodeFile.getPath().getName();
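// The app's logs may have been archived into a Hadoop Archive; if so,
// re-point the iterator at the .har's contents and keep scanning.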
if (nodeName.equals(appId + ".har")) {
Path p = new Path("har:///"
+ thisNodeFile.getPath().toUri().getRawPath());
nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
continue;
}
if ((nodeId == null || nodeName.contains(LogAggregationUtils
.getNodeString(nodeId))) && !nodeName.endsWith(
LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader = null;
try {
reader = new AggregatedLogFormat.LogReader(conf,
thisNodeFile.getPath());
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
while (valueStream != null && !key.toString()
.equals(containerId)) {
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
if (valueStream == null) {
continue;
}
while (true) {
try {
String fileType = valueStream.readUTF();
String fileLengthStr = valueStream.readUTF();
long fileLength = Long.parseLong(fileLengthStr);
if (fileType.equalsIgnoreCase(logFileName)) {
LogToolUtils.outputContainerLog(containerId,
nodeId, fileType, fileLength, outputSize,
Times.format(thisNodeFile.getModificationTime()),
valueStream, os, buf, ContainerLogType.AGGREGATED);
StringBuilder sb = new StringBuilder();
String endOfFile = "End of LogFile:" + fileType;
sb.append("\n" + endOfFile + "\n");
sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+ "\n\n");
byte[] b = sb.toString().getBytes(Charset.forName("UTF-8"));
os.write(b, 0, b.length);
findLogs = true;
} else {
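// Not the file we were asked for: skip its payload so the stream is
// positioned at the next (fileType, fileLength) entry.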
long totalSkipped = 0;
long currSkipped = 0;
while (currSkipped != -1 && totalSkipped < fileLength) {
currSkipped = valueStream.skip(
fileLength - totalSkipped);
totalSkipped += currSkipped;
}
}
} catch (EOFException eof) {
break;
}
}
} finally {
if (reader != null) {
reader.close();
}
}
}
}
os.flush();
return findLogs;
}
}
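
A hedged usage sketch of the new helper (the application and container IDs are hypothetical, and it assumes log aggregation has already published files under the remote app-log directory):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;

public class DumpAggregatedLog {
  public static void main(String[] args) throws IOException {
    Configuration conf = new YarnConfiguration();
    ApplicationId appId = ApplicationId.newInstance(1485938400000L, 1);
    byte[] buf = new byte[65536];
    // nodeId == null matches any node's aggregated file; a negative
    // outputSize asks for the last |outputSize| bytes of the log.
    boolean found = LogToolUtils.outputAggregatedContainerLog(conf, appId,
        "hadoop", "container_1485938400000_0001_01_000002", null,
        "syslog", -4096, System.out, buf);
    if (!found) {
      System.err.println("No aggregated syslog found for the container.");
    }
  }
}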

File: AHSWebServices.java

@@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
@@ -43,25 +41,19 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.server.webapp.WebServices;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo;
@@ -70,11 +62,11 @@ import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -351,27 +343,27 @@ public class AHSWebServices extends WebServices {
} catch (Exception ex) {
// directly find logs from HDFS.
return sendStreamOutputResponse(appId, null, null, containerIdStr,
filename, format, length);
filename, format, length, false);
}
String appOwner = appInfo.getUser();
ContainerInfo containerInfo;
try {
containerInfo = super.getContainer(
req, res, appId.toString(),
containerId.getApplicationAttemptId().toString(),
containerId.toString());
} catch (Exception ex) {
if (isFinishedState(appInfo.getAppState())) {
// directly find logs from HDFS.
return sendStreamOutputResponse(appId, appOwner, null, containerIdStr,
filename, format, length);
}
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
"Can not get ContainerInfo for the container: " + containerId);
if (isFinishedState(appInfo.getAppState())) {
// directly find logs from HDFS.
return sendStreamOutputResponse(appId, appOwner, null, containerIdStr,
filename, format, length, false);
}
String nodeId = containerInfo.getNodeId();
if (isRunningState(appInfo.getAppState())) {
ContainerInfo containerInfo;
try {
containerInfo = super.getContainer(
req, res, appId.toString(),
containerId.getApplicationAttemptId().toString(),
containerId.toString());
} catch (Exception ex) {
// output the aggregated logs
return sendStreamOutputResponse(appId, appOwner, null, containerIdStr,
filename, format, length, true);
}
String nodeHttpAddress = containerInfo.getNodeHttpAddress();
String uri = "/" + containerId.toString() + "/logs/" + filename;
String resURI = JOINER.join(nodeHttpAddress, NM_DOWNLOAD_URI_STR, uri);
@@ -383,9 +375,6 @@ public class AHSWebServices extends WebServices {
HttpServletResponse.SC_TEMPORARY_REDIRECT);
response.header("Location", resURI);
return response.build();
} else if (isFinishedState(appInfo.getAppState())) {
return sendStreamOutputResponse(appId, appOwner, nodeId,
containerIdStr, filename, format, length);
} else {
return createBadResponse(Status.NOT_FOUND,
"The application is not at Running or Finished State.");
@@ -410,7 +399,8 @@ public class AHSWebServices extends WebServices {
private Response sendStreamOutputResponse(ApplicationId appId,
String appOwner, String nodeId, String containerIdStr,
String fileName, String format, long bytes) {
String fileName, String format, long bytes,
boolean printEmptyLocalContainerLog) {
String contentType = WebAppUtils.getDefaultLogContentType();
if (format != null && !format.isEmpty()) {
contentType = WebAppUtils.getSupportedLogContentType(format);
@@ -424,15 +414,11 @@ public class AHSWebServices extends WebServices {
StreamingOutput stream = null;
try {
stream = getStreamingOutput(appId, appOwner, nodeId,
containerIdStr, fileName, bytes);
containerIdStr, fileName, bytes, printEmptyLocalContainerLog);
} catch (Exception ex) {
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
ex.getMessage());
}
if (stream == null) {
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
"Can not get log for container: " + containerIdStr);
}
ResponseBuilder response = Response.ok(stream);
response.header("Content-Type", contentType);
// Sending the X-Content-Type-Options response header with the value
@@ -442,146 +428,30 @@ public class AHSWebServices extends WebServices {
return response.build();
}
private StreamingOutput getStreamingOutput(ApplicationId appId,
String appOwner, final String nodeId, final String containerIdStr,
final String logFile, final long bytes) throws IOException{
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
org.apache.hadoop.fs.Path remoteRootLogDir = new org.apache.hadoop.fs.Path(
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir =
FileContext.getFileContext(conf).makeQualified(remoteRootLogDir);
FileContext fc = FileContext.getFileContext(
qualifiedRemoteRootLogDir.toUri(), conf);
org.apache.hadoop.fs.Path remoteAppDir = null;
if (appOwner == null) {
org.apache.hadoop.fs.Path toMatch = LogAggregationUtils
.getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix);
FileStatus[] matching = fc.util().globStatus(toMatch);
if (matching == null || matching.length != 1) {
return null;
}
remoteAppDir = matching[0].getPath();
} else {
remoteAppDir = LogAggregationUtils
.getRemoteAppLogDir(remoteRootLogDir, appId, appOwner, suffix);
}
final RemoteIterator<FileStatus> nodeFiles;
nodeFiles = fc.listStatus(remoteAppDir);
if (!nodeFiles.hasNext()) {
return null;
}
private StreamingOutput getStreamingOutput(final ApplicationId appId,
final String appOwner, final String nodeId, final String containerIdStr,
final String logFile, final long bytes,
final boolean printEmptyLocalContainerLog) throws IOException{
StreamingOutput stream = new StreamingOutput() {
@Override
public void write(OutputStream os) throws IOException,
WebApplicationException {
byte[] buf = new byte[65535];
boolean findLogs = false;
while (nodeFiles.hasNext()) {
final FileStatus thisNodeFile = nodeFiles.next();
String nodeName = thisNodeFile.getPath().getName();
if ((nodeId == null || nodeName.contains(LogAggregationUtils
.getNodeString(nodeId))) && !nodeName.endsWith(
LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader = null;
try {
reader = new AggregatedLogFormat.LogReader(conf,
thisNodeFile.getPath());
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
while (valueStream != null && !key.toString()
.equals(containerIdStr)) {
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
if (valueStream == null) {
continue;
}
while (true) {
try {
String fileType = valueStream.readUTF();
String fileLengthStr = valueStream.readUTF();
long fileLength = Long.parseLong(fileLengthStr);
if (fileType.equalsIgnoreCase(logFile)) {
StringBuilder sb = new StringBuilder();
sb.append("LogType:");
sb.append(fileType + "\n");
sb.append("Log Upload Time:");
sb.append(Times.format(System.currentTimeMillis()) + "\n");
sb.append("LogLength:");
sb.append(fileLengthStr + "\n");
sb.append("Log Contents:\n");
byte[] b = sb.toString().getBytes(
Charset.forName("UTF-8"));
os.write(b, 0, b.length);
long toSkip = 0;
long totalBytesToRead = fileLength;
long skipAfterRead = 0;
if (bytes < 0) {
long absBytes = Math.abs(bytes);
if (absBytes < fileLength) {
toSkip = fileLength - absBytes;
totalBytesToRead = absBytes;
}
org.apache.hadoop.io.IOUtils.skipFully(
valueStream, toSkip);
} else {
if (bytes < fileLength) {
totalBytesToRead = bytes;
skipAfterRead = fileLength - bytes;
}
}
long curRead = 0;
long pendingRead = totalBytesToRead - curRead;
int toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead;
int len = valueStream.read(buf, 0, toRead);
while (len != -1 && curRead < totalBytesToRead) {
os.write(buf, 0, len);
curRead += len;
pendingRead = totalBytesToRead - curRead;
toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead;
len = valueStream.read(buf, 0, toRead);
}
org.apache.hadoop.io.IOUtils.skipFully(
valueStream, skipAfterRead);
sb = new StringBuilder();
sb.append("\nEnd of LogType:" + fileType + "\n");
b = sb.toString().getBytes(Charset.forName("UTF-8"));
os.write(b, 0, b.length);
findLogs = true;
} else {
long totalSkipped = 0;
long currSkipped = 0;
while (currSkipped != -1 && totalSkipped < fileLength) {
currSkipped = valueStream.skip(
fileLength - totalSkipped);
totalSkipped += currSkipped;
}
}
} catch (EOFException eof) {
break;
}
}
} finally {
if (reader != null) {
reader.close();
}
}
}
}
os.flush();
boolean findLogs = LogToolUtils.outputAggregatedContainerLog(conf,
appId, appOwner, containerIdStr, nodeId, logFile, bytes, os, buf);
if (!findLogs) {
throw new IOException("Can not find logs for container:"
+ containerIdStr);
} else {
if (printEmptyLocalContainerLog) {
StringBuilder sb = new StringBuilder();
sb.append(containerIdStr + "\n");
sb.append("LogType: " + ContainerLogType.LOCAL + "\n");
sb.append("LogContents:\n");
sb.append(getNoRedirectWarning() + "\n");
os.write(sb.toString().getBytes(Charset.forName("UTF-8")));
}
}
}
};
@@ -631,4 +501,12 @@ public class AHSWebServices extends WebServices {
throw new WebApplicationException(ex);
}
}
@Private
@VisibleForTesting
public static String getNoRedirectWarning() {
return "We do not have NodeManager web address, so we can not "
+ "re-direct the request to related NodeManager "
+ "for local container logs.";
}
}
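
When the AHS cannot resolve the container's NodeManager address (the getContainer lookup fails while the application is still running), it now streams the aggregated logs and appends an empty LOCAL section carrying the warning above, so the tail of a response looks roughly like this (container ID hypothetical):

container_1485938400000_0001_01_000002
LogType: LOCAL
LogContents:
We do not have NodeManager web address, so we can not re-direct the
request to related NodeManager for local container logs.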

File: TestAHSWebServices.java

@@ -34,7 +34,7 @@ import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -580,7 +580,10 @@ public class TestAHSWebServices extends JerseyTestBase {
responseText = response.getEntity(String.class);
assertTrue(responseText.contains("Hello." + containerId1ForApp100));
int fullTextSize = responseText.getBytes().length;
int tailTextSize = "\nEnd of LogType:syslog\n".getBytes().length;
String tailEndSeparator = StringUtils.repeat("*",
"End of LogFile:syslog".length() + 50) + "\n\n";
int tailTextSize = "\nEnd of LogFile:syslog\n".getBytes().length
+ tailEndSeparator.getBytes().length;
String logMessage = "Hello." + containerId1ForApp100;
int fileContentSize = logMessage.getBytes().length;
@@ -681,6 +684,28 @@ public class TestAHSWebServices extends JerseyTestBase {
assertTrue(redirectURL.contains(containerId1.toString()));
assertTrue(redirectURL.contains("/logs/" + fileName));
assertTrue(redirectURL.contains("user.name=" + user));
// If we can not get container information from ATS, we try to
// fetch the aggregated logs from the remote FileSystem.
ContainerId containerId1000 = ContainerId.newContainerId(
appAttemptId, 1000);
String content = "Hello." + containerId1000;
NodeId nodeId = NodeId.newInstance("test host", 100);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, containerId1000, nodeId, fileName, user, content, true);
r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1000.toString()).path(fileName)
.queryParam("user.name", user)
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
String responseText = response.getEntity(String.class);
assertTrue(responseText.contains(content));
// Also verify that we output the empty local container log along with
// the warning message.
assertTrue(responseText.contains("LogType: " + ContainerLogType.LOCAL));
assertTrue(responseText.contains(AHSWebServices.getNoRedirectWarning()));
}
@Test(timeout = 10000)

File: NMWebServices.java

@@ -21,6 +21,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
@@ -41,6 +42,9 @@ import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -56,6 +60,7 @@ import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainerInfo;
@@ -63,6 +68,7 @@ import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMContainerLogsInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.WebApp;
@@ -73,6 +79,7 @@ import com.google.inject.Singleton;
@Singleton
@Path("/ws/v1/node")
public class NMWebServices {
private static final Log LOG = LogFactory.getLog(NMWebServices.class);
private Context nmContext;
private ResourceView rview;
private WebApp webapp;
@@ -321,17 +328,33 @@ public class NMWebServices {
@Produces({ MediaType.TEXT_PLAIN })
@Public
@Unstable
public Response getLogs(@PathParam("containerid") String containerIdStr,
public Response getLogs(
@PathParam("containerid") final String containerIdStr,
@PathParam("filename") String filename,
@QueryParam("format") String format,
@QueryParam("size") String size) {
ContainerId containerId;
ContainerId tempContainerId;
try {
containerId = ContainerId.fromString(containerIdStr);
tempContainerId = ContainerId.fromString(containerIdStr);
} catch (IllegalArgumentException ex) {
return Response.status(Status.BAD_REQUEST).entity(ex.getMessage()).build();
}
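// Keep an effectively-final copy of the id; it is captured by the
// anonymous StreamingOutput below.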
final ContainerId containerId = tempContainerId;
boolean tempIsRunning = false;
// check the current state of the container
try {
Container container = nmContext.getContainers().get(containerId);
tempIsRunning = (container.getContainerState() == ContainerState.RUNNING);
} catch (Exception ex) {
// This NM does not have this container any more. We
// assume the container has already finished.
if (LOG.isDebugEnabled()) {
LOG.debug("Can not find the container:" + containerId
+ " in this node.");
}
}
final boolean isRunning = tempIsRunning;
File logFile = null;
try {
logFile = ContainerLogsUtils.getContainerLogFile(
@@ -342,6 +365,8 @@ public class NMWebServices {
return Response.serverError().entity(ex.getMessage()).build();
}
final long bytes = parseLongParam(size);
final String lastModifiedTime = Times.format(logFile.lastModified());
final String outputFileName = filename;
String contentType = WebAppUtils.getDefaultLogContentType();
if (format != null && !format.isEmpty()) {
contentType = WebAppUtils.getSupportedLogContentType(format);
@@ -365,39 +390,40 @@ public class NMWebServices {
try {
int bufferSize = 65536;
byte[] buf = new byte[bufferSize];
long toSkip = 0;
long totalBytesToRead = fileLength;
long skipAfterRead = 0;
if (bytes < 0) {
long absBytes = Math.abs(bytes);
if (absBytes < fileLength) {
toSkip = fileLength - absBytes;
totalBytesToRead = absBytes;
}
org.apache.hadoop.io.IOUtils.skipFully(fis, toSkip);
LogToolUtils.outputContainerLog(containerId.toString(),
nmContext.getNodeId().toString(), outputFileName, fileLength,
bytes, lastModifiedTime, fis, os, buf, ContainerLogType.LOCAL);
StringBuilder sb = new StringBuilder();
String endOfFile = "End of LogFile:" + outputFileName;
sb.append(endOfFile + ".");
if (isRunning) {
sb.append("This log file belongs to a running container ("
+ containerIdStr + ") and so may not be complete." + "\n");
} else {
if (bytes < fileLength) {
totalBytesToRead = bytes;
skipAfterRead = fileLength - bytes;
sb.append("\n");
}
sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+ "\n\n");
os.write(sb.toString().getBytes(Charset.forName("UTF-8")));
// If we have aggregated logs for this container,
// output the aggregated logs as well.
ApplicationId appId = containerId.getApplicationAttemptId()
.getApplicationId();
Application app = nmContext.getApplications().get(appId);
String appOwner = app == null ? null : app.getUser();
try {
LogToolUtils.outputAggregatedContainerLog(nmContext.getConf(),
appId, appOwner, containerId.toString(),
nmContext.getNodeId().toString(), outputFileName, bytes,
os, buf);
} catch (IOException ex) {
// Something went wrong while accessing the aggregated log.
if (LOG.isDebugEnabled()) {
LOG.debug("Can not access the aggregated log for "
+ "the container:" + containerId);
LOG.debug(ex.getMessage());
}
}
long curRead = 0;
long pendingRead = totalBytesToRead - curRead;
int toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead;
int len = fis.read(buf, 0, toRead);
while (len != -1 && curRead < totalBytesToRead) {
os.write(buf, 0, len);
curRead += len;
pendingRead = totalBytesToRead - curRead;
toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead;
len = fis.read(buf, 0, toRead);
}
org.apache.hadoop.io.IOUtils.skipFully(fis, skipAfterRead);
os.flush();
} finally {
IOUtils.closeQuietly(fis);
}
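
Taken together, a NodeManager log response now concatenates the LOCAL file (with the header written by LogToolUtils.outputContainerLog) and, when present, the AGGREGATED copy of the same file. Roughly, with hypothetical IDs, host, times, and separator widths:

Container: container_1485938400000_0001_01_000002 on host.example.com:45454
LogType: LOCAL
===========================================================================
FileName:syslog
LogLastModifiedTime:Thu Feb 02 08:00:00 +0000 2017
LogLength:6
LogContents:
Hello.
End of LogFile:syslog.
***********************************************************************

Container: container_1485938400000_0001_01_000002 on host.example.com:45454
LogType: AGGREGATED
===========================================================================
FileName:syslog
LogLastModifiedTime:Thu Feb 02 08:05:00 +0000 2017
LogLength:6
LogContents:
Hello.
End of LogFile:syslog
***********************************************************************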

File: TestNMWebServices.java

@@ -377,8 +377,9 @@ public class TestNMWebServices extends JerseyTestBase {
ClientResponse response = r.path(filename)
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
String responseText = response.getEntity(String.class);
assertEquals(logMessage, responseText);
int fullTextSize = responseText.getBytes().length;
String responseLogMessage = getLogContext(responseText);
assertEquals(logMessage, responseLogMessage);
int fullTextSize = responseLogMessage.getBytes().length;
// specify how many bytes we should get from logs
// specify a position number, it would get the first n bytes from
@@ -387,9 +388,10 @@ public class TestNMWebServices extends JerseyTestBase {
.queryParam("size", "5")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertEquals(5, responseText.getBytes().length);
assertEquals(new String(logMessage.getBytes(), 0, 5), responseText);
assertTrue(fullTextSize >= responseText.getBytes().length);
responseLogMessage = getLogContext(responseText);
assertEquals(5, responseLogMessage.getBytes().length);
assertEquals(new String(logMessage.getBytes(), 0, 5), responseLogMessage);
assertTrue(fullTextSize >= responseLogMessage.getBytes().length);
// specify the bytes which is larger than the actual file size,
// we would get the full logs
@@ -397,8 +399,9 @@ public class TestNMWebServices extends JerseyTestBase {
.queryParam("size", "10000")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertEquals(fullTextSize, responseText.getBytes().length);
assertEquals(logMessage, responseText);
responseLogMessage = getLogContext(responseText);
assertEquals(fullTextSize, responseLogMessage.getBytes().length);
assertEquals(logMessage, responseLogMessage);
// specify a negative number, it would get the last n bytes from
// container log
@@ -406,25 +409,28 @@ public class TestNMWebServices extends JerseyTestBase {
.queryParam("size", "-5")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertEquals(5, responseText.getBytes().length);
responseLogMessage = getLogContext(responseText);
assertEquals(5, responseLogMessage.getBytes().length);
assertEquals(new String(logMessage.getBytes(),
logMessage.getBytes().length - 5, 5), responseText);
assertTrue(fullTextSize >= responseText.getBytes().length);
logMessage.getBytes().length - 5, 5), responseLogMessage);
assertTrue(fullTextSize >= responseLogMessage.getBytes().length);
response = r.path(filename)
.queryParam("size", "-10000")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
responseLogMessage = getLogContext(responseText);
assertEquals("text/plain", response.getType().toString());
assertEquals(fullTextSize, responseText.getBytes().length);
assertEquals(logMessage, responseText);
assertEquals(fullTextSize, responseLogMessage.getBytes().length);
assertEquals(logMessage, responseLogMessage);
// ask and download it
response = r.path(filename)
.queryParam("format", "octet-stream")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertEquals(logMessage, responseText);
responseLogMessage = getLogContext(responseText);
assertEquals(logMessage, responseLogMessage);
assertEquals(200, response.getStatus());
assertEquals("application/octet-stream", response.getType().toString());
@@ -466,10 +472,11 @@ public class TestNMWebServices extends JerseyTestBase {
TestNMWebServices.class.getSimpleName() + "temp-log-dir");
try {
String aggregatedLogFile = filename + "-aggregated";
String aggregatedLogMessage = "This is an aggregated log.";
TestContainerLogsUtils.createContainerLogFileInRemoteFS(
nmContext.getConf(), FileSystem.get(nmContext.getConf()),
tempLogDir.getAbsolutePath(), containerId, nmContext.getNodeId(),
aggregatedLogFile, "user", logMessage, true);
aggregatedLogFile, "user", aggregatedLogMessage, true);
r1 = resource();
response = r1.path("ws").path("v1").path("node")
.path("containers").path(containerIdStr)
@@ -492,6 +499,21 @@ public class TestNMWebServices extends JerseyTestBase {
assertEquals(meta.get(0).getFileName(), filename);
}
}
// Test whether we can fetch the aggregated log as well
TestContainerLogsUtils.createContainerLogFileInRemoteFS(
nmContext.getConf(), FileSystem.get(nmContext.getConf()),
tempLogDir.getAbsolutePath(), containerId, nmContext.getNodeId(),
filename, "user", aggregatedLogMessage, true);
response = r.path(filename)
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertTrue(responseText.contains("LogType: "
+ ContainerLogType.AGGREGATED));
assertTrue(responseText.contains(aggregatedLogMessage));
assertTrue(responseText.contains("LogType: "
+ ContainerLogType.LOCAL));
assertTrue(responseText.contains(logMessage));
} finally {
FileUtil.fullyDelete(tempLogDir);
}
@@ -501,7 +523,7 @@ public class TestNMWebServices extends JerseyTestBase {
response = r.path(filename).accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertEquals(logMessage, responseText);
assertTrue(responseText.contains(logMessage));
}
public void verifyNodesXML(NodeList nodes) throws JSONException, Exception {
@@ -591,4 +613,11 @@ public class TestNMWebServices extends JerseyTestBase {
YarnVersionInfo.getVersion(), resourceManagerVersion);
}
private String getLogContext(String fullMessage) {
String prefix = "LogContents:\n";
String postfix = "End of LogFile:";
int prefixIndex = fullMessage.indexOf(prefix) + prefix.length();
int postfixIndex = fullMessage.indexOf(postfix);
return fullMessage.substring(prefixIndex, postfixIndex);
}
}