YARN-6011. Add a new web service to list the files on a container in AHSWebService. Contributed by Xuan Gong.
(cherry picked from commit cf695577aa
)
This commit is contained in:
parent
ffb4c22f6e
commit
861e275646
|
@ -24,6 +24,8 @@ import java.io.IOException;
|
|||
import java.io.OutputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
@ -40,6 +42,7 @@ import javax.ws.rs.core.Response;
|
|||
import javax.ws.rs.core.StreamingOutput;
|
||||
import javax.ws.rs.core.Response.ResponseBuilder;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
import org.apache.commons.math3.util.Pair;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -56,12 +59,14 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
|
||||
import org.apache.hadoop.yarn.server.webapp.WebServices;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.AppInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
||||
import org.apache.hadoop.yarn.util.Times;
|
||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||
|
@ -204,6 +209,115 @@ public class AHSWebServices extends WebServices {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: YARN-6080: Create WebServiceUtils to have common functions used in
|
||||
// RMWebService, NMWebService and AHSWebService.
|
||||
/**
|
||||
* Returns log file's name as well as current file size for a container.
|
||||
*
|
||||
* @param req
|
||||
* HttpServletRequest
|
||||
* @param res
|
||||
* HttpServletResponse
|
||||
* @param containerIdStr
|
||||
* The container ID
|
||||
* @return
|
||||
* The log file's name and current file size
|
||||
*/
|
||||
@GET
|
||||
@Path("/containers/{containerid}/logs")
|
||||
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
||||
public Response getContainerLogsInfo(
|
||||
@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam("containerid") String containerIdStr) {
|
||||
ContainerId containerId = null;
|
||||
init(res);
|
||||
try {
|
||||
containerId = ContainerId.fromString(containerIdStr);
|
||||
} catch (Exception e) {
|
||||
throw new BadRequestException("invalid container id, " + containerIdStr);
|
||||
}
|
||||
ApplicationId appId = containerId.getApplicationAttemptId()
|
||||
.getApplicationId();
|
||||
AppInfo appInfo;
|
||||
try {
|
||||
appInfo = super.getApp(req, res, appId.toString());
|
||||
} catch (Exception ex) {
|
||||
// directly find logs from HDFS.
|
||||
return getContainerLogMeta(appId, null, null, containerIdStr);
|
||||
}
|
||||
String appOwner = appInfo.getUser();
|
||||
ContainerInfo containerInfo;
|
||||
try {
|
||||
containerInfo = super.getContainer(
|
||||
req, res, appId.toString(),
|
||||
containerId.getApplicationAttemptId().toString(),
|
||||
containerId.toString());
|
||||
} catch (Exception ex) {
|
||||
if (isFinishedState(appInfo.getAppState())) {
|
||||
// directly find logs from HDFS.
|
||||
return getContainerLogMeta(appId, appOwner, null, containerIdStr);
|
||||
}
|
||||
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
|
||||
"Can not get ContainerInfo for the container: " + containerId);
|
||||
}
|
||||
String nodeId = containerInfo.getNodeId();
|
||||
if (isRunningState(appInfo.getAppState())) {
|
||||
String nodeHttpAddress = containerInfo.getNodeHttpAddress();
|
||||
String uri = "/" + containerId.toString() + "/logs";
|
||||
String resURI = JOINER.join(nodeHttpAddress, NM_DOWNLOAD_URI_STR, uri);
|
||||
String query = req.getQueryString();
|
||||
if (query != null && !query.isEmpty()) {
|
||||
resURI += "?" + query;
|
||||
}
|
||||
ResponseBuilder response = Response.status(
|
||||
HttpServletResponse.SC_TEMPORARY_REDIRECT);
|
||||
response.header("Location", resURI);
|
||||
return response.build();
|
||||
} else if (isFinishedState(appInfo.getAppState())) {
|
||||
return getContainerLogMeta(appId, appOwner, nodeId,
|
||||
containerIdStr);
|
||||
} else {
|
||||
return createBadResponse(Status.NOT_FOUND,
|
||||
"The application is not at Running or Finished State.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the contents of a container's log file in plain text.
|
||||
*
|
||||
* @param req
|
||||
* HttpServletRequest
|
||||
* @param res
|
||||
* HttpServletResponse
|
||||
* @param containerIdStr
|
||||
* The container ID
|
||||
* @param filename
|
||||
* The name of the log file
|
||||
* @param format
|
||||
* The content type
|
||||
* @param size
|
||||
* the size of the log file
|
||||
* @return
|
||||
* The contents of the container's log file
|
||||
*/
|
||||
@GET
|
||||
@Path("/containers/{containerid}/logs/{filename}")
|
||||
@Produces({ MediaType.TEXT_PLAIN })
|
||||
@Public
|
||||
@Unstable
|
||||
public Response getContainerLogFile(@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam("containerid") String containerIdStr,
|
||||
@PathParam("filename") String filename,
|
||||
@QueryParam("format") String format,
|
||||
@QueryParam("size") String size) {
|
||||
return getLogs(req, res, containerIdStr, filename, format, size);
|
||||
}
|
||||
|
||||
//TODO: YARN-4993: Refactory ContainersLogsBlock, AggregatedLogsBlock and
|
||||
// container log webservice introduced in AHS to minimize
|
||||
// the duplication.
|
||||
@GET
|
||||
@Path("/containerlogs/{containerid}/{filename}")
|
||||
@Produces({ MediaType.TEXT_PLAIN })
|
||||
|
@ -477,4 +591,92 @@ public class AHSWebServices extends WebServices {
|
|||
}
|
||||
return Long.parseLong(bytes);
|
||||
}
|
||||
|
||||
private Response getContainerLogMeta(ApplicationId appId, String appOwner,
|
||||
final String nodeId, final String containerIdStr) {
|
||||
Map<String, String> containerLogMeta = new HashMap<>();
|
||||
try {
|
||||
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
|
||||
org.apache.hadoop.fs.Path remoteRootLogDir =
|
||||
new org.apache.hadoop.fs.Path(conf.get(
|
||||
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
|
||||
org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir =
|
||||
FileContext.getFileContext(conf).makeQualified(remoteRootLogDir);
|
||||
FileContext fc = FileContext.getFileContext(
|
||||
qualifiedRemoteRootLogDir.toUri(), conf);
|
||||
org.apache.hadoop.fs.Path remoteAppDir = null;
|
||||
if (appOwner == null) {
|
||||
org.apache.hadoop.fs.Path toMatch = LogAggregationUtils
|
||||
.getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix);
|
||||
FileStatus[] matching = fc.util().globStatus(toMatch);
|
||||
if (matching == null || matching.length != 1) {
|
||||
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
|
||||
"Can not get log meta for container: " + containerIdStr);
|
||||
}
|
||||
remoteAppDir = matching[0].getPath();
|
||||
} else {
|
||||
remoteAppDir = LogAggregationUtils.getRemoteAppLogDir(
|
||||
remoteRootLogDir, appId, appOwner, suffix);
|
||||
}
|
||||
final RemoteIterator<FileStatus> nodeFiles;
|
||||
nodeFiles = fc.listStatus(remoteAppDir);
|
||||
if (!nodeFiles.hasNext()) {
|
||||
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
|
||||
"Can not get log meta for container: " + containerIdStr);
|
||||
}
|
||||
String nodeIdStr = (nodeId == null) ? null
|
||||
: LogAggregationUtils.getNodeString(nodeId);
|
||||
while (nodeFiles.hasNext()) {
|
||||
FileStatus thisNodeFile = nodeFiles.next();
|
||||
if (nodeIdStr != null) {
|
||||
if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (!thisNodeFile.getPath().getName().endsWith(
|
||||
LogAggregationUtils.TMP_FILE_SUFFIX)) {
|
||||
AggregatedLogFormat.LogReader reader =
|
||||
new AggregatedLogFormat.LogReader(conf,
|
||||
thisNodeFile.getPath());
|
||||
try {
|
||||
DataInputStream valueStream;
|
||||
LogKey key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
while (valueStream != null) {
|
||||
if (key.toString().equals(containerIdStr)) {
|
||||
while (true) {
|
||||
try {
|
||||
Pair<String, String> logMeta =
|
||||
LogReader.readContainerMetaDataAndSkipData(
|
||||
valueStream, null);
|
||||
containerLogMeta.put(logMeta.getFirst(),
|
||||
logMeta.getSecond());
|
||||
} catch (EOFException eof) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
// Next container
|
||||
key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
}
|
||||
} finally {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
ResponseBuilder response = Response.ok(new ContainerLogsInfo(
|
||||
containerLogMeta));
|
||||
// Sending the X-Content-Type-Options response header with the value
|
||||
// nosniff will prevent Internet Explorer from MIME-sniffing a response
|
||||
// away from the declared content-type.
|
||||
response.header("X-Content-Type-Options", "nosniff");
|
||||
return response.build();
|
||||
} catch (Exception ex) {
|
||||
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
|
||||
ex.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
|||
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
|
||||
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
|
||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||
|
@ -559,6 +560,17 @@ public class TestAHSWebServices extends JerseyTestBase {
|
|||
String responseText = response.getEntity(String.class);
|
||||
assertTrue(responseText.contains("Hello." + containerId1));
|
||||
|
||||
// Do the same test with new API
|
||||
r = resource();
|
||||
response = r.path("ws").path("v1")
|
||||
.path("applicationhistory").path("containers")
|
||||
.path(containerId1.toString()).path("logs").path(fileName)
|
||||
.queryParam("user.name", user)
|
||||
.accept(MediaType.TEXT_PLAIN)
|
||||
.get(ClientResponse.class);
|
||||
responseText = response.getEntity(String.class);
|
||||
assertTrue(responseText.contains("Hello." + containerId1));
|
||||
|
||||
// test whether we can find container log from remote diretory if
|
||||
// the containerInfo for this container could not be fetched from AHS.
|
||||
r = resource();
|
||||
|
@ -571,6 +583,17 @@ public class TestAHSWebServices extends JerseyTestBase {
|
|||
responseText = response.getEntity(String.class);
|
||||
assertTrue(responseText.contains("Hello." + containerId100));
|
||||
|
||||
// Do the same test with new API
|
||||
r = resource();
|
||||
response = r.path("ws").path("v1")
|
||||
.path("applicationhistory").path("containers")
|
||||
.path(containerId100.toString()).path("logs").path(fileName)
|
||||
.queryParam("user.name", user)
|
||||
.accept(MediaType.TEXT_PLAIN)
|
||||
.get(ClientResponse.class);
|
||||
responseText = response.getEntity(String.class);
|
||||
assertTrue(responseText.contains("Hello." + containerId100));
|
||||
|
||||
// create an application which can not be found from AHS
|
||||
ApplicationId appId100 = ApplicationId.newInstance(0, 100);
|
||||
appLogsDir = new Path(rootLogDirPath, appId100.toString());
|
||||
|
@ -719,6 +742,95 @@ public class TestAHSWebServices extends JerseyTestBase {
|
|||
assertTrue(redirectURL.contains(containerId1.toString()));
|
||||
assertTrue(redirectURL.contains("/logs/" + fileName));
|
||||
assertTrue(redirectURL.contains("user.name=" + user));
|
||||
|
||||
// Test with new API
|
||||
requestURI = r.path("ws").path("v1")
|
||||
.path("applicationhistory").path("containers")
|
||||
.path(containerId1.toString()).path("logs").path(fileName)
|
||||
.queryParam("user.name", user).getURI();
|
||||
redirectURL = getRedirectURL(requestURI.toString());
|
||||
assertTrue(redirectURL != null);
|
||||
assertTrue(redirectURL.contains("test:1234"));
|
||||
assertTrue(redirectURL.contains("ws/v1/node/containers"));
|
||||
assertTrue(redirectURL.contains(containerId1.toString()));
|
||||
assertTrue(redirectURL.contains("/logs/" + fileName));
|
||||
assertTrue(redirectURL.contains("user.name=" + user));
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
public void testContainerLogsMetaForRunningApps() throws Exception {
|
||||
String user = "user1";
|
||||
ApplicationId appId = ApplicationId.newInstance(
|
||||
1234, 1);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
|
||||
WebResource r = resource();
|
||||
URI requestURI = r.path("ws").path("v1")
|
||||
.path("applicationhistory").path("containers")
|
||||
.path(containerId1.toString()).path("logs")
|
||||
.queryParam("user.name", user).getURI();
|
||||
String redirectURL = getRedirectURL(requestURI.toString());
|
||||
assertTrue(redirectURL != null);
|
||||
assertTrue(redirectURL.contains("test:1234"));
|
||||
assertTrue(redirectURL.contains("ws/v1/node/containers"));
|
||||
assertTrue(redirectURL.contains(containerId1.toString()));
|
||||
assertTrue(redirectURL.contains("/logs"));
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
public void testContainerLogsMetaForFinishedApps() throws Exception {
|
||||
String fileName = "syslog";
|
||||
String user = "user1";
|
||||
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user1");
|
||||
NodeId nodeId = NodeId.newInstance("test host", 100);
|
||||
//prepare the logs for remote directory
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||
// create local logs
|
||||
List<String> rootLogDirList = new ArrayList<String>();
|
||||
rootLogDirList.add(rootLogDir);
|
||||
Path rootLogDirPath = new Path(rootLogDir);
|
||||
if (fs.exists(rootLogDirPath)) {
|
||||
fs.delete(rootLogDirPath, true);
|
||||
}
|
||||
assertTrue(fs.mkdirs(rootLogDirPath));
|
||||
Path appLogsDir = new Path(rootLogDirPath, appId.toString());
|
||||
if (fs.exists(appLogsDir)) {
|
||||
fs.delete(appLogsDir, true);
|
||||
}
|
||||
assertTrue(fs.mkdirs(appLogsDir));
|
||||
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
|
||||
String content = "Hello." + containerId1;
|
||||
createContainerLogInLocalDir(appLogsDir, containerId1, fs, fileName,
|
||||
content);
|
||||
|
||||
// upload container logs to remote log dir
|
||||
Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR) +
|
||||
user + "/logs/" + appId.toString());
|
||||
if (fs.exists(path)) {
|
||||
fs.delete(path, true);
|
||||
}
|
||||
assertTrue(fs.mkdirs(path));
|
||||
uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId,
|
||||
containerId1, path, fs);
|
||||
|
||||
WebResource r = resource();
|
||||
ClientResponse response = r.path("ws").path("v1")
|
||||
.path("applicationhistory").path("containers")
|
||||
.path(containerId1.toString()).path("logs")
|
||||
.queryParam("user.name", user)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.get(ClientResponse.class);
|
||||
ContainerLogsInfo responseText = response.getEntity(
|
||||
ContainerLogsInfo.class);
|
||||
assertEquals(responseText.getContainerLogsInfo().size(), 1);
|
||||
assertEquals(responseText.getContainerLogsInfo().get(0).getFileName(),
|
||||
fileName);
|
||||
assertEquals(responseText.getContainerLogsInfo().get(0).getFileSize(),
|
||||
String.valueOf(content.length()));
|
||||
}
|
||||
|
||||
private static String getRedirectURL(String url) {
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.webapp.dao;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
import javax.xml.bind.annotation.XmlElement;
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
/**
|
||||
* {@code ContainerLogsInfo} includes the log meta-data of containers.
|
||||
* <p>
|
||||
* The container log meta-data includes details such as:
|
||||
* <ul>
|
||||
* <li>The filename of the container log.</li>
|
||||
* <li>The size of the container log.</li>
|
||||
* </ul>
|
||||
*/
|
||||
|
||||
@XmlRootElement(name = "containerLogsInfo")
|
||||
@XmlAccessorType(XmlAccessType.FIELD)
|
||||
public class ContainerLogsInfo {
|
||||
|
||||
@XmlElement(name = "containerLogInfo")
|
||||
protected List<ContainerLogInfo> containerLogsInfo;
|
||||
|
||||
//JAXB needs this
|
||||
public ContainerLogsInfo() {}
|
||||
|
||||
public ContainerLogsInfo(Map<String, String> containerLogMeta)
|
||||
throws YarnException {
|
||||
this.containerLogsInfo = new ArrayList<ContainerLogInfo>();
|
||||
for (Entry<String, String> meta : containerLogMeta.entrySet()) {
|
||||
ContainerLogInfo info = new ContainerLogInfo(meta.getKey(),
|
||||
meta.getValue());
|
||||
containerLogsInfo.add(info);
|
||||
}
|
||||
}
|
||||
|
||||
public List<ContainerLogInfo> getContainerLogsInfo() {
|
||||
return this.containerLogsInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* It includes the log meta-data of a container.
|
||||
*
|
||||
*/
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public static class ContainerLogInfo {
|
||||
private String fileName;
|
||||
private String fileSize;
|
||||
|
||||
//JAXB needs this
|
||||
public ContainerLogInfo() {}
|
||||
|
||||
public ContainerLogInfo(String fileName, String fileSize) {
|
||||
this.setFileName(fileName);
|
||||
this.setFileSize(fileSize);
|
||||
}
|
||||
|
||||
public String getFileName() {
|
||||
return fileName;
|
||||
}
|
||||
|
||||
public void setFileName(String fileName) {
|
||||
this.fileName = fileName;
|
||||
}
|
||||
|
||||
public String getFileSize() {
|
||||
return fileSize;
|
||||
}
|
||||
|
||||
public void setFileSize(String fileSize) {
|
||||
this.fileSize = fileSize;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue