diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java new file mode 100644 index 00000000000..e601086b8c4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java @@ -0,0 +1,302 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; + +/** + * Handles tracking and enforcement for user and queue maxRunningApps + * constraints + */ +public class MaxRunningAppsEnforcer { + private final QueueManager queueMgr; + + // Tracks the number of running applications by user. + private final Map usersNumRunnableApps; + @VisibleForTesting + final ListMultimap usersNonRunnableApps; + + public MaxRunningAppsEnforcer(QueueManager queueMgr) { + this.queueMgr = queueMgr; + this.usersNumRunnableApps = new HashMap(); + this.usersNonRunnableApps = ArrayListMultimap.create(); + } + + /** + * Checks whether making the application runnable would exceed any + * maxRunningApps limits. + */ + public boolean canAppBeRunnable(FSQueue queue, String user) { + Integer userNumRunnable = usersNumRunnableApps.get(user); + if (userNumRunnable == null) { + userNumRunnable = 0; + } + if (userNumRunnable >= queueMgr.getUserMaxApps(user)) { + return false; + } + // Check queue and all parent queues + while (queue != null) { + int queueMaxApps = queueMgr.getQueueMaxApps(queue.getName()); + if (queue.getNumRunnableApps() >= queueMaxApps) { + return false; + } + queue = queue.getParent(); + } + + return true; + } + + /** + * Tracks the given new runnable app for purposes of maintaining max running + * app limits. + */ + public void trackRunnableApp(FSSchedulerApp app) { + String user = app.getUser(); + FSLeafQueue queue = app.getQueue(); + // Increment running counts for all parent queues + FSParentQueue parent = queue.getParent(); + while (parent != null) { + parent.incrementRunnableApps(); + parent = parent.getParent(); + } + + Integer userNumRunnable = usersNumRunnableApps.get(user); + usersNumRunnableApps.put(user, (userNumRunnable == null ? 0 + : userNumRunnable) + 1); + } + + /** + * Tracks the given new non runnable app so that it can be made runnable when + * it would not violate max running app limits. + */ + public void trackNonRunnableApp(FSSchedulerApp app) { + String user = app.getUser(); + usersNonRunnableApps.put(user, app.getAppSchedulable()); + } + + /** + * Updates the relevant tracking variables after a runnable app with the given + * queue and user has been removed. Checks to see whether any other applications + * are now runnable and makes them so. + * + * Runs in O(n log(n)) where n is the number of queues that are under the + * highest queue that went from having no slack to having slack. + */ + public void updateRunnabilityOnAppRemoval(FSSchedulerApp app) { + // Update usersRunnableApps + String user = app.getUser(); + int newUserNumRunning = usersNumRunnableApps.get(user) - 1; + if (newUserNumRunning == 0) { + usersNumRunnableApps.remove(user); + } else { + usersNumRunnableApps.put(user, newUserNumRunning); + } + + // Update runnable app bookkeeping for queues: + // childqueueX might have no pending apps itself, but if a queue higher up + // in the hierarchy parentqueueY has a maxRunningApps set, an app completion + // in childqueueX could allow an app in some other distant child of + // parentqueueY to become runnable. + // An app removal will only possibly allow another app to become runnable if + // the queue was already at its max before the removal. + // Thus we find the ancestor queue highest in the tree for which the app + // that was at its maxRunningApps before the removal. + FSLeafQueue queue = app.getQueue(); + FSQueue highestQueueWithAppsNowRunnable = (queue.getNumRunnableApps() == + queueMgr.getQueueMaxApps(queue.getName()) - 1) ? queue : null; + FSParentQueue parent = queue.getParent(); + while (parent != null) { + if (parent.getNumRunnableApps() == queueMgr.getQueueMaxApps(parent + .getName())) { + highestQueueWithAppsNowRunnable = parent; + } + parent.decrementRunnableApps(); + parent = parent.getParent(); + } + + List> appsNowMaybeRunnable = + new ArrayList>(); + + // Compile lists of apps which may now be runnable + // We gather lists instead of building a set of all non-runnable apps so + // that this whole operation can be O(number of queues) instead of + // O(number of apps) + if (highestQueueWithAppsNowRunnable != null) { + gatherPossiblyRunnableAppLists(highestQueueWithAppsNowRunnable, + appsNowMaybeRunnable); + } + if (newUserNumRunning == queueMgr.getUserMaxApps(user) - 1) { + List userWaitingApps = usersNonRunnableApps.get(user); + if (userWaitingApps != null) { + appsNowMaybeRunnable.add(userWaitingApps); + } + } + + // Scan through and check whether this means that any apps are now runnable + Iterator iter = new MultiListStartTimeIterator( + appsNowMaybeRunnable); + FSSchedulerApp prev = null; + int numNowRunnable = 0; + while (iter.hasNext()) { + FSSchedulerApp next = iter.next(); + if (next == prev) { + continue; + } + + if (canAppBeRunnable(next.getQueue(), next.getUser())) { + trackRunnableApp(next); + AppSchedulable appSched = next.getAppSchedulable(); + next.getQueue().makeAppRunnable(appSched); + if (!usersNonRunnableApps.remove(next.getUser(), appSched)) { + throw new IllegalStateException("Waiting app " + next + + " expected to be in usersNonRunnableApps"); + } + + // No more than one app per list will be able to be made runnable, so + // we can stop looking after we've found that many + if (numNowRunnable >= appsNowMaybeRunnable.size()) { + break; + } + } + + prev = next; + } + } + + /** + * Stops tracking the given non-runnable app + */ + public void untrackNonRunnableApp(FSSchedulerApp app) { + usersNonRunnableApps.remove(app.getUser(), app.getAppSchedulable()); + } + + /** + * Traverses the queue hierarchy under the given queue to gather all lists + * of non-runnable applications. + */ + private void gatherPossiblyRunnableAppLists(FSQueue queue, + List> appLists) { + if (queue.getNumRunnableApps() < queueMgr.getQueueMaxApps(queue.getName())) { + if (queue instanceof FSLeafQueue) { + appLists.add(((FSLeafQueue)queue).getNonRunnableAppSchedulables()); + } else { + for (FSQueue child : queue.getChildQueues()) { + gatherPossiblyRunnableAppLists(child, appLists); + } + } + } + } + + /** + * Takes a list of lists, each of which is ordered by start time, and returns + * their elements in order of start time. + * + * We maintain positions in each of the lists. Each next() call advances + * the position in one of the lists. We maintain a heap that orders lists + * by the start time of the app in the current position in that list. + * This allows us to pick which list to advance in O(log(num lists)) instead + * of O(num lists) time. + */ + private static class MultiListStartTimeIterator implements + Iterator { + + private List[] appLists; + private int[] curPositionsInAppLists; + private PriorityQueue appListsByCurStartTime; + + @SuppressWarnings("unchecked") + public MultiListStartTimeIterator(List> appListList) { + appLists = appListList.toArray(new List[appListList.size()]); + curPositionsInAppLists = new int[appLists.length]; + appListsByCurStartTime = new PriorityQueue(); + for (int i = 0; i < appLists.length; i++) { + long time = appLists[i].isEmpty() ? Long.MAX_VALUE : appLists[i].get(0) + .getStartTime(); + appListsByCurStartTime.add(new IndexAndTime(i, time)); + } + } + + @Override + public boolean hasNext() { + return !appListsByCurStartTime.isEmpty() + && appListsByCurStartTime.peek().time != Long.MAX_VALUE; + } + + @Override + public FSSchedulerApp next() { + IndexAndTime indexAndTime = appListsByCurStartTime.remove(); + int nextListIndex = indexAndTime.index; + AppSchedulable next = appLists[nextListIndex] + .get(curPositionsInAppLists[nextListIndex]); + curPositionsInAppLists[nextListIndex]++; + + if (curPositionsInAppLists[nextListIndex] < appLists[nextListIndex].size()) { + indexAndTime.time = appLists[nextListIndex] + .get(curPositionsInAppLists[nextListIndex]).getStartTime(); + } else { + indexAndTime.time = Long.MAX_VALUE; + } + appListsByCurStartTime.add(indexAndTime); + + return next.getApp(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Remove not supported"); + } + + private static class IndexAndTime implements Comparable { + public int index; + public long time; + + public IndexAndTime(int index, long time) { + this.index = index; + this.time = time; + } + + @Override + public int compareTo(IndexAndTime o) { + return time < o.time ? -1 : (time > o.time ? 1 : 0); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof IndexAndTime)) { + return false; + } + IndexAndTime other = (IndexAndTime)o; + return other.time == time; + } + + @Override + public int hashCode() { + return (int)time; + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java new file mode 100644 index 00000000000..20f6e3d7757 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.junit.Before; +import org.junit.Test; + +public class TestMaxRunningAppsEnforcer { + private QueueManager queueManager; + private Map queueMaxApps; + private Map userMaxApps; + private MaxRunningAppsEnforcer maxAppsEnforcer; + private int appNum; + private TestFairScheduler.MockClock clock; + + @Before + public void setup() throws Exception { + clock = new TestFairScheduler.MockClock(); + FairScheduler scheduler = mock(FairScheduler.class); + when(scheduler.getConf()).thenReturn( + new FairSchedulerConfiguration(new Configuration())); + when(scheduler.getClock()).thenReturn(clock); + + queueManager = new QueueManager(scheduler); + queueManager.initialize(); + + queueMaxApps = queueManager.info.queueMaxApps; + userMaxApps = queueManager.info.userMaxApps; + maxAppsEnforcer = new MaxRunningAppsEnforcer(queueManager); + appNum = 0; + } + + private FSSchedulerApp addApp(FSLeafQueue queue, String user) { + ApplicationId appId = ApplicationId.newInstance(0l, appNum++); + ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0); + boolean runnable = maxAppsEnforcer.canAppBeRunnable(queue, user); + FSSchedulerApp app = new FSSchedulerApp(attId, user, queue, null, null); + queue.addApp(app, runnable); + if (runnable) { + maxAppsEnforcer.trackRunnableApp(app); + } else { + maxAppsEnforcer.trackNonRunnableApp(app); + } + return app; + } + + private void removeApp(FSSchedulerApp app) { + app.getQueue().removeApp(app); + maxAppsEnforcer.updateRunnabilityOnAppRemoval(app); + } + + @Test + public void testRemoveDoesNotEnableAnyApp() { + FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1", true); + FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue2", true); + queueMaxApps.put("root", 2); + queueMaxApps.put("root.queue1", 1); + queueMaxApps.put("root.queue2", 1); + FSSchedulerApp app1 = addApp(leaf1, "user"); + addApp(leaf2, "user"); + addApp(leaf2, "user"); + assertEquals(1, leaf1.getRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getNonRunnableAppSchedulables().size()); + removeApp(app1); + assertEquals(0, leaf1.getRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getNonRunnableAppSchedulables().size()); + } + + @Test + public void testRemoveEnablesAppOnCousinQueue() { + FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true); + FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true); + queueMaxApps.put("root.queue1", 2); + FSSchedulerApp app1 = addApp(leaf1, "user"); + addApp(leaf2, "user"); + addApp(leaf2, "user"); + assertEquals(1, leaf1.getRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getNonRunnableAppSchedulables().size()); + removeApp(app1); + assertEquals(0, leaf1.getRunnableAppSchedulables().size()); + assertEquals(2, leaf2.getRunnableAppSchedulables().size()); + assertEquals(0, leaf2.getNonRunnableAppSchedulables().size()); + } + + @Test + public void testRemoveEnablesOneByQueueOneByUser() { + FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.leaf1", true); + FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.leaf2", true); + queueMaxApps.put("root.queue1.leaf1", 2); + userMaxApps.put("user1", 1); + FSSchedulerApp app1 = addApp(leaf1, "user1"); + addApp(leaf1, "user2"); + addApp(leaf1, "user3"); + addApp(leaf2, "user1"); + assertEquals(2, leaf1.getRunnableAppSchedulables().size()); + assertEquals(1, leaf1.getNonRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getNonRunnableAppSchedulables().size()); + removeApp(app1); + assertEquals(2, leaf1.getRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getRunnableAppSchedulables().size()); + assertEquals(0, leaf1.getNonRunnableAppSchedulables().size()); + assertEquals(0, leaf2.getNonRunnableAppSchedulables().size()); + } + + @Test + public void testRemoveEnablingOrderedByStartTime() { + FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true); + FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true); + queueMaxApps.put("root.queue1", 2); + FSSchedulerApp app1 = addApp(leaf1, "user"); + addApp(leaf2, "user"); + addApp(leaf2, "user"); + clock.tick(20); + addApp(leaf1, "user"); + assertEquals(1, leaf1.getRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getRunnableAppSchedulables().size()); + assertEquals(1, leaf1.getNonRunnableAppSchedulables().size()); + assertEquals(1, leaf2.getNonRunnableAppSchedulables().size()); + removeApp(app1); + assertEquals(0, leaf1.getRunnableAppSchedulables().size()); + assertEquals(2, leaf2.getRunnableAppSchedulables().size()); + assertEquals(0, leaf2.getNonRunnableAppSchedulables().size()); + } + +}