Merge branch 'trunk' into HDFS-6584
This commit is contained in:
commit
aaa7e21757
|
@ -88,6 +88,9 @@ Release 2.6.0 - UNRELEASED
|
|||
and enforce/not-enforce strict control of per-container cpu usage. (Varun
|
||||
Vasudev via vinodkv)
|
||||
|
||||
YARN-1250. Generic history service should support application-acls. (Zhijie Shen
|
||||
via junping_du)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.Map;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
|||
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
|
||||
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
||||
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
|
||||
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
|
||||
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
|
||||
|
@ -62,12 +64,15 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
|||
ApplicationHistoryManager {
|
||||
|
||||
private TimelineDataManager timelineDataManager;
|
||||
private ApplicationACLsManager aclsManager;
|
||||
private String serverHttpAddress;
|
||||
|
||||
public ApplicationHistoryManagerOnTimelineStore(
|
||||
TimelineDataManager timelineDataManager) {
|
||||
TimelineDataManager timelineDataManager,
|
||||
ApplicationACLsManager aclsManager) {
|
||||
super(ApplicationHistoryManagerOnTimelineStore.class.getName());
|
||||
this.timelineDataManager = timelineDataManager;
|
||||
this.aclsManager = aclsManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -80,7 +85,7 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
|||
@Override
|
||||
public ApplicationReport getApplication(ApplicationId appId)
|
||||
throws YarnException, IOException {
|
||||
return getApplication(appId, ApplicationReportField.ALL);
|
||||
return getApplication(appId, ApplicationReportField.ALL).appReport;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -94,9 +99,9 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
|||
new HashMap<ApplicationId, ApplicationReport>();
|
||||
if (entities != null && entities.getEntities() != null) {
|
||||
for (TimelineEntity entity : entities.getEntities()) {
|
||||
ApplicationReport app =
|
||||
ApplicationReportExt app =
|
||||
generateApplicationReport(entity, ApplicationReportField.ALL);
|
||||
apps.put(app.getApplicationId(), app);
|
||||
apps.put(app.appReport.getApplicationId(), app.appReport);
|
||||
}
|
||||
}
|
||||
return apps;
|
||||
|
@ -106,6 +111,9 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
|||
public Map<ApplicationAttemptId, ApplicationAttemptReport>
|
||||
getApplicationAttempts(ApplicationId appId)
|
||||
throws YarnException, IOException {
|
||||
ApplicationReportExt app = getApplication(
|
||||
appId, ApplicationReportField.USER_AND_ACLS);
|
||||
checkAccess(app);
|
||||
TimelineEntities entities = timelineDataManager.getEntities(
|
||||
AppAttemptMetricsConstants.ENTITY_TYPE,
|
||||
new NameValuePair(
|
||||
|
@ -115,16 +123,10 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
|||
UserGroupInformation.getLoginUser());
|
||||
Map<ApplicationAttemptId, ApplicationAttemptReport> appAttempts =
|
||||
new HashMap<ApplicationAttemptId, ApplicationAttemptReport>();
|
||||
if (entities != null && entities.getEntities() != null) {
|
||||
for (TimelineEntity entity : entities.getEntities()) {
|
||||
ApplicationAttemptReport appAttempt =
|
||||
convertToApplicationAttemptReport(entity);
|
||||
appAttempts.put(appAttempt.getApplicationAttemptId(), appAttempt);
|
||||
}
|
||||
} else {
|
||||
// It is likely that the attemtps are not found due to non-existing
|
||||
// application. In this case, we need to throw ApplicationNotFoundException.
|
||||
getApplication(appId, ApplicationReportField.NONE);
|
||||
for (TimelineEntity entity : entities.getEntities()) {
|
||||
ApplicationAttemptReport appAttempt =
|
||||
convertToApplicationAttemptReport(entity);
|
||||
appAttempts.put(appAttempt.getApplicationAttemptId(), appAttempt);
|
||||
}
|
||||
return appAttempts;
|
||||
}
|
||||
|
@ -132,13 +134,14 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
|||
@Override
|
||||
public ApplicationAttemptReport getApplicationAttempt(
|
||||
ApplicationAttemptId appAttemptId) throws YarnException, IOException {
|
||||
ApplicationReportExt app = getApplication(
|
||||
appAttemptId.getApplicationId(), ApplicationReportField.USER_AND_ACLS);
|
||||
checkAccess(app);
|
||||
TimelineEntity entity = timelineDataManager.getEntity(
|
||||
AppAttemptMetricsConstants.ENTITY_TYPE,
|
||||
appAttemptId.toString(), EnumSet.allOf(Field.class),
|
||||
UserGroupInformation.getLoginUser());
|
||||
if (entity == null) {
|
||||
// Will throw ApplicationNotFoundException first
|
||||
getApplication(appAttemptId.getApplicationId(), ApplicationReportField.NONE);
|
||||
throw new ApplicationAttemptNotFoundException(
|
||||
"The entity for application attempt " + appAttemptId +
|
||||
" doesn't exist in the timeline store");
|
||||
|
@ -150,9 +153,10 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
|||
@Override
|
||||
public ContainerReport getContainer(ContainerId containerId)
|
||||
throws YarnException, IOException {
|
||||
ApplicationReport app = getApplication(
|
||||
ApplicationReportExt app = getApplication(
|
||||
containerId.getApplicationAttemptId().getApplicationId(),
|
||||
ApplicationReportField.USER);
|
||||
ApplicationReportField.USER_AND_ACLS);
|
||||
checkAccess(app);
|
||||
TimelineEntity entity = timelineDataManager.getEntity(
|
||||
ContainerMetricsConstants.ENTITY_TYPE,
|
||||
containerId.toString(), EnumSet.allOf(Field.class),
|
||||
|
@ -162,7 +166,8 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
|||
"The entity for container " + containerId +
|
||||
" doesn't exist in the timeline store");
|
||||
} else {
|
||||
return convertToContainerReport(entity, serverHttpAddress, app.getUser());
|
||||
return convertToContainerReport(
|
||||
entity, serverHttpAddress, app.appReport.getUser());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -176,8 +181,9 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
|||
@Override
|
||||
public Map<ContainerId, ContainerReport> getContainers(
|
||||
ApplicationAttemptId appAttemptId) throws YarnException, IOException {
|
||||
ApplicationReport app = getApplication(
|
||||
appAttemptId.getApplicationId(), ApplicationReportField.USER);
|
||||
ApplicationReportExt app = getApplication(
|
||||
appAttemptId.getApplicationId(), ApplicationReportField.USER_AND_ACLS);
|
||||
checkAccess(app);
|
||||
TimelineEntities entities = timelineDataManager.getEntities(
|
||||
ContainerMetricsConstants.ENTITY_TYPE,
|
||||
new NameValuePair(
|
||||
|
@ -189,15 +195,15 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
|||
new HashMap<ContainerId, ContainerReport>();
|
||||
if (entities != null && entities.getEntities() != null) {
|
||||
for (TimelineEntity entity : entities.getEntities()) {
|
||||
ContainerReport container =
|
||||
convertToContainerReport(entity, serverHttpAddress, app.getUser());
|
||||
ContainerReport container = convertToContainerReport(
|
||||
entity, serverHttpAddress, app.appReport.getUser());
|
||||
containers.put(container.getContainerId(), container);
|
||||
}
|
||||
}
|
||||
return containers;
|
||||
}
|
||||
|
||||
private static ApplicationReport convertToApplicationReport(
|
||||
private static ApplicationReportExt convertToApplicationReport(
|
||||
TimelineEntity entity, ApplicationReportField field) {
|
||||
String user = null;
|
||||
String queue = null;
|
||||
|
@ -209,13 +215,8 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
|||
String diagnosticsInfo = null;
|
||||
FinalApplicationStatus finalStatus = FinalApplicationStatus.UNDEFINED;
|
||||
YarnApplicationState state = null;
|
||||
if (field == ApplicationReportField.NONE) {
|
||||
return ApplicationReport.newInstance(
|
||||
ConverterUtils.toApplicationId(entity.getEntityId()),
|
||||
latestApplicationAttemptId, user, queue, name, null, -1, null, state,
|
||||
diagnosticsInfo, null, createdTime, finishedTime, finalStatus, null,
|
||||
null, 1.0F, type, null);
|
||||
}
|
||||
Map<ApplicationAccessType, String> appViewACLs =
|
||||
new HashMap<ApplicationAccessType, String>();
|
||||
Map<String, Object> entityInfo = entity.getOtherInfo();
|
||||
if (entityInfo != null) {
|
||||
if (entityInfo.containsKey(ApplicationMetricsConstants.USER_ENTITY_INFO)) {
|
||||
|
@ -223,12 +224,17 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
|||
entityInfo.get(ApplicationMetricsConstants.USER_ENTITY_INFO)
|
||||
.toString();
|
||||
}
|
||||
if (field == ApplicationReportField.USER) {
|
||||
return ApplicationReport.newInstance(
|
||||
if (entityInfo.containsKey(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO)) {
|
||||
String appViewACLsStr = entityInfo.get(
|
||||
ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO).toString();
|
||||
appViewACLs.put(ApplicationAccessType.VIEW_APP, appViewACLsStr);
|
||||
}
|
||||
if (field == ApplicationReportField.USER_AND_ACLS) {
|
||||
return new ApplicationReportExt(ApplicationReport.newInstance(
|
||||
ConverterUtils.toApplicationId(entity.getEntityId()),
|
||||
latestApplicationAttemptId, user, queue, name, null, -1, null, state,
|
||||
diagnosticsInfo, null, createdTime, finishedTime, finalStatus, null,
|
||||
null, 1.0F, type, null);
|
||||
null, 1.0F, type, null), appViewACLs);
|
||||
}
|
||||
if (entityInfo.containsKey(ApplicationMetricsConstants.QUEUE_ENTITY_INFO)) {
|
||||
queue =
|
||||
|
@ -292,11 +298,11 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
|||
}
|
||||
}
|
||||
}
|
||||
return ApplicationReport.newInstance(
|
||||
return new ApplicationReportExt(ApplicationReport.newInstance(
|
||||
ConverterUtils.toApplicationId(entity.getEntityId()),
|
||||
latestApplicationAttemptId, user, queue, name, null, -1, null, state,
|
||||
diagnosticsInfo, null, createdTime, finishedTime, finalStatus, null,
|
||||
null, 1.0F, type, null);
|
||||
null, 1.0F, type, null), appViewACLs);
|
||||
}
|
||||
|
||||
private static ApplicationAttemptReport convertToApplicationAttemptReport(
|
||||
|
@ -471,24 +477,39 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
|||
createdTime, finishedTime, diagnosticsInfo, logUrl, exitStatus, state);
|
||||
}
|
||||
|
||||
private ApplicationReport generateApplicationReport(TimelineEntity entity,
|
||||
private ApplicationReportExt generateApplicationReport(TimelineEntity entity,
|
||||
ApplicationReportField field) throws YarnException, IOException {
|
||||
ApplicationReport app = convertToApplicationReport(entity, field);
|
||||
if (field == ApplicationReportField.ALL &&
|
||||
app != null && app.getCurrentApplicationAttemptId() != null) {
|
||||
ApplicationAttemptReport appAttempt =
|
||||
getApplicationAttempt(app.getCurrentApplicationAttemptId());
|
||||
if (appAttempt != null) {
|
||||
app.setHost(appAttempt.getHost());
|
||||
app.setRpcPort(appAttempt.getRpcPort());
|
||||
app.setTrackingUrl(appAttempt.getTrackingUrl());
|
||||
app.setOriginalTrackingUrl(appAttempt.getOriginalTrackingUrl());
|
||||
ApplicationReportExt app = convertToApplicationReport(entity, field);
|
||||
// If only user and acls are pulled to check attempt(s)/container(s) access
|
||||
// control, we can return immediately
|
||||
if (field == ApplicationReportField.USER_AND_ACLS) {
|
||||
return app;
|
||||
}
|
||||
try {
|
||||
checkAccess(app);
|
||||
if (app.appReport.getCurrentApplicationAttemptId() != null) {
|
||||
ApplicationAttemptReport appAttempt =
|
||||
getApplicationAttempt(app.appReport.getCurrentApplicationAttemptId());
|
||||
if (appAttempt != null) {
|
||||
app.appReport.setHost(appAttempt.getHost());
|
||||
app.appReport.setRpcPort(appAttempt.getRpcPort());
|
||||
app.appReport.setTrackingUrl(appAttempt.getTrackingUrl());
|
||||
app.appReport.setOriginalTrackingUrl(appAttempt.getOriginalTrackingUrl());
|
||||
}
|
||||
}
|
||||
} catch (YarnException e) {
|
||||
// YarnExcetpion is thrown because the user doesn't have access
|
||||
app.appReport.setDiagnostics(null);
|
||||
app.appReport.setCurrentApplicationAttemptId(null);
|
||||
}
|
||||
if (app.appReport.getCurrentApplicationAttemptId() == null) {
|
||||
app.appReport.setCurrentApplicationAttemptId(
|
||||
ApplicationAttemptId.newInstance(app.appReport.getApplicationId(), -1));
|
||||
}
|
||||
return app;
|
||||
}
|
||||
|
||||
private ApplicationReport getApplication(ApplicationId appId,
|
||||
private ApplicationReportExt getApplication(ApplicationId appId,
|
||||
ApplicationReportField field) throws YarnException, IOException {
|
||||
TimelineEntity entity = timelineDataManager.getEntity(
|
||||
ApplicationMetricsConstants.ENTITY_TYPE,
|
||||
|
@ -502,10 +523,40 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
|
|||
}
|
||||
}
|
||||
|
||||
private void checkAccess(ApplicationReportExt app)
|
||||
throws YarnException, IOException {
|
||||
if (app.appViewACLs != null) {
|
||||
aclsManager.addApplication(
|
||||
app.appReport.getApplicationId(), app.appViewACLs);
|
||||
try {
|
||||
if (!aclsManager.checkAccess(UserGroupInformation.getCurrentUser(),
|
||||
ApplicationAccessType.VIEW_APP, app.appReport.getUser(),
|
||||
app.appReport.getApplicationId())) {
|
||||
throw new YarnException("User "
|
||||
+ UserGroupInformation.getCurrentUser().getShortUserName()
|
||||
+ " does not have privilage to see this application "
|
||||
+ app.appReport.getApplicationId());
|
||||
}
|
||||
} finally {
|
||||
aclsManager.removeApplication(app.appReport.getApplicationId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static enum ApplicationReportField {
|
||||
ALL, // retrieve all the fields
|
||||
NONE, // retrieve no fields
|
||||
USER // retrieve user info only
|
||||
USER_AND_ACLS // retrieve user and ACLs info only
|
||||
}
|
||||
|
||||
private static class ApplicationReportExt {
|
||||
private ApplicationReport appReport;
|
||||
private Map<ApplicationAccessType, String> appViewACLs;
|
||||
|
||||
public ApplicationReportExt(
|
||||
ApplicationReport appReport,
|
||||
Map<ApplicationAccessType, String> appViewACLs) {
|
||||
this.appReport = appReport;
|
||||
this.appViewACLs = appViewACLs;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
|
||||
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
|
||||
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
||||
|
@ -64,6 +65,7 @@ public class ApplicationHistoryServer extends CompositeService {
|
|||
.getLog(ApplicationHistoryServer.class);
|
||||
|
||||
private ApplicationHistoryClientService ahsClientService;
|
||||
private ApplicationACLsManager aclsManager;
|
||||
private ApplicationHistoryManager historyManager;
|
||||
private TimelineStore timelineStore;
|
||||
private TimelineDelegationTokenSecretManagerService secretManagerService;
|
||||
|
@ -84,6 +86,7 @@ public class ApplicationHistoryServer extends CompositeService {
|
|||
timelineDataManager = createTimelineDataManager(conf);
|
||||
|
||||
// init generic history service afterwards
|
||||
aclsManager = createApplicationACLsManager(conf);
|
||||
historyManager = createApplicationHistoryManager(conf);
|
||||
ahsClientService = createApplicationHistoryClientService(historyManager);
|
||||
addService(ahsClientService);
|
||||
|
@ -168,6 +171,11 @@ public class ApplicationHistoryServer extends CompositeService {
|
|||
return new ApplicationHistoryClientService(historyManager);
|
||||
}
|
||||
|
||||
private ApplicationACLsManager createApplicationACLsManager(
|
||||
Configuration conf) {
|
||||
return new ApplicationACLsManager(conf);
|
||||
}
|
||||
|
||||
private ApplicationHistoryManager createApplicationHistoryManager(
|
||||
Configuration conf) {
|
||||
// Backward compatibility:
|
||||
|
@ -175,7 +183,8 @@ public class ApplicationHistoryServer extends CompositeService {
|
|||
// user has enabled it explicitly.
|
||||
if (conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE) == null ||
|
||||
conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE).length() == 0) {
|
||||
return new ApplicationHistoryManagerOnTimelineStore(timelineDataManager);
|
||||
return new ApplicationHistoryManagerOnTimelineStore(
|
||||
timelineDataManager, aclsManager);
|
||||
} else {
|
||||
LOG.warn("The filesystem based application history store is deprecated.");
|
||||
return new ApplicationHistoryManagerImpl();
|
||||
|
|
|
@ -18,10 +18,16 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.applicationhistoryservice;
|
||||
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -42,40 +48,75 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
|
||||
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
||||
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
|
||||
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
|
||||
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestApplicationHistoryManagerOnTimelineStore {
|
||||
|
||||
private static ApplicationHistoryManagerOnTimelineStore historyManager;
|
||||
private static final int SCALE = 5;
|
||||
private static TimelineStore store;
|
||||
|
||||
private ApplicationHistoryManagerOnTimelineStore historyManager;
|
||||
private UserGroupInformation callerUGI;
|
||||
private Configuration conf;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
TimelineStore store = new MemoryTimelineStore();
|
||||
public static void prepareStore() throws Exception {
|
||||
store = new MemoryTimelineStore();
|
||||
prepareTimelineStore(store);
|
||||
TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
// Only test the ACLs of the generic history
|
||||
TimelineACLsManager aclsManager = new TimelineACLsManager(new YarnConfiguration());
|
||||
TimelineDataManager dataManager =
|
||||
new TimelineDataManager(store, aclsManager);
|
||||
historyManager = new ApplicationHistoryManagerOnTimelineStore(dataManager);
|
||||
ApplicationACLsManager appAclsManager = new ApplicationACLsManager(conf);
|
||||
historyManager =
|
||||
new ApplicationHistoryManagerOnTimelineStore(dataManager, appAclsManager);
|
||||
historyManager.init(conf);
|
||||
historyManager.start();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
@After
|
||||
public void tearDown() {
|
||||
if (historyManager != null) {
|
||||
historyManager.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> callers() {
|
||||
// user1 is the owner
|
||||
// user2 is the authorized user
|
||||
// user3 is the unauthorized user
|
||||
// admin is the admin acl
|
||||
return Arrays.asList(
|
||||
new Object[][] { { "" }, { "user1" }, { "user2" }, { "user3" }, { "admin" } });
|
||||
}
|
||||
|
||||
public TestApplicationHistoryManagerOnTimelineStore(String caller) {
|
||||
conf = new YarnConfiguration();
|
||||
if (!caller.equals("")) {
|
||||
callerUGI = UserGroupInformation.createRemoteUser(caller, AuthMethod.SIMPLE);
|
||||
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
|
||||
}
|
||||
}
|
||||
|
||||
private static void prepareTimelineStore(TimelineStore store)
|
||||
throws Exception {
|
||||
for (int i = 1; i <= SCALE; ++i) {
|
||||
|
@ -101,23 +142,46 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
|||
|
||||
@Test
|
||||
public void testGetApplicationReport() throws Exception {
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||
ApplicationReport app = historyManager.getApplication(appId);
|
||||
final ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||
ApplicationReport app;
|
||||
if (callerUGI == null) {
|
||||
app = historyManager.getApplication(appId);
|
||||
} else {
|
||||
app =
|
||||
callerUGI.doAs(new PrivilegedExceptionAction<ApplicationReport> () {
|
||||
@Override
|
||||
public ApplicationReport run() throws Exception {
|
||||
return historyManager.getApplication(appId);
|
||||
}
|
||||
});
|
||||
}
|
||||
Assert.assertNotNull(app);
|
||||
Assert.assertEquals(appId, app.getApplicationId());
|
||||
Assert.assertEquals("test app", app.getName());
|
||||
Assert.assertEquals("test app type", app.getApplicationType());
|
||||
Assert.assertEquals("test user", app.getUser());
|
||||
Assert.assertEquals("user1", app.getUser());
|
||||
Assert.assertEquals("test queue", app.getQueue());
|
||||
Assert.assertEquals(Integer.MAX_VALUE + 2L, app.getStartTime());
|
||||
Assert.assertEquals(Integer.MAX_VALUE + 3L, app.getFinishTime());
|
||||
Assert.assertTrue(Math.abs(app.getProgress() - 1.0F) < 0.0001);
|
||||
Assert.assertEquals("test host", app.getHost());
|
||||
Assert.assertEquals(-100, app.getRpcPort());
|
||||
Assert.assertEquals("test tracking url", app.getTrackingUrl());
|
||||
Assert.assertEquals("test original tracking url",
|
||||
app.getOriginalTrackingUrl());
|
||||
Assert.assertEquals("test diagnostics info", app.getDiagnostics());
|
||||
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
|
||||
Assert.assertEquals(ApplicationAttemptId.newInstance(appId, -1),
|
||||
app.getCurrentApplicationAttemptId());
|
||||
Assert.assertEquals(null, app.getHost());
|
||||
Assert.assertEquals(-1, app.getRpcPort());
|
||||
Assert.assertEquals(null, app.getTrackingUrl());
|
||||
Assert.assertEquals(null, app.getOriginalTrackingUrl());
|
||||
Assert.assertEquals(null, app.getDiagnostics());
|
||||
} else {
|
||||
Assert.assertEquals(ApplicationAttemptId.newInstance(appId, 1),
|
||||
app.getCurrentApplicationAttemptId());
|
||||
Assert.assertEquals("test host", app.getHost());
|
||||
Assert.assertEquals(-100, app.getRpcPort());
|
||||
Assert.assertEquals("test tracking url", app.getTrackingUrl());
|
||||
Assert.assertEquals("test original tracking url",
|
||||
app.getOriginalTrackingUrl());
|
||||
Assert.assertEquals("test diagnostics info", app.getDiagnostics());
|
||||
}
|
||||
Assert.assertEquals(FinalApplicationStatus.UNDEFINED,
|
||||
app.getFinalApplicationStatus());
|
||||
Assert.assertEquals(YarnApplicationState.FINISHED,
|
||||
|
@ -126,10 +190,35 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
|||
|
||||
@Test
|
||||
public void testGetApplicationAttemptReport() throws Exception {
|
||||
ApplicationAttemptId appAttemptId =
|
||||
final ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
|
||||
ApplicationAttemptReport appAttempt =
|
||||
historyManager.getApplicationAttempt(appAttemptId);
|
||||
ApplicationAttemptReport appAttempt;
|
||||
if (callerUGI == null) {
|
||||
appAttempt = historyManager.getApplicationAttempt(appAttemptId);
|
||||
} else {
|
||||
try {
|
||||
appAttempt =
|
||||
callerUGI.doAs(new PrivilegedExceptionAction<ApplicationAttemptReport> () {
|
||||
@Override
|
||||
public ApplicationAttemptReport run() throws Exception {
|
||||
return historyManager.getApplicationAttempt(appAttemptId);
|
||||
}
|
||||
});
|
||||
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
|
||||
// The exception is expected
|
||||
Assert.fail();
|
||||
}
|
||||
} catch (UndeclaredThrowableException e) {
|
||||
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
|
||||
if (e.getCause().getMessage().contains(
|
||||
"does not have privilage to see this application")) {
|
||||
// The exception is expected
|
||||
return;
|
||||
}
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
Assert.assertNotNull(appAttempt);
|
||||
Assert.assertEquals(appAttemptId, appAttempt.getApplicationAttemptId());
|
||||
Assert.assertEquals(ContainerId.newInstance(appAttemptId, 1),
|
||||
|
@ -146,10 +235,36 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
|||
|
||||
@Test
|
||||
public void testGetContainerReport() throws Exception {
|
||||
ContainerId containerId =
|
||||
final ContainerId containerId =
|
||||
ContainerId.newInstance(ApplicationAttemptId.newInstance(
|
||||
ApplicationId.newInstance(0, 1), 1), 1);
|
||||
ContainerReport container = historyManager.getContainer(containerId);
|
||||
ContainerReport container;
|
||||
if (callerUGI == null) {
|
||||
container = historyManager.getContainer(containerId);
|
||||
} else {
|
||||
try {
|
||||
container =
|
||||
callerUGI.doAs(new PrivilegedExceptionAction<ContainerReport> () {
|
||||
@Override
|
||||
public ContainerReport run() throws Exception {
|
||||
return historyManager.getContainer(containerId);
|
||||
}
|
||||
});
|
||||
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
|
||||
// The exception is expected
|
||||
Assert.fail();
|
||||
}
|
||||
} catch (UndeclaredThrowableException e) {
|
||||
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
|
||||
if (e.getCause().getMessage().contains(
|
||||
"does not have privilage to see this application")) {
|
||||
// The exception is expected
|
||||
return;
|
||||
}
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
Assert.assertNotNull(container);
|
||||
Assert.assertEquals(Integer.MAX_VALUE + 1L, container.getCreationTime());
|
||||
Assert.assertEquals(Integer.MAX_VALUE + 2L, container.getFinishTime());
|
||||
|
@ -164,7 +279,7 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
|||
Assert.assertEquals(-1, container.getContainerExitStatus());
|
||||
Assert.assertEquals("http://0.0.0.0:8188/applicationhistory/logs/" +
|
||||
"test host:-100/container_0_0001_01_000001/"
|
||||
+ "container_0_0001_01_000001/test user", container.getLogUrl());
|
||||
+ "container_0_0001_01_000001/user1", container.getLogUrl());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -177,29 +292,104 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
|||
|
||||
@Test
|
||||
public void testGetApplicationAttempts() throws Exception {
|
||||
Collection<ApplicationAttemptReport> appAttempts =
|
||||
historyManager.getApplicationAttempts(ApplicationId.newInstance(0, 1))
|
||||
.values();
|
||||
final ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||
Collection<ApplicationAttemptReport> appAttempts;
|
||||
if (callerUGI == null) {
|
||||
appAttempts = historyManager.getApplicationAttempts(appId).values();
|
||||
} else {
|
||||
try {
|
||||
appAttempts = callerUGI.doAs(
|
||||
new PrivilegedExceptionAction<Collection<ApplicationAttemptReport>> () {
|
||||
@Override
|
||||
public Collection<ApplicationAttemptReport> run() throws Exception {
|
||||
return historyManager.getApplicationAttempts(appId).values();
|
||||
}
|
||||
});
|
||||
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
|
||||
// The exception is expected
|
||||
Assert.fail();
|
||||
}
|
||||
} catch (UndeclaredThrowableException e) {
|
||||
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
|
||||
if (e.getCause().getMessage().contains(
|
||||
"does not have privilage to see this application")) {
|
||||
// The exception is expected
|
||||
return;
|
||||
}
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
Assert.assertNotNull(appAttempts);
|
||||
Assert.assertEquals(SCALE, appAttempts.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetContainers() throws Exception {
|
||||
Collection<ContainerReport> containers =
|
||||
historyManager
|
||||
.getContainers(
|
||||
ApplicationAttemptId.newInstance(
|
||||
ApplicationId.newInstance(0, 1), 1)).values();
|
||||
final ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
|
||||
Collection<ContainerReport> containers;
|
||||
if (callerUGI == null) {
|
||||
containers = historyManager.getContainers(appAttemptId).values();
|
||||
} else {
|
||||
try {
|
||||
containers = callerUGI.doAs(
|
||||
new PrivilegedExceptionAction<Collection<ContainerReport>> () {
|
||||
@Override
|
||||
public Collection<ContainerReport> run() throws Exception {
|
||||
return historyManager.getContainers(appAttemptId).values();
|
||||
}
|
||||
});
|
||||
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
|
||||
// The exception is expected
|
||||
Assert.fail();
|
||||
}
|
||||
} catch (UndeclaredThrowableException e) {
|
||||
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
|
||||
if (e.getCause().getMessage().contains(
|
||||
"does not have privilage to see this application")) {
|
||||
// The exception is expected
|
||||
return;
|
||||
}
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
Assert.assertNotNull(containers);
|
||||
Assert.assertEquals(SCALE, containers.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAMContainer() throws Exception {
|
||||
ApplicationAttemptId appAttemptId =
|
||||
final ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
|
||||
ContainerReport container = historyManager.getAMContainer(appAttemptId);
|
||||
ContainerReport container;
|
||||
if (callerUGI == null) {
|
||||
container = historyManager.getAMContainer(appAttemptId);
|
||||
} else {
|
||||
try {
|
||||
container =
|
||||
callerUGI.doAs(new PrivilegedExceptionAction<ContainerReport> () {
|
||||
@Override
|
||||
public ContainerReport run() throws Exception {
|
||||
return historyManager.getAMContainer(appAttemptId);
|
||||
}
|
||||
});
|
||||
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
|
||||
// The exception is expected
|
||||
Assert.fail();
|
||||
}
|
||||
} catch (UndeclaredThrowableException e) {
|
||||
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
|
||||
if (e.getCause().getMessage().contains(
|
||||
"does not have privilage to see this application")) {
|
||||
// The exception is expected
|
||||
return;
|
||||
}
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
Assert.assertNotNull(container);
|
||||
Assert.assertEquals(appAttemptId, container.getContainerId()
|
||||
.getApplicationAttemptId());
|
||||
|
@ -210,14 +400,18 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
|||
TimelineEntity entity = new TimelineEntity();
|
||||
entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
|
||||
entity.setEntityId(appId.toString());
|
||||
entity.addPrimaryFilter(
|
||||
TimelineStore.SystemFilter.ENTITY_OWNER.toString(), "yarn");
|
||||
Map<String, Object> entityInfo = new HashMap<String, Object>();
|
||||
entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, "test app");
|
||||
entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
|
||||
"test app type");
|
||||
entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, "test user");
|
||||
entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, "user1");
|
||||
entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, "test queue");
|
||||
entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
|
||||
Integer.MAX_VALUE + 1L);
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
|
||||
"user2");
|
||||
entity.setOtherInfo(entityInfo);
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
tEvent.setEventType(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
|
||||
|
@ -248,6 +442,8 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
|||
entity.setEntityId(appAttemptId.toString());
|
||||
entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER,
|
||||
appAttemptId.getApplicationId().toString());
|
||||
entity.addPrimaryFilter(
|
||||
TimelineStore.SystemFilter.ENTITY_OWNER.toString(), "yarn");
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
tEvent.setEventType(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
|
||||
tEvent.setTimestamp(Integer.MAX_VALUE + 1L);
|
||||
|
@ -287,6 +483,8 @@ public class TestApplicationHistoryManagerOnTimelineStore {
|
|||
entity.setEntityId(containerId.toString());
|
||||
entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER,
|
||||
containerId.getApplicationAttemptId().toString());
|
||||
entity.addPrimaryFilter(
|
||||
TimelineStore.SystemFilter.ENTITY_OWNER.toString(), "yarn");
|
||||
Map<String, Object> entityInfo = new HashMap<String, Object>();
|
||||
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, -1);
|
||||
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, -1);
|
||||
|
|
|
@ -34,6 +34,9 @@ public class ApplicationMetricsConstants {
|
|||
public static final String FINISHED_EVENT_TYPE =
|
||||
"YARN_APPLICATION_FINISHED";
|
||||
|
||||
public static final String ACLS_UPDATED_EVENT_TYPE =
|
||||
"YARN_APPLICATION_ACLS_UPDATED";
|
||||
|
||||
public static final String NAME_ENTITY_INFO =
|
||||
"YARN_APPLICATION_NAME";
|
||||
|
||||
|
@ -49,6 +52,9 @@ public class ApplicationMetricsConstants {
|
|||
public static final String SUBMITTED_TIME_ENTITY_INFO =
|
||||
"YARN_APPLICATION_SUBMITTED_TIME";
|
||||
|
||||
public static final String APP_VIEW_ACLS_ENTITY_INFO =
|
||||
"YARN_APPLICATION_VIEW_ACLS";
|
||||
|
||||
public static final String DIAGNOSTICS_INFO_EVENT_INFO =
|
||||
"YARN_APPLICATION_DIAGNOSTICS_INFO";
|
||||
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.io.DataInputByteBuffer;
|
|||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
|
@ -365,6 +366,10 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
|||
// Inform the ACLs Manager
|
||||
this.applicationACLsManager.addApplication(applicationId,
|
||||
submissionContext.getAMContainerSpec().getApplicationACLs());
|
||||
String appViewACLs = submissionContext.getAMContainerSpec()
|
||||
.getApplicationACLs().get(ApplicationAccessType.VIEW_APP);
|
||||
rmContext.getSystemMetricsPublisher().appACLsUpdated(
|
||||
application, appViewACLs, System.currentTimeMillis());
|
||||
return application;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
||||
|
||||
public class ApplicationACLsUpdatedEvent extends SystemMetricsEvent {
|
||||
|
||||
private ApplicationId appId;
|
||||
private String viewAppACLs;
|
||||
|
||||
public ApplicationACLsUpdatedEvent(ApplicationId appId,
|
||||
String viewAppACLs,
|
||||
long updatedTime) {
|
||||
super(SystemMetricsEventType.APP_ACLS_UPDATED, updatedTime);
|
||||
this.appId = appId;
|
||||
this.viewAppACLs = viewAppACLs;
|
||||
}
|
||||
|
||||
public ApplicationId getApplicationId() {
|
||||
return appId;
|
||||
}
|
||||
|
||||
public String getViewAppACLs() {
|
||||
return viewAppACLs;
|
||||
}
|
||||
|
||||
}
|
|
@ -23,6 +23,7 @@ public enum SystemMetricsEventType {
|
|||
// app events
|
||||
APP_CREATED,
|
||||
APP_FINISHED,
|
||||
APP_ACLS_UPDATED,
|
||||
|
||||
// app attempt events
|
||||
APP_ATTEMPT_REGISTERED,
|
||||
|
|
|
@ -54,6 +54,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||
|
||||
/**
|
||||
* The class that helps RM publish metrics to the timeline server. RM will
|
||||
* always invoke the methods of this class regardless the service is enabled or
|
||||
* not. If it is disabled, publishing requests will be ignored silently.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public class SystemMetricsPublisher extends CompositeService {
|
||||
|
@ -125,6 +130,18 @@ public class SystemMetricsPublisher extends CompositeService {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void appACLsUpdated(RMApp app, String appViewACLs,
|
||||
long updatedTime) {
|
||||
if (publishSystemMetrics) {
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ApplicationACLsUpdatedEvent(
|
||||
app.getApplicationId(),
|
||||
appViewACLs,
|
||||
updatedTime));
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void appAttemptRegistered(RMAppAttempt appAttempt,
|
||||
long registeredTime) {
|
||||
|
@ -202,6 +219,9 @@ public class SystemMetricsPublisher extends CompositeService {
|
|||
case APP_FINISHED:
|
||||
publishApplicationFinishedEvent((ApplicationFinishedEvent) event);
|
||||
break;
|
||||
case APP_ACLS_UPDATED:
|
||||
publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event);
|
||||
break;
|
||||
case APP_ATTEMPT_REGISTERED:
|
||||
publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event);
|
||||
break;
|
||||
|
@ -265,6 +285,22 @@ public class SystemMetricsPublisher extends CompositeService {
|
|||
putEntity(entity);
|
||||
}
|
||||
|
||||
private void publishApplicationACLsUpdatedEvent(
|
||||
ApplicationACLsUpdatedEvent event) {
|
||||
TimelineEntity entity =
|
||||
createApplicationEntity(event.getApplicationId());
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
Map<String, Object> entityInfo = new HashMap<String, Object>();
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
|
||||
event.getViewAppACLs());
|
||||
entity.setOtherInfo(entityInfo);
|
||||
tEvent.setEventType(
|
||||
ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE);
|
||||
tEvent.setTimestamp(event.getTimestamp());
|
||||
entity.addEvent(tEvent);
|
||||
putEntity(entity);
|
||||
}
|
||||
|
||||
private static TimelineEntity createApplicationEntity(
|
||||
ApplicationId applicationId) {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
|
|
|
@ -24,6 +24,8 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||
import static org.mockito.Matchers.isA;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
@ -97,7 +99,7 @@ public class TestAppManager{
|
|||
return list;
|
||||
}
|
||||
|
||||
public static RMContext mockRMContext(int n, long time) {
|
||||
public RMContext mockRMContext(int n, long time) {
|
||||
final List<RMApp> apps = newRMApps(n, time, RMAppState.FINISHED);
|
||||
final ConcurrentMap<ApplicationId, RMApp> map = Maps.newConcurrentMap();
|
||||
for (RMApp app : apps) {
|
||||
|
@ -120,8 +122,8 @@ public class TestAppManager{
|
|||
}
|
||||
};
|
||||
((RMContextImpl)context).setStateStore(mock(RMStateStore.class));
|
||||
((RMContextImpl)context).setSystemMetricsPublisher(
|
||||
mock(SystemMetricsPublisher.class));
|
||||
metricsPublisher = mock(SystemMetricsPublisher.class);
|
||||
((RMContextImpl)context).setSystemMetricsPublisher(metricsPublisher);
|
||||
return context;
|
||||
}
|
||||
|
||||
|
@ -200,6 +202,7 @@ public class TestAppManager{
|
|||
}
|
||||
|
||||
private RMContext rmContext;
|
||||
private SystemMetricsPublisher metricsPublisher;
|
||||
private TestRMAppManager appMonitor;
|
||||
private ApplicationSubmissionContext asContext;
|
||||
private ApplicationId appId;
|
||||
|
@ -460,6 +463,8 @@ public class TestAppManager{
|
|||
Assert.assertNotNull("app is null", app);
|
||||
Assert.assertEquals("app id doesn't match", appId, app.getApplicationId());
|
||||
Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
|
||||
verify(metricsPublisher).appACLsUpdated(
|
||||
any(RMApp.class), any(String.class), anyLong());
|
||||
|
||||
// wait for event to be processed
|
||||
int timeoutSecs = 0;
|
||||
|
|
|
@ -99,14 +99,15 @@ public class TestSystemMetricsPublisher {
|
|||
RMApp app = createRMApp(appId);
|
||||
metricsPublisher.appCreated(app, app.getStartTime());
|
||||
metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime());
|
||||
metricsPublisher.appACLsUpdated(app, "uers1,user2", 4L);
|
||||
TimelineEntity entity = null;
|
||||
do {
|
||||
entity =
|
||||
store.getEntity(appId.toString(),
|
||||
ApplicationMetricsConstants.ENTITY_TYPE,
|
||||
EnumSet.allOf(Field.class));
|
||||
// ensure two events are both published before leaving the loop
|
||||
} while (entity == null || entity.getEvents().size() < 2);
|
||||
// ensure three events are both published before leaving the loop
|
||||
} while (entity == null || entity.getEvents().size() < 3);
|
||||
// verify all the fields
|
||||
Assert.assertEquals(ApplicationMetricsConstants.ENTITY_TYPE,
|
||||
entity.getEntityType());
|
||||
|
@ -133,8 +134,12 @@ public class TestSystemMetricsPublisher {
|
|||
Assert.assertEquals(app.getSubmitTime(),
|
||||
entity.getOtherInfo().get(
|
||||
ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO));
|
||||
Assert.assertEquals("uers1,user2",
|
||||
entity.getOtherInfo().get(
|
||||
ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO));
|
||||
boolean hasCreatedEvent = false;
|
||||
boolean hasFinishedEvent = false;
|
||||
boolean hasACLsUpdatedEvent = false;
|
||||
for (TimelineEvent event : entity.getEvents()) {
|
||||
if (event.getEventType().equals(
|
||||
ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
|
||||
|
@ -154,9 +159,13 @@ public class TestSystemMetricsPublisher {
|
|||
ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO));
|
||||
Assert.assertEquals(YarnApplicationState.FINISHED.toString(), event
|
||||
.getEventInfo().get(ApplicationMetricsConstants.STATE_EVENT_INFO));
|
||||
} else if (event.getEventType().equals(
|
||||
ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE)) {
|
||||
hasACLsUpdatedEvent = true;
|
||||
Assert.assertEquals(4L, event.getTimestamp());
|
||||
}
|
||||
}
|
||||
Assert.assertTrue(hasCreatedEvent && hasFinishedEvent);
|
||||
Assert.assertTrue(hasCreatedEvent && hasFinishedEvent && hasACLsUpdatedEvent);
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
|
|
Loading…
Reference in New Issue