diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index 7a8445eb618..5d7e4711135 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -965,7 +965,7 @@ public class ResourceSchedulerWrapper @Override public Priority checkAndGetApplicationPriority(Priority priority, - String user, String queueName, ApplicationId applicationId) + UserGroupInformation user, String queueName, ApplicationId applicationId) throws YarnException { // TODO Dummy implementation. return Priority.newInstance(0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java index 32459b9688b..fb4484bc638 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java @@ -30,4 +30,6 @@ public enum AccessType { // queue SUBMIT_APP, ADMINISTER_QUEUE, + // application + APPLICATION_MAX_PRIORITY, } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml index 6ac726ee6c5..47db01fbc1b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml @@ -97,6 +97,15 @@ + + yarn.scheduler.capacity.root.default.acl_application_max_priority + * + + The ACL of who can submit applications with configured priority. + For e.g, [user={name} group={name} max_priority={priority} default_priority={priority}] + + + yarn.scheduler.capacity.node-locality-delay 40 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 913390a987f..ec71e38ccec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -1597,7 +1597,8 @@ public class ClientRMService extends AbstractService implements } try { - rmAppManager.updateApplicationPriority(applicationId, newAppPriority); + rmAppManager.updateApplicationPriority(callerUGI, applicationId, + newAppPriority); } catch (YarnException ex) { RMAuditLogger.logFailure(callerUGI.getShortUserName(), AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService", diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index a8084451cbe..f0f923b1f9a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -351,19 +351,19 @@ public class RMAppManager implements EventHandler, RMServerUtils.validateApplicationTimeouts( submissionContext.getApplicationTimeouts()); } - + ApplicationId applicationId = submissionContext.getApplicationId(); ResourceRequest amReq = validateAndCreateResourceRequest(submissionContext, isRecovery); // Verify and get the update application priority and set back to // submissionContext + UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user); Priority appPriority = scheduler.checkAndGetApplicationPriority( - submissionContext.getPriority(), user, submissionContext.getQueue(), + submissionContext.getPriority(), userUgi, submissionContext.getQueue(), applicationId); submissionContext.setPriority(appPriority); - UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user); // Since FairScheduler queue mapping is done inside scheduler, // if FairScheduler is used and the queue doesn't exist, we should not // fail here because queue will be created inside FS. Ideally, FS queue @@ -561,12 +561,14 @@ public class RMAppManager implements EventHandler, /** * updateApplicationPriority will invoke scheduler api to update the * new priority to RM and StateStore. + * @param callerUGI user * @param applicationId Application Id * @param newAppPriority proposed new application priority * @throws YarnException Handle exceptions */ - public void updateApplicationPriority(ApplicationId applicationId, - Priority newAppPriority) throws YarnException { + public void updateApplicationPriority(UserGroupInformation callerUGI, + ApplicationId applicationId, Priority newAppPriority) + throws YarnException { RMApp app = this.rmContext.getRMApps().get(applicationId); synchronized (applicationId) { @@ -579,8 +581,8 @@ public class RMAppManager implements EventHandler, // Invoke scheduler api to update priority in scheduler and to // State Store. - Priority appPriority = rmContext.getScheduler() - .updateApplicationPriority(newAppPriority, applicationId, future); + Priority appPriority = rmContext.getScheduler().updateApplicationPriority( + newAppPriority, applicationId, future, callerUGI); if (app.getApplicationPriority().equals(appPriority)) { return; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index c1a985df896..a68989321a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.AbstractResourceRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -780,9 +781,9 @@ public abstract class AbstractYarnScheduler } @Override - public Priority checkAndGetApplicationPriority(Priority priorityFromContext, - String user, String queueName, ApplicationId applicationId) - throws YarnException { + public Priority checkAndGetApplicationPriority( + Priority priorityRequestedByApp, UserGroupInformation user, + String queueName, ApplicationId applicationId) throws YarnException { // Dummy Implementation till Application Priority changes are done in // specific scheduler. return Priority.newInstance(0); @@ -790,7 +791,8 @@ public abstract class AbstractYarnScheduler @Override public Priority updateApplicationPriority(Priority newPriority, - ApplicationId applicationId, SettableFuture future) + ApplicationId applicationId, SettableFuture future, + UserGroupInformation user) throws YarnException { // Dummy Implementation till Application Priority changes are done in // specific scheduler. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index ea1ae600073..608f275133b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -311,7 +311,7 @@ public interface YarnScheduler extends EventHandler { * Verify whether a submitted application priority is valid as per configured * Queue * - * @param priorityFromContext + * @param priorityRequestedByApp * Submitted Application priority. * @param user * User who submitted the Application @@ -321,8 +321,8 @@ public interface YarnScheduler extends EventHandler { * Application ID * @return Updated Priority from scheduler */ - public Priority checkAndGetApplicationPriority(Priority priorityFromContext, - String user, String queueName, ApplicationId applicationId) + public Priority checkAndGetApplicationPriority(Priority priorityRequestedByApp, + UserGroupInformation user, String queueName, ApplicationId applicationId) throws YarnException; /** @@ -334,12 +334,13 @@ public interface YarnScheduler extends EventHandler { * @param applicationId Application ID * * @param future Sets any type of exception happened from StateStore + * @param user who submitted the application * * @return updated priority */ public Priority updateApplicationPriority(Priority newPriority, - ApplicationId applicationId, SettableFuture future) - throws YarnException; + ApplicationId applicationId, SettableFuture future, + UserGroupInformation user) throws YarnException; /** * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLConfigurationParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLConfigurationParser.java new file mode 100644 index 00000000000..4489a01b69d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLConfigurationParser.java @@ -0,0 +1,219 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.Priority; + +/** + * + * PriorityACLConfiguration class is used to parse Application Priority ACL + * configuration from capcity-scheduler.xml + */ +public class AppPriorityACLConfigurationParser { + + private static final Log LOG = LogFactory + .getLog(AppPriorityACLConfigurationParser.class); + + public enum AppPriorityACLKeyType { + USER(1), GROUP(2), MAX_PRIORITY(3), DEFAULT_PRIORITY(4); + + private final int id; + + AppPriorityACLKeyType(int id) { + this.id = id; + } + + public int getId() { + return this.id; + } + } + + public static final String PATTERN_FOR_PRIORITY_ACL = "\\[([^\\]]+)"; + + @Private + public static final String ALL_ACL = "*"; + + @Private + public static final String NONE_ACL = " "; + + public List getPriorityAcl(Priority clusterMaxPriority, + String aclString) { + + List aclList = new ArrayList(); + Matcher matcher = Pattern.compile(PATTERN_FOR_PRIORITY_ACL) + .matcher(aclString); + + /* + * Each ACL group will be separated by "[]". Syntax of each ACL group could + * be like below "user=b1,b2 group=g1 max-priority=a2 default-priority=a1" + * Ideally this means "for this given user/group, maximum possible priority + * is a2 and if the user has not specified any priority, then it is a1." + */ + while (matcher.find()) { + // Get the first ACL sub-group. + String aclSubGroup = matcher.group(1); + if (aclSubGroup.trim().isEmpty()) { + continue; + } + + /* + * Internal storage is PriorityACLGroup which stores each parsed priority + * ACLs group. This will help while looking for a user to priority mapping + * during app submission time. ACLs will be passed in below order only. 1. + * user/group 2. max-priority 3. default-priority + */ + AppPriorityACLGroup userPriorityACL = new AppPriorityACLGroup(); + + // userAndGroupName will hold user acl and group acl as interim storage + // since both user/group acl comes with separate key value pairs. + List userAndGroupName = new ArrayList<>(); + + for (String kvPair : aclSubGroup.trim().split(" +")) { + /* + * There are 3 possible options for key here: 1. user/group 2. + * max-priority 3. default-priority + */ + String[] splits = kvPair.split("="); + + // Ensure that each ACL sub string is key value pair separated by '='. + if (splits != null && splits.length > 1) { + parsePriorityACLType(userPriorityACL, splits, userAndGroupName); + } + } + + // If max_priority is higher to clusterMaxPriority, its better to + // handle here. + if (userPriorityACL.getMaxPriority().getPriority() > clusterMaxPriority + .getPriority()) { + LOG.warn("ACL configuration for '" + userPriorityACL.getMaxPriority() + + "' is greater that cluster max priority. Resetting ACLs to " + + clusterMaxPriority); + userPriorityACL.setMaxPriority( + Priority.newInstance(clusterMaxPriority.getPriority())); + } + + AccessControlList acl = createACLStringForPriority(userAndGroupName); + userPriorityACL.setACLList(acl); + aclList.add(userPriorityACL); + } + + return aclList; + } + + /* + * Parse different types of ACLs sub parts for on priority group and store in + * a map for later processing. + */ + private void parsePriorityACLType(AppPriorityACLGroup userPriorityACL, + String[] splits, List userAndGroupName) { + // Here splits will have the key value pair at index 0 and 1 respectively. + // To parse all keys, its better to convert to PriorityACLConfig enum. + AppPriorityACLKeyType aclType = AppPriorityACLKeyType + .valueOf(StringUtils.toUpperCase(splits[0].trim())); + switch (aclType) { + case MAX_PRIORITY : + userPriorityACL + .setMaxPriority(Priority.newInstance(Integer.parseInt(splits[1]))); + break; + case USER : + userAndGroupName.add(getUserOrGroupACLStringFromConfig(splits[1])); + break; + case GROUP : + userAndGroupName.add(getUserOrGroupACLStringFromConfig(splits[1])); + break; + case DEFAULT_PRIORITY : + int defaultPriority = Integer.parseInt(splits[1]); + Priority priority = (defaultPriority < 0) + ? Priority.newInstance(0) + : Priority.newInstance(defaultPriority); + userPriorityACL.setDefaultPriority(priority); + break; + default: + break; + } + } + + /* + * This method will help to append different types of ACLs keys against one + * priority. For eg,USER will be appended with GROUP as "user2,user4 group1". + */ + private AccessControlList createACLStringForPriority( + List acls) { + + String finalACL = ""; + String userACL = acls.get(0).toString(); + + // If any of user/group is *, consider it as acceptable for all. + // "user" is at index 0, and "group" is at index 1. + if (userACL.trim().equals(ALL_ACL)) { + finalACL = ALL_ACL; + } else if (userACL.equals(NONE_ACL)) { + finalACL = NONE_ACL; + } else { + + // Get USER segment + if (!userACL.trim().isEmpty()) { + // skip last appended "," + finalACL = acls.get(0).toString(); + } + + // Get GROUP segment if any + if (acls.size() > 1) { + String groupACL = acls.get(1).toString(); + if (!groupACL.trim().isEmpty()) { + finalACL = finalACL + " " + + acls.get(1).toString(); + } + } + } + + // Here ACL will look like "user1,user2 group" in ideal cases. + return new AccessControlList(finalACL.trim()); + } + + /* + * This method will help to append user/group acl string against given + * priority. For example "user1,user2 group1,group2" + */ + private StringBuilder getUserOrGroupACLStringFromConfig(String value) { + + // ACL strings could be generate for USER or GRUOP. + // aclList in map contains two entries. 1. USER, 2. GROUP. + StringBuilder aclTypeName = new StringBuilder(); + + if (value.trim().equals(ALL_ACL)) { + aclTypeName.setLength(0); + aclTypeName.append(ALL_ACL); + return aclTypeName; + } + + aclTypeName.append(value.trim()); + return aclTypeName; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLGroup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLGroup.java new file mode 100644 index 00000000000..cb5ebcb7d58 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AppPriorityACLGroup.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.Priority; + +/** + * PriorityACLGroup will hold all ACL related information per priority. + * + */ +public class AppPriorityACLGroup implements Comparable { + + private Priority maxPriority = null; + private Priority defaultPriority = null; + private AccessControlList aclList = null; + + public AppPriorityACLGroup(Priority maxPriority, Priority defaultPriority, + AccessControlList aclList) { + this.setMaxPriority(Priority.newInstance(maxPriority.getPriority())); + this.setDefaultPriority( + Priority.newInstance(defaultPriority.getPriority())); + this.setACLList(aclList); + } + + public AppPriorityACLGroup() { + } + + @Override + public int compareTo(AppPriorityACLGroup o) { + return getMaxPriority().compareTo(o.getMaxPriority()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + AppPriorityACLGroup other = (AppPriorityACLGroup) obj; + if (getMaxPriority() != other.getMaxPriority()) { + return false; + } + + if (getDefaultPriority() != other.getDefaultPriority()) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + final int prime = 517861; + int result = 9511; + result = prime * result + getMaxPriority().getPriority(); + result = prime * result + getDefaultPriority().getPriority(); + return result; + } + + public Priority getMaxPriority() { + return maxPriority; + } + + public Priority getDefaultPriority() { + return defaultPriority; + } + + public AccessControlList getACLList() { + return aclList; + } + + public void setMaxPriority(Priority maxPriority) { + this.maxPriority = maxPriority; + } + + public void setDefaultPriority(Priority defaultPriority) { + this.defaultPriority = defaultPriority; + } + + public void setACLList(AccessControlList accessControlList) { + this.aclList = accessControlList; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 99a7fcf4456..7497118f371 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -119,6 +119,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -224,6 +225,7 @@ public class CapacityScheduler extends private List asyncSchedulerThreads; private ResourceCommitterService resourceCommitterService; private RMNodeLabelsManager labelManager; + private AppPriorityACLsManager appPriorityACLManager; /** * EXPERT @@ -305,8 +307,9 @@ public class CapacityScheduler extends this.usePortForNodeName = this.conf.getUsePortForNodeName(); this.applications = new ConcurrentHashMap<>(); this.labelManager = rmContext.getNodeLabelManager(); + this.appPriorityACLManager = new AppPriorityACLsManager(conf); this.queueManager = new CapacitySchedulerQueueManager(yarnConf, - this.labelManager); + this.labelManager, this.appPriorityACLManager); this.queueManager.setCapacitySchedulerContext(this); this.activitiesManager = new ActivitiesManager(rmContext); @@ -2180,86 +2183,110 @@ public class CapacityScheduler extends } @Override - public Priority checkAndGetApplicationPriority(Priority priorityFromContext, - String user, String queueName, ApplicationId applicationId) - throws YarnException { - Priority appPriority = null; + public Priority checkAndGetApplicationPriority( + Priority priorityRequestedByApp, UserGroupInformation user, + String queueName, ApplicationId applicationId) throws YarnException { + try { + readLock.lock(); + Priority appPriority = priorityRequestedByApp; - // ToDo: Verify against priority ACLs + // Verify the scenario where priority is null from submissionContext. + if (null == appPriority) { + // Verify whether submitted user has any default priority set. If so, + // user's default priority will get precedence over queue default. + // for updateApplicationPriority call flow, this check is done in + // CientRMService itself. + appPriority = this.appPriorityACLManager.getDefaultPriority(queueName, + user); - // Verify the scenario where priority is null from submissionContext. - if (null == priorityFromContext) { - // Get the default priority for the Queue. If Queue is non-existent, then - // use default priority - priorityFromContext = this.queueManager.getDefaultPriorityForQueue( - queueName); + // Get the default priority for the Queue. If Queue is non-existent, + // then + // use default priority. Do it only if user doesnt have any default. + if (null == appPriority) { + appPriority = this.queueManager.getDefaultPriorityForQueue(queueName); + } - LOG.info("Application '" + applicationId - + "' is submitted without priority " - + "hence considering default queue/cluster priority: " - + priorityFromContext.getPriority()); + LOG.info( + "Application '" + applicationId + "' is submitted without priority " + + "hence considering default queue/cluster priority: " + + appPriority.getPriority()); + } + + // Verify whether submitted priority is lesser than max priority + // in the cluster. If it is out of found, defining a max cap. + if (appPriority.getPriority() > getMaxClusterLevelAppPriority() + .getPriority()) { + appPriority = Priority + .newInstance(getMaxClusterLevelAppPriority().getPriority()); + } + + // Lets check for ACLs here. + if (!appPriorityACLManager.checkAccess(user, queueName, appPriority)) { + throw new YarnException(new AccessControlException( + "User " + user + " does not have permission to submit/update " + + applicationId + " for " + appPriority)); + } + + LOG.info("Priority '" + appPriority.getPriority() + + "' is acceptable in queue : " + queueName + " for application: " + + applicationId); + + return appPriority; + } finally { + readLock.unlock(); } - - // Verify whether submitted priority is lesser than max priority - // in the cluster. If it is out of found, defining a max cap. - if (priorityFromContext.compareTo(getMaxClusterLevelAppPriority()) < 0) { - priorityFromContext = Priority - .newInstance(getMaxClusterLevelAppPriority().getPriority()); - } - - appPriority = priorityFromContext; - - LOG.info("Priority '" + appPriority.getPriority() - + "' is acceptable in queue : " + queueName + " for application: " - + applicationId + " for the user: " + user); - - return appPriority; } @Override public Priority updateApplicationPriority(Priority newPriority, - ApplicationId applicationId, SettableFuture future) + ApplicationId applicationId, SettableFuture future, + UserGroupInformation user) throws YarnException { - Priority appPriority = null; - SchedulerApplication application = applications - .get(applicationId); + try { + writeLock.lock(); + Priority appPriority = null; + SchedulerApplication application = applications + .get(applicationId); - if (application == null) { - throw new YarnException("Application '" + applicationId - + "' is not present, hence could not change priority."); - } + if (application == null) { + throw new YarnException("Application '" + applicationId + + "' is not present, hence could not change priority."); + } - RMApp rmApp = rmContext.getRMApps().get(applicationId); + RMApp rmApp = rmContext.getRMApps().get(applicationId); - appPriority = checkAndGetApplicationPriority(newPriority, rmApp.getUser(), - rmApp.getQueue(), applicationId); + appPriority = checkAndGetApplicationPriority(newPriority, user, + rmApp.getQueue(), applicationId); - if (application.getPriority().equals(appPriority)) { - future.set(null); + if (application.getPriority().equals(appPriority)) { + future.set(null); + return appPriority; + } + + // Update new priority in Submission Context to update to StateStore. + rmApp.getApplicationSubmissionContext().setPriority(appPriority); + + // Update to state store + ApplicationStateData appState = ApplicationStateData.newInstance( + rmApp.getSubmitTime(), rmApp.getStartTime(), + rmApp.getApplicationSubmissionContext(), rmApp.getUser(), + rmApp.getCallerContext()); + appState.setApplicationTimeouts(rmApp.getApplicationTimeouts()); + rmContext.getStateStore().updateApplicationStateSynchronously(appState, + false, future); + + // As we use iterator over a TreeSet for OrderingPolicy, once we change + // priority then reinsert back to make order correct. + LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue()); + queue.updateApplicationPriority(application, appPriority); + + LOG.info("Priority '" + appPriority + "' is updated in queue :" + + rmApp.getQueue() + " for application: " + applicationId + + " for the user: " + rmApp.getUser()); return appPriority; + } finally { + writeLock.unlock(); } - - // Update new priority in Submission Context to update to StateStore. - rmApp.getApplicationSubmissionContext().setPriority(appPriority); - - // Update to state store - ApplicationStateData appState = ApplicationStateData.newInstance( - rmApp.getSubmitTime(), rmApp.getStartTime(), - rmApp.getApplicationSubmissionContext(), rmApp.getUser(), - rmApp.getCallerContext()); - appState.setApplicationTimeouts(rmApp.getApplicationTimeouts()); - rmContext.getStateStore().updateApplicationStateSynchronously(appState, - false, future); - - // As we use iterator over a TreeSet for OrderingPolicy, once we change - // priority then reinsert back to make order correct. - LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue()); - queue.updateApplicationPriority(application, appPriority); - - LOG.info("Priority '" + appPriority + "' is updated in queue :" - + rmApp.getQueue() + " for application: " + applicationId - + " for the user: " + rmApp.getUser()); - return appPriority; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index bfaeba47753..eb148d207a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.ReservationACL; @@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; @@ -63,7 +65,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur private static final Log LOG = LogFactory.getLog(CapacitySchedulerConfiguration.class); - + private static final String CS_CONFIGURATION_FILE = "capacity-scheduler.xml"; @Private @@ -274,6 +276,8 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur @Private public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false; + AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser(); + public CapacitySchedulerConfiguration() { this(new Configuration()); } @@ -602,6 +606,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur return "acl_" + StringUtils.toLowerCase(acl.toString()); } + private static String getAclKey(AccessType acl) { + return "acl_" + StringUtils.toLowerCase(acl.toString()); + } + @Override public Map getReservationAcls(String queue) { @@ -627,6 +635,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur set(queuePrefix + getAclKey(acl), aclString); } + private void setAcl(String queue, AccessType acl, String aclString) { + String queuePrefix = getQueuePrefix(queue); + set(queuePrefix + getAclKey(acl), aclString); + } + public Map getAcls(String queue) { Map acls = new HashMap(); @@ -650,6 +663,35 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur } } + @VisibleForTesting + public void setPriorityAcls(String queue, Priority priority, + Priority defaultPriority, String[] acls) { + StringBuilder aclString = new StringBuilder(); + + StringBuilder userAndGroup = new StringBuilder(); + for (int i = 0; i < acls.length; i++) { + userAndGroup.append(AppPriorityACLKeyType.values()[i] + "=" + acls[i].trim()) + .append(" "); + } + + aclString.append("[" + userAndGroup.toString().trim() + " " + + "max_priority=" + priority.getPriority() + " " + "default_priority=" + + defaultPriority.getPriority() + "]"); + + setAcl(queue, AccessType.APPLICATION_MAX_PRIORITY, aclString.toString()); + } + + public List getPriorityAcls(String queue, + Priority clusterMaxPriority) { + String queuePrefix = getQueuePrefix(queue); + String defaultAcl = ALL_ACL; + String aclString = get( + queuePrefix + getAclKey(AccessType.APPLICATION_MAX_PRIORITY), + defaultAcl); + + return priorityACLConfig.getPriorityAcl(clusterMaxPriority, aclString); + } + public String[] getQueues(String queue) { LOG.debug("CSConf - getQueues called for: queuePrefix=" + getQueuePrefix(queue)); String[] queues = getStrings(getQueuePrefix(queue) + QUEUES); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 7d29619d759..504acb96d37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -23,6 +23,7 @@ import java.util.Comparator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; @@ -85,4 +86,10 @@ public interface CapacitySchedulerContext { ActivitiesManager getActivitiesManager(); CapacitySchedulerQueueManager getCapacitySchedulerQueueManager(); + + /** + * + * @return Max Cluster level App priority. + */ + Priority getMaxClusterLevelAppPriority(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index 6a3c08a97a8..ddcbc0e8266 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; /** * @@ -86,6 +87,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< private final Map queues = new ConcurrentHashMap<>(); private CSQueue root; private final RMNodeLabelsManager labelManager; + private AppPriorityACLsManager appPriorityACLManager; private QueueStateManager queueStateManager; @@ -94,12 +96,15 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< * Construct the service. * @param conf the configuration * @param labelManager the labelManager + * @param appPriorityACLManager App priority ACL manager */ public CapacitySchedulerQueueManager(Configuration conf, - RMNodeLabelsManager labelManager) { + RMNodeLabelsManager labelManager, + AppPriorityACLsManager appPriorityACLManager) { this.authorizer = YarnAuthorizationProvider.getInstance(conf); this.labelManager = labelManager; this.queueStateManager = new QueueStateManager<>(); + this.appPriorityACLManager = appPriorityACLManager; } @Override @@ -145,7 +150,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< throws IOException { root = parseQueue(this.csContext, conf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP); - setQueueAcls(authorizer, queues); + setQueueAcls(authorizer, appPriorityACLManager, queues); labelManager.reinitializeQueueLabels(getQueueToLabels()); this.queueStateManager.initialize(this); LOG.info("Initialized root queue " + root); @@ -168,7 +173,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< // Re-configure queues root.reinitialize(newRoot, this.csContext.getClusterResource()); - setQueueAcls(authorizer, queues); + setQueueAcls(authorizer, appPriorityACLManager, queues); // Re-calculate headroom for active applications Resource clusterResource = this.csContext.getClusterResource(); @@ -305,12 +310,22 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< * @throws IOException if fails to set queue acls */ public static void setQueueAcls(YarnAuthorizationProvider authorizer, - Map queues) throws IOException { + AppPriorityACLsManager appPriorityACLManager, Map queues) + throws IOException { List permissions = new ArrayList<>(); for (CSQueue queue : queues.values()) { AbstractCSQueue csQueue = (AbstractCSQueue) queue; permissions.add( new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs())); + + if (queue instanceof LeafQueue) { + LeafQueue lQueue = (LeafQueue) queue; + + // Clear Priority ACLs first since reinitialize also call same. + appPriorityACLManager.clearPriorityACLs(lQueue.getQueueName()); + appPriorityACLManager.addPrioirityACLs(lQueue.getPriorityACLs(), + lQueue.getQueueName()); + } } authorizer.setPermission(permissions, UserGroupInformation.getCurrentUser()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index ace0b75ddaf..647c8dba2ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -149,6 +149,9 @@ public class LeafQueue extends AbstractCSQueue { private Map> ignorePartitionExclusivityRMContainers = new ConcurrentHashMap<>(); + List priorityAcls = + new ArrayList(); + @SuppressWarnings({ "unchecked", "rawtypes" }) public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { @@ -214,6 +217,9 @@ public class LeafQueue extends AbstractCSQueue { conf.getMaximumApplicationMasterResourcePerQueuePercent( getQueuePath()); + priorityAcls = conf.getPriorityAcls(getQueuePath(), + scheduler.getMaxClusterLevelAppPriority()); + if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels, this.defaultLabelExpression, null)) { throw new IOException( @@ -506,6 +512,16 @@ public class LeafQueue extends AbstractCSQueue { } } + @Private + public List getPriorityACLs() { + try { + readLock.lock(); + return new ArrayList<>(priorityAcls); + } finally { + readLock.unlock(); + } + } + @Override public void reinitialize( CSQueue newlyParsedQueue, Resource clusterResource) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AppPriorityACLsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AppPriorityACLsManager.java new file mode 100644 index 00000000000..c1fd0a698bc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AppPriorityACLsManager.java @@ -0,0 +1,230 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.security; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLGroup; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * + * Manager class to store and check permission for Priority ACLs. + */ +public class AppPriorityACLsManager { + + private static final Log LOG = LogFactory + .getLog(AppPriorityACLsManager.class); + + /* + * An internal class to store ACLs specific to each priority. This will be + * used to read and process acl's during app submission time as well. + */ + private static class PriorityACL { + private Priority priority; + private Priority defaultPriority; + private AccessControlList acl; + + PriorityACL(Priority priority, Priority defaultPriority, + AccessControlList acl) { + this.setPriority(priority); + this.setDefaultPriority(defaultPriority); + this.setAcl(acl); + } + + public Priority getPriority() { + return priority; + } + + public void setPriority(Priority maxPriority) { + this.priority = maxPriority; + } + + public Priority getDefaultPriority() { + return defaultPriority; + } + + public void setDefaultPriority(Priority defaultPriority) { + this.defaultPriority = defaultPriority; + } + + public AccessControlList getAcl() { + return acl; + } + + public void setAcl(AccessControlList acl) { + this.acl = acl; + } + } + + private boolean isACLsEnable; + private final ConcurrentMap> allAcls = + new ConcurrentHashMap<>(); + + public AppPriorityACLsManager(Configuration conf) { + this.isACLsEnable = conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE, + YarnConfiguration.DEFAULT_YARN_ACL_ENABLE); + } + + /** + * Clear priority acl during refresh. + * + * @param queueName + * Queue Name + */ + public void clearPriorityACLs(String queueName) { + allAcls.remove(queueName); + } + + /** + * Each Queue could have configured with different priority acl's groups. This + * method helps to store each such ACL list against queue. + * + * @param priorityACLGroups + * List of Priority ACL Groups. + * @param queueName + * Queue Name associate with priority acl groups. + */ + public void addPrioirityACLs(List priorityACLGroups, + String queueName) { + + List priorityACL = allAcls.get(queueName); + if (null == priorityACL) { + priorityACL = new ArrayList(); + allAcls.put(queueName, priorityACL); + } + + // Ensure lowest priority PriorityACLGroup comes first in the list. + Collections.sort(priorityACLGroups); + + for (AppPriorityACLGroup priorityACLGroup : priorityACLGroups) { + priorityACL.add(new PriorityACL(priorityACLGroup.getMaxPriority(), + priorityACLGroup.getDefaultPriority(), + priorityACLGroup.getACLList())); + if (LOG.isDebugEnabled()) { + LOG.debug("Priority ACL group added: max-priority - " + + priorityACLGroup.getMaxPriority() + "default-priority - " + + priorityACLGroup.getDefaultPriority()); + } + } + } + + /** + * Priority based checkAccess to ensure that given user has enough permission + * to submit application at a given priority level. + * + * @param callerUGI + * User who submits the application. + * @param queueName + * Queue to which application is submitted. + * @param submittedPriority + * priority of the application. + * @return True or False to indicate whether application can be submitted at + * submitted priority level or not. + */ + public boolean checkAccess(UserGroupInformation callerUGI, String queueName, + Priority submittedPriority) { + if (!isACLsEnable) { + return true; + } + + List acls = allAcls.get(queueName); + if (acls == null || acls.isEmpty()) { + return true; + } + + PriorityACL approvedPriorityACL = getMappedPriorityAclForUGI(acls, + callerUGI, submittedPriority); + if (approvedPriorityACL == null) { + return false; + } + + return true; + } + + /** + * If an application is submitted without any priority, and submitted user has + * a default priority, this method helps to update this default priority as + * app's priority. + * + * @param queueName + * Submitted queue + * @param user + * User who submitted this application + * @return Default priority associated with given user. + */ + public Priority getDefaultPriority(String queueName, + UserGroupInformation user) { + if (!isACLsEnable) { + return null; + } + + List acls = allAcls.get(queueName); + if (acls == null || acls.isEmpty()) { + return null; + } + + PriorityACL approvedPriorityACL = getMappedPriorityAclForUGI(acls, user, + null); + if (approvedPriorityACL == null) { + return null; + } + + Priority defaultPriority = Priority + .newInstance(approvedPriorityACL.getDefaultPriority().getPriority()); + return defaultPriority; + } + + private PriorityACL getMappedPriorityAclForUGI(List acls , + UserGroupInformation user, Priority submittedPriority) { + + // Iterate through all configured ACLs starting from lower priority. + // If user is found corresponding to a configured priority, then store + // that entry. if failed, continue iterate through whole acl list. + PriorityACL selectedAcl = null; + for (PriorityACL entry : acls) { + AccessControlList list = entry.getAcl(); + + if (list.isUserAllowed(user)) { + selectedAcl = entry; + + // If submittedPriority is passed through the argument, also check + // whether submittedPriority is under max-priority of each ACL group. + if (submittedPriority != null) { + selectedAcl = null; + if (submittedPriority.getPriority() <= entry.getPriority() + .getPriority()) { + return entry; + } + } + } + } + return selectedAcl; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java index e661703c065..fbd5ac38dc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java @@ -43,6 +43,8 @@ public abstract class ACLsTestBase { protected static final String COMMON_USER = "common_user"; protected static final String QUEUE_A_USER = "queueA_user"; protected static final String QUEUE_B_USER = "queueB_user"; + protected static final String QUEUE_A_GROUP = "queueA_group"; + protected static final String QUEUE_B_GROUP = "queueB_group"; protected static final String ROOT_ADMIN = "root_admin"; protected static final String QUEUE_A_ADMIN = "queueA_admin"; protected static final String QUEUE_B_ADMIN = "queueB_admin"; @@ -53,7 +55,7 @@ public abstract class ACLsTestBase { protected static final Log LOG = LogFactory.getLog(TestApplicationACLs.class); - MockRM resourceManager; + protected MockRM resourceManager; Configuration conf; YarnRPC rpc; InetSocketAddress rmAddress; @@ -68,6 +70,7 @@ public abstract class ACLsTestBase { AccessControlList adminACL = new AccessControlList(""); conf.set(YarnConfiguration.YARN_ADMIN_ACL, adminACL.getAclString()); + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); resourceManager = new MockRM(conf) { protected ClientRMService createClientRMService() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 00466ae8c48..23bed228e19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -29,6 +29,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; @@ -463,7 +464,9 @@ public class TestApplicationMasterService { // Change the priority of App1 to 8 Priority appPriority2 = Priority.newInstance(8); - rm.getRMAppManager().updateApplicationPriority(app1.getApplicationId(), + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(app1.getUser()); + rm.getRMAppManager().updateApplicationPriority(ugi, app1.getApplicationId(), appPriority2); AllocateResponse response2 = am1.allocate(allocateRequest); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index caf38190697..fc193e0e0d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -1108,7 +1108,7 @@ public class TestClientRMService { when(yarnScheduler.getResourceCalculator()).thenReturn(rs); when(yarnScheduler.checkAndGetApplicationPriority(any(Priority.class), - anyString(), anyString(), any(ApplicationId.class))) + any(UserGroupInformation.class), anyString(), any(ApplicationId.class))) .thenReturn(Priority.newInstance(0)); return yarnScheduler; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java index 164ca20c1ed..ff52efd89be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -344,7 +345,10 @@ public class TestApplicationPriority { // Change the priority of App1 to 8 Priority appPriority2 = Priority.newInstance(8); - cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null); + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(app1.getUser()); + cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null, + ugi); // get scheduler app FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() @@ -378,7 +382,10 @@ public class TestApplicationPriority { // Change the priority of App1 to 15 Priority appPriority2 = Priority.newInstance(15); - cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null); + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(app1.getUser()); + cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null, + ugi); // get scheduler app FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() @@ -428,7 +435,10 @@ public class TestApplicationPriority { // Change the priority of App1 to 8 Priority appPriority2 = Priority.newInstance(8); - cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null); + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(app1.getUser()); + cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null, + ugi); // let things settle down Thread.sleep(1000); @@ -557,7 +567,10 @@ public class TestApplicationPriority { // Change the priority of App1 to 3 (lowest) Priority appPriority3 = Priority.newInstance(3); - cs.updateApplicationPriority(appPriority3, app2.getApplicationId(), null); + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(app2.getUser()); + cs.updateApplicationPriority(appPriority3, app2.getApplicationId(), null, + ugi); // add request for containers App2 am2.allocate("127.0.0.1", 2 * GB, 3, new ArrayList()); @@ -788,8 +801,10 @@ public class TestApplicationPriority { int appsPendingExpected, int activeAppsExpected, RMApp app) throws YarnException { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(app.getUser()); cs.updateApplicationPriority(Priority.newInstance(2), - app.getApplicationId(), null); + app.getApplicationId(), null, ugi); SchedulerEvent removeAttempt; removeAttempt = new AppAttemptRemovedSchedulerEvent( app.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.KILLED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLConfiguration.java new file mode 100644 index 00000000000..598bd49b0e7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLConfiguration.java @@ -0,0 +1,120 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.List; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.junit.Assert; +import org.junit.Test; + + +public class TestApplicationPriorityACLConfiguration { + + private final int defaultPriorityQueueA = 3; + private final int defaultPriorityQueueB = -1; + private final int maxPriorityQueueA = 5; + private final int maxPriorityQueueB = 10; + private final int clusterMaxPriority = 10; + + private static final String QUEUE_A_USER = "queueA_user"; + private static final String QUEUE_B_USER = "queueB_user"; + private static final String QUEUE_A_GROUP = "queueA_group"; + + private static final String QUEUEA = "queueA"; + private static final String QUEUEB = "queueB"; + private static final String QUEUEC = "queueC"; + + @Test + public void testSimpleACLConfiguration() throws Exception { + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{QUEUEA, QUEUEB, QUEUEC}); + + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, 50f); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, 25f); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEC, 25f); + + // Success case: Configure one user/group level priority acl for queue A. + String[] aclsForA = new String[2]; + aclsForA[0] = QUEUE_A_USER; + aclsForA[1] = QUEUE_A_GROUP; + csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, + Priority.newInstance(maxPriorityQueueA), + Priority.newInstance(defaultPriorityQueueA), aclsForA); + + // Try to get the ACL configs and make sure there are errors/exceptions + List pGroupA = csConf.getPriorityAcls( + CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, + Priority.newInstance(clusterMaxPriority)); + + // Validate! + verifyACLs(pGroupA, QUEUE_A_USER, QUEUE_A_GROUP, maxPriorityQueueA, + defaultPriorityQueueA); + } + + @Test + public void testACLConfigurationForInvalidCases() throws Exception { + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{QUEUEA, QUEUEB, QUEUEC}); + + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, 50f); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, 25f); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEC, 25f); + + // Success case: Configure one user/group level priority acl for queue A. + String[] aclsForA = new String[2]; + aclsForA[0] = QUEUE_A_USER; + aclsForA[1] = QUEUE_A_GROUP; + csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, + Priority.newInstance(maxPriorityQueueA), + Priority.newInstance(defaultPriorityQueueA), aclsForA); + + String[] aclsForB = new String[1]; + aclsForB[0] = QUEUE_B_USER; + csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, + Priority.newInstance(maxPriorityQueueB), + Priority.newInstance(defaultPriorityQueueB), aclsForB); + + // Try to get the ACL configs and make sure there are errors/exceptions + List pGroupA = csConf.getPriorityAcls( + CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, + Priority.newInstance(clusterMaxPriority)); + List pGroupB = csConf.getPriorityAcls( + CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, + Priority.newInstance(clusterMaxPriority)); + + // Validate stored ACL values with configured ones. + verifyACLs(pGroupA, QUEUE_A_USER, QUEUE_A_GROUP, maxPriorityQueueA, + defaultPriorityQueueA); + verifyACLs(pGroupB, QUEUE_B_USER, "", maxPriorityQueueB, 0); + } + + private void verifyACLs(List pGroup, String queueUser, + String queueGroup, int maxPriority, int defaultPriority) { + AppPriorityACLGroup group = pGroup.get(0); + String aclString = queueUser + " " + queueGroup; + + Assert.assertEquals(aclString.trim(), + group.getACLList().getAclString().trim()); + Assert.assertEquals(maxPriority, group.getMaxPriority().getPriority()); + Assert.assertEquals(defaultPriority, + group.getDefaultPriority().getPriority()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLs.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLs.java new file mode 100644 index 00000000000..b41ba836798 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLs.java @@ -0,0 +1,206 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.ACLsTestBase; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.Assert; +import org.junit.Test; + + +public class TestApplicationPriorityACLs extends ACLsTestBase { + + private final int defaultPriorityQueueA = 3; + private final int defaultPriorityQueueB = 10; + private final int maxPriorityQueueA = 5; + private final int maxPriorityQueueB = 11; + private final int clusterMaxPriority = 10; + + @Test + public void testApplicationACLs() throws Exception { + + /* + * Cluster Max-priority is 10. User 'queueA_user' has permission to submit + * apps only at priority 5. Default priority for this user is 3. + */ + + // Case 1: App will be submitted with priority 5. + verifyAppSubmitWithPrioritySuccess(QUEUE_A_USER, QUEUEA, 5); + + // Case 2: App will be rejected as submitted priority was 6. + verifyAppSubmitWithPriorityFailure(QUEUE_A_USER, QUEUEA, 6); + + // Case 3: App will be submitted w/o priority, hence consider default 3. + verifyAppSubmitWithPrioritySuccess(QUEUE_A_USER, QUEUEA, -1); + + // Case 4: App will be submitted with priority 11. + verifyAppSubmitWithPrioritySuccess(QUEUE_B_USER, QUEUEB, 11); + } + + private void verifyAppSubmitWithPrioritySuccess(String submitter, + String queueName, int priority) throws Exception { + Priority appPriority = null; + if (priority > 0) { + appPriority = Priority.newInstance(priority); + } else { + // RM will consider default priority for the submitted user. So update + // priority to the default value to compare. + priority = defaultPriorityQueueA; + } + + ApplicationSubmissionContext submissionContext = prepareForAppSubmission( + submitter, queueName, appPriority); + submitAppToRMWithValidAcl(submitter, submissionContext); + + // Ideally get app report here and check the priority. + verifyAppPriorityIsAccepted(submitter, submissionContext.getApplicationId(), + priority); + } + + private void verifyAppSubmitWithPriorityFailure(String submitter, + String queueName, int priority) throws Exception { + Priority appPriority = Priority.newInstance(priority); + ApplicationSubmissionContext submissionContext = prepareForAppSubmission( + submitter, queueName, appPriority); + submitAppToRMWithInValidAcl(submitter, submissionContext); + } + + private ApplicationSubmissionContext prepareForAppSubmission(String submitter, + String queueName, Priority priority) throws Exception { + + GetNewApplicationRequest newAppRequest = GetNewApplicationRequest + .newInstance(); + + ApplicationClientProtocol submitterClient = getRMClientForUser(submitter); + ApplicationId applicationId = submitterClient + .getNewApplication(newAppRequest).getApplicationId(); + + Resource resource = BuilderUtils.newResource(1024, 1); + + ContainerLaunchContext amContainerSpec = ContainerLaunchContext + .newInstance(null, null, null, null, null, null); + ApplicationSubmissionContext appSubmissionContext = ApplicationSubmissionContext + .newInstance(applicationId, "applicationName", queueName, null, + amContainerSpec, false, true, 1, resource, "applicationType"); + appSubmissionContext.setApplicationId(applicationId); + appSubmissionContext.setQueue(queueName); + if (null != priority) { + appSubmissionContext.setPriority(priority); + } + + return appSubmissionContext; + } + + private void submitAppToRMWithValidAcl(String submitter, + ApplicationSubmissionContext appSubmissionContext) + throws YarnException, IOException, InterruptedException { + ApplicationClientProtocol submitterClient = getRMClientForUser(submitter); + SubmitApplicationRequest submitRequest = SubmitApplicationRequest + .newInstance(appSubmissionContext); + submitterClient.submitApplication(submitRequest); + resourceManager.waitForState(appSubmissionContext.getApplicationId(), + RMAppState.ACCEPTED); + } + + private void submitAppToRMWithInValidAcl(String submitter, + ApplicationSubmissionContext appSubmissionContext) + throws YarnException, IOException, InterruptedException { + ApplicationClientProtocol submitterClient = getRMClientForUser(submitter); + SubmitApplicationRequest submitRequest = SubmitApplicationRequest + .newInstance(appSubmissionContext); + try { + submitterClient.submitApplication(submitRequest); + } catch (YarnException ex) { + Assert.assertTrue(ex.getCause() instanceof RemoteException); + } + } + + private void verifyAppPriorityIsAccepted(String submitter, + ApplicationId applicationId, int priority) + throws IOException, InterruptedException { + ApplicationClientProtocol submitterClient = getRMClientForUser(submitter); + + /** + * If priority is greater than cluster max, RM will auto set to cluster max + * Consider this scenario as a special case. + */ + if (priority > clusterMaxPriority) { + priority = clusterMaxPriority; + } + + GetApplicationReportRequest request = GetApplicationReportRequest + .newInstance(applicationId); + try { + GetApplicationReportResponse response = submitterClient + .getApplicationReport(request); + Assert.assertEquals(response.getApplicationReport().getPriority(), + Priority.newInstance(priority)); + } catch (YarnException e) { + Assert.fail("Application submission should not fail."); + } + } + + @Override + protected Configuration createConfiguration() { + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{QUEUEA, QUEUEB, QUEUEC}); + + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, 50f); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, 25f); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEC, 25f); + + String[] aclsForA = new String[2]; + aclsForA[0] = QUEUE_A_USER; + aclsForA[1] = QUEUE_A_GROUP; + csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, + Priority.newInstance(maxPriorityQueueA), + Priority.newInstance(defaultPriorityQueueA), aclsForA); + + String[] aclsForB = new String[2]; + aclsForB[0] = QUEUE_B_USER; + aclsForB[1] = QUEUE_B_GROUP; + csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, + Priority.newInstance(maxPriorityQueueB), + Priority.newInstance(defaultPriorityQueueB), aclsForB); + + csConf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); + csConf.set(YarnConfiguration.RM_SCHEDULER, + CapacityScheduler.class.getName()); + + return csConf; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index a36db44d7ec..1348f515fad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerC import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -856,7 +857,10 @@ public class TestParentQueue { TestUtils.spyHook); YarnAuthorizationProvider authorizer = YarnAuthorizationProvider.getInstance(conf); - CapacitySchedulerQueueManager.setQueueAcls(authorizer, queues); + AppPriorityACLsManager appPriorityACLManager = new AppPriorityACLsManager( + conf); + CapacitySchedulerQueueManager.setQueueAcls(authorizer, + appPriorityACLManager, queues); UserGroupInformation user = UserGroupInformation.getCurrentUser(); // Setup queue configs