YARN-3955. Support for application priority ACLs in queues of CapacityScheduler. (Sunil G via wangda)

This commit is contained in:
Wangda Tan 2017-01-09 08:40:39 -08:00
parent db490eccce
commit 287d3d6804
22 changed files with 1130 additions and 98 deletions

View File

@ -963,7 +963,7 @@ final public class ResourceSchedulerWrapper
@Override
public Priority checkAndGetApplicationPriority(Priority priority,
String user, String queueName, ApplicationId applicationId)
UserGroupInformation user, String queueName, ApplicationId applicationId)
throws YarnException {
// TODO Dummy implementation.
return Priority.newInstance(0);

View File

@ -30,4 +30,6 @@ public enum AccessType {
// queue
SUBMIT_APP,
ADMINISTER_QUEUE,
// application
APPLICATION_MAX_PRIORITY,
}

View File

@ -97,6 +97,15 @@
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.acl_application_max_priority</name>
<value>*</value>
<description>
The ACL of who can submit applications with configured priority.
For e.g, [user={name} group={name} max_priority={priority} default_priority={priority}]
</description>
</property>
<property>
<name>yarn.scheduler.capacity.node-locality-delay</name>
<value>40</value>

View File

@ -1614,7 +1614,8 @@ public class ClientRMService extends AbstractService implements
}
try {
rmAppManager.updateApplicationPriority(applicationId, newAppPriority);
rmAppManager.updateApplicationPriority(callerUGI, applicationId,
newAppPriority);
} catch (YarnException ex) {
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService",

View File

@ -360,12 +360,12 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
// Verify and get the update application priority and set back to
// submissionContext
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
Priority appPriority = scheduler.checkAndGetApplicationPriority(
submissionContext.getPriority(), user, submissionContext.getQueue(),
submissionContext.getPriority(), userUgi, submissionContext.getQueue(),
applicationId);
submissionContext.setPriority(appPriority);
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
// Since FairScheduler queue mapping is done inside scheduler,
// if FairScheduler is used and the queue doesn't exist, we should not
// fail here because queue will be created inside FS. Ideally, FS queue
@ -569,12 +569,14 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
/**
* updateApplicationPriority will invoke scheduler api to update the
* new priority to RM and StateStore.
* @param callerUGI user
* @param applicationId Application Id
* @param newAppPriority proposed new application priority
* @throws YarnException Handle exceptions
*/
public void updateApplicationPriority(ApplicationId applicationId,
Priority newAppPriority) throws YarnException {
public void updateApplicationPriority(UserGroupInformation callerUGI,
ApplicationId applicationId, Priority newAppPriority)
throws YarnException {
RMApp app = this.rmContext.getRMApps().get(applicationId);
synchronized (applicationId) {
@ -587,8 +589,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
// Invoke scheduler api to update priority in scheduler and to
// State Store.
Priority appPriority = rmContext.getScheduler()
.updateApplicationPriority(newAppPriority, applicationId, future);
Priority appPriority = rmContext.getScheduler().updateApplicationPriority(
newAppPriority, applicationId, future, callerUGI);
if (app.getApplicationPriority().equals(appPriority)) {
return;

View File

@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -788,9 +789,9 @@ public abstract class AbstractYarnScheduler
}
@Override
public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
String user, String queueName, ApplicationId applicationId)
throws YarnException {
public Priority checkAndGetApplicationPriority(
Priority priorityRequestedByApp, UserGroupInformation user,
String queueName, ApplicationId applicationId) throws YarnException {
// Dummy Implementation till Application Priority changes are done in
// specific scheduler.
return Priority.newInstance(0);
@ -798,7 +799,8 @@ public abstract class AbstractYarnScheduler
@Override
public Priority updateApplicationPriority(Priority newPriority,
ApplicationId applicationId, SettableFuture<Object> future)
ApplicationId applicationId, SettableFuture<Object> future,
UserGroupInformation user)
throws YarnException {
// Dummy Implementation till Application Priority changes are done in
// specific scheduler.

View File

@ -307,7 +307,7 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
* Verify whether a submitted application priority is valid as per configured
* Queue
*
* @param priorityFromContext
* @param priorityRequestedByApp
* Submitted Application priority.
* @param user
* User who submitted the Application
@ -317,8 +317,8 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
* Application ID
* @return Updated Priority from scheduler
*/
public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
String user, String queueName, ApplicationId applicationId)
public Priority checkAndGetApplicationPriority(Priority priorityRequestedByApp,
UserGroupInformation user, String queueName, ApplicationId applicationId)
throws YarnException;
/**
@ -330,12 +330,13 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
* @param applicationId Application ID
*
* @param future Sets any type of exception happened from StateStore
* @param user who submitted the application
*
* @return updated priority
*/
public Priority updateApplicationPriority(Priority newPriority,
ApplicationId applicationId, SettableFuture<Object> future)
throws YarnException;
ApplicationId applicationId, SettableFuture<Object> future,
UserGroupInformation user) throws YarnException;
/**
*

View File

@ -0,0 +1,219 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.Priority;
/**
*
* PriorityACLConfiguration class is used to parse Application Priority ACL
* configuration from capcity-scheduler.xml
*/
public class AppPriorityACLConfigurationParser {
private static final Log LOG = LogFactory
.getLog(AppPriorityACLConfigurationParser.class);
public enum AppPriorityACLKeyType {
USER(1), GROUP(2), MAX_PRIORITY(3), DEFAULT_PRIORITY(4);
private final int id;
AppPriorityACLKeyType(int id) {
this.id = id;
}
public int getId() {
return this.id;
}
}
public static final String PATTERN_FOR_PRIORITY_ACL = "\\[([^\\]]+)";
@Private
public static final String ALL_ACL = "*";
@Private
public static final String NONE_ACL = " ";
public List<AppPriorityACLGroup> getPriorityAcl(Priority clusterMaxPriority,
String aclString) {
List<AppPriorityACLGroup> aclList = new ArrayList<AppPriorityACLGroup>();
Matcher matcher = Pattern.compile(PATTERN_FOR_PRIORITY_ACL)
.matcher(aclString);
/*
* Each ACL group will be separated by "[]". Syntax of each ACL group could
* be like below "user=b1,b2 group=g1 max-priority=a2 default-priority=a1"
* Ideally this means "for this given user/group, maximum possible priority
* is a2 and if the user has not specified any priority, then it is a1."
*/
while (matcher.find()) {
// Get the first ACL sub-group.
String aclSubGroup = matcher.group(1);
if (aclSubGroup.trim().isEmpty()) {
continue;
}
/*
* Internal storage is PriorityACLGroup which stores each parsed priority
* ACLs group. This will help while looking for a user to priority mapping
* during app submission time. ACLs will be passed in below order only. 1.
* user/group 2. max-priority 3. default-priority
*/
AppPriorityACLGroup userPriorityACL = new AppPriorityACLGroup();
// userAndGroupName will hold user acl and group acl as interim storage
// since both user/group acl comes with separate key value pairs.
List<StringBuilder> userAndGroupName = new ArrayList<>();
for (String kvPair : aclSubGroup.trim().split(" +")) {
/*
* There are 3 possible options for key here: 1. user/group 2.
* max-priority 3. default-priority
*/
String[] splits = kvPair.split("=");
// Ensure that each ACL sub string is key value pair separated by '='.
if (splits != null && splits.length > 1) {
parsePriorityACLType(userPriorityACL, splits, userAndGroupName);
}
}
// If max_priority is higher to clusterMaxPriority, its better to
// handle here.
if (userPriorityACL.getMaxPriority().getPriority() > clusterMaxPriority
.getPriority()) {
LOG.warn("ACL configuration for '" + userPriorityACL.getMaxPriority()
+ "' is greater that cluster max priority. Resetting ACLs to "
+ clusterMaxPriority);
userPriorityACL.setMaxPriority(
Priority.newInstance(clusterMaxPriority.getPriority()));
}
AccessControlList acl = createACLStringForPriority(userAndGroupName);
userPriorityACL.setACLList(acl);
aclList.add(userPriorityACL);
}
return aclList;
}
/*
* Parse different types of ACLs sub parts for on priority group and store in
* a map for later processing.
*/
private void parsePriorityACLType(AppPriorityACLGroup userPriorityACL,
String[] splits, List<StringBuilder> userAndGroupName) {
// Here splits will have the key value pair at index 0 and 1 respectively.
// To parse all keys, its better to convert to PriorityACLConfig enum.
AppPriorityACLKeyType aclType = AppPriorityACLKeyType
.valueOf(StringUtils.toUpperCase(splits[0].trim()));
switch (aclType) {
case MAX_PRIORITY :
userPriorityACL
.setMaxPriority(Priority.newInstance(Integer.parseInt(splits[1])));
break;
case USER :
userAndGroupName.add(getUserOrGroupACLStringFromConfig(splits[1]));
break;
case GROUP :
userAndGroupName.add(getUserOrGroupACLStringFromConfig(splits[1]));
break;
case DEFAULT_PRIORITY :
int defaultPriority = Integer.parseInt(splits[1]);
Priority priority = (defaultPriority < 0)
? Priority.newInstance(0)
: Priority.newInstance(defaultPriority);
userPriorityACL.setDefaultPriority(priority);
break;
default:
break;
}
}
/*
* This method will help to append different types of ACLs keys against one
* priority. For eg,USER will be appended with GROUP as "user2,user4 group1".
*/
private AccessControlList createACLStringForPriority(
List<StringBuilder> acls) {
String finalACL = "";
String userACL = acls.get(0).toString();
// If any of user/group is *, consider it as acceptable for all.
// "user" is at index 0, and "group" is at index 1.
if (userACL.trim().equals(ALL_ACL)) {
finalACL = ALL_ACL;
} else if (userACL.equals(NONE_ACL)) {
finalACL = NONE_ACL;
} else {
// Get USER segment
if (!userACL.trim().isEmpty()) {
// skip last appended ","
finalACL = acls.get(0).toString();
}
// Get GROUP segment if any
if (acls.size() > 1) {
String groupACL = acls.get(1).toString();
if (!groupACL.trim().isEmpty()) {
finalACL = finalACL + " "
+ acls.get(1).toString();
}
}
}
// Here ACL will look like "user1,user2 group" in ideal cases.
return new AccessControlList(finalACL.trim());
}
/*
* This method will help to append user/group acl string against given
* priority. For example "user1,user2 group1,group2"
*/
private StringBuilder getUserOrGroupACLStringFromConfig(String value) {
// ACL strings could be generate for USER or GRUOP.
// aclList in map contains two entries. 1. USER, 2. GROUP.
StringBuilder aclTypeName = new StringBuilder();
if (value.trim().equals(ALL_ACL)) {
aclTypeName.setLength(0);
aclTypeName.append(ALL_ACL);
return aclTypeName;
}
aclTypeName.append(value.trim());
return aclTypeName;
}
}

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.Priority;
/**
* PriorityACLGroup will hold all ACL related information per priority.
*
*/
public class AppPriorityACLGroup implements Comparable<AppPriorityACLGroup> {
private Priority maxPriority = null;
private Priority defaultPriority = null;
private AccessControlList aclList = null;
public AppPriorityACLGroup(Priority maxPriority, Priority defaultPriority,
AccessControlList aclList) {
this.setMaxPriority(Priority.newInstance(maxPriority.getPriority()));
this.setDefaultPriority(
Priority.newInstance(defaultPriority.getPriority()));
this.setACLList(aclList);
}
public AppPriorityACLGroup() {
}
@Override
public int compareTo(AppPriorityACLGroup o) {
return getMaxPriority().compareTo(o.getMaxPriority());
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
AppPriorityACLGroup other = (AppPriorityACLGroup) obj;
if (getMaxPriority() != other.getMaxPriority()) {
return false;
}
if (getDefaultPriority() != other.getDefaultPriority()) {
return false;
}
return true;
}
@Override
public int hashCode() {
final int prime = 517861;
int result = 9511;
result = prime * result + getMaxPriority().getPriority();
result = prime * result + getDefaultPriority().getPriority();
return result;
}
public Priority getMaxPriority() {
return maxPriority;
}
public Priority getDefaultPriority() {
return defaultPriority;
}
public AccessControlList getACLList() {
return aclList;
}
public void setMaxPriority(Priority maxPriority) {
this.maxPriority = maxPriority;
}
public void setDefaultPriority(Priority defaultPriority) {
this.defaultPriority = defaultPriority;
}
public void setACLList(AccessControlList accessControlList) {
this.aclList = accessControlList;
}
}

View File

@ -137,6 +137,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@ -227,6 +228,7 @@ public class CapacityScheduler extends
private List<AsyncScheduleThread> asyncSchedulerThreads;
private ResourceCommitterService resourceCommitterService;
private RMNodeLabelsManager labelManager;
private AppPriorityACLsManager appPriorityACLManager;
/**
* EXPERT
@ -308,8 +310,9 @@ public class CapacityScheduler extends
this.usePortForNodeName = this.conf.getUsePortForNodeName();
this.applications = new ConcurrentHashMap<>();
this.labelManager = rmContext.getNodeLabelManager();
this.appPriorityACLManager = new AppPriorityACLsManager(conf);
this.queueManager = new CapacitySchedulerQueueManager(yarnConf,
this.labelManager);
this.labelManager, this.appPriorityACLManager);
this.queueManager.setCapacitySchedulerContext(this);
this.activitiesManager = new ActivitiesManager(rmContext);
@ -2188,46 +2191,67 @@ public class CapacityScheduler extends
}
@Override
public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
String user, String queueName, ApplicationId applicationId)
throws YarnException {
Priority appPriority = null;
// ToDo: Verify against priority ACLs
public Priority checkAndGetApplicationPriority(
Priority priorityRequestedByApp, UserGroupInformation user,
String queueName, ApplicationId applicationId) throws YarnException {
try {
readLock.lock();
Priority appPriority = priorityRequestedByApp;
// Verify the scenario where priority is null from submissionContext.
if (null == priorityFromContext) {
// Get the default priority for the Queue. If Queue is non-existent, then
// use default priority
priorityFromContext = this.queueManager.getDefaultPriorityForQueue(
queueName);
if (null == appPriority) {
// Verify whether submitted user has any default priority set. If so,
// user's default priority will get precedence over queue default.
// for updateApplicationPriority call flow, this check is done in
// CientRMService itself.
appPriority = this.appPriorityACLManager.getDefaultPriority(queueName,
user);
LOG.info("Application '" + applicationId
+ "' is submitted without priority "
// Get the default priority for the Queue. If Queue is non-existent,
// then
// use default priority. Do it only if user doesnt have any default.
if (null == appPriority) {
appPriority = this.queueManager.getDefaultPriorityForQueue(queueName);
}
LOG.info(
"Application '" + applicationId + "' is submitted without priority "
+ "hence considering default queue/cluster priority: "
+ priorityFromContext.getPriority());
+ appPriority.getPriority());
}
// Verify whether submitted priority is lesser than max priority
// in the cluster. If it is out of found, defining a max cap.
if (priorityFromContext.compareTo(getMaxClusterLevelAppPriority()) < 0) {
priorityFromContext = Priority
if (appPriority.getPriority() > getMaxClusterLevelAppPriority()
.getPriority()) {
appPriority = Priority
.newInstance(getMaxClusterLevelAppPriority().getPriority());
}
appPriority = priorityFromContext;
// Lets check for ACLs here.
if (!appPriorityACLManager.checkAccess(user, queueName, appPriority)) {
throw new YarnException(new AccessControlException(
"User " + user + " does not have permission to submit/update "
+ applicationId + " for " + appPriority));
}
LOG.info("Priority '" + appPriority.getPriority()
+ "' is acceptable in queue : " + queueName + " for application: "
+ applicationId + " for the user: " + user);
+ applicationId);
return appPriority;
} finally {
readLock.unlock();
}
}
@Override
public Priority updateApplicationPriority(Priority newPriority,
ApplicationId applicationId, SettableFuture<Object> future)
ApplicationId applicationId, SettableFuture<Object> future,
UserGroupInformation user)
throws YarnException {
try {
writeLock.lock();
Priority appPriority = null;
SchedulerApplication<FiCaSchedulerApp> application = applications
.get(applicationId);
@ -2239,7 +2263,7 @@ public class CapacityScheduler extends
RMApp rmApp = rmContext.getRMApps().get(applicationId);
appPriority = checkAndGetApplicationPriority(newPriority, rmApp.getUser(),
appPriority = checkAndGetApplicationPriority(newPriority, user,
rmApp.getQueue(), applicationId);
if (application.getPriority().equals(appPriority)) {
@ -2268,6 +2292,9 @@ public class CapacityScheduler extends
+ rmApp.getQueue() + " for application: " + applicationId
+ " for the user: " + rmApp.getUser());
return appPriority;
} finally {
writeLock.unlock();
}
}
@Override

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.ReservationACL;
@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
@ -274,6 +276,8 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private
public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false;
AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser();
public CapacitySchedulerConfiguration() {
this(new Configuration());
}
@ -602,6 +606,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
return "acl_" + StringUtils.toLowerCase(acl.toString());
}
private static String getAclKey(AccessType acl) {
return "acl_" + StringUtils.toLowerCase(acl.toString());
}
@Override
public Map<ReservationACL, AccessControlList> getReservationAcls(String
queue) {
@ -627,6 +635,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
set(queuePrefix + getAclKey(acl), aclString);
}
private void setAcl(String queue, AccessType acl, String aclString) {
String queuePrefix = getQueuePrefix(queue);
set(queuePrefix + getAclKey(acl), aclString);
}
public Map<AccessType, AccessControlList> getAcls(String queue) {
Map<AccessType, AccessControlList> acls =
new HashMap<AccessType, AccessControlList>();
@ -650,6 +663,35 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
}
}
@VisibleForTesting
public void setPriorityAcls(String queue, Priority priority,
Priority defaultPriority, String[] acls) {
StringBuilder aclString = new StringBuilder();
StringBuilder userAndGroup = new StringBuilder();
for (int i = 0; i < acls.length; i++) {
userAndGroup.append(AppPriorityACLKeyType.values()[i] + "=" + acls[i].trim())
.append(" ");
}
aclString.append("[" + userAndGroup.toString().trim() + " "
+ "max_priority=" + priority.getPriority() + " " + "default_priority="
+ defaultPriority.getPriority() + "]");
setAcl(queue, AccessType.APPLICATION_MAX_PRIORITY, aclString.toString());
}
public List<AppPriorityACLGroup> getPriorityAcls(String queue,
Priority clusterMaxPriority) {
String queuePrefix = getQueuePrefix(queue);
String defaultAcl = ALL_ACL;
String aclString = get(
queuePrefix + getAclKey(AccessType.APPLICATION_MAX_PRIORITY),
defaultAcl);
return priorityACLConfig.getPriorityAcl(clusterMaxPriority, aclString);
}
public String[] getQueues(String queue) {
LOG.debug("CSConf - getQueues called for: queuePrefix=" + getQueuePrefix(queue));
String[] queues = getStrings(getQueuePrefix(queue) + QUEUES);

View File

@ -23,6 +23,7 @@ import java.util.Comparator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
@ -85,4 +86,10 @@ public interface CapacitySchedulerContext {
ActivitiesManager getActivitiesManager();
CapacitySchedulerQueueManager getCapacitySchedulerQueueManager();
/**
*
* @return Max Cluster level App priority.
*/
Priority getMaxClusterLevelAppPriority();
}

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
/**
*
@ -86,6 +87,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
private final Map<String, CSQueue> queues = new ConcurrentHashMap<>();
private CSQueue root;
private final RMNodeLabelsManager labelManager;
private AppPriorityACLsManager appPriorityACLManager;
private QueueStateManager<CSQueue, CapacitySchedulerConfiguration>
queueStateManager;
@ -94,12 +96,15 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
* Construct the service.
* @param conf the configuration
* @param labelManager the labelManager
* @param appPriorityACLManager App priority ACL manager
*/
public CapacitySchedulerQueueManager(Configuration conf,
RMNodeLabelsManager labelManager) {
RMNodeLabelsManager labelManager,
AppPriorityACLsManager appPriorityACLManager) {
this.authorizer = YarnAuthorizationProvider.getInstance(conf);
this.labelManager = labelManager;
this.queueStateManager = new QueueStateManager<>();
this.appPriorityACLManager = appPriorityACLManager;
}
@Override
@ -145,7 +150,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
throws IOException {
root = parseQueue(this.csContext, conf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP);
setQueueAcls(authorizer, queues);
setQueueAcls(authorizer, appPriorityACLManager, queues);
labelManager.reinitializeQueueLabels(getQueueToLabels());
this.queueStateManager.initialize(this);
LOG.info("Initialized root queue " + root);
@ -168,7 +173,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
// Re-configure queues
root.reinitialize(newRoot, this.csContext.getClusterResource());
setQueueAcls(authorizer, queues);
setQueueAcls(authorizer, appPriorityACLManager, queues);
// Re-calculate headroom for active applications
Resource clusterResource = this.csContext.getClusterResource();
@ -305,12 +310,22 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
* @throws IOException if fails to set queue acls
*/
public static void setQueueAcls(YarnAuthorizationProvider authorizer,
Map<String, CSQueue> queues) throws IOException {
AppPriorityACLsManager appPriorityACLManager, Map<String, CSQueue> queues)
throws IOException {
List<Permission> permissions = new ArrayList<>();
for (CSQueue queue : queues.values()) {
AbstractCSQueue csQueue = (AbstractCSQueue) queue;
permissions.add(
new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs()));
if (queue instanceof LeafQueue) {
LeafQueue lQueue = (LeafQueue) queue;
// Clear Priority ACLs first since reinitialize also call same.
appPriorityACLManager.clearPriorityACLs(lQueue.getQueueName());
appPriorityACLManager.addPrioirityACLs(lQueue.getPriorityACLs(),
lQueue.getQueueName());
}
}
authorizer.setPermission(permissions,
UserGroupInformation.getCurrentUser());

View File

@ -140,6 +140,9 @@ public class LeafQueue extends AbstractCSQueue {
private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
new ConcurrentHashMap<>();
List<AppPriorityACLGroup> priorityAcls =
new ArrayList<AppPriorityACLGroup>();
@SuppressWarnings({ "unchecked", "rawtypes" })
public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
@ -205,6 +208,9 @@ public class LeafQueue extends AbstractCSQueue {
conf.getMaximumApplicationMasterResourcePerQueuePercent(
getQueuePath());
priorityAcls = conf.getPriorityAcls(getQueuePath(),
scheduler.getMaxClusterLevelAppPriority());
if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels,
this.defaultLabelExpression, null)) {
throw new IOException(
@ -497,6 +503,16 @@ public class LeafQueue extends AbstractCSQueue {
}
}
@Private
public List<AppPriorityACLGroup> getPriorityACLs() {
try {
readLock.lock();
return new ArrayList<>(priorityAcls);
} finally {
readLock.unlock();
}
}
@Override
public void reinitialize(
CSQueue newlyParsedQueue, Resource clusterResource)

View File

@ -0,0 +1,230 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.security;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLGroup;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
*
* Manager class to store and check permission for Priority ACLs.
*/
public class AppPriorityACLsManager {
private static final Log LOG = LogFactory
.getLog(AppPriorityACLsManager.class);
/*
* An internal class to store ACLs specific to each priority. This will be
* used to read and process acl's during app submission time as well.
*/
private static class PriorityACL {
private Priority priority;
private Priority defaultPriority;
private AccessControlList acl;
PriorityACL(Priority priority, Priority defaultPriority,
AccessControlList acl) {
this.setPriority(priority);
this.setDefaultPriority(defaultPriority);
this.setAcl(acl);
}
public Priority getPriority() {
return priority;
}
public void setPriority(Priority maxPriority) {
this.priority = maxPriority;
}
public Priority getDefaultPriority() {
return defaultPriority;
}
public void setDefaultPriority(Priority defaultPriority) {
this.defaultPriority = defaultPriority;
}
public AccessControlList getAcl() {
return acl;
}
public void setAcl(AccessControlList acl) {
this.acl = acl;
}
}
private boolean isACLsEnable;
private final ConcurrentMap<String, List<PriorityACL>> allAcls =
new ConcurrentHashMap<>();
public AppPriorityACLsManager(Configuration conf) {
this.isACLsEnable = conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE,
YarnConfiguration.DEFAULT_YARN_ACL_ENABLE);
}
/**
* Clear priority acl during refresh.
*
* @param queueName
* Queue Name
*/
public void clearPriorityACLs(String queueName) {
allAcls.remove(queueName);
}
/**
* Each Queue could have configured with different priority acl's groups. This
* method helps to store each such ACL list against queue.
*
* @param priorityACLGroups
* List of Priority ACL Groups.
* @param queueName
* Queue Name associate with priority acl groups.
*/
public void addPrioirityACLs(List<AppPriorityACLGroup> priorityACLGroups,
String queueName) {
List<PriorityACL> priorityACL = allAcls.get(queueName);
if (null == priorityACL) {
priorityACL = new ArrayList<PriorityACL>();
allAcls.put(queueName, priorityACL);
}
// Ensure lowest priority PriorityACLGroup comes first in the list.
Collections.sort(priorityACLGroups);
for (AppPriorityACLGroup priorityACLGroup : priorityACLGroups) {
priorityACL.add(new PriorityACL(priorityACLGroup.getMaxPriority(),
priorityACLGroup.getDefaultPriority(),
priorityACLGroup.getACLList()));
if (LOG.isDebugEnabled()) {
LOG.debug("Priority ACL group added: max-priority - "
+ priorityACLGroup.getMaxPriority() + "default-priority - "
+ priorityACLGroup.getDefaultPriority());
}
}
}
/**
* Priority based checkAccess to ensure that given user has enough permission
* to submit application at a given priority level.
*
* @param callerUGI
* User who submits the application.
* @param queueName
* Queue to which application is submitted.
* @param submittedPriority
* priority of the application.
* @return True or False to indicate whether application can be submitted at
* submitted priority level or not.
*/
public boolean checkAccess(UserGroupInformation callerUGI, String queueName,
Priority submittedPriority) {
if (!isACLsEnable) {
return true;
}
List<PriorityACL> acls = allAcls.get(queueName);
if (acls == null || acls.isEmpty()) {
return true;
}
PriorityACL approvedPriorityACL = getMappedPriorityAclForUGI(acls,
callerUGI, submittedPriority);
if (approvedPriorityACL == null) {
return false;
}
return true;
}
/**
* If an application is submitted without any priority, and submitted user has
* a default priority, this method helps to update this default priority as
* app's priority.
*
* @param queueName
* Submitted queue
* @param user
* User who submitted this application
* @return Default priority associated with given user.
*/
public Priority getDefaultPriority(String queueName,
UserGroupInformation user) {
if (!isACLsEnable) {
return null;
}
List<PriorityACL> acls = allAcls.get(queueName);
if (acls == null || acls.isEmpty()) {
return null;
}
PriorityACL approvedPriorityACL = getMappedPriorityAclForUGI(acls, user,
null);
if (approvedPriorityACL == null) {
return null;
}
Priority defaultPriority = Priority
.newInstance(approvedPriorityACL.getDefaultPriority().getPriority());
return defaultPriority;
}
private PriorityACL getMappedPriorityAclForUGI(List<PriorityACL> acls ,
UserGroupInformation user, Priority submittedPriority) {
// Iterate through all configured ACLs starting from lower priority.
// If user is found corresponding to a configured priority, then store
// that entry. if failed, continue iterate through whole acl list.
PriorityACL selectedAcl = null;
for (PriorityACL entry : acls) {
AccessControlList list = entry.getAcl();
if (list.isUserAllowed(user)) {
selectedAcl = entry;
// If submittedPriority is passed through the argument, also check
// whether submittedPriority is under max-priority of each ACL group.
if (submittedPriority != null) {
selectedAcl = null;
if (submittedPriority.getPriority() <= entry.getPriority()
.getPriority()) {
return entry;
}
}
}
}
return selectedAcl;
}
}

View File

@ -43,6 +43,8 @@ public abstract class ACLsTestBase {
protected static final String COMMON_USER = "common_user";
protected static final String QUEUE_A_USER = "queueA_user";
protected static final String QUEUE_B_USER = "queueB_user";
protected static final String QUEUE_A_GROUP = "queueA_group";
protected static final String QUEUE_B_GROUP = "queueB_group";
protected static final String ROOT_ADMIN = "root_admin";
protected static final String QUEUE_A_ADMIN = "queueA_admin";
protected static final String QUEUE_B_ADMIN = "queueB_admin";
@ -53,7 +55,7 @@ public abstract class ACLsTestBase {
protected static final Log LOG = LogFactory.getLog(TestApplicationACLs.class);
MockRM resourceManager;
protected MockRM resourceManager;
Configuration conf;
YarnRPC rpc;
InetSocketAddress rmAddress;
@ -68,6 +70,7 @@ public abstract class ACLsTestBase {
AccessControlList adminACL = new AccessControlList("");
conf.set(YarnConfiguration.YARN_ADMIN_ACL, adminACL.getAclString());
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
resourceManager = new MockRM(conf) {
protected ClientRMService createClientRMService() {

View File

@ -29,6 +29,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@ -463,7 +464,9 @@ public class TestApplicationMasterService {
// Change the priority of App1 to 8
Priority appPriority2 = Priority.newInstance(8);
rm.getRMAppManager().updateApplicationPriority(app1.getApplicationId(),
UserGroupInformation ugi = UserGroupInformation
.createRemoteUser(app1.getUser());
rm.getRMAppManager().updateApplicationPriority(ugi, app1.getApplicationId(),
appPriority2);
AllocateResponse response2 = am1.allocate(allocateRequest);

View File

@ -1123,7 +1123,7 @@ public class TestClientRMService {
when(yarnScheduler.getResourceCalculator()).thenReturn(rs);
when(yarnScheduler.checkAndGetApplicationPriority(any(Priority.class),
anyString(), anyString(), any(ApplicationId.class)))
any(UserGroupInformation.class), anyString(), any(ApplicationId.class)))
.thenReturn(Priority.newInstance(0));
return yarnScheduler;
}

View File

@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@ -344,7 +345,10 @@ public class TestApplicationPriority {
// Change the priority of App1 to 8
Priority appPriority2 = Priority.newInstance(8);
cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null);
UserGroupInformation ugi = UserGroupInformation
.createRemoteUser(app1.getUser());
cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null,
ugi);
// get scheduler app
FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
@ -378,7 +382,10 @@ public class TestApplicationPriority {
// Change the priority of App1 to 15
Priority appPriority2 = Priority.newInstance(15);
cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null);
UserGroupInformation ugi = UserGroupInformation
.createRemoteUser(app1.getUser());
cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null,
ugi);
// get scheduler app
FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
@ -428,7 +435,10 @@ public class TestApplicationPriority {
// Change the priority of App1 to 8
Priority appPriority2 = Priority.newInstance(8);
cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null);
UserGroupInformation ugi = UserGroupInformation
.createRemoteUser(app1.getUser());
cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null,
ugi);
// let things settle down
Thread.sleep(1000);
@ -557,7 +567,10 @@ public class TestApplicationPriority {
// Change the priority of App1 to 3 (lowest)
Priority appPriority3 = Priority.newInstance(3);
cs.updateApplicationPriority(appPriority3, app2.getApplicationId(), null);
UserGroupInformation ugi = UserGroupInformation
.createRemoteUser(app2.getUser());
cs.updateApplicationPriority(appPriority3, app2.getApplicationId(), null,
ugi);
// add request for containers App2
am2.allocate("127.0.0.1", 2 * GB, 3, new ArrayList<ContainerId>());
@ -788,8 +801,10 @@ public class TestApplicationPriority {
int appsPendingExpected, int activeAppsExpected, RMApp app)
throws YarnException {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
UserGroupInformation ugi = UserGroupInformation
.createRemoteUser(app.getUser());
cs.updateApplicationPriority(Priority.newInstance(2),
app.getApplicationId(), null);
app.getApplicationId(), null, ugi);
SchedulerEvent removeAttempt;
removeAttempt = new AppAttemptRemovedSchedulerEvent(
app.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.KILLED,

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.List;
import org.apache.hadoop.yarn.api.records.Priority;
import org.junit.Assert;
import org.junit.Test;
public class TestApplicationPriorityACLConfiguration {
private final int defaultPriorityQueueA = 3;
private final int defaultPriorityQueueB = -1;
private final int maxPriorityQueueA = 5;
private final int maxPriorityQueueB = 10;
private final int clusterMaxPriority = 10;
private static final String QUEUE_A_USER = "queueA_user";
private static final String QUEUE_B_USER = "queueB_user";
private static final String QUEUE_A_GROUP = "queueA_group";
private static final String QUEUEA = "queueA";
private static final String QUEUEB = "queueB";
private static final String QUEUEC = "queueC";
@Test
public void testSimpleACLConfiguration() throws Exception {
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[]{QUEUEA, QUEUEB, QUEUEC});
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, 50f);
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, 25f);
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEC, 25f);
// Success case: Configure one user/group level priority acl for queue A.
String[] aclsForA = new String[2];
aclsForA[0] = QUEUE_A_USER;
aclsForA[1] = QUEUE_A_GROUP;
csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA,
Priority.newInstance(maxPriorityQueueA),
Priority.newInstance(defaultPriorityQueueA), aclsForA);
// Try to get the ACL configs and make sure there are errors/exceptions
List<AppPriorityACLGroup> pGroupA = csConf.getPriorityAcls(
CapacitySchedulerConfiguration.ROOT + "." + QUEUEA,
Priority.newInstance(clusterMaxPriority));
// Validate!
verifyACLs(pGroupA, QUEUE_A_USER, QUEUE_A_GROUP, maxPriorityQueueA,
defaultPriorityQueueA);
}
@Test
public void testACLConfigurationForInvalidCases() throws Exception {
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[]{QUEUEA, QUEUEB, QUEUEC});
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, 50f);
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, 25f);
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEC, 25f);
// Success case: Configure one user/group level priority acl for queue A.
String[] aclsForA = new String[2];
aclsForA[0] = QUEUE_A_USER;
aclsForA[1] = QUEUE_A_GROUP;
csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA,
Priority.newInstance(maxPriorityQueueA),
Priority.newInstance(defaultPriorityQueueA), aclsForA);
String[] aclsForB = new String[1];
aclsForB[0] = QUEUE_B_USER;
csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB,
Priority.newInstance(maxPriorityQueueB),
Priority.newInstance(defaultPriorityQueueB), aclsForB);
// Try to get the ACL configs and make sure there are errors/exceptions
List<AppPriorityACLGroup> pGroupA = csConf.getPriorityAcls(
CapacitySchedulerConfiguration.ROOT + "." + QUEUEA,
Priority.newInstance(clusterMaxPriority));
List<AppPriorityACLGroup> pGroupB = csConf.getPriorityAcls(
CapacitySchedulerConfiguration.ROOT + "." + QUEUEB,
Priority.newInstance(clusterMaxPriority));
// Validate stored ACL values with configured ones.
verifyACLs(pGroupA, QUEUE_A_USER, QUEUE_A_GROUP, maxPriorityQueueA,
defaultPriorityQueueA);
verifyACLs(pGroupB, QUEUE_B_USER, "", maxPriorityQueueB, 0);
}
private void verifyACLs(List<AppPriorityACLGroup> pGroup, String queueUser,
String queueGroup, int maxPriority, int defaultPriority) {
AppPriorityACLGroup group = pGroup.get(0);
String aclString = queueUser + " " + queueGroup;
Assert.assertEquals(aclString.trim(),
group.getACLList().getAclString().trim());
Assert.assertEquals(maxPriority, group.getMaxPriority().getPriority());
Assert.assertEquals(defaultPriority,
group.getDefaultPriority().getPriority());
}
}

View File

@ -0,0 +1,206 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ACLsTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Test;
public class TestApplicationPriorityACLs extends ACLsTestBase {
private final int defaultPriorityQueueA = 3;
private final int defaultPriorityQueueB = 10;
private final int maxPriorityQueueA = 5;
private final int maxPriorityQueueB = 11;
private final int clusterMaxPriority = 10;
@Test
public void testApplicationACLs() throws Exception {
/*
* Cluster Max-priority is 10. User 'queueA_user' has permission to submit
* apps only at priority 5. Default priority for this user is 3.
*/
// Case 1: App will be submitted with priority 5.
verifyAppSubmitWithPrioritySuccess(QUEUE_A_USER, QUEUEA, 5);
// Case 2: App will be rejected as submitted priority was 6.
verifyAppSubmitWithPriorityFailure(QUEUE_A_USER, QUEUEA, 6);
// Case 3: App will be submitted w/o priority, hence consider default 3.
verifyAppSubmitWithPrioritySuccess(QUEUE_A_USER, QUEUEA, -1);
// Case 4: App will be submitted with priority 11.
verifyAppSubmitWithPrioritySuccess(QUEUE_B_USER, QUEUEB, 11);
}
private void verifyAppSubmitWithPrioritySuccess(String submitter,
String queueName, int priority) throws Exception {
Priority appPriority = null;
if (priority > 0) {
appPriority = Priority.newInstance(priority);
} else {
// RM will consider default priority for the submitted user. So update
// priority to the default value to compare.
priority = defaultPriorityQueueA;
}
ApplicationSubmissionContext submissionContext = prepareForAppSubmission(
submitter, queueName, appPriority);
submitAppToRMWithValidAcl(submitter, submissionContext);
// Ideally get app report here and check the priority.
verifyAppPriorityIsAccepted(submitter, submissionContext.getApplicationId(),
priority);
}
private void verifyAppSubmitWithPriorityFailure(String submitter,
String queueName, int priority) throws Exception {
Priority appPriority = Priority.newInstance(priority);
ApplicationSubmissionContext submissionContext = prepareForAppSubmission(
submitter, queueName, appPriority);
submitAppToRMWithInValidAcl(submitter, submissionContext);
}
private ApplicationSubmissionContext prepareForAppSubmission(String submitter,
String queueName, Priority priority) throws Exception {
GetNewApplicationRequest newAppRequest = GetNewApplicationRequest
.newInstance();
ApplicationClientProtocol submitterClient = getRMClientForUser(submitter);
ApplicationId applicationId = submitterClient
.getNewApplication(newAppRequest).getApplicationId();
Resource resource = BuilderUtils.newResource(1024, 1);
ContainerLaunchContext amContainerSpec = ContainerLaunchContext
.newInstance(null, null, null, null, null, null);
ApplicationSubmissionContext appSubmissionContext = ApplicationSubmissionContext
.newInstance(applicationId, "applicationName", queueName, null,
amContainerSpec, false, true, 1, resource, "applicationType");
appSubmissionContext.setApplicationId(applicationId);
appSubmissionContext.setQueue(queueName);
if (null != priority) {
appSubmissionContext.setPriority(priority);
}
return appSubmissionContext;
}
private void submitAppToRMWithValidAcl(String submitter,
ApplicationSubmissionContext appSubmissionContext)
throws YarnException, IOException, InterruptedException {
ApplicationClientProtocol submitterClient = getRMClientForUser(submitter);
SubmitApplicationRequest submitRequest = SubmitApplicationRequest
.newInstance(appSubmissionContext);
submitterClient.submitApplication(submitRequest);
resourceManager.waitForState(appSubmissionContext.getApplicationId(),
RMAppState.ACCEPTED);
}
private void submitAppToRMWithInValidAcl(String submitter,
ApplicationSubmissionContext appSubmissionContext)
throws YarnException, IOException, InterruptedException {
ApplicationClientProtocol submitterClient = getRMClientForUser(submitter);
SubmitApplicationRequest submitRequest = SubmitApplicationRequest
.newInstance(appSubmissionContext);
try {
submitterClient.submitApplication(submitRequest);
} catch (YarnException ex) {
Assert.assertTrue(ex.getCause() instanceof RemoteException);
}
}
private void verifyAppPriorityIsAccepted(String submitter,
ApplicationId applicationId, int priority)
throws IOException, InterruptedException {
ApplicationClientProtocol submitterClient = getRMClientForUser(submitter);
/**
* If priority is greater than cluster max, RM will auto set to cluster max
* Consider this scenario as a special case.
*/
if (priority > clusterMaxPriority) {
priority = clusterMaxPriority;
}
GetApplicationReportRequest request = GetApplicationReportRequest
.newInstance(applicationId);
try {
GetApplicationReportResponse response = submitterClient
.getApplicationReport(request);
Assert.assertEquals(response.getApplicationReport().getPriority(),
Priority.newInstance(priority));
} catch (YarnException e) {
Assert.fail("Application submission should not fail.");
}
}
@Override
protected Configuration createConfiguration() {
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[]{QUEUEA, QUEUEB, QUEUEC});
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, 50f);
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, 25f);
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEC, 25f);
String[] aclsForA = new String[2];
aclsForA[0] = QUEUE_A_USER;
aclsForA[1] = QUEUE_A_GROUP;
csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA,
Priority.newInstance(maxPriorityQueueA),
Priority.newInstance(defaultPriorityQueueA), aclsForA);
String[] aclsForB = new String[2];
aclsForB[0] = QUEUE_B_USER;
aclsForB[1] = QUEUE_B_GROUP;
csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB,
Priority.newInstance(maxPriorityQueueB),
Priority.newInstance(defaultPriorityQueueB), aclsForB);
csConf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
csConf.set(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class.getName());
return csConf;
}
}

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerC
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -856,7 +857,10 @@ public class TestParentQueue {
TestUtils.spyHook);
YarnAuthorizationProvider authorizer =
YarnAuthorizationProvider.getInstance(conf);
CapacitySchedulerQueueManager.setQueueAcls(authorizer, queues);
AppPriorityACLsManager appPriorityACLManager = new AppPriorityACLsManager(
conf);
CapacitySchedulerQueueManager.setQueueAcls(authorizer,
appPriorityACLManager, queues);
UserGroupInformation user = UserGroupInformation.getCurrentUser();
// Setup queue configs