YARN-5418. When partial log aggregation is enabled, display the list of aggregated files on the container log page. (Xuan Gong via wangda)

Change-Id: I1befb0bbaeb89fb315bafe3e2f3379663f8cf1ec
This commit is contained in:
Wangda Tan 2017-12-15 15:38:36 -08:00
parent aa503a29d0
commit 631b5c2db7
8 changed files with 268 additions and 18 deletions

View File

@ -121,4 +121,34 @@ public final class LogAggregationWebUtils {
}
return appOwner;
}
/**
* Parse log start time from html.
* @param startStr the start time string
* @return the startIndex
*/
public static long getLogStartTime(String startStr)
throws NumberFormatException {
long start = 0;
if (startStr != null && !startStr.isEmpty()) {
start = Long.parseLong(startStr);
}
return start;
}
/**
* Parse log end time from html.
* @param endStr the end time string
* @return the endIndex
*/
public static long getLogEndTime(String endStr)
throws NumberFormatException {
long end = Long.MAX_VALUE;
if (endStr != null && !endStr.isEmpty()) {
end = Long.parseLong(endStr);
}
return end;
}
}

View File

@ -494,4 +494,5 @@ public abstract class LogAggregationFileController {
+ "\n\n");
return sb.toString();
}
}

View File

@ -77,11 +77,36 @@ public abstract class LogAggregationHtmlBlock extends HtmlBlock {
end = LogAggregationWebUtils.getLogEndIndex(
html, $("end"));
} catch (NumberFormatException ne) {
html.h1().__("Invalid log start value: " + $("end")).__();
html.h1().__("Invalid log end value: " + $("end")).__();
isValid = false;
}
params.setEndIndex(end);
long startTime = 0;
try {
startTime = LogAggregationWebUtils.getLogStartTime(
$("start.time"));
} catch (NumberFormatException ne) {
html.h1().__("Invalid log start time value: " + $("start.time")).__();
isValid = false;
}
params.setStartTime(startTime);
long endTime = Long.MAX_VALUE;
try {
endTime = LogAggregationWebUtils.getLogEndTime(
$("end.time"));
if (endTime < startTime) {
html.h1().__("Invalid log end time value: " + $("end.time") +
". It should be larger than start time value:" + startTime).__();
isValid = false;
}
} catch (NumberFormatException ne) {
html.h1().__("Invalid log end time value: " + $("end.time")).__();
isValid = false;
}
params.setEndTime(endTime);
if (containerId == null || nodeId == null || appOwner == null
|| appOwner.isEmpty() || !isValid) {
return null;
@ -126,6 +151,8 @@ public abstract class LogAggregationHtmlBlock extends HtmlBlock {
private long start;
private long end;
private String logEntity;
private long startTime;
private long endTime;
public ApplicationId getAppId() {
return appId;
@ -182,5 +209,21 @@ public abstract class LogAggregationHtmlBlock extends HtmlBlock {
public void setLogEntity(String logEntity) {
this.logEntity = logEntity;
}
public long getStartTime() {
return startTime;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
public long getEndTime() {
return endTime;
}
public void setEndTime(long endTime) {
this.endTime = endTime;
}
}
}

View File

@ -87,6 +87,8 @@ public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
String logEntity = params.getLogEntity();
long start = params.getStartIndex();
long end = params.getEndIndex();
long startTime = params.getStartTime();
long endTime = params.getEndTime();
List<FileStatus> nodeFiles = null;
try {
@ -185,6 +187,10 @@ public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath());
int bufferSize = 65536;
for (IndexedFileLogMeta candidate : candidates) {
if (candidate.getLastModificatedTime() < startTime
|| candidate.getLastModificatedTime() > endTime) {
continue;
}
byte[] cbuf = new byte[bufferSize];
InputStream in = null;
try {
@ -217,7 +223,8 @@ public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
html.p().__("Showing " + toRead + " bytes of " + logLength
+ " total. Click ").a(url("logs", $(NM_NODENAME),
$(CONTAINER_ID), $(ENTITY_STRING), $(APP_OWNER),
candidate.getFileName(), "?start=0"), "here").
candidate.getFileName(), "?start=0&start.time="
+ startTime + "&end.time=" + endTime), "here").
__(" for the full log.").__();
}
long totalSkipped = 0;

View File

@ -85,6 +85,8 @@ public class TFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
ContainerId containerId = params.getContainerId();
long start = params.getStartIndex();
long end = params.getEndIndex();
long startTime = params.getStartTime();
long endTime = params.getEndTime();
boolean foundLog = false;
String desiredLogType = $(CONTAINER_LOG_TYPE);
@ -107,6 +109,9 @@ public class TFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
continue;
}
long logUploadedTime = thisNodeFile.getModificationTime();
if (logUploadedTime < startTime || logUploadedTime > endTime) {
continue;
}
reader = new AggregatedLogFormat.LogReader(
conf, thisNodeFile.getPath());
@ -138,7 +143,7 @@ public class TFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
}
foundLog = readContainerLogs(html, logReader, start, end,
desiredLogType, logUploadedTime);
desiredLogType, logUploadedTime, startTime, endTime);
} catch (IOException ex) {
LOG.error("Error getting logs for " + logEntity, ex);
continue;
@ -165,8 +170,8 @@ public class TFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
private boolean readContainerLogs(Block html,
AggregatedLogFormat.ContainerLogsReader logReader, long startIndex,
long endIndex, String desiredLogType, long logUpLoadTime)
throws IOException {
long endIndex, String desiredLogType, long logUpLoadTime,
long startTime, long endTime) throws IOException {
int bufferSize = 65536;
char[] cbuf = new char[bufferSize];
@ -199,7 +204,8 @@ public class TFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
html.p().__("Showing " + toRead + " bytes of " + logLength
+ " total. Click ").a(url("logs", $(NM_NODENAME), $(CONTAINER_ID),
$(ENTITY_STRING), $(APP_OWNER),
logType, "?start=0"), "here").
logType, "?start=0&start.time=" + startTime
+ "&end.time=" + endTime), "here").
__(" for the full log.").__();
}

View File

@ -498,6 +498,27 @@ public class WebAppUtils {
return null;
}
/**
* Get a query string.
* @param request HttpServletRequest with the request details
* @return the query parameter string
*/
public static List<NameValuePair> getURLEncodedQueryParam(
HttpServletRequest request) {
String queryString = request.getQueryString();
if (queryString != null && !queryString.isEmpty()) {
String reqEncoding = request.getCharacterEncoding();
if (reqEncoding == null || reqEncoding.isEmpty()) {
reqEncoding = "ISO-8859-1";
}
Charset encoding = Charset.forName(reqEncoding);
List<NameValuePair> params = URLEncodedUtils.parse(queryString,
encoding);
return params;
}
return null;
}
/**
* Get a query string which removes the passed parameter.
* @param httpRequest HttpServletRequest with the request details

View File

@ -29,13 +29,25 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.SubView;
@ -43,13 +55,19 @@ import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet.PRE;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
public class ContainerLogsPage extends NMView {
public static final Logger LOG = LoggerFactory.getLogger(
ContainerLogsPage.class);
public static final String REDIRECT_URL = "redirect.url";
public static final String LOG_AGGREGATION_TYPE = "log.aggregation.type";
public static final String LOG_AGGREGATION_REMOTE_TYPE = "remote";
public static final String LOG_AGGREGATION_LOCAL_TYPE = "local";
@Override protected void preHead(Page.HTML<__> html) {
String redirectUrl = $(REDIRECT_URL);
if (redirectUrl == null || redirectUrl.isEmpty()) {
@ -73,10 +91,13 @@ public class ContainerLogsPage extends NMView {
public static class ContainersLogsBlock extends HtmlBlock implements
YarnWebParams {
private final Context nmContext;
private final LogAggregationFileControllerFactory factory;
@Inject
public ContainersLogsBlock(Context context) {
this.nmContext = context;
this.factory = new LogAggregationFileControllerFactory(
context.getConf());
}
@Override
@ -85,27 +106,73 @@ public class ContainerLogsPage extends NMView {
String redirectUrl = $(REDIRECT_URL);
if (redirectUrl !=null && redirectUrl.equals("false")) {
html.h1("Failed while trying to construct the redirect url to the log" +
" server. Log Server url may not be configured");
" server. Log Server url may not be configured");
//Intentional fallthrough.
}
ContainerId containerId;
ApplicationId appId;
try {
containerId = ContainerId.fromString($(CONTAINER_ID));
appId = containerId.getApplicationAttemptId().getApplicationId();
} catch (IllegalArgumentException ex) {
html.h1("Invalid container ID: " + $(CONTAINER_ID));
return;
}
LogAggregationFileController fileController = null;
boolean foundAggregatedLogs = false;
try {
fileController = this.factory.getFileControllerForRead(
appId, $(APP_OWNER));
foundAggregatedLogs = true;
} catch (IOException fnf) {
// Do Nothing
}
try {
if ($(CONTAINER_LOG_TYPE).isEmpty()) {
html.h2("Local Logs:");
List<File> logFiles = ContainerLogsUtils.getContainerLogDirs(containerId,
request().getRemoteUser(), nmContext);
printLogFileDirectory(html, logFiles);
printLocalLogFileDirectory(html, logFiles);
if (foundAggregatedLogs) {
// print out the aggregated logs if exists
try {
ContainerLogsRequest logRequest = new ContainerLogsRequest();
logRequest.setAppId(appId);
logRequest.setAppOwner($(APP_OWNER));
logRequest.setContainerId($(CONTAINER_ID));
logRequest.setNodeId(this.nmContext.getNodeId().toString());
List<ContainerLogMeta> containersLogMeta = fileController
.readAggregatedLogsMeta(logRequest);
if (containersLogMeta != null && !containersLogMeta.isEmpty()) {
html.h2("Aggregated Logs:");
printAggregatedLogFileDirectory(html, containersLogMeta);
}
} catch (Exception ex) {
if (LOG.isDebugEnabled()) {
LOG.debug(ex.getMessage());
}
}
}
} else {
File logFile = ContainerLogsUtils.getContainerLogFile(containerId,
$(CONTAINER_LOG_TYPE), request().getRemoteUser(), nmContext);
printLogFile(html, logFile);
String aggregationType = $(LOG_AGGREGATION_TYPE);
if (aggregationType == null || aggregationType.isEmpty() ||
aggregationType.trim().toLowerCase().equals(
LOG_AGGREGATION_LOCAL_TYPE)) {
File logFile = ContainerLogsUtils.getContainerLogFile(containerId,
$(CONTAINER_LOG_TYPE), request().getRemoteUser(), nmContext);
printLocalLogFile(html, logFile);
} else if (!LOG_AGGREGATION_LOCAL_TYPE.trim().toLowerCase().equals(
aggregationType) && !LOG_AGGREGATION_REMOTE_TYPE.trim()
.toLowerCase().equals(aggregationType)) {
html.h1("Invalid value for query parameter: "
+ LOG_AGGREGATION_TYPE + ". "
+ "The valid value could be either "
+ LOG_AGGREGATION_LOCAL_TYPE + " or "
+ LOG_AGGREGATION_REMOTE_TYPE + ".");
}
}
} catch (YarnException ex) {
html.h1(ex.getMessage());
@ -114,7 +181,7 @@ public class ContainerLogsPage extends NMView {
}
}
private void printLogFile(Block html, File logFile) {
private void printLocalLogFile(Block html, File logFile) {
long start =
$("start").isEmpty() ? -4 * 1024 : Long.parseLong($("start"));
start = start < 0 ? logFile.length() + start : start;
@ -184,7 +251,8 @@ public class ContainerLogsPage extends NMView {
}
}
private void printLogFileDirectory(Block html, List<File> containerLogsDirs) {
private void printLocalLogFileDirectory(Block html,
List<File> containerLogsDirs) {
// Print out log types in lexical order
Collections.sort(containerLogsDirs);
boolean foundLogFile = false;
@ -207,5 +275,62 @@ public class ContainerLogsPage extends NMView {
return;
}
}
private void printAggregatedLogFileDirectory(Block html,
List<ContainerLogMeta> containersLogMeta) throws ParseException {
List<ContainerLogFileInfo> filesInfo = new ArrayList<>();
for (ContainerLogMeta logMeta : containersLogMeta) {
filesInfo.addAll(logMeta.getContainerLogMeta());
}
//sort the list, so we could list the log file in order.
Collections.sort(filesInfo, new Comparator<ContainerLogFileInfo>() {
@Override
public int compare(ContainerLogFileInfo o1,
ContainerLogFileInfo o2) {
return createAggregatedLogFileName(o1.getFileName(),
o1.getLastModifiedTime()).compareTo(
createAggregatedLogFileName(o2.getFileName(),
o2.getLastModifiedTime()));
}
});
boolean foundLogFile = false;
for (ContainerLogFileInfo fileInfo : filesInfo) {
long timestamp = convertDateToTimeStamp(fileInfo.getLastModifiedTime());
foundLogFile = true;
String fileName = createAggregatedLogFileName(fileInfo.getFileName(),
fileInfo.getLastModifiedTime());
html.p().a(url("containerlogs", $(CONTAINER_ID), $(APP_OWNER),
fileInfo.getFileName(),
"?start=-4096&" + LOG_AGGREGATION_TYPE + "="
+ LOG_AGGREGATION_REMOTE_TYPE + "&start.time="
+ (timestamp - 1000) + "&end.time=" + (timestamp + 1000)),
fileName + " : Total file length is "
+ fileInfo.getFileSize() + " bytes.").__();
}
if (!foundLogFile) {
html.h4("No aggregated logs available for container "
+ $(CONTAINER_ID));
return;
}
}
private String createAggregatedLogFileName(String fileName,
String modificationTime) {
return fileName + "_" + modificationTime;
}
private long convertDateToTimeStamp(String dateTime)
throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat(
"EEE MMM dd HH:mm:ss Z yyyy");
Date d = sdf.parse(dateTime);
Calendar c = Calendar.getInstance();
c.setTime(d);
return c.getTimeInMillis();
}
}
}

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.webapp;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.servlet.FilterChain;
@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.webapp.Controller.RequestContext;
import com.google.inject.Injector;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.http.NameValuePair;
@Singleton
public class NMWebAppFilter extends GuiceContainer{
@ -93,8 +94,24 @@ public class NMWebAppFilter extends GuiceContainer{
ApplicationId appId =
containerId.getApplicationAttemptId().getApplicationId();
Application app = nmContext.getApplications().get(appId);
boolean fetchAggregatedLog = false;
List<NameValuePair> params = WebAppUtils.getURLEncodedQueryParam(
request);
if (params != null) {
for (NameValuePair param : params) {
if (param.getName().equals(ContainerLogsPage
.LOG_AGGREGATION_TYPE)) {
if (param.getValue().equals(ContainerLogsPage
.LOG_AGGREGATION_REMOTE_TYPE)) {
fetchAggregatedLog = true;
}
}
}
}
Configuration nmConf = nmContext.getLocalDirsHandler().getConfig();
if (app == null
if ((app == null || fetchAggregatedLog)
&& nmConf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
String logServerUrl =