YARN-7417. Remove duplicated code from IndexedFileAggregatedLogsBlock
and TFileAggregatedLogsBlock.
Contributed by Zian Chen
Eric Yang 2018-08-13 16:50:00 -04:00
parent b4031a8f1b
commit 74411ce0ce
4 changed files with 117 additions and 139 deletions

View File: AggregatedLogFormat.java

@@ -26,6 +26,7 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
@@ -1013,7 +1014,7 @@ public void close() {
}
@Private
public static class ContainerLogsReader {
public static class ContainerLogsReader extends InputStream {
private DataInputStream valueStream;
private String currentLogType = null;
private long currentLogLength = 0;
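(The overrides that extending InputStream requires are outside this hunk. A minimal sketch of the obligation, assuming read() delegates to the bounded valueStream shown above; the commit's actual implementation may differ:)

  // Hypothetical sketch of what "extends InputStream" obliges
  // ContainerLogsReader to provide: a single-byte read() bounded by
  // the current log section's remaining length. Not shown in this hunk.
  @Override
  public int read() throws IOException {
    if (currentLogLength <= 0) {
      return -1;                 // current log section exhausted
    }
    int b = valueStream.read();  // delegate to the underlying stream
    if (b != -1) {
      currentLogLength--;
    }
    return b;
  }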

View File: LogAggregationHtmlBlock.java

@@ -24,6 +24,10 @@
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
import com.google.inject.Inject;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -34,6 +38,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.logaggregation.LogAggregationWebUtils;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
/**
@@ -143,6 +148,62 @@ protected boolean checkAcls(Configuration conf, ApplicationId appId,
return true;
}
protected long[] checkParseRange(Block html, long startIndex,
long endIndex, long startTime, long endTime, long logLength, String logType) {
long start = startIndex < 0
? logLength + startIndex : startIndex;
start = start < 0 ? 0 : start;
start = start > logLength ? logLength : start;
long end = endIndex < 0
? logLength + endIndex : endIndex;
end = end < 0 ? 0 : end;
end = end > logLength ? logLength : end;
end = end < start ? start : end;
long toRead = end - start;
if (toRead < logLength) {
html.p().__("Showing " + toRead + " bytes of " + logLength
+ " total. Click ").a(url("logs", $(NM_NODENAME), $(CONTAINER_ID),
$(ENTITY_STRING), $(APP_OWNER),
logType, "?start=0&start.time=" + startTime
+ "&end.time=" + endTime), "here").
__(" for the full log.").__();
}
return new long[]{start, end};
}
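(For reference, a worked pass through the clamping above with hypothetical values; a negative offset counts back from the end of the log:)

// Worked pass (hypothetical values) through the clamping in checkParseRange.
public class RangeClampDemo {
  public static void main(String[] args) {
    long logLength = 1_048_576;      // 1 MiB log
    long startIndex = -4_096;        // negative start: tail the last 4 KiB
    long endIndex = Long.MAX_VALUE;  // "through the end of the log"

    long start = startIndex < 0 ? logLength + startIndex : startIndex;
    start = Math.max(0, Math.min(start, logLength));              // 1_044_480
    long end = endIndex < 0 ? logLength + endIndex : endIndex;
    end = Math.max(start, Math.max(0, Math.min(end, logLength))); // 1_048_576

    long toRead = end - start;  // 4_096: less than logLength, so the
                                // "Showing 4096 bytes of 1048576 total" link renders
    System.out.println(start + ".." + end + " (" + toRead + " bytes)");
  }
}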
protected void processContainerLog(Block html, long[] range, InputStream in,
int bufferSize, byte[] cbuf) throws IOException {
long totalSkipped = 0;
long start = range[0];
long toRead = range[1] - range[0];
while (totalSkipped < start) {
long ret = in.skip(start - totalSkipped);
if (ret == 0) {
//Read one byte
int nextByte = in.read();
// Check if we have reached EOF
if (nextByte == -1) {
throw new IOException("Premature EOF from container log");
}
ret = 1;
}
totalSkipped += ret;
}
int len = 0;
int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
Hamlet.PRE<Hamlet> pre = html.pre();
while (toRead > 0 && (len = in.read(cbuf, 0, currentToRead)) > 0) {
pre.__(new String(cbuf, 0, len, Charset.forName("UTF-8")));
toRead = toRead - len;
currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
}
pre.__();
}
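(The skip loop above guards against a documented java.io.InputStream quirk: skip() may return 0 even when the stream is not at EOF, which decompression streams in particular can do, so the code falls back to a one-byte read(). Extracted as a standalone sketch:)

import java.io.IOException;
import java.io.InputStream;

// Standalone sketch of the defensive skip in processContainerLog.
public final class SkipFully {
  static void skipFully(InputStream in, long toSkip) throws IOException {
    long skipped = 0;
    while (skipped < toSkip) {
      long ret = in.skip(toSkip - skipped);
      if (ret == 0) {
        // skip() made no progress; probe with a one-byte read to
        // distinguish "try again" from genuine truncation.
        if (in.read() == -1) {
          throw new IOException("Premature EOF from container log");
        }
        ret = 1;  // the probe consumed one byte
      }
      skipped += ret;
    }
  }
}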
protected static class BlockParameters {
private ApplicationId appId;
private ContainerId containerId;

View File: IndexedFileAggregatedLogsBlock.java

@@ -18,11 +18,7 @@
package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
import com.google.inject.Inject;
import java.io.IOException;
@@ -53,7 +49,6 @@
import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController.IndexedPerAggregationLogMeta;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet.PRE;
/**
* The Aggregated Logs Block implementation for Indexed File.
@@ -179,88 +174,8 @@ protected void render(Block html) {
continue;
}
Algorithm compressName = Compression.getCompressionAlgorithmByName(
compressAlgo);
Decompressor decompressor = compressName.getDecompressor();
FileContext fileContext = FileContext.getFileContext(
thisNodeFile.getPath().toUri(), conf);
FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath());
int bufferSize = 65536;
for (IndexedFileLogMeta candidate : candidates) {
if (candidate.getLastModifiedTime() < startTime
|| candidate.getLastModifiedTime() > endTime) {
continue;
}
byte[] cbuf = new byte[bufferSize];
InputStream in = null;
try {
in = compressName.createDecompressionStream(
new BoundedRangeFileInputStream(fsin,
candidate.getStartIndex(),
candidate.getFileCompressedSize()),
decompressor,
LogAggregationIndexedFileController.getFSInputBufferSize(
conf));
long logLength = candidate.getFileSize();
html.pre().__("\n\n").__();
html.p().__("Log Type: " + candidate.getFileName()).__();
html.p().__("Log Upload Time: " + Times.format(
candidate.getLastModifiedTime())).__();
html.p().__("Log Length: " + Long.toString(
logLength)).__();
long startIndex = start < 0
? logLength + start : start;
startIndex = startIndex < 0 ? 0 : startIndex;
startIndex = startIndex > logLength ? logLength : startIndex;
long endLogIndex = end < 0
? logLength + end : end;
endLogIndex = endLogIndex < 0 ? 0 : endLogIndex;
endLogIndex = endLogIndex > logLength ? logLength : endLogIndex;
endLogIndex = endLogIndex < startIndex ?
startIndex : endLogIndex;
long toRead = endLogIndex - startIndex;
if (toRead < logLength) {
html.p().__("Showing " + toRead + " bytes of " + logLength
+ " total. Click ").a(url("logs", $(NM_NODENAME),
$(CONTAINER_ID), $(ENTITY_STRING), $(APP_OWNER),
candidate.getFileName(), "?start=0&start.time="
+ startTime + "&end.time=" + endTime), "here").
__(" for the full log.").__();
}
long totalSkipped = 0;
while (totalSkipped < startIndex) {
long ret = in.skip(startIndex - totalSkipped);
if (ret == 0) {
//Read one byte
int nextByte = in.read();
// Check if we have reached EOF
if (nextByte == -1) {
throw new IOException("Premature EOF from container log");
}
ret = 1;
}
totalSkipped += ret;
}
int len = 0;
int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
PRE<Hamlet> pre = html.pre();
while (toRead > 0
&& (len = in.read(cbuf, 0, currentToRead)) > 0) {
pre.__(new String(cbuf, 0, len, Charset.forName("UTF-8")));
toRead = toRead - len;
currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
}
pre.__();
foundLog = true;
} catch (Exception ex) {
LOG.error("Error getting logs for " + logEntity, ex);
continue;
} finally {
IOUtils.closeQuietly(in);
}
}
foundLog = readContainerLog(compressAlgo, html, thisNodeFile, start,
end, candidates, startTime, endTime, foundLog, logEntity);
}
if (!foundLog) {
if (desiredLogType.isEmpty()) {
@@ -277,4 +192,51 @@ protected void render(Block html) {
LOG.error("Error getting logs for " + logEntity, ex);
}
}
private boolean readContainerLog(String compressAlgo, Block html,
FileStatus thisNodeFile, long start, long end,
List<IndexedFileLogMeta> candidates, long startTime, long endTime,
boolean foundLog, String logEntity) throws IOException {
Algorithm compressName = Compression.getCompressionAlgorithmByName(
compressAlgo);
Decompressor decompressor = compressName.getDecompressor();
FileContext fileContext = FileContext.getFileContext(
thisNodeFile.getPath().toUri(), conf);
FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath());
int bufferSize = 65536;
for (IndexedFileLogMeta candidate : candidates) {
if (candidate.getLastModifiedTime() < startTime
|| candidate.getLastModifiedTime() > endTime) {
continue;
}
byte[] cbuf = new byte[bufferSize];
InputStream in = null;
try {
in = compressName.createDecompressionStream(
new BoundedRangeFileInputStream(fsin, candidate.getStartIndex(),
candidate.getFileCompressedSize()), decompressor,
LogAggregationIndexedFileController.getFSInputBufferSize(conf));
long logLength = candidate.getFileSize();
html.pre().__("\n\n").__();
html.p().__("Log Type: " + candidate.getFileName()).__();
html.p().__(
"Log Upload Time: " + Times.format(candidate.getLastModifiedTime()))
.__();
html.p().__("Log Length: " + Long.toString(logLength)).__();
long[] range = checkParseRange(html, start, end, startTime, endTime,
logLength, candidate.getFileName());
processContainerLog(html, range, in, bufferSize, cbuf);
foundLog = true;
} catch (Exception ex) {
LOG.error("Error getting logs for " + logEntity, ex);
continue;
} finally {
IOUtils.closeQuietly(in);
}
}
return foundLog;
}
}

View File: TFileAggregatedLogsBlock.java

@@ -18,14 +18,11 @@
package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
import com.google.inject.Inject;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -173,7 +170,7 @@ private boolean readContainerLogs(Block html,
long endIndex, String desiredLogType, long logUpLoadTime,
long startTime, long endTime) throws IOException {
int bufferSize = 65536;
char[] cbuf = new char[bufferSize];
byte[] cbuf = new byte[bufferSize];
boolean foundLog = false;
String logType = logReader.nextLog();
@@ -189,53 +186,10 @@ private boolean readContainerLogs(Block html,
html.p().__("Log Upload Time: " + Times.format(logUpLoadTime)).__();
html.p().__("Log Length: " + Long.toString(logLength)).__();
long start = startIndex < 0
? logLength + startIndex : startIndex;
start = start < 0 ? 0 : start;
start = start > logLength ? logLength : start;
long end = endIndex < 0
? logLength + endIndex : endIndex;
end = end < 0 ? 0 : end;
end = end > logLength ? logLength : end;
end = end < start ? start : end;
long[] range = checkParseRange(html, startIndex, endIndex, startTime,
endTime, logLength, logType);
long toRead = end - start;
if (toRead < logLength) {
html.p().__("Showing " + toRead + " bytes of " + logLength
+ " total. Click ").a(url("logs", $(NM_NODENAME), $(CONTAINER_ID),
$(ENTITY_STRING), $(APP_OWNER),
logType, "?start=0&start.time=" + startTime
+ "&end.time=" + endTime), "here").
__(" for the full log.").__();
}
long totalSkipped = 0;
while (totalSkipped < start) {
long ret = logReader.skip(start - totalSkipped);
if (ret == 0) {
//Read one byte
int nextByte = logReader.read();
// Check if we have reached EOF
if (nextByte == -1) {
throw new IOException("Premature EOF from container log");
}
ret = 1;
}
totalSkipped += ret;
}
int len = 0;
int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
PRE<Hamlet> pre = html.pre();
while (toRead > 0
&& (len = logReader.read(cbuf, 0, currentToRead)) > 0) {
pre.__(new String(cbuf, 0, len));
toRead = toRead - len;
currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
}
pre.__();
processContainerLog(html, range, logReader, bufferSize, cbuf);
foundLog = true;
}
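
(Taken together, both blocks now go through the same range-then-copy path in the parent class. A self-contained harness, hypothetical and not part of the commit, exercising that flow on an in-memory log:)

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical demo of the shared flow: normalize the byte range,
// skip to its start, then copy exactly end-start bytes in chunks.
public class SharedFlowDemo {
  public static void main(String[] args) throws IOException {
    byte[] log = "hello, aggregated logs".getBytes(StandardCharsets.UTF_8);
    long logLength = log.length;

    // checkParseRange equivalent: start=-4 tails the last four bytes.
    long start = Math.max(0, Math.min(logLength - 4, logLength));
    long end = logLength;

    InputStream in = new ByteArrayInputStream(log);

    // processContainerLog equivalent: skip, then chunked copy.
    // (ByteArrayInputStream.skip always makes progress; real streams
    // need the read() fallback shown in the diff.)
    long skipped = 0;
    while (skipped < start) {
      skipped += in.skip(start - skipped);
    }
    byte[] cbuf = new byte[8];
    long toRead = end - start;
    int len;
    while (toRead > 0
        && (len = in.read(cbuf, 0, (int) Math.min(toRead, cbuf.length))) > 0) {
      System.out.print(new String(cbuf, 0, len, StandardCharsets.UTF_8));
      toRead -= len;
    }
    System.out.println();  // prints "logs"
  }
}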