YARN-5742 Serve aggregated logs of historical apps from timeline service. Contributed by Rohith Sharma KS
This commit is contained in:
parent
d91d47bc73
commit
8d1981806f
|
@ -18,13 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
@ -35,15 +29,16 @@ import javax.ws.rs.Path;
|
|||
import javax.ws.rs.PathParam;
|
||||
import javax.ws.rs.Produces;
|
||||
import javax.ws.rs.QueryParam;
|
||||
import javax.ws.rs.WebApplicationException;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.GenericEntity;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.StreamingOutput;
|
||||
import javax.ws.rs.core.Response.ResponseBuilder;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.sun.jersey.api.client.ClientHandlerException;
|
||||
import com.sun.jersey.api.client.UniformInterfaceException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -54,10 +49,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
|
||||
import org.apache.hadoop.yarn.server.webapp.LogWebServiceUtils;
|
||||
import org.apache.hadoop.yarn.server.webapp.WebServices;
|
||||
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
||||
|
@ -65,21 +58,14 @@ import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo;
|
|||
import org.apache.hadoop.yarn.server.webapp.dao.AppInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils;
|
||||
import org.codehaus.jettison.json.JSONException;
|
||||
import org.codehaus.jettison.json.JSONObject;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
import com.sun.jersey.api.client.ClientHandlerException;
|
||||
import com.sun.jersey.api.client.UniformInterfaceException;
|
||||
import org.codehaus.jettison.json.JSONException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -271,15 +257,18 @@ public class AHSWebServices extends WebServices {
|
|||
appInfo = super.getApp(req, res, appId.toString());
|
||||
} catch (Exception ex) {
|
||||
// directly find logs from HDFS.
|
||||
return getContainerLogMeta(appId, null, null, containerIdStr, false);
|
||||
return LogWebServiceUtils
|
||||
.getContainerLogMeta(factory, appId, null, null, containerIdStr,
|
||||
false);
|
||||
}
|
||||
// if the application finishes, directly find logs
|
||||
// from HDFS.
|
||||
if (isFinishedState(appInfo.getAppState())) {
|
||||
return getContainerLogMeta(appId, null, null,
|
||||
containerIdStr, false);
|
||||
if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
|
||||
return LogWebServiceUtils
|
||||
.getContainerLogMeta(factory, appId, null, null, containerIdStr,
|
||||
false);
|
||||
}
|
||||
if (isRunningState(appInfo.getAppState())) {
|
||||
if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
|
||||
String appOwner = appInfo.getUser();
|
||||
String nodeHttpAddress = null;
|
||||
if (nmId != null && !nmId.isEmpty()) {
|
||||
|
@ -301,8 +290,9 @@ public class AHSWebServices extends WebServices {
|
|||
} catch (Exception ex) {
|
||||
// return log meta for the aggregated logs if exists.
|
||||
// It will also return empty log meta for the local logs.
|
||||
return getContainerLogMeta(appId, appOwner, null,
|
||||
containerIdStr, true);
|
||||
return LogWebServiceUtils
|
||||
.getContainerLogMeta(factory, appId, appOwner, null,
|
||||
containerIdStr, true);
|
||||
}
|
||||
nodeHttpAddress = containerInfo.getNodeHttpAddress();
|
||||
// make sure nodeHttpAddress is not null and not empty. Otherwise,
|
||||
|
@ -314,12 +304,14 @@ public class AHSWebServices extends WebServices {
|
|||
// It will also return empty log meta for the local logs.
|
||||
// If this is the redirect request from NM, we should not
|
||||
// re-direct the request back. Simply output the aggregated log meta.
|
||||
return getContainerLogMeta(appId, appOwner, null,
|
||||
containerIdStr, true);
|
||||
return LogWebServiceUtils
|
||||
.getContainerLogMeta(factory, appId, appOwner, null,
|
||||
containerIdStr, true);
|
||||
}
|
||||
}
|
||||
String uri = "/" + containerId.toString() + "/logs";
|
||||
String resURI = JOINER.join(getAbsoluteNMWebAddress(nodeHttpAddress),
|
||||
String resURI = JOINER.join(
|
||||
LogWebServiceUtils.getAbsoluteNMWebAddress(conf, nodeHttpAddress),
|
||||
NM_DOWNLOAD_URI_STR, uri);
|
||||
String query = req.getQueryString();
|
||||
if (query != null && !query.isEmpty()) {
|
||||
|
@ -397,11 +389,11 @@ public class AHSWebServices extends WebServices {
|
|||
try {
|
||||
containerId = ContainerId.fromString(containerIdStr);
|
||||
} catch (IllegalArgumentException ex) {
|
||||
return createBadResponse(Status.NOT_FOUND,
|
||||
return LogWebServiceUtils.createBadResponse(Status.NOT_FOUND,
|
||||
"Invalid ContainerId: " + containerIdStr);
|
||||
}
|
||||
|
||||
final long length = parseLongParam(size);
|
||||
final long length = LogWebServiceUtils.parseLongParam(size);
|
||||
|
||||
ApplicationId appId = containerId.getApplicationAttemptId()
|
||||
.getApplicationId();
|
||||
|
@ -410,17 +402,19 @@ public class AHSWebServices extends WebServices {
|
|||
appInfo = super.getApp(req, res, appId.toString());
|
||||
} catch (Exception ex) {
|
||||
// directly find logs from HDFS.
|
||||
return sendStreamOutputResponse(appId, null, null, containerIdStr,
|
||||
filename, format, length, false);
|
||||
return LogWebServiceUtils
|
||||
.sendStreamOutputResponse(factory, appId, null, null, containerIdStr,
|
||||
filename, format, length, false);
|
||||
}
|
||||
String appOwner = appInfo.getUser();
|
||||
if (isFinishedState(appInfo.getAppState())) {
|
||||
if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
|
||||
// directly find logs from HDFS.
|
||||
return sendStreamOutputResponse(appId, appOwner, null, containerIdStr,
|
||||
filename, format, length, false);
|
||||
return LogWebServiceUtils
|
||||
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
||||
containerIdStr, filename, format, length, false);
|
||||
}
|
||||
|
||||
if (isRunningState(appInfo.getAppState())) {
|
||||
if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
|
||||
String nodeHttpAddress = null;
|
||||
if (nmId != null && !nmId.isEmpty()) {
|
||||
try {
|
||||
|
@ -440,8 +434,9 @@ public class AHSWebServices extends WebServices {
|
|||
containerId.toString());
|
||||
} catch (Exception ex) {
|
||||
// output the aggregated logs
|
||||
return sendStreamOutputResponse(appId, appOwner, null,
|
||||
containerIdStr, filename, format, length, true);
|
||||
return LogWebServiceUtils
|
||||
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
||||
containerIdStr, filename, format, length, true);
|
||||
}
|
||||
nodeHttpAddress = containerInfo.getNodeHttpAddress();
|
||||
// make sure nodeHttpAddress is not null and not empty. Otherwise,
|
||||
|
@ -452,12 +447,14 @@ public class AHSWebServices extends WebServices {
|
|||
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
|
||||
|| redirected_from_node) {
|
||||
// output the aggregated logs
|
||||
return sendStreamOutputResponse(appId, appOwner, null,
|
||||
containerIdStr, filename, format, length, true);
|
||||
return LogWebServiceUtils
|
||||
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
||||
containerIdStr, filename, format, length, true);
|
||||
}
|
||||
}
|
||||
String uri = "/" + containerId.toString() + "/logs/" + filename;
|
||||
String resURI = JOINER.join(getAbsoluteNMWebAddress(nodeHttpAddress),
|
||||
String resURI = JOINER.join(
|
||||
LogWebServiceUtils.getAbsoluteNMWebAddress(conf, nodeHttpAddress),
|
||||
NM_DOWNLOAD_URI_STR, uri);
|
||||
String query = req.getQueryString();
|
||||
if (query != null && !query.isEmpty()) {
|
||||
|
@ -468,170 +465,15 @@ public class AHSWebServices extends WebServices {
|
|||
response.header("Location", resURI);
|
||||
return response.build();
|
||||
} else {
|
||||
return createBadResponse(Status.NOT_FOUND,
|
||||
return LogWebServiceUtils.createBadResponse(Status.NOT_FOUND,
|
||||
"The application is not at Running or Finished State.");
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isRunningState(YarnApplicationState appState) {
|
||||
return appState == YarnApplicationState.RUNNING;
|
||||
}
|
||||
|
||||
private boolean isFinishedState(YarnApplicationState appState) {
|
||||
return appState == YarnApplicationState.FINISHED
|
||||
|| appState == YarnApplicationState.FAILED
|
||||
|| appState == YarnApplicationState.KILLED;
|
||||
}
|
||||
|
||||
private Response createBadResponse(Status status, String errMessage) {
|
||||
Response response = Response.status(status)
|
||||
.entity(DOT_JOINER.join(status.toString(), errMessage)).build();
|
||||
return response;
|
||||
}
|
||||
|
||||
private Response sendStreamOutputResponse(ApplicationId appId,
|
||||
String appOwner, String nodeId, String containerIdStr,
|
||||
String fileName, String format, long bytes,
|
||||
boolean printEmptyLocalContainerLog) {
|
||||
String contentType = WebAppUtils.getDefaultLogContentType();
|
||||
if (format != null && !format.isEmpty()) {
|
||||
contentType = WebAppUtils.getSupportedLogContentType(format);
|
||||
if (contentType == null) {
|
||||
String errorMessage = "The valid values for the parameter : format "
|
||||
+ "are " + WebAppUtils.listSupportedLogContentType();
|
||||
return Response.status(Status.BAD_REQUEST).entity(errorMessage)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
StreamingOutput stream = null;
|
||||
try {
|
||||
stream = getStreamingOutput(appId, appOwner, nodeId,
|
||||
containerIdStr, fileName, bytes, printEmptyLocalContainerLog);
|
||||
} catch (Exception ex) {
|
||||
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
|
||||
ex.getMessage());
|
||||
}
|
||||
ResponseBuilder response = Response.ok(stream);
|
||||
response.header("Content-Type", contentType);
|
||||
// Sending the X-Content-Type-Options response header with the value
|
||||
// nosniff will prevent Internet Explorer from MIME-sniffing a response
|
||||
// away from the declared content-type.
|
||||
response.header("X-Content-Type-Options", "nosniff");
|
||||
return response.build();
|
||||
}
|
||||
|
||||
private StreamingOutput getStreamingOutput(final ApplicationId appId,
|
||||
final String appOwner, final String nodeId, final String containerIdStr,
|
||||
final String logFile, final long bytes,
|
||||
final boolean printEmptyLocalContainerLog) throws IOException{
|
||||
StreamingOutput stream = new StreamingOutput() {
|
||||
|
||||
@Override
|
||||
public void write(OutputStream os) throws IOException,
|
||||
WebApplicationException {
|
||||
ContainerLogsRequest request = new ContainerLogsRequest();
|
||||
request.setAppId(appId);
|
||||
request.setAppOwner(appOwner);
|
||||
request.setContainerId(containerIdStr);
|
||||
request.setBytes(bytes);
|
||||
request.setNodeId(nodeId);
|
||||
Set<String> logTypes = new HashSet<>();
|
||||
logTypes.add(logFile);
|
||||
request.setLogTypes(logTypes);
|
||||
boolean findLogs = factory.getFileControllerForRead(appId, appOwner)
|
||||
.readAggregatedLogs(request, os);
|
||||
if (!findLogs) {
|
||||
os.write(("Can not find logs for container:"
|
||||
+ containerIdStr).getBytes(Charset.forName("UTF-8")));
|
||||
} else {
|
||||
if (printEmptyLocalContainerLog) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(containerIdStr + "\n");
|
||||
sb.append("LogAggregationType: "
|
||||
+ ContainerLogAggregationType.LOCAL + "\n");
|
||||
sb.append("LogContents:\n");
|
||||
sb.append(getNoRedirectWarning() + "\n");
|
||||
os.write(sb.toString().getBytes(Charset.forName("UTF-8")));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
return stream;
|
||||
}
|
||||
|
||||
private long parseLongParam(String bytes) {
|
||||
if (bytes == null || bytes.isEmpty()) {
|
||||
return Long.MAX_VALUE;
|
||||
}
|
||||
return Long.parseLong(bytes);
|
||||
}
|
||||
|
||||
private Response getContainerLogMeta(ApplicationId appId, String appOwner,
|
||||
final String nodeId, final String containerIdStr,
|
||||
boolean emptyLocalContainerLogMeta) {
|
||||
try {
|
||||
ContainerLogsRequest request = new ContainerLogsRequest();
|
||||
request.setAppId(appId);
|
||||
request.setAppOwner(appOwner);
|
||||
request.setContainerId(containerIdStr);
|
||||
request.setNodeId(nodeId);
|
||||
List<ContainerLogMeta> containerLogMeta = factory
|
||||
.getFileControllerForRead(appId, appOwner)
|
||||
.readAggregatedLogsMeta(request);
|
||||
if (containerLogMeta.isEmpty()) {
|
||||
throw new NotFoundException(
|
||||
"Can not get log meta for container: " + containerIdStr);
|
||||
}
|
||||
List<ContainerLogsInfo> containersLogsInfo = new ArrayList<>();
|
||||
for (ContainerLogMeta meta : containerLogMeta) {
|
||||
ContainerLogsInfo logInfo = new ContainerLogsInfo(meta,
|
||||
ContainerLogAggregationType.AGGREGATED);
|
||||
containersLogsInfo.add(logInfo);
|
||||
}
|
||||
if (emptyLocalContainerLogMeta) {
|
||||
ContainerLogMeta emptyMeta = new ContainerLogMeta(
|
||||
containerIdStr, "N/A");
|
||||
ContainerLogsInfo empty = new ContainerLogsInfo(emptyMeta,
|
||||
ContainerLogAggregationType.LOCAL);
|
||||
containersLogsInfo.add(empty);
|
||||
}
|
||||
GenericEntity<List<ContainerLogsInfo>> meta = new GenericEntity<List<
|
||||
ContainerLogsInfo>>(containersLogsInfo){};
|
||||
ResponseBuilder response = Response.ok(meta);
|
||||
// Sending the X-Content-Type-Options response header with the value
|
||||
// nosniff will prevent Internet Explorer from MIME-sniffing a response
|
||||
// away from the declared content-type.
|
||||
response.header("X-Content-Type-Options", "nosniff");
|
||||
return response.build();
|
||||
} catch (Exception ex) {
|
||||
throw new WebApplicationException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public static String getNoRedirectWarning() {
|
||||
return "We do not have NodeManager web address, so we can not "
|
||||
+ "re-direct the request to related NodeManager "
|
||||
+ "for local container logs.";
|
||||
}
|
||||
|
||||
private String getAbsoluteNMWebAddress(String nmWebAddress) {
|
||||
if (nmWebAddress.contains(WebAppUtils.HTTP_PREFIX) ||
|
||||
nmWebAddress.contains(WebAppUtils.HTTPS_PREFIX)) {
|
||||
return nmWebAddress;
|
||||
}
|
||||
return WebAppUtils.getHttpSchemePrefix(conf) + nmWebAddress;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@Private
|
||||
@VisibleForTesting @InterfaceAudience.Private
|
||||
public String getNMWebAddressFromRM(Configuration configuration,
|
||||
String nodeId) throws ClientHandlerException,
|
||||
UniformInterfaceException, JSONException {
|
||||
JSONObject nodeInfo = YarnWebServiceUtils.getNodeInfoFromRMWebService(
|
||||
configuration, nodeId).getJSONObject("node");
|
||||
return nodeInfo.has("nodeHTTPAddress") ?
|
||||
nodeInfo.getString("nodeHTTPAddress") : null;
|
||||
String nodeId)
|
||||
throws ClientHandlerException, UniformInterfaceException, JSONException {
|
||||
return LogWebServiceUtils.getNMWebAddressFromRM(configuration, nodeId);
|
||||
}
|
||||
}
|
|
@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
|||
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
|
||||
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
||||
import org.apache.hadoop.yarn.server.webapp.LogWebServiceUtils;
|
||||
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
|
||||
|
@ -774,7 +775,8 @@ public class TestAHSWebServices extends JerseyTestBase {
|
|||
// the warning message.
|
||||
assertTrue(responseText.contains("LogAggregationType: "
|
||||
+ ContainerLogAggregationType.LOCAL));
|
||||
assertTrue(responseText.contains(AHSWebServices.getNoRedirectWarning()));
|
||||
assertTrue(
|
||||
responseText.contains(LogWebServiceUtils.getNoRedirectWarning()));
|
||||
|
||||
// If we can not container information from ATS, and we specify the NM id,
|
||||
// but we can not get nm web address, we would still try to
|
||||
|
@ -790,7 +792,8 @@ public class TestAHSWebServices extends JerseyTestBase {
|
|||
assertTrue(responseText.contains(content));
|
||||
assertTrue(responseText.contains("LogAggregationType: "
|
||||
+ ContainerLogAggregationType.LOCAL));
|
||||
assertTrue(responseText.contains(AHSWebServices.getNoRedirectWarning()));
|
||||
assertTrue(
|
||||
responseText.contains(LogWebServiceUtils.getNoRedirectWarning()));
|
||||
|
||||
// If this is the redirect request, we would not re-direct the request
|
||||
// back and get the aggregated logs.
|
||||
|
|
|
@ -0,0 +1,506 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.webapp;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.inject.Singleton;
|
||||
import com.sun.jersey.api.client.Client;
|
||||
import com.sun.jersey.api.client.ClientResponse;
|
||||
import com.sun.jersey.api.client.config.ClientConfig;
|
||||
import com.sun.jersey.api.client.config.DefaultClientConfig;
|
||||
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
|
||||
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
|
||||
import com.sun.jersey.core.util.MultivaluedMapImpl;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.http.JettyUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
|
||||
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
||||
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
||||
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import javax.ws.rs.DefaultValue;
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.PathParam;
|
||||
import javax.ws.rs.Produces;
|
||||
import javax.ws.rs.QueryParam;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.MultivaluedMap;
|
||||
import javax.ws.rs.core.Response;
|
||||
import java.io.IOException;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
/**
|
||||
* Support only ATSv2 client only.
|
||||
*/
|
||||
@Singleton @Path("/ws/v2/applicationlog") public class LogWebService {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(LogWebService.class);
|
||||
private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
|
||||
private static final String NM_DOWNLOAD_URI_STR = "/ws/v1/node/containers";
|
||||
private static final Joiner JOINER = Joiner.on("");
|
||||
private static Configuration yarnConf = new YarnConfiguration();
|
||||
private static LogAggregationFileControllerFactory factory;
|
||||
private static String base;
|
||||
private static String defaultClusterid;
|
||||
private volatile Client webTimelineClient;
|
||||
|
||||
static {
|
||||
init();
|
||||
}
|
||||
|
||||
// initialize all the common resources - order is important
|
||||
private static void init() {
|
||||
factory = new LogAggregationFileControllerFactory(yarnConf);
|
||||
base = JOINER.join(WebAppUtils.getHttpSchemePrefix(yarnConf),
|
||||
WebAppUtils.getTimelineReaderWebAppURLWithoutScheme(yarnConf),
|
||||
RESOURCE_URI_STR_V2);
|
||||
defaultClusterid = yarnConf.get(YarnConfiguration.RM_CLUSTER_ID,
|
||||
YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
|
||||
LOG.info("Initialized LogWeService with clusterid " + defaultClusterid
|
||||
+ " for URI: " + base);
|
||||
}
|
||||
|
||||
private Client createTimelineWebClient() {
|
||||
ClientConfig cfg = new DefaultClientConfig();
|
||||
cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class);
|
||||
Client client = new Client(
|
||||
new URLConnectionClientHandler(new HttpURLConnectionFactory() {
|
||||
@Override public HttpURLConnection getHttpURLConnection(URL url)
|
||||
throws IOException {
|
||||
AuthenticatedURL.Token token = new AuthenticatedURL.Token();
|
||||
HttpURLConnection conn = null;
|
||||
try {
|
||||
conn = new AuthenticatedURL().openConnection(url, token);
|
||||
LOG.info("LogWeService:Connecetion created.");
|
||||
} catch (AuthenticationException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
return conn;
|
||||
}
|
||||
}), cfg);
|
||||
|
||||
return client;
|
||||
}
|
||||
|
||||
private void initForReadableEndpoints(HttpServletResponse response) {
|
||||
// clear content type
|
||||
response.setContentType(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns log file's name as well as current file size for a container.
|
||||
*
|
||||
* @param req HttpServletRequest
|
||||
* @param res HttpServletResponse
|
||||
* @param containerIdStr The container ID
|
||||
* @param nmId The Node Manager NodeId
|
||||
* @param redirectedFromNode Whether this is a redirected request from NM
|
||||
* @return The log file's name and current file size
|
||||
*/
|
||||
@GET @Path("/containers/{containerid}/logs")
|
||||
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
||||
public Response getContainerLogsInfo(@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
|
||||
@QueryParam(YarnWebServiceParams.NM_ID) String nmId,
|
||||
@QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
|
||||
@DefaultValue("false") boolean redirectedFromNode,
|
||||
@QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) {
|
||||
ContainerId containerId = null;
|
||||
initForReadableEndpoints(res);
|
||||
try {
|
||||
containerId = ContainerId.fromString(containerIdStr);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new BadRequestException("invalid container id, " + containerIdStr);
|
||||
}
|
||||
|
||||
ApplicationId appId =
|
||||
containerId.getApplicationAttemptId().getApplicationId();
|
||||
AppInfo appInfo;
|
||||
try {
|
||||
appInfo = getApp(req, appId.toString(), clusterId);
|
||||
} catch (Exception ex) {
|
||||
// directly find logs from HDFS.
|
||||
return LogWebServiceUtils
|
||||
.getContainerLogMeta(factory, appId, null, null, containerIdStr,
|
||||
false);
|
||||
}
|
||||
// if the application finishes, directly find logs
|
||||
// from HDFS.
|
||||
if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
|
||||
return LogWebServiceUtils
|
||||
.getContainerLogMeta(factory, appId, null, null, containerIdStr,
|
||||
false);
|
||||
}
|
||||
if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
|
||||
String appOwner = appInfo.getUser();
|
||||
String nodeHttpAddress = null;
|
||||
if (nmId != null && !nmId.isEmpty()) {
|
||||
try {
|
||||
nodeHttpAddress =
|
||||
LogWebServiceUtils.getNMWebAddressFromRM(yarnConf, nmId);
|
||||
} catch (Exception ex) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(ex.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
|
||||
ContainerInfo containerInfo;
|
||||
try {
|
||||
containerInfo =
|
||||
getContainer(req, appId.toString(), containerId.toString(),
|
||||
clusterId);
|
||||
} catch (Exception ex) {
|
||||
// return log meta for the aggregated logs if exists.
|
||||
// It will also return empty log meta for the local logs.
|
||||
return LogWebServiceUtils
|
||||
.getContainerLogMeta(factory, appId, appOwner, null,
|
||||
containerIdStr, true);
|
||||
}
|
||||
nodeHttpAddress = containerInfo.getNodeHttpAddress();
|
||||
// make sure nodeHttpAddress is not null and not empty. Otherwise,
|
||||
// we would only get log meta for aggregated logs instead of
|
||||
// re-directing the request
|
||||
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
|
||||
|| redirectedFromNode) {
|
||||
// return log meta for the aggregated logs if exists.
|
||||
// It will also return empty log meta for the local logs.
|
||||
// If this is the redirect request from NM, we should not
|
||||
// re-direct the request back. Simply output the aggregated log meta.
|
||||
return LogWebServiceUtils
|
||||
.getContainerLogMeta(factory, appId, appOwner, null,
|
||||
containerIdStr, true);
|
||||
}
|
||||
}
|
||||
String uri = "/" + containerId.toString() + "/logs";
|
||||
String resURI = JOINER.join(
|
||||
LogWebServiceUtils.getAbsoluteNMWebAddress(yarnConf, nodeHttpAddress),
|
||||
NM_DOWNLOAD_URI_STR, uri);
|
||||
String query = req.getQueryString();
|
||||
if (query != null && !query.isEmpty()) {
|
||||
resURI += "?" + query;
|
||||
}
|
||||
Response.ResponseBuilder response =
|
||||
Response.status(HttpServletResponse.SC_TEMPORARY_REDIRECT);
|
||||
response.header("Location", resURI);
|
||||
return response.build();
|
||||
} else {
|
||||
throw new NotFoundException(
|
||||
"The application is not at Running or Finished State.");
|
||||
}
|
||||
}
|
||||
|
||||
protected ContainerInfo getContainer(HttpServletRequest req, String appId,
|
||||
String containerId, String clusterId) {
|
||||
UserGroupInformation callerUGI = LogWebServiceUtils.getUser(req);
|
||||
String cId = clusterId != null ? clusterId : defaultClusterid;
|
||||
MultivaluedMap<String, String> params = new MultivaluedMapImpl();
|
||||
params.add("fields", "INFO");
|
||||
String path = JOINER.join("clusters/", cId, "/apps/", appId, "/entities/",
|
||||
TimelineEntityType.YARN_CONTAINER.toString(), "/", containerId);
|
||||
TimelineEntity conEntity = null;
|
||||
try {
|
||||
if (callerUGI == null) {
|
||||
conEntity = getEntity(path, params);
|
||||
} else {
|
||||
setUserName(params, callerUGI.getShortUserName());
|
||||
conEntity =
|
||||
callerUGI.doAs(new PrivilegedExceptionAction<TimelineEntity>() {
|
||||
@Override public TimelineEntity run() throws Exception {
|
||||
return getEntity(path, params);
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LogWebServiceUtils.rewrapAndThrowException(e);
|
||||
}
|
||||
if (conEntity == null) {
|
||||
return null;
|
||||
}
|
||||
String nodeHttpAddress = (String) conEntity.getInfo()
|
||||
.get(ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO);
|
||||
|
||||
ContainerInfo info = new ContainerInfo(nodeHttpAddress);
|
||||
return info;
|
||||
}
|
||||
|
||||
protected AppInfo getApp(HttpServletRequest req, String appId,
|
||||
String clusterId) {
|
||||
UserGroupInformation callerUGI = LogWebServiceUtils.getUser(req);
|
||||
|
||||
String cId = clusterId != null ? clusterId : defaultClusterid;
|
||||
MultivaluedMap<String, String> params = new MultivaluedMapImpl();
|
||||
params.add("fields", "INFO");
|
||||
String path = JOINER.join("clusters/", cId, "/apps/", appId);
|
||||
TimelineEntity appEntity = null;
|
||||
|
||||
try {
|
||||
if (callerUGI == null) {
|
||||
appEntity = getEntity(path, params);
|
||||
} else {
|
||||
setUserName(params, callerUGI.getShortUserName());
|
||||
appEntity =
|
||||
callerUGI.doAs(new PrivilegedExceptionAction<TimelineEntity>() {
|
||||
@Override public TimelineEntity run() throws Exception {
|
||||
return getEntity(path, params);
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LogWebServiceUtils.rewrapAndThrowException(e);
|
||||
}
|
||||
|
||||
if (appEntity == null) {
|
||||
return null;
|
||||
}
|
||||
String appOwner = (String) appEntity.getInfo()
|
||||
.get(ApplicationMetricsConstants.USER_ENTITY_INFO);
|
||||
String state = (String) appEntity.getInfo()
|
||||
.get(ApplicationMetricsConstants.STATE_EVENT_INFO);
|
||||
YarnApplicationState appState = YarnApplicationState.valueOf(state);
|
||||
AppInfo info = new AppInfo(appState, appOwner);
|
||||
return info;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the contents of a container's log file in plain text.
|
||||
*
|
||||
* @param req HttpServletRequest
|
||||
* @param res HttpServletResponse
|
||||
* @param containerIdStr The container ID
|
||||
* @param filename The name of the log file
|
||||
* @param format The content type
|
||||
* @param size the size of the log file
|
||||
* @param nmId The Node Manager NodeId
|
||||
* @param redirectedFromNode Whether this is the redirect request from NM
|
||||
* @return The contents of the container's log file
|
||||
*/
|
||||
@GET @Path("/containers/{containerid}/logs/{filename}")
|
||||
@Produces({ MediaType.TEXT_PLAIN }) @InterfaceAudience.Public
|
||||
@InterfaceStability.Unstable public Response getContainerLogFile(
|
||||
@Context HttpServletRequest req, @Context HttpServletResponse res,
|
||||
@PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
|
||||
@PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME) String filename,
|
||||
@QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_FORMAT) String format,
|
||||
@QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_SIZE) String size,
|
||||
@QueryParam(YarnWebServiceParams.NM_ID) String nmId,
|
||||
@QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
|
||||
boolean redirectedFromNode,
|
||||
@QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) {
|
||||
return getLogs(req, res, containerIdStr, filename, format, size, nmId,
|
||||
redirectedFromNode, clusterId);
|
||||
}
|
||||
|
||||
//TODO: YARN-4993: Refactory ContainersLogsBlock, AggregatedLogsBlock and
|
||||
// container log webservice introduced in AHS to minimize
|
||||
// the duplication.
|
||||
@GET @Path("/containerlogs/{containerid}/{filename}")
|
||||
@Produces({ MediaType.TEXT_PLAIN + "; " + JettyUtils.UTF_8 })
|
||||
@InterfaceAudience.Public @InterfaceStability.Unstable
|
||||
public Response getLogs(@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
|
||||
@PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME) String filename,
|
||||
@QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_FORMAT) String format,
|
||||
@QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_SIZE) String size,
|
||||
@QueryParam(YarnWebServiceParams.NM_ID) String nmId,
|
||||
@QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
|
||||
@DefaultValue("false") boolean redirectedFromNode,
|
||||
@QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) {
|
||||
initForReadableEndpoints(res);
|
||||
ContainerId containerId;
|
||||
try {
|
||||
containerId = ContainerId.fromString(containerIdStr);
|
||||
} catch (IllegalArgumentException ex) {
|
||||
return LogWebServiceUtils.createBadResponse(Response.Status.NOT_FOUND,
|
||||
"Invalid ContainerId: " + containerIdStr);
|
||||
}
|
||||
|
||||
final long length = LogWebServiceUtils.parseLongParam(size);
|
||||
|
||||
ApplicationId appId =
|
||||
containerId.getApplicationAttemptId().getApplicationId();
|
||||
AppInfo appInfo;
|
||||
try {
|
||||
appInfo = getApp(req, appId.toString(), clusterId);
|
||||
} catch (Exception ex) {
|
||||
// directly find logs from HDFS.
|
||||
return LogWebServiceUtils
|
||||
.sendStreamOutputResponse(factory, appId, null, null, containerIdStr,
|
||||
filename, format, length, false);
|
||||
}
|
||||
String appOwner = appInfo.getUser();
|
||||
if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
|
||||
// directly find logs from HDFS.
|
||||
return LogWebServiceUtils
|
||||
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
||||
containerIdStr, filename, format, length, false);
|
||||
}
|
||||
|
||||
if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
|
||||
String nodeHttpAddress = null;
|
||||
if (nmId != null && !nmId.isEmpty()) {
|
||||
try {
|
||||
nodeHttpAddress =
|
||||
LogWebServiceUtils.getNMWebAddressFromRM(yarnConf, nmId);
|
||||
} catch (Exception ex) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(ex.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
|
||||
ContainerInfo containerInfo;
|
||||
try {
|
||||
containerInfo =
|
||||
getContainer(req, appId.toString(), containerId.toString(),
|
||||
clusterId);
|
||||
} catch (Exception ex) {
|
||||
// output the aggregated logs
|
||||
return LogWebServiceUtils
|
||||
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
||||
containerIdStr, filename, format, length, true);
|
||||
}
|
||||
nodeHttpAddress = containerInfo.getNodeHttpAddress();
|
||||
// make sure nodeHttpAddress is not null and not empty. Otherwise,
|
||||
// we would only get aggregated logs instead of re-directing the
|
||||
// request.
|
||||
// If this is the redirect request from NM, we should not re-direct the
|
||||
// request back. Simply output the aggregated logs.
|
||||
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
|
||||
|| redirectedFromNode) {
|
||||
// output the aggregated logs
|
||||
return LogWebServiceUtils
|
||||
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
||||
containerIdStr, filename, format, length, true);
|
||||
}
|
||||
}
|
||||
String uri = "/" + containerId.toString() + "/logs/" + filename;
|
||||
String resURI = JOINER.join(
|
||||
LogWebServiceUtils.getAbsoluteNMWebAddress(yarnConf, nodeHttpAddress),
|
||||
NM_DOWNLOAD_URI_STR, uri);
|
||||
String query = req.getQueryString();
|
||||
if (query != null && !query.isEmpty()) {
|
||||
resURI += "?" + query;
|
||||
}
|
||||
Response.ResponseBuilder response =
|
||||
Response.status(HttpServletResponse.SC_TEMPORARY_REDIRECT);
|
||||
response.header("Location", resURI);
|
||||
return response.build();
|
||||
} else {
|
||||
return LogWebServiceUtils.createBadResponse(Response.Status.NOT_FOUND,
|
||||
"The application is not at Running or Finished State.");
|
||||
}
|
||||
}
|
||||
|
||||
protected static class AppInfo {
|
||||
private YarnApplicationState appState;
|
||||
private String user;
|
||||
|
||||
AppInfo(YarnApplicationState appState, String user) {
|
||||
this.appState = appState;
|
||||
this.user = user;
|
||||
}
|
||||
|
||||
public YarnApplicationState getAppState() {
|
||||
return this.appState;
|
||||
}
|
||||
|
||||
public String getUser() {
|
||||
return this.user;
|
||||
}
|
||||
}
|
||||
|
||||
protected static class ContainerInfo {
|
||||
private String nodeHttpAddress;
|
||||
|
||||
ContainerInfo(String nodeHttpAddress) {
|
||||
this.nodeHttpAddress = nodeHttpAddress;
|
||||
}
|
||||
|
||||
public String getNodeHttpAddress() {
|
||||
return nodeHttpAddress;
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting protected TimelineEntity getEntity(String path,
|
||||
MultivaluedMap<String, String> params) throws IOException {
|
||||
ClientResponse resp =
|
||||
getClient().resource(base).path(path).queryParams(params)
|
||||
.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON)
|
||||
.get(ClientResponse.class);
|
||||
if (resp == null
|
||||
|| resp.getStatusInfo().getStatusCode() != ClientResponse.Status.OK
|
||||
.getStatusCode()) {
|
||||
String msg =
|
||||
"Response from the timeline reader server is " + ((resp == null) ?
|
||||
"null" :
|
||||
"not successful," + " HTTP error code: " + resp.getStatus()
|
||||
+ ", Server response:\n" + resp.getEntity(String.class));
|
||||
LOG.error(msg);
|
||||
throw new IOException(msg);
|
||||
}
|
||||
TimelineEntity entity = resp.getEntity(TimelineEntity.class);
|
||||
return entity;
|
||||
}
|
||||
|
||||
private Client getClient() {
|
||||
if (webTimelineClient == null) {
|
||||
synchronized (LogWebService.class) {
|
||||
if (webTimelineClient == null) {
|
||||
webTimelineClient = createTimelineWebClient();
|
||||
}
|
||||
}
|
||||
}
|
||||
return webTimelineClient;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set user.name in non-secure mode to delegate to next rest call.
|
||||
*/
|
||||
private void setUserName(MultivaluedMap<String, String> params, String user) {
|
||||
if (!UserGroupInformation.isSecurityEnabled()) {
|
||||
params.add("user.name", user);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,258 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.webapp;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.sun.jersey.api.client.ClientHandlerException;
|
||||
import com.sun.jersey.api.client.UniformInterfaceException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
|
||||
import org.apache.hadoop.yarn.webapp.ForbiddenException;
|
||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils;
|
||||
import org.codehaus.jettison.json.JSONException;
|
||||
import org.codehaus.jettison.json.JSONObject;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.ws.rs.WebApplicationException;
|
||||
import javax.ws.rs.core.GenericEntity;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.StreamingOutput;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Log web service utils class.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public final class LogWebServiceUtils {
|
||||
|
||||
private LogWebServiceUtils() {
|
||||
}
|
||||
|
||||
private static final Joiner DOT_JOINER = Joiner.on(". ");
|
||||
|
||||
public static Response getContainerLogMeta(
|
||||
LogAggregationFileControllerFactory factory, ApplicationId appId,
|
||||
String appOwner, final String nodeId, final String containerIdStr,
|
||||
boolean emptyLocalContainerLogMeta) {
|
||||
try {
|
||||
ContainerLogsRequest request = new ContainerLogsRequest();
|
||||
request.setAppId(appId);
|
||||
request.setAppOwner(appOwner);
|
||||
request.setContainerId(containerIdStr);
|
||||
request.setNodeId(nodeId);
|
||||
List<ContainerLogMeta> containerLogMeta =
|
||||
factory.getFileControllerForRead(appId, appOwner)
|
||||
.readAggregatedLogsMeta(request);
|
||||
if (containerLogMeta.isEmpty()) {
|
||||
throw new NotFoundException(
|
||||
"Can not get log meta for container: " + containerIdStr);
|
||||
}
|
||||
List<ContainerLogsInfo> containersLogsInfo = new ArrayList<>();
|
||||
for (ContainerLogMeta meta : containerLogMeta) {
|
||||
ContainerLogsInfo logInfo =
|
||||
new ContainerLogsInfo(meta, ContainerLogAggregationType.AGGREGATED);
|
||||
containersLogsInfo.add(logInfo);
|
||||
}
|
||||
if (emptyLocalContainerLogMeta) {
|
||||
ContainerLogMeta emptyMeta =
|
||||
new ContainerLogMeta(containerIdStr, "N/A");
|
||||
ContainerLogsInfo empty =
|
||||
new ContainerLogsInfo(emptyMeta, ContainerLogAggregationType.LOCAL);
|
||||
containersLogsInfo.add(empty);
|
||||
}
|
||||
GenericEntity<List<ContainerLogsInfo>> meta =
|
||||
new GenericEntity<List<ContainerLogsInfo>>(containersLogsInfo) {
|
||||
};
|
||||
Response.ResponseBuilder response = Response.ok(meta);
|
||||
// Sending the X-Content-Type-Options response header with the value
|
||||
// nosniff will prevent Internet Explorer from MIME-sniffing a response
|
||||
// away from the declared content-type.
|
||||
response.header("X-Content-Type-Options", "nosniff");
|
||||
return response.build();
|
||||
} catch (Exception ex) {
|
||||
throw new WebApplicationException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
public static Response sendStreamOutputResponse(
|
||||
LogAggregationFileControllerFactory factory, ApplicationId appId,
|
||||
String appOwner, String nodeId, String containerIdStr, String fileName,
|
||||
String format, long bytes, boolean printEmptyLocalContainerLog) {
|
||||
String contentType = WebAppUtils.getDefaultLogContentType();
|
||||
if (format != null && !format.isEmpty()) {
|
||||
contentType = WebAppUtils.getSupportedLogContentType(format);
|
||||
if (contentType == null) {
|
||||
String errorMessage =
|
||||
"The valid values for the parameter : format " + "are "
|
||||
+ WebAppUtils.listSupportedLogContentType();
|
||||
return Response.status(Response.Status.BAD_REQUEST).entity(errorMessage)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
StreamingOutput stream = null;
|
||||
try {
|
||||
stream =
|
||||
getStreamingOutput(factory, appId, appOwner, nodeId, containerIdStr,
|
||||
fileName, bytes, printEmptyLocalContainerLog);
|
||||
} catch (Exception ex) {
|
||||
return createBadResponse(Response.Status.INTERNAL_SERVER_ERROR,
|
||||
ex.getMessage());
|
||||
}
|
||||
Response.ResponseBuilder response = Response.ok(stream);
|
||||
response.header("Content-Type", contentType);
|
||||
// Sending the X-Content-Type-Options response header with the value
|
||||
// nosniff will prevent Internet Explorer from MIME-sniffing a response
|
||||
// away from the declared content-type.
|
||||
response.header("X-Content-Type-Options", "nosniff");
|
||||
return response.build();
|
||||
}
|
||||
|
||||
private static StreamingOutput getStreamingOutput(
|
||||
final LogAggregationFileControllerFactory factory,
|
||||
final ApplicationId appId, final String appOwner, final String nodeId,
|
||||
final String containerIdStr, final String logFile, final long bytes,
|
||||
final boolean printEmptyLocalContainerLog) throws IOException {
|
||||
StreamingOutput stream = new StreamingOutput() {
|
||||
|
||||
@Override public void write(OutputStream os)
|
||||
throws IOException, WebApplicationException {
|
||||
ContainerLogsRequest request = new ContainerLogsRequest();
|
||||
request.setAppId(appId);
|
||||
request.setAppOwner(appOwner);
|
||||
request.setContainerId(containerIdStr);
|
||||
request.setBytes(bytes);
|
||||
request.setNodeId(nodeId);
|
||||
Set<String> logTypes = new HashSet<>();
|
||||
logTypes.add(logFile);
|
||||
request.setLogTypes(logTypes);
|
||||
boolean findLogs = factory.getFileControllerForRead(appId, appOwner)
|
||||
.readAggregatedLogs(request, os);
|
||||
if (!findLogs) {
|
||||
os.write(("Can not find logs for container:" + containerIdStr)
|
||||
.getBytes(Charset.forName("UTF-8")));
|
||||
} else {
|
||||
if (printEmptyLocalContainerLog) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(containerIdStr + "\n");
|
||||
sb.append("LogAggregationType: " + ContainerLogAggregationType.LOCAL
|
||||
+ "\n");
|
||||
sb.append("LogContents:\n");
|
||||
sb.append(getNoRedirectWarning() + "\n");
|
||||
os.write(sb.toString().getBytes(Charset.forName("UTF-8")));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
return stream;
|
||||
}
|
||||
|
||||
public static String getNoRedirectWarning() {
|
||||
return "We do not have NodeManager web address, so we can not "
|
||||
+ "re-direct the request to related NodeManager "
|
||||
+ "for local container logs.";
|
||||
}
|
||||
|
||||
public static void rewrapAndThrowException(Exception e) {
|
||||
if (e instanceof UndeclaredThrowableException) {
|
||||
rewrapAndThrowThrowable(e.getCause());
|
||||
} else {
|
||||
rewrapAndThrowThrowable(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void rewrapAndThrowThrowable(Throwable t) {
|
||||
if (t instanceof AuthorizationException) {
|
||||
throw new ForbiddenException(t);
|
||||
} else {
|
||||
throw new WebApplicationException(t);
|
||||
}
|
||||
}
|
||||
|
||||
public static long parseLongParam(String bytes) {
|
||||
if (bytes == null || bytes.isEmpty()) {
|
||||
return Long.MAX_VALUE;
|
||||
}
|
||||
return Long.parseLong(bytes);
|
||||
}
|
||||
|
||||
public static Response createBadResponse(Response.Status status,
|
||||
String errMessage) {
|
||||
Response response = Response.status(status)
|
||||
.entity(DOT_JOINER.join(status.toString(), errMessage)).build();
|
||||
return response;
|
||||
}
|
||||
|
||||
public static boolean isRunningState(YarnApplicationState appState) {
|
||||
return appState == YarnApplicationState.RUNNING;
|
||||
}
|
||||
|
||||
public static boolean isFinishedState(YarnApplicationState appState) {
|
||||
return appState == YarnApplicationState.FINISHED
|
||||
|| appState == YarnApplicationState.FAILED
|
||||
|| appState == YarnApplicationState.KILLED;
|
||||
}
|
||||
|
||||
protected static UserGroupInformation getUser(HttpServletRequest req) {
|
||||
String remoteUser = req.getRemoteUser();
|
||||
UserGroupInformation callerUGI = null;
|
||||
if (remoteUser != null) {
|
||||
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
|
||||
}
|
||||
return callerUGI;
|
||||
}
|
||||
|
||||
public static String getNMWebAddressFromRM(Configuration yarnConf,
|
||||
String nodeId)
|
||||
throws ClientHandlerException, UniformInterfaceException, JSONException {
|
||||
JSONObject nodeInfo =
|
||||
YarnWebServiceUtils.getNodeInfoFromRMWebService(yarnConf, nodeId)
|
||||
.getJSONObject("node");
|
||||
return nodeInfo.has("nodeHTTPAddress") ?
|
||||
nodeInfo.getString("nodeHTTPAddress") : null;
|
||||
}
|
||||
|
||||
public static String getAbsoluteNMWebAddress(Configuration yarnConf,
|
||||
String nmWebAddress) {
|
||||
if (nmWebAddress.contains(WebAppUtils.HTTP_PREFIX) || nmWebAddress
|
||||
.contains(WebAppUtils.HTTPS_PREFIX)) {
|
||||
return nmWebAddress;
|
||||
}
|
||||
return WebAppUtils.getHttpSchemePrefix(yarnConf) + nmWebAddress;
|
||||
}
|
||||
}
|
|
@ -35,4 +35,5 @@ public interface YarnWebServiceParams {
|
|||
String RESPONSE_CONTENT_SIZE = "size";
|
||||
String NM_ID = "nm.id";
|
||||
String REDIRECTED_FROM_NODE = "redirected_from_node";
|
||||
String CLUSTER_ID = "clusterid";
|
||||
}
|
||||
|
|
|
@ -0,0 +1,126 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.webapp;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
||||
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
||||
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.ws.rs.core.MultivaluedMap;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* test class for log web service.
|
||||
*/
|
||||
public class TestLogWebService {
|
||||
|
||||
private HttpServletRequest request;
|
||||
private LogWebServiceTest logWebService;
|
||||
private static TimelineEntity entity;
|
||||
private ApplicationId appId;
|
||||
private ContainerId cId;
|
||||
private String user = "user1";
|
||||
private Map<String, TimelineEntity> entities;
|
||||
private String nodeHttpAddress = "localhost:0";
|
||||
|
||||
@Before public void setup() throws Exception {
|
||||
appId = ApplicationId.fromString("application_1518143905142_509690");
|
||||
cId =
|
||||
ContainerId.fromString("container_e138_1518143905142_509690_01_000001");
|
||||
entities = new HashMap<>();
|
||||
generateEntity();
|
||||
request = Mockito.mock(HttpServletRequest.class);
|
||||
Mockito.when(request.getRemoteUser())
|
||||
.thenReturn(System.getProperty("user.name"));
|
||||
logWebService = new LogWebServiceTest();
|
||||
|
||||
}
|
||||
|
||||
@Test public void testGetApp() {
|
||||
|
||||
LogWebService.AppInfo app =
|
||||
logWebService.getApp(request, appId.toString(), null);
|
||||
Assert.assertEquals("RUNNING", app.getAppState().toString());
|
||||
Assert.assertEquals(user, app.getUser());
|
||||
}
|
||||
|
||||
@Test public void testGetContainer() {
|
||||
LogWebService.ContainerInfo container = logWebService
|
||||
.getContainer(request, appId.toString(), cId.toString(), null);
|
||||
Assert.assertEquals(nodeHttpAddress, container.getNodeHttpAddress());
|
||||
}
|
||||
|
||||
class LogWebServiceTest extends LogWebService {
|
||||
|
||||
@Override protected TimelineEntity getEntity(String path,
|
||||
MultivaluedMap<String, String> params) throws IOException {
|
||||
if (path.endsWith(cId.toString())) {
|
||||
return entities.get(cId.toString());
|
||||
} else if (path.endsWith(appId.toString())) {
|
||||
return entities.get(appId.toString());
|
||||
} else {
|
||||
throw new IOException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void generateEntity() {
|
||||
createAppEntities();
|
||||
createContainerEntities();
|
||||
}
|
||||
|
||||
private void createContainerEntities() {
|
||||
TimelineEntity timelineEntity =
|
||||
generateEntity(TimelineEntityType.YARN_APPLICATION.toString(),
|
||||
appId.toString());
|
||||
timelineEntity.addInfo(ApplicationMetricsConstants.USER_ENTITY_INFO, user);
|
||||
timelineEntity
|
||||
.addInfo(ApplicationMetricsConstants.STATE_EVENT_INFO, "RUNNING");
|
||||
entities.put(appId.toString(), timelineEntity);
|
||||
}
|
||||
|
||||
private void createAppEntities() {
|
||||
TimelineEntity timelineEntity =
|
||||
generateEntity(TimelineEntityType.YARN_CONTAINER.toString(),
|
||||
cId.toString());
|
||||
timelineEntity
|
||||
.addInfo(ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO,
|
||||
nodeHttpAddress);
|
||||
entities.put(cId.toString(), timelineEntity);
|
||||
}
|
||||
|
||||
private TimelineEntity generateEntity(String entityType,
|
||||
String entityId) {
|
||||
TimelineEntity timelineEntity = new TimelineEntity();
|
||||
timelineEntity.setId(entityId);
|
||||
timelineEntity.setType(entityType);
|
||||
timelineEntity.setCreatedTime(System.currentTimeMillis());
|
||||
return timelineEntity;
|
||||
}
|
||||
}
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.security.TimelineRea
|
|||
import org.apache.hadoop.yarn.server.timelineservice.reader.security.TimelineReaderWhitelistAuthorizationFilterInitializer;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
|
||||
import org.apache.hadoop.yarn.server.util.timeline.TimelineServerUtils;
|
||||
import org.apache.hadoop.yarn.server.webapp.LogWebService;
|
||||
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
|
@ -201,7 +202,8 @@ public class TimelineReaderServer extends CompositeService {
|
|||
readerWebServer.addJerseyResourcePackage(
|
||||
TimelineReaderWebServices.class.getPackage().getName() + ";"
|
||||
+ GenericExceptionHandler.class.getPackage().getName() + ";"
|
||||
+ YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
|
||||
+ YarnJacksonJaxbJsonProvider.class.getPackage().getName()+ ";"
|
||||
+ LogWebService.class.getPackage().getName(),
|
||||
"/*");
|
||||
readerWebServer.setAttribute(TIMELINE_READER_MANAGER_ATTR,
|
||||
timelineReaderManager);
|
||||
|
|
Loading…
Reference in New Issue