From 327c9980aafce52cc02d2b8885fc4e9f628ab23c Mon Sep 17 00:00:00 2001 From: Junping Du Date: Thu, 2 Feb 2017 00:41:18 -0800 Subject: [PATCH] YARN-6100. Improve YARN webservice to output aggregated container logs. Contributed by Xuan Gong. --- .../hadoop/yarn/client/cli/LogsCLI.java | 17 -- .../yarn/logaggregation/LogToolUtils.java | 158 +++++++++++++ .../webapp/AHSWebServices.java | 212 ++++-------------- .../webapp/TestAHSWebServices.java | 29 ++- .../nodemanager/webapp/NMWebServices.java | 93 +++++--- .../nodemanager/webapp/TestNMWebServices.java | 59 +++-- 6 files changed, 333 insertions(+), 235 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java index 1de4cd1724f..3cb1c7d82ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java @@ -64,7 +64,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers; import org.apache.hadoop.yarn.logaggregation.PerContainerLogFileInfo; -import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; @@ -509,17 +508,9 @@ public class LogsCLI extends Configured implements Tool { newOptions.setLogTypes(matchedFiles); Client webServiceClient = Client.create(); - String containerString = String.format( - LogCLIHelpers.CONTAINER_ON_NODE_PATTERN, containerIdStr, nodeId); - out.println(containerString); - out.println(StringUtils.repeat("=", containerString.length())); boolean foundAnyLogs = false; byte[] buffer = new byte[65536]; for (String logFile : newOptions.getLogTypes()) { - out.println("LogType:" + logFile); - out.println("Log Upload Time:" - + Times.format(System.currentTimeMillis())); - out.println("Log Contents:"); InputStream is = null; try { ClientResponse response = getResponeFromNMWebService(conf, @@ -541,14 +532,6 @@ public class LogsCLI extends Configured implements Tool { response.getEntity(String.class)); out.println(msg); } - StringBuilder sb = new StringBuilder(); - sb.append("End of LogType:" + logFile + "."); - if (request.getContainerState() == ContainerState.RUNNING) { - sb.append(" This log file belongs" - + " to a running container (" + containerIdStr + ") and so may" - + " not be complete."); - } - out.println(sb.toString()); out.flush(); foundAnyLogs = true; } catch (ClientHandlerException | UniformInterfaceException ex) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java index e1177364dab..d83a8ae357c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java @@ -20,11 +20,17 @@ package org.apache.hadoop.yarn.logaggregation; import java.io.DataInputStream; import java.io.EOFException; import 
java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; +import org.apache.commons.lang.StringUtils; import org.apache.commons.math3.util.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.HarFs; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; @@ -40,6 +46,9 @@ public final class LogToolUtils { private LogToolUtils() {} + public static final String CONTAINER_ON_NODE_PATTERN = + "Container: %s on %s"; + /** * Return a list of {@link ContainerLogMeta} for a container * from Remote FileSystem. @@ -114,4 +123,153 @@ public final class LogToolUtils { } return containersLogMeta; } + + /** + * Output container log. + * @param containerId the containerId + * @param nodeId the nodeId + * @param fileName the log file name + * @param fileLength the log file length + * @param outputSize the output size + * @param lastModifiedTime the log file last modified time + * @param fis the log file input stream + * @param os the output stream + * @param buf the buffer + * @param logType the log type. + * @throws IOException if we can not access the log file. + */ + public static void outputContainerLog(String containerId, String nodeId, + String fileName, long fileLength, long outputSize, + String lastModifiedTime, InputStream fis, OutputStream os, + byte[] buf, ContainerLogType logType) throws IOException { + long toSkip = 0; + long totalBytesToRead = fileLength; + long skipAfterRead = 0; + if (outputSize < 0) { + long absBytes = Math.abs(outputSize); + if (absBytes < fileLength) { + toSkip = fileLength - absBytes; + totalBytesToRead = absBytes; + } + org.apache.hadoop.io.IOUtils.skipFully(fis, toSkip); + } else { + if (outputSize < fileLength) { + totalBytesToRead = outputSize; + skipAfterRead = fileLength - outputSize; + } + } + + long curRead = 0; + long pendingRead = totalBytesToRead - curRead; + int toRead = pendingRead > buf.length ? buf.length + : (int) pendingRead; + int len = fis.read(buf, 0, toRead); + boolean keepGoing = (len != -1 && curRead < totalBytesToRead); + if (keepGoing) { + StringBuilder sb = new StringBuilder(); + String containerStr = String.format( + LogToolUtils.CONTAINER_ON_NODE_PATTERN, + containerId, nodeId); + sb.append(containerStr + "\n"); + sb.append("LogType: " + logType + "\n"); + sb.append(StringUtils.repeat("=", containerStr.length()) + "\n"); + sb.append("FileName:" + fileName + "\n"); + sb.append("LogLastModifiedTime:" + lastModifiedTime + "\n"); + sb.append("LogLength:" + Long.toString(fileLength) + "\n"); + sb.append("LogContents:\n"); + byte[] b = sb.toString().getBytes( + Charset.forName("UTF-8")); + os.write(b, 0, b.length); + } + while (keepGoing) { + os.write(buf, 0, len); + curRead += len; + + pendingRead = totalBytesToRead - curRead; + toRead = pendingRead > buf.length ? 
buf.length + : (int) pendingRead; + len = fis.read(buf, 0, toRead); + keepGoing = (len != -1 && curRead < totalBytesToRead); + } + org.apache.hadoop.io.IOUtils.skipFully(fis, skipAfterRead); + os.flush(); + } + + public static boolean outputAggregatedContainerLog(Configuration conf, + ApplicationId appId, String appOwner, + String containerId, String nodeId, + String logFileName, long outputSize, OutputStream os, + byte[] buf) throws IOException { + boolean findLogs = false; + RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils + .getRemoteNodeFileDir(conf, appId, appOwner); + while (nodeFiles != null && nodeFiles.hasNext()) { + final FileStatus thisNodeFile = nodeFiles.next(); + String nodeName = thisNodeFile.getPath().getName(); + if (nodeName.equals(appId + ".har")) { + Path p = new Path("har:///" + + thisNodeFile.getPath().toUri().getRawPath()); + nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p); + continue; + } + if ((nodeId == null || nodeName.contains(LogAggregationUtils + .getNodeString(nodeId))) && !nodeName.endsWith( + LogAggregationUtils.TMP_FILE_SUFFIX)) { + AggregatedLogFormat.LogReader reader = null; + try { + reader = new AggregatedLogFormat.LogReader(conf, + thisNodeFile.getPath()); + DataInputStream valueStream; + LogKey key = new LogKey(); + valueStream = reader.next(key); + while (valueStream != null && !key.toString() + .equals(containerId)) { + // Next container + key = new LogKey(); + valueStream = reader.next(key); + } + if (valueStream == null) { + continue; + } + while (true) { + try { + String fileType = valueStream.readUTF(); + String fileLengthStr = valueStream.readUTF(); + long fileLength = Long.parseLong(fileLengthStr); + if (fileType.equalsIgnoreCase(logFileName)) { + LogToolUtils.outputContainerLog(containerId, + nodeId, fileType, fileLength, outputSize, + Times.format(thisNodeFile.getModificationTime()), + valueStream, os, buf, ContainerLogType.AGGREGATED); + StringBuilder sb = new StringBuilder(); + String endOfFile = "End of LogFile:" + fileType; + sb.append("\n" + endOfFile + "\n"); + sb.append(StringUtils.repeat("*", endOfFile.length() + 50) + + "\n\n"); + byte[] b = sb.toString().getBytes(Charset.forName("UTF-8")); + os.write(b, 0, b.length); + findLogs = true; + } else { + long totalSkipped = 0; + long currSkipped = 0; + while (currSkipped != -1 && totalSkipped < fileLength) { + currSkipped = valueStream.skip( + fileLength - totalSkipped); + totalSkipped += currSkipped; + } + } + } catch (EOFException eof) { + break; + } + } + } finally { + if (reader != null) { + reader.close(); + } + } + } + } + os.flush(); + return findLogs; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java index 9bac4747792..a10bfac704f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
-import java.io.DataInputStream; -import java.io.EOFException; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.Charset; @@ -43,12 +41,10 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.Response.Status; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.http.JettyUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -56,13 +52,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.ApplicationBaseProtocol; import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; import org.apache.hadoop.yarn.logaggregation.ContainerLogType; -import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.logaggregation.LogToolUtils; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.server.webapp.WebServices; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo; @@ -71,11 +63,11 @@ import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; -import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.NotFoundException; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -360,27 +352,27 @@ public class AHSWebServices extends WebServices { } catch (Exception ex) { // directly find logs from HDFS. return sendStreamOutputResponse(appId, null, null, containerIdStr, - filename, format, length); + filename, format, length, false); } String appOwner = appInfo.getUser(); - - ContainerInfo containerInfo; - try { - containerInfo = super.getContainer( - req, res, appId.toString(), - containerId.getApplicationAttemptId().toString(), - containerId.toString()); - } catch (Exception ex) { - if (isFinishedState(appInfo.getAppState())) { - // directly find logs from HDFS. - return sendStreamOutputResponse(appId, appOwner, null, containerIdStr, - filename, format, length); - } - return createBadResponse(Status.INTERNAL_SERVER_ERROR, - "Can not get ContainerInfo for the container: " + containerId); + if (isFinishedState(appInfo.getAppState())) { + // directly find logs from HDFS. 
+ return sendStreamOutputResponse(appId, appOwner, null, containerIdStr, + filename, format, length, false); } - String nodeId = containerInfo.getNodeId(); + if (isRunningState(appInfo.getAppState())) { + ContainerInfo containerInfo; + try { + containerInfo = super.getContainer( + req, res, appId.toString(), + containerId.getApplicationAttemptId().toString(), + containerId.toString()); + } catch (Exception ex) { + // output the aggregated logs + return sendStreamOutputResponse(appId, appOwner, null, containerIdStr, + filename, format, length, true); + } String nodeHttpAddress = containerInfo.getNodeHttpAddress(); String uri = "/" + containerId.toString() + "/logs/" + filename; String resURI = JOINER.join(nodeHttpAddress, NM_DOWNLOAD_URI_STR, uri); @@ -392,9 +384,6 @@ public class AHSWebServices extends WebServices { HttpServletResponse.SC_TEMPORARY_REDIRECT); response.header("Location", resURI); return response.build(); - } else if (isFinishedState(appInfo.getAppState())) { - return sendStreamOutputResponse(appId, appOwner, nodeId, - containerIdStr, filename, format, length); } else { return createBadResponse(Status.NOT_FOUND, "The application is not at Running or Finished State."); @@ -419,7 +408,8 @@ public class AHSWebServices extends WebServices { private Response sendStreamOutputResponse(ApplicationId appId, String appOwner, String nodeId, String containerIdStr, - String fileName, String format, long bytes) { + String fileName, String format, long bytes, + boolean printEmptyLocalContainerLog) { String contentType = WebAppUtils.getDefaultLogContentType(); if (format != null && !format.isEmpty()) { contentType = WebAppUtils.getSupportedLogContentType(format); @@ -433,15 +423,11 @@ public class AHSWebServices extends WebServices { StreamingOutput stream = null; try { stream = getStreamingOutput(appId, appOwner, nodeId, - containerIdStr, fileName, bytes); + containerIdStr, fileName, bytes, printEmptyLocalContainerLog); } catch (Exception ex) { return createBadResponse(Status.INTERNAL_SERVER_ERROR, ex.getMessage()); } - if (stream == null) { - return createBadResponse(Status.INTERNAL_SERVER_ERROR, - "Can not get log for container: " + containerIdStr); - } ResponseBuilder response = Response.ok(stream); response.header("Content-Type", contentType); // Sending the X-Content-Type-Options response header with the value @@ -451,146 +437,30 @@ public class AHSWebServices extends WebServices { return response.build(); } - private StreamingOutput getStreamingOutput(ApplicationId appId, - String appOwner, final String nodeId, final String containerIdStr, - final String logFile, final long bytes) throws IOException{ - String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); - org.apache.hadoop.fs.Path remoteRootLogDir = new org.apache.hadoop.fs.Path( - conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); - org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir = - FileContext.getFileContext(conf).makeQualified(remoteRootLogDir); - FileContext fc = FileContext.getFileContext( - qualifiedRemoteRootLogDir.toUri(), conf); - org.apache.hadoop.fs.Path remoteAppDir = null; - if (appOwner == null) { - org.apache.hadoop.fs.Path toMatch = LogAggregationUtils - .getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix); - FileStatus[] matching = fc.util().globStatus(toMatch); - if (matching == null || matching.length != 1) { - return null; - } - remoteAppDir = matching[0].getPath(); - } else { - remoteAppDir = LogAggregationUtils - 
.getRemoteAppLogDir(remoteRootLogDir, appId, appOwner, suffix); - } - final RemoteIterator<FileStatus> nodeFiles; - nodeFiles = fc.listStatus(remoteAppDir); - if (!nodeFiles.hasNext()) { - return null; - } - + private StreamingOutput getStreamingOutput(final ApplicationId appId, + final String appOwner, final String nodeId, final String containerIdStr, + final String logFile, final long bytes, + final boolean printEmptyLocalContainerLog) throws IOException { StreamingOutput stream = new StreamingOutput() { @Override public void write(OutputStream os) throws IOException, WebApplicationException { byte[] buf = new byte[65535]; - boolean findLogs = false; - while (nodeFiles.hasNext()) { - final FileStatus thisNodeFile = nodeFiles.next(); - String nodeName = thisNodeFile.getPath().getName(); - if ((nodeId == null || nodeName.contains(LogAggregationUtils - .getNodeString(nodeId))) && !nodeName.endsWith( - LogAggregationUtils.TMP_FILE_SUFFIX)) { - AggregatedLogFormat.LogReader reader = null; - try { - reader = new AggregatedLogFormat.LogReader(conf, - thisNodeFile.getPath()); - DataInputStream valueStream; - LogKey key = new LogKey(); - valueStream = reader.next(key); - while (valueStream != null && !key.toString() - .equals(containerIdStr)) { - // Next container - key = new LogKey(); - valueStream = reader.next(key); - } - if (valueStream == null) { - continue; - } - while (true) { - try { - String fileType = valueStream.readUTF(); - String fileLengthStr = valueStream.readUTF(); - long fileLength = Long.parseLong(fileLengthStr); - if (fileType.equalsIgnoreCase(logFile)) { - StringBuilder sb = new StringBuilder(); - sb.append("LogType:"); - sb.append(fileType + "\n"); - sb.append("Log Upload Time:"); - sb.append(Times.format(System.currentTimeMillis()) + "\n"); - sb.append("LogLength:"); - sb.append(fileLengthStr + "\n"); - sb.append("Log Contents:\n"); - byte[] b = sb.toString().getBytes( - Charset.forName("UTF-8")); - os.write(b, 0, b.length); - - long toSkip = 0; - long totalBytesToRead = fileLength; - long skipAfterRead = 0; - if (bytes < 0) { - long absBytes = Math.abs(bytes); - if (absBytes < fileLength) { - toSkip = fileLength - absBytes; - totalBytesToRead = absBytes; - } - org.apache.hadoop.io.IOUtils.skipFully( - valueStream, toSkip); - } else { - if (bytes < fileLength) { - totalBytesToRead = bytes; - skipAfterRead = fileLength - bytes; - } - } - - long curRead = 0; - long pendingRead = totalBytesToRead - curRead; - int toRead = pendingRead > buf.length ? buf.length - : (int) pendingRead; - int len = valueStream.read(buf, 0, toRead); - while (len != -1 && curRead < totalBytesToRead) { - os.write(buf, 0, len); - curRead += len; - - pendingRead = totalBytesToRead - curRead; - toRead = pendingRead > buf.length ?
buf.length - : (int) pendingRead; - len = valueStream.read(buf, 0, toRead); - } - org.apache.hadoop.io.IOUtils.skipFully( - valueStream, skipAfterRead); - sb = new StringBuilder(); - sb.append("\nEnd of LogType:" + fileType + "\n"); - b = sb.toString().getBytes(Charset.forName("UTF-8")); - os.write(b, 0, b.length); - findLogs = true; - } else { - long totalSkipped = 0; - long currSkipped = 0; - while (currSkipped != -1 && totalSkipped < fileLength) { - currSkipped = valueStream.skip( - fileLength - totalSkipped); - totalSkipped += currSkipped; - } - } - } catch (EOFException eof) { - break; - } - } - } finally { - if (reader != null) { - reader.close(); - } - } - } - } - os.flush(); + boolean findLogs = LogToolUtils.outputAggregatedContainerLog(conf, + appId, appOwner, containerIdStr, nodeId, logFile, bytes, os, buf); if (!findLogs) { throw new IOException("Can not find logs for container:" + containerIdStr); + } else { + if (printEmptyLocalContainerLog) { + StringBuilder sb = new StringBuilder(); + sb.append(containerIdStr + "\n"); + sb.append("LogType: " + ContainerLogType.LOCAL + "\n"); + sb.append("LogContents:\n"); + sb.append(getNoRedirectWarning() + "\n"); + os.write(sb.toString().getBytes(Charset.forName("UTF-8"))); + } } } }; @@ -640,4 +510,12 @@ public class AHSWebServices extends WebServices { throw new WebApplicationException(ex); } } + + @Private + @VisibleForTesting + public static String getNoRedirectWarning() { + return "We do not have NodeManager web address, so we can not " + + "re-direct the request to related NodeManager " + + "for local container logs."; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java index f553bb06105..3d1c9012baf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java @@ -35,7 +35,7 @@ import javax.servlet.FilterConfig; import javax.servlet.ServletException; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.MediaType; - +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -584,7 +584,10 @@ public class TestAHSWebServices extends JerseyTestBase { responseText = response.getEntity(String.class); assertTrue(responseText.contains("Hello." + containerId1ForApp100)); int fullTextSize = responseText.getBytes().length; - int tailTextSize = "\nEnd of LogType:syslog\n".getBytes().length; + String tailEndSeparator = StringUtils.repeat("*", + "End of LogFile:syslog".length() + 50) + "\n\n"; + int tailTextSize = "\nEnd of LogFile:syslog\n".getBytes().length + + tailEndSeparator.getBytes().length; String logMessage = "Hello." 
+ containerId1ForApp100; int fileContentSize = logMessage.getBytes().length; @@ -685,6 +688,28 @@ public class TestAHSWebServices extends JerseyTestBase { assertTrue(redirectURL.contains(containerId1.toString())); assertTrue(redirectURL.contains("/logs/" + fileName)); assertTrue(redirectURL.contains("user.name=" + user)); + + // If we can not get container information from ATS, we would try to + // get the aggregated logs from the remote FileSystem. + ContainerId containerId1000 = ContainerId.newContainerId( + appAttemptId, 1000); + String content = "Hello." + containerId1000; + NodeId nodeId = NodeId.newInstance("test host", 100); + TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs, + rootLogDir, containerId1000, nodeId, fileName, user, content, true); + r = resource(); + ClientResponse response = r.path("ws").path("v1") + .path("applicationhistory").path("containerlogs") + .path(containerId1000.toString()).path(fileName) + .queryParam("user.name", user) + .accept(MediaType.TEXT_PLAIN) + .get(ClientResponse.class); + String responseText = response.getEntity(String.class); + assertTrue(responseText.contains(content)); + // Also test whether we output the empty local container log and print + // the warning message. + assertTrue(responseText.contains("LogType: " + ContainerLogType.LOCAL)); + assertTrue(responseText.contains(AHSWebServices.getNoRedirectWarning())); } @Test(timeout = 10000) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java index 1357d5a1ff0..07acd4bff40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.OutputStream; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.Map.Entry; @@ -41,6 +42,9 @@ import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.UriInfo; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.http.JettyUtils; @@ -57,6 +61,7 @@ import org.apache.hadoop.yarn.server.nodemanager.ResourceView; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppsInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainerInfo; @@ -64,6 +69,7 @@ import
org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMContainerLogsInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainersInfo; import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NodeInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo; +import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.NotFoundException; import org.apache.hadoop.yarn.webapp.WebApp; @@ -74,6 +80,7 @@ import com.google.inject.Singleton; @Singleton @Path("/ws/v1/node") public class NMWebServices { + private static final Log LOG = LogFactory.getLog(NMWebServices.class); private Context nmContext; private ResourceView rview; private WebApp webapp; @@ -330,17 +337,32 @@ public class NMWebServices { @Produces({ MediaType.TEXT_PLAIN + "; " + JettyUtils.UTF_8 }) @Public @Unstable - public Response getLogs(@PathParam("containerid") String containerIdStr, + public Response getLogs( + @PathParam("containerid") final String containerIdStr, @PathParam("filename") String filename, @QueryParam("format") String format, @QueryParam("size") String size) { - ContainerId containerId; + ContainerId tempContainerId; try { - containerId = ContainerId.fromString(containerIdStr); + tempContainerId = ContainerId.fromString(containerIdStr); } catch (IllegalArgumentException ex) { return Response.status(Status.BAD_REQUEST).build(); } - + final ContainerId containerId = tempContainerId; + boolean tempIsRunning = false; + // check the status of the container + try { + Container container = nmContext.getContainers().get(containerId); + tempIsRunning = (container.getContainerState() == ContainerState.RUNNING); + } catch (Exception ex) { + // This NM does not have this container any more. We + // assume the container has already finished. + if (LOG.isDebugEnabled()) { + LOG.debug("Can not find the container: " + containerId + + " in this node."); + } + } + final boolean isRunning = tempIsRunning; File logFile = null; try { logFile = ContainerLogsUtils.getContainerLogFile( @@ -351,6 +373,8 @@ return Response.serverError().entity(ex.getMessage()).build(); } final long bytes = parseLongParam(size); + final String lastModifiedTime = Times.format(logFile.lastModified()); + final String outputFileName = filename; String contentType = WebAppUtils.getDefaultLogContentType(); if (format != null && !format.isEmpty()) { contentType = WebAppUtils.getSupportedLogContentType(format); @@ -374,39 +398,40 @@ try { int bufferSize = 65536; byte[] buf = new byte[bufferSize]; - long toSkip = 0; - long totalBytesToRead = fileLength; - long skipAfterRead = 0; - if (bytes < 0) { - long absBytes = Math.abs(bytes); - if (absBytes < fileLength) { - toSkip = fileLength - absBytes; - totalBytesToRead = absBytes; - } - org.apache.hadoop.io.IOUtils.skipFully(fis, toSkip); + LogToolUtils.outputContainerLog(containerId.toString(), + nmContext.getNodeId().toString(), outputFileName, fileLength, + bytes, lastModifiedTime, fis, os, buf, ContainerLogType.LOCAL); + StringBuilder sb = new StringBuilder(); + String endOfFile = "End of LogFile:" + outputFileName; + sb.append(endOfFile + "."); + if (isRunning) { + sb.append(" This log file belongs to a running container (" + + containerIdStr + ") and so may not be complete."
+ "\n"); } else { - if (bytes < fileLength) { - totalBytesToRead = bytes; - skipAfterRead = fileLength - bytes; + sb.append("\n"); + } + sb.append(StringUtils.repeat("*", endOfFile.length() + 50) + + "\n\n"); + os.write(sb.toString().getBytes(Charset.forName("UTF-8"))); + // If we have aggregated logs for this container, + // output the aggregation logs as well. + ApplicationId appId = containerId.getApplicationAttemptId() + .getApplicationId(); + Application app = nmContext.getApplications().get(appId); + String appOwner = app == null ? null : app.getUser(); + try { + LogToolUtils.outputAggregatedContainerLog(nmContext.getConf(), + appId, appOwner, containerId.toString(), + nmContext.getNodeId().toString(), outputFileName, bytes, + os, buf); + } catch (Exception ex) { + // Something wrong when we try to access the aggregated log. + if (LOG.isDebugEnabled()) { + LOG.debug("Can not access the aggregated log for " + + "the container:" + containerId); + LOG.debug(ex.getMessage()); } } - - long curRead = 0; - long pendingRead = totalBytesToRead - curRead; - int toRead = pendingRead > buf.length ? buf.length - : (int) pendingRead; - int len = fis.read(buf, 0, toRead); - while (len != -1 && curRead < totalBytesToRead) { - os.write(buf, 0, len); - curRead += len; - - pendingRead = totalBytesToRead - curRead; - toRead = pendingRead > buf.length ? buf.length - : (int) pendingRead; - len = fis.read(buf, 0, toRead); - } - org.apache.hadoop.io.IOUtils.skipFully(fis, skipAfterRead); - os.flush(); } finally { IOUtils.closeQuietly(fis); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java index a6d4153e499..7764ceb02a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java @@ -384,8 +384,9 @@ public class TestNMWebServices extends JerseyTestBase { ClientResponse response = r.path(filename) .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); String responseText = response.getEntity(String.class); - assertEquals(logMessage, responseText); - int fullTextSize = responseText.getBytes().length; + String responseLogMessage = getLogContext(responseText); + assertEquals(logMessage, responseLogMessage); + int fullTextSize = responseLogMessage.getBytes().length; // specify how many bytes we should get from logs // specify a position number, it would get the first n bytes from @@ -394,9 +395,10 @@ public class TestNMWebServices extends JerseyTestBase { .queryParam("size", "5") .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); responseText = response.getEntity(String.class); - assertEquals(5, responseText.getBytes().length); - assertEquals(new String(logMessage.getBytes(), 0, 5), responseText); - assertTrue(fullTextSize >= responseText.getBytes().length); + responseLogMessage = getLogContext(responseText); + assertEquals(5, responseLogMessage.getBytes().length); + assertEquals(new String(logMessage.getBytes(), 0, 5), responseLogMessage); + assertTrue(fullTextSize >= responseLogMessage.getBytes().length); // specify 
the bytes which is larger than the actual file size, // we would get the full logs @@ -404,8 +406,9 @@ public class TestNMWebServices extends JerseyTestBase { .queryParam("size", "10000") .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); responseText = response.getEntity(String.class); - assertEquals(fullTextSize, responseText.getBytes().length); - assertEquals(logMessage, responseText); + responseLogMessage = getLogContext(responseText); + assertEquals(fullTextSize, responseLogMessage.getBytes().length); + assertEquals(logMessage, responseLogMessage); // specify a negative number, it would get the last n bytes from // container log @@ -413,25 +416,28 @@ public class TestNMWebServices extends JerseyTestBase { .queryParam("size", "-5") .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); responseText = response.getEntity(String.class); - assertEquals(5, responseText.getBytes().length); + responseLogMessage = getLogContext(responseText); + assertEquals(5, responseLogMessage.getBytes().length); assertEquals(new String(logMessage.getBytes(), - logMessage.getBytes().length - 5, 5), responseText); - assertTrue(fullTextSize >= responseText.getBytes().length); + logMessage.getBytes().length - 5, 5), responseLogMessage); + assertTrue(fullTextSize >= responseLogMessage.getBytes().length); response = r.path(filename) .queryParam("size", "-10000") .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); responseText = response.getEntity(String.class); + responseLogMessage = getLogContext(responseText); assertEquals("text/plain; charset=utf-8", response.getType().toString()); - assertEquals(fullTextSize, responseText.getBytes().length); - assertEquals(logMessage, responseText); + assertEquals(fullTextSize, responseLogMessage.getBytes().length); + assertEquals(logMessage, responseLogMessage); // ask and download it response = r.path(filename) .queryParam("format", "octet-stream") .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); responseText = response.getEntity(String.class); - assertEquals(logMessage, responseText); + responseLogMessage = getLogContext(responseText); + assertEquals(logMessage, responseLogMessage); assertEquals(200, response.getStatus()); assertEquals("application/octet-stream; charset=utf-8", response.getType().toString()); @@ -475,10 +481,11 @@ public class TestNMWebServices extends JerseyTestBase { TestNMWebServices.class.getSimpleName() + "temp-log-dir"); try { String aggregatedLogFile = filename + "-aggregated"; + String aggregatedLogMessage = "This is aggregated log."; TestContainerLogsUtils.createContainerLogFileInRemoteFS( nmContext.getConf(), FileSystem.get(nmContext.getConf()), tempLogDir.getAbsolutePath(), containerId, nmContext.getNodeId(), - aggregatedLogFile, "user", logMessage, true); + aggregatedLogFile, "user", aggregatedLogMessage, true); r1 = resource(); response = r1.path("ws").path("v1").path("node") .path("containers").path(containerIdStr) @@ -501,6 +508,21 @@ public class TestNMWebServices extends JerseyTestBase { assertEquals(meta.get(0).getFileName(), filename); } } + + // Test whether we could get the aggregated log as well + TestContainerLogsUtils.createContainerLogFileInRemoteFS( + nmContext.getConf(), FileSystem.get(nmContext.getConf()), + tempLogDir.getAbsolutePath(), containerId, nmContext.getNodeId(), + filename, "user", aggregatedLogMessage, true); + response = r.path(filename) + .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); + responseText = response.getEntity(String.class); + assertTrue(responseText.contains("LogType: " +
ContainerLogType.AGGREGATED)); + assertTrue(responseText.contains(aggregatedLogMessage)); + assertTrue(responseText.contains("LogType: " + + ContainerLogType.LOCAL)); + assertTrue(responseText.contains(logMessage)); } finally { FileUtil.fullyDelete(tempLogDir); } @@ -511,7 +533,7 @@ public class TestNMWebServices extends JerseyTestBase { r.path(filename).accept(MediaType.TEXT_PLAIN) .get(ClientResponse.class); responseText = response.getEntity(String.class); - assertEquals(logMessage, responseText); + assertTrue(responseText.contains(logMessage)); } public void verifyNodesXML(NodeList nodes) throws JSONException, Exception { @@ -601,4 +623,11 @@ public class TestNMWebServices extends JerseyTestBase { YarnVersionInfo.getVersion(), resourceManagerVersion); } + private String getLogContext(String fullMessage) { + String prefix = "LogContents:\n"; + String postfix = "End of LogFile:"; + int prefixIndex = fullMessage.indexOf(prefix) + prefix.length(); + int postfixIndex = fullMessage.indexOf(postfix); + return fullMessage.substring(prefixIndex, postfixIndex); + } }
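
Two illustrative sketches follow; neither is part of the patch itself.

First, the size semantics implemented by LogToolUtils#outputContainerLog: a positive outputSize emits the first N bytes of a log file, a negative value emits the last |N| bytes, and any value at least the file length emits the whole file. A minimal, self-contained demo against an in-memory stream (the container id, node id, and timestamp are made-up placeholders):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;

public class OutputContainerLogDemo {
  public static void main(String[] args) throws Exception {
    byte[] data = "0123456789".getBytes(StandardCharsets.UTF_8);
    ByteArrayInputStream fis = new ByteArrayInputStream(data);
    ByteArrayOutputStream os = new ByteArrayOutputStream();
    // outputSize = -4 asks for the last four bytes of the ten-byte "file".
    LogToolUtils.outputContainerLog(
        "container_1486023728000_0001_01_000001", // placeholder container id
        "host1:45454",                            // placeholder node id
        "syslog", data.length, -4L,
        "Thu Feb 02 00:41:18 PST 2017",           // placeholder modified time
        fis, os, new byte[65536], ContainerLogType.LOCAL);
    // Prints the "Container: ... on ..." / "LogType: LOCAL" / "LogContents:"
    // header followed by "6789"; the "End of LogFile" trailer is appended
    // by the callers, not by outputContainerLog itself.
    System.out.println(os.toString("UTF-8"));
  }
}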
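
Second, a client-side view of the AHS endpoint exercised by TestAHSWebServices, using the same Jersey 1.x API and path as the tests; the host, port, user, and ids are placeholders, and the "size" query parameter is assumed to behave as on the NM endpoint. After this patch, when container information cannot be resolved for a running application, the response carries the aggregated sections (LogType: AGGREGATED) followed by an empty LogType: LOCAL section holding the getNoRedirectWarning() message:

import javax.ws.rs.core.MediaType;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;

public class FetchAggregatedLogDemo {
  public static void main(String[] args) {
    Client client = Client.create();
    // Placeholder AHS web address; the path mirrors the tests above.
    WebResource r = client.resource("http://ahs-host:8188");
    ClientResponse response = r.path("ws").path("v1")
        .path("applicationhistory").path("containerlogs")
        .path("container_1486023728000_0001_01_000001").path("syslog")
        .queryParam("user.name", "hadoop")  // placeholder user
        .queryParam("size", "-4096")        // tail: last 4096 bytes
        .accept(MediaType.TEXT_PLAIN)
        .get(ClientResponse.class);
    System.out.println(response.getEntity(String.class));
  }
}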