diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index d9b4c1e4c86..ca43fe6ad96 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -26,6 +26,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PrintStream; @@ -1013,7 +1014,7 @@ public void close() { } @Private - public static class ContainerLogsReader { + public static class ContainerLogsReader extends InputStream { private DataInputStream valueStream; private String currentLogType = null; private long currentLogLength = 0; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java index 784102b0337..4ec8794b145 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java @@ -24,6 +24,10 @@ import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME; import com.google.inject.Inject; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -34,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.logaggregation.LogAggregationWebUtils; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet; import org.apache.hadoop.yarn.webapp.view.HtmlBlock; /** @@ -143,6 +148,62 @@ protected boolean checkAcls(Configuration conf, ApplicationId appId, return true; } + protected long[] checkParseRange(Block html, long startIndex, + long endIndex, long startTime, long endTime, long logLength, String logType) { + long start = startIndex < 0 + ? logLength + startIndex : startIndex; + start = start < 0 ? 0 : start; + start = start > logLength ? logLength : start; + long end = endIndex < 0 + ? logLength + endIndex : endIndex; + end = end < 0 ? 0 : end; + end = end > logLength ? logLength : end; + end = end < start ? start : end; + + long toRead = end - start; + if (toRead < logLength) { + html.p().__("Showing " + toRead + " bytes of " + logLength + + " total. Click ").a(url("logs", $(NM_NODENAME), $(CONTAINER_ID), + $(ENTITY_STRING), $(APP_OWNER), + logType, "?start=0&start.time=" + startTime + + "&end.time=" + endTime), "here"). + __(" for the full log.").__(); + } + return new long[]{start, end}; + } + + protected void processContainerLog(Block html, long[] range, InputStream in, + int bufferSize, byte[] cbuf) throws IOException { + long totalSkipped = 0; + long start = range[0]; + long toRead = range[1] - range[0]; + while (totalSkipped < start) { + long ret = in.skip(start - totalSkipped); + if (ret == 0) { + //Read one byte + int nextByte = in.read(); + // Check if we have reached EOF + if (nextByte == -1) { + throw new IOException("Premature EOF from container log"); + } + ret = 1; + } + totalSkipped += ret; + } + + int len = 0; + int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead; + Hamlet.PRE pre = html.pre(); + + while (toRead > 0 && (len = in.read(cbuf, 0, currentToRead)) > 0) { + pre.__(new String(cbuf, 0, len, Charset.forName("UTF-8"))); + toRead = toRead - len; + currentToRead = toRead > bufferSize ? bufferSize : (int) toRead; + } + + pre.__(); + } + protected static class BlockParameters { private ApplicationId appId; private ContainerId containerId; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java index 4ef429d498b..eb9634bc31b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java @@ -18,11 +18,7 @@ package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile; -import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER; -import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID; import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE; -import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING; -import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME; import com.google.inject.Inject; import java.io.IOException; @@ -53,7 +49,6 @@ import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController.IndexedPerAggregationLogMeta; import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet; -import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet.PRE; /** * The Aggregated Logs Block implementation for Indexed File. @@ -179,88 +174,8 @@ protected void render(Block html) { continue; } - Algorithm compressName = Compression.getCompressionAlgorithmByName( - compressAlgo); - Decompressor decompressor = compressName.getDecompressor(); - FileContext fileContext = FileContext.getFileContext( - thisNodeFile.getPath().toUri(), conf); - FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath()); - int bufferSize = 65536; - for (IndexedFileLogMeta candidate : candidates) { - if (candidate.getLastModifiedTime() < startTime - || candidate.getLastModifiedTime() > endTime) { - continue; - } - byte[] cbuf = new byte[bufferSize]; - InputStream in = null; - try { - in = compressName.createDecompressionStream( - new BoundedRangeFileInputStream(fsin, - candidate.getStartIndex(), - candidate.getFileCompressedSize()), - decompressor, - LogAggregationIndexedFileController.getFSInputBufferSize( - conf)); - long logLength = candidate.getFileSize(); - html.pre().__("\n\n").__(); - html.p().__("Log Type: " + candidate.getFileName()).__(); - html.p().__("Log Upload Time: " + Times.format( - candidate.getLastModifiedTime())).__(); - html.p().__("Log Length: " + Long.toString( - logLength)).__(); - long startIndex = start < 0 - ? logLength + start : start; - startIndex = startIndex < 0 ? 0 : startIndex; - startIndex = startIndex > logLength ? logLength : startIndex; - long endLogIndex = end < 0 - ? logLength + end : end; - endLogIndex = endLogIndex < 0 ? 0 : endLogIndex; - endLogIndex = endLogIndex > logLength ? logLength : endLogIndex; - endLogIndex = endLogIndex < startIndex ? - startIndex : endLogIndex; - long toRead = endLogIndex - startIndex; - if (toRead < logLength) { - html.p().__("Showing " + toRead + " bytes of " + logLength - + " total. Click ").a(url("logs", $(NM_NODENAME), - $(CONTAINER_ID), $(ENTITY_STRING), $(APP_OWNER), - candidate.getFileName(), "?start=0&start.time=" - + startTime + "&end.time=" + endTime), "here"). - __(" for the full log.").__(); - } - long totalSkipped = 0; - while (totalSkipped < startIndex) { - long ret = in.skip(startIndex - totalSkipped); - if (ret == 0) { - //Read one byte - int nextByte = in.read(); - // Check if we have reached EOF - if (nextByte == -1) { - throw new IOException("Premature EOF from container log"); - } - ret = 1; - } - totalSkipped += ret; - } - int len = 0; - int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead; - PRE pre = html.pre(); - - while (toRead > 0 - && (len = in.read(cbuf, 0, currentToRead)) > 0) { - pre.__(new String(cbuf, 0, len, Charset.forName("UTF-8"))); - toRead = toRead - len; - currentToRead = toRead > bufferSize ? bufferSize : (int) toRead; - } - - pre.__(); - foundLog = true; - } catch (Exception ex) { - LOG.error("Error getting logs for " + logEntity, ex); - continue; - } finally { - IOUtils.closeQuietly(in); - } - } + foundLog = readContainerLog(compressAlgo, html, thisNodeFile, start, + end, candidates, startTime, endTime, foundLog, logEntity); } if (!foundLog) { if (desiredLogType.isEmpty()) { @@ -277,4 +192,51 @@ protected void render(Block html) { LOG.error("Error getting logs for " + logEntity, ex); } } + + private boolean readContainerLog(String compressAlgo, Block html, + FileStatus thisNodeFile, long start, long end, + List candidates, long startTime, long endTime, + boolean foundLog, String logEntity) throws IOException { + Algorithm compressName = Compression.getCompressionAlgorithmByName( + compressAlgo); + Decompressor decompressor = compressName.getDecompressor(); + FileContext fileContext = FileContext.getFileContext( + thisNodeFile.getPath().toUri(), conf); + FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath()); + int bufferSize = 65536; + for (IndexedFileLogMeta candidate : candidates) { + if (candidate.getLastModifiedTime() < startTime + || candidate.getLastModifiedTime() > endTime) { + continue; + } + byte[] cbuf = new byte[bufferSize]; + InputStream in = null; + try { + in = compressName.createDecompressionStream( + new BoundedRangeFileInputStream(fsin, candidate.getStartIndex(), + candidate.getFileCompressedSize()), decompressor, + LogAggregationIndexedFileController.getFSInputBufferSize(conf)); + long logLength = candidate.getFileSize(); + html.pre().__("\n\n").__(); + html.p().__("Log Type: " + candidate.getFileName()).__(); + html.p().__( + "Log Upload Time: " + Times.format(candidate.getLastModifiedTime())) + .__(); + html.p().__("Log Length: " + Long.toString(logLength)).__(); + + long[] range = checkParseRange(html, start, end, startTime, endTime, + logLength, candidate.getFileName()); + processContainerLog(html, range, in, bufferSize, cbuf); + + foundLog = true; + } catch (Exception ex) { + LOG.error("Error getting logs for " + logEntity, ex); + continue; + } finally { + IOUtils.closeQuietly(in); + } + } + return foundLog; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java index 64b62197525..6fb5b90bd8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java @@ -18,14 +18,11 @@ package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile; -import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER; -import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID; import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE; -import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING; -import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME; import com.google.inject.Inject; import java.io.IOException; +import java.io.InputStream; import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -173,7 +170,7 @@ private boolean readContainerLogs(Block html, long endIndex, String desiredLogType, long logUpLoadTime, long startTime, long endTime) throws IOException { int bufferSize = 65536; - char[] cbuf = new char[bufferSize]; + byte[] cbuf = new byte[bufferSize]; boolean foundLog = false; String logType = logReader.nextLog(); @@ -189,53 +186,10 @@ private boolean readContainerLogs(Block html, html.p().__("Log Upload Time: " + Times.format(logUpLoadTime)).__(); html.p().__("Log Length: " + Long.toString(logLength)).__(); - long start = startIndex < 0 - ? logLength + startIndex : startIndex; - start = start < 0 ? 0 : start; - start = start > logLength ? logLength : start; - long end = endIndex < 0 - ? logLength + endIndex : endIndex; - end = end < 0 ? 0 : end; - end = end > logLength ? logLength : end; - end = end < start ? start : end; + long[] range = checkParseRange(html, startIndex, endIndex, startTime, + endTime, logLength, logType); - long toRead = end - start; - if (toRead < logLength) { - html.p().__("Showing " + toRead + " bytes of " + logLength - + " total. Click ").a(url("logs", $(NM_NODENAME), $(CONTAINER_ID), - $(ENTITY_STRING), $(APP_OWNER), - logType, "?start=0&start.time=" + startTime - + "&end.time=" + endTime), "here"). - __(" for the full log.").__(); - } - - long totalSkipped = 0; - while (totalSkipped < start) { - long ret = logReader.skip(start - totalSkipped); - if (ret == 0) { - //Read one byte - int nextByte = logReader.read(); - // Check if we have reached EOF - if (nextByte == -1) { - throw new IOException("Premature EOF from container log"); - } - ret = 1; - } - totalSkipped += ret; - } - - int len = 0; - int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead; - PRE pre = html.pre(); - - while (toRead > 0 - && (len = logReader.read(cbuf, 0, currentToRead)) > 0) { - pre.__(new String(cbuf, 0, len)); - toRead = toRead - len; - currentToRead = toRead > bufferSize ? bufferSize : (int) toRead; - } - - pre.__(); + processContainerLog(html, range, logReader, bufferSize, cbuf); foundLog = true; }