YARN-1115: Provide optional means for a scheduler to check real user ACLs. Contributed by Eric Payne (epayne)

This commit is contained in:
Ahmed Hussein 2021-10-21 17:04:29 +00:00
parent bd077c3814
commit 742d88b1c6
15 changed files with 284 additions and 20 deletions

View File

@ -597,10 +597,12 @@ public SubmitApplicationResponse submitApplication(
// checked here, those that are dependent on RM configuration are validated // checked here, those that are dependent on RM configuration are validated
// in RMAppManager. // in RMAppManager.
UserGroupInformation userUgi = null;
String user = null; String user = null;
try { try {
// Safety // Safety
user = UserGroupInformation.getCurrentUser().getShortUserName(); userUgi = UserGroupInformation.getCurrentUser();
user = userUgi.getShortUserName();
} catch (IOException ie) { } catch (IOException ie) {
LOG.warn("Unable to get the current user.", ie); LOG.warn("Unable to get the current user.", ie);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
@ -691,7 +693,7 @@ public SubmitApplicationResponse submitApplication(
try { try {
// call RMAppManager to submit application directly // call RMAppManager to submit application directly
rmAppManager.submitApplication(submissionContext, rmAppManager.submitApplication(submissionContext,
System.currentTimeMillis(), user); System.currentTimeMillis(), userUgi);
LOG.info("Application with id " + applicationId.getId() + LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user); " submitted by user " + user);

View File

@ -352,16 +352,26 @@ protected synchronized void checkAppNumCompletedLimit() {
} }
} }
@SuppressWarnings("unchecked") @VisibleForTesting
@Deprecated
protected void submitApplication( protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime, ApplicationSubmissionContext submissionContext, long submitTime,
String user) throws YarnException { String user) throws YarnException {
submitApplication(submissionContext, submitTime,
UserGroupInformation.createRemoteUser(user));
}
@VisibleForTesting
@SuppressWarnings("unchecked")
protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime,
UserGroupInformation userUgi) throws YarnException {
ApplicationId applicationId = submissionContext.getApplicationId(); ApplicationId applicationId = submissionContext.getApplicationId();
// Passing start time as -1. It will be eventually set in RMAppImpl // Passing start time as -1. It will be eventually set in RMAppImpl
// constructor. // constructor.
RMAppImpl application = createAndPopulateNewRMApp( RMAppImpl application = createAndPopulateNewRMApp(
submissionContext, submitTime, user, false, -1, null); submissionContext, submitTime, userUgi, false, -1, null);
try { try {
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
this.rmContext.getDelegationTokenRenewer() this.rmContext.getDelegationTokenRenewer()
@ -394,11 +404,21 @@ protected void recoverApplication(ApplicationStateData appState,
ApplicationSubmissionContext appContext = ApplicationSubmissionContext appContext =
appState.getApplicationSubmissionContext(); appState.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId(); ApplicationId appId = appContext.getApplicationId();
UserGroupInformation userUgi = null;
if (appState.getRealUser() != null) {
UserGroupInformation realUserUgi = null;
realUserUgi =
UserGroupInformation.createRemoteUser(appState.getRealUser());
userUgi = UserGroupInformation.createProxyUser(appState.getUser(),
realUserUgi);
} else {
userUgi = UserGroupInformation.createRemoteUser(appState.getUser());
}
// create and recover app. // create and recover app.
RMAppImpl application = RMAppImpl application =
createAndPopulateNewRMApp(appContext, appState.getSubmitTime(), createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
appState.getUser(), true, appState.getStartTime(), userUgi, true, appState.getStartTime(),
appState.getState()); appState.getState());
application.handle(new RMAppRecoverEvent(appId, rmState)); application.handle(new RMAppRecoverEvent(appId, rmState));
@ -406,8 +426,9 @@ protected void recoverApplication(ApplicationStateData appState,
private RMAppImpl createAndPopulateNewRMApp( private RMAppImpl createAndPopulateNewRMApp(
ApplicationSubmissionContext submissionContext, long submitTime, ApplicationSubmissionContext submissionContext, long submitTime,
String user, boolean isRecovery, long startTime, UserGroupInformation userUgi, boolean isRecovery, long startTime,
RMAppState recoveredFinalState) throws YarnException { RMAppState recoveredFinalState) throws YarnException {
String user = userUgi.getShortUserName();
ApplicationPlacementContext placementContext = null; ApplicationPlacementContext placementContext = null;
if (recoveredFinalState == null) { if (recoveredFinalState == null) {
@ -431,7 +452,6 @@ private RMAppImpl createAndPopulateNewRMApp(
// Verify and get the update application priority and set back to // Verify and get the update application priority and set back to
// submissionContext // submissionContext
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
// Application priority needed to be validated only while submitting. During // Application priority needed to be validated only while submitting. During
// recovery, validated priority could be recovered from submission context. // recovery, validated priority could be recovered from submission context.
@ -503,7 +523,7 @@ private RMAppImpl createAndPopulateNewRMApp(
// Create RMApp // Create RMApp
RMAppImpl application = RMAppImpl application =
new RMAppImpl(applicationId, rmContext, this.conf, new RMAppImpl(applicationId, rmContext, this.conf,
submissionContext.getApplicationName(), user, submissionContext.getApplicationName(), userUgi,
submissionContext.getQueue(), submissionContext.getQueue(),
submissionContext, this.scheduler, this.masterService, submissionContext, this.scheduler, this.masterService,
submitTime, submissionContext.getApplicationType(), submitTime, submissionContext.getApplicationType(),

View File

@ -937,7 +937,8 @@ public void storeNewApplication(RMApp app) {
assert context instanceof ApplicationSubmissionContextPBImpl; assert context instanceof ApplicationSubmissionContextPBImpl;
ApplicationStateData appState = ApplicationStateData appState =
ApplicationStateData.newInstance(app.getSubmitTime(), ApplicationStateData.newInstance(app.getSubmitTime(),
app.getStartTime(), context, app.getUser(), app.getCallerContext()); app.getStartTime(), context, app.getUser(), app.getRealUser(),
app.getCallerContext());
appState.setApplicationTimeouts(app.getApplicationTimeouts()); appState.setApplicationTimeouts(app.getApplicationTimeouts());
getRMStateStoreEventHandler().handle(new RMStateStoreAppEvent(appState)); getRMStateStoreEventHandler().handle(new RMStateStoreAppEvent(appState));
} }
@ -1170,7 +1171,7 @@ public void removeApplication(RMApp app) {
ApplicationStateData appState = ApplicationStateData appState =
ApplicationStateData.newInstance(app.getSubmitTime(), ApplicationStateData.newInstance(app.getSubmitTime(),
app.getStartTime(), app.getApplicationSubmissionContext(), app.getStartTime(), app.getApplicationSubmissionContext(),
app.getUser(), app.getCallerContext()); app.getUser(), app.getRealUser(), app.getCallerContext());
for(RMAppAttempt appAttempt : app.getAppAttempts().values()) { for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
appState.attempts.put(appAttempt.getAppAttemptId(), null); appState.attempts.put(appAttempt.getAppAttemptId(), null);
} }

View File

@ -92,7 +92,44 @@ public static ApplicationStateData newInstance(long submitTime,
public static ApplicationStateData newInstance(long submitTime, public static ApplicationStateData newInstance(long submitTime,
long startTime, ApplicationSubmissionContext context, String user) { long startTime, ApplicationSubmissionContext context, String user) {
return newInstance(submitTime, startTime, context, user, null); return newInstance(submitTime, startTime, context, user,
(CallerContext) null);
}
public static ApplicationStateData newInstance(long submitTime,
long startTime, String user, String realUser,
ApplicationSubmissionContext submissionContext, RMAppState state,
String diagnostics, long launchTime, long finishTime,
CallerContext callerContext) {
ApplicationStateData appState =
newInstance(submitTime, startTime, user, submissionContext, state,
diagnostics, launchTime, finishTime, callerContext);
if (realUser != null) {
appState.setRealUser(realUser);
}
return appState;
}
public static ApplicationStateData newInstance(long submitTime,
long startTime, String user, String realUser,
ApplicationSubmissionContext submissionContext, RMAppState state,
String diagnostics, long launchTime, long finishTime,
CallerContext callerContext,
Map<ApplicationTimeoutType, Long> applicationTimeouts) {
ApplicationStateData appState =
newInstance(submitTime, startTime, user, submissionContext, state,
diagnostics, launchTime, finishTime, callerContext, applicationTimeouts);
if (realUser != null) {
appState.setRealUser(realUser);
}
return appState;
}
public static ApplicationStateData newInstance(long submitTime,
long startTime, ApplicationSubmissionContext context, String user,
String realUser, CallerContext callerContext) {
return newInstance(submitTime, startTime, user, realUser, context, null, "",
0, 0, callerContext);
} }
public int getAttemptCount() { public int getAttemptCount() {
@ -213,4 +250,8 @@ public abstract void setApplicationSubmissionContext(
@Public @Public
public abstract void setApplicationTimeouts( public abstract void setApplicationTimeouts(
Map<ApplicationTimeoutType, Long> applicationTimeouts); Map<ApplicationTimeoutType, Long> applicationTimeouts);
public abstract String getRealUser();
public abstract void setRealUser(String realUser);
} }

View File

@ -149,6 +149,21 @@ public void setUser(String user) {
builder.setUser(user); builder.setUser(user);
} }
@Override
public String getRealUser() {
ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasRealUser()) {
return null;
}
return (p.getRealUser());
}
@Override
public void setRealUser(String realUser) {
maybeInitBuilder();
builder.setRealUser(realUser);
}
@Override @Override
public ApplicationSubmissionContext getApplicationSubmissionContext() { public ApplicationSubmissionContext getApplicationSubmissionContext() {
ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;

View File

@ -325,4 +325,6 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
* @return Map of envs related to application scheduling preferences. * @return Map of envs related to application scheduling preferences.
*/ */
Map<String, String> getApplicationSchedulingEnvs(); Map<String, String> getApplicationSchedulingEnvs();
String getRealUser();
} }

View File

@ -133,6 +133,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private final RMContext rmContext; private final RMContext rmContext;
private final Configuration conf; private final Configuration conf;
private final String user; private final String user;
private final UserGroupInformation userUgi;
private final String name; private final String name;
private final ApplicationSubmissionContext submissionContext; private final ApplicationSubmissionContext submissionContext;
private final Dispatcher dispatcher; private final Dispatcher dispatcher;
@ -420,7 +421,19 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
String applicationType, Set<String> applicationTags, String applicationType, Set<String> applicationTags,
List<ResourceRequest> amReqs, ApplicationPlacementContext List<ResourceRequest> amReqs, ApplicationPlacementContext
placementContext, long startTime) { placementContext, long startTime) {
this(applicationId, rmContext, config, name,
(user != null ? UserGroupInformation.createRemoteUser(user) : null),
queue, submissionContext, scheduler, masterService, submitTime,
applicationType, applicationTags, amReqs, placementContext, startTime);
}
public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
Configuration config, String name, UserGroupInformation userUgi,
String queue, ApplicationSubmissionContext submissionContext,
YarnScheduler scheduler, ApplicationMasterService masterService,
long submitTime, String applicationType, Set<String> applicationTags,
List<ResourceRequest> amReqs, ApplicationPlacementContext
placementContext, long startTime) {
this.systemClock = SystemClock.getInstance(); this.systemClock = SystemClock.getInstance();
this.applicationId = applicationId; this.applicationId = applicationId;
@ -429,7 +442,9 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
this.dispatcher = rmContext.getDispatcher(); this.dispatcher = rmContext.getDispatcher();
this.handler = dispatcher.getEventHandler(); this.handler = dispatcher.getEventHandler();
this.conf = config; this.conf = config;
this.user = StringInterner.weakIntern(user); this.user = StringInterner.weakIntern(
(userUgi != null) ? userUgi.getShortUserName() : null);
this.userUgi = userUgi;
this.queue = queue; this.queue = queue;
this.submissionContext = submissionContext; this.submissionContext = submissionContext;
this.scheduler = scheduler; this.scheduler = scheduler;
@ -1309,7 +1324,7 @@ private void rememberTargetTransitionsAndStoreState(RMAppEvent event,
ApplicationStateData appState = ApplicationStateData appState =
ApplicationStateData.newInstance(this.submitTime, this.startTime, ApplicationStateData.newInstance(this.submitTime, this.startTime,
this.user, this.submissionContext, this.getUser(), this.getRealUser(), this.submissionContext,
stateToBeStored, diags, this.launchTime, this.storedFinishTime, stateToBeStored, diags, this.launchTime, this.storedFinishTime,
this.callerContext); this.callerContext);
appState.setApplicationTimeouts(this.applicationTimeouts); appState.setApplicationTimeouts(this.applicationTimeouts);
@ -1898,4 +1913,10 @@ long getLogAggregationStartTime() {
Clock getSystemClock() { Clock getSystemClock() {
return systemClock; return systemClock;
} }
@Override
public String getRealUser() {
UserGroupInformation realUserUgi = this.userUgi.getRealUser();
return (realUserUgi != null) ? realUserUgi.getShortUserName() : null;
}
} }

View File

@ -883,6 +883,10 @@ private void addApplicationOnRecovery(ApplicationId applicationId,
} catch (AccessControlException ace) { } catch (AccessControlException ace) {
// Ignore the exception for recovered app as the app was previously // Ignore the exception for recovered app as the app was previously
// accepted. // accepted.
LOG.warn("AccessControlException received when trying to recover "
+ applicationId + " in queue " + queueName + " for user " + user
+ ". Since the app was in the queue prior to recovery, the Capacity"
+ " Scheduler will recover the app anyway.", ace);
} }
queue.getMetrics().submitApp(user); queue.getMetrics().submitApp(user);
SchedulerApplication<FiCaSchedulerApp> application = SchedulerApplication<FiCaSchedulerApp> application =
@ -2777,7 +2781,7 @@ public Priority updateApplicationPriority(Priority newPriority,
ApplicationStateData appState = ApplicationStateData.newInstance( ApplicationStateData appState = ApplicationStateData.newInstance(
rmApp.getSubmitTime(), rmApp.getStartTime(), rmApp.getSubmitTime(), rmApp.getStartTime(),
rmApp.getApplicationSubmissionContext(), rmApp.getUser(), rmApp.getApplicationSubmissionContext(), rmApp.getUser(),
rmApp.getCallerContext()); rmApp.getRealUser(), rmApp.getCallerContext());
appState.setApplicationTimeouts(rmApp.getApplicationTimeouts()); appState.setApplicationTimeouts(rmApp.getApplicationTimeouts());
appState.setLaunchTime(rmApp.getLaunchTime()); appState.setLaunchTime(rmApp.getLaunchTime());
rmContext.getStateStore().updateApplicationStateSynchronously(appState, rmContext.getStateStore().updateApplicationStateSynchronously(appState,

View File

@ -72,6 +72,7 @@ message ApplicationStateDataProto {
optional hadoop.common.RPCCallerContextProto caller_context = 8; optional hadoop.common.RPCCallerContextProto caller_context = 8;
repeated ApplicationTimeoutMapProto application_timeouts = 9; repeated ApplicationTimeoutMapProto application_timeouts = 9;
optional int64 launch_time = 10; optional int64 launch_time = 10;
optional string real_user = 11;
} }
message ApplicationAttemptStateDataProto { message ApplicationAttemptStateDataProto {

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -71,7 +72,7 @@ public void submitApplication(
ApplicationSubmissionContext submissionContext, String user) ApplicationSubmissionContext submissionContext, String user)
throws YarnException { throws YarnException {
super.submitApplication(submissionContext, System.currentTimeMillis(), super.submitApplication(submissionContext, System.currentTimeMillis(),
user); UserGroupInformation.createRemoteUser(user));
} }
public String getUserNameForPlacement(final String user, public String getUserNameForPlacement(final String user,

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -169,7 +170,8 @@ public MyRMAppManager(RMContext context, YarnScheduler scheduler,
@Override @Override
protected void submitApplication( protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime, ApplicationSubmissionContext submissionContext, long submitTime,
String user) throws YarnException { UserGroupInformation userUgi) throws YarnException {
String user = userUgi.getShortUserName();
//Do nothing, just add the application to RMContext //Do nothing, just add the application to RMContext
RMAppImpl application = RMAppImpl application =
new RMAppImpl(submissionContext.getApplicationId(), this.rmContext, new RMAppImpl(submissionContext.getApplicationId(), this.rmContext,

View File

@ -68,6 +68,10 @@ public String getUser() {
throw new UnsupportedOperationException("Not supported yet."); throw new UnsupportedOperationException("Not supported yet.");
} }
@Override
public String getRealUser() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override @Override
public ApplicationSubmissionContext getApplicationSubmissionContext() { public ApplicationSubmissionContext getApplicationSubmissionContext() {
throw new UnsupportedOperationException("Not supported yet."); throw new UnsupportedOperationException("Not supported yet.");

View File

@ -119,6 +119,11 @@ public String getUser() {
return user; return user;
} }
@Override
public String getRealUser() {
return null;
}
public void setUser(String user) { public void setUser(String user) {
this.user = user; this.user = user;
} }

View File

@ -56,11 +56,15 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
@ -72,9 +76,13 @@
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -91,8 +99,10 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
@ -4812,4 +4822,139 @@ public void tearDown() throws Exception {
cs.stop(); cs.stop();
} }
} }
private static class TestRMAppManager extends RMAppManager {
TestRMAppManager(RMContext context, YarnScheduler scheduler,
ApplicationMasterService masterService,
ApplicationACLsManager applicationACLsManager, Configuration conf) {
super(context, scheduler, masterService, applicationACLsManager, conf);
}
@Override
public void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime,
UserGroupInformation userUgi) throws YarnException {
super.submitApplication(submissionContext, submitTime, userUgi);
}
}
@Test
public void testSubmitUsingRealUserAcls() throws Exception {
final String realUser = "AdminUser";
final String user0 = "user0";
final String user1 = "user1";
final String queue = "default";
YarnConfiguration conf = new YarnConfiguration();
MockRM rm = new MockRM();
rm.init(conf);
rm.start();
rm.getConfig().setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
UserGroupInformation realUserUgi =
UserGroupInformation.createRemoteUser(realUser);
UserGroupInformation ugi0 =
UserGroupInformation.createProxyUserForTesting("user0", realUserUgi,
new String[] {"group1"});
UserGroupInformation ugi1 =
UserGroupInformation.createProxyUserForTesting("user1", realUserUgi,
new String[] {"group1"});
ApplicationId applicationId0 = TestUtils.getMockApplicationId(0);
ApplicationId applicationId1 = TestUtils.getMockApplicationId(1);
CapacityScheduler cSched = (CapacityScheduler) rm.getResourceScheduler();
ParentQueue rootQueue = (ParentQueue) cSched.getRootQueue();
Map<AccessType, AccessControlList> rootAcls = rootQueue.acls;
rootAcls.put(AccessType.SUBMIT_APP, new AccessControlList(realUser));
rootAcls.put(AccessType.ADMINISTER_QUEUE, new AccessControlList(realUser));
LeafQueue defaultQueue = (LeafQueue)cSched.getQueue(queue);
Map<AccessType, AccessControlList> a = defaultQueue.acls;
a.put(AccessType.SUBMIT_APP, new AccessControlList(realUser));
a.put(AccessType.ADMINISTER_QUEUE, new AccessControlList(realUser));
TestRMAppManager testRmAppManager =
new TestRMAppManager(rmContext, cSched, rm.getApplicationMasterService(),
rm.getApplicationACLsManager(), rm.getConfig());
ContainerLaunchContext clc =
mock(ContainerLaunchContext.class);
ApplicationSubmissionContext asc =
ApplicationSubmissionContext.newInstance(
applicationId0, "myApp0", "default", Priority.newInstance(0),
clc, false, false, 1, Resource.newInstance(1024, 1));
// Each of the following test cases has a proxied user and a real user.
// The proxied users are user0 and user1, depending on the test. The real
// user is always AdminUser.
// Ensure that user0 is not allowed to submit to the default queue when only
// the admin user is in the submit ACL and the admin user does not have the
// USE_REAL_ACLS character prepended.
try {
testRmAppManager.submitApplication(asc, System.currentTimeMillis(), ugi0);
Assert.fail(user0 + " should not be allowed to submit to the "
+ queue + " queue when only admin user is in submit ACL.");
} catch (YarnException e) {
// This is the expected behavior.
assertTrue("Should have received an AccessControlException.",
e.getCause() instanceof AccessControlException);
}
// With only user0 in the list of users authorized to submit apps to the
// queue, ensure that user0 is allowed to submit to the default queue.
a.put(AccessType.SUBMIT_APP, new AccessControlList(user0));
a.put(AccessType.ADMINISTER_QUEUE, new AccessControlList(realUser));
try {
testRmAppManager.submitApplication(asc, System.currentTimeMillis(), ugi0);
} catch (YarnException e) {
Assert.fail(user0 + " should be allowed to submit to the "
+ queue + " queue.");
}
// With only user0 in the list of users authorized to submit apps to the
// queue, ensure that user1 is NOT allowed to submit to the default queue
try {
testRmAppManager.submitApplication(asc, System.currentTimeMillis(), ugi1);
Assert.fail(user1 + " should not be allowed to submit to the "
+ queue + " queue.");
} catch (YarnException e) {
// This is the expected behavior.
assertTrue("Should have received an AccessControlException.",
e.getCause() instanceof AccessControlException);
}
// Even though the admin user is in the list of users allowed to submit to
// the default queue and user1's real user is the admin user, user1 should
// not be allowed to submit to queue because the ACL entry does not have the
// special character prepended in the list.
a.put(AccessType.SUBMIT_APP,
new AccessControlList(user0 + "," + realUser));
try {
testRmAppManager.submitApplication(asc, System.currentTimeMillis(), ugi1);
Assert.fail(user1 + " should not be allowed to submit to the "
+ queue + " queue.");
} catch (YarnException e) {
// This is the expected behavior.
assertTrue("Should have received an AccessControlException.",
e.getCause() instanceof AccessControlException);
}
// user1 should now be allowed to submit to the default queue because the
// admin user is in the ACL list and has the USE_REAL_ACLS character
// prepended.
a.put(AccessType.SUBMIT_APP,
new AccessControlList(user0 + ","
+ AccessControlList.USE_REAL_ACLS + realUser));
asc.setApplicationId(applicationId1);
try {
testRmAppManager.submitApplication(asc, System.currentTimeMillis(), ugi1);
} catch (YarnException e) {
LOG.error("failed to submit", e);
Assert.fail(user1 + " should be allowed to submit to the "
+ queue + " queue when real user is" + realUser + ".");
}
rm.stop();
rm.close();
}
} }

View File

@ -167,8 +167,8 @@ Configuration
| Property | Description | | Property | Description |
|:---- |:---- | |:---- |:---- |
| `yarn.scheduler.capacity.<queue-path>.state` | The *state* of the queue. Can be one of `RUNNING` or `STOPPED`. If a queue is in `STOPPED` state, new applications cannot be submitted to *itself* or *any of its child queues*. Thus, if the *root* queue is `STOPPED` no applications can be submitted to the entire cluster. Existing applications continue to completion, thus the queue can be *drained* gracefully. Value is specified as Enumeration. | | `yarn.scheduler.capacity.<queue-path>.state` | The *state* of the queue. Can be one of `RUNNING` or `STOPPED`. If a queue is in `STOPPED` state, new applications cannot be submitted to *itself* or *any of its child queues*. Thus, if the *root* queue is `STOPPED` no applications can be submitted to the entire cluster. Existing applications continue to completion, thus the queue can be *drained* gracefully. Value is specified as Enumeration. |
| `yarn.scheduler.capacity.root.<queue-path>.acl_submit_applications` | The *ACL* which controls who can *submit* applications to the given queue. If the given user/group has necessary ACLs on the given queue or *one of the parent queues in the hierarchy* they can submit applications. *ACLs* for this property *are* inherited from the parent queue if not specified. | | `yarn.scheduler.capacity.root.<queue-path>.acl_submit_applications` | The *ACL* which controls who can *submit* applications to the given queue. If the given user/group has necessary ACLs on the given queue or *one of the parent queues in the hierarchy* they can submit applications. *ACLs* for this property *are* inherited from the parent queue if not specified. If a tilde (~) is prepended to a user name in this list, the real user's ACLs will allow the proxied user to submit to the queue. |
| `yarn.scheduler.capacity.root.<queue-path>.acl_administer_queue` | The *ACL* which controls who can *administer* applications on the given queue. If the given user/group has necessary ACLs on the given queue or *one of the parent queues in the hierarchy* they can administer applications. *ACLs* for this property *are* inherited from the parent queue if not specified. | | `yarn.scheduler.capacity.root.<queue-path>.acl_administer_queue` | The *ACL* which controls who can *administer* applications on the given queue. If the given user/group has necessary ACLs on the given queue or *one of the parent queues in the hierarchy* they can administer applications. *ACLs* for this property *are* inherited from the parent queue if not specified. If a tilde (~) is prepended to a user name in this list, the real user's ACLs will allow the proxied user to administer apps the queue. |
**Note:** An *ACL* is of the form *user1*,*user2* *space* *group1*,*group2*. The special value of * implies *anyone*. The special value of *space* implies *no one*. The default is * for the root queue if not specified. **Note:** An *ACL* is of the form *user1*,*user2* *space* *group1*,*group2*. The special value of * implies *anyone*. The special value of *space* implies *no one*. The default is * for the root queue if not specified.