YARN-2703. Added logUploadedTime into LogValue for better display. Contributed by Xuan Gong.

This commit is contained in:
Zhijie Shen 2014-10-24 14:10:46 -07:00
parent a52eb4bc5f
commit f81dc3f995
6 changed files with 38 additions and 8 deletions

View File

@ -391,6 +391,9 @@ Release 2.6.0 - UNRELEASED
YARN-2209. Replaced AM resync/shutdown command with corresponding exceptions and
made related MR changes. (Jian He via zjshen)
YARN-2703. Added logUploadedTime into LogValue for better display. (Xuan Gong
via zjshen)
OPTIMIZATIONS
BUG FIXES

View File

@ -227,6 +227,7 @@ public class TestLogsCLI {
assertTrue(exitCode == 0);
assertTrue(sysOutStream.toString().contains(
"Hello container_0_0001_01_000001!"));
assertTrue(sysOutStream.toString().contains("LogUploadTime"));
fs.delete(new Path(remoteLogRootDir), true);
fs.delete(new Path(rootLogDir), true);

View File

@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
@ -226,6 +227,9 @@ public class AggregatedLogFormat {
// Write the logFile Type
out.writeUTF(logFile.getName());
// Write the uploaded TimeStamp
out.writeLong(System.currentTimeMillis());
// Write the log length as UTF so that it is printable
out.writeUTF(String.valueOf(fileLength));
@ -636,6 +640,7 @@ public class AggregatedLogFormat {
int bufferSize = 65536;
char[] cbuf = new char[bufferSize];
String fileType;
long uploadTime;
String fileLengthStr;
long fileLength;
@ -646,10 +651,13 @@ public class AggregatedLogFormat {
// EndOfFile
return;
}
uploadTime = valueStream.readLong();
fileLengthStr = valueStream.readUTF();
fileLength = Long.parseLong(fileLengthStr);
writer.write("\n\nLogType:");
writer.write(fileType);
writer.write("\nLogUploadTime:");
writer.write(String.valueOf(uploadTime));
writer.write("\nLogLength:");
writer.write(fileLengthStr);
writer.write("\nLog Contents:\n");
@ -681,10 +689,13 @@ public class AggregatedLogFormat {
byte[] buf = new byte[65535];
String fileType = valueStream.readUTF();
long uploadTime = valueStream.readLong();
String fileLengthStr = valueStream.readUTF();
long fileLength = Long.parseLong(fileLengthStr);
out.print("LogType: ");
out.println(fileType);
out.print("LogUploadTime: ");
out.println(Times.format(uploadTime));
out.print("LogLength: ");
out.println(fileLengthStr);
out.println("Log Contents:");
@ -715,6 +726,7 @@ public class AggregatedLogFormat {
public static class ContainerLogsReader {
private DataInputStream valueStream;
private String currentLogType = null;
private long currentLogUpLoadTime = 0;
private long currentLogLength = 0;
private BoundedInputStream currentLogData = null;
private InputStreamReader currentLogISR;
@ -735,12 +747,14 @@ public class AggregatedLogFormat {
}
currentLogType = null;
currentLogUpLoadTime = 0;
currentLogLength = 0;
currentLogData = null;
currentLogISR = null;
try {
String logType = valueStream.readUTF();
long logUpLoadTime = valueStream.readLong();
String logLengthStr = valueStream.readUTF();
currentLogLength = Long.parseLong(logLengthStr);
currentLogData =
@ -748,6 +762,7 @@ public class AggregatedLogFormat {
currentLogData.setPropagateClose(false);
currentLogISR = new InputStreamReader(currentLogData);
currentLogType = logType;
currentLogUpLoadTime = logUpLoadTime;
} catch (EOFException e) {
}
@ -758,6 +773,10 @@ public class AggregatedLogFormat {
return currentLogType;
}
public long getCurrentLogUpLoadTime() {
return currentLogUpLoadTime;
}
public long getCurrentLogLength() {
return currentLogLength;
}

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.PRE;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
@ -198,12 +199,13 @@ public class AggregatedLogsBlock extends HtmlBlock {
if (desiredLogType == null || desiredLogType.isEmpty()
|| desiredLogType.equals(logType)) {
long logLength = logReader.getCurrentLogLength();
long logUpLoadTime = logReader.getCurrentLogUpLoadTime();
if (foundLog) {
html.pre()._("\n\n")._();
}
html.p()._("Log Type: " + logType)._();
html.p()._("Log UpLoadTime: " + Times.format(logUpLoadTime))._();
html.p()._("Log Length: " + Long.toString(logLength))._();
long start = logLimits.start < 0

View File

@ -232,7 +232,9 @@ public class TestAggregatedLogFormat {
// aggregated.
String s = writer.toString();
int expectedLength =
"\n\nLogType:stdout".length() + ("\nLogLength:" + numChars).length()
"\n\nLogType:stdout".length()
+ ("\nLogUploadTime:" + System.currentTimeMillis()).length()
+ ("\nLogLength:" + numChars).length()
+ "\nLog Contents:\n".length() + numChars;
Assert.assertTrue("LogType not matched", s.contains("LogType:stdout"));
Assert.assertTrue("log file:stderr should not be aggregated.", !s.contains("LogType:stderr"));

View File

@ -61,7 +61,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -768,19 +767,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
Assert.assertEquals("LogType:", writtenLines[0].substring(0, 8));
String fileType = writtenLines[0].substring(9);
Assert.assertEquals("LogLength:", writtenLines[1].substring(0, 10));
String fileLengthStr = writtenLines[1].substring(11);
Assert.assertEquals("LogUploadTime:", writtenLines[1].substring(0, 14));
String fileUploadedTimeStr = writtenLines[1].substring(15);
Assert.assertEquals("LogLength:", writtenLines[2].substring(0, 10));
String fileLengthStr = writtenLines[2].substring(11);
long fileLength = Long.parseLong(fileLengthStr);
Assert.assertEquals("Log Contents:",
writtenLines[2].substring(0, 13));
writtenLines[3].substring(0, 13));
String logContents = StringUtils.join(
Arrays.copyOfRange(writtenLines, 3, writtenLines.length), "\n");
Arrays.copyOfRange(writtenLines, 4, writtenLines.length), "\n");
perContainerMap.put(fileType, logContents);
LOG.info("LogType:" + fileType);
LOG.info("LogType:" + fileLength);
LOG.info("LogUploadTime:" + fileUploadedTimeStr);
LOG.info("LogLength:" + fileLength);
LOG.info("Log Contents:\n" + perContainerMap.get(fileType));
} catch (EOFException eof) {
break;