YARN-6108. Improve AHS webservice to accept NM address as a parameter to get container logs. Contributed by Xuan Gong.

This commit is contained in:
Junping Du 2017-02-07 02:13:14 -08:00
parent 7afe5b1a31
commit 9dbfab1284
5 changed files with 340 additions and 51 deletions

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.webapp.util;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;
import javax.ws.rs.core.MediaType;
import org.apache.hadoop.conf.Configuration;
import org.codehaus.jettison.json.JSONObject;
/**
* This class contains several utility function which could be used to generate
* Restful calls to RM/NM/AHS.
*
*/
public final class YarnWebServiceUtils {
private YarnWebServiceUtils() {}
/**
* Utility function to get NodeInfo by calling RM WebService.
* @param conf the configuration
* @param nodeId the nodeId
* @return a JSONObject which contains the NodeInfo
* @throws ClientHandlerException if there is an error
* processing the response.
* @throws UniformInterfaceException if the response status
* is 204 (No Content).
*/
public static JSONObject getNodeInfoFromRMWebService(Configuration conf,
String nodeId) throws ClientHandlerException,
UniformInterfaceException {
Client webServiceClient = Client.create();
String webAppAddress = WebAppUtils.getRMWebAppURLWithScheme(conf);
WebResource webResource = webServiceClient.resource(webAppAddress);
ClientResponse response = webResource.path("ws").path("v1")
.path("cluster").path("nodes")
.path(nodeId).accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
return response.getEntity(JSONObject.class);
}
}

View File

@ -41,6 +41,8 @@
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -56,6 +58,7 @@
import org.apache.hadoop.yarn.logaggregation.ContainerLogType;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.server.webapp.WebServices;
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppInfo;
@ -67,15 +70,21 @@
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.UniformInterfaceException;
@Singleton
@Path("/ws/v1/applicationhistory")
public class AHSWebServices extends WebServices {
private static final Log LOG = LogFactory.getLog(AHSWebServices.class);
private static final String NM_DOWNLOAD_URI_STR =
"/ws/v1/node/containers";
private static final Joiner JOINER = Joiner.on("");
@ -224,6 +233,8 @@ public ContainerInfo getContainer(@Context HttpServletRequest req,
* HttpServletResponse
* @param containerIdStr
* The container ID
* @param nmId
* The Node Manager NodeId
* @return
* The log file's name and current file size
*/
@ -233,7 +244,8 @@ public ContainerInfo getContainer(@Context HttpServletRequest req,
public Response getContainerLogsInfo(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
@PathParam("containerid") String containerIdStr) {
@PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
@QueryParam(YarnWebServiceParams.NM_ID) String nmId) {
ContainerId containerId = null;
init(res);
try {
@ -258,6 +270,17 @@ public Response getContainerLogsInfo(
}
if (isRunningState(appInfo.getAppState())) {
String appOwner = appInfo.getUser();
String nodeHttpAddress = null;
if (nmId != null && !nmId.isEmpty()) {
try {
nodeHttpAddress = getNMWebAddressFromRM(conf, nmId);
} catch (Exception ex) {
if (LOG.isDebugEnabled()) {
LOG.debug(ex.getMessage());
}
}
}
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
ContainerInfo containerInfo;
try {
containerInfo = super.getContainer(
@ -270,9 +293,20 @@ public Response getContainerLogsInfo(
return getContainerLogMeta(appId, appOwner, null,
containerIdStr, true);
}
String nodeHttpAddress = containerInfo.getNodeHttpAddress();
nodeHttpAddress = containerInfo.getNodeHttpAddress();
// make sure nodeHttpAddress is not null and not empty. Otherwise,
// we would only get log meta for aggregated logs instead of
// re-directing the request
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
// return log meta for the aggregated logs if exists.
// It will also return empty log meta for the local logs.
return getContainerLogMeta(appId, appOwner, null,
containerIdStr, true);
}
}
String uri = "/" + containerId.toString() + "/logs";
String resURI = JOINER.join(nodeHttpAddress, NM_DOWNLOAD_URI_STR, uri);
String resURI = JOINER.join(getAbsoluteNMWebAddress(nodeHttpAddress),
NM_DOWNLOAD_URI_STR, uri);
String query = req.getQueryString();
if (query != null && !query.isEmpty()) {
resURI += "?" + query;
@ -302,6 +336,8 @@ public Response getContainerLogsInfo(
* The content type
* @param size
* the size of the log file
* @param nmId
* The Node Manager NodeId
* @return
* The contents of the container's log file
*/
@ -312,11 +348,13 @@ public Response getContainerLogsInfo(
@Unstable
public Response getContainerLogFile(@Context HttpServletRequest req,
@Context HttpServletResponse res,
@PathParam("containerid") String containerIdStr,
@PathParam("filename") String filename,
@QueryParam("format") String format,
@QueryParam("size") String size) {
return getLogs(req, res, containerIdStr, filename, format, size);
@PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
@PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME) String filename,
@QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_FORMAT) String format,
@QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_SIZE) String size,
@QueryParam(YarnWebServiceParams.NM_ID) String nmId) {
return getLogs(req, res, containerIdStr, filename, format,
size, nmId);
}
//TODO: YARN-4993: Refactory ContainersLogsBlock, AggregatedLogsBlock and
@ -329,10 +367,11 @@ public Response getContainerLogFile(@Context HttpServletRequest req,
@Unstable
public Response getLogs(@Context HttpServletRequest req,
@Context HttpServletResponse res,
@PathParam("containerid") String containerIdStr,
@PathParam("filename") String filename,
@QueryParam("format") String format,
@QueryParam("size") String size) {
@PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
@PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME) String filename,
@QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_FORMAT) String format,
@QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_SIZE) String size,
@QueryParam(YarnWebServiceParams.NM_ID) String nmId) {
init(res);
ContainerId containerId;
try {
@ -362,6 +401,17 @@ public Response getLogs(@Context HttpServletRequest req,
}
if (isRunningState(appInfo.getAppState())) {
String nodeHttpAddress = null;
if (nmId != null && !nmId.isEmpty()) {
try {
nodeHttpAddress = getNMWebAddressFromRM(conf, nmId);
} catch (Exception ex) {
if (LOG.isDebugEnabled()) {
LOG.debug(ex.getMessage());
}
}
}
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
ContainerInfo containerInfo;
try {
containerInfo = super.getContainer(
@ -370,12 +420,22 @@ public Response getLogs(@Context HttpServletRequest req,
containerId.toString());
} catch (Exception ex) {
// output the aggregated logs
return sendStreamOutputResponse(appId, appOwner, null, containerIdStr,
filename, format, length, true);
return sendStreamOutputResponse(appId, appOwner, null,
containerIdStr, filename, format, length, true);
}
nodeHttpAddress = containerInfo.getNodeHttpAddress();
// make sure nodeHttpAddress is not null and not empty. Otherwise,
// we would only get aggregated logs instead of re-directing the
// request
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
// output the aggregated logs
return sendStreamOutputResponse(appId, appOwner, null,
containerIdStr, filename, format, length, true);
}
}
String nodeHttpAddress = containerInfo.getNodeHttpAddress();
String uri = "/" + containerId.toString() + "/logs/" + filename;
String resURI = JOINER.join(nodeHttpAddress, NM_DOWNLOAD_URI_STR, uri);
String resURI = JOINER.join(getAbsoluteNMWebAddress(nodeHttpAddress),
NM_DOWNLOAD_URI_STR, uri);
String query = req.getQueryString();
if (query != null && !query.isEmpty()) {
resURI += "?" + query;
@ -482,7 +542,7 @@ private Response getContainerLogMeta(ApplicationId appId, String appOwner,
.getContainerLogMetaFromRemoteFS(conf, appId, containerIdStr,
nodeId, appOwner);
if (containerLogMeta.isEmpty()) {
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
throw new NotFoundException(
"Can not get log meta for container: " + containerIdStr);
}
List<ContainerLogsInfo> containersLogsInfo = new ArrayList<>();
@ -518,4 +578,23 @@ public static String getNoRedirectWarning() {
+ "re-direct the request to related NodeManager "
+ "for local container logs.";
}
private String getAbsoluteNMWebAddress(String nmWebAddress) {
if (nmWebAddress.contains(WebAppUtils.HTTP_PREFIX) ||
nmWebAddress.contains(WebAppUtils.HTTPS_PREFIX)) {
return nmWebAddress;
}
return WebAppUtils.getHttpSchemePrefix(conf) + nmWebAddress;
}
@VisibleForTesting
@Private
public String getNMWebAddressFromRM(Configuration configuration,
String nodeId) throws ClientHandlerException,
UniformInterfaceException, JSONException {
JSONObject nodeInfo = YarnWebServiceUtils.getNodeInfoFromRMWebService(
configuration, nodeId).getJSONObject("node");
return nodeInfo.has("nodeHTTPAddress") ?
nodeInfo.getString("nodeHTTPAddress") : null;
}
}

View File

@ -62,6 +62,7 @@
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@ -84,6 +85,7 @@
import com.google.inject.Guice;
import com.google.inject.Singleton;
import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.GenericType;
@ -103,6 +105,8 @@ public class TestAHSWebServices extends JerseyTestBase {
private static FileSystem fs;
private static final String remoteLogRootDir = "target/logs/";
private static final String rootLogDir = "target/LocalLogs";
private static final String NM_WEBADDRESS = "test-nm-web-address:9999";
private static final String NM_ID = "test:1234";
@BeforeClass
public static void setupClass() throws Exception {
@ -130,7 +134,17 @@ protected void serviceStart() throws Exception {
};
historyClientService.init(conf);
historyClientService.start();
ahsWebservice = new AHSWebServices(historyClientService, conf);
ahsWebservice = new AHSWebServices(historyClientService, conf) {
@Override
public String getNMWebAddressFromRM(Configuration configuration,
String nodeId) throws ClientHandlerException,
UniformInterfaceException, JSONException {
if (nodeId.equals(NM_ID)) {
return NM_WEBADDRESS;
}
return null;
}
};
fs = FileSystem.get(conf);
GuiceServletConfig.setInjector(
Guice.createInjector(new WebServletModule()));
@ -676,6 +690,22 @@ public void testContainerLogsForRunningApps() throws Exception {
assertTrue(redirectURL.contains("/logs/" + fileName));
assertTrue(redirectURL.contains("user.name=" + user));
// If we specify NM id, we would re-direct the request
// to this NM's Web Address.
requestURI = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1.toString()).path(fileName)
.queryParam("user.name", user)
.queryParam(YarnWebServiceParams.NM_ID, NM_ID)
.getURI();
redirectURL = getRedirectURL(requestURI.toString());
assertTrue(redirectURL != null);
assertTrue(redirectURL.contains(NM_WEBADDRESS));
assertTrue(redirectURL.contains("ws/v1/node/containers"));
assertTrue(redirectURL.contains(containerId1.toString()));
assertTrue(redirectURL.contains("/logs/" + fileName));
assertTrue(redirectURL.contains("user.name=" + user));
// Test with new API
requestURI = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
@ -689,6 +719,20 @@ public void testContainerLogsForRunningApps() throws Exception {
assertTrue(redirectURL.contains("/logs/" + fileName));
assertTrue(redirectURL.contains("user.name=" + user));
requestURI = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId1.toString()).path("logs").path(fileName)
.queryParam("user.name", user)
.queryParam(YarnWebServiceParams.NM_ID, NM_ID)
.getURI();
redirectURL = getRedirectURL(requestURI.toString());
assertTrue(redirectURL != null);
assertTrue(redirectURL.contains(NM_WEBADDRESS));
assertTrue(redirectURL.contains("ws/v1/node/containers"));
assertTrue(redirectURL.contains(containerId1.toString()));
assertTrue(redirectURL.contains("/logs/" + fileName));
assertTrue(redirectURL.contains("user.name=" + user));
// If we can not container information from ATS, we would try to
// get aggregated log from remote FileSystem.
ContainerId containerId1000 = ContainerId.newContainerId(
@ -710,6 +754,21 @@ public void testContainerLogsForRunningApps() throws Exception {
// the warning message.
assertTrue(responseText.contains("LogType: " + ContainerLogType.LOCAL));
assertTrue(responseText.contains(AHSWebServices.getNoRedirectWarning()));
// If we can not container information from ATS, and we specify the NM id,
// but we can not get nm web address, we would still try to
// get aggregated log from remote FileSystem.
response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1000.toString()).path(fileName)
.queryParam(YarnWebServiceParams.NM_ID, "invalid-nm:1234")
.queryParam("user.name", user)
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertTrue(responseText.contains(content));
assertTrue(responseText.contains("LogType: " + ContainerLogType.LOCAL));
assertTrue(responseText.contains(AHSWebServices.getNoRedirectWarning()));
}
@Test(timeout = 10000)
@ -721,21 +780,38 @@ public void testContainerLogsMetaForRunningApps() throws Exception {
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
WebResource r = resource();
// If we can get Container information from ATS, we re-direct the request
// to the nodemamager who runs the container.
// If we specify the NMID, we re-direct the request by using
// the NM's web address
URI requestURI = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId1.toString()).path("logs")
.queryParam("user.name", user).getURI();
.queryParam("user.name", user)
.queryParam(YarnWebServiceParams.NM_ID, NM_ID)
.getURI();
String redirectURL = getRedirectURL(requestURI.toString());
assertTrue(redirectURL != null);
assertTrue(redirectURL.contains(NM_WEBADDRESS));
assertTrue(redirectURL.contains("ws/v1/node/containers"));
assertTrue(redirectURL.contains(containerId1.toString()));
assertTrue(redirectURL.contains("/logs"));
// If we do not specify the NodeId but can get Container information
// from ATS, we re-direct the request to the node manager
// who runs the container.
requestURI = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId1.toString()).path("logs")
.queryParam("user.name", user).getURI();
redirectURL = getRedirectURL(requestURI.toString());
assertTrue(redirectURL != null);
assertTrue(redirectURL.contains("test:1234"));
assertTrue(redirectURL.contains("ws/v1/node/containers"));
assertTrue(redirectURL.contains(containerId1.toString()));
assertTrue(redirectURL.contains("/logs"));
// If we can not container information from ATS, we would try to
// get aggregated log meta from remote FileSystem.
// If we can not container information from ATS,
// and not specify nodeId,
// we would try to get aggregated log meta from remote FileSystem.
ContainerId containerId1000 = ContainerId.newContainerId(
appAttemptId, 1000);
String fileName = "syslog";
@ -765,6 +841,32 @@ public void testContainerLogsMetaForRunningApps() throws Exception {
assertEquals(logInfo.getLogType(), ContainerLogType.LOCAL.toString());
}
}
// If we can not container information from ATS,
// and we specify NM id, but can not find NM WebAddress for this nodeId,
// we would still try to get aggregated log meta from remote FileSystem.
response = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId1000.toString()).path("logs")
.queryParam(YarnWebServiceParams.NM_ID, "invalid-nm:1234")
.queryParam("user.name", user)
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
responseText = response.getEntity(new GenericType<
List<ContainerLogsInfo>>(){});
assertTrue(responseText.size() == 2);
for (ContainerLogsInfo logInfo : responseText) {
if(logInfo.getLogType().equals(ContainerLogType.AGGREGATED.toString())) {
List<PerContainerLogFileInfo> logMeta = logInfo
.getContainerLogsInfo();
assertTrue(logMeta.size() == 1);
assertEquals(logMeta.get(0).getFileName(), fileName);
assertEquals(logMeta.get(0).getFileSize(), String.valueOf(
content.length()));
} else {
assertEquals(logInfo.getLogType(), ContainerLogType.LOCAL.toString());
}
}
}
@Test(timeout = 10000)

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.webapp;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Common web service parameters which could be used in
* RM/NM/AHS WebService.
*
*/
@InterfaceAudience.LimitedPrivate({"YARN"})
public interface YarnWebServiceParams {
// the params used in container-log related web services
String CONTAINER_ID = "containerid";
String CONTAINER_LOG_FILE_NAME = "filename";
String RESPONSE_CONTENT_FORMAT = "format";
String RESPONSE_CONTENT_SIZE = "size";
String NM_ID = "nm.id";
}

View File

@ -68,6 +68,7 @@
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMContainerLogsInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.BadRequestException;
@ -235,7 +236,7 @@ public ContainerInfo getNodeContainer(@javax.ws.rs.core.Context
public Response getContainerLogsInfo(
@javax.ws.rs.core.Context HttpServletRequest hsr,
@javax.ws.rs.core.Context HttpServletResponse res,
@PathParam("containerid") String containerIdStr) {
@PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr) {
ContainerId containerId = null;
init();
try {
@ -307,10 +308,14 @@ public Response getContainerLogsInfo(
@Public
@Unstable
public Response getContainerLogFile(
@PathParam("containerid") String containerIdStr,
@PathParam("filename") String filename,
@QueryParam("format") String format,
@QueryParam("size") String size) {
@PathParam(YarnWebServiceParams.CONTAINER_ID)
final String containerIdStr,
@PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME)
String filename,
@QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_FORMAT)
String format,
@QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_SIZE)
String size) {
return getLogs(containerIdStr, filename, format, size);
}
@ -338,10 +343,14 @@ public Response getContainerLogFile(
@Public
@Unstable
public Response getLogs(
@PathParam("containerid") final String containerIdStr,
@PathParam("filename") String filename,
@QueryParam("format") String format,
@QueryParam("size") String size) {
@PathParam(YarnWebServiceParams.CONTAINER_ID)
final String containerIdStr,
@PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME)
String filename,
@QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_FORMAT)
String format,
@QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_SIZE)
String size) {
ContainerId tempContainerId;
try {
tempContainerId = ContainerId.fromString(containerIdStr);