YARN-4920. ATS/NM should support a link to dowload/get the logs in text format. Contributed by Xuan Gong.

(cherry picked from commit 3cf223166d452a0f58f92676837a9edb8ddc1139)
This commit is contained in:
Junping Du 2016-05-04 09:40:13 -07:00
parent d43d8a1bcd
commit 1ffb0c43d6
6 changed files with 525 additions and 17 deletions

View File

@ -18,6 +18,11 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp; package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.Collections; import java.util.Collections;
import java.util.Set; import java.util.Set;
@ -28,13 +33,30 @@
import javax.ws.rs.PathParam; import javax.ws.rs.PathParam;
import javax.ws.rs.Produces; import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam; import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context; import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol; import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout; import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.server.webapp.WebServices; import org.apache.hadoop.yarn.server.webapp.WebServices;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo;
@ -42,9 +64,10 @@
import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo; import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.BadRequestException;
import com.google.common.base.Joiner;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
@ -52,9 +75,17 @@
@Path("/ws/v1/applicationhistory") @Path("/ws/v1/applicationhistory")
public class AHSWebServices extends WebServices { public class AHSWebServices extends WebServices {
private static final String NM_DOWNLOAD_URI_STR =
"/ws/v1/node/containerlogs";
private static final Joiner JOINER = Joiner.on("");
private static final Joiner DOT_JOINER = Joiner.on(". ");
private final Configuration conf;
@Inject @Inject
public AHSWebServices(ApplicationBaseProtocol appBaseProt) { public AHSWebServices(ApplicationBaseProtocol appBaseProt,
Configuration conf) {
super(appBaseProt); super(appBaseProt);
this.conf = conf;
} }
@GET @GET
@ -173,4 +204,239 @@ public ContainerInfo getContainer(@Context HttpServletRequest req,
} }
} }
@GET
@Path("/containerlogs/{containerid}/{filename}")
@Produces({ MediaType.TEXT_PLAIN })
@Public
@Unstable
public Response getLogs(@Context HttpServletRequest req,
@Context HttpServletResponse res,
@PathParam("containerid") String containerIdStr,
@PathParam("filename") String filename,
@QueryParam("download") String download) {
init(res);
ContainerId containerId;
try {
containerId = ContainerId.fromString(containerIdStr);
} catch (IllegalArgumentException ex) {
return createBadResponse(Status.NOT_FOUND,
"Invalid ContainerId: " + containerIdStr);
}
boolean downloadFile = parseBooleanParam(download);
ApplicationId appId = containerId.getApplicationAttemptId()
.getApplicationId();
AppInfo appInfo;
try {
appInfo = super.getApp(req, res, appId.toString());
} catch (Exception ex) {
// directly find logs from HDFS.
return sendStreamOutputResponse(appId, null, null, containerIdStr,
filename, downloadFile);
}
String appOwner = appInfo.getUser();
ContainerInfo containerInfo;
try {
containerInfo = super.getContainer(
req, res, appId.toString(),
containerId.getApplicationAttemptId().toString(),
containerId.toString());
} catch (Exception ex) {
if (isFinishedState(appInfo.getAppState())) {
// directly find logs from HDFS.
return sendStreamOutputResponse(appId, appOwner, null, containerIdStr,
filename, downloadFile);
}
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
"Can not get ContainerInfo for the container: " + containerId);
}
String nodeId = containerInfo.getNodeId();
if (isRunningState(appInfo.getAppState())) {
String nodeHttpAddress = containerInfo.getNodeHttpAddress();
String uri = "/" + containerId.toString() + "/" + filename;
String resURI = JOINER.join(nodeHttpAddress, NM_DOWNLOAD_URI_STR, uri);
String query = req.getQueryString();
if (query != null && !query.isEmpty()) {
resURI += "?" + query;
}
ResponseBuilder response = Response.status(
HttpServletResponse.SC_TEMPORARY_REDIRECT);
response.header("Location", resURI);
return response.build();
} else if (isFinishedState(appInfo.getAppState())) {
return sendStreamOutputResponse(appId, appOwner, nodeId,
containerIdStr, filename, downloadFile);
} else {
return createBadResponse(Status.NOT_FOUND,
"The application is not at Running or Finished State.");
}
}
// TODO: YARN-5029. RM would send the update event. We could get
// the consistent YarnApplicationState.
// Will remove YarnApplicationState.ACCEPTED.
private boolean isRunningState(YarnApplicationState appState) {
return appState == YarnApplicationState.ACCEPTED
|| appState == YarnApplicationState.RUNNING;
}
private boolean isFinishedState(YarnApplicationState appState) {
return appState == YarnApplicationState.FINISHED
|| appState == YarnApplicationState.FAILED
|| appState == YarnApplicationState.KILLED;
}
private Response createBadResponse(Status status, String errMessage) {
Response response = Response.status(status)
.entity(DOT_JOINER.join(status.toString(), errMessage)).build();
return response;
}
private boolean parseBooleanParam(String param) {
return ("true").equalsIgnoreCase(param);
}
private Response sendStreamOutputResponse(ApplicationId appId,
String appOwner, String nodeId, String containerIdStr,
String fileName, boolean downloadFile) {
StreamingOutput stream = null;
try {
stream = getStreamingOutput(appId, appOwner, nodeId,
containerIdStr, fileName);
} catch (Exception ex) {
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
ex.getMessage());
}
if (stream == null) {
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
"Can not get log for container: " + containerIdStr);
}
ResponseBuilder response = Response.ok(stream);
if (downloadFile) {
response.header("Content-Type", "application/octet-stream");
response.header("Content-Disposition", "attachment; filename="
+ fileName);
}
return response.build();
}
private StreamingOutput getStreamingOutput(ApplicationId appId,
String appOwner, final String nodeId, final String containerIdStr,
final String logFile) throws IOException{
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
org.apache.hadoop.fs.Path remoteRootLogDir = new org.apache.hadoop.fs.Path(
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir =
FileContext.getFileContext(conf).makeQualified(remoteRootLogDir);
FileContext fc = FileContext.getFileContext(
qualifiedRemoteRootLogDir.toUri(), conf);
org.apache.hadoop.fs.Path remoteAppDir = null;
if (appOwner == null) {
org.apache.hadoop.fs.Path toMatch = LogAggregationUtils
.getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix);
FileStatus[] matching = fc.util().globStatus(toMatch);
if (matching == null || matching.length != 1) {
return null;
}
remoteAppDir = matching[0].getPath();
} else {
remoteAppDir = LogAggregationUtils
.getRemoteAppLogDir(remoteRootLogDir, appId, appOwner, suffix);
}
final RemoteIterator<FileStatus> nodeFiles;
nodeFiles = fc.listStatus(remoteAppDir);
if (!nodeFiles.hasNext()) {
return null;
}
StreamingOutput stream = new StreamingOutput() {
@Override
public void write(OutputStream os) throws IOException,
WebApplicationException {
byte[] buf = new byte[65535];
boolean findLogs = false;
while (nodeFiles.hasNext()) {
final FileStatus thisNodeFile = nodeFiles.next();
String nodeName = thisNodeFile.getPath().getName();
if ((nodeId == null || nodeName.contains(LogAggregationUtils
.getNodeString(nodeId))) && !nodeName.endsWith(
LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(conf,
thisNodeFile.getPath());
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
while (valueStream != null && !key.toString()
.equals(containerIdStr)) {
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
if (valueStream == null) {
continue;
}
while (true) {
try {
String fileType = valueStream.readUTF();
String fileLengthStr = valueStream.readUTF();
long fileLength = Long.parseLong(fileLengthStr);
if (fileType.equalsIgnoreCase(logFile)) {
StringBuilder sb = new StringBuilder();
sb.append("LogType:");
sb.append(fileType + "\n");
sb.append("Log Upload Time:");
sb.append(Times.format(System.currentTimeMillis()) + "\n");
sb.append("LogLength:");
sb.append(fileLengthStr + "\n");
sb.append("Log Contents:\n");
byte[] b = sb.toString().getBytes(Charset.forName("UTF-8"));
os.write(b, 0, b.length);
long curRead = 0;
long pendingRead = fileLength - curRead;
int toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead;
int len = valueStream.read(buf, 0, toRead);
while (len != -1 && curRead < fileLength) {
os.write(buf, 0, len);
curRead += len;
pendingRead = fileLength - curRead;
toRead = pendingRead > buf.length ? buf.length
: (int) pendingRead;
len = valueStream.read(buf, 0, toRead);
}
sb = new StringBuilder();
sb.append("\nEnd of LogType:" + fileType + "\n");
b = sb.toString().getBytes(Charset.forName("UTF-8"));
os.write(b, 0, b.length);
findLogs = true;
} else {
long totalSkipped = 0;
long currSkipped = 0;
while (currSkipped != -1 && totalSkipped < fileLength) {
currSkipped = valueStream.skip(fileLength - totalSkipped);
totalSkipped += currSkipped;
}
}
} catch (EOFException eof) {
break;
}
}
}
}
os.flush();
if (!findLogs) {
throw new IOException("Can not find logs for container:"
+ containerIdStr);
}
}
};
return stream;
}
} }

View File

@ -80,9 +80,11 @@ public static void prepareStore() throws Exception {
store = createStore(SCALE); store = createStore(SCALE);
TimelineEntities entities = new TimelineEntities(); TimelineEntities entities = new TimelineEntities();
entities.addEntity(createApplicationTimelineEntity( entities.addEntity(createApplicationTimelineEntity(
ApplicationId.newInstance(0, SCALE + 1), true, true, false, false)); ApplicationId.newInstance(0, SCALE + 1), true, true, false, false,
YarnApplicationState.FINISHED));
entities.addEntity(createApplicationTimelineEntity( entities.addEntity(createApplicationTimelineEntity(
ApplicationId.newInstance(0, SCALE + 2), true, false, true, false)); ApplicationId.newInstance(0, SCALE + 2), true, false, true, false,
YarnApplicationState.FINISHED));
store.put(entities); store.put(entities);
} }
@ -140,10 +142,10 @@ private static void prepareTimelineStore(TimelineStore store, int scale)
ApplicationId appId = ApplicationId.newInstance(0, i); ApplicationId appId = ApplicationId.newInstance(0, i);
if (i == 2) { if (i == 2) {
entities.addEntity(createApplicationTimelineEntity( entities.addEntity(createApplicationTimelineEntity(
appId, true, false, false, true)); appId, true, false, false, true, YarnApplicationState.FINISHED));
} else { } else {
entities.addEntity(createApplicationTimelineEntity( entities.addEntity(createApplicationTimelineEntity(
appId, false, false, false, false)); appId, false, false, false, false, YarnApplicationState.FINISHED));
} }
store.put(entities); store.put(entities);
for (int j = 1; j <= scale; ++j) { for (int j = 1; j <= scale; ++j) {
@ -160,6 +162,16 @@ private static void prepareTimelineStore(TimelineStore store, int scale)
} }
} }
} }
TimelineEntities entities = new TimelineEntities();
ApplicationId appId = ApplicationId.newInstance(1234, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
entities.addEntity(createApplicationTimelineEntity(
appId, true, false, false, false, YarnApplicationState.RUNNING));
entities.addEntity(createAppAttemptTimelineEntity(appAttemptId));
entities.addEntity(createContainerEntity(containerId));
store.put(entities);
} }
@Test @Test
@ -355,7 +367,7 @@ public void testGetApplications() throws Exception {
historyManager.getApplications(Long.MAX_VALUE, 0L, Long.MAX_VALUE) historyManager.getApplications(Long.MAX_VALUE, 0L, Long.MAX_VALUE)
.values(); .values();
Assert.assertNotNull(apps); Assert.assertNotNull(apps);
Assert.assertEquals(SCALE + 1, apps.size()); Assert.assertEquals(SCALE + 2, apps.size());
ApplicationId ignoredAppId = ApplicationId.newInstance(0, SCALE + 2); ApplicationId ignoredAppId = ApplicationId.newInstance(0, SCALE + 2);
for (ApplicationReport app : apps) { for (ApplicationReport app : apps) {
Assert.assertNotEquals(ignoredAppId, app.getApplicationId()); Assert.assertNotEquals(ignoredAppId, app.getApplicationId());
@ -467,7 +479,8 @@ public ContainerReport run() throws Exception {
private static TimelineEntity createApplicationTimelineEntity( private static TimelineEntity createApplicationTimelineEntity(
ApplicationId appId, boolean emptyACLs, boolean noAttemptId, ApplicationId appId, boolean emptyACLs, boolean noAttemptId,
boolean wrongAppId, boolean enableUpdateEvent) { boolean wrongAppId, boolean enableUpdateEvent,
YarnApplicationState state) {
TimelineEntity entity = new TimelineEntity(); TimelineEntity entity = new TimelineEntity();
entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE); entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
if (wrongAppId) { if (wrongAppId) {
@ -517,7 +530,7 @@ private static TimelineEntity createApplicationTimelineEntity(
eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
FinalApplicationStatus.UNDEFINED.toString()); FinalApplicationStatus.UNDEFINED.toString());
eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
YarnApplicationState.FINISHED.toString()); state.toString());
if (!noAttemptId) { if (!noAttemptId) {
eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO, eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
ApplicationAttemptId.newInstance(appId, 1)); ApplicationAttemptId.newInstance(appId, 1));
@ -610,6 +623,8 @@ private static TimelineEntity createContainerEntity(ContainerId containerId) {
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, 100); entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, 100);
entityInfo entityInfo
.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, -1); .put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, -1);
entityInfo.put(ContainerMetricsConstants
.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, "http://test:1234");
entity.setOtherInfo(entityInfo); entity.setOtherInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent(); TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE); tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);

View File

@ -19,17 +19,30 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp; package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileWriter;
import java.io.Writer;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Properties; import java.util.Properties;
import javax.servlet.FilterConfig; import javax.servlet.FilterConfig;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter; import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler; import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol; import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
@ -42,6 +55,8 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryClientService; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryClientService;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManagerOnTimelineStore; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManagerOnTimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.TestApplicationHistoryManagerOnTimelineStore; import org.apache.hadoop.yarn.server.applicationhistoryservice.TestApplicationHistoryManagerOnTimelineStore;
@ -81,12 +96,17 @@
public class TestAHSWebServices extends JerseyTestBase { public class TestAHSWebServices extends JerseyTestBase {
private static ApplicationHistoryClientService historyClientService; private static ApplicationHistoryClientService historyClientService;
private static AHSWebServices ahsWebservice;
private static final String[] USERS = new String[] { "foo" , "bar" }; private static final String[] USERS = new String[] { "foo" , "bar" };
private static final int MAX_APPS = 5; private static final int MAX_APPS = 5;
private static Configuration conf;
private static FileSystem fs;
private static final String remoteLogRootDir = "target/logs/";
private static final String rootLogDir = "target/LocalLogs";
@BeforeClass @BeforeClass
public static void setupClass() throws Exception { public static void setupClass() throws Exception {
Configuration conf = new YarnConfiguration(); conf = new YarnConfiguration();
TimelineStore store = TimelineStore store =
TestApplicationHistoryManagerOnTimelineStore.createStore(MAX_APPS); TestApplicationHistoryManagerOnTimelineStore.createStore(MAX_APPS);
TimelineACLsManager aclsManager = new TimelineACLsManager(conf); TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
@ -95,6 +115,8 @@ public static void setupClass() throws Exception {
new TimelineDataManager(store, aclsManager); new TimelineDataManager(store, aclsManager);
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "foo"); conf.set(YarnConfiguration.YARN_ADMIN_ACL, "foo");
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir);
dataManager.init(conf); dataManager.init(conf);
ApplicationACLsManager appAclsManager = new ApplicationACLsManager(conf); ApplicationACLsManager appAclsManager = new ApplicationACLsManager(conf);
ApplicationHistoryManagerOnTimelineStore historyManager = ApplicationHistoryManagerOnTimelineStore historyManager =
@ -108,6 +130,8 @@ protected void serviceStart() throws Exception {
}; };
historyClientService.init(conf); historyClientService.init(conf);
historyClientService.start(); historyClientService.start();
ahsWebservice = new AHSWebServices(historyClientService, conf);
fs = FileSystem.get(conf);
} }
@AfterClass @AfterClass
@ -115,6 +139,8 @@ public static void tearDownClass() throws Exception {
if (historyClientService != null) { if (historyClientService != null) {
historyClientService.stop(); historyClientService.stop();
} }
fs.delete(new Path(remoteLogRootDir), true);
fs.delete(new Path(rootLogDir), true);
} }
@Parameterized.Parameters @Parameterized.Parameters
@ -127,7 +153,7 @@ public static Collection<Object[]> rounds() {
@Override @Override
protected void configureServlets() { protected void configureServlets() {
bind(JAXBContextResolver.class); bind(JAXBContextResolver.class);
bind(AHSWebServices.class); bind(AHSWebServices.class).toInstance(ahsWebservice);;
bind(GenericExceptionHandler.class); bind(GenericExceptionHandler.class);
bind(ApplicationBaseProtocol.class).toInstance(historyClientService); bind(ApplicationBaseProtocol.class).toInstance(historyClientService);
serve("/*").with(GuiceContainer.class); serve("/*").with(GuiceContainer.class);
@ -471,4 +497,177 @@ public void testSingleContainer() throws Exception {
assertEquals(ContainerState.COMPLETE.toString(), assertEquals(ContainerState.COMPLETE.toString(),
container.getString("containerState")); container.getString("containerState"));
} }
@Test(timeout = 10000)
public void testContainerLogsForFinishedApps() throws Exception {
String fileName = "syslog";
String user = "user1";
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user1");
NodeId nodeId = NodeId.newInstance("test host", 100);
NodeId nodeId2 = NodeId.newInstance("host2", 1234);
//prepare the logs for remote directory
ApplicationId appId = ApplicationId.newInstance(0, 1);
// create local logs
List<String> rootLogDirList = new ArrayList<String>();
rootLogDirList.add(rootLogDir);
Path rootLogDirPath = new Path(rootLogDir);
if (fs.exists(rootLogDirPath)) {
fs.delete(rootLogDirPath, true);
}
assertTrue(fs.mkdirs(rootLogDirPath));
Path appLogsDir = new Path(rootLogDirPath, appId.toString());
if (fs.exists(appLogsDir)) {
fs.delete(appLogsDir, true);
}
assertTrue(fs.mkdirs(appLogsDir));
// create container logs in local log file dir
// create two container log files. We can get containerInfo
// for container1 from AHS, but can not get such info for
// container100
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
ContainerId containerId100 = ContainerId.newContainerId(appAttemptId, 100);
createContainerLogInLocalDir(appLogsDir, containerId1, fs, fileName,
("Hello." + containerId1));
createContainerLogInLocalDir(appLogsDir, containerId100, fs, fileName,
("Hello." + containerId100));
// upload container logs to remote log dir
Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR) +
user + "/logs/" + appId.toString());
if (fs.exists(path)) {
fs.delete(path, true);
}
assertTrue(fs.mkdirs(path));
uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId,
containerId1, path, fs);
uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId2,
containerId100, path, fs);
// test whether we can find container log from remote diretory if
// the containerInfo for this container could be fetched from AHS.
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1.toString()).path(fileName)
.queryParam("user.name", user)
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
String responseText = response.getEntity(String.class);
assertTrue(responseText.contains("Hello." + containerId1));
// test whether we can find container log from remote diretory if
// the containerInfo for this container could not be fetched from AHS.
r = resource();
response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId100.toString()).path(fileName)
.queryParam("user.name", user)
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertTrue(responseText.contains("Hello." + containerId100));
// create an application which can not be found from AHS
ApplicationId appId100 = ApplicationId.newInstance(0, 100);
appLogsDir = new Path(rootLogDirPath, appId100.toString());
if (fs.exists(appLogsDir)) {
fs.delete(appLogsDir, true);
}
assertTrue(fs.mkdirs(appLogsDir));
ApplicationAttemptId appAttemptId100 =
ApplicationAttemptId.newInstance(appId100, 1);
ContainerId containerId1ForApp100 = ContainerId
.newContainerId(appAttemptId100, 1);
createContainerLogInLocalDir(appLogsDir, containerId1ForApp100, fs,
fileName, ("Hello." + containerId1ForApp100));
path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR) +
user + "/logs/" + appId100.toString());
if (fs.exists(path)) {
fs.delete(path, true);
}
assertTrue(fs.mkdirs(path));
uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId2,
containerId1ForApp100, path, fs);
r = resource();
response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1ForApp100.toString()).path(fileName)
.queryParam("user.name", user)
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertTrue(responseText.contains("Hello." + containerId1ForApp100));
}
private static void createContainerLogInLocalDir(Path appLogsDir,
ContainerId containerId, FileSystem fs, String fileName, String content)
throws Exception {
Path containerLogsDir = new Path(appLogsDir, containerId.toString());
if (fs.exists(containerLogsDir)) {
fs.delete(containerLogsDir, true);
}
assertTrue(fs.mkdirs(containerLogsDir));
Writer writer =
new FileWriter(new File(containerLogsDir.toString(), fileName));
writer.write(content);
writer.close();
}
private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
Path path =
new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
AggregatedLogFormat.LogWriter writer =
new AggregatedLogFormat.LogWriter(configuration, path, ugi);
writer.writeApplicationOwner(ugi.getUserName());
writer.append(new AggregatedLogFormat.LogKey(containerId),
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
ugi.getShortUserName()));
writer.close();
}
@Test(timeout = 10000)
public void testContainerLogsForRunningApps() throws Exception {
String fileName = "syslog";
String user = "user1";
ApplicationId appId = ApplicationId.newInstance(
1234, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
WebResource r = resource();
URI requestURI = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs")
.path(containerId1.toString()).path(fileName)
.queryParam("user.name", user).getURI();
String redirectURL = getRedirectURL(requestURI.toString());
assertTrue(redirectURL != null);
assertTrue(redirectURL.contains("test:1234"));
assertTrue(redirectURL.contains("ws/v1/node/containerlogs"));
assertTrue(redirectURL.contains(containerId1.toString()));
assertTrue(redirectURL.contains("user.name=" + user));
}
private static String getRedirectURL(String url) {
String redirectUrl = null;
try {
HttpURLConnection conn = (HttpURLConnection) new URL(url)
.openConnection();
// do not automatically follow the redirection
// otherwise we get too many redirections exception
conn.setInstanceFollowRedirects(false);
if(conn.getResponseCode() == HttpServletResponse.SC_TEMPORARY_REDIRECT) {
redirectUrl = conn.getHeaderField("Location");
}
} catch (Exception e) {
// throw new RuntimeException(e);
}
return redirectUrl;
}
} }

View File

@ -48,6 +48,7 @@ public class ContainerInfo {
protected int containerExitStatus; protected int containerExitStatus;
protected ContainerState containerState; protected ContainerState containerState;
protected String nodeHttpAddress; protected String nodeHttpAddress;
protected String nodeId;
public ContainerInfo() { public ContainerInfo() {
// JAXB needs this // JAXB needs this
@ -71,6 +72,7 @@ public ContainerInfo(ContainerReport container) {
containerExitStatus = container.getContainerExitStatus(); containerExitStatus = container.getContainerExitStatus();
containerState = container.getContainerState(); containerState = container.getContainerState();
nodeHttpAddress = container.getNodeHttpAddress(); nodeHttpAddress = container.getNodeHttpAddress();
nodeId = container.getAssignedNode().toString();
} }
public String getContainerId() { public String getContainerId() {
@ -124,4 +126,8 @@ public ContainerState getContainerState() {
public String getNodeHttpAddress() { public String getNodeHttpAddress() {
return nodeHttpAddress; return nodeHttpAddress;
} }
public String getNodeId() {
return nodeId;
}
} }

View File

@ -33,6 +33,7 @@
import javax.ws.rs.WebApplicationException; import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo; import javax.ws.rs.core.UriInfo;
@ -215,7 +216,8 @@ public ContainerInfo getNodeContainer(@javax.ws.rs.core.Context
@Public @Public
@Unstable @Unstable
public Response getLogs(@PathParam("containerid") String containerIdStr, public Response getLogs(@PathParam("containerid") String containerIdStr,
@PathParam("filename") String filename) { @PathParam("filename") String filename,
@QueryParam("download") String download) {
ContainerId containerId; ContainerId containerId;
try { try {
containerId = ConverterUtils.toContainerId(containerIdStr); containerId = ConverterUtils.toContainerId(containerIdStr);
@ -232,7 +234,7 @@ public Response getLogs(@PathParam("containerid") String containerIdStr,
} catch (YarnException ex) { } catch (YarnException ex) {
return Response.serverError().entity(ex.getMessage()).build(); return Response.serverError().entity(ex.getMessage()).build();
} }
boolean downloadFile = parseBooleanParam(download);
try { try {
final FileInputStream fis = ContainerLogsUtils.openLogFileForRead( final FileInputStream fis = ContainerLogsUtils.openLogFileForRead(
containerIdStr, logFile, nmContext); containerIdStr, logFile, nmContext);
@ -250,10 +252,22 @@ public void write(OutputStream os) throws IOException,
os.flush(); os.flush();
} }
}; };
ResponseBuilder resp = Response.ok(stream);
return Response.ok(stream).build(); if (downloadFile) {
resp.header("Content-Type", "application/octet-stream");
resp.header("Content-Disposition", "attachment; filename="
+ logFile.getName());
}
return resp.build();
} catch (IOException ex) { } catch (IOException ex) {
return Response.serverError().entity(ex.getMessage()).build(); return Response.serverError().entity(ex.getMessage()).build();
} }
} }
private boolean parseBooleanParam(String param) {
if (param != null) {
return ("true").equalsIgnoreCase(param);
}
return false;
}
} }

View File

@ -50,7 +50,6 @@
import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView; import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer.NMWebApp; import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer.NMWebApp;
@ -353,6 +352,15 @@ public void testContainerLogs() throws IOException {
String responseText = response.getEntity(String.class); String responseText = response.getEntity(String.class);
assertEquals(logMessage, responseText); assertEquals(logMessage, responseText);
// ask and download it
response = r.path("ws").path("v1").path("node").path("containerlogs")
.path(containerIdStr).path(filename).queryParam("download", "true")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertEquals(logMessage, responseText);
assertEquals(200, response.getStatus());
assertEquals("application/octet-stream", response.getType().toString());
// ask for file that doesn't exist // ask for file that doesn't exist
response = r.path("ws").path("v1").path("node") response = r.path("ws").path("v1").path("node")
.path("containerlogs").path(containerIdStr).path("uhhh") .path("containerlogs").path(containerIdStr).path("uhhh")