svn merge -c 1362608 FIXES: MAPREDUCE-4283. Display tail of aggregated logs by default (Jason Lowe via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1362609 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
796272a502
commit
71b35e827c
|
@ -584,6 +584,9 @@ Release 0.23.3 - UNRELEASED
|
|||
MAPREDUCE-4449. Incorrect MR_HISTORY_STORAGE property name in JHAdminConfig
|
||||
(Ahmed Radwan via bobby)
|
||||
|
||||
MAPREDUCE-4283. Display tail of aggregated logs by default (Jason Lowe via
|
||||
bobby)
|
||||
|
||||
Release 0.23.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.hs.webapp;
|
|||
import static org.apache.hadoop.yarn.util.StringHelper.pajoin;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
|
||||
|
||||
|
@ -60,10 +61,10 @@ public class HsWebApp extends WebApp implements AMParams {
|
|||
route(pajoin("/singletaskcounter",TASK_ID, COUNTER_GROUP, COUNTER_NAME),
|
||||
HsController.class, "singleTaskCounter");
|
||||
route("/about", HsController.class, "about");
|
||||
route(pajoin("/logs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER),
|
||||
HsController.class, "logs");
|
||||
route(pajoin("/nmlogs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER),
|
||||
HsController.class, "nmlogs");
|
||||
route(pajoin("/logs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER,
|
||||
CONTAINER_LOG_TYPE), HsController.class, "logs");
|
||||
route(pajoin("/nmlogs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER,
|
||||
CONTAINER_LOG_TYPE), HsController.class, "nmlogs");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -24,9 +24,11 @@ import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.JOB_ID;
|
|||
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_TYPE;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
|
@ -35,6 +37,7 @@ import java.util.Map;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||
import org.apache.hadoop.mapreduce.v2.app.MockJobs;
|
||||
|
@ -44,13 +47,14 @@ import org.apache.hadoop.yarn.Clock;
|
|||
import org.apache.hadoop.yarn.ClusterInfo;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.webapp.log.AggregatedLogsPage;
|
||||
import org.apache.hadoop.yarn.webapp.test.WebAppTests;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.mockito.Mockito.verify;
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Injector;
|
||||
|
||||
public class TestHSWebApp {
|
||||
|
@ -251,6 +255,64 @@ public class TestHSWebApp {
|
|||
"Aggregation is not enabled. Try the nodemanager at "
|
||||
+ MockJobs.NM_HOST + ":" + MockJobs.NM_PORT);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLogsViewSingle() throws IOException {
|
||||
LOG.info("HsLogsPage with params for single log and data limits");
|
||||
TestAppContext ctx = new TestAppContext();
|
||||
Map<String, String> params = new HashMap<String, String>();
|
||||
|
||||
final Configuration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
|
||||
|
||||
params.put("start", "-2048");
|
||||
params.put("end", "-1024");
|
||||
params.put(CONTAINER_LOG_TYPE, "syslog");
|
||||
params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1)
|
||||
.toString());
|
||||
params.put(NM_NODENAME,
|
||||
BuilderUtils.newNodeId(MockJobs.NM_HOST, MockJobs.NM_PORT).toString());
|
||||
params.put(ENTITY_STRING, "container_10_0001_01_000001");
|
||||
params.put(APP_OWNER, "owner");
|
||||
|
||||
Injector injector =
|
||||
WebAppTests.testPage(AggregatedLogsPage.class, AppContext.class, ctx,
|
||||
params, new AbstractModule() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
bind(Configuration.class).toInstance(conf);
|
||||
}
|
||||
});
|
||||
PrintWriter spyPw = WebAppTests.getPrintWriter(injector);
|
||||
verify(spyPw).write(
|
||||
"Logs not available for container_10_0001_01_000001."
|
||||
+ " Aggregation may not be complete, "
|
||||
+ "Check back later or try the nodemanager at "
|
||||
+ MockJobs.NM_HOST + ":" + MockJobs.NM_PORT);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLogsViewBadStartEnd() throws IOException {
|
||||
LOG.info("HsLogsPage with bad start/end params");
|
||||
TestAppContext ctx = new TestAppContext();
|
||||
Map<String, String> params = new HashMap<String, String>();
|
||||
|
||||
params.put("start", "foo");
|
||||
params.put("end", "bar");
|
||||
params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1)
|
||||
.toString());
|
||||
params.put(NM_NODENAME,
|
||||
BuilderUtils.newNodeId(MockJobs.NM_HOST, MockJobs.NM_PORT).toString());
|
||||
params.put(ENTITY_STRING, "container_10_0001_01_000001");
|
||||
params.put(APP_OWNER, "owner");
|
||||
|
||||
Injector injector =
|
||||
WebAppTests.testPage(AggregatedLogsPage.class, AppContext.class, ctx,
|
||||
params);
|
||||
PrintWriter spyPw = WebAppTests.getPrintWriter(injector);
|
||||
verify(spyPw).write("Invalid log start value: foo");
|
||||
verify(spyPw).write("Invalid log end value: bar");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -91,6 +91,23 @@ public class AggregatedLogFormat {
|
|||
this.keyString = keyString;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return keyString == null ? 0 : keyString.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj instanceof LogKey) {
|
||||
LogKey other = (LogKey) obj;
|
||||
if (this.keyString == null) {
|
||||
return other.keyString == null;
|
||||
}
|
||||
return this.keyString.equals(other.keyString);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
out.writeUTF(this.keyString);
|
||||
|
@ -360,7 +377,33 @@ public class AggregatedLogFormat {
|
|||
return valueStream;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get a ContainerLogsReader to read the logs for
|
||||
* the specified container.
|
||||
*
|
||||
* @param containerId
|
||||
* @return object to read the container's logs or null if the
|
||||
* logs could not be found
|
||||
* @throws IOException
|
||||
*/
|
||||
public ContainerLogsReader getContainerLogsReader(
|
||||
ContainerId containerId) throws IOException {
|
||||
ContainerLogsReader logReader = null;
|
||||
|
||||
final LogKey containerKey = new LogKey(containerId);
|
||||
LogKey key = new LogKey();
|
||||
DataInputStream valueStream = next(key);
|
||||
while (valueStream != null && !key.equals(containerKey)) {
|
||||
valueStream = next(key);
|
||||
}
|
||||
|
||||
if (valueStream != null) {
|
||||
logReader = new ContainerLogsReader(valueStream);
|
||||
}
|
||||
|
||||
return logReader;
|
||||
}
|
||||
|
||||
//TODO Change Log format and interfaces to be containerId specific.
|
||||
// Avoid returning completeValueStreams.
|
||||
// public List<String> getTypesForContainer(DataInputStream valueStream){}
|
||||
|
@ -489,4 +532,67 @@ public class AggregatedLogFormat {
|
|||
this.fsDataIStream.close();
|
||||
}
|
||||
}
|
||||
|
||||
public static class ContainerLogsReader {
|
||||
private DataInputStream valueStream;
|
||||
private String currentLogType = null;
|
||||
private long currentLogLength = 0;
|
||||
private BoundedInputStream currentLogData = null;
|
||||
private InputStreamReader currentLogISR;
|
||||
|
||||
public ContainerLogsReader(DataInputStream stream) {
|
||||
valueStream = stream;
|
||||
}
|
||||
|
||||
public String nextLog() throws IOException {
|
||||
if (currentLogData != null && currentLogLength > 0) {
|
||||
// seek to the end of the current log, relying on BoundedInputStream
|
||||
// to prevent seeking past the end of the current log
|
||||
do {
|
||||
if (currentLogData.skip(currentLogLength) < 0) {
|
||||
break;
|
||||
}
|
||||
} while (currentLogData.read() != -1);
|
||||
}
|
||||
|
||||
currentLogType = null;
|
||||
currentLogLength = 0;
|
||||
currentLogData = null;
|
||||
currentLogISR = null;
|
||||
|
||||
try {
|
||||
String logType = valueStream.readUTF();
|
||||
String logLengthStr = valueStream.readUTF();
|
||||
currentLogLength = Long.parseLong(logLengthStr);
|
||||
currentLogData =
|
||||
new BoundedInputStream(valueStream, currentLogLength);
|
||||
currentLogData.setPropagateClose(false);
|
||||
currentLogISR = new InputStreamReader(currentLogData);
|
||||
currentLogType = logType;
|
||||
} catch (EOFException e) {
|
||||
}
|
||||
|
||||
return currentLogType;
|
||||
}
|
||||
|
||||
public String getCurrentLogType() {
|
||||
return currentLogType;
|
||||
}
|
||||
|
||||
public long getCurrentLogLength() {
|
||||
return currentLogLength;
|
||||
}
|
||||
|
||||
public long skip(long n) throws IOException {
|
||||
return currentLogData.skip(n);
|
||||
}
|
||||
|
||||
public int read(byte[] buf, int off, int len) throws IOException {
|
||||
return currentLogData.read(buf, off, len);
|
||||
}
|
||||
|
||||
public int read(char[] buf, int off, int len) throws IOException {
|
||||
return currentLogISR.read(buf, off, len);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,10 +2,10 @@ package org.apache.hadoop.yarn.webapp.log;
|
|||
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
|
||||
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
@ -19,10 +19,11 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.PRE;
|
||||
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
@ -41,8 +42,9 @@ public class AggregatedLogsBlock extends HtmlBlock {
|
|||
ContainerId containerId = verifyAndGetContainerId(html);
|
||||
NodeId nodeId = verifyAndGetNodeId(html);
|
||||
String appOwner = verifyAndGetAppOwner(html);
|
||||
LogLimits logLimits = verifyAndGetLogLimits(html);
|
||||
if (containerId == null || nodeId == null || appOwner == null
|
||||
|| appOwner.isEmpty()) {
|
||||
|| appOwner.isEmpty() || logLimits == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -113,24 +115,29 @@ public class AggregatedLogsBlock extends HtmlBlock {
|
|||
return;
|
||||
}
|
||||
|
||||
DataInputStream valueStream;
|
||||
LogKey key = new LogKey();
|
||||
String desiredLogType = $(CONTAINER_LOG_TYPE);
|
||||
try {
|
||||
valueStream = reader.next(key);
|
||||
while (valueStream != null
|
||||
&& !key.toString().equals(containerId.toString())) {
|
||||
valueStream = reader.next(key);
|
||||
}
|
||||
if (valueStream == null) {
|
||||
AggregatedLogFormat.ContainerLogsReader logReader =
|
||||
reader.getContainerLogsReader(containerId);
|
||||
if (logReader == null) {
|
||||
html.h1()._(
|
||||
"Logs not available for " + logEntity
|
||||
+ ". Could be caused by the rentention policy")._();
|
||||
return;
|
||||
}
|
||||
writer().write("<pre>");
|
||||
AggregatedLogFormat.LogReader.readAcontainerLogs(valueStream, writer());
|
||||
writer().write("</pre>");
|
||||
return;
|
||||
|
||||
boolean foundLog = readContainerLogs(html, logReader, logLimits,
|
||||
desiredLogType);
|
||||
|
||||
if (!foundLog) {
|
||||
if (desiredLogType.isEmpty()) {
|
||||
html.h1("No logs available for container " + containerId.toString());
|
||||
} else {
|
||||
html.h1("Unable to locate '" + desiredLogType
|
||||
+ "' log for container " + containerId.toString());
|
||||
}
|
||||
return;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
html.h1()._("Error getting logs for " + logEntity)._();
|
||||
LOG.error("Error getting logs for " + logEntity, e);
|
||||
|
@ -138,6 +145,76 @@ public class AggregatedLogsBlock extends HtmlBlock {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean readContainerLogs(Block html,
|
||||
AggregatedLogFormat.ContainerLogsReader logReader, LogLimits logLimits,
|
||||
String desiredLogType) throws IOException {
|
||||
int bufferSize = 65536;
|
||||
char[] cbuf = new char[bufferSize];
|
||||
|
||||
boolean foundLog = false;
|
||||
String logType = logReader.nextLog();
|
||||
while (logType != null) {
|
||||
if (desiredLogType == null || desiredLogType.isEmpty()
|
||||
|| desiredLogType.equals(logType)) {
|
||||
long logLength = logReader.getCurrentLogLength();
|
||||
|
||||
if (foundLog) {
|
||||
html.pre()._("\n\n")._();
|
||||
}
|
||||
|
||||
html.p()._("Log Type: " + logType)._();
|
||||
html.p()._("Log Length: " + Long.toString(logLength))._();
|
||||
|
||||
long start = logLimits.start < 0
|
||||
? logLength + logLimits.start : logLimits.start;
|
||||
start = start < 0 ? 0 : start;
|
||||
start = start > logLength ? logLength : start;
|
||||
long end = logLimits.end < 0
|
||||
? logLength + logLimits.end : logLimits.end;
|
||||
end = end < 0 ? 0 : end;
|
||||
end = end > logLength ? logLength : end;
|
||||
end = end < start ? start : end;
|
||||
|
||||
long toRead = end - start;
|
||||
if (toRead < logLength) {
|
||||
html.p()._("Showing " + toRead + " bytes of " + logLength
|
||||
+ " total. Click ")
|
||||
.a(url("logs", $(NM_NODENAME), $(CONTAINER_ID),
|
||||
$(ENTITY_STRING), $(APP_OWNER),
|
||||
logType, "?start=0"), "here").
|
||||
_(" for the full log.")._();
|
||||
}
|
||||
|
||||
long totalSkipped = 0;
|
||||
while (totalSkipped < start) {
|
||||
long ret = logReader.skip(start - totalSkipped);
|
||||
if (ret < 0) {
|
||||
throw new IOException( "Premature EOF from container log");
|
||||
}
|
||||
totalSkipped += ret;
|
||||
}
|
||||
|
||||
int len = 0;
|
||||
int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
|
||||
PRE<Hamlet> pre = html.pre();
|
||||
|
||||
while (toRead > 0
|
||||
&& (len = logReader.read(cbuf, 0, currentToRead)) > 0) {
|
||||
pre._(new String(cbuf, 0, len));
|
||||
toRead = toRead - len;
|
||||
currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
|
||||
}
|
||||
|
||||
pre._();
|
||||
foundLog = true;
|
||||
}
|
||||
|
||||
logType = logReader.nextLog();
|
||||
}
|
||||
|
||||
return foundLog;
|
||||
}
|
||||
|
||||
private ContainerId verifyAndGetContainerId(Block html) {
|
||||
String containerIdStr = $(CONTAINER_ID);
|
||||
if (containerIdStr == null || containerIdStr.isEmpty()) {
|
||||
|
@ -180,4 +257,44 @@ public class AggregatedLogsBlock extends HtmlBlock {
|
|||
}
|
||||
return appOwner;
|
||||
}
|
||||
|
||||
private static class LogLimits {
|
||||
long start;
|
||||
long end;
|
||||
}
|
||||
|
||||
private LogLimits verifyAndGetLogLimits(Block html) {
|
||||
long start = -4096;
|
||||
long end = Long.MAX_VALUE;
|
||||
boolean isValid = true;
|
||||
|
||||
String startStr = $("start");
|
||||
if (startStr != null && !startStr.isEmpty()) {
|
||||
try {
|
||||
start = Long.parseLong(startStr);
|
||||
} catch (NumberFormatException e) {
|
||||
isValid = false;
|
||||
html.h1()._("Invalid log start value: " + startStr)._();
|
||||
}
|
||||
}
|
||||
|
||||
String endStr = $("end");
|
||||
if (endStr != null && !endStr.isEmpty()) {
|
||||
try {
|
||||
end = Long.parseLong(endStr);
|
||||
} catch (NumberFormatException e) {
|
||||
isValid = false;
|
||||
html.h1()._("Invalid log end value: " + endStr)._();
|
||||
}
|
||||
}
|
||||
|
||||
if (!isValid) {
|
||||
return null;
|
||||
}
|
||||
|
||||
LogLimits limits = new LogLimits();
|
||||
limits.start = start;
|
||||
limits.end = end;
|
||||
return limits;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue