YARN-5742 Serve aggregated logs of historical apps from timeline service. Contributed by Rohith Sharma KS
(cherry picked from commit 8d1981806f
)
This commit is contained in:
parent
551e911493
commit
62d329cac0
|
@ -18,13 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
|
package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.nio.charset.Charset;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
@ -35,15 +29,16 @@ import javax.ws.rs.Path;
|
||||||
import javax.ws.rs.PathParam;
|
import javax.ws.rs.PathParam;
|
||||||
import javax.ws.rs.Produces;
|
import javax.ws.rs.Produces;
|
||||||
import javax.ws.rs.QueryParam;
|
import javax.ws.rs.QueryParam;
|
||||||
import javax.ws.rs.WebApplicationException;
|
|
||||||
import javax.ws.rs.core.Context;
|
import javax.ws.rs.core.Context;
|
||||||
import javax.ws.rs.core.GenericEntity;
|
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
import javax.ws.rs.core.StreamingOutput;
|
|
||||||
import javax.ws.rs.core.Response.ResponseBuilder;
|
import javax.ws.rs.core.Response.ResponseBuilder;
|
||||||
import javax.ws.rs.core.Response.Status;
|
import javax.ws.rs.core.Response.Status;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.sun.jersey.api.client.ClientHandlerException;
|
||||||
|
import com.sun.jersey.api.client.UniformInterfaceException;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -54,10 +49,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
|
||||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
|
|
||||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
|
|
||||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
|
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
|
||||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
|
import org.apache.hadoop.yarn.server.webapp.LogWebServiceUtils;
|
||||||
import org.apache.hadoop.yarn.server.webapp.WebServices;
|
import org.apache.hadoop.yarn.server.webapp.WebServices;
|
||||||
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
|
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
||||||
|
@ -65,21 +58,14 @@ import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.AppInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.AppInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
|
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
||||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||||
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
||||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
|
||||||
import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils;
|
|
||||||
import org.codehaus.jettison.json.JSONException;
|
|
||||||
import org.codehaus.jettison.json.JSONObject;
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Singleton;
|
import com.google.inject.Singleton;
|
||||||
import com.sun.jersey.api.client.ClientHandlerException;
|
import org.codehaus.jettison.json.JSONException;
|
||||||
import com.sun.jersey.api.client.UniformInterfaceException;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -271,15 +257,18 @@ public class AHSWebServices extends WebServices {
|
||||||
appInfo = super.getApp(req, res, appId.toString());
|
appInfo = super.getApp(req, res, appId.toString());
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
// directly find logs from HDFS.
|
// directly find logs from HDFS.
|
||||||
return getContainerLogMeta(appId, null, null, containerIdStr, false);
|
return LogWebServiceUtils
|
||||||
|
.getContainerLogMeta(factory, appId, null, null, containerIdStr,
|
||||||
|
false);
|
||||||
}
|
}
|
||||||
// if the application finishes, directly find logs
|
// if the application finishes, directly find logs
|
||||||
// from HDFS.
|
// from HDFS.
|
||||||
if (isFinishedState(appInfo.getAppState())) {
|
if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
|
||||||
return getContainerLogMeta(appId, null, null,
|
return LogWebServiceUtils
|
||||||
containerIdStr, false);
|
.getContainerLogMeta(factory, appId, null, null, containerIdStr,
|
||||||
|
false);
|
||||||
}
|
}
|
||||||
if (isRunningState(appInfo.getAppState())) {
|
if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
|
||||||
String appOwner = appInfo.getUser();
|
String appOwner = appInfo.getUser();
|
||||||
String nodeHttpAddress = null;
|
String nodeHttpAddress = null;
|
||||||
if (nmId != null && !nmId.isEmpty()) {
|
if (nmId != null && !nmId.isEmpty()) {
|
||||||
|
@ -301,8 +290,9 @@ public class AHSWebServices extends WebServices {
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
// return log meta for the aggregated logs if exists.
|
// return log meta for the aggregated logs if exists.
|
||||||
// It will also return empty log meta for the local logs.
|
// It will also return empty log meta for the local logs.
|
||||||
return getContainerLogMeta(appId, appOwner, null,
|
return LogWebServiceUtils
|
||||||
containerIdStr, true);
|
.getContainerLogMeta(factory, appId, appOwner, null,
|
||||||
|
containerIdStr, true);
|
||||||
}
|
}
|
||||||
nodeHttpAddress = containerInfo.getNodeHttpAddress();
|
nodeHttpAddress = containerInfo.getNodeHttpAddress();
|
||||||
// make sure nodeHttpAddress is not null and not empty. Otherwise,
|
// make sure nodeHttpAddress is not null and not empty. Otherwise,
|
||||||
|
@ -314,12 +304,14 @@ public class AHSWebServices extends WebServices {
|
||||||
// It will also return empty log meta for the local logs.
|
// It will also return empty log meta for the local logs.
|
||||||
// If this is the redirect request from NM, we should not
|
// If this is the redirect request from NM, we should not
|
||||||
// re-direct the request back. Simply output the aggregated log meta.
|
// re-direct the request back. Simply output the aggregated log meta.
|
||||||
return getContainerLogMeta(appId, appOwner, null,
|
return LogWebServiceUtils
|
||||||
containerIdStr, true);
|
.getContainerLogMeta(factory, appId, appOwner, null,
|
||||||
|
containerIdStr, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
String uri = "/" + containerId.toString() + "/logs";
|
String uri = "/" + containerId.toString() + "/logs";
|
||||||
String resURI = JOINER.join(getAbsoluteNMWebAddress(nodeHttpAddress),
|
String resURI = JOINER.join(
|
||||||
|
LogWebServiceUtils.getAbsoluteNMWebAddress(conf, nodeHttpAddress),
|
||||||
NM_DOWNLOAD_URI_STR, uri);
|
NM_DOWNLOAD_URI_STR, uri);
|
||||||
String query = req.getQueryString();
|
String query = req.getQueryString();
|
||||||
if (query != null && !query.isEmpty()) {
|
if (query != null && !query.isEmpty()) {
|
||||||
|
@ -397,11 +389,11 @@ public class AHSWebServices extends WebServices {
|
||||||
try {
|
try {
|
||||||
containerId = ContainerId.fromString(containerIdStr);
|
containerId = ContainerId.fromString(containerIdStr);
|
||||||
} catch (IllegalArgumentException ex) {
|
} catch (IllegalArgumentException ex) {
|
||||||
return createBadResponse(Status.NOT_FOUND,
|
return LogWebServiceUtils.createBadResponse(Status.NOT_FOUND,
|
||||||
"Invalid ContainerId: " + containerIdStr);
|
"Invalid ContainerId: " + containerIdStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
final long length = parseLongParam(size);
|
final long length = LogWebServiceUtils.parseLongParam(size);
|
||||||
|
|
||||||
ApplicationId appId = containerId.getApplicationAttemptId()
|
ApplicationId appId = containerId.getApplicationAttemptId()
|
||||||
.getApplicationId();
|
.getApplicationId();
|
||||||
|
@ -410,17 +402,19 @@ public class AHSWebServices extends WebServices {
|
||||||
appInfo = super.getApp(req, res, appId.toString());
|
appInfo = super.getApp(req, res, appId.toString());
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
// directly find logs from HDFS.
|
// directly find logs from HDFS.
|
||||||
return sendStreamOutputResponse(appId, null, null, containerIdStr,
|
return LogWebServiceUtils
|
||||||
filename, format, length, false);
|
.sendStreamOutputResponse(factory, appId, null, null, containerIdStr,
|
||||||
|
filename, format, length, false);
|
||||||
}
|
}
|
||||||
String appOwner = appInfo.getUser();
|
String appOwner = appInfo.getUser();
|
||||||
if (isFinishedState(appInfo.getAppState())) {
|
if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
|
||||||
// directly find logs from HDFS.
|
// directly find logs from HDFS.
|
||||||
return sendStreamOutputResponse(appId, appOwner, null, containerIdStr,
|
return LogWebServiceUtils
|
||||||
filename, format, length, false);
|
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
||||||
|
containerIdStr, filename, format, length, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isRunningState(appInfo.getAppState())) {
|
if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
|
||||||
String nodeHttpAddress = null;
|
String nodeHttpAddress = null;
|
||||||
if (nmId != null && !nmId.isEmpty()) {
|
if (nmId != null && !nmId.isEmpty()) {
|
||||||
try {
|
try {
|
||||||
|
@ -440,8 +434,9 @@ public class AHSWebServices extends WebServices {
|
||||||
containerId.toString());
|
containerId.toString());
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
// output the aggregated logs
|
// output the aggregated logs
|
||||||
return sendStreamOutputResponse(appId, appOwner, null,
|
return LogWebServiceUtils
|
||||||
containerIdStr, filename, format, length, true);
|
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
||||||
|
containerIdStr, filename, format, length, true);
|
||||||
}
|
}
|
||||||
nodeHttpAddress = containerInfo.getNodeHttpAddress();
|
nodeHttpAddress = containerInfo.getNodeHttpAddress();
|
||||||
// make sure nodeHttpAddress is not null and not empty. Otherwise,
|
// make sure nodeHttpAddress is not null and not empty. Otherwise,
|
||||||
|
@ -452,12 +447,14 @@ public class AHSWebServices extends WebServices {
|
||||||
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
|
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
|
||||||
|| redirected_from_node) {
|
|| redirected_from_node) {
|
||||||
// output the aggregated logs
|
// output the aggregated logs
|
||||||
return sendStreamOutputResponse(appId, appOwner, null,
|
return LogWebServiceUtils
|
||||||
containerIdStr, filename, format, length, true);
|
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
||||||
|
containerIdStr, filename, format, length, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
String uri = "/" + containerId.toString() + "/logs/" + filename;
|
String uri = "/" + containerId.toString() + "/logs/" + filename;
|
||||||
String resURI = JOINER.join(getAbsoluteNMWebAddress(nodeHttpAddress),
|
String resURI = JOINER.join(
|
||||||
|
LogWebServiceUtils.getAbsoluteNMWebAddress(conf, nodeHttpAddress),
|
||||||
NM_DOWNLOAD_URI_STR, uri);
|
NM_DOWNLOAD_URI_STR, uri);
|
||||||
String query = req.getQueryString();
|
String query = req.getQueryString();
|
||||||
if (query != null && !query.isEmpty()) {
|
if (query != null && !query.isEmpty()) {
|
||||||
|
@ -468,170 +465,15 @@ public class AHSWebServices extends WebServices {
|
||||||
response.header("Location", resURI);
|
response.header("Location", resURI);
|
||||||
return response.build();
|
return response.build();
|
||||||
} else {
|
} else {
|
||||||
return createBadResponse(Status.NOT_FOUND,
|
return LogWebServiceUtils.createBadResponse(Status.NOT_FOUND,
|
||||||
"The application is not at Running or Finished State.");
|
"The application is not at Running or Finished State.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isRunningState(YarnApplicationState appState) {
|
@VisibleForTesting @InterfaceAudience.Private
|
||||||
return appState == YarnApplicationState.RUNNING;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean isFinishedState(YarnApplicationState appState) {
|
|
||||||
return appState == YarnApplicationState.FINISHED
|
|
||||||
|| appState == YarnApplicationState.FAILED
|
|
||||||
|| appState == YarnApplicationState.KILLED;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Response createBadResponse(Status status, String errMessage) {
|
|
||||||
Response response = Response.status(status)
|
|
||||||
.entity(DOT_JOINER.join(status.toString(), errMessage)).build();
|
|
||||||
return response;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Response sendStreamOutputResponse(ApplicationId appId,
|
|
||||||
String appOwner, String nodeId, String containerIdStr,
|
|
||||||
String fileName, String format, long bytes,
|
|
||||||
boolean printEmptyLocalContainerLog) {
|
|
||||||
String contentType = WebAppUtils.getDefaultLogContentType();
|
|
||||||
if (format != null && !format.isEmpty()) {
|
|
||||||
contentType = WebAppUtils.getSupportedLogContentType(format);
|
|
||||||
if (contentType == null) {
|
|
||||||
String errorMessage = "The valid values for the parameter : format "
|
|
||||||
+ "are " + WebAppUtils.listSupportedLogContentType();
|
|
||||||
return Response.status(Status.BAD_REQUEST).entity(errorMessage)
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
StreamingOutput stream = null;
|
|
||||||
try {
|
|
||||||
stream = getStreamingOutput(appId, appOwner, nodeId,
|
|
||||||
containerIdStr, fileName, bytes, printEmptyLocalContainerLog);
|
|
||||||
} catch (Exception ex) {
|
|
||||||
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
|
|
||||||
ex.getMessage());
|
|
||||||
}
|
|
||||||
ResponseBuilder response = Response.ok(stream);
|
|
||||||
response.header("Content-Type", contentType);
|
|
||||||
// Sending the X-Content-Type-Options response header with the value
|
|
||||||
// nosniff will prevent Internet Explorer from MIME-sniffing a response
|
|
||||||
// away from the declared content-type.
|
|
||||||
response.header("X-Content-Type-Options", "nosniff");
|
|
||||||
return response.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
private StreamingOutput getStreamingOutput(final ApplicationId appId,
|
|
||||||
final String appOwner, final String nodeId, final String containerIdStr,
|
|
||||||
final String logFile, final long bytes,
|
|
||||||
final boolean printEmptyLocalContainerLog) throws IOException{
|
|
||||||
StreamingOutput stream = new StreamingOutput() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void write(OutputStream os) throws IOException,
|
|
||||||
WebApplicationException {
|
|
||||||
ContainerLogsRequest request = new ContainerLogsRequest();
|
|
||||||
request.setAppId(appId);
|
|
||||||
request.setAppOwner(appOwner);
|
|
||||||
request.setContainerId(containerIdStr);
|
|
||||||
request.setBytes(bytes);
|
|
||||||
request.setNodeId(nodeId);
|
|
||||||
Set<String> logTypes = new HashSet<>();
|
|
||||||
logTypes.add(logFile);
|
|
||||||
request.setLogTypes(logTypes);
|
|
||||||
boolean findLogs = factory.getFileControllerForRead(appId, appOwner)
|
|
||||||
.readAggregatedLogs(request, os);
|
|
||||||
if (!findLogs) {
|
|
||||||
os.write(("Can not find logs for container:"
|
|
||||||
+ containerIdStr).getBytes(Charset.forName("UTF-8")));
|
|
||||||
} else {
|
|
||||||
if (printEmptyLocalContainerLog) {
|
|
||||||
StringBuilder sb = new StringBuilder();
|
|
||||||
sb.append(containerIdStr + "\n");
|
|
||||||
sb.append("LogAggregationType: "
|
|
||||||
+ ContainerLogAggregationType.LOCAL + "\n");
|
|
||||||
sb.append("LogContents:\n");
|
|
||||||
sb.append(getNoRedirectWarning() + "\n");
|
|
||||||
os.write(sb.toString().getBytes(Charset.forName("UTF-8")));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
return stream;
|
|
||||||
}
|
|
||||||
|
|
||||||
private long parseLongParam(String bytes) {
|
|
||||||
if (bytes == null || bytes.isEmpty()) {
|
|
||||||
return Long.MAX_VALUE;
|
|
||||||
}
|
|
||||||
return Long.parseLong(bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Response getContainerLogMeta(ApplicationId appId, String appOwner,
|
|
||||||
final String nodeId, final String containerIdStr,
|
|
||||||
boolean emptyLocalContainerLogMeta) {
|
|
||||||
try {
|
|
||||||
ContainerLogsRequest request = new ContainerLogsRequest();
|
|
||||||
request.setAppId(appId);
|
|
||||||
request.setAppOwner(appOwner);
|
|
||||||
request.setContainerId(containerIdStr);
|
|
||||||
request.setNodeId(nodeId);
|
|
||||||
List<ContainerLogMeta> containerLogMeta = factory
|
|
||||||
.getFileControllerForRead(appId, appOwner)
|
|
||||||
.readAggregatedLogsMeta(request);
|
|
||||||
if (containerLogMeta.isEmpty()) {
|
|
||||||
throw new NotFoundException(
|
|
||||||
"Can not get log meta for container: " + containerIdStr);
|
|
||||||
}
|
|
||||||
List<ContainerLogsInfo> containersLogsInfo = new ArrayList<>();
|
|
||||||
for (ContainerLogMeta meta : containerLogMeta) {
|
|
||||||
ContainerLogsInfo logInfo = new ContainerLogsInfo(meta,
|
|
||||||
ContainerLogAggregationType.AGGREGATED);
|
|
||||||
containersLogsInfo.add(logInfo);
|
|
||||||
}
|
|
||||||
if (emptyLocalContainerLogMeta) {
|
|
||||||
ContainerLogMeta emptyMeta = new ContainerLogMeta(
|
|
||||||
containerIdStr, "N/A");
|
|
||||||
ContainerLogsInfo empty = new ContainerLogsInfo(emptyMeta,
|
|
||||||
ContainerLogAggregationType.LOCAL);
|
|
||||||
containersLogsInfo.add(empty);
|
|
||||||
}
|
|
||||||
GenericEntity<List<ContainerLogsInfo>> meta = new GenericEntity<List<
|
|
||||||
ContainerLogsInfo>>(containersLogsInfo){};
|
|
||||||
ResponseBuilder response = Response.ok(meta);
|
|
||||||
// Sending the X-Content-Type-Options response header with the value
|
|
||||||
// nosniff will prevent Internet Explorer from MIME-sniffing a response
|
|
||||||
// away from the declared content-type.
|
|
||||||
response.header("X-Content-Type-Options", "nosniff");
|
|
||||||
return response.build();
|
|
||||||
} catch (Exception ex) {
|
|
||||||
throw new WebApplicationException(ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Private
|
|
||||||
@VisibleForTesting
|
|
||||||
public static String getNoRedirectWarning() {
|
|
||||||
return "We do not have NodeManager web address, so we can not "
|
|
||||||
+ "re-direct the request to related NodeManager "
|
|
||||||
+ "for local container logs.";
|
|
||||||
}
|
|
||||||
|
|
||||||
private String getAbsoluteNMWebAddress(String nmWebAddress) {
|
|
||||||
if (nmWebAddress.contains(WebAppUtils.HTTP_PREFIX) ||
|
|
||||||
nmWebAddress.contains(WebAppUtils.HTTPS_PREFIX)) {
|
|
||||||
return nmWebAddress;
|
|
||||||
}
|
|
||||||
return WebAppUtils.getHttpSchemePrefix(conf) + nmWebAddress;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
@Private
|
|
||||||
public String getNMWebAddressFromRM(Configuration configuration,
|
public String getNMWebAddressFromRM(Configuration configuration,
|
||||||
String nodeId) throws ClientHandlerException,
|
String nodeId)
|
||||||
UniformInterfaceException, JSONException {
|
throws ClientHandlerException, UniformInterfaceException, JSONException {
|
||||||
JSONObject nodeInfo = YarnWebServiceUtils.getNodeInfoFromRMWebService(
|
return LogWebServiceUtils.getNMWebAddressFromRM(configuration, nodeId);
|
||||||
configuration, nodeId).getJSONObject("node");
|
|
||||||
return nodeInfo.has("nodeHTTPAddress") ?
|
|
||||||
nodeInfo.getString("nodeHTTPAddress") : null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
|
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
|
||||||
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
||||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.webapp.LogWebServiceUtils;
|
||||||
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
|
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
|
||||||
|
@ -774,7 +775,8 @@ public class TestAHSWebServices extends JerseyTestBase {
|
||||||
// the warning message.
|
// the warning message.
|
||||||
assertTrue(responseText.contains("LogAggregationType: "
|
assertTrue(responseText.contains("LogAggregationType: "
|
||||||
+ ContainerLogAggregationType.LOCAL));
|
+ ContainerLogAggregationType.LOCAL));
|
||||||
assertTrue(responseText.contains(AHSWebServices.getNoRedirectWarning()));
|
assertTrue(
|
||||||
|
responseText.contains(LogWebServiceUtils.getNoRedirectWarning()));
|
||||||
|
|
||||||
// If we can not container information from ATS, and we specify the NM id,
|
// If we can not container information from ATS, and we specify the NM id,
|
||||||
// but we can not get nm web address, we would still try to
|
// but we can not get nm web address, we would still try to
|
||||||
|
@ -790,7 +792,8 @@ public class TestAHSWebServices extends JerseyTestBase {
|
||||||
assertTrue(responseText.contains(content));
|
assertTrue(responseText.contains(content));
|
||||||
assertTrue(responseText.contains("LogAggregationType: "
|
assertTrue(responseText.contains("LogAggregationType: "
|
||||||
+ ContainerLogAggregationType.LOCAL));
|
+ ContainerLogAggregationType.LOCAL));
|
||||||
assertTrue(responseText.contains(AHSWebServices.getNoRedirectWarning()));
|
assertTrue(
|
||||||
|
responseText.contains(LogWebServiceUtils.getNoRedirectWarning()));
|
||||||
|
|
||||||
// If this is the redirect request, we would not re-direct the request
|
// If this is the redirect request, we would not re-direct the request
|
||||||
// back and get the aggregated logs.
|
// back and get the aggregated logs.
|
||||||
|
|
|
@ -0,0 +1,506 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.webapp;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
|
import com.google.inject.Singleton;
|
||||||
|
import com.sun.jersey.api.client.Client;
|
||||||
|
import com.sun.jersey.api.client.ClientResponse;
|
||||||
|
import com.sun.jersey.api.client.config.ClientConfig;
|
||||||
|
import com.sun.jersey.api.client.config.DefaultClientConfig;
|
||||||
|
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
|
||||||
|
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
|
||||||
|
import com.sun.jersey.core.util.MultivaluedMapImpl;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.http.JettyUtils;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
||||||
|
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
|
||||||
|
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
||||||
|
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
||||||
|
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
||||||
|
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||||
|
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
||||||
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
import javax.ws.rs.DefaultValue;
|
||||||
|
import javax.ws.rs.GET;
|
||||||
|
import javax.ws.rs.Path;
|
||||||
|
import javax.ws.rs.PathParam;
|
||||||
|
import javax.ws.rs.Produces;
|
||||||
|
import javax.ws.rs.QueryParam;
|
||||||
|
import javax.ws.rs.core.Context;
|
||||||
|
import javax.ws.rs.core.MediaType;
|
||||||
|
import javax.ws.rs.core.MultivaluedMap;
|
||||||
|
import javax.ws.rs.core.Response;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.HttpURLConnection;
|
||||||
|
import java.net.URL;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Support only ATSv2 client only.
|
||||||
|
*/
|
||||||
|
@Singleton @Path("/ws/v2/applicationlog") public class LogWebService {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(LogWebService.class);
|
||||||
|
private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
|
||||||
|
private static final String NM_DOWNLOAD_URI_STR = "/ws/v1/node/containers";
|
||||||
|
private static final Joiner JOINER = Joiner.on("");
|
||||||
|
private static Configuration yarnConf = new YarnConfiguration();
|
||||||
|
private static LogAggregationFileControllerFactory factory;
|
||||||
|
private static String base;
|
||||||
|
private static String defaultClusterid;
|
||||||
|
private volatile Client webTimelineClient;
|
||||||
|
|
||||||
|
static {
|
||||||
|
init();
|
||||||
|
}
|
||||||
|
|
||||||
|
// initialize all the common resources - order is important
|
||||||
|
private static void init() {
|
||||||
|
factory = new LogAggregationFileControllerFactory(yarnConf);
|
||||||
|
base = JOINER.join(WebAppUtils.getHttpSchemePrefix(yarnConf),
|
||||||
|
WebAppUtils.getTimelineReaderWebAppURLWithoutScheme(yarnConf),
|
||||||
|
RESOURCE_URI_STR_V2);
|
||||||
|
defaultClusterid = yarnConf.get(YarnConfiguration.RM_CLUSTER_ID,
|
||||||
|
YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
|
||||||
|
LOG.info("Initialized LogWeService with clusterid " + defaultClusterid
|
||||||
|
+ " for URI: " + base);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Client createTimelineWebClient() {
|
||||||
|
ClientConfig cfg = new DefaultClientConfig();
|
||||||
|
cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class);
|
||||||
|
Client client = new Client(
|
||||||
|
new URLConnectionClientHandler(new HttpURLConnectionFactory() {
|
||||||
|
@Override public HttpURLConnection getHttpURLConnection(URL url)
|
||||||
|
throws IOException {
|
||||||
|
AuthenticatedURL.Token token = new AuthenticatedURL.Token();
|
||||||
|
HttpURLConnection conn = null;
|
||||||
|
try {
|
||||||
|
conn = new AuthenticatedURL().openConnection(url, token);
|
||||||
|
LOG.info("LogWeService:Connecetion created.");
|
||||||
|
} catch (AuthenticationException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
return conn;
|
||||||
|
}
|
||||||
|
}), cfg);
|
||||||
|
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initForReadableEndpoints(HttpServletResponse response) {
|
||||||
|
// clear content type
|
||||||
|
response.setContentType(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns log file's name as well as current file size for a container.
|
||||||
|
*
|
||||||
|
* @param req HttpServletRequest
|
||||||
|
* @param res HttpServletResponse
|
||||||
|
* @param containerIdStr The container ID
|
||||||
|
* @param nmId The Node Manager NodeId
|
||||||
|
* @param redirectedFromNode Whether this is a redirected request from NM
|
||||||
|
* @return The log file's name and current file size
|
||||||
|
*/
|
||||||
|
@GET @Path("/containers/{containerid}/logs")
|
||||||
|
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
||||||
|
public Response getContainerLogsInfo(@Context HttpServletRequest req,
|
||||||
|
@Context HttpServletResponse res,
|
||||||
|
@PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
|
||||||
|
@QueryParam(YarnWebServiceParams.NM_ID) String nmId,
|
||||||
|
@QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
|
||||||
|
@DefaultValue("false") boolean redirectedFromNode,
|
||||||
|
@QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) {
|
||||||
|
ContainerId containerId = null;
|
||||||
|
initForReadableEndpoints(res);
|
||||||
|
try {
|
||||||
|
containerId = ContainerId.fromString(containerIdStr);
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
throw new BadRequestException("invalid container id, " + containerIdStr);
|
||||||
|
}
|
||||||
|
|
||||||
|
ApplicationId appId =
|
||||||
|
containerId.getApplicationAttemptId().getApplicationId();
|
||||||
|
AppInfo appInfo;
|
||||||
|
try {
|
||||||
|
appInfo = getApp(req, appId.toString(), clusterId);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
// directly find logs from HDFS.
|
||||||
|
return LogWebServiceUtils
|
||||||
|
.getContainerLogMeta(factory, appId, null, null, containerIdStr,
|
||||||
|
false);
|
||||||
|
}
|
||||||
|
// if the application finishes, directly find logs
|
||||||
|
// from HDFS.
|
||||||
|
if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
|
||||||
|
return LogWebServiceUtils
|
||||||
|
.getContainerLogMeta(factory, appId, null, null, containerIdStr,
|
||||||
|
false);
|
||||||
|
}
|
||||||
|
if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
|
||||||
|
String appOwner = appInfo.getUser();
|
||||||
|
String nodeHttpAddress = null;
|
||||||
|
if (nmId != null && !nmId.isEmpty()) {
|
||||||
|
try {
|
||||||
|
nodeHttpAddress =
|
||||||
|
LogWebServiceUtils.getNMWebAddressFromRM(yarnConf, nmId);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(ex.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
|
||||||
|
ContainerInfo containerInfo;
|
||||||
|
try {
|
||||||
|
containerInfo =
|
||||||
|
getContainer(req, appId.toString(), containerId.toString(),
|
||||||
|
clusterId);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
// return log meta for the aggregated logs if exists.
|
||||||
|
// It will also return empty log meta for the local logs.
|
||||||
|
return LogWebServiceUtils
|
||||||
|
.getContainerLogMeta(factory, appId, appOwner, null,
|
||||||
|
containerIdStr, true);
|
||||||
|
}
|
||||||
|
nodeHttpAddress = containerInfo.getNodeHttpAddress();
|
||||||
|
// make sure nodeHttpAddress is not null and not empty. Otherwise,
|
||||||
|
// we would only get log meta for aggregated logs instead of
|
||||||
|
// re-directing the request
|
||||||
|
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
|
||||||
|
|| redirectedFromNode) {
|
||||||
|
// return log meta for the aggregated logs if exists.
|
||||||
|
// It will also return empty log meta for the local logs.
|
||||||
|
// If this is the redirect request from NM, we should not
|
||||||
|
// re-direct the request back. Simply output the aggregated log meta.
|
||||||
|
return LogWebServiceUtils
|
||||||
|
.getContainerLogMeta(factory, appId, appOwner, null,
|
||||||
|
containerIdStr, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
String uri = "/" + containerId.toString() + "/logs";
|
||||||
|
String resURI = JOINER.join(
|
||||||
|
LogWebServiceUtils.getAbsoluteNMWebAddress(yarnConf, nodeHttpAddress),
|
||||||
|
NM_DOWNLOAD_URI_STR, uri);
|
||||||
|
String query = req.getQueryString();
|
||||||
|
if (query != null && !query.isEmpty()) {
|
||||||
|
resURI += "?" + query;
|
||||||
|
}
|
||||||
|
Response.ResponseBuilder response =
|
||||||
|
Response.status(HttpServletResponse.SC_TEMPORARY_REDIRECT);
|
||||||
|
response.header("Location", resURI);
|
||||||
|
return response.build();
|
||||||
|
} else {
|
||||||
|
throw new NotFoundException(
|
||||||
|
"The application is not at Running or Finished State.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ContainerInfo getContainer(HttpServletRequest req, String appId,
|
||||||
|
String containerId, String clusterId) {
|
||||||
|
UserGroupInformation callerUGI = LogWebServiceUtils.getUser(req);
|
||||||
|
String cId = clusterId != null ? clusterId : defaultClusterid;
|
||||||
|
MultivaluedMap<String, String> params = new MultivaluedMapImpl();
|
||||||
|
params.add("fields", "INFO");
|
||||||
|
String path = JOINER.join("clusters/", cId, "/apps/", appId, "/entities/",
|
||||||
|
TimelineEntityType.YARN_CONTAINER.toString(), "/", containerId);
|
||||||
|
TimelineEntity conEntity = null;
|
||||||
|
try {
|
||||||
|
if (callerUGI == null) {
|
||||||
|
conEntity = getEntity(path, params);
|
||||||
|
} else {
|
||||||
|
setUserName(params, callerUGI.getShortUserName());
|
||||||
|
conEntity =
|
||||||
|
callerUGI.doAs(new PrivilegedExceptionAction<TimelineEntity>() {
|
||||||
|
@Override public TimelineEntity run() throws Exception {
|
||||||
|
return getEntity(path, params);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LogWebServiceUtils.rewrapAndThrowException(e);
|
||||||
|
}
|
||||||
|
if (conEntity == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
String nodeHttpAddress = (String) conEntity.getInfo()
|
||||||
|
.get(ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO);
|
||||||
|
|
||||||
|
ContainerInfo info = new ContainerInfo(nodeHttpAddress);
|
||||||
|
return info;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected AppInfo getApp(HttpServletRequest req, String appId,
|
||||||
|
String clusterId) {
|
||||||
|
UserGroupInformation callerUGI = LogWebServiceUtils.getUser(req);
|
||||||
|
|
||||||
|
String cId = clusterId != null ? clusterId : defaultClusterid;
|
||||||
|
MultivaluedMap<String, String> params = new MultivaluedMapImpl();
|
||||||
|
params.add("fields", "INFO");
|
||||||
|
String path = JOINER.join("clusters/", cId, "/apps/", appId);
|
||||||
|
TimelineEntity appEntity = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (callerUGI == null) {
|
||||||
|
appEntity = getEntity(path, params);
|
||||||
|
} else {
|
||||||
|
setUserName(params, callerUGI.getShortUserName());
|
||||||
|
appEntity =
|
||||||
|
callerUGI.doAs(new PrivilegedExceptionAction<TimelineEntity>() {
|
||||||
|
@Override public TimelineEntity run() throws Exception {
|
||||||
|
return getEntity(path, params);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LogWebServiceUtils.rewrapAndThrowException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (appEntity == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
String appOwner = (String) appEntity.getInfo()
|
||||||
|
.get(ApplicationMetricsConstants.USER_ENTITY_INFO);
|
||||||
|
String state = (String) appEntity.getInfo()
|
||||||
|
.get(ApplicationMetricsConstants.STATE_EVENT_INFO);
|
||||||
|
YarnApplicationState appState = YarnApplicationState.valueOf(state);
|
||||||
|
AppInfo info = new AppInfo(appState, appOwner);
|
||||||
|
return info;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the contents of a container's log file in plain text.
|
||||||
|
*
|
||||||
|
* @param req HttpServletRequest
|
||||||
|
* @param res HttpServletResponse
|
||||||
|
* @param containerIdStr The container ID
|
||||||
|
* @param filename The name of the log file
|
||||||
|
* @param format The content type
|
||||||
|
* @param size the size of the log file
|
||||||
|
* @param nmId The Node Manager NodeId
|
||||||
|
* @param redirectedFromNode Whether this is the redirect request from NM
|
||||||
|
* @return The contents of the container's log file
|
||||||
|
*/
|
||||||
|
@GET @Path("/containers/{containerid}/logs/{filename}")
|
||||||
|
@Produces({ MediaType.TEXT_PLAIN }) @InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Unstable public Response getContainerLogFile(
|
||||||
|
@Context HttpServletRequest req, @Context HttpServletResponse res,
|
||||||
|
@PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
|
||||||
|
@PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME) String filename,
|
||||||
|
@QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_FORMAT) String format,
|
||||||
|
@QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_SIZE) String size,
|
||||||
|
@QueryParam(YarnWebServiceParams.NM_ID) String nmId,
|
||||||
|
@QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
|
||||||
|
boolean redirectedFromNode,
|
||||||
|
@QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) {
|
||||||
|
return getLogs(req, res, containerIdStr, filename, format, size, nmId,
|
||||||
|
redirectedFromNode, clusterId);
|
||||||
|
}
|
||||||
|
|
||||||
|
//TODO: YARN-4993: Refactory ContainersLogsBlock, AggregatedLogsBlock and
|
||||||
|
// container log webservice introduced in AHS to minimize
|
||||||
|
// the duplication.
|
||||||
|
@GET @Path("/containerlogs/{containerid}/{filename}")
|
||||||
|
@Produces({ MediaType.TEXT_PLAIN + "; " + JettyUtils.UTF_8 })
|
||||||
|
@InterfaceAudience.Public @InterfaceStability.Unstable
|
||||||
|
public Response getLogs(@Context HttpServletRequest req,
|
||||||
|
@Context HttpServletResponse res,
|
||||||
|
@PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
|
||||||
|
@PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME) String filename,
|
||||||
|
@QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_FORMAT) String format,
|
||||||
|
@QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_SIZE) String size,
|
||||||
|
@QueryParam(YarnWebServiceParams.NM_ID) String nmId,
|
||||||
|
@QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
|
||||||
|
@DefaultValue("false") boolean redirectedFromNode,
|
||||||
|
@QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) {
|
||||||
|
initForReadableEndpoints(res);
|
||||||
|
ContainerId containerId;
|
||||||
|
try {
|
||||||
|
containerId = ContainerId.fromString(containerIdStr);
|
||||||
|
} catch (IllegalArgumentException ex) {
|
||||||
|
return LogWebServiceUtils.createBadResponse(Response.Status.NOT_FOUND,
|
||||||
|
"Invalid ContainerId: " + containerIdStr);
|
||||||
|
}
|
||||||
|
|
||||||
|
final long length = LogWebServiceUtils.parseLongParam(size);
|
||||||
|
|
||||||
|
ApplicationId appId =
|
||||||
|
containerId.getApplicationAttemptId().getApplicationId();
|
||||||
|
AppInfo appInfo;
|
||||||
|
try {
|
||||||
|
appInfo = getApp(req, appId.toString(), clusterId);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
// directly find logs from HDFS.
|
||||||
|
return LogWebServiceUtils
|
||||||
|
.sendStreamOutputResponse(factory, appId, null, null, containerIdStr,
|
||||||
|
filename, format, length, false);
|
||||||
|
}
|
||||||
|
String appOwner = appInfo.getUser();
|
||||||
|
if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
|
||||||
|
// directly find logs from HDFS.
|
||||||
|
return LogWebServiceUtils
|
||||||
|
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
||||||
|
containerIdStr, filename, format, length, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
|
||||||
|
String nodeHttpAddress = null;
|
||||||
|
if (nmId != null && !nmId.isEmpty()) {
|
||||||
|
try {
|
||||||
|
nodeHttpAddress =
|
||||||
|
LogWebServiceUtils.getNMWebAddressFromRM(yarnConf, nmId);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(ex.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
|
||||||
|
ContainerInfo containerInfo;
|
||||||
|
try {
|
||||||
|
containerInfo =
|
||||||
|
getContainer(req, appId.toString(), containerId.toString(),
|
||||||
|
clusterId);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
// output the aggregated logs
|
||||||
|
return LogWebServiceUtils
|
||||||
|
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
||||||
|
containerIdStr, filename, format, length, true);
|
||||||
|
}
|
||||||
|
nodeHttpAddress = containerInfo.getNodeHttpAddress();
|
||||||
|
// make sure nodeHttpAddress is not null and not empty. Otherwise,
|
||||||
|
// we would only get aggregated logs instead of re-directing the
|
||||||
|
// request.
|
||||||
|
// If this is the redirect request from NM, we should not re-direct the
|
||||||
|
// request back. Simply output the aggregated logs.
|
||||||
|
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
|
||||||
|
|| redirectedFromNode) {
|
||||||
|
// output the aggregated logs
|
||||||
|
return LogWebServiceUtils
|
||||||
|
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
||||||
|
containerIdStr, filename, format, length, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
String uri = "/" + containerId.toString() + "/logs/" + filename;
|
||||||
|
String resURI = JOINER.join(
|
||||||
|
LogWebServiceUtils.getAbsoluteNMWebAddress(yarnConf, nodeHttpAddress),
|
||||||
|
NM_DOWNLOAD_URI_STR, uri);
|
||||||
|
String query = req.getQueryString();
|
||||||
|
if (query != null && !query.isEmpty()) {
|
||||||
|
resURI += "?" + query;
|
||||||
|
}
|
||||||
|
Response.ResponseBuilder response =
|
||||||
|
Response.status(HttpServletResponse.SC_TEMPORARY_REDIRECT);
|
||||||
|
response.header("Location", resURI);
|
||||||
|
return response.build();
|
||||||
|
} else {
|
||||||
|
return LogWebServiceUtils.createBadResponse(Response.Status.NOT_FOUND,
|
||||||
|
"The application is not at Running or Finished State.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static class AppInfo {
|
||||||
|
private YarnApplicationState appState;
|
||||||
|
private String user;
|
||||||
|
|
||||||
|
AppInfo(YarnApplicationState appState, String user) {
|
||||||
|
this.appState = appState;
|
||||||
|
this.user = user;
|
||||||
|
}
|
||||||
|
|
||||||
|
public YarnApplicationState getAppState() {
|
||||||
|
return this.appState;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getUser() {
|
||||||
|
return this.user;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static class ContainerInfo {
|
||||||
|
private String nodeHttpAddress;
|
||||||
|
|
||||||
|
ContainerInfo(String nodeHttpAddress) {
|
||||||
|
this.nodeHttpAddress = nodeHttpAddress;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getNodeHttpAddress() {
|
||||||
|
return nodeHttpAddress;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting protected TimelineEntity getEntity(String path,
|
||||||
|
MultivaluedMap<String, String> params) throws IOException {
|
||||||
|
ClientResponse resp =
|
||||||
|
getClient().resource(base).path(path).queryParams(params)
|
||||||
|
.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON)
|
||||||
|
.get(ClientResponse.class);
|
||||||
|
if (resp == null
|
||||||
|
|| resp.getStatusInfo().getStatusCode() != ClientResponse.Status.OK
|
||||||
|
.getStatusCode()) {
|
||||||
|
String msg =
|
||||||
|
"Response from the timeline reader server is " + ((resp == null) ?
|
||||||
|
"null" :
|
||||||
|
"not successful," + " HTTP error code: " + resp.getStatus()
|
||||||
|
+ ", Server response:\n" + resp.getEntity(String.class));
|
||||||
|
LOG.error(msg);
|
||||||
|
throw new IOException(msg);
|
||||||
|
}
|
||||||
|
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
|
||||||
|
return entity;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Client getClient() {
|
||||||
|
if (webTimelineClient == null) {
|
||||||
|
synchronized (LogWebService.class) {
|
||||||
|
if (webTimelineClient == null) {
|
||||||
|
webTimelineClient = createTimelineWebClient();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return webTimelineClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set user.name in non-secure mode to delegate to next rest call.
|
||||||
|
*/
|
||||||
|
private void setUserName(MultivaluedMap<String, String> params, String user) {
|
||||||
|
if (!UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
params.add("user.name", user);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,258 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.webapp;
|
||||||
|
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
|
import com.sun.jersey.api.client.ClientHandlerException;
|
||||||
|
import com.sun.jersey.api.client.UniformInterfaceException;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
|
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
|
||||||
|
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
|
||||||
|
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
|
||||||
|
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
|
||||||
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
|
||||||
|
import org.apache.hadoop.yarn.webapp.ForbiddenException;
|
||||||
|
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||||
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
|
import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils;
|
||||||
|
import org.codehaus.jettison.json.JSONException;
|
||||||
|
import org.codehaus.jettison.json.JSONObject;
|
||||||
|
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.ws.rs.WebApplicationException;
|
||||||
|
import javax.ws.rs.core.GenericEntity;
|
||||||
|
import javax.ws.rs.core.Response;
|
||||||
|
import javax.ws.rs.core.StreamingOutput;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.lang.reflect.UndeclaredThrowableException;
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Log web service utils class.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public final class LogWebServiceUtils {
|
||||||
|
|
||||||
|
private LogWebServiceUtils() {
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final Joiner DOT_JOINER = Joiner.on(". ");
|
||||||
|
|
||||||
|
public static Response getContainerLogMeta(
|
||||||
|
LogAggregationFileControllerFactory factory, ApplicationId appId,
|
||||||
|
String appOwner, final String nodeId, final String containerIdStr,
|
||||||
|
boolean emptyLocalContainerLogMeta) {
|
||||||
|
try {
|
||||||
|
ContainerLogsRequest request = new ContainerLogsRequest();
|
||||||
|
request.setAppId(appId);
|
||||||
|
request.setAppOwner(appOwner);
|
||||||
|
request.setContainerId(containerIdStr);
|
||||||
|
request.setNodeId(nodeId);
|
||||||
|
List<ContainerLogMeta> containerLogMeta =
|
||||||
|
factory.getFileControllerForRead(appId, appOwner)
|
||||||
|
.readAggregatedLogsMeta(request);
|
||||||
|
if (containerLogMeta.isEmpty()) {
|
||||||
|
throw new NotFoundException(
|
||||||
|
"Can not get log meta for container: " + containerIdStr);
|
||||||
|
}
|
||||||
|
List<ContainerLogsInfo> containersLogsInfo = new ArrayList<>();
|
||||||
|
for (ContainerLogMeta meta : containerLogMeta) {
|
||||||
|
ContainerLogsInfo logInfo =
|
||||||
|
new ContainerLogsInfo(meta, ContainerLogAggregationType.AGGREGATED);
|
||||||
|
containersLogsInfo.add(logInfo);
|
||||||
|
}
|
||||||
|
if (emptyLocalContainerLogMeta) {
|
||||||
|
ContainerLogMeta emptyMeta =
|
||||||
|
new ContainerLogMeta(containerIdStr, "N/A");
|
||||||
|
ContainerLogsInfo empty =
|
||||||
|
new ContainerLogsInfo(emptyMeta, ContainerLogAggregationType.LOCAL);
|
||||||
|
containersLogsInfo.add(empty);
|
||||||
|
}
|
||||||
|
GenericEntity<List<ContainerLogsInfo>> meta =
|
||||||
|
new GenericEntity<List<ContainerLogsInfo>>(containersLogsInfo) {
|
||||||
|
};
|
||||||
|
Response.ResponseBuilder response = Response.ok(meta);
|
||||||
|
// Sending the X-Content-Type-Options response header with the value
|
||||||
|
// nosniff will prevent Internet Explorer from MIME-sniffing a response
|
||||||
|
// away from the declared content-type.
|
||||||
|
response.header("X-Content-Type-Options", "nosniff");
|
||||||
|
return response.build();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
throw new WebApplicationException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Response sendStreamOutputResponse(
|
||||||
|
LogAggregationFileControllerFactory factory, ApplicationId appId,
|
||||||
|
String appOwner, String nodeId, String containerIdStr, String fileName,
|
||||||
|
String format, long bytes, boolean printEmptyLocalContainerLog) {
|
||||||
|
String contentType = WebAppUtils.getDefaultLogContentType();
|
||||||
|
if (format != null && !format.isEmpty()) {
|
||||||
|
contentType = WebAppUtils.getSupportedLogContentType(format);
|
||||||
|
if (contentType == null) {
|
||||||
|
String errorMessage =
|
||||||
|
"The valid values for the parameter : format " + "are "
|
||||||
|
+ WebAppUtils.listSupportedLogContentType();
|
||||||
|
return Response.status(Response.Status.BAD_REQUEST).entity(errorMessage)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
StreamingOutput stream = null;
|
||||||
|
try {
|
||||||
|
stream =
|
||||||
|
getStreamingOutput(factory, appId, appOwner, nodeId, containerIdStr,
|
||||||
|
fileName, bytes, printEmptyLocalContainerLog);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
return createBadResponse(Response.Status.INTERNAL_SERVER_ERROR,
|
||||||
|
ex.getMessage());
|
||||||
|
}
|
||||||
|
Response.ResponseBuilder response = Response.ok(stream);
|
||||||
|
response.header("Content-Type", contentType);
|
||||||
|
// Sending the X-Content-Type-Options response header with the value
|
||||||
|
// nosniff will prevent Internet Explorer from MIME-sniffing a response
|
||||||
|
// away from the declared content-type.
|
||||||
|
response.header("X-Content-Type-Options", "nosniff");
|
||||||
|
return response.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static StreamingOutput getStreamingOutput(
|
||||||
|
final LogAggregationFileControllerFactory factory,
|
||||||
|
final ApplicationId appId, final String appOwner, final String nodeId,
|
||||||
|
final String containerIdStr, final String logFile, final long bytes,
|
||||||
|
final boolean printEmptyLocalContainerLog) throws IOException {
|
||||||
|
StreamingOutput stream = new StreamingOutput() {
|
||||||
|
|
||||||
|
@Override public void write(OutputStream os)
|
||||||
|
throws IOException, WebApplicationException {
|
||||||
|
ContainerLogsRequest request = new ContainerLogsRequest();
|
||||||
|
request.setAppId(appId);
|
||||||
|
request.setAppOwner(appOwner);
|
||||||
|
request.setContainerId(containerIdStr);
|
||||||
|
request.setBytes(bytes);
|
||||||
|
request.setNodeId(nodeId);
|
||||||
|
Set<String> logTypes = new HashSet<>();
|
||||||
|
logTypes.add(logFile);
|
||||||
|
request.setLogTypes(logTypes);
|
||||||
|
boolean findLogs = factory.getFileControllerForRead(appId, appOwner)
|
||||||
|
.readAggregatedLogs(request, os);
|
||||||
|
if (!findLogs) {
|
||||||
|
os.write(("Can not find logs for container:" + containerIdStr)
|
||||||
|
.getBytes(Charset.forName("UTF-8")));
|
||||||
|
} else {
|
||||||
|
if (printEmptyLocalContainerLog) {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
sb.append(containerIdStr + "\n");
|
||||||
|
sb.append("LogAggregationType: " + ContainerLogAggregationType.LOCAL
|
||||||
|
+ "\n");
|
||||||
|
sb.append("LogContents:\n");
|
||||||
|
sb.append(getNoRedirectWarning() + "\n");
|
||||||
|
os.write(sb.toString().getBytes(Charset.forName("UTF-8")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
return stream;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getNoRedirectWarning() {
|
||||||
|
return "We do not have NodeManager web address, so we can not "
|
||||||
|
+ "re-direct the request to related NodeManager "
|
||||||
|
+ "for local container logs.";
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void rewrapAndThrowException(Exception e) {
|
||||||
|
if (e instanceof UndeclaredThrowableException) {
|
||||||
|
rewrapAndThrowThrowable(e.getCause());
|
||||||
|
} else {
|
||||||
|
rewrapAndThrowThrowable(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void rewrapAndThrowThrowable(Throwable t) {
|
||||||
|
if (t instanceof AuthorizationException) {
|
||||||
|
throw new ForbiddenException(t);
|
||||||
|
} else {
|
||||||
|
throw new WebApplicationException(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static long parseLongParam(String bytes) {
|
||||||
|
if (bytes == null || bytes.isEmpty()) {
|
||||||
|
return Long.MAX_VALUE;
|
||||||
|
}
|
||||||
|
return Long.parseLong(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Response createBadResponse(Response.Status status,
|
||||||
|
String errMessage) {
|
||||||
|
Response response = Response.status(status)
|
||||||
|
.entity(DOT_JOINER.join(status.toString(), errMessage)).build();
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isRunningState(YarnApplicationState appState) {
|
||||||
|
return appState == YarnApplicationState.RUNNING;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isFinishedState(YarnApplicationState appState) {
|
||||||
|
return appState == YarnApplicationState.FINISHED
|
||||||
|
|| appState == YarnApplicationState.FAILED
|
||||||
|
|| appState == YarnApplicationState.KILLED;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static UserGroupInformation getUser(HttpServletRequest req) {
|
||||||
|
String remoteUser = req.getRemoteUser();
|
||||||
|
UserGroupInformation callerUGI = null;
|
||||||
|
if (remoteUser != null) {
|
||||||
|
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
|
||||||
|
}
|
||||||
|
return callerUGI;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getNMWebAddressFromRM(Configuration yarnConf,
|
||||||
|
String nodeId)
|
||||||
|
throws ClientHandlerException, UniformInterfaceException, JSONException {
|
||||||
|
JSONObject nodeInfo =
|
||||||
|
YarnWebServiceUtils.getNodeInfoFromRMWebService(yarnConf, nodeId)
|
||||||
|
.getJSONObject("node");
|
||||||
|
return nodeInfo.has("nodeHTTPAddress") ?
|
||||||
|
nodeInfo.getString("nodeHTTPAddress") : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getAbsoluteNMWebAddress(Configuration yarnConf,
|
||||||
|
String nmWebAddress) {
|
||||||
|
if (nmWebAddress.contains(WebAppUtils.HTTP_PREFIX) || nmWebAddress
|
||||||
|
.contains(WebAppUtils.HTTPS_PREFIX)) {
|
||||||
|
return nmWebAddress;
|
||||||
|
}
|
||||||
|
return WebAppUtils.getHttpSchemePrefix(yarnConf) + nmWebAddress;
|
||||||
|
}
|
||||||
|
}
|
|
@ -35,4 +35,5 @@ public interface YarnWebServiceParams {
|
||||||
String RESPONSE_CONTENT_SIZE = "size";
|
String RESPONSE_CONTENT_SIZE = "size";
|
||||||
String NM_ID = "nm.id";
|
String NM_ID = "nm.id";
|
||||||
String REDIRECTED_FROM_NODE = "redirected_from_node";
|
String REDIRECTED_FROM_NODE = "redirected_from_node";
|
||||||
|
String CLUSTER_ID = "clusterid";
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,126 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.webapp;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
||||||
|
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
||||||
|
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.ws.rs.core.MultivaluedMap;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* test class for log web service.
|
||||||
|
*/
|
||||||
|
public class TestLogWebService {
|
||||||
|
|
||||||
|
private HttpServletRequest request;
|
||||||
|
private LogWebServiceTest logWebService;
|
||||||
|
private static TimelineEntity entity;
|
||||||
|
private ApplicationId appId;
|
||||||
|
private ContainerId cId;
|
||||||
|
private String user = "user1";
|
||||||
|
private Map<String, TimelineEntity> entities;
|
||||||
|
private String nodeHttpAddress = "localhost:0";
|
||||||
|
|
||||||
|
@Before public void setup() throws Exception {
|
||||||
|
appId = ApplicationId.fromString("application_1518143905142_509690");
|
||||||
|
cId =
|
||||||
|
ContainerId.fromString("container_e138_1518143905142_509690_01_000001");
|
||||||
|
entities = new HashMap<>();
|
||||||
|
generateEntity();
|
||||||
|
request = Mockito.mock(HttpServletRequest.class);
|
||||||
|
Mockito.when(request.getRemoteUser())
|
||||||
|
.thenReturn(System.getProperty("user.name"));
|
||||||
|
logWebService = new LogWebServiceTest();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testGetApp() {
|
||||||
|
|
||||||
|
LogWebService.AppInfo app =
|
||||||
|
logWebService.getApp(request, appId.toString(), null);
|
||||||
|
Assert.assertEquals("RUNNING", app.getAppState().toString());
|
||||||
|
Assert.assertEquals(user, app.getUser());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testGetContainer() {
|
||||||
|
LogWebService.ContainerInfo container = logWebService
|
||||||
|
.getContainer(request, appId.toString(), cId.toString(), null);
|
||||||
|
Assert.assertEquals(nodeHttpAddress, container.getNodeHttpAddress());
|
||||||
|
}
|
||||||
|
|
||||||
|
class LogWebServiceTest extends LogWebService {
|
||||||
|
|
||||||
|
@Override protected TimelineEntity getEntity(String path,
|
||||||
|
MultivaluedMap<String, String> params) throws IOException {
|
||||||
|
if (path.endsWith(cId.toString())) {
|
||||||
|
return entities.get(cId.toString());
|
||||||
|
} else if (path.endsWith(appId.toString())) {
|
||||||
|
return entities.get(appId.toString());
|
||||||
|
} else {
|
||||||
|
throw new IOException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void generateEntity() {
|
||||||
|
createAppEntities();
|
||||||
|
createContainerEntities();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createContainerEntities() {
|
||||||
|
TimelineEntity timelineEntity =
|
||||||
|
generateEntity(TimelineEntityType.YARN_APPLICATION.toString(),
|
||||||
|
appId.toString());
|
||||||
|
timelineEntity.addInfo(ApplicationMetricsConstants.USER_ENTITY_INFO, user);
|
||||||
|
timelineEntity
|
||||||
|
.addInfo(ApplicationMetricsConstants.STATE_EVENT_INFO, "RUNNING");
|
||||||
|
entities.put(appId.toString(), timelineEntity);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createAppEntities() {
|
||||||
|
TimelineEntity timelineEntity =
|
||||||
|
generateEntity(TimelineEntityType.YARN_CONTAINER.toString(),
|
||||||
|
cId.toString());
|
||||||
|
timelineEntity
|
||||||
|
.addInfo(ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO,
|
||||||
|
nodeHttpAddress);
|
||||||
|
entities.put(cId.toString(), timelineEntity);
|
||||||
|
}
|
||||||
|
|
||||||
|
private TimelineEntity generateEntity(String entityType,
|
||||||
|
String entityId) {
|
||||||
|
TimelineEntity timelineEntity = new TimelineEntity();
|
||||||
|
timelineEntity.setId(entityId);
|
||||||
|
timelineEntity.setType(entityType);
|
||||||
|
timelineEntity.setCreatedTime(System.currentTimeMillis());
|
||||||
|
return timelineEntity;
|
||||||
|
}
|
||||||
|
}
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.security.TimelineRea
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.reader.security.TimelineReaderWhitelistAuthorizationFilterInitializer;
|
import org.apache.hadoop.yarn.server.timelineservice.reader.security.TimelineReaderWhitelistAuthorizationFilterInitializer;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
|
||||||
import org.apache.hadoop.yarn.server.util.timeline.TimelineServerUtils;
|
import org.apache.hadoop.yarn.server.util.timeline.TimelineServerUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.webapp.LogWebService;
|
||||||
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||||
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
||||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
|
@ -201,7 +202,8 @@ public class TimelineReaderServer extends CompositeService {
|
||||||
readerWebServer.addJerseyResourcePackage(
|
readerWebServer.addJerseyResourcePackage(
|
||||||
TimelineReaderWebServices.class.getPackage().getName() + ";"
|
TimelineReaderWebServices.class.getPackage().getName() + ";"
|
||||||
+ GenericExceptionHandler.class.getPackage().getName() + ";"
|
+ GenericExceptionHandler.class.getPackage().getName() + ";"
|
||||||
+ YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
|
+ YarnJacksonJaxbJsonProvider.class.getPackage().getName()+ ";"
|
||||||
|
+ LogWebService.class.getPackage().getName(),
|
||||||
"/*");
|
"/*");
|
||||||
readerWebServer.setAttribute(TIMELINE_READER_MANAGER_ATTR,
|
readerWebServer.setAttribute(TIMELINE_READER_MANAGER_ATTR,
|
||||||
timelineReaderManager);
|
timelineReaderManager);
|
||||||
|
|
Loading…
Reference in New Issue