YARN-4920. ATS/NM should support a link to dowload/get the logs in text format. Contributed by Xuan Gong.
This commit is contained in:
parent
af942585a1
commit
e61d431275
|
@ -18,6 +18,11 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -28,13 +33,30 @@ import javax.ws.rs.Path;
|
|||
import javax.ws.rs.PathParam;
|
||||
import javax.ws.rs.Produces;
|
||||
import javax.ws.rs.QueryParam;
|
||||
import javax.ws.rs.WebApplicationException;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.StreamingOutput;
|
||||
import javax.ws.rs.core.Response.ResponseBuilder;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
||||
import org.apache.hadoop.yarn.server.webapp.WebServices;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo;
|
||||
|
@ -42,9 +64,10 @@ import org.apache.hadoop.yarn.server.webapp.dao.AppInfo;
|
|||
import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
||||
import org.apache.hadoop.yarn.util.Times;
|
||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
|
||||
|
@ -52,9 +75,17 @@ import com.google.inject.Singleton;
|
|||
@Path("/ws/v1/applicationhistory")
|
||||
public class AHSWebServices extends WebServices {
|
||||
|
||||
private static final String NM_DOWNLOAD_URI_STR =
|
||||
"/ws/v1/node/containerlogs";
|
||||
private static final Joiner JOINER = Joiner.on("");
|
||||
private static final Joiner DOT_JOINER = Joiner.on(". ");
|
||||
private final Configuration conf;
|
||||
|
||||
@Inject
|
||||
public AHSWebServices(ApplicationBaseProtocol appBaseProt) {
|
||||
public AHSWebServices(ApplicationBaseProtocol appBaseProt,
|
||||
Configuration conf) {
|
||||
super(appBaseProt);
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@GET
|
||||
|
@ -173,4 +204,239 @@ public class AHSWebServices extends WebServices {
|
|||
}
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/containerlogs/{containerid}/{filename}")
|
||||
@Produces({ MediaType.TEXT_PLAIN })
|
||||
@Public
|
||||
@Unstable
|
||||
public Response getLogs(@Context HttpServletRequest req,
|
||||
@Context HttpServletResponse res,
|
||||
@PathParam("containerid") String containerIdStr,
|
||||
@PathParam("filename") String filename,
|
||||
@QueryParam("download") String download) {
|
||||
init(res);
|
||||
ContainerId containerId;
|
||||
try {
|
||||
containerId = ContainerId.fromString(containerIdStr);
|
||||
} catch (IllegalArgumentException ex) {
|
||||
return createBadResponse(Status.NOT_FOUND,
|
||||
"Invalid ContainerId: " + containerIdStr);
|
||||
}
|
||||
|
||||
boolean downloadFile = parseBooleanParam(download);
|
||||
|
||||
ApplicationId appId = containerId.getApplicationAttemptId()
|
||||
.getApplicationId();
|
||||
AppInfo appInfo;
|
||||
try {
|
||||
appInfo = super.getApp(req, res, appId.toString());
|
||||
} catch (Exception ex) {
|
||||
// directly find logs from HDFS.
|
||||
return sendStreamOutputResponse(appId, null, null, containerIdStr,
|
||||
filename, downloadFile);
|
||||
}
|
||||
String appOwner = appInfo.getUser();
|
||||
|
||||
ContainerInfo containerInfo;
|
||||
try {
|
||||
containerInfo = super.getContainer(
|
||||
req, res, appId.toString(),
|
||||
containerId.getApplicationAttemptId().toString(),
|
||||
containerId.toString());
|
||||
} catch (Exception ex) {
|
||||
if (isFinishedState(appInfo.getAppState())) {
|
||||
// directly find logs from HDFS.
|
||||
return sendStreamOutputResponse(appId, appOwner, null, containerIdStr,
|
||||
filename, downloadFile);
|
||||
}
|
||||
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
|
||||
"Can not get ContainerInfo for the container: " + containerId);
|
||||
}
|
||||
String nodeId = containerInfo.getNodeId();
|
||||
if (isRunningState(appInfo.getAppState())) {
|
||||
String nodeHttpAddress = containerInfo.getNodeHttpAddress();
|
||||
String uri = "/" + containerId.toString() + "/" + filename;
|
||||
String resURI = JOINER.join(nodeHttpAddress, NM_DOWNLOAD_URI_STR, uri);
|
||||
String query = req.getQueryString();
|
||||
if (query != null && !query.isEmpty()) {
|
||||
resURI += "?" + query;
|
||||
}
|
||||
ResponseBuilder response = Response.status(
|
||||
HttpServletResponse.SC_TEMPORARY_REDIRECT);
|
||||
response.header("Location", resURI);
|
||||
return response.build();
|
||||
} else if (isFinishedState(appInfo.getAppState())) {
|
||||
return sendStreamOutputResponse(appId, appOwner, nodeId,
|
||||
containerIdStr, filename, downloadFile);
|
||||
} else {
|
||||
return createBadResponse(Status.NOT_FOUND,
|
||||
"The application is not at Running or Finished State.");
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: YARN-5029. RM would send the update event. We could get
|
||||
// the consistent YarnApplicationState.
|
||||
// Will remove YarnApplicationState.ACCEPTED.
|
||||
private boolean isRunningState(YarnApplicationState appState) {
|
||||
return appState == YarnApplicationState.ACCEPTED
|
||||
|| appState == YarnApplicationState.RUNNING;
|
||||
}
|
||||
|
||||
private boolean isFinishedState(YarnApplicationState appState) {
|
||||
return appState == YarnApplicationState.FINISHED
|
||||
|| appState == YarnApplicationState.FAILED
|
||||
|| appState == YarnApplicationState.KILLED;
|
||||
}
|
||||
|
||||
private Response createBadResponse(Status status, String errMessage) {
|
||||
Response response = Response.status(status)
|
||||
.entity(DOT_JOINER.join(status.toString(), errMessage)).build();
|
||||
return response;
|
||||
}
|
||||
|
||||
private boolean parseBooleanParam(String param) {
|
||||
return ("true").equalsIgnoreCase(param);
|
||||
}
|
||||
|
||||
private Response sendStreamOutputResponse(ApplicationId appId,
|
||||
String appOwner, String nodeId, String containerIdStr,
|
||||
String fileName, boolean downloadFile) {
|
||||
StreamingOutput stream = null;
|
||||
try {
|
||||
stream = getStreamingOutput(appId, appOwner, nodeId,
|
||||
containerIdStr, fileName);
|
||||
} catch (Exception ex) {
|
||||
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
|
||||
ex.getMessage());
|
||||
}
|
||||
if (stream == null) {
|
||||
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
|
||||
"Can not get log for container: " + containerIdStr);
|
||||
}
|
||||
ResponseBuilder response = Response.ok(stream);
|
||||
if (downloadFile) {
|
||||
response.header("Content-Type", "application/octet-stream");
|
||||
response.header("Content-Disposition", "attachment; filename="
|
||||
+ fileName);
|
||||
}
|
||||
return response.build();
|
||||
}
|
||||
|
||||
private StreamingOutput getStreamingOutput(ApplicationId appId,
|
||||
String appOwner, final String nodeId, final String containerIdStr,
|
||||
final String logFile) throws IOException{
|
||||
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
|
||||
org.apache.hadoop.fs.Path remoteRootLogDir = new org.apache.hadoop.fs.Path(
|
||||
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
|
||||
org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir =
|
||||
FileContext.getFileContext(conf).makeQualified(remoteRootLogDir);
|
||||
FileContext fc = FileContext.getFileContext(
|
||||
qualifiedRemoteRootLogDir.toUri(), conf);
|
||||
org.apache.hadoop.fs.Path remoteAppDir = null;
|
||||
if (appOwner == null) {
|
||||
org.apache.hadoop.fs.Path toMatch = LogAggregationUtils
|
||||
.getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix);
|
||||
FileStatus[] matching = fc.util().globStatus(toMatch);
|
||||
if (matching == null || matching.length != 1) {
|
||||
return null;
|
||||
}
|
||||
remoteAppDir = matching[0].getPath();
|
||||
} else {
|
||||
remoteAppDir = LogAggregationUtils
|
||||
.getRemoteAppLogDir(remoteRootLogDir, appId, appOwner, suffix);
|
||||
}
|
||||
final RemoteIterator<FileStatus> nodeFiles;
|
||||
nodeFiles = fc.listStatus(remoteAppDir);
|
||||
if (!nodeFiles.hasNext()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
StreamingOutput stream = new StreamingOutput() {
|
||||
|
||||
@Override
|
||||
public void write(OutputStream os) throws IOException,
|
||||
WebApplicationException {
|
||||
byte[] buf = new byte[65535];
|
||||
boolean findLogs = false;
|
||||
while (nodeFiles.hasNext()) {
|
||||
final FileStatus thisNodeFile = nodeFiles.next();
|
||||
String nodeName = thisNodeFile.getPath().getName();
|
||||
if ((nodeId == null || nodeName.contains(LogAggregationUtils
|
||||
.getNodeString(nodeId))) && !nodeName.endsWith(
|
||||
LogAggregationUtils.TMP_FILE_SUFFIX)) {
|
||||
AggregatedLogFormat.LogReader reader =
|
||||
new AggregatedLogFormat.LogReader(conf,
|
||||
thisNodeFile.getPath());
|
||||
DataInputStream valueStream;
|
||||
LogKey key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
while (valueStream != null && !key.toString()
|
||||
.equals(containerIdStr)) {
|
||||
// Next container
|
||||
key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
}
|
||||
if (valueStream == null) {
|
||||
continue;
|
||||
}
|
||||
while (true) {
|
||||
try {
|
||||
String fileType = valueStream.readUTF();
|
||||
String fileLengthStr = valueStream.readUTF();
|
||||
long fileLength = Long.parseLong(fileLengthStr);
|
||||
if (fileType.equalsIgnoreCase(logFile)) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("LogType:");
|
||||
sb.append(fileType + "\n");
|
||||
sb.append("Log Upload Time:");
|
||||
sb.append(Times.format(System.currentTimeMillis()) + "\n");
|
||||
sb.append("LogLength:");
|
||||
sb.append(fileLengthStr + "\n");
|
||||
sb.append("Log Contents:\n");
|
||||
byte[] b = sb.toString().getBytes(Charset.forName("UTF-8"));
|
||||
os.write(b, 0, b.length);
|
||||
|
||||
long curRead = 0;
|
||||
long pendingRead = fileLength - curRead;
|
||||
int toRead = pendingRead > buf.length ? buf.length
|
||||
: (int) pendingRead;
|
||||
int len = valueStream.read(buf, 0, toRead);
|
||||
while (len != -1 && curRead < fileLength) {
|
||||
os.write(buf, 0, len);
|
||||
curRead += len;
|
||||
|
||||
pendingRead = fileLength - curRead;
|
||||
toRead = pendingRead > buf.length ? buf.length
|
||||
: (int) pendingRead;
|
||||
len = valueStream.read(buf, 0, toRead);
|
||||
}
|
||||
sb = new StringBuilder();
|
||||
sb.append("\nEnd of LogType:" + fileType + "\n");
|
||||
b = sb.toString().getBytes(Charset.forName("UTF-8"));
|
||||
os.write(b, 0, b.length);
|
||||
findLogs = true;
|
||||
} else {
|
||||
long totalSkipped = 0;
|
||||
long currSkipped = 0;
|
||||
while (currSkipped != -1 && totalSkipped < fileLength) {
|
||||
currSkipped = valueStream.skip(fileLength - totalSkipped);
|
||||
totalSkipped += currSkipped;
|
||||
}
|
||||
}
|
||||
} catch (EOFException eof) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
os.flush();
|
||||
if (!findLogs) {
|
||||
throw new IOException("Can not find logs for container:"
|
||||
+ containerIdStr);
|
||||
}
|
||||
}
|
||||
};
|
||||
return stream;
|
||||
}
|
||||
}
|
|
@ -80,9 +80,11 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
|||
store = createStore(SCALE);
|
||||
TimelineEntities entities = new TimelineEntities();
|
||||
entities.addEntity(createApplicationTimelineEntity(
|
||||
ApplicationId.newInstance(0, SCALE + 1), true, true, false, false));
|
||||
ApplicationId.newInstance(0, SCALE + 1), true, true, false, false,
|
||||
YarnApplicationState.FINISHED));
|
||||
entities.addEntity(createApplicationTimelineEntity(
|
||||
ApplicationId.newInstance(0, SCALE + 2), true, false, true, false));
|
||||
ApplicationId.newInstance(0, SCALE + 2), true, false, true, false,
|
||||
YarnApplicationState.FINISHED));
|
||||
store.put(entities);
|
||||
}
|
||||
|
||||
|
@ -140,10 +142,10 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
|||
ApplicationId appId = ApplicationId.newInstance(0, i);
|
||||
if (i == 2) {
|
||||
entities.addEntity(createApplicationTimelineEntity(
|
||||
appId, true, false, false, true));
|
||||
appId, true, false, false, true, YarnApplicationState.FINISHED));
|
||||
} else {
|
||||
entities.addEntity(createApplicationTimelineEntity(
|
||||
appId, false, false, false, false));
|
||||
appId, false, false, false, false, YarnApplicationState.FINISHED));
|
||||
}
|
||||
store.put(entities);
|
||||
for (int j = 1; j <= scale; ++j) {
|
||||
|
@ -160,6 +162,16 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
|||
}
|
||||
}
|
||||
}
|
||||
TimelineEntities entities = new TimelineEntities();
|
||||
ApplicationId appId = ApplicationId.newInstance(1234, 1);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
|
||||
entities.addEntity(createApplicationTimelineEntity(
|
||||
appId, true, false, false, false, YarnApplicationState.RUNNING));
|
||||
entities.addEntity(createAppAttemptTimelineEntity(appAttemptId));
|
||||
entities.addEntity(createContainerEntity(containerId));
|
||||
store.put(entities);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -355,7 +367,7 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
|||
historyManager.getApplications(Long.MAX_VALUE, 0L, Long.MAX_VALUE)
|
||||
.values();
|
||||
Assert.assertNotNull(apps);
|
||||
Assert.assertEquals(SCALE + 1, apps.size());
|
||||
Assert.assertEquals(SCALE + 2, apps.size());
|
||||
ApplicationId ignoredAppId = ApplicationId.newInstance(0, SCALE + 2);
|
||||
for (ApplicationReport app : apps) {
|
||||
Assert.assertNotEquals(ignoredAppId, app.getApplicationId());
|
||||
|
@ -467,7 +479,8 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
|||
|
||||
private static TimelineEntity createApplicationTimelineEntity(
|
||||
ApplicationId appId, boolean emptyACLs, boolean noAttemptId,
|
||||
boolean wrongAppId, boolean enableUpdateEvent) {
|
||||
boolean wrongAppId, boolean enableUpdateEvent,
|
||||
YarnApplicationState state) {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
|
||||
if (wrongAppId) {
|
||||
|
@ -517,7 +530,7 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
|||
eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
|
||||
FinalApplicationStatus.UNDEFINED.toString());
|
||||
eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
|
||||
YarnApplicationState.FINISHED.toString());
|
||||
state.toString());
|
||||
if (!noAttemptId) {
|
||||
eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
|
||||
ApplicationAttemptId.newInstance(appId, 1));
|
||||
|
@ -610,6 +623,8 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
|||
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, 100);
|
||||
entityInfo
|
||||
.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, -1);
|
||||
entityInfo.put(ContainerMetricsConstants
|
||||
.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, "http://test:1234");
|
||||
entity.setOtherInfo(entityInfo);
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);
|
||||
|
|
|
@ -19,17 +19,30 @@
|
|||
package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.Writer;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import javax.servlet.FilterConfig;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
|
||||
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
|
||||
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
|
||||
|
@ -42,6 +55,8 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryClientService;
|
||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManagerOnTimelineStore;
|
||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.TestApplicationHistoryManagerOnTimelineStore;
|
||||
|
@ -81,12 +96,17 @@ import com.sun.jersey.test.framework.WebAppDescriptor;
|
|||
public class TestAHSWebServices extends JerseyTestBase {
|
||||
|
||||
private static ApplicationHistoryClientService historyClientService;
|
||||
private static AHSWebServices ahsWebservice;
|
||||
private static final String[] USERS = new String[] { "foo" , "bar" };
|
||||
private static final int MAX_APPS = 5;
|
||||
private static Configuration conf;
|
||||
private static FileSystem fs;
|
||||
private static final String remoteLogRootDir = "target/logs/";
|
||||
private static final String rootLogDir = "target/LocalLogs";
|
||||
|
||||
@BeforeClass
|
||||
public static void setupClass() throws Exception {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf = new YarnConfiguration();
|
||||
TimelineStore store =
|
||||
TestApplicationHistoryManagerOnTimelineStore.createStore(MAX_APPS);
|
||||
TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
|
||||
|
@ -95,6 +115,8 @@ public class TestAHSWebServices extends JerseyTestBase {
|
|||
new TimelineDataManager(store, aclsManager);
|
||||
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "foo");
|
||||
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
|
||||
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir);
|
||||
dataManager.init(conf);
|
||||
ApplicationACLsManager appAclsManager = new ApplicationACLsManager(conf);
|
||||
ApplicationHistoryManagerOnTimelineStore historyManager =
|
||||
|
@ -108,6 +130,8 @@ public class TestAHSWebServices extends JerseyTestBase {
|
|||
};
|
||||
historyClientService.init(conf);
|
||||
historyClientService.start();
|
||||
ahsWebservice = new AHSWebServices(historyClientService, conf);
|
||||
fs = FileSystem.get(conf);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -115,6 +139,8 @@ public class TestAHSWebServices extends JerseyTestBase {
|
|||
if (historyClientService != null) {
|
||||
historyClientService.stop();
|
||||
}
|
||||
fs.delete(new Path(remoteLogRootDir), true);
|
||||
fs.delete(new Path(rootLogDir), true);
|
||||
}
|
||||
|
||||
@Parameterized.Parameters
|
||||
|
@ -127,7 +153,7 @@ public class TestAHSWebServices extends JerseyTestBase {
|
|||
@Override
|
||||
protected void configureServlets() {
|
||||
bind(JAXBContextResolver.class);
|
||||
bind(AHSWebServices.class);
|
||||
bind(AHSWebServices.class).toInstance(ahsWebservice);;
|
||||
bind(GenericExceptionHandler.class);
|
||||
bind(ApplicationBaseProtocol.class).toInstance(historyClientService);
|
||||
serve("/*").with(GuiceContainer.class);
|
||||
|
@ -471,4 +497,177 @@ public class TestAHSWebServices extends JerseyTestBase {
|
|||
assertEquals(ContainerState.COMPLETE.toString(),
|
||||
container.getString("containerState"));
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
public void testContainerLogsForFinishedApps() throws Exception {
|
||||
String fileName = "syslog";
|
||||
String user = "user1";
|
||||
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user1");
|
||||
NodeId nodeId = NodeId.newInstance("test host", 100);
|
||||
NodeId nodeId2 = NodeId.newInstance("host2", 1234);
|
||||
//prepare the logs for remote directory
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||
// create local logs
|
||||
List<String> rootLogDirList = new ArrayList<String>();
|
||||
rootLogDirList.add(rootLogDir);
|
||||
Path rootLogDirPath = new Path(rootLogDir);
|
||||
if (fs.exists(rootLogDirPath)) {
|
||||
fs.delete(rootLogDirPath, true);
|
||||
}
|
||||
assertTrue(fs.mkdirs(rootLogDirPath));
|
||||
|
||||
Path appLogsDir = new Path(rootLogDirPath, appId.toString());
|
||||
if (fs.exists(appLogsDir)) {
|
||||
fs.delete(appLogsDir, true);
|
||||
}
|
||||
assertTrue(fs.mkdirs(appLogsDir));
|
||||
|
||||
// create container logs in local log file dir
|
||||
// create two container log files. We can get containerInfo
|
||||
// for container1 from AHS, but can not get such info for
|
||||
// container100
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
|
||||
ContainerId containerId100 = ContainerId.newContainerId(appAttemptId, 100);
|
||||
createContainerLogInLocalDir(appLogsDir, containerId1, fs, fileName,
|
||||
("Hello." + containerId1));
|
||||
createContainerLogInLocalDir(appLogsDir, containerId100, fs, fileName,
|
||||
("Hello." + containerId100));
|
||||
|
||||
// upload container logs to remote log dir
|
||||
Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR) +
|
||||
user + "/logs/" + appId.toString());
|
||||
if (fs.exists(path)) {
|
||||
fs.delete(path, true);
|
||||
}
|
||||
assertTrue(fs.mkdirs(path));
|
||||
uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId,
|
||||
containerId1, path, fs);
|
||||
uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId2,
|
||||
containerId100, path, fs);
|
||||
|
||||
// test whether we can find container log from remote diretory if
|
||||
// the containerInfo for this container could be fetched from AHS.
|
||||
WebResource r = resource();
|
||||
ClientResponse response = r.path("ws").path("v1")
|
||||
.path("applicationhistory").path("containerlogs")
|
||||
.path(containerId1.toString()).path(fileName)
|
||||
.queryParam("user.name", user)
|
||||
.accept(MediaType.TEXT_PLAIN)
|
||||
.get(ClientResponse.class);
|
||||
String responseText = response.getEntity(String.class);
|
||||
assertTrue(responseText.contains("Hello." + containerId1));
|
||||
|
||||
// test whether we can find container log from remote diretory if
|
||||
// the containerInfo for this container could not be fetched from AHS.
|
||||
r = resource();
|
||||
response = r.path("ws").path("v1")
|
||||
.path("applicationhistory").path("containerlogs")
|
||||
.path(containerId100.toString()).path(fileName)
|
||||
.queryParam("user.name", user)
|
||||
.accept(MediaType.TEXT_PLAIN)
|
||||
.get(ClientResponse.class);
|
||||
responseText = response.getEntity(String.class);
|
||||
assertTrue(responseText.contains("Hello." + containerId100));
|
||||
|
||||
// create an application which can not be found from AHS
|
||||
ApplicationId appId100 = ApplicationId.newInstance(0, 100);
|
||||
appLogsDir = new Path(rootLogDirPath, appId100.toString());
|
||||
if (fs.exists(appLogsDir)) {
|
||||
fs.delete(appLogsDir, true);
|
||||
}
|
||||
assertTrue(fs.mkdirs(appLogsDir));
|
||||
ApplicationAttemptId appAttemptId100 =
|
||||
ApplicationAttemptId.newInstance(appId100, 1);
|
||||
ContainerId containerId1ForApp100 = ContainerId
|
||||
.newContainerId(appAttemptId100, 1);
|
||||
createContainerLogInLocalDir(appLogsDir, containerId1ForApp100, fs,
|
||||
fileName, ("Hello." + containerId1ForApp100));
|
||||
path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR) +
|
||||
user + "/logs/" + appId100.toString());
|
||||
if (fs.exists(path)) {
|
||||
fs.delete(path, true);
|
||||
}
|
||||
assertTrue(fs.mkdirs(path));
|
||||
uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId2,
|
||||
containerId1ForApp100, path, fs);
|
||||
r = resource();
|
||||
response = r.path("ws").path("v1")
|
||||
.path("applicationhistory").path("containerlogs")
|
||||
.path(containerId1ForApp100.toString()).path(fileName)
|
||||
.queryParam("user.name", user)
|
||||
.accept(MediaType.TEXT_PLAIN)
|
||||
.get(ClientResponse.class);
|
||||
responseText = response.getEntity(String.class);
|
||||
assertTrue(responseText.contains("Hello." + containerId1ForApp100));
|
||||
}
|
||||
|
||||
private static void createContainerLogInLocalDir(Path appLogsDir,
|
||||
ContainerId containerId, FileSystem fs, String fileName, String content)
|
||||
throws Exception {
|
||||
Path containerLogsDir = new Path(appLogsDir, containerId.toString());
|
||||
if (fs.exists(containerLogsDir)) {
|
||||
fs.delete(containerLogsDir, true);
|
||||
}
|
||||
assertTrue(fs.mkdirs(containerLogsDir));
|
||||
Writer writer =
|
||||
new FileWriter(new File(containerLogsDir.toString(), fileName));
|
||||
writer.write(content);
|
||||
writer.close();
|
||||
}
|
||||
|
||||
private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
|
||||
Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
|
||||
ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
|
||||
Path path =
|
||||
new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
|
||||
AggregatedLogFormat.LogWriter writer =
|
||||
new AggregatedLogFormat.LogWriter(configuration, path, ugi);
|
||||
writer.writeApplicationOwner(ugi.getUserName());
|
||||
|
||||
writer.append(new AggregatedLogFormat.LogKey(containerId),
|
||||
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
|
||||
ugi.getShortUserName()));
|
||||
writer.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
public void testContainerLogsForRunningApps() throws Exception {
|
||||
String fileName = "syslog";
|
||||
String user = "user1";
|
||||
ApplicationId appId = ApplicationId.newInstance(
|
||||
1234, 1);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
|
||||
WebResource r = resource();
|
||||
URI requestURI = r.path("ws").path("v1")
|
||||
.path("applicationhistory").path("containerlogs")
|
||||
.path(containerId1.toString()).path(fileName)
|
||||
.queryParam("user.name", user).getURI();
|
||||
String redirectURL = getRedirectURL(requestURI.toString());
|
||||
assertTrue(redirectURL != null);
|
||||
assertTrue(redirectURL.contains("test:1234"));
|
||||
assertTrue(redirectURL.contains("ws/v1/node/containerlogs"));
|
||||
assertTrue(redirectURL.contains(containerId1.toString()));
|
||||
assertTrue(redirectURL.contains("user.name=" + user));
|
||||
}
|
||||
|
||||
private static String getRedirectURL(String url) {
|
||||
String redirectUrl = null;
|
||||
try {
|
||||
HttpURLConnection conn = (HttpURLConnection) new URL(url)
|
||||
.openConnection();
|
||||
// do not automatically follow the redirection
|
||||
// otherwise we get too many redirections exception
|
||||
conn.setInstanceFollowRedirects(false);
|
||||
if(conn.getResponseCode() == HttpServletResponse.SC_TEMPORARY_REDIRECT) {
|
||||
redirectUrl = conn.getHeaderField("Location");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// throw new RuntimeException(e);
|
||||
}
|
||||
return redirectUrl;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,6 +48,7 @@ public class ContainerInfo {
|
|||
protected int containerExitStatus;
|
||||
protected ContainerState containerState;
|
||||
protected String nodeHttpAddress;
|
||||
protected String nodeId;
|
||||
|
||||
public ContainerInfo() {
|
||||
// JAXB needs this
|
||||
|
@ -71,6 +72,7 @@ public class ContainerInfo {
|
|||
containerExitStatus = container.getContainerExitStatus();
|
||||
containerState = container.getContainerState();
|
||||
nodeHttpAddress = container.getNodeHttpAddress();
|
||||
nodeId = container.getAssignedNode().toString();
|
||||
}
|
||||
|
||||
public String getContainerId() {
|
||||
|
@ -124,4 +126,8 @@ public class ContainerInfo {
|
|||
public String getNodeHttpAddress() {
|
||||
return nodeHttpAddress;
|
||||
}
|
||||
|
||||
public String getNodeId() {
|
||||
return nodeId;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import javax.ws.rs.QueryParam;
|
|||
import javax.ws.rs.WebApplicationException;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.Response.ResponseBuilder;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
import javax.ws.rs.core.StreamingOutput;
|
||||
import javax.ws.rs.core.UriInfo;
|
||||
|
@ -215,7 +216,8 @@ public class NMWebServices {
|
|||
@Public
|
||||
@Unstable
|
||||
public Response getLogs(@PathParam("containerid") String containerIdStr,
|
||||
@PathParam("filename") String filename) {
|
||||
@PathParam("filename") String filename,
|
||||
@QueryParam("download") String download) {
|
||||
ContainerId containerId;
|
||||
try {
|
||||
containerId = ConverterUtils.toContainerId(containerIdStr);
|
||||
|
@ -232,7 +234,7 @@ public class NMWebServices {
|
|||
} catch (YarnException ex) {
|
||||
return Response.serverError().entity(ex.getMessage()).build();
|
||||
}
|
||||
|
||||
boolean downloadFile = parseBooleanParam(download);
|
||||
try {
|
||||
final FileInputStream fis = ContainerLogsUtils.openLogFileForRead(
|
||||
containerIdStr, logFile, nmContext);
|
||||
|
@ -250,10 +252,22 @@ public class NMWebServices {
|
|||
os.flush();
|
||||
}
|
||||
};
|
||||
|
||||
return Response.ok(stream).build();
|
||||
ResponseBuilder resp = Response.ok(stream);
|
||||
if (downloadFile) {
|
||||
resp.header("Content-Type", "application/octet-stream");
|
||||
resp.header("Content-Disposition", "attachment; filename="
|
||||
+ logFile.getName());
|
||||
}
|
||||
return resp.build();
|
||||
} catch (IOException ex) {
|
||||
return Response.serverError().entity(ex.getMessage()).build();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean parseBooleanParam(String param) {
|
||||
if (param != null) {
|
||||
return ("true").equalsIgnoreCase(param);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,7 +50,6 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer.NMWebApp;
|
||||
|
@ -352,7 +351,16 @@ public class TestNMWebServices extends JerseyTestBase {
|
|||
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
|
||||
String responseText = response.getEntity(String.class);
|
||||
assertEquals(logMessage, responseText);
|
||||
|
||||
|
||||
// ask and download it
|
||||
response = r.path("ws").path("v1").path("node").path("containerlogs")
|
||||
.path(containerIdStr).path(filename).queryParam("download", "true")
|
||||
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
|
||||
responseText = response.getEntity(String.class);
|
||||
assertEquals(logMessage, responseText);
|
||||
assertEquals(200, response.getStatus());
|
||||
assertEquals("application/octet-stream", response.getType().toString());
|
||||
|
||||
// ask for file that doesn't exist
|
||||
response = r.path("ws").path("v1").path("node")
|
||||
.path("containerlogs").path(containerIdStr).path("uhhh")
|
||||
|
|
Loading…
Reference in New Issue