From 62d329cac08b214327081a97835653081aa992d1 Mon Sep 17 00:00:00 2001 From: Vrushali C Date: Thu, 11 Oct 2018 16:26:07 -0700 Subject: [PATCH] YARN-5742 Serve aggregated logs of historical apps from timeline service. Contributed by Rohith Sharma KS (cherry picked from commit 8d1981806feb8278966c02a9eff42d72541bb35e) --- .../webapp/AHSWebServices.java | 250 ++------- .../webapp/TestAHSWebServices.java | 7 +- .../yarn/server/webapp/LogWebService.java | 506 ++++++++++++++++++ .../server/webapp/LogWebServiceUtils.java | 258 +++++++++ .../server/webapp/YarnWebServiceParams.java | 1 + .../yarn/server/webapp/TestLogWebService.java | 126 +++++ .../reader/TimelineReaderServer.java | 4 +- 7 files changed, 945 insertions(+), 207 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebService.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebServiceUtils.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/webapp/TestLogWebService.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java index 9aa71a7037d..d94605fa05b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java @@ -18,13 +18,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.Charset; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; -import java.util.List; import java.util.Set; import javax.servlet.http.HttpServletRequest; @@ -35,15 +29,16 @@ import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; -import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Context; -import javax.ws.rs.core.GenericEntity; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.Response.Status; -import org.apache.hadoop.classification.InterfaceAudience.Private; + +import com.google.common.annotations.VisibleForTesting; +import com.sun.jersey.api.client.ClientHandlerException; +import com.sun.jersey.api.client.UniformInterfaceException; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -54,10 +49,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.ApplicationBaseProtocol; import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout; -import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; -import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; -import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; +import org.apache.hadoop.yarn.server.webapp.LogWebServiceUtils; import org.apache.hadoop.yarn.server.webapp.WebServices; import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; @@ -65,21 +58,14 @@ import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo; import org.apache.hadoop.yarn.server.webapp.dao.AppInfo; import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; -import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.NotFoundException; -import org.apache.hadoop.yarn.webapp.util.WebAppUtils; -import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils; -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.inject.Inject; import com.google.inject.Singleton; -import com.sun.jersey.api.client.ClientHandlerException; -import com.sun.jersey.api.client.UniformInterfaceException; +import org.codehaus.jettison.json.JSONException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -271,15 +257,18 @@ public class AHSWebServices extends WebServices { appInfo = super.getApp(req, res, appId.toString()); } catch (Exception ex) { // directly find logs from HDFS. - return getContainerLogMeta(appId, null, null, containerIdStr, false); + return LogWebServiceUtils + .getContainerLogMeta(factory, appId, null, null, containerIdStr, + false); } // if the application finishes, directly find logs // from HDFS. - if (isFinishedState(appInfo.getAppState())) { - return getContainerLogMeta(appId, null, null, - containerIdStr, false); + if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) { + return LogWebServiceUtils + .getContainerLogMeta(factory, appId, null, null, containerIdStr, + false); } - if (isRunningState(appInfo.getAppState())) { + if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) { String appOwner = appInfo.getUser(); String nodeHttpAddress = null; if (nmId != null && !nmId.isEmpty()) { @@ -301,8 +290,9 @@ public class AHSWebServices extends WebServices { } catch (Exception ex) { // return log meta for the aggregated logs if exists. // It will also return empty log meta for the local logs. - return getContainerLogMeta(appId, appOwner, null, - containerIdStr, true); + return LogWebServiceUtils + .getContainerLogMeta(factory, appId, appOwner, null, + containerIdStr, true); } nodeHttpAddress = containerInfo.getNodeHttpAddress(); // make sure nodeHttpAddress is not null and not empty. Otherwise, @@ -314,12 +304,14 @@ public class AHSWebServices extends WebServices { // It will also return empty log meta for the local logs. // If this is the redirect request from NM, we should not // re-direct the request back. Simply output the aggregated log meta. - return getContainerLogMeta(appId, appOwner, null, - containerIdStr, true); + return LogWebServiceUtils + .getContainerLogMeta(factory, appId, appOwner, null, + containerIdStr, true); } } String uri = "/" + containerId.toString() + "/logs"; - String resURI = JOINER.join(getAbsoluteNMWebAddress(nodeHttpAddress), + String resURI = JOINER.join( + LogWebServiceUtils.getAbsoluteNMWebAddress(conf, nodeHttpAddress), NM_DOWNLOAD_URI_STR, uri); String query = req.getQueryString(); if (query != null && !query.isEmpty()) { @@ -397,11 +389,11 @@ public class AHSWebServices extends WebServices { try { containerId = ContainerId.fromString(containerIdStr); } catch (IllegalArgumentException ex) { - return createBadResponse(Status.NOT_FOUND, + return LogWebServiceUtils.createBadResponse(Status.NOT_FOUND, "Invalid ContainerId: " + containerIdStr); } - final long length = parseLongParam(size); + final long length = LogWebServiceUtils.parseLongParam(size); ApplicationId appId = containerId.getApplicationAttemptId() .getApplicationId(); @@ -410,17 +402,19 @@ public class AHSWebServices extends WebServices { appInfo = super.getApp(req, res, appId.toString()); } catch (Exception ex) { // directly find logs from HDFS. - return sendStreamOutputResponse(appId, null, null, containerIdStr, - filename, format, length, false); + return LogWebServiceUtils + .sendStreamOutputResponse(factory, appId, null, null, containerIdStr, + filename, format, length, false); } String appOwner = appInfo.getUser(); - if (isFinishedState(appInfo.getAppState())) { + if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) { // directly find logs from HDFS. - return sendStreamOutputResponse(appId, appOwner, null, containerIdStr, - filename, format, length, false); + return LogWebServiceUtils + .sendStreamOutputResponse(factory, appId, appOwner, null, + containerIdStr, filename, format, length, false); } - if (isRunningState(appInfo.getAppState())) { + if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) { String nodeHttpAddress = null; if (nmId != null && !nmId.isEmpty()) { try { @@ -440,8 +434,9 @@ public class AHSWebServices extends WebServices { containerId.toString()); } catch (Exception ex) { // output the aggregated logs - return sendStreamOutputResponse(appId, appOwner, null, - containerIdStr, filename, format, length, true); + return LogWebServiceUtils + .sendStreamOutputResponse(factory, appId, appOwner, null, + containerIdStr, filename, format, length, true); } nodeHttpAddress = containerInfo.getNodeHttpAddress(); // make sure nodeHttpAddress is not null and not empty. Otherwise, @@ -452,12 +447,14 @@ public class AHSWebServices extends WebServices { if (nodeHttpAddress == null || nodeHttpAddress.isEmpty() || redirected_from_node) { // output the aggregated logs - return sendStreamOutputResponse(appId, appOwner, null, - containerIdStr, filename, format, length, true); + return LogWebServiceUtils + .sendStreamOutputResponse(factory, appId, appOwner, null, + containerIdStr, filename, format, length, true); } } String uri = "/" + containerId.toString() + "/logs/" + filename; - String resURI = JOINER.join(getAbsoluteNMWebAddress(nodeHttpAddress), + String resURI = JOINER.join( + LogWebServiceUtils.getAbsoluteNMWebAddress(conf, nodeHttpAddress), NM_DOWNLOAD_URI_STR, uri); String query = req.getQueryString(); if (query != null && !query.isEmpty()) { @@ -468,170 +465,15 @@ public class AHSWebServices extends WebServices { response.header("Location", resURI); return response.build(); } else { - return createBadResponse(Status.NOT_FOUND, + return LogWebServiceUtils.createBadResponse(Status.NOT_FOUND, "The application is not at Running or Finished State."); } } - private boolean isRunningState(YarnApplicationState appState) { - return appState == YarnApplicationState.RUNNING; - } - - private boolean isFinishedState(YarnApplicationState appState) { - return appState == YarnApplicationState.FINISHED - || appState == YarnApplicationState.FAILED - || appState == YarnApplicationState.KILLED; - } - - private Response createBadResponse(Status status, String errMessage) { - Response response = Response.status(status) - .entity(DOT_JOINER.join(status.toString(), errMessage)).build(); - return response; - } - - private Response sendStreamOutputResponse(ApplicationId appId, - String appOwner, String nodeId, String containerIdStr, - String fileName, String format, long bytes, - boolean printEmptyLocalContainerLog) { - String contentType = WebAppUtils.getDefaultLogContentType(); - if (format != null && !format.isEmpty()) { - contentType = WebAppUtils.getSupportedLogContentType(format); - if (contentType == null) { - String errorMessage = "The valid values for the parameter : format " - + "are " + WebAppUtils.listSupportedLogContentType(); - return Response.status(Status.BAD_REQUEST).entity(errorMessage) - .build(); - } - } - StreamingOutput stream = null; - try { - stream = getStreamingOutput(appId, appOwner, nodeId, - containerIdStr, fileName, bytes, printEmptyLocalContainerLog); - } catch (Exception ex) { - return createBadResponse(Status.INTERNAL_SERVER_ERROR, - ex.getMessage()); - } - ResponseBuilder response = Response.ok(stream); - response.header("Content-Type", contentType); - // Sending the X-Content-Type-Options response header with the value - // nosniff will prevent Internet Explorer from MIME-sniffing a response - // away from the declared content-type. - response.header("X-Content-Type-Options", "nosniff"); - return response.build(); - } - - private StreamingOutput getStreamingOutput(final ApplicationId appId, - final String appOwner, final String nodeId, final String containerIdStr, - final String logFile, final long bytes, - final boolean printEmptyLocalContainerLog) throws IOException{ - StreamingOutput stream = new StreamingOutput() { - - @Override - public void write(OutputStream os) throws IOException, - WebApplicationException { - ContainerLogsRequest request = new ContainerLogsRequest(); - request.setAppId(appId); - request.setAppOwner(appOwner); - request.setContainerId(containerIdStr); - request.setBytes(bytes); - request.setNodeId(nodeId); - Set logTypes = new HashSet<>(); - logTypes.add(logFile); - request.setLogTypes(logTypes); - boolean findLogs = factory.getFileControllerForRead(appId, appOwner) - .readAggregatedLogs(request, os); - if (!findLogs) { - os.write(("Can not find logs for container:" - + containerIdStr).getBytes(Charset.forName("UTF-8"))); - } else { - if (printEmptyLocalContainerLog) { - StringBuilder sb = new StringBuilder(); - sb.append(containerIdStr + "\n"); - sb.append("LogAggregationType: " - + ContainerLogAggregationType.LOCAL + "\n"); - sb.append("LogContents:\n"); - sb.append(getNoRedirectWarning() + "\n"); - os.write(sb.toString().getBytes(Charset.forName("UTF-8"))); - } - } - } - }; - return stream; - } - - private long parseLongParam(String bytes) { - if (bytes == null || bytes.isEmpty()) { - return Long.MAX_VALUE; - } - return Long.parseLong(bytes); - } - - private Response getContainerLogMeta(ApplicationId appId, String appOwner, - final String nodeId, final String containerIdStr, - boolean emptyLocalContainerLogMeta) { - try { - ContainerLogsRequest request = new ContainerLogsRequest(); - request.setAppId(appId); - request.setAppOwner(appOwner); - request.setContainerId(containerIdStr); - request.setNodeId(nodeId); - List containerLogMeta = factory - .getFileControllerForRead(appId, appOwner) - .readAggregatedLogsMeta(request); - if (containerLogMeta.isEmpty()) { - throw new NotFoundException( - "Can not get log meta for container: " + containerIdStr); - } - List containersLogsInfo = new ArrayList<>(); - for (ContainerLogMeta meta : containerLogMeta) { - ContainerLogsInfo logInfo = new ContainerLogsInfo(meta, - ContainerLogAggregationType.AGGREGATED); - containersLogsInfo.add(logInfo); - } - if (emptyLocalContainerLogMeta) { - ContainerLogMeta emptyMeta = new ContainerLogMeta( - containerIdStr, "N/A"); - ContainerLogsInfo empty = new ContainerLogsInfo(emptyMeta, - ContainerLogAggregationType.LOCAL); - containersLogsInfo.add(empty); - } - GenericEntity> meta = new GenericEntity>(containersLogsInfo){}; - ResponseBuilder response = Response.ok(meta); - // Sending the X-Content-Type-Options response header with the value - // nosniff will prevent Internet Explorer from MIME-sniffing a response - // away from the declared content-type. - response.header("X-Content-Type-Options", "nosniff"); - return response.build(); - } catch (Exception ex) { - throw new WebApplicationException(ex); - } - } - - @Private - @VisibleForTesting - public static String getNoRedirectWarning() { - return "We do not have NodeManager web address, so we can not " - + "re-direct the request to related NodeManager " - + "for local container logs."; - } - - private String getAbsoluteNMWebAddress(String nmWebAddress) { - if (nmWebAddress.contains(WebAppUtils.HTTP_PREFIX) || - nmWebAddress.contains(WebAppUtils.HTTPS_PREFIX)) { - return nmWebAddress; - } - return WebAppUtils.getHttpSchemePrefix(conf) + nmWebAddress; - } - - @VisibleForTesting - @Private + @VisibleForTesting @InterfaceAudience.Private public String getNMWebAddressFromRM(Configuration configuration, - String nodeId) throws ClientHandlerException, - UniformInterfaceException, JSONException { - JSONObject nodeInfo = YarnWebServiceUtils.getNodeInfoFromRMWebService( - configuration, nodeId).getJSONObject("node"); - return nodeInfo.has("nodeHTTPAddress") ? - nodeInfo.getString("nodeHTTPAddress") : null; + String nodeId) + throws ClientHandlerException, UniformInterfaceException, JSONException { + return LogWebServiceUtils.getNMWebAddressFromRM(configuration, nodeId); } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java index a4f56ffe512..e72714b0f4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.timeline.TimelineDataManager; import org.apache.hadoop.yarn.server.timeline.TimelineStore; import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; +import org.apache.hadoop.yarn.server.webapp.LogWebServiceUtils; import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams; import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo; import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout; @@ -774,7 +775,8 @@ public class TestAHSWebServices extends JerseyTestBase { // the warning message. assertTrue(responseText.contains("LogAggregationType: " + ContainerLogAggregationType.LOCAL)); - assertTrue(responseText.contains(AHSWebServices.getNoRedirectWarning())); + assertTrue( + responseText.contains(LogWebServiceUtils.getNoRedirectWarning())); // If we can not container information from ATS, and we specify the NM id, // but we can not get nm web address, we would still try to @@ -790,7 +792,8 @@ public class TestAHSWebServices extends JerseyTestBase { assertTrue(responseText.contains(content)); assertTrue(responseText.contains("LogAggregationType: " + ContainerLogAggregationType.LOCAL)); - assertTrue(responseText.contains(AHSWebServices.getNoRedirectWarning())); + assertTrue( + responseText.contains(LogWebServiceUtils.getNoRedirectWarning())); // If this is the redirect request, we would not re-direct the request // back and get the aggregated logs. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebService.java new file mode 100644 index 00000000000..246ee9cc6d4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebService.java @@ -0,0 +1,506 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.webapp; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.inject.Singleton; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; +import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.JettyUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.client.AuthenticatedURL; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.webapp.BadRequestException; +import org.apache.hadoop.yarn.webapp.NotFoundException; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.security.PrivilegedExceptionAction; + +/** + * Support only ATSv2 client only. + */ +@Singleton @Path("/ws/v2/applicationlog") public class LogWebService { + private static final Logger LOG = + LoggerFactory.getLogger(LogWebService.class); + private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/"; + private static final String NM_DOWNLOAD_URI_STR = "/ws/v1/node/containers"; + private static final Joiner JOINER = Joiner.on(""); + private static Configuration yarnConf = new YarnConfiguration(); + private static LogAggregationFileControllerFactory factory; + private static String base; + private static String defaultClusterid; + private volatile Client webTimelineClient; + + static { + init(); + } + + // initialize all the common resources - order is important + private static void init() { + factory = new LogAggregationFileControllerFactory(yarnConf); + base = JOINER.join(WebAppUtils.getHttpSchemePrefix(yarnConf), + WebAppUtils.getTimelineReaderWebAppURLWithoutScheme(yarnConf), + RESOURCE_URI_STR_V2); + defaultClusterid = yarnConf.get(YarnConfiguration.RM_CLUSTER_ID, + YarnConfiguration.DEFAULT_RM_CLUSTER_ID); + LOG.info("Initialized LogWeService with clusterid " + defaultClusterid + + " for URI: " + base); + } + + private Client createTimelineWebClient() { + ClientConfig cfg = new DefaultClientConfig(); + cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class); + Client client = new Client( + new URLConnectionClientHandler(new HttpURLConnectionFactory() { + @Override public HttpURLConnection getHttpURLConnection(URL url) + throws IOException { + AuthenticatedURL.Token token = new AuthenticatedURL.Token(); + HttpURLConnection conn = null; + try { + conn = new AuthenticatedURL().openConnection(url, token); + LOG.info("LogWeService:Connecetion created."); + } catch (AuthenticationException e) { + throw new IOException(e); + } + return conn; + } + }), cfg); + + return client; + } + + private void initForReadableEndpoints(HttpServletResponse response) { + // clear content type + response.setContentType(null); + } + + /** + * Returns log file's name as well as current file size for a container. + * + * @param req HttpServletRequest + * @param res HttpServletResponse + * @param containerIdStr The container ID + * @param nmId The Node Manager NodeId + * @param redirectedFromNode Whether this is a redirected request from NM + * @return The log file's name and current file size + */ + @GET @Path("/containers/{containerid}/logs") + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response getContainerLogsInfo(@Context HttpServletRequest req, + @Context HttpServletResponse res, + @PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr, + @QueryParam(YarnWebServiceParams.NM_ID) String nmId, + @QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE) + @DefaultValue("false") boolean redirectedFromNode, + @QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) { + ContainerId containerId = null; + initForReadableEndpoints(res); + try { + containerId = ContainerId.fromString(containerIdStr); + } catch (IllegalArgumentException e) { + throw new BadRequestException("invalid container id, " + containerIdStr); + } + + ApplicationId appId = + containerId.getApplicationAttemptId().getApplicationId(); + AppInfo appInfo; + try { + appInfo = getApp(req, appId.toString(), clusterId); + } catch (Exception ex) { + // directly find logs from HDFS. + return LogWebServiceUtils + .getContainerLogMeta(factory, appId, null, null, containerIdStr, + false); + } + // if the application finishes, directly find logs + // from HDFS. + if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) { + return LogWebServiceUtils + .getContainerLogMeta(factory, appId, null, null, containerIdStr, + false); + } + if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) { + String appOwner = appInfo.getUser(); + String nodeHttpAddress = null; + if (nmId != null && !nmId.isEmpty()) { + try { + nodeHttpAddress = + LogWebServiceUtils.getNMWebAddressFromRM(yarnConf, nmId); + } catch (Exception ex) { + if (LOG.isDebugEnabled()) { + LOG.debug(ex.getMessage()); + } + } + } + if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) { + ContainerInfo containerInfo; + try { + containerInfo = + getContainer(req, appId.toString(), containerId.toString(), + clusterId); + } catch (Exception ex) { + // return log meta for the aggregated logs if exists. + // It will also return empty log meta for the local logs. + return LogWebServiceUtils + .getContainerLogMeta(factory, appId, appOwner, null, + containerIdStr, true); + } + nodeHttpAddress = containerInfo.getNodeHttpAddress(); + // make sure nodeHttpAddress is not null and not empty. Otherwise, + // we would only get log meta for aggregated logs instead of + // re-directing the request + if (nodeHttpAddress == null || nodeHttpAddress.isEmpty() + || redirectedFromNode) { + // return log meta for the aggregated logs if exists. + // It will also return empty log meta for the local logs. + // If this is the redirect request from NM, we should not + // re-direct the request back. Simply output the aggregated log meta. + return LogWebServiceUtils + .getContainerLogMeta(factory, appId, appOwner, null, + containerIdStr, true); + } + } + String uri = "/" + containerId.toString() + "/logs"; + String resURI = JOINER.join( + LogWebServiceUtils.getAbsoluteNMWebAddress(yarnConf, nodeHttpAddress), + NM_DOWNLOAD_URI_STR, uri); + String query = req.getQueryString(); + if (query != null && !query.isEmpty()) { + resURI += "?" + query; + } + Response.ResponseBuilder response = + Response.status(HttpServletResponse.SC_TEMPORARY_REDIRECT); + response.header("Location", resURI); + return response.build(); + } else { + throw new NotFoundException( + "The application is not at Running or Finished State."); + } + } + + protected ContainerInfo getContainer(HttpServletRequest req, String appId, + String containerId, String clusterId) { + UserGroupInformation callerUGI = LogWebServiceUtils.getUser(req); + String cId = clusterId != null ? clusterId : defaultClusterid; + MultivaluedMap params = new MultivaluedMapImpl(); + params.add("fields", "INFO"); + String path = JOINER.join("clusters/", cId, "/apps/", appId, "/entities/", + TimelineEntityType.YARN_CONTAINER.toString(), "/", containerId); + TimelineEntity conEntity = null; + try { + if (callerUGI == null) { + conEntity = getEntity(path, params); + } else { + setUserName(params, callerUGI.getShortUserName()); + conEntity = + callerUGI.doAs(new PrivilegedExceptionAction() { + @Override public TimelineEntity run() throws Exception { + return getEntity(path, params); + } + }); + } + } catch (Exception e) { + LogWebServiceUtils.rewrapAndThrowException(e); + } + if (conEntity == null) { + return null; + } + String nodeHttpAddress = (String) conEntity.getInfo() + .get(ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO); + + ContainerInfo info = new ContainerInfo(nodeHttpAddress); + return info; + } + + protected AppInfo getApp(HttpServletRequest req, String appId, + String clusterId) { + UserGroupInformation callerUGI = LogWebServiceUtils.getUser(req); + + String cId = clusterId != null ? clusterId : defaultClusterid; + MultivaluedMap params = new MultivaluedMapImpl(); + params.add("fields", "INFO"); + String path = JOINER.join("clusters/", cId, "/apps/", appId); + TimelineEntity appEntity = null; + + try { + if (callerUGI == null) { + appEntity = getEntity(path, params); + } else { + setUserName(params, callerUGI.getShortUserName()); + appEntity = + callerUGI.doAs(new PrivilegedExceptionAction() { + @Override public TimelineEntity run() throws Exception { + return getEntity(path, params); + } + }); + } + } catch (Exception e) { + LogWebServiceUtils.rewrapAndThrowException(e); + } + + if (appEntity == null) { + return null; + } + String appOwner = (String) appEntity.getInfo() + .get(ApplicationMetricsConstants.USER_ENTITY_INFO); + String state = (String) appEntity.getInfo() + .get(ApplicationMetricsConstants.STATE_EVENT_INFO); + YarnApplicationState appState = YarnApplicationState.valueOf(state); + AppInfo info = new AppInfo(appState, appOwner); + return info; + } + + /** + * Returns the contents of a container's log file in plain text. + * + * @param req HttpServletRequest + * @param res HttpServletResponse + * @param containerIdStr The container ID + * @param filename The name of the log file + * @param format The content type + * @param size the size of the log file + * @param nmId The Node Manager NodeId + * @param redirectedFromNode Whether this is the redirect request from NM + * @return The contents of the container's log file + */ + @GET @Path("/containers/{containerid}/logs/{filename}") + @Produces({ MediaType.TEXT_PLAIN }) @InterfaceAudience.Public + @InterfaceStability.Unstable public Response getContainerLogFile( + @Context HttpServletRequest req, @Context HttpServletResponse res, + @PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr, + @PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME) String filename, + @QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_FORMAT) String format, + @QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_SIZE) String size, + @QueryParam(YarnWebServiceParams.NM_ID) String nmId, + @QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE) + boolean redirectedFromNode, + @QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) { + return getLogs(req, res, containerIdStr, filename, format, size, nmId, + redirectedFromNode, clusterId); + } + + //TODO: YARN-4993: Refactory ContainersLogsBlock, AggregatedLogsBlock and + // container log webservice introduced in AHS to minimize + // the duplication. + @GET @Path("/containerlogs/{containerid}/{filename}") + @Produces({ MediaType.TEXT_PLAIN + "; " + JettyUtils.UTF_8 }) + @InterfaceAudience.Public @InterfaceStability.Unstable + public Response getLogs(@Context HttpServletRequest req, + @Context HttpServletResponse res, + @PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr, + @PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME) String filename, + @QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_FORMAT) String format, + @QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_SIZE) String size, + @QueryParam(YarnWebServiceParams.NM_ID) String nmId, + @QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE) + @DefaultValue("false") boolean redirectedFromNode, + @QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) { + initForReadableEndpoints(res); + ContainerId containerId; + try { + containerId = ContainerId.fromString(containerIdStr); + } catch (IllegalArgumentException ex) { + return LogWebServiceUtils.createBadResponse(Response.Status.NOT_FOUND, + "Invalid ContainerId: " + containerIdStr); + } + + final long length = LogWebServiceUtils.parseLongParam(size); + + ApplicationId appId = + containerId.getApplicationAttemptId().getApplicationId(); + AppInfo appInfo; + try { + appInfo = getApp(req, appId.toString(), clusterId); + } catch (Exception ex) { + // directly find logs from HDFS. + return LogWebServiceUtils + .sendStreamOutputResponse(factory, appId, null, null, containerIdStr, + filename, format, length, false); + } + String appOwner = appInfo.getUser(); + if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) { + // directly find logs from HDFS. + return LogWebServiceUtils + .sendStreamOutputResponse(factory, appId, appOwner, null, + containerIdStr, filename, format, length, false); + } + + if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) { + String nodeHttpAddress = null; + if (nmId != null && !nmId.isEmpty()) { + try { + nodeHttpAddress = + LogWebServiceUtils.getNMWebAddressFromRM(yarnConf, nmId); + } catch (Exception ex) { + if (LOG.isDebugEnabled()) { + LOG.debug(ex.getMessage()); + } + } + } + if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) { + ContainerInfo containerInfo; + try { + containerInfo = + getContainer(req, appId.toString(), containerId.toString(), + clusterId); + } catch (Exception ex) { + // output the aggregated logs + return LogWebServiceUtils + .sendStreamOutputResponse(factory, appId, appOwner, null, + containerIdStr, filename, format, length, true); + } + nodeHttpAddress = containerInfo.getNodeHttpAddress(); + // make sure nodeHttpAddress is not null and not empty. Otherwise, + // we would only get aggregated logs instead of re-directing the + // request. + // If this is the redirect request from NM, we should not re-direct the + // request back. Simply output the aggregated logs. + if (nodeHttpAddress == null || nodeHttpAddress.isEmpty() + || redirectedFromNode) { + // output the aggregated logs + return LogWebServiceUtils + .sendStreamOutputResponse(factory, appId, appOwner, null, + containerIdStr, filename, format, length, true); + } + } + String uri = "/" + containerId.toString() + "/logs/" + filename; + String resURI = JOINER.join( + LogWebServiceUtils.getAbsoluteNMWebAddress(yarnConf, nodeHttpAddress), + NM_DOWNLOAD_URI_STR, uri); + String query = req.getQueryString(); + if (query != null && !query.isEmpty()) { + resURI += "?" + query; + } + Response.ResponseBuilder response = + Response.status(HttpServletResponse.SC_TEMPORARY_REDIRECT); + response.header("Location", resURI); + return response.build(); + } else { + return LogWebServiceUtils.createBadResponse(Response.Status.NOT_FOUND, + "The application is not at Running or Finished State."); + } + } + + protected static class AppInfo { + private YarnApplicationState appState; + private String user; + + AppInfo(YarnApplicationState appState, String user) { + this.appState = appState; + this.user = user; + } + + public YarnApplicationState getAppState() { + return this.appState; + } + + public String getUser() { + return this.user; + } + } + + protected static class ContainerInfo { + private String nodeHttpAddress; + + ContainerInfo(String nodeHttpAddress) { + this.nodeHttpAddress = nodeHttpAddress; + } + + public String getNodeHttpAddress() { + return nodeHttpAddress; + } + } + + @VisibleForTesting protected TimelineEntity getEntity(String path, + MultivaluedMap params) throws IOException { + ClientResponse resp = + getClient().resource(base).path(path).queryParams(params) + .accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + if (resp == null + || resp.getStatusInfo().getStatusCode() != ClientResponse.Status.OK + .getStatusCode()) { + String msg = + "Response from the timeline reader server is " + ((resp == null) ? + "null" : + "not successful," + " HTTP error code: " + resp.getStatus() + + ", Server response:\n" + resp.getEntity(String.class)); + LOG.error(msg); + throw new IOException(msg); + } + TimelineEntity entity = resp.getEntity(TimelineEntity.class); + return entity; + } + + private Client getClient() { + if (webTimelineClient == null) { + synchronized (LogWebService.class) { + if (webTimelineClient == null) { + webTimelineClient = createTimelineWebClient(); + } + } + } + return webTimelineClient; + } + + /** + * Set user.name in non-secure mode to delegate to next rest call. + */ + private void setUserName(MultivaluedMap params, String user) { + if (!UserGroupInformation.isSecurityEnabled()) { + params.add("user.name", user); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebServiceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebServiceUtils.java new file mode 100644 index 00000000000..bc301bbb2d8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebServiceUtils.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.webapp; + +import com.google.common.base.Joiner; +import com.sun.jersey.api.client.ClientHandlerException; +import com.sun.jersey.api.client.UniformInterfaceException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AuthorizationException; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; +import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; +import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; +import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo; +import org.apache.hadoop.yarn.webapp.ForbiddenException; +import org.apache.hadoop.yarn.webapp.NotFoundException; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; +import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.GenericEntity; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.UndeclaredThrowableException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Log web service utils class. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class LogWebServiceUtils { + + private LogWebServiceUtils() { + } + + private static final Joiner DOT_JOINER = Joiner.on(". "); + + public static Response getContainerLogMeta( + LogAggregationFileControllerFactory factory, ApplicationId appId, + String appOwner, final String nodeId, final String containerIdStr, + boolean emptyLocalContainerLogMeta) { + try { + ContainerLogsRequest request = new ContainerLogsRequest(); + request.setAppId(appId); + request.setAppOwner(appOwner); + request.setContainerId(containerIdStr); + request.setNodeId(nodeId); + List containerLogMeta = + factory.getFileControllerForRead(appId, appOwner) + .readAggregatedLogsMeta(request); + if (containerLogMeta.isEmpty()) { + throw new NotFoundException( + "Can not get log meta for container: " + containerIdStr); + } + List containersLogsInfo = new ArrayList<>(); + for (ContainerLogMeta meta : containerLogMeta) { + ContainerLogsInfo logInfo = + new ContainerLogsInfo(meta, ContainerLogAggregationType.AGGREGATED); + containersLogsInfo.add(logInfo); + } + if (emptyLocalContainerLogMeta) { + ContainerLogMeta emptyMeta = + new ContainerLogMeta(containerIdStr, "N/A"); + ContainerLogsInfo empty = + new ContainerLogsInfo(emptyMeta, ContainerLogAggregationType.LOCAL); + containersLogsInfo.add(empty); + } + GenericEntity> meta = + new GenericEntity>(containersLogsInfo) { + }; + Response.ResponseBuilder response = Response.ok(meta); + // Sending the X-Content-Type-Options response header with the value + // nosniff will prevent Internet Explorer from MIME-sniffing a response + // away from the declared content-type. + response.header("X-Content-Type-Options", "nosniff"); + return response.build(); + } catch (Exception ex) { + throw new WebApplicationException(ex); + } + } + + public static Response sendStreamOutputResponse( + LogAggregationFileControllerFactory factory, ApplicationId appId, + String appOwner, String nodeId, String containerIdStr, String fileName, + String format, long bytes, boolean printEmptyLocalContainerLog) { + String contentType = WebAppUtils.getDefaultLogContentType(); + if (format != null && !format.isEmpty()) { + contentType = WebAppUtils.getSupportedLogContentType(format); + if (contentType == null) { + String errorMessage = + "The valid values for the parameter : format " + "are " + + WebAppUtils.listSupportedLogContentType(); + return Response.status(Response.Status.BAD_REQUEST).entity(errorMessage) + .build(); + } + } + StreamingOutput stream = null; + try { + stream = + getStreamingOutput(factory, appId, appOwner, nodeId, containerIdStr, + fileName, bytes, printEmptyLocalContainerLog); + } catch (Exception ex) { + return createBadResponse(Response.Status.INTERNAL_SERVER_ERROR, + ex.getMessage()); + } + Response.ResponseBuilder response = Response.ok(stream); + response.header("Content-Type", contentType); + // Sending the X-Content-Type-Options response header with the value + // nosniff will prevent Internet Explorer from MIME-sniffing a response + // away from the declared content-type. + response.header("X-Content-Type-Options", "nosniff"); + return response.build(); + } + + private static StreamingOutput getStreamingOutput( + final LogAggregationFileControllerFactory factory, + final ApplicationId appId, final String appOwner, final String nodeId, + final String containerIdStr, final String logFile, final long bytes, + final boolean printEmptyLocalContainerLog) throws IOException { + StreamingOutput stream = new StreamingOutput() { + + @Override public void write(OutputStream os) + throws IOException, WebApplicationException { + ContainerLogsRequest request = new ContainerLogsRequest(); + request.setAppId(appId); + request.setAppOwner(appOwner); + request.setContainerId(containerIdStr); + request.setBytes(bytes); + request.setNodeId(nodeId); + Set logTypes = new HashSet<>(); + logTypes.add(logFile); + request.setLogTypes(logTypes); + boolean findLogs = factory.getFileControllerForRead(appId, appOwner) + .readAggregatedLogs(request, os); + if (!findLogs) { + os.write(("Can not find logs for container:" + containerIdStr) + .getBytes(Charset.forName("UTF-8"))); + } else { + if (printEmptyLocalContainerLog) { + StringBuilder sb = new StringBuilder(); + sb.append(containerIdStr + "\n"); + sb.append("LogAggregationType: " + ContainerLogAggregationType.LOCAL + + "\n"); + sb.append("LogContents:\n"); + sb.append(getNoRedirectWarning() + "\n"); + os.write(sb.toString().getBytes(Charset.forName("UTF-8"))); + } + } + } + }; + return stream; + } + + public static String getNoRedirectWarning() { + return "We do not have NodeManager web address, so we can not " + + "re-direct the request to related NodeManager " + + "for local container logs."; + } + + public static void rewrapAndThrowException(Exception e) { + if (e instanceof UndeclaredThrowableException) { + rewrapAndThrowThrowable(e.getCause()); + } else { + rewrapAndThrowThrowable(e); + } + } + + public static void rewrapAndThrowThrowable(Throwable t) { + if (t instanceof AuthorizationException) { + throw new ForbiddenException(t); + } else { + throw new WebApplicationException(t); + } + } + + public static long parseLongParam(String bytes) { + if (bytes == null || bytes.isEmpty()) { + return Long.MAX_VALUE; + } + return Long.parseLong(bytes); + } + + public static Response createBadResponse(Response.Status status, + String errMessage) { + Response response = Response.status(status) + .entity(DOT_JOINER.join(status.toString(), errMessage)).build(); + return response; + } + + public static boolean isRunningState(YarnApplicationState appState) { + return appState == YarnApplicationState.RUNNING; + } + + public static boolean isFinishedState(YarnApplicationState appState) { + return appState == YarnApplicationState.FINISHED + || appState == YarnApplicationState.FAILED + || appState == YarnApplicationState.KILLED; + } + + protected static UserGroupInformation getUser(HttpServletRequest req) { + String remoteUser = req.getRemoteUser(); + UserGroupInformation callerUGI = null; + if (remoteUser != null) { + callerUGI = UserGroupInformation.createRemoteUser(remoteUser); + } + return callerUGI; + } + + public static String getNMWebAddressFromRM(Configuration yarnConf, + String nodeId) + throws ClientHandlerException, UniformInterfaceException, JSONException { + JSONObject nodeInfo = + YarnWebServiceUtils.getNodeInfoFromRMWebService(yarnConf, nodeId) + .getJSONObject("node"); + return nodeInfo.has("nodeHTTPAddress") ? + nodeInfo.getString("nodeHTTPAddress") : null; + } + + public static String getAbsoluteNMWebAddress(Configuration yarnConf, + String nmWebAddress) { + if (nmWebAddress.contains(WebAppUtils.HTTP_PREFIX) || nmWebAddress + .contains(WebAppUtils.HTTPS_PREFIX)) { + return nmWebAddress; + } + return WebAppUtils.getHttpSchemePrefix(yarnConf) + nmWebAddress; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java index 479cc758138..5f96f231524 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java @@ -35,4 +35,5 @@ public interface YarnWebServiceParams { String RESPONSE_CONTENT_SIZE = "size"; String NM_ID = "nm.id"; String REDIRECTED_FROM_NODE = "redirected_from_node"; + String CLUSTER_ID = "clusterid"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/webapp/TestLogWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/webapp/TestLogWebService.java new file mode 100644 index 00000000000..9ceb629791c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/webapp/TestLogWebService.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.webapp; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.MultivaluedMap; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * test class for log web service. + */ +public class TestLogWebService { + + private HttpServletRequest request; + private LogWebServiceTest logWebService; + private static TimelineEntity entity; + private ApplicationId appId; + private ContainerId cId; + private String user = "user1"; + private Map entities; + private String nodeHttpAddress = "localhost:0"; + + @Before public void setup() throws Exception { + appId = ApplicationId.fromString("application_1518143905142_509690"); + cId = + ContainerId.fromString("container_e138_1518143905142_509690_01_000001"); + entities = new HashMap<>(); + generateEntity(); + request = Mockito.mock(HttpServletRequest.class); + Mockito.when(request.getRemoteUser()) + .thenReturn(System.getProperty("user.name")); + logWebService = new LogWebServiceTest(); + + } + + @Test public void testGetApp() { + + LogWebService.AppInfo app = + logWebService.getApp(request, appId.toString(), null); + Assert.assertEquals("RUNNING", app.getAppState().toString()); + Assert.assertEquals(user, app.getUser()); + } + + @Test public void testGetContainer() { + LogWebService.ContainerInfo container = logWebService + .getContainer(request, appId.toString(), cId.toString(), null); + Assert.assertEquals(nodeHttpAddress, container.getNodeHttpAddress()); + } + + class LogWebServiceTest extends LogWebService { + + @Override protected TimelineEntity getEntity(String path, + MultivaluedMap params) throws IOException { + if (path.endsWith(cId.toString())) { + return entities.get(cId.toString()); + } else if (path.endsWith(appId.toString())) { + return entities.get(appId.toString()); + } else { + throw new IOException(); + } + } + } + + private void generateEntity() { + createAppEntities(); + createContainerEntities(); + } + + private void createContainerEntities() { + TimelineEntity timelineEntity = + generateEntity(TimelineEntityType.YARN_APPLICATION.toString(), + appId.toString()); + timelineEntity.addInfo(ApplicationMetricsConstants.USER_ENTITY_INFO, user); + timelineEntity + .addInfo(ApplicationMetricsConstants.STATE_EVENT_INFO, "RUNNING"); + entities.put(appId.toString(), timelineEntity); + } + + private void createAppEntities() { + TimelineEntity timelineEntity = + generateEntity(TimelineEntityType.YARN_CONTAINER.toString(), + cId.toString()); + timelineEntity + .addInfo(ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO, + nodeHttpAddress); + entities.put(cId.toString(), timelineEntity); + } + + private TimelineEntity generateEntity(String entityType, + String entityId) { + TimelineEntity timelineEntity = new TimelineEntity(); + timelineEntity.setId(entityId); + timelineEntity.setType(entityType); + timelineEntity.setCreatedTime(System.currentTimeMillis()); + return timelineEntity; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java index bd2c428c4fb..8f1e7d74e41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.security.TimelineRea import org.apache.hadoop.yarn.server.timelineservice.reader.security.TimelineReaderWhitelistAuthorizationFilterInitializer; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; import org.apache.hadoop.yarn.server.util.timeline.TimelineServerUtils; +import org.apache.hadoop.yarn.server.webapp.LogWebService; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @@ -201,7 +202,8 @@ public class TimelineReaderServer extends CompositeService { readerWebServer.addJerseyResourcePackage( TimelineReaderWebServices.class.getPackage().getName() + ";" + GenericExceptionHandler.class.getPackage().getName() + ";" - + YarnJacksonJaxbJsonProvider.class.getPackage().getName(), + + YarnJacksonJaxbJsonProvider.class.getPackage().getName()+ ";" + + LogWebService.class.getPackage().getName(), "/*"); readerWebServer.setAttribute(TIMELINE_READER_MANAGER_ATTR, timelineReaderManager);