MAPREDUCE-4283. Display tail of aggregated logs by default (Jason Lowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1362608 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-07-17 19:01:42 +00:00
parent 177f6feaa1
commit 8e576570a9
5 changed files with 310 additions and 21 deletions

View File

@ -718,6 +718,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4449. Incorrect MR_HISTORY_STORAGE property name in JHAdminConfig
(Ahmed Radwan via bobby)
MAPREDUCE-4283. Display tail of aggregated logs by default (Jason Lowe via
bobby)
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -21,6 +21,7 @@
import static org.apache.hadoop.yarn.util.StringHelper.pajoin;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
@ -60,10 +61,10 @@ public void setup() {
route(pajoin("/singletaskcounter",TASK_ID, COUNTER_GROUP, COUNTER_NAME),
HsController.class, "singleTaskCounter");
route("/about", HsController.class, "about");
route(pajoin("/logs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER),
HsController.class, "logs");
route(pajoin("/nmlogs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER),
HsController.class, "nmlogs");
route(pajoin("/logs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER,
CONTAINER_LOG_TYPE), HsController.class, "logs");
route(pajoin("/nmlogs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER,
CONTAINER_LOG_TYPE), HsController.class, "nmlogs");
}
}

View File

@ -24,9 +24,11 @@
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_TYPE;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.io.PrintWriter;
@ -35,6 +37,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MockJobs;
@ -44,13 +47,14 @@
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.webapp.log.AggregatedLogsPage;
import org.apache.hadoop.yarn.webapp.test.WebAppTests;
import org.junit.Test;
import static org.mockito.Mockito.verify;
import com.google.inject.AbstractModule;
import com.google.inject.Injector;
public class TestHSWebApp {
@ -251,6 +255,64 @@ public void testLogsView2() throws IOException {
"Aggregation is not enabled. Try the nodemanager at "
+ MockJobs.NM_HOST + ":" + MockJobs.NM_PORT);
}
@Test
public void testLogsViewSingle() throws IOException {
LOG.info("HsLogsPage with params for single log and data limits");
TestAppContext ctx = new TestAppContext();
Map<String, String> params = new HashMap<String, String>();
final Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
params.put("start", "-2048");
params.put("end", "-1024");
params.put(CONTAINER_LOG_TYPE, "syslog");
params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1)
.toString());
params.put(NM_NODENAME,
BuilderUtils.newNodeId(MockJobs.NM_HOST, MockJobs.NM_PORT).toString());
params.put(ENTITY_STRING, "container_10_0001_01_000001");
params.put(APP_OWNER, "owner");
Injector injector =
WebAppTests.testPage(AggregatedLogsPage.class, AppContext.class, ctx,
params, new AbstractModule() {
@Override
protected void configure() {
bind(Configuration.class).toInstance(conf);
}
});
PrintWriter spyPw = WebAppTests.getPrintWriter(injector);
verify(spyPw).write(
"Logs not available for container_10_0001_01_000001."
+ " Aggregation may not be complete, "
+ "Check back later or try the nodemanager at "
+ MockJobs.NM_HOST + ":" + MockJobs.NM_PORT);
}
@Test
public void testLogsViewBadStartEnd() throws IOException {
LOG.info("HsLogsPage with bad start/end params");
TestAppContext ctx = new TestAppContext();
Map<String, String> params = new HashMap<String, String>();
params.put("start", "foo");
params.put("end", "bar");
params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1)
.toString());
params.put(NM_NODENAME,
BuilderUtils.newNodeId(MockJobs.NM_HOST, MockJobs.NM_PORT).toString());
params.put(ENTITY_STRING, "container_10_0001_01_000001");
params.put(APP_OWNER, "owner");
Injector injector =
WebAppTests.testPage(AggregatedLogsPage.class, AppContext.class, ctx,
params);
PrintWriter spyPw = WebAppTests.getPrintWriter(injector);
verify(spyPw).write("Invalid log start value: foo");
verify(spyPw).write("Invalid log end value: bar");
}
}

View File

@ -91,6 +91,23 @@ public LogKey(String keyString) {
this.keyString = keyString;
}
@Override
public int hashCode() {
return keyString == null ? 0 : keyString.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof LogKey) {
LogKey other = (LogKey) obj;
if (this.keyString == null) {
return other.keyString == null;
}
return this.keyString.equals(other.keyString);
}
return false;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.keyString);
@ -360,7 +377,33 @@ public DataInputStream next(LogKey key) throws IOException {
return valueStream;
}
/**
* Get a ContainerLogsReader to read the logs for
* the specified container.
*
* @param containerId
* @return object to read the container's logs or null if the
* logs could not be found
* @throws IOException
*/
public ContainerLogsReader getContainerLogsReader(
ContainerId containerId) throws IOException {
ContainerLogsReader logReader = null;
final LogKey containerKey = new LogKey(containerId);
LogKey key = new LogKey();
DataInputStream valueStream = next(key);
while (valueStream != null && !key.equals(containerKey)) {
valueStream = next(key);
}
if (valueStream != null) {
logReader = new ContainerLogsReader(valueStream);
}
return logReader;
}
//TODO Change Log format and interfaces to be containerId specific.
// Avoid returning completeValueStreams.
// public List<String> getTypesForContainer(DataInputStream valueStream){}
@ -489,4 +532,67 @@ public void close() throws IOException {
this.fsDataIStream.close();
}
}
public static class ContainerLogsReader {
private DataInputStream valueStream;
private String currentLogType = null;
private long currentLogLength = 0;
private BoundedInputStream currentLogData = null;
private InputStreamReader currentLogISR;
public ContainerLogsReader(DataInputStream stream) {
valueStream = stream;
}
public String nextLog() throws IOException {
if (currentLogData != null && currentLogLength > 0) {
// seek to the end of the current log, relying on BoundedInputStream
// to prevent seeking past the end of the current log
do {
if (currentLogData.skip(currentLogLength) < 0) {
break;
}
} while (currentLogData.read() != -1);
}
currentLogType = null;
currentLogLength = 0;
currentLogData = null;
currentLogISR = null;
try {
String logType = valueStream.readUTF();
String logLengthStr = valueStream.readUTF();
currentLogLength = Long.parseLong(logLengthStr);
currentLogData =
new BoundedInputStream(valueStream, currentLogLength);
currentLogData.setPropagateClose(false);
currentLogISR = new InputStreamReader(currentLogData);
currentLogType = logType;
} catch (EOFException e) {
}
return currentLogType;
}
public String getCurrentLogType() {
return currentLogType;
}
public long getCurrentLogLength() {
return currentLogLength;
}
public long skip(long n) throws IOException {
return currentLogData.skip(n);
}
public int read(byte[] buf, int off, int len) throws IOException {
return currentLogData.read(buf, off, len);
}
public int read(char[] buf, int off, int len) throws IOException {
return currentLogISR.read(buf, off, len);
}
}
}

View File

@ -2,10 +2,10 @@
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
import java.io.DataInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;
@ -19,10 +19,11 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.PRE;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import com.google.inject.Inject;
@ -41,8 +42,9 @@ protected void render(Block html) {
ContainerId containerId = verifyAndGetContainerId(html);
NodeId nodeId = verifyAndGetNodeId(html);
String appOwner = verifyAndGetAppOwner(html);
LogLimits logLimits = verifyAndGetLogLimits(html);
if (containerId == null || nodeId == null || appOwner == null
|| appOwner.isEmpty()) {
|| appOwner.isEmpty() || logLimits == null) {
return;
}
@ -113,24 +115,29 @@ protected void render(Block html) {
return;
}
DataInputStream valueStream;
LogKey key = new LogKey();
String desiredLogType = $(CONTAINER_LOG_TYPE);
try {
valueStream = reader.next(key);
while (valueStream != null
&& !key.toString().equals(containerId.toString())) {
valueStream = reader.next(key);
}
if (valueStream == null) {
AggregatedLogFormat.ContainerLogsReader logReader =
reader.getContainerLogsReader(containerId);
if (logReader == null) {
html.h1()._(
"Logs not available for " + logEntity
+ ". Could be caused by the rentention policy")._();
return;
}
writer().write("<pre>");
AggregatedLogFormat.LogReader.readAcontainerLogs(valueStream, writer());
writer().write("</pre>");
return;
boolean foundLog = readContainerLogs(html, logReader, logLimits,
desiredLogType);
if (!foundLog) {
if (desiredLogType.isEmpty()) {
html.h1("No logs available for container " + containerId.toString());
} else {
html.h1("Unable to locate '" + desiredLogType
+ "' log for container " + containerId.toString());
}
return;
}
} catch (IOException e) {
html.h1()._("Error getting logs for " + logEntity)._();
LOG.error("Error getting logs for " + logEntity, e);
@ -138,6 +145,76 @@ protected void render(Block html) {
}
}
private boolean readContainerLogs(Block html,
AggregatedLogFormat.ContainerLogsReader logReader, LogLimits logLimits,
String desiredLogType) throws IOException {
int bufferSize = 65536;
char[] cbuf = new char[bufferSize];
boolean foundLog = false;
String logType = logReader.nextLog();
while (logType != null) {
if (desiredLogType == null || desiredLogType.isEmpty()
|| desiredLogType.equals(logType)) {
long logLength = logReader.getCurrentLogLength();
if (foundLog) {
html.pre()._("\n\n")._();
}
html.p()._("Log Type: " + logType)._();
html.p()._("Log Length: " + Long.toString(logLength))._();
long start = logLimits.start < 0
? logLength + logLimits.start : logLimits.start;
start = start < 0 ? 0 : start;
start = start > logLength ? logLength : start;
long end = logLimits.end < 0
? logLength + logLimits.end : logLimits.end;
end = end < 0 ? 0 : end;
end = end > logLength ? logLength : end;
end = end < start ? start : end;
long toRead = end - start;
if (toRead < logLength) {
html.p()._("Showing " + toRead + " bytes of " + logLength
+ " total. Click ")
.a(url("logs", $(NM_NODENAME), $(CONTAINER_ID),
$(ENTITY_STRING), $(APP_OWNER),
logType, "?start=0"), "here").
_(" for the full log.")._();
}
long totalSkipped = 0;
while (totalSkipped < start) {
long ret = logReader.skip(start - totalSkipped);
if (ret < 0) {
throw new IOException( "Premature EOF from container log");
}
totalSkipped += ret;
}
int len = 0;
int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
PRE<Hamlet> pre = html.pre();
while (toRead > 0
&& (len = logReader.read(cbuf, 0, currentToRead)) > 0) {
pre._(new String(cbuf, 0, len));
toRead = toRead - len;
currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
}
pre._();
foundLog = true;
}
logType = logReader.nextLog();
}
return foundLog;
}
private ContainerId verifyAndGetContainerId(Block html) {
String containerIdStr = $(CONTAINER_ID);
if (containerIdStr == null || containerIdStr.isEmpty()) {
@ -180,4 +257,44 @@ private String verifyAndGetAppOwner(Block html) {
}
return appOwner;
}
private static class LogLimits {
long start;
long end;
}
private LogLimits verifyAndGetLogLimits(Block html) {
long start = -4096;
long end = Long.MAX_VALUE;
boolean isValid = true;
String startStr = $("start");
if (startStr != null && !startStr.isEmpty()) {
try {
start = Long.parseLong(startStr);
} catch (NumberFormatException e) {
isValid = false;
html.h1()._("Invalid log start value: " + startStr)._();
}
}
String endStr = $("end");
if (endStr != null && !endStr.isEmpty()) {
try {
end = Long.parseLong(endStr);
} catch (NumberFormatException e) {
isValid = false;
html.h1()._("Invalid log end value: " + endStr)._();
}
}
if (!isValid) {
return null;
}
LogLimits limits = new LogLimits();
limits.start = start;
limits.end = end;
return limits;
}
}