diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 3f00a0eefd2..5bf64701c7b 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -770,6 +770,10 @@ Release 2.6.0 - UNRELEASED
     YARN-2798. Fixed YarnClient to populate the renewer correctly for Timeline
     delegation tokens. (Zhijie Shen via vinodkv)
 
+    YARN-2788. Fixed backwards compatibility issues with log-aggregation feature
+    that were caused when adding log-upload-time via YARN-2703. (Xuan Gong via
+    vinodkv)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
index 2e9e92dc093..5ed839847f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 
 import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -33,6 +34,7 @@ import java.io.PrintWriter;
 import java.io.Writer;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -170,9 +172,9 @@ public class TestLogsCLI {
     ApplicationId appId = ApplicationIdPBImpl.newInstance(0, 1);
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptIdPBImpl.newInstance(appId, 1);
+    ContainerId containerId0 = ContainerIdPBImpl.newInstance(appAttemptId, 0);
     ContainerId containerId1 = ContainerIdPBImpl.newInstance(appAttemptId, 1);
     ContainerId containerId2 = ContainerIdPBImpl.newInstance(appAttemptId, 2);
-
     NodeId nodeId = NodeId.newInstance("localhost", 1234);
 
     // create local logs
@@ -201,7 +203,15 @@ public class TestLogsCLI {
       fs.delete(path, true);
     }
     assertTrue(fs.mkdirs(path));
+    // upload container logs into remote directory
+    // the first two logs are empty. When we try to read the first two logs,
+    // we will hit an EOF exception, but it will not impact the other logs,
+    // which should still be read successfully.
+    uploadEmptyContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId,
+      containerId0, path, fs);
+    uploadEmptyContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId,
+      containerId1, path, fs);
     uploadContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId,
       containerId1, path, fs);
     uploadContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId,
@@ -220,6 +230,9 @@ public class TestLogsCLI {
         "Hello container_0_0001_01_000002!"));
     sysOutStream.reset();
 
+    // We uploaded two logs for container1: the first log is empty,
+    // the second one is not.
+    // We can still successfully read the logs for container1.
     exitCode =
         cli.run(new String[] { "-applicationId", appId.toString(),
             "-nodeAddress", nodeId.toString(), "-containerId",
@@ -227,7 +240,23 @@ public class TestLogsCLI {
             containerId1.toString() });
     assertTrue(exitCode == 0);
     assertTrue(sysOutStream.toString().contains(
         "Hello container_0_0001_01_000001!"));
-    assertTrue(sysOutStream.toString().contains("LogUploadTime"));
+    assertTrue(sysOutStream.toString().contains("Log Upload Time"));
+    assertTrue(!sysOutStream.toString().contains(
+        "Logs for container " + containerId1.toString()
+            + " are not present in this log-file."));
+    sysOutStream.reset();
+
+    // We uploaded an empty log for container0.
+    // We should see the message saying the logs for container0
+    // are not present.
+    exitCode =
+        cli.run(new String[] { "-applicationId", appId.toString(),
+            "-nodeAddress", nodeId.toString(), "-containerId",
+            containerId0.toString() });
+    assertTrue(exitCode == -1);
+    assertTrue(sysOutStream.toString().contains(
+        "Logs for container " + containerId0.toString()
+            + " are not present in this log-file."));
 
     fs.delete(new Path(remoteLogRootDir), true);
     fs.delete(new Path(rootLogDir), true);
@@ -266,6 +295,31 @@ public class TestLogsCLI {
     writer.close();
   }
 
+  private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ugi,
+      Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
+      ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
+    Path path =
+        new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
+            + System.currentTimeMillis());
+    AggregatedLogFormat.LogWriter writer =
+        new AggregatedLogFormat.LogWriter(configuration, path, ugi);
+    writer.writeApplicationOwner(ugi.getUserName());
+
+    Map<ApplicationAccessType, String> appAcls =
+        new HashMap<ApplicationAccessType, String>();
+    appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
+    writer.writeApplicationACLs(appAcls);
+    DataOutputStream out = writer.getWriter().prepareAppendKey(-1);
+    new AggregatedLogFormat.LogKey(containerId).write(out);
+    out.close();
+    out = writer.getWriter().prepareAppendValue(-1);
+    new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
+        UserGroupInformation.getCurrentUser().getShortUserName()).write(out,
+        new HashSet<File>());
+    out.close();
+    writer.close();
+  }
+
   private YarnClient createMockYarnClient(YarnApplicationState appState)
       throws YarnException, IOException {
     YarnClient mockClient = mock(YarnClient.class);
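Note (illustration, not part of the patch): the helper above produces a container entry that has a key but zero per-log-file records. TFile's prepareAppendKey/prepareAppendValue take the record length, with -1 meaning "unknown"; passing an empty upload set to LogValue.write() is what leaves the value empty, so the first readUTF() on that value stream throws EOFException immediately. A trimmed sketch of the sequence, under those assumptions ('user' stands for the short user name used in the helper):

    // Append a container key whose value contains zero log-type records.
    // prepareAppendKey/-Value take the serialized length, or -1 if unknown.
    DataOutputStream out = writer.getWriter().prepareAppendKey(-1);
    new AggregatedLogFormat.LogKey(containerId).write(out);   // key: container id
    out.close();
    out = writer.getWriter().prepareAppendValue(-1);
    new AggregatedLogFormat.LogValue(rootLogDirs, containerId, user)
        .write(out, new HashSet<File>());  // empty upload set => empty value
    out.close();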
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index 22219be1e08..a434ef59a6d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -27,6 +27,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.Writer;
 import java.security.PrivilegedExceptionAction;
@@ -44,6 +45,7 @@ import java.util.Set;
 import java.util.regex.Pattern;
 
 import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.commons.io.output.WriterOutputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -233,9 +235,6 @@ public class AggregatedLogFormat {
           // Write the logFile Type
           out.writeUTF(logFile.getName());
 
-          // Write the uploaded TimeStamp
-          out.writeLong(System.currentTimeMillis());
-
           // Write the log length as UTF so that it is printable
           out.writeUTF(String.valueOf(fileLength));
 
@@ -400,6 +399,11 @@ public class AggregatedLogFormat {
       writeVersion();
     }
 
+    @VisibleForTesting
+    public TFile.Writer getWriter() {
+      return this.writer;
+    }
+
     private void writeVersion() throws IOException {
       DataOutputStream out = this.writer.prepareAppendKey(-1);
       VERSION_KEY.write(out);
@@ -639,70 +643,55 @@ public class AggregatedLogFormat {
      * Writes all logs for a single container to the provided writer.
      * @param valueStream
      * @param writer
+     * @param logUploadedTime
      * @throws IOException
      */
     public static void readAcontainerLogs(DataInputStream valueStream,
-        Writer writer) throws IOException {
-      int bufferSize = 65536;
-      char[] cbuf = new char[bufferSize];
-      String fileType;
-      long uploadTime;
-      String fileLengthStr;
-      long fileLength;
-
-      while (true) {
-        try {
-          fileType = valueStream.readUTF();
-        } catch (EOFException e) {
-          // EndOfFile
-          return;
-        }
-        uploadTime = valueStream.readLong();
-        fileLengthStr = valueStream.readUTF();
-        fileLength = Long.parseLong(fileLengthStr);
-        writer.write("\n\nLogType:");
-        writer.write(fileType);
-        writer.write("\nLogUploadTime:");
-        writer.write(String.valueOf(uploadTime));
-        writer.write("\nLogLength:");
-        writer.write(fileLengthStr);
-        writer.write("\nLog Contents:\n");
-        // ByteLevel
-        BoundedInputStream bis =
-            new BoundedInputStream(valueStream, fileLength);
-        InputStreamReader reader = new InputStreamReader(bis);
-        int currentRead = 0;
-        int totalRead = 0;
-        while ((currentRead = reader.read(cbuf, 0, bufferSize)) != -1) {
-          writer.write(cbuf, 0, currentRead);
-          totalRead += currentRead;
+        Writer writer, long logUploadedTime) throws IOException {
+      OutputStream os = null;
+      PrintStream ps = null;
+      try {
+        os = new WriterOutputStream(writer);
+        ps = new PrintStream(os);
+        while (true) {
+          try {
+            readContainerLogs(valueStream, ps, logUploadedTime);
+          } catch (EOFException e) {
+            // EndOfFile
+            return;
+          }
         }
+      } finally {
+        IOUtils.cleanup(LOG, ps);
+        IOUtils.cleanup(LOG, os);
       }
     }
 
     /**
-     * Keep calling this till you get a {@link EOFException} for getting logs of
-     * all types for a single container.
-     *
+     * Writes all logs for a single container to the provided writer.
      * @param valueStream
-     * @param out
+     * @param writer
      * @throws IOException
      */
-    public static void readAContainerLogsForALogType(
-        DataInputStream valueStream, PrintStream out)
-        throws IOException {
+    public static void readAcontainerLogs(DataInputStream valueStream,
+        Writer writer) throws IOException {
+      readAcontainerLogs(valueStream, writer, -1);
+    }
 
+    private static void readContainerLogs(DataInputStream valueStream,
+        PrintStream out, long logUploadedTime) throws IOException {
       byte[] buf = new byte[65535];
 
       String fileType = valueStream.readUTF();
-      long uploadTime = valueStream.readLong();
       String fileLengthStr = valueStream.readUTF();
       long fileLength = Long.parseLong(fileLengthStr);
-      out.print("LogType: ");
+      out.print("LogType:");
       out.println(fileType);
-      out.print("LogUploadTime: ");
-      out.println(Times.format(uploadTime));
-      out.print("LogLength: ");
+      if (logUploadedTime != -1) {
+        out.print("Log Upload Time:");
+        out.println(Times.format(logUploadedTime));
+      }
+      out.print("LogLength:");
       out.println(fileLengthStr);
       out.println("Log Contents:");
 
@@ -723,6 +712,35 @@ public class AggregatedLogFormat {
       out.println("");
     }
 
+    /**
+     * Keep calling this till you get a {@link EOFException} for getting logs of
+     * all types for a single container.
+     *
+     * @param valueStream
+     * @param out
+     * @param logUploadedTime
+     * @throws IOException
+     */
+    public static void readAContainerLogsForALogType(
+        DataInputStream valueStream, PrintStream out, long logUploadedTime)
+        throws IOException {
+      readContainerLogs(valueStream, out, logUploadedTime);
+    }
+
+    /**
+     * Keep calling this till you get a {@link EOFException} for getting logs of
+     * all types for a single container.
+     *
+     * @param valueStream
+     * @param out
+     * @throws IOException
+     */
+    public static void readAContainerLogsForALogType(
+        DataInputStream valueStream, PrintStream out)
+        throws IOException {
+      readAContainerLogsForALogType(valueStream, out, -1);
+    }
+
     public void close() {
       IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
     }
@@ -732,7 +750,6 @@ public class AggregatedLogFormat {
   public static class ContainerLogsReader {
     private DataInputStream valueStream;
     private String currentLogType = null;
-    private long currentLogUpLoadTime = 0;
     private long currentLogLength = 0;
     private BoundedInputStream currentLogData = null;
     private InputStreamReader currentLogISR;
@@ -753,14 +770,12 @@ public class AggregatedLogFormat {
       }
 
       currentLogType = null;
-      currentLogUpLoadTime = 0;
       currentLogLength = 0;
       currentLogData = null;
       currentLogISR = null;
 
       try {
         String logType = valueStream.readUTF();
-        long logUpLoadTime = valueStream.readLong();
         String logLengthStr = valueStream.readUTF();
         currentLogLength = Long.parseLong(logLengthStr);
         currentLogData =
@@ -768,7 +783,6 @@ public class AggregatedLogFormat {
         currentLogData.setPropagateClose(false);
         currentLogISR = new InputStreamReader(currentLogData);
         currentLogType = logType;
-        currentLogUpLoadTime = logUpLoadTime;
       } catch (EOFException e) {
       }
 
@@ -779,10 +793,6 @@ public class AggregatedLogFormat {
       return currentLogType;
     }
 
-    public long getCurrentLogUpLoadTime() {
-      return currentLogUpLoadTime;
-    }
-
     public long getCurrentLogLength() {
       return currentLogLength;
     }
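Note (illustration, not part of the patch): with the embedded writeLong(timestamp) removed, each per-log-file record is back to the pre-YARN-2703 layout — writeUTF(fileType), writeUTF(fileLength), then the raw bytes — which is what restores compatibility with logs aggregated by older releases. Callers that still want an upload time derive it from the aggregated file's modification time and pass it to the new overloads; -1 suppresses the "Log Upload Time" header. A minimal usage sketch, assuming 'valueStream' came from LogReader.next() and 'status' is the aggregated log file's FileStatus:

    // The file's modification time stands in for the old embedded timestamp.
    long uploadTime = status.getModificationTime();
    while (true) {
      try {
        // Prints the "Log Upload Time:" header only because uploadTime != -1.
        LogReader.readAContainerLogsForALogType(valueStream, System.out, uploadTime);
      } catch (EOFException e) {
        break; // all log types for this container have been read
      }
    }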
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
index de06d483873..1546ece9b99 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
@@ -78,7 +78,8 @@ public class LogCLIHelpers implements Configurable {
           reader =
               new AggregatedLogFormat.LogReader(getConf(),
                 thisNodeFile.getPath());
-          if (dumpAContainerLogs(containerId, reader, System.out) > -1) {
+          if (dumpAContainerLogs(containerId, reader, System.out,
+              thisNodeFile.getModificationTime()) > -1) {
             foundContainerLogs = true;
           }
         } finally {
@@ -97,7 +98,8 @@ public class LogCLIHelpers implements Configurable {
 
   @Private
   public int dumpAContainerLogs(String containerIdStr,
-      AggregatedLogFormat.LogReader reader, PrintStream out) throws IOException {
+      AggregatedLogFormat.LogReader reader, PrintStream out,
+      long logUploadedTime) throws IOException {
     DataInputStream valueStream;
     LogKey key = new LogKey();
     valueStream = reader.next(key);
@@ -112,14 +114,20 @@ public class LogCLIHelpers implements Configurable {
       return -1;
     }
 
+    boolean foundContainerLogs = false;
     while (true) {
       try {
-        LogReader.readAContainerLogsForALogType(valueStream, out);
+        LogReader.readAContainerLogsForALogType(valueStream, out,
+          logUploadedTime);
+        foundContainerLogs = true;
       } catch (EOFException eof) {
         break;
       }
     }
-    return 0;
+    if (foundContainerLogs) {
+      return 0;
+    }
+    return -1;
   }
 
   @Private
@@ -157,13 +165,15 @@ public class LogCLIHelpers implements Configurable {
           valueStream = reader.next(key);
 
           while (valueStream != null) {
+
             String containerString = "\n\nContainer: " + key + " on "
               + thisNodeFile.getPath().getName();
             out.println(containerString);
             out.println(StringUtils.repeat("=", containerString.length()));
             while (true) {
               try {
-                LogReader.readAContainerLogsForALogType(valueStream, out);
+                LogReader.readAContainerLogsForALogType(valueStream, out,
+                  thisNodeFile.getModificationTime());
                 foundAnyLogs = true;
               } catch (EOFException eof) {
                 break;
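Note (sketch; the "not present" printing is an assumption inferred from the TestLogsCLI assertions, since the surrounding code is outside these hunks): dumpAContainerLogs now returns -1 not only when the container key is missing but also when the key exists with an empty value, so a caller can report missing logs uniformly across all per-node files:

    boolean foundContainerLogs = false;
    for (FileStatus thisNodeFile : nodeFiles) {
      AggregatedLogFormat.LogReader reader =
          new AggregatedLogFormat.LogReader(getConf(), thisNodeFile.getPath());
      try {
        if (dumpAContainerLogs(containerId, reader, System.out,
            thisNodeFile.getModificationTime()) > -1) {
          foundContainerLogs = true; // at least one file held real records
        }
      } finally {
        reader.close();
      }
    }
    if (!foundContainerLogs) {
      // the message the new TestLogsCLI assertions check for
      System.out.println("Logs for container " + containerId
          + " are not present in this log-file.");
    }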
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
index bba32581185..3e9f7a24758 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
@@ -126,6 +126,7 @@ public class AggregatedLogsBlock extends HtmlBlock {
             .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
           continue;
         }
+        long logUploadedTime = thisNodeFile.getModificationTime();
         reader =
             new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath());
 
@@ -164,7 +165,7 @@ public class AggregatedLogsBlock extends HtmlBlock {
           }
 
           foundLog = readContainerLogs(html, logReader, logLimits,
-              desiredLogType);
+              desiredLogType, logUploadedTime);
         } catch (IOException ex) {
           LOG.error("Error getting logs for " + logEntity, ex);
           continue;
@@ -189,7 +190,7 @@ public class AggregatedLogsBlock extends HtmlBlock {
 
   private boolean readContainerLogs(Block html,
       AggregatedLogFormat.ContainerLogsReader logReader, LogLimits logLimits,
-      String desiredLogType) throws IOException {
+      String desiredLogType, long logUpLoadTime) throws IOException {
 
     int bufferSize = 65536;
     char[] cbuf = new char[bufferSize];
@@ -199,13 +200,12 @@ public class AggregatedLogsBlock extends HtmlBlock {
       if (desiredLogType == null || desiredLogType.isEmpty()
          || desiredLogType.equals(logType)) {
         long logLength = logReader.getCurrentLogLength();
-        long logUpLoadTime = logReader.getCurrentLogUpLoadTime();
         if (foundLog) {
           html.pre()._("\n\n")._();
         }
 
         html.p()._("Log Type: " + logType)._();
-        html.p()._("Log UpLoadTime: " + Times.format(logUpLoadTime))._();
+        html.p()._("Log Upload Time: " + Times.format(logUpLoadTime))._();
         html.p()._("Log Length: " + Long.toString(logLength))._();
 
         long start = logLimits.start < 0
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java
index bc0485ed4b8..405cb3d52a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.util.Times;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
@@ -178,9 +179,16 @@ public class TestAggregatedLogFormat {
     logWriter.close();
   }
 
-  //Verify the output generated by readAContainerLogs(DataInputStream, Writer)
   @Test
   public void testReadAcontainerLogs1() throws Exception {
+    //Verify the output generated by readAContainerLogs(DataInputStream, Writer, logUploadedTime)
+    testReadAcontainerLog(true);
+
+    //Verify the output generated by readAContainerLogs(DataInputStream, Writer)
+    testReadAcontainerLog(false);
+  }
+
+  private void testReadAcontainerLog(boolean logUploadedTime) throws Exception {
     Configuration conf = new Configuration();
     File workDir = new File(testWorkDir, "testReadAcontainerLogs1");
     Path remoteAppLogFile =
@@ -233,17 +241,23 @@ public class TestAggregatedLogFormat {
     LogKey rLogKey = new LogKey();
     DataInputStream dis = logReader.next(rLogKey);
     Writer writer = new StringWriter();
-    LogReader.readAcontainerLogs(dis, writer);
-
+
+    if (logUploadedTime) {
+      LogReader.readAcontainerLogs(dis, writer, System.currentTimeMillis());
+    } else {
+      LogReader.readAcontainerLogs(dis, writer);
+    }
+
     // We should only do the log aggregation for stdout.
     // Since we could not open the fileInputStream for stderr, this file is not
     // aggregated.
     String s = writer.toString();
     int expectedLength =
-        "\n\nLogType:stdout".length()
-            + ("\nLogUploadTime:" + System.currentTimeMillis()).length()
+        "LogType:stdout".length()
+            + (logUploadedTime ? ("\nLog Upload Time:" + Times.format(System
+              .currentTimeMillis())).length() : 0)
             + ("\nLogLength:" + numChars).length()
-            + "\nLog Contents:\n".length() + numChars;
+            + "\nLog Contents:\n".length() + numChars + "\n".length();
     Assert.assertTrue("LogType not matched", s.contains("LogType:stdout"));
     Assert.assertTrue("log file:stderr should not be aggregated.", !s.contains("LogType:stderr"));
     Assert.assertTrue("log file:logs should not be aggregated.", !s.contains("LogType:logs"));
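Note on the expectedLength arithmetic (illustration, not part of the patch; 'now', 'numChars', and 'logData' are stand-ins): the leading "\n\n" is gone because readContainerLogs now prints headers with println instead of writing "\n\nLogType:" into the stream, the upload-time line appears only when a time was supplied, and each record ends with one trailing newline, hence the extra "\n".length(). The output the test measures has this shape:

    String expected =
        "LogType:stdout"
        + "\nLog Upload Time:" + Times.format(now)  // omitted when no time is supplied
        + "\nLogLength:" + numChars
        + "\nLog Contents:\n"
        + logData                                   // numChars characters of payload
        + "\n";                                     // trailing println newline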
("\nLog Upload Time:" + Times.format(System + .currentTimeMillis())).length() : 0) + ("\nLogLength:" + numChars).length() - + "\nLog Contents:\n".length() + numChars; + + "\nLog Contents:\n".length() + numChars + "\n".length(); Assert.assertTrue("LogType not matched", s.contains("LogType:stdout")); Assert.assertTrue("log file:stderr should not be aggregated.", !s.contains("LogType:stderr")); Assert.assertTrue("log file:logs should not be aggregated.", !s.contains("LogType:logs")); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 419de88a601..7d911e99c58 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -767,30 +767,27 @@ public class TestLogAggregationService extends BaseContainerManagerTest { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream ps = new PrintStream(baos); + LogReader.readAContainerLogsForALogType(valueStream, ps); String writtenLines[] = baos.toString().split( System.getProperty("line.separator")); Assert.assertEquals("LogType:", writtenLines[0].substring(0, 8)); - String fileType = writtenLines[0].substring(9); + String fileType = writtenLines[0].substring(8); - Assert.assertEquals("LogUploadTime:", writtenLines[1].substring(0, 14)); - String fileUploadedTimeStr = writtenLines[1].substring(15); - - Assert.assertEquals("LogLength:", writtenLines[2].substring(0, 10)); - String fileLengthStr = writtenLines[2].substring(11); + Assert.assertEquals("LogLength:", writtenLines[1].substring(0, 10)); + String fileLengthStr = writtenLines[1].substring(10); long fileLength = Long.parseLong(fileLengthStr); Assert.assertEquals("Log Contents:", - writtenLines[3].substring(0, 13)); + writtenLines[2].substring(0, 13)); String logContents = StringUtils.join( - Arrays.copyOfRange(writtenLines, 4, writtenLines.length), "\n"); + Arrays.copyOfRange(writtenLines, 3, writtenLines.length), "\n"); perContainerMap.put(fileType, logContents); LOG.info("LogType:" + fileType); - LOG.info("LogUploadTime:" + fileUploadedTimeStr); LOG.info("LogLength:" + fileLength); LOG.info("Log Contents:\n" + perContainerMap.get(fileType)); } catch (EOFException eof) {