From 22ac37615a933f9cee8cf19ad0182586a037b690 Mon Sep 17 00:00:00 2001 From: Junping Du Date: Wed, 4 May 2016 09:40:13 -0700 Subject: [PATCH] YARN-4920. ATS/NM should support a link to dowload/get the logs in text format. Contributed by Xuan Gong. (cherry picked from commit 3cf223166d452a0f58f92676837a9edb8ddc1139) (cherry picked from commit c79dc07dc193904f2586a5d64ea2f4e56d2396b8) --- .../webapp/AHSWebServices.java | 270 +++++++++++++++++- ...licationHistoryManagerOnTimelineStore.java | 29 +- .../webapp/TestAHSWebServices.java | 203 ++++++++++++- .../yarn/server/webapp/dao/ContainerInfo.java | 6 + .../nodemanager/webapp/NMWebServices.java | 22 +- .../nodemanager/webapp/TestNMWebServices.java | 12 +- 6 files changed, 525 insertions(+), 17 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java index e7a22bd1681..75dce070f56 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java @@ -18,6 +18,11 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp; +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; import java.util.Collections; import java.util.Set; @@ -28,13 +33,30 @@ import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; +import javax.ws.rs.core.Response.ResponseBuilder; +import javax.ws.rs.core.Response.Status; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.ApplicationBaseProtocol; import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.server.webapp.WebServices; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo; @@ -42,9 +64,10 @@ import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; +import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.webapp.BadRequestException; - +import com.google.common.base.Joiner; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -52,9 +75,17 @@ @Path("/ws/v1/applicationhistory") public class AHSWebServices extends WebServices { + private static final String NM_DOWNLOAD_URI_STR = + "/ws/v1/node/containerlogs"; + private static final Joiner JOINER = Joiner.on(""); + private static final Joiner DOT_JOINER = Joiner.on(". "); + private final Configuration conf; + @Inject - public AHSWebServices(ApplicationBaseProtocol appBaseProt) { + public AHSWebServices(ApplicationBaseProtocol appBaseProt, + Configuration conf) { super(appBaseProt); + this.conf = conf; } @GET @@ -173,4 +204,239 @@ public ContainerInfo getContainer(@Context HttpServletRequest req, } } + @GET + @Path("/containerlogs/{containerid}/{filename}") + @Produces({ MediaType.TEXT_PLAIN }) + @Public + @Unstable + public Response getLogs(@Context HttpServletRequest req, + @Context HttpServletResponse res, + @PathParam("containerid") String containerIdStr, + @PathParam("filename") String filename, + @QueryParam("download") String download) { + init(res); + ContainerId containerId; + try { + containerId = ContainerId.fromString(containerIdStr); + } catch (IllegalArgumentException ex) { + return createBadResponse(Status.NOT_FOUND, + "Invalid ContainerId: " + containerIdStr); + } + + boolean downloadFile = parseBooleanParam(download); + + ApplicationId appId = containerId.getApplicationAttemptId() + .getApplicationId(); + AppInfo appInfo; + try { + appInfo = super.getApp(req, res, appId.toString()); + } catch (Exception ex) { + // directly find logs from HDFS. + return sendStreamOutputResponse(appId, null, null, containerIdStr, + filename, downloadFile); + } + String appOwner = appInfo.getUser(); + + ContainerInfo containerInfo; + try { + containerInfo = super.getContainer( + req, res, appId.toString(), + containerId.getApplicationAttemptId().toString(), + containerId.toString()); + } catch (Exception ex) { + if (isFinishedState(appInfo.getAppState())) { + // directly find logs from HDFS. + return sendStreamOutputResponse(appId, appOwner, null, containerIdStr, + filename, downloadFile); + } + return createBadResponse(Status.INTERNAL_SERVER_ERROR, + "Can not get ContainerInfo for the container: " + containerId); + } + String nodeId = containerInfo.getNodeId(); + if (isRunningState(appInfo.getAppState())) { + String nodeHttpAddress = containerInfo.getNodeHttpAddress(); + String uri = "/" + containerId.toString() + "/" + filename; + String resURI = JOINER.join(nodeHttpAddress, NM_DOWNLOAD_URI_STR, uri); + String query = req.getQueryString(); + if (query != null && !query.isEmpty()) { + resURI += "?" + query; + } + ResponseBuilder response = Response.status( + HttpServletResponse.SC_TEMPORARY_REDIRECT); + response.header("Location", resURI); + return response.build(); + } else if (isFinishedState(appInfo.getAppState())) { + return sendStreamOutputResponse(appId, appOwner, nodeId, + containerIdStr, filename, downloadFile); + } else { + return createBadResponse(Status.NOT_FOUND, + "The application is not at Running or Finished State."); + } + } + + // TODO: YARN-5029. RM would send the update event. We could get + // the consistent YarnApplicationState. + // Will remove YarnApplicationState.ACCEPTED. + private boolean isRunningState(YarnApplicationState appState) { + return appState == YarnApplicationState.ACCEPTED + || appState == YarnApplicationState.RUNNING; + } + + private boolean isFinishedState(YarnApplicationState appState) { + return appState == YarnApplicationState.FINISHED + || appState == YarnApplicationState.FAILED + || appState == YarnApplicationState.KILLED; + } + + private Response createBadResponse(Status status, String errMessage) { + Response response = Response.status(status) + .entity(DOT_JOINER.join(status.toString(), errMessage)).build(); + return response; + } + + private boolean parseBooleanParam(String param) { + return ("true").equalsIgnoreCase(param); + } + + private Response sendStreamOutputResponse(ApplicationId appId, + String appOwner, String nodeId, String containerIdStr, + String fileName, boolean downloadFile) { + StreamingOutput stream = null; + try { + stream = getStreamingOutput(appId, appOwner, nodeId, + containerIdStr, fileName); + } catch (Exception ex) { + return createBadResponse(Status.INTERNAL_SERVER_ERROR, + ex.getMessage()); + } + if (stream == null) { + return createBadResponse(Status.INTERNAL_SERVER_ERROR, + "Can not get log for container: " + containerIdStr); + } + ResponseBuilder response = Response.ok(stream); + if (downloadFile) { + response.header("Content-Type", "application/octet-stream"); + response.header("Content-Disposition", "attachment; filename=" + + fileName); + } + return response.build(); + } + + private StreamingOutput getStreamingOutput(ApplicationId appId, + String appOwner, final String nodeId, final String containerIdStr, + final String logFile) throws IOException{ + String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); + org.apache.hadoop.fs.Path remoteRootLogDir = new org.apache.hadoop.fs.Path( + conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); + org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir = + FileContext.getFileContext(conf).makeQualified(remoteRootLogDir); + FileContext fc = FileContext.getFileContext( + qualifiedRemoteRootLogDir.toUri(), conf); + org.apache.hadoop.fs.Path remoteAppDir = null; + if (appOwner == null) { + org.apache.hadoop.fs.Path toMatch = LogAggregationUtils + .getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix); + FileStatus[] matching = fc.util().globStatus(toMatch); + if (matching == null || matching.length != 1) { + return null; + } + remoteAppDir = matching[0].getPath(); + } else { + remoteAppDir = LogAggregationUtils + .getRemoteAppLogDir(remoteRootLogDir, appId, appOwner, suffix); + } + final RemoteIterator nodeFiles; + nodeFiles = fc.listStatus(remoteAppDir); + if (!nodeFiles.hasNext()) { + return null; + } + + StreamingOutput stream = new StreamingOutput() { + + @Override + public void write(OutputStream os) throws IOException, + WebApplicationException { + byte[] buf = new byte[65535]; + boolean findLogs = false; + while (nodeFiles.hasNext()) { + final FileStatus thisNodeFile = nodeFiles.next(); + String nodeName = thisNodeFile.getPath().getName(); + if ((nodeId == null || nodeName.contains(LogAggregationUtils + .getNodeString(nodeId))) && !nodeName.endsWith( + LogAggregationUtils.TMP_FILE_SUFFIX)) { + AggregatedLogFormat.LogReader reader = + new AggregatedLogFormat.LogReader(conf, + thisNodeFile.getPath()); + DataInputStream valueStream; + LogKey key = new LogKey(); + valueStream = reader.next(key); + while (valueStream != null && !key.toString() + .equals(containerIdStr)) { + // Next container + key = new LogKey(); + valueStream = reader.next(key); + } + if (valueStream == null) { + continue; + } + while (true) { + try { + String fileType = valueStream.readUTF(); + String fileLengthStr = valueStream.readUTF(); + long fileLength = Long.parseLong(fileLengthStr); + if (fileType.equalsIgnoreCase(logFile)) { + StringBuilder sb = new StringBuilder(); + sb.append("LogType:"); + sb.append(fileType + "\n"); + sb.append("Log Upload Time:"); + sb.append(Times.format(System.currentTimeMillis()) + "\n"); + sb.append("LogLength:"); + sb.append(fileLengthStr + "\n"); + sb.append("Log Contents:\n"); + byte[] b = sb.toString().getBytes(Charset.forName("UTF-8")); + os.write(b, 0, b.length); + + long curRead = 0; + long pendingRead = fileLength - curRead; + int toRead = pendingRead > buf.length ? buf.length + : (int) pendingRead; + int len = valueStream.read(buf, 0, toRead); + while (len != -1 && curRead < fileLength) { + os.write(buf, 0, len); + curRead += len; + + pendingRead = fileLength - curRead; + toRead = pendingRead > buf.length ? buf.length + : (int) pendingRead; + len = valueStream.read(buf, 0, toRead); + } + sb = new StringBuilder(); + sb.append("\nEnd of LogType:" + fileType + "\n"); + b = sb.toString().getBytes(Charset.forName("UTF-8")); + os.write(b, 0, b.length); + findLogs = true; + } else { + long totalSkipped = 0; + long currSkipped = 0; + while (currSkipped != -1 && totalSkipped < fileLength) { + currSkipped = valueStream.skip(fileLength - totalSkipped); + totalSkipped += currSkipped; + } + } + } catch (EOFException eof) { + break; + } + } + } + } + os.flush(); + if (!findLogs) { + throw new IOException("Can not find logs for container:" + + containerIdStr); + } + } + }; + return stream; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java index dfc5b81b579..3c975842486 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java @@ -80,9 +80,11 @@ public static void prepareStore() throws Exception { store = createStore(SCALE); TimelineEntities entities = new TimelineEntities(); entities.addEntity(createApplicationTimelineEntity( - ApplicationId.newInstance(0, SCALE + 1), true, true, false, false)); + ApplicationId.newInstance(0, SCALE + 1), true, true, false, false, + YarnApplicationState.FINISHED)); entities.addEntity(createApplicationTimelineEntity( - ApplicationId.newInstance(0, SCALE + 2), true, false, true, false)); + ApplicationId.newInstance(0, SCALE + 2), true, false, true, false, + YarnApplicationState.FINISHED)); store.put(entities); } @@ -140,10 +142,10 @@ private static void prepareTimelineStore(TimelineStore store, int scale) ApplicationId appId = ApplicationId.newInstance(0, i); if (i == 2) { entities.addEntity(createApplicationTimelineEntity( - appId, true, false, false, true)); + appId, true, false, false, true, YarnApplicationState.FINISHED)); } else { entities.addEntity(createApplicationTimelineEntity( - appId, false, false, false, false)); + appId, false, false, false, false, YarnApplicationState.FINISHED)); } store.put(entities); for (int j = 1; j <= scale; ++j) { @@ -160,6 +162,16 @@ private static void prepareTimelineStore(TimelineStore store, int scale) } } } + TimelineEntities entities = new TimelineEntities(); + ApplicationId appId = ApplicationId.newInstance(1234, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1); + entities.addEntity(createApplicationTimelineEntity( + appId, true, false, false, false, YarnApplicationState.RUNNING)); + entities.addEntity(createAppAttemptTimelineEntity(appAttemptId)); + entities.addEntity(createContainerEntity(containerId)); + store.put(entities); } @Test @@ -355,7 +367,7 @@ public void testGetApplications() throws Exception { historyManager.getApplications(Long.MAX_VALUE, 0L, Long.MAX_VALUE) .values(); Assert.assertNotNull(apps); - Assert.assertEquals(SCALE + 1, apps.size()); + Assert.assertEquals(SCALE + 2, apps.size()); ApplicationId ignoredAppId = ApplicationId.newInstance(0, SCALE + 2); for (ApplicationReport app : apps) { Assert.assertNotEquals(ignoredAppId, app.getApplicationId()); @@ -467,7 +479,8 @@ public ContainerReport run() throws Exception { private static TimelineEntity createApplicationTimelineEntity( ApplicationId appId, boolean emptyACLs, boolean noAttemptId, - boolean wrongAppId, boolean enableUpdateEvent) { + boolean wrongAppId, boolean enableUpdateEvent, + YarnApplicationState state) { TimelineEntity entity = new TimelineEntity(); entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE); if (wrongAppId) { @@ -517,7 +530,7 @@ private static TimelineEntity createApplicationTimelineEntity( eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, FinalApplicationStatus.UNDEFINED.toString()); eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, - YarnApplicationState.FINISHED.toString()); + state.toString()); if (!noAttemptId) { eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO, ApplicationAttemptId.newInstance(appId, 1)); @@ -610,6 +623,8 @@ private static TimelineEntity createContainerEntity(ContainerId containerId) { entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, 100); entityInfo .put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, -1); + entityInfo.put(ContainerMetricsConstants + .ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, "http://test:1234"); entity.setOtherInfo(entityInfo); TimelineEvent tEvent = new TimelineEvent(); tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java index 20dfe454cae..f985fe4cb84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java @@ -19,17 +19,30 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.File; +import java.io.FileWriter; +import java.io.Writer; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URL; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.Properties; import javax.servlet.FilterConfig; import javax.servlet.ServletException; +import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.MediaType; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authentication.server.AuthenticationFilter; import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler; import org.apache.hadoop.yarn.api.ApplicationBaseProtocol; @@ -42,6 +55,8 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryClientService; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManagerOnTimelineStore; import org.apache.hadoop.yarn.server.applicationhistoryservice.TestApplicationHistoryManagerOnTimelineStore; @@ -81,12 +96,17 @@ public class TestAHSWebServices extends JerseyTestBase { private static ApplicationHistoryClientService historyClientService; + private static AHSWebServices ahsWebservice; private static final String[] USERS = new String[] { "foo" , "bar" }; private static final int MAX_APPS = 5; + private static Configuration conf; + private static FileSystem fs; + private static final String remoteLogRootDir = "target/logs/"; + private static final String rootLogDir = "target/LocalLogs"; @BeforeClass public static void setupClass() throws Exception { - Configuration conf = new YarnConfiguration(); + conf = new YarnConfiguration(); TimelineStore store = TestApplicationHistoryManagerOnTimelineStore.createStore(MAX_APPS); TimelineACLsManager aclsManager = new TimelineACLsManager(conf); @@ -95,6 +115,8 @@ public static void setupClass() throws Exception { new TimelineDataManager(store, aclsManager); conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); conf.set(YarnConfiguration.YARN_ADMIN_ACL, "foo"); + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir); dataManager.init(conf); ApplicationACLsManager appAclsManager = new ApplicationACLsManager(conf); ApplicationHistoryManagerOnTimelineStore historyManager = @@ -108,6 +130,8 @@ protected void serviceStart() throws Exception { }; historyClientService.init(conf); historyClientService.start(); + ahsWebservice = new AHSWebServices(historyClientService, conf); + fs = FileSystem.get(conf); } @AfterClass @@ -115,6 +139,8 @@ public static void tearDownClass() throws Exception { if (historyClientService != null) { historyClientService.stop(); } + fs.delete(new Path(remoteLogRootDir), true); + fs.delete(new Path(rootLogDir), true); } @Parameterized.Parameters @@ -127,7 +153,7 @@ public static Collection rounds() { @Override protected void configureServlets() { bind(JAXBContextResolver.class); - bind(AHSWebServices.class); + bind(AHSWebServices.class).toInstance(ahsWebservice);; bind(GenericExceptionHandler.class); bind(ApplicationBaseProtocol.class).toInstance(historyClientService); serve("/*").with(GuiceContainer.class); @@ -471,4 +497,177 @@ public void testSingleContainer() throws Exception { assertEquals(ContainerState.COMPLETE.toString(), container.getString("containerState")); } + + @Test(timeout = 10000) + public void testContainerLogsForFinishedApps() throws Exception { + String fileName = "syslog"; + String user = "user1"; + UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user1"); + NodeId nodeId = NodeId.newInstance("test host", 100); + NodeId nodeId2 = NodeId.newInstance("host2", 1234); + //prepare the logs for remote directory + ApplicationId appId = ApplicationId.newInstance(0, 1); + // create local logs + List rootLogDirList = new ArrayList(); + rootLogDirList.add(rootLogDir); + Path rootLogDirPath = new Path(rootLogDir); + if (fs.exists(rootLogDirPath)) { + fs.delete(rootLogDirPath, true); + } + assertTrue(fs.mkdirs(rootLogDirPath)); + + Path appLogsDir = new Path(rootLogDirPath, appId.toString()); + if (fs.exists(appLogsDir)) { + fs.delete(appLogsDir, true); + } + assertTrue(fs.mkdirs(appLogsDir)); + + // create container logs in local log file dir + // create two container log files. We can get containerInfo + // for container1 from AHS, but can not get such info for + // container100 + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1); + ContainerId containerId100 = ContainerId.newContainerId(appAttemptId, 100); + createContainerLogInLocalDir(appLogsDir, containerId1, fs, fileName, + ("Hello." + containerId1)); + createContainerLogInLocalDir(appLogsDir, containerId100, fs, fileName, + ("Hello." + containerId100)); + + // upload container logs to remote log dir + Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR) + + user + "/logs/" + appId.toString()); + if (fs.exists(path)) { + fs.delete(path, true); + } + assertTrue(fs.mkdirs(path)); + uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId, + containerId1, path, fs); + uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId2, + containerId100, path, fs); + + // test whether we can find container log from remote diretory if + // the containerInfo for this container could be fetched from AHS. + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1") + .path("applicationhistory").path("containerlogs") + .path(containerId1.toString()).path(fileName) + .queryParam("user.name", user) + .accept(MediaType.TEXT_PLAIN) + .get(ClientResponse.class); + String responseText = response.getEntity(String.class); + assertTrue(responseText.contains("Hello." + containerId1)); + + // test whether we can find container log from remote diretory if + // the containerInfo for this container could not be fetched from AHS. + r = resource(); + response = r.path("ws").path("v1") + .path("applicationhistory").path("containerlogs") + .path(containerId100.toString()).path(fileName) + .queryParam("user.name", user) + .accept(MediaType.TEXT_PLAIN) + .get(ClientResponse.class); + responseText = response.getEntity(String.class); + assertTrue(responseText.contains("Hello." + containerId100)); + + // create an application which can not be found from AHS + ApplicationId appId100 = ApplicationId.newInstance(0, 100); + appLogsDir = new Path(rootLogDirPath, appId100.toString()); + if (fs.exists(appLogsDir)) { + fs.delete(appLogsDir, true); + } + assertTrue(fs.mkdirs(appLogsDir)); + ApplicationAttemptId appAttemptId100 = + ApplicationAttemptId.newInstance(appId100, 1); + ContainerId containerId1ForApp100 = ContainerId + .newContainerId(appAttemptId100, 1); + createContainerLogInLocalDir(appLogsDir, containerId1ForApp100, fs, + fileName, ("Hello." + containerId1ForApp100)); + path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR) + + user + "/logs/" + appId100.toString()); + if (fs.exists(path)) { + fs.delete(path, true); + } + assertTrue(fs.mkdirs(path)); + uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId2, + containerId1ForApp100, path, fs); + r = resource(); + response = r.path("ws").path("v1") + .path("applicationhistory").path("containerlogs") + .path(containerId1ForApp100.toString()).path(fileName) + .queryParam("user.name", user) + .accept(MediaType.TEXT_PLAIN) + .get(ClientResponse.class); + responseText = response.getEntity(String.class); + assertTrue(responseText.contains("Hello." + containerId1ForApp100)); + } + + private static void createContainerLogInLocalDir(Path appLogsDir, + ContainerId containerId, FileSystem fs, String fileName, String content) + throws Exception { + Path containerLogsDir = new Path(appLogsDir, containerId.toString()); + if (fs.exists(containerLogsDir)) { + fs.delete(containerLogsDir, true); + } + assertTrue(fs.mkdirs(containerLogsDir)); + Writer writer = + new FileWriter(new File(containerLogsDir.toString(), fileName)); + writer.write(content); + writer.close(); + } + + private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi, + Configuration configuration, List rootLogDirs, NodeId nodeId, + ContainerId containerId, Path appDir, FileSystem fs) throws Exception { + Path path = + new Path(appDir, LogAggregationUtils.getNodeString(nodeId)); + AggregatedLogFormat.LogWriter writer = + new AggregatedLogFormat.LogWriter(configuration, path, ugi); + writer.writeApplicationOwner(ugi.getUserName()); + + writer.append(new AggregatedLogFormat.LogKey(containerId), + new AggregatedLogFormat.LogValue(rootLogDirs, containerId, + ugi.getShortUserName())); + writer.close(); + } + + @Test(timeout = 10000) + public void testContainerLogsForRunningApps() throws Exception { + String fileName = "syslog"; + String user = "user1"; + ApplicationId appId = ApplicationId.newInstance( + 1234, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1); + WebResource r = resource(); + URI requestURI = r.path("ws").path("v1") + .path("applicationhistory").path("containerlogs") + .path(containerId1.toString()).path(fileName) + .queryParam("user.name", user).getURI(); + String redirectURL = getRedirectURL(requestURI.toString()); + assertTrue(redirectURL != null); + assertTrue(redirectURL.contains("test:1234")); + assertTrue(redirectURL.contains("ws/v1/node/containerlogs")); + assertTrue(redirectURL.contains(containerId1.toString())); + assertTrue(redirectURL.contains("user.name=" + user)); + } + + private static String getRedirectURL(String url) { + String redirectUrl = null; + try { + HttpURLConnection conn = (HttpURLConnection) new URL(url) + .openConnection(); + // do not automatically follow the redirection + // otherwise we get too many redirections exception + conn.setInstanceFollowRedirects(false); + if(conn.getResponseCode() == HttpServletResponse.SC_TEMPORARY_REDIRECT) { + redirectUrl = conn.getHeaderField("Location"); + } + } catch (Exception e) { + // throw new RuntimeException(e); + } + return redirectUrl; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerInfo.java index d0d4df68f3d..f127f9cda5c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerInfo.java @@ -48,6 +48,7 @@ public class ContainerInfo { protected int containerExitStatus; protected ContainerState containerState; protected String nodeHttpAddress; + protected String nodeId; public ContainerInfo() { // JAXB needs this @@ -71,6 +72,7 @@ public ContainerInfo(ContainerReport container) { containerExitStatus = container.getContainerExitStatus(); containerState = container.getContainerState(); nodeHttpAddress = container.getNodeHttpAddress(); + nodeId = container.getAssignedNode().toString(); } public String getContainerId() { @@ -124,4 +126,8 @@ public ContainerState getContainerState() { public String getNodeHttpAddress() { return nodeHttpAddress; } + + public String getNodeId() { + return nodeId; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java index fddeb042e5f..57e729c025f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java @@ -33,6 +33,7 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.UriInfo; @@ -215,7 +216,8 @@ public ContainerInfo getNodeContainer(@javax.ws.rs.core.Context @Public @Unstable public Response getLogs(@PathParam("containerid") String containerIdStr, - @PathParam("filename") String filename) { + @PathParam("filename") String filename, + @QueryParam("download") String download) { ContainerId containerId; try { containerId = ConverterUtils.toContainerId(containerIdStr); @@ -232,7 +234,7 @@ public Response getLogs(@PathParam("containerid") String containerIdStr, } catch (YarnException ex) { return Response.serverError().entity(ex.getMessage()).build(); } - + boolean downloadFile = parseBooleanParam(download); try { final FileInputStream fis = ContainerLogsUtils.openLogFileForRead( containerIdStr, logFile, nmContext); @@ -250,10 +252,22 @@ public void write(OutputStream os) throws IOException, os.flush(); } }; - - return Response.ok(stream).build(); + ResponseBuilder resp = Response.ok(stream); + if (downloadFile) { + resp.header("Content-Type", "application/octet-stream"); + resp.header("Content-Disposition", "attachment; filename=" + + logFile.getName()); + } + return resp.build(); } catch (IOException ex) { return Response.serverError().entity(ex.getMessage()).build(); } } + + private boolean parseBooleanParam(String param) { + if (param != null) { + return ("true").equalsIgnoreCase(param); + } + return false; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java index 1f5590ca46f..c10d4c88794 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java @@ -50,7 +50,6 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.ResourceView; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer.NMWebApp; @@ -352,7 +351,16 @@ public void testContainerLogs() throws IOException { .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); String responseText = response.getEntity(String.class); assertEquals(logMessage, responseText); - + + // ask and download it + response = r.path("ws").path("v1").path("node").path("containerlogs") + .path(containerIdStr).path(filename).queryParam("download", "true") + .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); + responseText = response.getEntity(String.class); + assertEquals(logMessage, responseText); + assertEquals(200, response.getStatus()); + assertEquals("application/octet-stream", response.getType().toString()); + // ask for file that doesn't exist response = r.path("ws").path("v1").path("node") .path("containerlogs").path(containerIdStr).path("uhhh")