YARN-2703. Added logUploadedTime into LogValue for better display. Contributed by Xuan Gong.
(cherry picked from commit f81dc3f995
)
This commit is contained in:
parent
2bcda17ee7
commit
1b81105143
|
@ -361,6 +361,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2209. Replaced AM resync/shutdown command with corresponding exceptions and
|
YARN-2209. Replaced AM resync/shutdown command with corresponding exceptions and
|
||||||
made related MR changes. (Jian He via zjshen)
|
made related MR changes. (Jian He via zjshen)
|
||||||
|
|
||||||
|
YARN-2703. Added logUploadedTime into LogValue for better display. (Xuan Gong
|
||||||
|
via zjshen)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -227,6 +227,7 @@ public class TestLogsCLI {
|
||||||
assertTrue(exitCode == 0);
|
assertTrue(exitCode == 0);
|
||||||
assertTrue(sysOutStream.toString().contains(
|
assertTrue(sysOutStream.toString().contains(
|
||||||
"Hello container_0_0001_01_000001!"));
|
"Hello container_0_0001_01_000001!"));
|
||||||
|
assertTrue(sysOutStream.toString().contains("LogUploadTime"));
|
||||||
|
|
||||||
fs.delete(new Path(remoteLogRootDir), true);
|
fs.delete(new Path(remoteLogRootDir), true);
|
||||||
fs.delete(new Path(rootLogDir), true);
|
fs.delete(new Path(rootLogDir), true);
|
||||||
|
|
|
@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.Times;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Predicate;
|
import com.google.common.base.Predicate;
|
||||||
|
@ -226,6 +227,9 @@ public class AggregatedLogFormat {
|
||||||
// Write the logFile Type
|
// Write the logFile Type
|
||||||
out.writeUTF(logFile.getName());
|
out.writeUTF(logFile.getName());
|
||||||
|
|
||||||
|
// Write the uploaded TimeStamp
|
||||||
|
out.writeLong(System.currentTimeMillis());
|
||||||
|
|
||||||
// Write the log length as UTF so that it is printable
|
// Write the log length as UTF so that it is printable
|
||||||
out.writeUTF(String.valueOf(fileLength));
|
out.writeUTF(String.valueOf(fileLength));
|
||||||
|
|
||||||
|
@ -636,6 +640,7 @@ public class AggregatedLogFormat {
|
||||||
int bufferSize = 65536;
|
int bufferSize = 65536;
|
||||||
char[] cbuf = new char[bufferSize];
|
char[] cbuf = new char[bufferSize];
|
||||||
String fileType;
|
String fileType;
|
||||||
|
long uploadTime;
|
||||||
String fileLengthStr;
|
String fileLengthStr;
|
||||||
long fileLength;
|
long fileLength;
|
||||||
|
|
||||||
|
@ -646,10 +651,13 @@ public class AggregatedLogFormat {
|
||||||
// EndOfFile
|
// EndOfFile
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
uploadTime = valueStream.readLong();
|
||||||
fileLengthStr = valueStream.readUTF();
|
fileLengthStr = valueStream.readUTF();
|
||||||
fileLength = Long.parseLong(fileLengthStr);
|
fileLength = Long.parseLong(fileLengthStr);
|
||||||
writer.write("\n\nLogType:");
|
writer.write("\n\nLogType:");
|
||||||
writer.write(fileType);
|
writer.write(fileType);
|
||||||
|
writer.write("\nLogUploadTime:");
|
||||||
|
writer.write(String.valueOf(uploadTime));
|
||||||
writer.write("\nLogLength:");
|
writer.write("\nLogLength:");
|
||||||
writer.write(fileLengthStr);
|
writer.write(fileLengthStr);
|
||||||
writer.write("\nLog Contents:\n");
|
writer.write("\nLog Contents:\n");
|
||||||
|
@ -681,10 +689,13 @@ public class AggregatedLogFormat {
|
||||||
byte[] buf = new byte[65535];
|
byte[] buf = new byte[65535];
|
||||||
|
|
||||||
String fileType = valueStream.readUTF();
|
String fileType = valueStream.readUTF();
|
||||||
|
long uploadTime = valueStream.readLong();
|
||||||
String fileLengthStr = valueStream.readUTF();
|
String fileLengthStr = valueStream.readUTF();
|
||||||
long fileLength = Long.parseLong(fileLengthStr);
|
long fileLength = Long.parseLong(fileLengthStr);
|
||||||
out.print("LogType: ");
|
out.print("LogType: ");
|
||||||
out.println(fileType);
|
out.println(fileType);
|
||||||
|
out.print("LogUploadTime: ");
|
||||||
|
out.println(Times.format(uploadTime));
|
||||||
out.print("LogLength: ");
|
out.print("LogLength: ");
|
||||||
out.println(fileLengthStr);
|
out.println(fileLengthStr);
|
||||||
out.println("Log Contents:");
|
out.println("Log Contents:");
|
||||||
|
@ -715,6 +726,7 @@ public class AggregatedLogFormat {
|
||||||
public static class ContainerLogsReader {
|
public static class ContainerLogsReader {
|
||||||
private DataInputStream valueStream;
|
private DataInputStream valueStream;
|
||||||
private String currentLogType = null;
|
private String currentLogType = null;
|
||||||
|
private long currentLogUpLoadTime = 0;
|
||||||
private long currentLogLength = 0;
|
private long currentLogLength = 0;
|
||||||
private BoundedInputStream currentLogData = null;
|
private BoundedInputStream currentLogData = null;
|
||||||
private InputStreamReader currentLogISR;
|
private InputStreamReader currentLogISR;
|
||||||
|
@ -735,12 +747,14 @@ public class AggregatedLogFormat {
|
||||||
}
|
}
|
||||||
|
|
||||||
currentLogType = null;
|
currentLogType = null;
|
||||||
|
currentLogUpLoadTime = 0;
|
||||||
currentLogLength = 0;
|
currentLogLength = 0;
|
||||||
currentLogData = null;
|
currentLogData = null;
|
||||||
currentLogISR = null;
|
currentLogISR = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
String logType = valueStream.readUTF();
|
String logType = valueStream.readUTF();
|
||||||
|
long logUpLoadTime = valueStream.readLong();
|
||||||
String logLengthStr = valueStream.readUTF();
|
String logLengthStr = valueStream.readUTF();
|
||||||
currentLogLength = Long.parseLong(logLengthStr);
|
currentLogLength = Long.parseLong(logLengthStr);
|
||||||
currentLogData =
|
currentLogData =
|
||||||
|
@ -748,6 +762,7 @@ public class AggregatedLogFormat {
|
||||||
currentLogData.setPropagateClose(false);
|
currentLogData.setPropagateClose(false);
|
||||||
currentLogISR = new InputStreamReader(currentLogData);
|
currentLogISR = new InputStreamReader(currentLogData);
|
||||||
currentLogType = logType;
|
currentLogType = logType;
|
||||||
|
currentLogUpLoadTime = logUpLoadTime;
|
||||||
} catch (EOFException e) {
|
} catch (EOFException e) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -758,6 +773,10 @@ public class AggregatedLogFormat {
|
||||||
return currentLogType;
|
return currentLogType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getCurrentLogUpLoadTime() {
|
||||||
|
return currentLogUpLoadTime;
|
||||||
|
}
|
||||||
|
|
||||||
public long getCurrentLogLength() {
|
public long getCurrentLogLength() {
|
||||||
return currentLogLength;
|
return currentLogLength;
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
|
||||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.Times;
|
||||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
|
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
|
||||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.PRE;
|
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.PRE;
|
||||||
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
|
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
|
||||||
|
@ -198,12 +199,13 @@ public class AggregatedLogsBlock extends HtmlBlock {
|
||||||
if (desiredLogType == null || desiredLogType.isEmpty()
|
if (desiredLogType == null || desiredLogType.isEmpty()
|
||||||
|| desiredLogType.equals(logType)) {
|
|| desiredLogType.equals(logType)) {
|
||||||
long logLength = logReader.getCurrentLogLength();
|
long logLength = logReader.getCurrentLogLength();
|
||||||
|
long logUpLoadTime = logReader.getCurrentLogUpLoadTime();
|
||||||
if (foundLog) {
|
if (foundLog) {
|
||||||
html.pre()._("\n\n")._();
|
html.pre()._("\n\n")._();
|
||||||
}
|
}
|
||||||
|
|
||||||
html.p()._("Log Type: " + logType)._();
|
html.p()._("Log Type: " + logType)._();
|
||||||
|
html.p()._("Log UpLoadTime: " + Times.format(logUpLoadTime))._();
|
||||||
html.p()._("Log Length: " + Long.toString(logLength))._();
|
html.p()._("Log Length: " + Long.toString(logLength))._();
|
||||||
|
|
||||||
long start = logLimits.start < 0
|
long start = logLimits.start < 0
|
||||||
|
|
|
@ -232,7 +232,9 @@ public class TestAggregatedLogFormat {
|
||||||
// aggregated.
|
// aggregated.
|
||||||
String s = writer.toString();
|
String s = writer.toString();
|
||||||
int expectedLength =
|
int expectedLength =
|
||||||
"\n\nLogType:stdout".length() + ("\nLogLength:" + numChars).length()
|
"\n\nLogType:stdout".length()
|
||||||
|
+ ("\nLogUploadTime:" + System.currentTimeMillis()).length()
|
||||||
|
+ ("\nLogLength:" + numChars).length()
|
||||||
+ "\nLog Contents:\n".length() + numChars;
|
+ "\nLog Contents:\n".length() + numChars;
|
||||||
Assert.assertTrue("LogType not matched", s.contains("LogType:stdout"));
|
Assert.assertTrue("LogType not matched", s.contains("LogType:stdout"));
|
||||||
Assert.assertTrue("log file:stderr should not be aggregated.", !s.contains("LogType:stderr"));
|
Assert.assertTrue("log file:stderr should not be aggregated.", !s.contains("LogType:stderr"));
|
||||||
|
|
|
@ -61,7 +61,6 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.AbstractFileSystem;
|
import org.apache.hadoop.fs.AbstractFileSystem;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -768,19 +767,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
Assert.assertEquals("LogType:", writtenLines[0].substring(0, 8));
|
Assert.assertEquals("LogType:", writtenLines[0].substring(0, 8));
|
||||||
String fileType = writtenLines[0].substring(9);
|
String fileType = writtenLines[0].substring(9);
|
||||||
|
|
||||||
Assert.assertEquals("LogLength:", writtenLines[1].substring(0, 10));
|
Assert.assertEquals("LogUploadTime:", writtenLines[1].substring(0, 14));
|
||||||
String fileLengthStr = writtenLines[1].substring(11);
|
String fileUploadedTimeStr = writtenLines[1].substring(15);
|
||||||
|
|
||||||
|
Assert.assertEquals("LogLength:", writtenLines[2].substring(0, 10));
|
||||||
|
String fileLengthStr = writtenLines[2].substring(11);
|
||||||
long fileLength = Long.parseLong(fileLengthStr);
|
long fileLength = Long.parseLong(fileLengthStr);
|
||||||
|
|
||||||
Assert.assertEquals("Log Contents:",
|
Assert.assertEquals("Log Contents:",
|
||||||
writtenLines[2].substring(0, 13));
|
writtenLines[3].substring(0, 13));
|
||||||
|
|
||||||
String logContents = StringUtils.join(
|
String logContents = StringUtils.join(
|
||||||
Arrays.copyOfRange(writtenLines, 3, writtenLines.length), "\n");
|
Arrays.copyOfRange(writtenLines, 4, writtenLines.length), "\n");
|
||||||
perContainerMap.put(fileType, logContents);
|
perContainerMap.put(fileType, logContents);
|
||||||
|
|
||||||
LOG.info("LogType:" + fileType);
|
LOG.info("LogType:" + fileType);
|
||||||
LOG.info("LogType:" + fileLength);
|
LOG.info("LogUploadTime:" + fileUploadedTimeStr);
|
||||||
|
LOG.info("LogLength:" + fileLength);
|
||||||
LOG.info("Log Contents:\n" + perContainerMap.get(fileType));
|
LOG.info("Log Contents:\n" + perContainerMap.get(fileType));
|
||||||
} catch (EOFException eof) {
|
} catch (EOFException eof) {
|
||||||
break;
|
break;
|
||||||
|
|
Loading…
Reference in New Issue