From 8e576570a99270f2db4bd722f451cde0a5c3e4a4 Mon Sep 17 00:00:00 2001 From: Robert Joseph Evans Date: Tue, 17 Jul 2012 19:01:42 +0000 Subject: [PATCH] MAPREDUCE-4283. Display tail of aggregated logs by default (Jason Lowe via bobby) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1362608 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapreduce/v2/hs/webapp/HsWebApp.java | 9 +- .../mapreduce/v2/hs/webapp/TestHSWebApp.java | 64 +++++++- .../logaggregation/AggregatedLogFormat.java | 108 ++++++++++++- .../yarn/webapp/log/AggregatedLogsBlock.java | 147 ++++++++++++++++-- 5 files changed, 310 insertions(+), 21 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 2c24324e777..fdb96de34a9 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -718,6 +718,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4449. Incorrect MR_HISTORY_STORAGE property name in JHAdminConfig (Ahmed Radwan via bobby) + MAPREDUCE-4283. Display tail of aggregated logs by default (Jason Lowe via + bobby) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java index 76991a27cb9..ebc6d46c1fc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java @@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.hs.webapp; import static org.apache.hadoop.yarn.util.StringHelper.pajoin; import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER; import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE; import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING; import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME; @@ -60,10 +61,10 @@ public class HsWebApp extends WebApp implements AMParams { route(pajoin("/singletaskcounter",TASK_ID, COUNTER_GROUP, COUNTER_NAME), HsController.class, "singleTaskCounter"); route("/about", HsController.class, "about"); - route(pajoin("/logs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER), - HsController.class, "logs"); - route(pajoin("/nmlogs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER), - HsController.class, "nmlogs"); + route(pajoin("/logs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER, + CONTAINER_LOG_TYPE), HsController.class, "logs"); + route(pajoin("/nmlogs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER, + CONTAINER_LOG_TYPE), HsController.class, "nmlogs"); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java index 0fb1f7544f8..c617be58890 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java @@ -24,9 +24,11 @@ import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.JOB_ID; import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_TYPE; import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER; import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE; import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING; import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.verify; import java.io.IOException; import java.io.PrintWriter; @@ -35,6 +37,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.MockJobs; @@ -44,13 +47,14 @@ import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.ClusterInfo; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.webapp.log.AggregatedLogsPage; import org.apache.hadoop.yarn.webapp.test.WebAppTests; import org.junit.Test; -import static org.mockito.Mockito.verify; +import com.google.inject.AbstractModule; import com.google.inject.Injector; public class TestHSWebApp { @@ -251,6 +255,64 @@ public class TestHSWebApp { "Aggregation is not enabled. Try the nodemanager at " + MockJobs.NM_HOST + ":" + MockJobs.NM_PORT); } + + @Test + public void testLogsViewSingle() throws IOException { + LOG.info("HsLogsPage with params for single log and data limits"); + TestAppContext ctx = new TestAppContext(); + Map params = new HashMap(); + + final Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + + params.put("start", "-2048"); + params.put("end", "-1024"); + params.put(CONTAINER_LOG_TYPE, "syslog"); + params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1) + .toString()); + params.put(NM_NODENAME, + BuilderUtils.newNodeId(MockJobs.NM_HOST, MockJobs.NM_PORT).toString()); + params.put(ENTITY_STRING, "container_10_0001_01_000001"); + params.put(APP_OWNER, "owner"); + + Injector injector = + WebAppTests.testPage(AggregatedLogsPage.class, AppContext.class, ctx, + params, new AbstractModule() { + @Override + protected void configure() { + bind(Configuration.class).toInstance(conf); + } + }); + PrintWriter spyPw = WebAppTests.getPrintWriter(injector); + verify(spyPw).write( + "Logs not available for container_10_0001_01_000001." 
+ + " Aggregation may not be complete, " + + "Check back later or try the nodemanager at " + + MockJobs.NM_HOST + ":" + MockJobs.NM_PORT); + } + + @Test + public void testLogsViewBadStartEnd() throws IOException { + LOG.info("HsLogsPage with bad start/end params"); + TestAppContext ctx = new TestAppContext(); + Map params = new HashMap(); + + params.put("start", "foo"); + params.put("end", "bar"); + params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1) + .toString()); + params.put(NM_NODENAME, + BuilderUtils.newNodeId(MockJobs.NM_HOST, MockJobs.NM_PORT).toString()); + params.put(ENTITY_STRING, "container_10_0001_01_000001"); + params.put(APP_OWNER, "owner"); + + Injector injector = + WebAppTests.testPage(AggregatedLogsPage.class, AppContext.class, ctx, + params); + PrintWriter spyPw = WebAppTests.getPrintWriter(injector); + verify(spyPw).write("Invalid log start value: foo"); + verify(spyPw).write("Invalid log end value: bar"); + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index dfd4c257d3f..407fc9ca26e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -91,6 +91,23 @@ public class AggregatedLogFormat { this.keyString = keyString; } + @Override + public int hashCode() { + return keyString == null ? 0 : keyString.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof LogKey) { + LogKey other = (LogKey) obj; + if (this.keyString == null) { + return other.keyString == null; + } + return this.keyString.equals(other.keyString); + } + return false; + } + @Override public void write(DataOutput out) throws IOException { out.writeUTF(this.keyString); @@ -360,7 +377,33 @@ public class AggregatedLogFormat { return valueStream; } - + /** + * Get a ContainerLogsReader to read the logs for + * the specified container. + * + * @param containerId + * @return object to read the container's logs or null if the + * logs could not be found + * @throws IOException + */ + public ContainerLogsReader getContainerLogsReader( + ContainerId containerId) throws IOException { + ContainerLogsReader logReader = null; + + final LogKey containerKey = new LogKey(containerId); + LogKey key = new LogKey(); + DataInputStream valueStream = next(key); + while (valueStream != null && !key.equals(containerKey)) { + valueStream = next(key); + } + + if (valueStream != null) { + logReader = new ContainerLogsReader(valueStream); + } + + return logReader; + } + //TODO Change Log format and interfaces to be containerId specific. // Avoid returning completeValueStreams. 
// public List getTypesForContainer(DataInputStream valueStream){} @@ -489,4 +532,67 @@ public class AggregatedLogFormat { this.fsDataIStream.close(); } } + + public static class ContainerLogsReader { + private DataInputStream valueStream; + private String currentLogType = null; + private long currentLogLength = 0; + private BoundedInputStream currentLogData = null; + private InputStreamReader currentLogISR; + + public ContainerLogsReader(DataInputStream stream) { + valueStream = stream; + } + + public String nextLog() throws IOException { + if (currentLogData != null && currentLogLength > 0) { + // seek to the end of the current log, relying on BoundedInputStream + // to prevent seeking past the end of the current log + do { + if (currentLogData.skip(currentLogLength) < 0) { + break; + } + } while (currentLogData.read() != -1); + } + + currentLogType = null; + currentLogLength = 0; + currentLogData = null; + currentLogISR = null; + + try { + String logType = valueStream.readUTF(); + String logLengthStr = valueStream.readUTF(); + currentLogLength = Long.parseLong(logLengthStr); + currentLogData = + new BoundedInputStream(valueStream, currentLogLength); + currentLogData.setPropagateClose(false); + currentLogISR = new InputStreamReader(currentLogData); + currentLogType = logType; + } catch (EOFException e) { + } + + return currentLogType; + } + + public String getCurrentLogType() { + return currentLogType; + } + + public long getCurrentLogLength() { + return currentLogLength; + } + + public long skip(long n) throws IOException { + return currentLogData.skip(n); + } + + public int read(byte[] buf, int off, int len) throws IOException { + return currentLogData.read(buf, off, len); + } + + public int read(char[] buf, int off, int len) throws IOException { + return currentLogISR.read(buf, off, len); + } + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java index 766bf867b96..f43b5745a23 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java @@ -2,10 +2,10 @@ package org.apache.hadoop.yarn.webapp.log; import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER; import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE; import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING; import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME; -import java.io.DataInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Map; @@ -19,10 +19,11 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; +import 
org.apache.hadoop.yarn.webapp.hamlet.Hamlet.PRE; import org.apache.hadoop.yarn.webapp.view.HtmlBlock; import com.google.inject.Inject; @@ -41,8 +42,9 @@ public class AggregatedLogsBlock extends HtmlBlock { ContainerId containerId = verifyAndGetContainerId(html); NodeId nodeId = verifyAndGetNodeId(html); String appOwner = verifyAndGetAppOwner(html); + LogLimits logLimits = verifyAndGetLogLimits(html); if (containerId == null || nodeId == null || appOwner == null - || appOwner.isEmpty()) { + || appOwner.isEmpty() || logLimits == null) { return; } @@ -113,24 +115,29 @@ public class AggregatedLogsBlock extends HtmlBlock { return; } - DataInputStream valueStream; - LogKey key = new LogKey(); + String desiredLogType = $(CONTAINER_LOG_TYPE); try { - valueStream = reader.next(key); - while (valueStream != null - && !key.toString().equals(containerId.toString())) { - valueStream = reader.next(key); - } - if (valueStream == null) { + AggregatedLogFormat.ContainerLogsReader logReader = + reader.getContainerLogsReader(containerId); + if (logReader == null) { html.h1()._( "Logs not available for " + logEntity + ". Could be caused by the rentention policy")._(); return; } - writer().write("
");
-      AggregatedLogFormat.LogReader.readAcontainerLogs(valueStream, writer());
-      writer().write("
"); - return; + + boolean foundLog = readContainerLogs(html, logReader, logLimits, + desiredLogType); + + if (!foundLog) { + if (desiredLogType.isEmpty()) { + html.h1("No logs available for container " + containerId.toString()); + } else { + html.h1("Unable to locate '" + desiredLogType + + "' log for container " + containerId.toString()); + } + return; + } } catch (IOException e) { html.h1()._("Error getting logs for " + logEntity)._(); LOG.error("Error getting logs for " + logEntity, e); @@ -138,6 +145,76 @@ public class AggregatedLogsBlock extends HtmlBlock { } } + private boolean readContainerLogs(Block html, + AggregatedLogFormat.ContainerLogsReader logReader, LogLimits logLimits, + String desiredLogType) throws IOException { + int bufferSize = 65536; + char[] cbuf = new char[bufferSize]; + + boolean foundLog = false; + String logType = logReader.nextLog(); + while (logType != null) { + if (desiredLogType == null || desiredLogType.isEmpty() + || desiredLogType.equals(logType)) { + long logLength = logReader.getCurrentLogLength(); + + if (foundLog) { + html.pre()._("\n\n")._(); + } + + html.p()._("Log Type: " + logType)._(); + html.p()._("Log Length: " + Long.toString(logLength))._(); + + long start = logLimits.start < 0 + ? logLength + logLimits.start : logLimits.start; + start = start < 0 ? 0 : start; + start = start > logLength ? logLength : start; + long end = logLimits.end < 0 + ? logLength + logLimits.end : logLimits.end; + end = end < 0 ? 0 : end; + end = end > logLength ? logLength : end; + end = end < start ? start : end; + + long toRead = end - start; + if (toRead < logLength) { + html.p()._("Showing " + toRead + " bytes of " + logLength + + " total. Click ") + .a(url("logs", $(NM_NODENAME), $(CONTAINER_ID), + $(ENTITY_STRING), $(APP_OWNER), + logType, "?start=0"), "here"). + _(" for the full log.")._(); + } + + long totalSkipped = 0; + while (totalSkipped < start) { + long ret = logReader.skip(start - totalSkipped); + if (ret < 0) { + throw new IOException( "Premature EOF from container log"); + } + totalSkipped += ret; + } + + int len = 0; + int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead; + PRE pre = html.pre(); + + while (toRead > 0 + && (len = logReader.read(cbuf, 0, currentToRead)) > 0) { + pre._(new String(cbuf, 0, len)); + toRead = toRead - len; + currentToRead = toRead > bufferSize ? 
bufferSize : (int) toRead; + } + + pre._(); + foundLog = true; + } + + logType = logReader.nextLog(); + } + + return foundLog; + } + private ContainerId verifyAndGetContainerId(Block html) { String containerIdStr = $(CONTAINER_ID); if (containerIdStr == null || containerIdStr.isEmpty()) { @@ -180,4 +257,44 @@ public class AggregatedLogsBlock extends HtmlBlock { } return appOwner; } + + private static class LogLimits { + long start; + long end; + } + + private LogLimits verifyAndGetLogLimits(Block html) { + long start = -4096; + long end = Long.MAX_VALUE; + boolean isValid = true; + + String startStr = $("start"); + if (startStr != null && !startStr.isEmpty()) { + try { + start = Long.parseLong(startStr); + } catch (NumberFormatException e) { + isValid = false; + html.h1()._("Invalid log start value: " + startStr)._(); + } + } + + String endStr = $("end"); + if (endStr != null && !endStr.isEmpty()) { + try { + end = Long.parseLong(endStr); + } catch (NumberFormatException e) { + isValid = false; + html.h1()._("Invalid log end value: " + endStr)._(); + } + } + + if (!isValid) { + return null; + } + + LogLimits limits = new LogLimits(); + limits.start = start; + limits.end = end; + return limits; + } } \ No newline at end of file
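
For reference, a minimal sketch of how the ContainerLogsReader introduced by this patch might be driven outside the web UI, assuming an already-aggregated log file. The file path and container id below are hypothetical placeholders; only methods defined in this patch (getContainerLogsReader, nextLog, getCurrentLogType, getCurrentLogLength, read) plus the pre-existing AggregatedLogFormat.LogReader constructor are used.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.util.ConverterUtils;

/**
 * Sketch only: dump a single log type for one container from an
 * aggregated log file using the ContainerLogsReader added above.
 * The log file path and container id are hypothetical examples.
 */
public class TailAggregatedLog {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Hypothetical aggregated log file written by a NodeManager.
    Path logFile = new Path("/tmp/logs/owner/logs/application_10_0001/host_45454");
    ContainerId containerId =
        ConverterUtils.toContainerId("container_10_0001_01_000001");

    AggregatedLogFormat.LogReader reader =
        new AggregatedLogFormat.LogReader(conf, logFile);
    try {
      AggregatedLogFormat.ContainerLogsReader logReader =
          reader.getContainerLogsReader(containerId);
      if (logReader == null) {
        System.err.println("No logs found for " + containerId);
        return;
      }

      char[] cbuf = new char[65536];
      // nextLog() positions the reader at the next log type (stdout,
      // stderr, syslog, ...) and returns null at the end of the entry.
      for (String logType = logReader.nextLog(); logType != null;
           logType = logReader.nextLog()) {
        if (!"syslog".equals(logType)) {
          continue; // skip other log types; nextLog() skips their data
        }
        System.out.println("Log Type: " + logType);
        System.out.println("Log Length: " + logReader.getCurrentLogLength());
        int len;
        while ((len = logReader.read(cbuf, 0, cbuf.length)) > 0) {
          System.out.print(new String(cbuf, 0, len));
        }
      }
    } finally {
      reader.close();
    }
  }
}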
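
The start/end handling in readContainerLogs() treats negative offsets as distances from the end of the log and clamps the result so that 0 <= start <= end <= logLength; with the defaults from verifyAndGetLogLimits() (start = -4096, end = Long.MAX_VALUE) this produces the "display tail of aggregated logs by default" behaviour named in the commit. A small standalone sketch of that arithmetic, with illustrative class and method names:

/**
 * Sketch of the byte-range arithmetic used by
 * AggregatedLogsBlock.readContainerLogs(): negative offsets count back
 * from the end of the log, and the range is clamped to [0, logLength]
 * with end >= start. Names here are illustrative only.
 */
public class LogRangeMath {
  static long[] clamp(long logLength, long requestedStart, long requestedEnd) {
    long start = requestedStart < 0 ? logLength + requestedStart : requestedStart;
    start = Math.max(0, Math.min(start, logLength));
    long end = requestedEnd < 0 ? logLength + requestedEnd : requestedEnd;
    end = Math.max(0, Math.min(end, logLength));
    end = Math.max(start, end);
    return new long[] { start, end };
  }

  public static void main(String[] args) {
    // Defaults from verifyAndGetLogLimits(): start=-4096, end=Long.MAX_VALUE,
    // i.e. show at most the last 4096 bytes of each log by default.
    long[] range = clamp(100000L, -4096L, Long.MAX_VALUE);
    System.out.println("start=" + range[0] + " end=" + range[1]); // start=95904 end=100000
  }
}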