From 5262b7ba4d018562d4e7d60772af4ddc3d770a23 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Sat, 28 Jan 2012 01:32:29 +0000 Subject: [PATCH] MAPREDUCE-3732. Modified CapacityScheduler to use only users with pending requests for computing user-limits. Contributed by Arun C Murthy. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1236953 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../scheduler/ActiveUsersManager.java | 109 +++++++++++ .../scheduler/AppSchedulingInfo.java | 67 +++++-- .../scheduler/QueueMetrics.java | 40 ++++ .../scheduler/SchedulerApp.java | 5 +- .../scheduler/capacity/CSQueue.java | 7 + .../scheduler/capacity/CapacityScheduler.java | 3 +- .../scheduler/capacity/LeafQueue.java | 34 +++- .../scheduler/capacity/ParentQueue.java | 7 + .../scheduler/fifo/FifoScheduler.java | 19 +- .../capacity/TestApplicationLimits.java | 9 +- .../scheduler/capacity/TestLeafQueue.java | 183 ++++++++++++++---- 12 files changed, 413 insertions(+), 73 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 20cbf3b625b..1e1c6c98267 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -209,6 +209,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3693. Added mapreduce.admin.user.env to mapred-default.xml. (Roman Shapshonik via acmurthy) + MAPREDUCE-3732. Modified CapacityScheduler to use only users with pending + requests for computing user-limits. (Arun C Murthy via vinodkv) + OPTIMIZATIONS MAPREDUCE-3567. Extraneous JobConf objects in AM heap. 
(Vinod Kumar diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java new file mode 100644 index 00000000000..18fbca654c2 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.Lock; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +/** + * {@link ActiveUsersManager} tracks active users in the system. 
+ * A user is deemed to be active if he has any running applications with + * outstanding resource requests. + * + * An active user is defined as someone with outstanding resource requests. + */ +@Private +public class ActiveUsersManager { + + private static final Log LOG = LogFactory.getLog(ActiveUsersManager.class); + + private final QueueMetrics metrics; + + private int activeUsers = 0; + private Map<String, Set<ApplicationId>> usersApplications = + new HashMap<String, Set<ApplicationId>>(); + + public ActiveUsersManager(QueueMetrics metrics) { + this.metrics = metrics; + } + + /** + * An application has new outstanding requests. + * + * @param user application user + * @param applicationId activated application + */ + @Lock({Queue.class, SchedulerApp.class}) + synchronized public void activateApplication( + String user, ApplicationId applicationId) { + Set<ApplicationId> userApps = usersApplications.get(user); + if (userApps == null) { + userApps = new HashSet<ApplicationId>(); + usersApplications.put(user, userApps); + ++activeUsers; + metrics.incrActiveUsers(); + LOG.debug("User " + user + " added to activeUsers, currently: " + + activeUsers); + } + if (userApps.add(applicationId)) { + metrics.activateApp(user); + } + } + + /** + * An application has no more outstanding requests. + * + * @param user application user + * @param applicationId deactivated application + */ + @Lock({Queue.class, SchedulerApp.class}) + synchronized public void deactivateApplication( + String user, ApplicationId applicationId) { + Set<ApplicationId> userApps = usersApplications.get(user); + if (userApps != null) { + if (userApps.remove(applicationId)) { + metrics.deactivateApp(user); + } + if (userApps.isEmpty()) { + usersApplications.remove(user); + --activeUsers; + metrics.decrActiveUsers(); + LOG.debug("User " + user + " removed from activeUsers, currently: " + + activeUsers); + } + } + } + + /** + * Get number of active users i.e. users with applications which have pending + * resource requests.
+ * @return number of active users + */ + @Lock({Queue.class, SchedulerApp.class}) + synchronized public int getNumActiveUsers() { + return activeUsers; + } +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 6dae436a339..2040505be67 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -36,12 +36,11 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; /** * This class keeps track of all the consumption of an application. 
This also @@ -59,27 +58,27 @@ public class AppSchedulingInfo { final String user; private final AtomicInteger containerIdCounter = new AtomicInteger(0); - private final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); - final Set priorities = new TreeSet( new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator()); final Map> requests = new HashMap>(); - private final ApplicationStore store; - + //private final ApplicationStore store; + private final ActiveUsersManager activeUsersManager; + /* Allocated by scheduler */ boolean pending = true; // for app metrics public AppSchedulingInfo(ApplicationAttemptId appAttemptId, - String user, Queue queue, ApplicationStore store) { + String user, Queue queue, ActiveUsersManager activeUsersManager, + ApplicationStore store) { this.applicationAttemptId = appAttemptId; this.applicationId = appAttemptId.getApplicationId(); this.queue = queue; this.queueName = queue.getQueueName(); this.user = user; - this.store = store; + //this.store = store; + this.activeUsersManager = activeUsersManager; } public ApplicationId getApplicationId() { @@ -123,7 +122,8 @@ public int getNewContainerId() { * @param requests * resources to be acquired */ - synchronized public void updateResourceRequests(List requests) { + synchronized public void updateResourceRequests( + List requests) { QueueMetrics metrics = queue.getMetrics(); // Update resource requests for (ResourceRequest request : requests) { @@ -138,6 +138,16 @@ synchronized public void updateResourceRequests(List requests) + request); } updatePendingResources = true; + + // Premature optimization? + // Assumes that we won't see more than one priority request updated + // in one call, reasonable assumption... however, it's totally safe + // to activate same application more than once. + // Thus we don't need another loop ala the one in decrementOutstanding() + // which is needed during deactivate. 
+ if (request.getNumContainers() > 0) { + activeUsersManager.activateApplication(user, applicationId); + } } Map asks = this.requests.get(priority); @@ -246,10 +256,7 @@ synchronized private void allocateNodeLocal(SchedulerNode node, Priority priorit this.requests.get(priority).remove(node.getRackName()); } - // Do not remove ANY - ResourceRequest offSwitchRequest = requests.get(priority).get( - RMNode.ANY); - offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1); + decrementOutstanding(requests.get(priority).get(RMNode.ANY)); } /** @@ -271,10 +278,7 @@ synchronized private void allocateRackLocal(SchedulerNode node, Priority priorit this.requests.get(priority).remove(node.getRackName()); } - // Do not remove ANY - ResourceRequest offSwitchRequest = requests.get(priority).get( - RMNode.ANY); - offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1); + decrementOutstanding(requests.get(priority).get(RMNode.ANY)); } /** @@ -291,11 +295,32 @@ synchronized private void allocateOffSwitch(SchedulerNode node, Priority priorit allocate(container); // Update future requirements - - // Do not remove ANY - offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1); + decrementOutstanding(offSwitchRequest); } + synchronized private void decrementOutstanding( + ResourceRequest offSwitchRequest) { + int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1; + + // Do not remove ANY + offSwitchRequest.setNumContainers(numOffSwitchContainers); + + // Do we have any outstanding requests? 
+ // If there is nothing, we need to deactivate this application + if (numOffSwitchContainers == 0) { + boolean deactivate = true; + for (Priority priority : getPriorities()) { + ResourceRequest request = getResourceRequest(priority, RMNodeImpl.ANY); + if (request.getNumContainers() > 0) { + deactivate = false; + break; + } + } + if (deactivate) { + activeUsersManager.deactivateApplication(user, applicationId); + } + } + } synchronized private void allocate(Container container) { // Update consumption and track allocations //TODO: fixme sharad diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 5005d673582..29a0e2f8c9f 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -60,6 +60,8 @@ public class QueueMetrics { @Metric("# of pending containers") MutableGaugeInt pendingContainers; @Metric("# of reserved memory in GiB") MutableGaugeInt reservedGB; @Metric("# of reserved containers") MutableGaugeInt reservedContainers; + @Metric("# of active users") MutableGaugeInt activeUsers; + @Metric("# of active applications") MutableGaugeInt activeApplications; static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class); static final int GB = 1024; // resource.memory is in MB @@ -287,6 +289,36 @@ public void unreserveResource(String user, Resource res) { } } + public void incrActiveUsers() { + activeUsers.incr(); + } + + public void
decrActiveUsers() { + activeUsers.decr(); + } + + public void activateApp(String user) { + activeApplications.incr(); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.activateApp(user); + } + if (parent != null) { + parent.activateApp(user); + } + } + + public void deactivateApp(String user) { + activeApplications.decr(); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.deactivateApp(user); + } + if (parent != null) { + parent.deactivateApp(user); + } + } + public int getAppsSubmitted() { return appsSubmitted.value(); } @@ -338,4 +370,12 @@ public int getReservedGB() { public int getReservedContainers() { return reservedContainers.value(); } + + public int getActiveUsers() { + return activeUsers.value(); + } + + public int getActiveApps() { + return activeApplications.value(); + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java index 8e25e3d2221..c8ed2c08554 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java @@ -102,11 +102,12 @@ public class SchedulerApp { private final RMContext rmContext; public SchedulerApp(ApplicationAttemptId applicationAttemptId, - String user, Queue queue, + String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext, ApplicationStore store) { this.rmContext = rmContext; 
this.appSchedulingInfo = - new AppSchedulingInfo(applicationAttemptId, user, queue, store); + new AppSchedulingInfo(applicationAttemptId, user, queue, + activeUsersManager, store); this.queue = queue; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 01532de9911..b646e14fb8d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; @@ -197,6 +198,12 @@ public void reinitialize(CSQueue queue, Resource clusterResource) */ public void updateClusterResource(Resource clusterResource); + /** + * Get the {@link ActiveUsersManager} for the queue. 
+ * @return the ActiveUsersManager for the queue + */ + public ActiveUsersManager getActiveUsersManager(); + /** * Recover the state of the queue * @param clusterResource the resource of the cluster diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 364494b76cb..1dd92a74070 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -355,7 +355,8 @@ synchronized CSQueue getQueue(String queueName) { // TODO: Fix store SchedulerApp SchedulerApp = - new SchedulerApp(applicationAttemptId, user, queue, rmContext, null); + new SchedulerApp(applicationAttemptId, user, queue, + queue.getActiveUsersManager(), rmContext, null); // Submit to the queue try { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 1faef7a1e8b..08ee09463a0 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ 
b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -37,6 +37,8 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.Lock; +import org.apache.hadoop.yarn.Lock.NoLock; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -58,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; @@ -120,6 +123,8 @@ public class LeafQueue implements CSQueue { private CapacitySchedulerContext scheduler; + private final ActiveUsersManager activeUsersManager; + final static int DEFAULT_AM_RESOURCE = 2 * 1024; public LeafQueue(CapacitySchedulerContext cs, @@ -132,7 +137,7 @@ public LeafQueue(CapacitySchedulerContext cs, this.metrics = old != null ? 
old.getMetrics() : QueueMetrics.forQueue(getQueuePath(), parent, cs.getConfiguration().getEnableUserMetrics()); - + this.activeUsersManager = new ActiveUsersManager(metrics); this.minimumAllocation = cs.getMinimumResourceCapability(); this.maximumAllocation = cs.getMaximumResourceCapability(); this.minimumAllocationFactor = @@ -348,6 +353,11 @@ public synchronized int getMaximumActiveApplicationsPerUser() { return maxActiveApplicationsPerUser; } + @Override + public ActiveUsersManager getActiveUsersManager() { + return activeUsersManager; + } + @Override public synchronized float getUsedCapacity() { return usedCapacity; @@ -674,6 +684,12 @@ public synchronized void removeApplication(SchedulerApp application, User user) // Check if we can activate more applications activateApplications(); + // Inform the activeUsersManager + synchronized (application) { + activeUsersManager.deactivateApplication( + application.getUser(), application.getApplicationId()); + } + LOG.info("Application removed -" + " appId: " + application.getApplicationId() + " user: " + application.getUser() + @@ -837,6 +853,7 @@ private synchronized boolean assignToQueue(Resource clusterResource, return true; } + @Lock({LeafQueue.class, SchedulerApp.class}) private Resource computeAndSetUserResourceLimit(SchedulerApp application, Resource clusterResource, Resource required) { String user = application.getUser(); @@ -853,6 +870,7 @@ private int roundUp(int memory) { minimumAllocation.getMemory(); } + @Lock(NoLock.class) private Resource computeUserLimit(SchedulerApp application, Resource clusterResource, Resource required) { // What is our current capacity? @@ -877,11 +895,8 @@ private Resource computeUserLimit(SchedulerApp application, // queue's configured capacity * user-limit-factor. 
// Also, the queue's configured capacity should be higher than // queue-hard-limit * ulMin - - String userName = application.getUser(); - final int activeUsers = users.size(); - User user = getUser(userName); + final int activeUsers = activeUsersManager.getNumActiveUsers(); int limit = roundUp( @@ -893,12 +908,13 @@ private Resource computeUserLimit(SchedulerApp application, ); if (LOG.isDebugEnabled()) { + String userName = application.getUser(); LOG.debug("User limit computation for " + userName + " in queue " + getQueueName() + " userLimit=" + userLimit + " userLimitFactor=" + userLimitFactor + " required: " + required + - " consumed: " + user.getConsumedResources() + + " consumed: " + getUser(userName).getConsumedResources() + " limit: " + limit + " queueCapacity: " + queueCapacity + " qconsumed: " + consumed + @@ -1308,8 +1324,10 @@ public synchronized void updateClusterResource(Resource clusterResource) { // Update application properties for (SchedulerApp application : activeApplications) { - computeAndSetUserResourceLimit( - application, clusterResource, Resources.none()); + synchronized (application) { + computeAndSetUserResourceLimit( + application, clusterResource, Resources.none()); + } } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 7d3acc5ad38..39aa197f2b0 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ 
b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; @@ -240,6 +241,12 @@ public float getMaximumCapacity() { return maximumCapacity; } + @Override + public ActiveUsersManager getActiveUsersManager() { + // Should never be called since all applications are submitted to LeafQueues + return null; + } + @Override public synchronized float getUsedCapacity() { return usedCapacity; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 145cb8d20d5..9f5f2cc8ca1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; @@ -124,10 +125,11 @@ public class FifoScheduler implements ResourceScheduler { private Map applications = new TreeMap(); + + private final ActiveUsersManager activeUsersManager; private static final String DEFAULT_QUEUE_NAME = "default"; - private final QueueMetrics metrics = - QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false); + private final QueueMetrics metrics; private final Queue DEFAULT_QUEUE = new Queue() { @Override @@ -174,6 +176,11 @@ public List getQueueUserAclInfo( } }; + public FifoScheduler() { + metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false); + activeUsersManager = new ActiveUsersManager(metrics); + } + @Override public Resource getMinimumResourceCapability() { return minimumAllocation; @@ -288,7 +295,7 @@ private synchronized void addApplication(ApplicationAttemptId appAttemptId, String user) { // TODO: Fix store SchedulerApp schedulerApp = - new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, + new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager, this.rmContext, null); applications.put(appAttemptId, schedulerApp); metrics.submitApp(user); @@ -318,6 +325,12 @@ private synchronized void doneApplication( RMContainerEventType.KILL); } + // Inform the activeUsersManager + synchronized (application) { + activeUsersManager.deactivateApplication( + application.getUser(), application.getApplicationId()); + } + // Clean up pending requests, metrics etc. 
application.stop(rmAppAttemptFinalState); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index f1a1d956ebf..9c5851a5db8 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -302,7 +302,8 @@ public void testHeadroom() throws Exception { final ApplicationAttemptId appAttemptId_0_0 = TestUtils.getMockApplicationAttemptId(0, 0); SchedulerApp app_0_0 = - spy(new SchedulerApp(appAttemptId_0_0, user_0, queue, rmContext, null)); + spy(new SchedulerApp(appAttemptId_0_0, user_0, queue, + queue.getActiveUsersManager(), rmContext, null)); queue.submitApplication(app_0_0, user_0, A); List app_0_0_requests = new ArrayList(); @@ -320,7 +321,8 @@ public void testHeadroom() throws Exception { final ApplicationAttemptId appAttemptId_0_1 = TestUtils.getMockApplicationAttemptId(1, 0); SchedulerApp app_0_1 = - spy(new SchedulerApp(appAttemptId_0_1, user_0, queue, rmContext, null)); + spy(new SchedulerApp(appAttemptId_0_1, user_0, queue, + queue.getActiveUsersManager(), rmContext, null)); queue.submitApplication(app_0_1, user_0, A); List app_0_1_requests = new ArrayList(); @@ -338,7 +340,8 @@ public void testHeadroom() throws Exception { final ApplicationAttemptId appAttemptId_1_0 = TestUtils.getMockApplicationAttemptId(2, 0); SchedulerApp app_1_0 = - spy(new 
SchedulerApp(appAttemptId_1_0, user_1, queue, rmContext, null)); + spy(new SchedulerApp(appAttemptId_1_0, user_1, queue, + queue.getActiveUsersManager(), rmContext, null)); queue.submitApplication(app_1_0, user_1, A); List app_1_0_requests = new ArrayList(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index bee6e02553a..5e4243cccb7 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -18,8 +18,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; @@ -28,9 +38,6 @@ import java.util.List; import java.util.Map; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import 
org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; @@ -48,19 +55,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; - import org.junit.After; import org.junit.Before; import org.junit.Test; - import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; public class TestLeafQueue { - private static final Log LOG = LogFactory.getLog(TestLeafQueue.class); private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -136,7 +141,6 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { final String Q_C1 = Q_C + "." 
+ C1; conf.setCapacity(Q_C1, 100); - LOG.info("Setup top-level queues a and b"); } static LeafQueue stubLeafQueue(LeafQueue queue) { @@ -217,13 +221,15 @@ public void testSingleQueueOneUserMetrics() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null); + new SchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_0, user_0, B); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null); + new SchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_1, user_0, B); // same user @@ -264,13 +270,15 @@ public void testSingleQueueWithOneUser() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null); + new SchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null); + new SchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_1, user_0, A); // same user @@ -371,6 +379,99 @@ public void testSingleQueueWithOneUser() throws Exception { assertEquals(1, a.getMetrics().getAvailableGB()); } + @Test + public void testUserLimits() throws Exception { + // Mock the queue + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + //unset maxCapacity + a.setMaxCapacity(1.0f); + + // Users + final String user_0 = "user_0"; + final String user_1 = "user_1"; + + // Submit applications + 
final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + SchedulerApp app_0 = + new SchedulerApp(appAttemptId_0, user_0, a, + a.getActiveUsersManager(), rmContext, null); + a.submitApplication(app_0, user_0, A); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + SchedulerApp app_1 = + new SchedulerApp(appAttemptId_1, user_0, a, + a.getActiveUsersManager(), rmContext, null); + a.submitApplication(app_1, user_0, A); // same user + + final ApplicationAttemptId appAttemptId_2 = + TestUtils.getMockApplicationAttemptId(2, 0); + SchedulerApp app_2 = + new SchedulerApp(appAttemptId_2, user_1, a, + a.getActiveUsersManager(), rmContext, null); + a.submitApplication(app_2, user_1, A); + + // Setup some nodes + String host_0 = "host_0"; + SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + String host_1 = "host_1"; + SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); + + final int numNodes = 2; + Resource clusterResource = Resources.createResource(numNodes * (8*GB)); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Setup resource-requests + Priority priority = TestUtils.createMockPriority(1); + app_0.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1, priority, + recordFactory))); + + app_1.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority, + recordFactory))); + + /** + * Start testing... + */ + + // Set user-limit + a.setUserLimit(50); + a.setUserLimitFactor(2); + + // Now, only user_0 should be active since he is the only one with + // outstanding requests + assertEquals("There should only be 1 active user!", + 1, a.getActiveUsersManager().getNumActiveUsers()); + + // This commented code is key to test 'activeUsers'. 
+ // It should fail the test if uncommented since + // it would increase 'activeUsers' to 2 and stop user_0 + // Pre MAPREDUCE-3732 this test should fail without this block too +// app_2.updateResourceRequests(Collections.singletonList( +// TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, priority, +// recordFactory))); + + // 1 container to user_0 + a.assignContainers(clusterResource, node_0); + assertEquals(2*GB, a.getUsedResources().getMemory()); + assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); + + // Again one to user_0 since he hasn't exceeded user limit yet + a.assignContainers(clusterResource, node_0); + assertEquals(3*GB, a.getUsedResources().getMemory()); + assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); + + // One more to user_0 since he is the only active user + a.assignContainers(clusterResource, node_1); + assertEquals(4*GB, a.getUsedResources().getMemory()); + assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); + } + @Test public void testSingleQueueWithMultipleUsers() throws Exception { @@ -388,15 +489,31 @@ public void testSingleQueueWithMultipleUsers() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null); + new SchedulerApp(appAttemptId_0, user_0, a, + a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null); + new SchedulerApp(appAttemptId_1, user_0, a, + a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_1, user_0, A); // same user + final 
ApplicationAttemptId appAttemptId_2 = + TestUtils.getMockApplicationAttemptId(2, 0); + SchedulerApp app_2 = + new SchedulerApp(appAttemptId_2, user_1, a, + a.getActiveUsersManager(), rmContext, null); + a.submitApplication(app_2, user_1, A); + + final ApplicationAttemptId appAttemptId_3 = + TestUtils.getMockApplicationAttemptId(3, 0); + SchedulerApp app_3 = + new SchedulerApp(appAttemptId_3, user_2, a, + a.getActiveUsersManager(), rmContext, null); + a.submitApplication(app_3, user_2, A); + // Setup some nodes String host_0 = "host_0"; SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); @@ -438,19 +555,8 @@ public void testSingleQueueWithMultipleUsers() throws Exception { assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); - - // Submit more apps - final ApplicationAttemptId appAttemptId_2 = - TestUtils.getMockApplicationAttemptId(2, 0); - SchedulerApp app_2 = - new SchedulerApp(appAttemptId_2, user_1, a, rmContext, null); - a.submitApplication(app_2, user_1, A); - - final ApplicationAttemptId appAttemptId_3 = - TestUtils.getMockApplicationAttemptId(3, 0); - SchedulerApp app_3 = - new SchedulerApp(appAttemptId_3, user_2, a, rmContext, null); - a.submitApplication(app_3, user_2, A); + + // Submit resource requests for other apps now to 'activate' them app_2.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(RMNodeImpl.ANY, 3*GB, 1, priority, @@ -558,13 +664,15 @@ public void testReservation() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null); + new SchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = 
TestUtils.getMockApplicationAttemptId(1, 0); SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_1, a, rmContext, null); + new SchedulerApp(appAttemptId_1, user_1, a, + mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_1, user_1, A); // Setup some nodes @@ -657,13 +765,15 @@ public void testReservationExchange() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null); + new SchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_1, a, rmContext, null); + new SchedulerApp(appAttemptId_1, user_1, a, + mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_1, user_1, A); // Setup some nodes @@ -770,7 +880,8 @@ public void testLocalityScheduling() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); SchedulerApp app_0 = - spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null)); + spy(new SchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), rmContext, null)); a.submitApplication(app_0, user_0, A); // Setup some nodes and racks @@ -899,7 +1010,8 @@ public void testApplicationPriorityScheduling() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); SchedulerApp app_0 = - spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null)); + spy(new SchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), rmContext, null)); a.submitApplication(app_0, user_0, A); // Setup some nodes and racks @@ -1028,7 +1140,8 @@ public void testSchedulingConstraints() throws Exception { final ApplicationAttemptId appAttemptId_0 = 
TestUtils.getMockApplicationAttemptId(0, 0); SchedulerApp app_0 = - spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null)); + spy(new SchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), rmContext, null)); a.submitApplication(app_0, user_0, A); // Setup some nodes and racks