diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 766d4efea3e..cff5205861a 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -455,6 +455,9 @@ Release 2.8.0 - UNRELEASED YARN-3983. Refactored CapacityScheduleri#FiCaSchedulerApp to easier extend container allocation logic. (Wangda Tan via jianhe) + YARN-3635. Refactored current queue mapping implementation in CapacityScheduler + to use a generic PlacementManager framework. (Wangda Tan via jianhe) + BUG FIXES YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index 1abb14eedad..c71323fcff5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; @@ -99,9 +100,10 @@ public class RMActiveServiceContext { private long schedulerRecoveryWaitTime = 0; private boolean printLog = true; private boolean isSchedulerReady = false; + private PlacementManager queuePlacementManager = null; public RMActiveServiceContext() { - + queuePlacementManager = new PlacementManager(); } @Private @@ -424,4 +426,16 @@ public class RMActiveServiceContext { public ConcurrentMap getSystemCredentialsForApps() { return systemCredentials; } + + @Private + @Unstable + public PlacementManager getQueuePlacementManager() { + return queuePlacementManager; + } + + @Private + @Unstable + public void setQueuePlacementManager(PlacementManager placementMgr) { + this.queuePlacementManager = placementMgr; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 6fd183875b4..703ec1eb1fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -326,6 +326,15 @@ public class RMAppManager implements EventHandler, private RMAppImpl createAndPopulateNewRMApp( ApplicationSubmissionContext submissionContext, long submitTime, String user, boolean isRecovery) throws YarnException { + // Do queue mapping + if (!isRecovery) { + if (rmContext.getQueuePlacementManager() != null) { + // We only do queue mapping when it's a new application + rmContext.getQueuePlacementManager().placeApplication( + submissionContext, user); + } + } + ApplicationId applicationId = submissionContext.getApplicationId(); ResourceRequest amReq = validateAndCreateResourceRequest(submissionContext, isRecovery); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index bc50268276f..b64c83494e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -124,4 +125,8 @@ public interface RMContext { boolean isSchedulerReadyForAllocatingContainers(); Configuration getYarnConfiguration(); + + PlacementManager getQueuePlacementManager(); + + void setQueuePlacementManager(PlacementManager placementMgr); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index d6d573d965f..840cea7266f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -76,7 +77,6 @@ public class RMContextImpl implements RMContext { * individual fields. */ public RMContextImpl() { - } @VisibleForTesting @@ -438,4 +438,14 @@ public class RMContextImpl implements RMContext { public void setYarnConfiguration(Configuration yarnConfiguration) { this.yarnConfiguration=yarnConfiguration; } + + @Override + public PlacementManager getQueuePlacementManager() { + return this.activeServiceContext.getQueuePlacementManager(); + } + + @Override + public void setQueuePlacementManager(PlacementManager placementMgr) { + this.activeServiceContext.setQueuePlacementManager(placementMgr); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java new file mode 100644 index 00000000000..43a4deb70eb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.placement; + +import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import com.google.common.annotations.VisibleForTesting; + +public class PlacementManager { + private static final Log LOG = LogFactory.getLog(PlacementManager.class); + + List rules; + ReadLock readLock; + WriteLock writeLock; + + public PlacementManager() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + } + + public void updateRules(List rules) { + try { + writeLock.lock(); + this.rules = rules; + } finally { + writeLock.unlock(); + } + } + + public void placeApplication(ApplicationSubmissionContext asc, String user) + throws YarnException { + try { + readLock.lock(); + if (null == rules || rules.isEmpty()) { + return; + } + + String newQueueName = null; + for (PlacementRule rule : rules) { + newQueueName = rule.getQueueForApp(asc, user); + if (newQueueName != null) { + break; + } + } + + // Failed to get where to place application + if (null == newQueueName && null == asc.getQueue()) { + String msg = "Failed to get where to place application=" + + asc.getApplicationId(); + LOG.error(msg); + throw new YarnException(msg); + } + + // Set it to ApplicationSubmissionContext + if (!StringUtils.equals(asc.getQueue(), newQueueName)) { + LOG.info("Placed application=" + asc.getApplicationId() + " to queue=" + + newQueueName + ", original queue=" + asc.getQueue()); + asc.setQueue(newQueueName); + } + } finally { + readLock.unlock(); + } + } + + @VisibleForTesting + public List getPlacementRules() { + return rules; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java new file mode 100644 index 00000000000..47dc48a51c4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.placement; + +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; + +public abstract class PlacementRule { + public String getName() { + return this.getClass().getName(); + } + + public void initialize(Map parameters, RMContext rmContext) + throws YarnException { + } + + /** + * Get queue for a given application + * + * @param asc application submission context + * @param user userName + * + * @throws YarnException + * if any error happens + * + * @return

+ * non-null value means it is determined + *

+ *

+ * null value means it is undetermined, so next {@link PlacementRule} + * in the {@link PlacementManager} will take care + *

+ */ + public abstract String getQueueForApp(ApplicationSubmissionContext asc, + String user) throws YarnException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java new file mode 100644 index 00000000000..d617d161859 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.placement; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.security.Groups; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType; + +import com.google.common.annotations.VisibleForTesting; + +public class UserGroupMappingPlacementRule extends PlacementRule { + private static final Log LOG = LogFactory + .getLog(UserGroupMappingPlacementRule.class); + + public static final String CURRENT_USER_MAPPING = "%user"; + + public static final String PRIMARY_GROUP_MAPPING = "%primary_group"; + + private boolean overrideWithQueueMappings = false; + private List mappings = null; + private Groups groups; + + @Private + public static class QueueMapping { + + public enum MappingType { + + USER("u"), GROUP("g"); + private final String type; + + private MappingType(String type) { + this.type = type; + } + + public String toString() { + return type; + } + + }; + + MappingType type; + String source; + String queue; + + public QueueMapping(MappingType type, String source, String queue) { + this.type = type; + this.source = source; + this.queue = queue; + } + + public String getQueue() { + return queue; + } + + @Override + public int hashCode() { + return super.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof QueueMapping) { + QueueMapping other = (QueueMapping) obj; + return (other.type.equals(type) && + other.source.equals(source) && + other.queue.equals(queue)); + } else { + return false; + } + } + } + + public UserGroupMappingPlacementRule(boolean overrideWithQueueMappings, + List newMappings, Groups groups) { + this.mappings = newMappings; + this.overrideWithQueueMappings = overrideWithQueueMappings; + this.groups = groups; + } + + private String getMappedQueue(String user) throws IOException { + for (QueueMapping mapping : mappings) { + if (mapping.type == MappingType.USER) { + if (mapping.source.equals(CURRENT_USER_MAPPING)) { + if (mapping.queue.equals(CURRENT_USER_MAPPING)) { + return user; + } else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { + return groups.getGroups(user).get(0); + } else { + return mapping.queue; + } + } + if (user.equals(mapping.source)) { + return mapping.queue; + } + } + if (mapping.type == MappingType.GROUP) { + for (String userGroups : groups.getGroups(user)) { + if (userGroups.equals(mapping.source)) { + return mapping.queue; + } + } + } + } + return null; + } + + @Override + public String getQueueForApp(ApplicationSubmissionContext asc, String user) + throws YarnException { + String queueName = asc.getQueue(); + ApplicationId applicationId = asc.getApplicationId(); + if (mappings != null && mappings.size() > 0) { + try { + String mappedQueue = getMappedQueue(user); + if (mappedQueue != null) { + // We have a mapping, should we use it? + if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) + || overrideWithQueueMappings) { + LOG.info("Application " + applicationId + " user " + user + + " mapping [" + queueName + "] to [" + mappedQueue + + "] override " + overrideWithQueueMappings); + return mappedQueue; + } + } + } catch (IOException ioex) { + String message = "Failed to submit application " + applicationId + + " submitted by user " + user + " reason: " + ioex.getMessage(); + throw new YarnException(message); + } + } + + return queueName; + } + + @VisibleForTesting + public List getQueueMappings() { + return mappings; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index dbaccaf3d07..ad5c76c9814 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -69,6 +69,9 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule; +import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; +import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants; @@ -98,8 +101,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -228,16 +229,6 @@ public class CapacityScheduler extends CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".scheduling-interval-ms"; private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5; - - private boolean overrideWithQueueMappings = false; - private List mappings = null; - private Groups groups; - - @VisibleForTesting - public synchronized String getMappedQueueForTest(String user) - throws IOException { - return getMappedQueue(user); - } public CapacityScheduler() { super(CapacityScheduler.class.getName()); @@ -447,29 +438,52 @@ public class CapacityScheduler extends } private static final QueueHook noop = new QueueHook(); - private void initializeQueueMappings() throws IOException { - overrideWithQueueMappings = conf.getOverrideWithQueueMappings(); + @VisibleForTesting + public synchronized UserGroupMappingPlacementRule + getUserGroupMappingPlacementRule() throws IOException { + boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings(); LOG.info("Initialized queue mappings, override: " + overrideWithQueueMappings); + // Get new user/group mappings - List newMappings = conf.getQueueMappings(); - //check if mappings refer to valid queues + List newMappings = + conf.getQueueMappings(); + // check if mappings refer to valid queues for (QueueMapping mapping : newMappings) { - if (!mapping.queue.equals(CURRENT_USER_MAPPING) && - !mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { - CSQueue queue = queues.get(mapping.queue); + String mappingQueue = mapping.getQueue(); + if (!mappingQueue + .equals(UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) + && !mappingQueue + .equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) { + CSQueue queue = queues.get(mappingQueue); if (queue == null || !(queue instanceof LeafQueue)) { - throw new IOException( - "mapping contains invalid or non-leaf queue " + mapping.queue); + throw new IOException("mapping contains invalid or non-leaf queue " + + mappingQueue); } } } - //apply the new mappings since they are valid - mappings = newMappings; + // initialize groups if mappings are present - if (mappings.size() > 0) { - groups = new Groups(conf); + if (newMappings.size() > 0) { + Groups groups = new Groups(conf); + return new UserGroupMappingPlacementRule(overrideWithQueueMappings, + newMappings, groups); } + + return null; + } + + private void updatePlacementRules() throws IOException { + List placementRules = new ArrayList<>(); + + // Initialize UserGroupMappingPlacementRule + // TODO, need make this defineable by configuration. + UserGroupMappingPlacementRule ugRule = getUserGroupMappingPlacementRule(); + if (null != ugRule) { + placementRules.add(ugRule); + } + + rmContext.getQueuePlacementManager().updateRules(placementRules); } @Lock(CapacityScheduler.class) @@ -481,7 +495,7 @@ public class CapacityScheduler extends queues, queues, noop); labelManager.reinitializeQueueLabels(getQueueToLabels()); LOG.info("Initialized root queue " + root); - initializeQueueMappings(); + updatePlacementRules(); setQueueAcls(authorizer, queues); } @@ -502,7 +516,7 @@ public class CapacityScheduler extends // Re-configure queues root.reinitialize(newRoot, clusterResource); - initializeQueueMappings(); + updatePlacementRules(); // Re-calculate headroom for active applications root.updateClusterResource(clusterResource, new ResourceLimits( @@ -647,66 +661,8 @@ public class CapacityScheduler extends return queues.get(queueName); } - private static final String CURRENT_USER_MAPPING = "%user"; - - private static final String PRIMARY_GROUP_MAPPING = "%primary_group"; - - private String getMappedQueue(String user) throws IOException { - for (QueueMapping mapping : mappings) { - if (mapping.type == MappingType.USER) { - if (mapping.source.equals(CURRENT_USER_MAPPING)) { - if (mapping.queue.equals(CURRENT_USER_MAPPING)) { - return user; - } - else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { - return groups.getGroups(user).get(0); - } - else { - return mapping.queue; - } - } - if (user.equals(mapping.source)) { - return mapping.queue; - } - } - if (mapping.type == MappingType.GROUP) { - for (String userGroups : groups.getGroups(user)) { - if (userGroups.equals(mapping.source)) { - return mapping.queue; - } - } - } - } - return null; - } - private synchronized void addApplication(ApplicationId applicationId, String queueName, String user, boolean isAppRecovering, Priority priority) { - - if (mappings != null && mappings.size() > 0) { - try { - String mappedQueue = getMappedQueue(user); - if (mappedQueue != null) { - // We have a mapping, should we use it? - if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) - || overrideWithQueueMappings) { - LOG.info("Application " + applicationId + " user " + user - + " mapping [" + queueName + "] to [" + mappedQueue - + "] override " + overrideWithQueueMappings); - queueName = mappedQueue; - RMApp rmApp = rmContext.getRMApps().get(applicationId); - rmApp.setQueue(queueName); - } - } - } catch (IOException ioex) { - String message = "Failed to submit application " + applicationId + - " submitted by user " + user + " reason: " + ioex.getMessage(); - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(applicationId, message)); - return; - } - } - // sanity checks. CSQueue queue = getQueue(queueName); if (queue == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index be5e6dd4869..b1461c1040c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; @@ -211,35 +212,6 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur @Private public static final Integer DEFAULT_CONFIGURATION_APPLICATION_PRIORITY = 0; - - @Private - public static class QueueMapping { - - public enum MappingType { - - USER("u"), - GROUP("g"); - private final String type; - private MappingType(String type) { - this.type = type; - } - - public String toString() { - return type; - } - - }; - - MappingType type; - String source; - String queue; - - public QueueMapping(MappingType type, String source, String queue) { - this.type = type; - this.source = source; - this.queue = queue; - } - } @Private public static final String AVERAGE_CAPACITY = "average-capacity"; @@ -747,7 +719,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur */ public List getQueueMappings() { List mappings = - new ArrayList(); + new ArrayList(); Collection mappingsString = getTrimmedStringCollection(QUEUE_MAPPING); for (String mappingValue : mappingsString) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index cbeae5becf4..c4356921dff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -19,16 +19,10 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; - -import static org.mockito.Matchers.isA; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -40,8 +34,11 @@ import java.util.HashMap; import java.util.List; import java.util.concurrent.ConcurrentMap; -import org.junit.Assert; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -57,11 +54,14 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; @@ -73,8 +73,11 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -658,6 +661,39 @@ public class TestAppManager{ Assert.assertTrue(msg.contains("preemptedResources=")); Assert.assertTrue(msg.contains("applicationType=MAPREDUCE")); } + + @Test + public void testRMAppSubmitWithQueueChanged() throws Exception { + // Setup a PlacementManager returns a new queue + PlacementManager placementMgr = mock(PlacementManager.class); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + ApplicationSubmissionContext ctx = + (ApplicationSubmissionContext) invocation.getArguments()[0]; + ctx.setQueue("newQueue"); + return null; + } + + }).when(placementMgr).placeApplication(any(ApplicationSubmissionContext.class), + any(String.class)); + rmContext.setQueuePlacementManager(placementMgr); + + asContext.setQueue("oldQueue"); + appMonitor.submitApplication(asContext, "test"); + RMApp app = rmContext.getRMApps().get(appId); + Assert.assertNotNull("app is null", app); + Assert.assertEquals("newQueue", asContext.getQueue()); + + // wait for event to be processed + int timeoutSecs = 0; + while ((getAppEventType() == RMAppEventType.KILL) && timeoutSecs++ < 20) { + Thread.sleep(1000); + } + Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, + getAppEventType()); + } private static ResourceScheduler mockResourceScheduler() { ResourceScheduler scheduler = mock(ResourceScheduler.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java new file mode 100644 index 00000000000..61bc8d9be2c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.placement; + +import java.util.Arrays; + +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.security.GroupMappingServiceProvider; +import org.apache.hadoop.security.Groups; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping; +import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestUserGroupMappingPlacementRule { + YarnConfiguration conf = new YarnConfiguration(); + + @Before + public void setup() { + conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, + SimpleGroupsMapping.class, GroupMappingServiceProvider.class); + } + + private void verifyQueueMapping(QueueMapping queueMapping, String inputUser, + String expectedQueue) throws YarnException { + verifyQueueMapping(queueMapping, inputUser, + YarnConfiguration.DEFAULT_QUEUE_NAME, expectedQueue, false); + } + + private void verifyQueueMapping(QueueMapping queueMapping, String inputUser, + String inputQueue, String expectedQueue, boolean overwrite) throws YarnException { + Groups groups = new Groups(conf); + UserGroupMappingPlacementRule rule = + new UserGroupMappingPlacementRule(overwrite, Arrays.asList(queueMapping), + groups); + ApplicationSubmissionContext asc = + Records.newRecord(ApplicationSubmissionContext.class); + asc.setQueue(inputQueue); + String queue = rule.getQueueForApp(asc, inputUser); + Assert.assertEquals(expectedQueue, queue); + } + + @Test + public void testMapping() throws YarnException { + // simple base case for mapping user to queue + verifyQueueMapping(new QueueMapping(MappingType.USER, "a", "q1"), "a", "q1"); + verifyQueueMapping(new QueueMapping(MappingType.GROUP, "agroup", "q1"), + "a", "q1"); + verifyQueueMapping(new QueueMapping(MappingType.USER, "%user", "q2"), "a", + "q2"); + verifyQueueMapping(new QueueMapping(MappingType.USER, "%user", "%user"), + "a", "a"); + verifyQueueMapping(new QueueMapping(MappingType.USER, "%user", + "%primary_group"), "a", "agroup"); + verifyQueueMapping(new QueueMapping(MappingType.GROUP, "asubgroup1", "q1"), + "a", "q1"); + + // specify overwritten, and see if user specified a queue, and it will be + // overridden + verifyQueueMapping(new QueueMapping(MappingType.USER, "user", "q1"), + "user", "q2", "q1", true); + + // if overwritten not specified, it should be which user specified + verifyQueueMapping(new QueueMapping(MappingType.USER, "user", "q1"), + "user", "q2", "q2", false); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java index 005f40bcadd..1df6b4cded9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java @@ -18,22 +18,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; -import java.util.HashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.security.GroupMappingServiceProvider; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping; -import org.junit.After; +import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; +import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping; +import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; public class TestQueueMappings { @@ -47,15 +41,23 @@ public class TestQueueMappings { CapacitySchedulerConfiguration.ROOT + "." + Q1; private final static String Q2_PATH = CapacitySchedulerConfiguration.ROOT + "." + Q2; + + private CapacityScheduler cs; + private YarnConfiguration conf; + + @Before + public void setup() { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + setupQueueConfiguration(csConf); + conf = new YarnConfiguration(csConf); + cs = new CapacityScheduler(); - private MockRM resourceManager; - - @After - public void tearDown() throws Exception { - if (resourceManager != null) { - LOG.info("Stopping the resource manager"); - resourceManager.stop(); - } + RMContext rmContext = TestUtils.getMockRMContext(); + cs.setConf(conf); + cs.setRMContext(rmContext); + cs.init(conf); + cs.start(); } private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { @@ -67,26 +69,32 @@ public class TestQueueMappings { LOG.info("Setup top-level queues q1 and q2"); } + + @Test + public void testQueueMappingSpecifyingNotExistedQueue() { + // if the mapping specifies a queue that does not exist, reinitialize will + // be failed + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, + "u:user:non_existent_queue"); + boolean fail = false; + try { + cs.reinitialize(conf, null); + } catch (IOException ioex) { + fail = true; + } + Assert.assertTrue("queue initialization failed for non-existent q", fail); + } + + @Test + public void testQueueMappingTrimSpaces() throws IOException { + // space trimming + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, " u : a : " + Q1); + cs.reinitialize(conf, null); + checkQMapping(new QueueMapping(MappingType.USER, "a", Q1)); + } @Test (timeout = 60000) - public void testQueueMapping() throws Exception { - CapacitySchedulerConfiguration csConf = - new CapacitySchedulerConfiguration(); - setupQueueConfiguration(csConf); - YarnConfiguration conf = new YarnConfiguration(csConf); - CapacityScheduler cs = new CapacityScheduler(); - - RMContext rmContext = TestUtils.getMockRMContext(); - cs.setConf(conf); - cs.setRMContext(rmContext); - cs.init(conf); - cs.start(); - - conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, - SimpleGroupsMapping.class, GroupMappingServiceProvider.class); - conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE, - "true"); - + public void testQueueMappingParsingInvalidCases() throws Exception { // configuration parsing tests - negative test cases checkInvalidQMapping(conf, cs, "x:a:b", "invalid specifier"); checkInvalidQMapping(conf, cs, "u:a", "no queue specified"); @@ -97,119 +105,6 @@ public class TestQueueMappings { checkInvalidQMapping(conf, cs, "u::", "empty source and queue"); checkInvalidQMapping(conf, cs, "u:", "missing source missing queue"); checkInvalidQMapping(conf, cs, "u:a:", "empty source missing q"); - - // simple base case for mapping user to queue - conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:a:" + Q1); - cs.reinitialize(conf, null); - checkQMapping("a", Q1, cs); - - // group mapping test - conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:agroup:" + Q1); - cs.reinitialize(conf, null); - checkQMapping("a", Q1, cs); - - // %user tests - conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:" + Q2); - cs.reinitialize(conf, null); - checkQMapping("a", Q2, cs); - - conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:%user"); - cs.reinitialize(conf, null); - checkQMapping("a", "a", cs); - - // %primary_group tests - conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, - "u:%user:%primary_group"); - cs.reinitialize(conf, null); - checkQMapping("a", "agroup", cs); - - // non-primary group mapping - conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, - "g:asubgroup1:" + Q1); - cs.reinitialize(conf, null); - checkQMapping("a", Q1, cs); - - // space trimming - conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, " u : a : " + Q1); - cs.reinitialize(conf, null); - checkQMapping("a", Q1, cs); - - csConf = new CapacitySchedulerConfiguration(); - csConf.set(YarnConfiguration.RM_SCHEDULER, - CapacityScheduler.class.getName()); - setupQueueConfiguration(csConf); - conf = new YarnConfiguration(csConf); - - resourceManager = new MockRM(csConf); - resourceManager.start(); - - conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, - SimpleGroupsMapping.class, GroupMappingServiceProvider.class); - conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE, - "true"); - conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1); - resourceManager.getResourceScheduler().reinitialize(conf, null); - - // ensure that if the user specifies a Q that is still overriden - checkAppQueue(resourceManager, "user", Q2, Q1); - - // toggle admin override and retry - conf.setBoolean( - CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE, - false); - conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1); - setupQueueConfiguration(csConf); - resourceManager.getResourceScheduler().reinitialize(conf, null); - - checkAppQueue(resourceManager, "user", Q2, Q2); - - // ensure that if a user does not specify a Q, the user mapping is used - checkAppQueue(resourceManager, "user", null, Q1); - - conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:usergroup:" + Q2); - setupQueueConfiguration(csConf); - resourceManager.getResourceScheduler().reinitialize(conf, null); - - // ensure that if a user does not specify a Q, the group mapping is used - checkAppQueue(resourceManager, "user", null, Q2); - - // if the mapping specifies a queue that does not exist, the job is rejected - conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, - "u:user:non_existent_queue"); - setupQueueConfiguration(csConf); - - boolean fail = false; - try { - resourceManager.getResourceScheduler().reinitialize(conf, null); - } - catch (IOException ioex) { - fail = true; - } - Assert.assertTrue("queue initialization failed for non-existent q", fail); - resourceManager.stop(); - } - - private void checkAppQueue(MockRM resourceManager, String user, - String submissionQueue, String expected) - throws Exception { - RMApp app = resourceManager.submitApp(200, "name", user, - new HashMap(), false, submissionQueue, -1, - null, "MAPREDUCE", false); - RMAppState expectedState = expected.isEmpty() ? RMAppState.FAILED - : RMAppState.ACCEPTED; - resourceManager.waitForState(app.getApplicationId(), expectedState); - // get scheduler app - CapacityScheduler cs = (CapacityScheduler) - resourceManager.getResourceScheduler(); - SchedulerApplication schedulerApp = - cs.getSchedulerApplications().get(app.getApplicationId()); - String queue = ""; - if (schedulerApp != null) { - queue = schedulerApp.getQueue().getQueueName(); - } - Assert.assertTrue("expected " + expected + " actual " + queue, - expected.equals(queue)); - Assert.assertEquals(expected, app.getQueue()); } private void checkInvalidQMapping(YarnConfiguration conf, @@ -227,10 +122,12 @@ public class TestQueueMappings { fail); } - private void checkQMapping(String user, String expected, CapacityScheduler cs) + private void checkQMapping(QueueMapping expected) throws IOException { - String actual = cs.getMappedQueueForTest(user); - Assert.assertTrue("expected " + expected + " actual " + actual, - expected.equals(actual)); + UserGroupMappingPlacementRule rule = + (UserGroupMappingPlacementRule) cs.getRMContext() + .getQueuePlacementManager().getPlacementRules().get(0); + QueueMapping queueMapping = rule.getQueueMappings().get(0); + Assert.assertEquals(queueMapping, expected); } }