From 9afec2ed1721467aef7f2cd025d713273b12a6ca Mon Sep 17 00:00:00 2001
From: Eric E Payne
Date: Fri, 11 Sep 2020 13:29:26 +0000
Subject: [PATCH] YARN-10390: LeafQueue: retain user limits cache across
 assignContainers() calls. Contributed by Samir Khan (samkhan).

---
 .../scheduler/capacity/CapacityScheduler.java |   3 +-
 .../scheduler/capacity/LeafQueue.java         |  67 ++-
 .../scheduler/capacity/UsersManager.java      |  35 +-
 .../capacity/TestCapacitySchedulerPerf.java   | 243 ++++++--
 .../scheduler/capacity/TestLeafQueue.java     | 535 ++++++++++++++++++
 5 files changed, 807 insertions(+), 76 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 2c87b33063f..678fd2d6f8b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -224,7 +224,8 @@ public class CapacityScheduler extends
   private boolean usePortForNodeName;
 
   private boolean scheduleAsynchronously;
-  private List<AsyncScheduleThread> asyncSchedulerThreads;
+  @VisibleForTesting
+  protected List<AsyncScheduleThread> asyncSchedulerThreads;
   private ResourceCommitterService resourceCommitterService;
   private RMNodeLabelsManager labelManager;
   private AppPriorityACLsManager appPriorityACLManager;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 05150a373ce..8de9e3c301f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.DateUtils;
@@ -121,6 +122,16 @@ public class LeafQueue extends AbstractCSQueue {
 
   private volatile OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;
 
+  // Map<Partition, Map<SchedulingMode, ConcurrentMap<User, CachedUserLimit>>>
+  // Not thread safe: only the last level is a ConcurrentMap
+  @VisibleForTesting
+  Map<String, Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>>
+      userLimitsCache = new HashMap<>();
+
+  // Not thread safe
+  @VisibleForTesting
+  long currentUserLimitCacheVersion = 0;
+
   // record all ignore partition exclusivityRMContainer, this will be used to do
   // preemption, key is the partition of the RMContainer allocated on
   private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
@@ -1066,6 +1077,47 @@ public class LeafQueue extends AbstractCSQueue {
     return null;
   }
 
+  private ConcurrentMap<String, CachedUserLimit> getUserLimitCache(
+      String partition,
+      SchedulingMode schedulingMode) {
+    synchronized (userLimitsCache) {
+      long latestVersion = usersManager.getLatestVersionOfUsersState();
+
+      if (latestVersion != this.currentUserLimitCacheVersion) {
+        // User limits cache needs invalidating
+        this.currentUserLimitCacheVersion = latestVersion;
+        userLimitsCache.clear();
+
+        Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>
+            uLCByPartition = new HashMap<>();
+        userLimitsCache.put(partition, uLCByPartition);
+
+        ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode =
+            new ConcurrentHashMap<>();
+        uLCByPartition.put(schedulingMode, uLCBySchedulingMode);
+
+        return uLCBySchedulingMode;
+      }
+
+      // User limits cache does not need invalidating
+      Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>
+          uLCByPartition = userLimitsCache.get(partition);
+      if (uLCByPartition == null) {
+        uLCByPartition = new HashMap<>();
+        userLimitsCache.put(partition, uLCByPartition);
+      }
+
+      ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode =
+          uLCByPartition.get(schedulingMode);
+      if (uLCBySchedulingMode == null) {
+        uLCBySchedulingMode = new ConcurrentHashMap<>();
+        uLCByPartition.put(schedulingMode, uLCBySchedulingMode);
+      }
+
+      return uLCBySchedulingMode;
+    }
+  }
+
   @Override
   public CSAssignment assignContainers(Resource clusterResource,
       CandidateNodeSet<FiCaSchedulerNode> candidates,
@@ -1112,7 +1164,8 @@ public class LeafQueue extends AbstractCSQueue {
       return CSAssignment.NULL_ASSIGNMENT;
     }
 
-    Map<String, CachedUserLimit> userLimits = new HashMap<>();
+    ConcurrentMap<String, CachedUserLimit> userLimits =
+        this.getUserLimitCache(candidates.getPartition(), schedulingMode);
     boolean needAssignToQueueCheck = true;
     IteratorSelector sel = new IteratorSelector();
     sel.setPartition(candidates.getPartition());
@@ -1157,7 +1210,13 @@ public class LeafQueue extends AbstractCSQueue {
           cachedUserLimit);
       if (cul == null) {
         cul = new CachedUserLimit(userLimit);
-        userLimits.put(application.getUser(), cul);
+        CachedUserLimit retVal =
+            userLimits.putIfAbsent(application.getUser(), cul);
+        if (retVal != null) {
+          // another thread updated the user limit cache before us
+          cul = retVal;
+          userLimit = cul.userLimit;
+        }
       }
       // Check user limit
       boolean userAssignable = true;
@@ -2234,8 +2293,8 @@ public class LeafQueue extends AbstractCSQueue {
 
   static class CachedUserLimit {
     final Resource userLimit;
-    boolean canAssign = true;
-    Resource reservation = Resources.none();
+    volatile boolean canAssign = true;
+    volatile Resource reservation = Resources.none();
 
     CachedUserLimit(Resource userLimit) {
       this.userLimit = userLimit;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
index 00b3a77d6fc..a8b0164e42f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -72,7 +71,7 @@ public class UsersManager implements AbstractUsersManager {
 
   // To detect whether there is a change in user count for every user-limit
   // calculation.
-  private AtomicLong latestVersionOfUsersState = new AtomicLong(0);
+  private long latestVersionOfUsersState = 0;
   private Map<String, Map<SchedulingMode, Long>> localVersionOfActiveUsersState =
       new HashMap<String, Map<SchedulingMode, Long>>();
   private Map<String, Map<SchedulingMode, Long>> localVersionOfAllUsersState =
@@ -91,8 +90,12 @@ public class UsersManager implements AbstractUsersManager {
       new HashMap<String, Map<SchedulingMode, Long>>();
 
   // Pre-computed list of user-limits.
-  Map<String, Map<SchedulingMode, Resource>> preComputedActiveUserLimit = new ConcurrentHashMap<>();
-  Map<String, Map<SchedulingMode, Resource>> preComputedAllUserLimit = new ConcurrentHashMap<>();
+  @VisibleForTesting
+  Map<String, Map<SchedulingMode, Resource>> preComputedActiveUserLimit =
+      new HashMap<>();
+  @VisibleForTesting
+  Map<String, Map<SchedulingMode, Resource>> preComputedAllUserLimit =
+      new HashMap<>();
 
   private float activeUsersTimesWeights = 0.0f;
   private float allUsersTimesWeights = 0.0f;
@@ -361,9 +364,9 @@ public class UsersManager implements AbstractUsersManager {
 
     writeLock.lock();
     try {
-      long value = latestVersionOfUsersState.incrementAndGet();
+      long value = ++latestVersionOfUsersState;
       if (value < 0) {
-        latestVersionOfUsersState.set(0);
+        latestVersionOfUsersState = 0;
       }
     } finally {
       writeLock.unlock();
@@ -581,6 +584,15 @@ public class UsersManager implements AbstractUsersManager {
     return userSpecificUserLimit;
   }
 
+  protected long getLatestVersionOfUsersState() {
+    readLock.lock();
+    try {
+      return latestVersionOfUsersState;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   /*
    * Recompute user-limit under following conditions: 1. cached user-limit does
    * not exist in local map. 2. Total User count doesn't match with local cached
@@ -588,8 +600,13 @@ public class UsersManager implements AbstractUsersManager {
    */
   private boolean isRecomputeNeeded(SchedulingMode schedulingMode,
       String nodePartition, boolean isActive) {
-    return (getLocalVersionOfUsersState(nodePartition, schedulingMode,
-        isActive) != latestVersionOfUsersState.get());
+    readLock.lock();
+    try {
+      return (getLocalVersionOfUsersState(nodePartition, schedulingMode,
+          isActive) != latestVersionOfUsersState);
+    } finally {
+      readLock.unlock();
+    }
   }
 
   /*
@@ -610,7 +627,7 @@ public class UsersManager implements AbstractUsersManager {
         localVersionOfUsersState.put(nodePartition, localVersion);
       }
 
-      localVersion.put(schedulingMode, latestVersionOfUsersState.get());
+      localVersion.put(schedulingMode, latestVersionOfUsersState);
     } finally {
       writeLock.unlock();
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
index c58cf5480b3..b71fe063927 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
@@ -41,11 +41,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptM
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -57,6 +59,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES;
 import static org.junit.Assert.assertEquals;
@@ -72,6 +75,29 @@ public class TestCapacitySchedulerPerf {
     return "resource-" + idx;
   }
 
+  public static class CapacitySchedulerPerf extends CapacityScheduler {
+    volatile boolean enable = false;
+    AtomicLong count = new AtomicLong(0);
+
+    public CapacitySchedulerPerf() {
+      super();
+    }
+
+    @Override
+    CSAssignment allocateContainersToNode(
+        CandidateNodeSet<FiCaSchedulerNode> candidates,
+        boolean withNodeHeartbeat) {
+      CSAssignment retVal = super.allocateContainersToNode(candidates,
+          withNodeHeartbeat);
+
+      if (enable) {
+        count.incrementAndGet();
+      }
+
+      return retVal;
+    }
+  }
+
   // This test is run only when -DRunCapacitySchedulerPerfTests=true is set
   // on the command line. In addition, this test has tunables for the following:
   // Number of queues: -DNumberOfQueues (default=100)
@@ -88,6 +114,9 @@ public class TestCapacitySchedulerPerf {
       throws Exception {
     Assume.assumeTrue(Boolean.valueOf(
         System.getProperty("RunCapacitySchedulerPerfTests")));
+    int numThreads = Integer.valueOf(System.getProperty(
+        "CapacitySchedulerPerfTestsNumThreads", "0"));
+
     if (numOfResourceTypes > 2) {
       // Initialize resource map
       Map<String, ResourceInformation> riMap = new HashMap<>();
@@ -112,13 +141,30 @@ public class TestCapacitySchedulerPerf {
 
     CapacitySchedulerConfiguration csconf =
         createCSConfWithManyQueues(numQueues);
+    if (numThreads > 0) {
+      csconf.setScheduleAynschronously(true);
+      csconf.setInt(
+          CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
+          numThreads);
+      csconf.setLong(
+          CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+              + ".scheduling-interval-ms", 0);
+    }
 
     YarnConfiguration conf = new YarnConfiguration(csconf);
     // Don't reset resource types since we have already configured resource
     // types
     conf.setBoolean(TEST_CONF_RESET_RESOURCE_TYPES, false);
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-        ResourceScheduler.class);
+
+    if (numThreads > 0) {
+      conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacitySchedulerPerf.class,
+          ResourceScheduler.class);
+      // avoid getting skipped (see CapacityScheduler.shouldSkipNodeSchedule)
+      conf.setLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 600000);
+    } else {
+      conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+          ResourceScheduler.class);
+    }
 
     MockRM rm = new MockRM(conf);
     rm.start();
@@ -189,6 +235,13 @@ public class TestCapacitySchedulerPerf {
     RecordFactory recordFactory =
         RecordFactoryProvider.getRecordFactory(null);
 
+    if (numThreads > 0) {
+      // disable async scheduling threads
+      for (CapacityScheduler.AsyncScheduleThread t : cs.asyncSchedulerThreads) {
+        t.suspendSchedule();
+      }
+    }
+
     FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[totalApps];
     for (int i=0;i<totalApps;i++) {
[...]
-    // allocate one container for each extra apps since
-    //  LeafQueue.canAssignToUser() checks for used > limit, not used >= limit
-    cs.handle(new NodeUpdateSchedulerEvent(node));
-    cs.handle(new NodeUpdateSchedulerEvent(node2));
+    if (numThreads > 0) {
+      // enable async scheduling threads
+      for (CapacityScheduler.AsyncScheduleThread t : cs.asyncSchedulerThreads) {
+        t.beginSchedule();
+      }
[...]
-    // make sure only the extra apps have allocated containers
-    for (int i=0;i<totalApps;i++) {
[...]
-    }
+    } else {
+      // allocate one container for each extra apps since
+      //  LeafQueue.canAssignToUser() checks for used > limit, not used >= limit
+      cs.handle(new NodeUpdateSchedulerEvent(node));
+      cs.handle(new NodeUpdateSchedulerEvent(node2));
+
+      // make sure only the extra apps have allocated containers
+      for (int i=0;i<totalApps;i++) {
[...]
+      }
+    }
 
-    final int topn = 20;
-    final int iterations = 2000000;
-    final int printInterval = 20000;
-    final float numerator = 1000.0f * printInterval;
-    PriorityQueue<Long> queue = new PriorityQueue<>(topn,
-        Collections.reverseOrder());
-    long n = Time.monotonicNow();
-    long timespent = 0;
-    for (int i = 0; i < iterations; i+=2) {
-      if (i > 0 && i % printInterval == 0){
-        long ts = (Time.monotonicNow() - n);
-        if (queue.size() < topn) {
-          queue.offer(ts);
-        } else {
-          Long last = queue.peek();
-          if (last > ts) {
-            queue.poll();
-            queue.offer(ts);
-          }
-        }
-        System.out.println(i + " " + (numerator / ts));
-        n= Time.monotonicNow();
-      }
-      cs.handle(new NodeUpdateSchedulerEvent(node));
-      cs.handle(new NodeUpdateSchedulerEvent(node2));
-    }
-    timespent=0;
-    int entries = queue.size();
-    while(queue.size() > 0){
-      long l = queue.poll();
-      timespent += l;
-    }
-    System.out.println(
-        "#ResourceTypes = " + numOfResourceTypes + ". Avg of fastest " + entries
-        + ": " + numerator / (timespent / entries) + " ops/sec of "
-        + appCount + " apps on " + pctActiveQueues + "% of " + numQueues
-        + " queues.");
+    if (numThreads > 0) {
+      System.out.println("Starting now");
+      ((CapacitySchedulerPerf) cs).enable = true;
+      long start = Time.monotonicNow();
+      Thread.sleep(60000);
+      long end = Time.monotonicNow();
+      ((CapacitySchedulerPerf) cs).enable = false;
+      long numOps = ((CapacitySchedulerPerf) cs).count.get();
+      System.out.println("Number of operations: " + numOps);
+      System.out.println("Time taken: " + (end - start) + " ms");
+      System.out.println("" + (numOps * 1000 / (end - start))
+          + " ops / second");
+    } else {
+      final int topn = 20;
+      final int iterations = 2000000;
+      final int printInterval = 20000;
+      final float numerator = 1000.0f * printInterval;
+      PriorityQueue<Long> queue = new PriorityQueue<>(topn,
+          Collections.reverseOrder());
+
+      long n = Time.monotonicNow();
+      long timespent = 0;
+      for (int i = 0; i < iterations; i+=2) {
+        if (i > 0 && i % printInterval == 0){
+          long ts = (Time.monotonicNow() - n);
+          if (queue.size() < topn) {
+            queue.offer(ts);
+          } else {
+            Long last = queue.peek();
+            if (last > ts) {
+              queue.poll();
+              queue.offer(ts);
+            }
+          }
+          System.out.println(i + " " + (numerator / ts));
+          n = Time.monotonicNow();
+        }
+        cs.handle(new NodeUpdateSchedulerEvent(node));
+        cs.handle(new NodeUpdateSchedulerEvent(node2));
+      }
+      timespent = 0;
+      int entries = queue.size();
+      while (queue.size() > 0) {
+        long l = queue.poll();
+        timespent += l;
+      }
+      System.out.println("#ResourceTypes = " + numOfResourceTypes
+          + ". Avg of fastest " + entries
+          + ": " + numerator / (timespent / entries) + " ops/sec of "
+          + appCount + " apps on " + pctActiveQueues + "% of " + numQueues
+          + " queues.");
+    }
 
-    // make sure only the extra apps have allocated containers
-    for (int i=0;i<totalApps;i++) {
-      boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
-      if (i < activeQueues) {
-        assertFalse(pending);
-        assertEquals(0,
-            fiCaApps[i].getTotalPendingRequestsPerPartition().size());
-      } else {
-        assertTrue(pending);
-        assertEquals(1*GB,
-            fiCaApps[i].getTotalPendingRequestsPerPartition()
-                .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
-      }
-    }
+    if (numThreads > 0) {
+      // count the number of apps with allocated containers
+      int numNotPending = 0;
+      for (int i = 0; i < totalApps; i++) {
+        boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
+        if (!pending) {
+          numNotPending++;
+          assertEquals(0,
+              fiCaApps[i].getTotalPendingRequestsPerPartition().size());
+        } else {
+          assertEquals(1*GB,
+              fiCaApps[i].getTotalPendingRequestsPerPartition()
+                  .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
+        }
+      }
+
+      // make sure only extra apps have allocated containers
+      assertEquals(activeQueues, numNotPending);
+    } else {
+      // make sure only the extra apps have allocated containers
+      for (int i = 0; i < totalApps; i++) {
+        boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
+        if (i < activeQueues) {
+          assertFalse(pending);
+          assertEquals(0,
+              fiCaApps[i].getTotalPendingRequestsPerPartition().size());
+        } else {
+          assertTrue(pending);
+          assertEquals(1 * GB,
+              fiCaApps[i].getTotalPendingRequestsPerPartition()
+                  .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
+        }
+      }
+    }
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index f664e038f16..3bfb0ab6889 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -28,6 +28,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
     .capacity.CapacitySchedulerConfiguration.ROOT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -45,15 +46,18 @@ import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -900,6 +904,537 @@ public class TestLeafQueue {
     assertEquals(expectedRatio, b.getUsersManager().getUsageRatio(""), 0.001);
   }
 
+  @Test
+  public void testUserLimitCache() throws Exception {
+    // Parameters
+    final int numNodes = 4;
+    final int nodeSize = 100;
+    final int numAllocationThreads = 2;
+    final int numUsers = 40;
+    final int containerSize = 1 * GB;
+    final int numContainersPerApp = 10;
+    final int runTime = 5000; // in ms
+
+    Random random = new Random();
+
+    // Setup nodes
+    FiCaSchedulerNode[] nodes = new FiCaSchedulerNode[numNodes];
+    Map<NodeId, FiCaSchedulerNode> nodesMap = new HashMap<>(nodes.length);
+    for (int i = 0; i < numNodes; i++) {
+      String host = "127.0.0." + i;
+      FiCaSchedulerNode node = TestUtils.getMockNode(host, DEFAULT_RACK, 0,
+          nodeSize * GB, nodeSize);
+      nodes[i] = node;
+      nodesMap.put(node.getNodeID(), node);
+    }
+
+    Resource clusterResource =
+        Resources.createResource(numNodes * (nodeSize * GB),
+            numNodes * nodeSize);
+
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    when(csContext.getClusterResource()).thenReturn(clusterResource);
+
+    // working with just one queue
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{A});
+    csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 100);
+    csConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + "." + A,
+        100);
+
+    // reinitialize queues
+    CSQueueStore newQueues = new CSQueueStore();
+    CSQueue newRoot =
+        CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
+            CapacitySchedulerConfiguration.ROOT,
+            newQueues, queues,
+            TestUtils.spyHook);
+    queues = newQueues;
+    root.reinitialize(newRoot, csContext.getClusterResource());
+    root.updateClusterResource(clusterResource,
+        new ResourceLimits(clusterResource));
+
+    // Mock the queue
+    LeafQueue leafQueue = stubLeafQueue((LeafQueue) queues.get(A));
+
+    // Set user limit factor so some users are at their limit and the
+    // user limit cache has more than just a few entries
+    leafQueue.setUserLimitFactor(10 / nodeSize);
+
+    // Flag to let allocation threads know to stop
+    AtomicBoolean stopThreads = new AtomicBoolean(false);
+    AtomicBoolean errorInThreads = new AtomicBoolean(false);
+
+    // Set up allocation threads
+    Thread[] threads = new Thread[numAllocationThreads];
+    for (int i = 0; i < numAllocationThreads; i++) {
+      threads[i] = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            boolean alwaysNull = true;
+            while (!stopThreads.get()) {
+              CSAssignment assignment = leafQueue.assignContainers(
+                  clusterResource,
+                  nodes[random.nextInt(numNodes)],
+                  new ResourceLimits(clusterResource),
+                  SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+              applyCSAssignment(clusterResource, assignment, leafQueue,
+                  nodesMap, leafQueue.applicationAttemptMap);
+
+              if (assignment != CSAssignment.NULL_ASSIGNMENT) {
+                alwaysNull = false;
+              }
+              Thread.sleep(500);
+            }
+
+            // One more assignment but not committing so that the
+            // user limits cache is updated to the latest version
+            CSAssignment assignment = leafQueue.assignContainers(
+                clusterResource,
+                nodes[random.nextInt(numNodes)],
+                new ResourceLimits(clusterResource),
+                SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+
+            if (alwaysNull && assignment == CSAssignment.NULL_ASSIGNMENT) {
+              LOG.error("Thread only got null assignments");
+              errorInThreads.set(true);
+            }
+          } catch (Exception e) {
+            LOG.error("Thread exiting because of exception", e);
+            errorInThreads.set(true);
+          }
+        }
+      }, "Scheduling Thread " + i);
+    }
+
+    // Set up users and some apps
+    final String[] users = new String[numUsers];
+    for (int i = 0; i < users.length; i++) {
+      users[i] = "user_" + i;
+    }
+    List<ApplicationAttemptId> applicationAttemptIds =
+        new ArrayList<>(10);
+    List<FiCaSchedulerApp> apps = new ArrayList<>(10);
+    Priority priority = TestUtils.createMockPriority(1);
+
+    // Start up 10 apps to begin with
+    int appId;
+    for (appId = 0; appId < 10; appId++) {
+      String user = users[random.nextInt(users.length)];
+      ApplicationAttemptId applicationAttemptId =
+          TestUtils.getMockApplicationAttemptId(appId, 0);
+      FiCaSchedulerApp app = new FiCaSchedulerApp(applicationAttemptId,
+          user,
+          leafQueue, leafQueue.getUsersManager(), spyRMContext);
+
+      leafQueue.submitApplicationAttempt(app, user);
+      app.updateResourceRequests(Collections.singletonList(
+          TestUtils.createResourceRequest(ResourceRequest.ANY, containerSize,
+              numContainersPerApp, true, priority, recordFactory)));
+
+      applicationAttemptIds.add(applicationAttemptId);
+      apps.add(app);
+    }
+
+    // Start threads
+    for (int i = 0; i < numAllocationThreads; i++) {
+      threads[i].start();
+    }
+
+    final long startTime = Time.monotonicNow();
+    while (true) {
+      // Start a new app about half the iterations and stop a random app the
+      // rest of the iterations
+      boolean startOrStopApp = random.nextBoolean();
+      if (startOrStopApp || (apps.size() == 1)) {
+        // start a new app
+        String user = users[random.nextInt(users.length)];
+        ApplicationAttemptId applicationAttemptId =
+            TestUtils.getMockApplicationAttemptId(appId, 0);
+        FiCaSchedulerApp app = new FiCaSchedulerApp(applicationAttemptId,
+            user,
+            leafQueue, leafQueue.getUsersManager(), spyRMContext);
+
+        leafQueue.submitApplicationAttempt(app, user);
+        app.updateResourceRequests(Collections.singletonList(
+            TestUtils.createResourceRequest(ResourceRequest.ANY, containerSize,
+                numContainersPerApp, true, priority, recordFactory)));
+
+        applicationAttemptIds.add(applicationAttemptId);
+        apps.add(app);
+
+        appId++;
+      } else {
+        // stop a random app
+        int i = random.nextInt(apps.size());
+        FiCaSchedulerApp app = apps.get(i);
+        leafQueue.finishApplication(app.getApplicationId(), app.getUser());
+        leafQueue.releaseResource(clusterResource, app,
+            app.getCurrentConsumption(), "", null);
+        apps.remove(i);
+        applicationAttemptIds.remove(i);
+      }
+
+      if (errorInThreads.get() || (Time.monotonicNow() - startTime) > runTime) {
+        break;
+      }
+    }
+
+    // signal allocation threads to stop
+    stopThreads.set(true);
+
+    // wait for allocation threads to be done
+    for (int i = 0; i < numAllocationThreads; i++) {
+      threads[i].join();
+    }
+
+    // check if there was an error in the allocation threads
+    assertFalse(errorInThreads.get());
+
+    // check there is only one partition in the user limits cache
+    assertEquals(1, leafQueue.userLimitsCache.size());
+
+    Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>
+        uLCByPartition = leafQueue.userLimitsCache.get(nodes[0].getPartition());
+
+    // check there is only one scheduling mode
+    assertEquals(uLCByPartition.size(), 1);
+
+    ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode =
+        uLCByPartition.get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+
+    // check entries in the user limits cache
+    for (Map.Entry<String, CachedUserLimit> entry :
+        uLCBySchedulingMode.entrySet()) {
+      String user = entry.getKey();
+      Resource userLimit = entry.getValue().userLimit;
+
+      Resource expectedUL = leafQueue.getResourceLimitForActiveUsers(user,
+          clusterResource, nodes[0].getPartition(),
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+
+      assertEquals(expectedUL, userLimit);
+    }
+
+    // check the current version in the user limits cache
+    assertEquals(leafQueue.getUsersManager().getLatestVersionOfUsersState(),
+        leafQueue.currentUserLimitCacheVersion);
+    assertTrue(leafQueue.currentUserLimitCacheVersion > 0);
+  }
+
+  @Test
+  public void testUserLimitCacheActiveUsersChanged() throws Exception {
+    // Setup some nodes
+    String host_0 = "127.0.0.1";
+    FiCaSchedulerNode node_0 =
+        TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 6*GB);
+    String host_1 = "127.0.0.2";
+    FiCaSchedulerNode node_1 =
+        TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 6*GB);
+    String host_2 = "127.0.0.3";
+    FiCaSchedulerNode node_2 =
+        TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, 6*GB);
+    String host_3 = "127.0.0.4";
+    FiCaSchedulerNode node_3 =
+        TestUtils.getMockNode(host_3, DEFAULT_RACK, 0, 6*GB);
+
+    Map<NodeId, FiCaSchedulerNode> nodes =
+        ImmutableMap.of(
+            node_0.getNodeID(), node_0,
+            node_1.getNodeID(), node_1,
+            node_2.getNodeID(), node_2,
+            node_3.getNodeID(), node_3
+        );
+
+    final int numNodes = 4;
+    Resource clusterResource =
+        Resources.createResource(numNodes * (6*GB), numNodes);
+
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    when(csContext.getClusterResource()).thenReturn(clusterResource);
+
+    // working with just one queue
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A});
+    csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 100);
+    csConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + "." + A,
+        100);
+
+    // reinitialize queues
+    CSQueueStore newQueues = new CSQueueStore();
+    CSQueue newRoot =
+        CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
+            CapacitySchedulerConfiguration.ROOT,
+            newQueues, queues,
+            TestUtils.spyHook);
+    queues = newQueues;
+    root.reinitialize(newRoot, csContext.getClusterResource());
+    root.updateClusterResource(clusterResource,
+        new ResourceLimits(clusterResource));
+
+    // Mock the queue
+    LeafQueue leafQueue = stubLeafQueue((LeafQueue)queues.get(A));
+
+    // initial check
+    assertEquals(0, leafQueue.userLimitsCache.size());
+    assertEquals(0,
+        leafQueue.getUsersManager().preComputedAllUserLimit.size());
+    assertEquals(0,
+        leafQueue.getUsersManager().preComputedActiveUserLimit.size());
+
+    // 4 users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+    final String user_2 = "user_2";
+    final String user_3 = "user_3";
+
+    // Set user-limit
+    leafQueue.setUserLimit(0);
+    leafQueue.setUserLimitFactor(1.0f);
+
+    Priority priority = TestUtils.createMockPriority(1);
+
+    // Fill queue because user limit is calculated as (used / #active users).
+    final ApplicationAttemptId appAttemptId_9 =
+        TestUtils.getMockApplicationAttemptId(9, 0);
+    FiCaSchedulerApp app_9 =
+        new FiCaSchedulerApp(appAttemptId_9, user_0, leafQueue,
+            leafQueue.getUsersManager(), spyRMContext);
+    leafQueue.submitApplicationAttempt(app_9, user_0);
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps =
+        ImmutableMap.of(app_9.getApplicationAttemptId(), app_9);
+
+    app_9.updateResourceRequests(Arrays.asList(
+        TestUtils.createResourceRequest(host_0, 1*GB, 5, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(DEFAULT_RACK, 1*GB, 5, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 5, true,
+            priority, recordFactory)));
+    assertEquals(1, leafQueue.getUsersManager().getNumActiveUsers());
+
+    CSAssignment assignment;
+    for (int i = 0; i < 5; i++) {
+      assignment = leafQueue.assignContainers(clusterResource, node_0,
+          new ResourceLimits(clusterResource),
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+      applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
+    }
+    app_9.updateResourceRequests(Arrays.asList(
+        TestUtils.createResourceRequest(host_1, 1*GB, 5, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(DEFAULT_RACK, 1*GB, 5, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 5, true,
+            priority, recordFactory)));
+    for (int i = 0; i < 5; i++) {
+      assignment = leafQueue.assignContainers(clusterResource, node_1,
+          new ResourceLimits(clusterResource),
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+      applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
+    }
+    // A total of 10GB have been allocated
+    assertEquals(10*GB, leafQueue.getUsedResources().getMemorySize());
+    assertEquals(10*GB, app_9.getCurrentConsumption().getMemorySize());
+    // For one user who should have been cached in the assignContainers call
+    assertEquals(1, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .size());
+    // But the cache is stale because an allocation was made
+    assertNotEquals(leafQueue.currentUserLimitCacheVersion,
+        leafQueue.getUsersManager().getLatestVersionOfUsersState());
+    // Have not made any calls to fill up the all user limit in UsersManager
+    assertEquals(0,
+        leafQueue.getUsersManager().preComputedAllUserLimit.size());
+    // But the user limit cache in leafQueue got filled up using the active
+    // user limit in UsersManager
+    assertEquals(1,
+        leafQueue.getUsersManager().preComputedActiveUserLimit.size());
+
+    // submit 3 applications for now
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 =
+        new FiCaSchedulerApp(appAttemptId_0, user_0, leafQueue,
+            leafQueue.getUsersManager(), spyRMContext);
+    leafQueue.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 =
+        new FiCaSchedulerApp(appAttemptId_1, user_1, leafQueue,
+            leafQueue.getUsersManager(), spyRMContext);
+    leafQueue.submitApplicationAttempt(app_1, user_1);
+
+    final ApplicationAttemptId appAttemptId_2 =
+        TestUtils.getMockApplicationAttemptId(2, 0);
+    FiCaSchedulerApp app_2 =
+        new FiCaSchedulerApp(appAttemptId_2, user_2, leafQueue,
+            leafQueue.getUsersManager(), spyRMContext);
+    leafQueue.submitApplicationAttempt(app_2, user_2);
+
+    apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0,
+        app_1.getApplicationAttemptId(), app_1,
+        app_2.getApplicationAttemptId(), app_2
+    );
+
+    // requests from first three users (all of which will be locality delayed)
+    app_0.updateResourceRequests(Arrays.asList(
+        TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
+            priority, recordFactory)));
+
+    app_1.updateResourceRequests(Arrays.asList(
+        TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
+            priority, recordFactory)));
+
+    app_2.updateResourceRequests(Arrays.asList(
+        TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
+            priority, recordFactory)));
+
+    // There are 3 active users right now
+    assertEquals(3, leafQueue.getUsersManager().getNumActiveUsers());
+
+    // fill up user limit cache
+    assignment = leafQueue.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
+    // A total of 10GB have been allocated
+    assertEquals(10*GB, leafQueue.getUsedResources().getMemorySize());
+    assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
+    assertEquals(10*GB, app_9.getCurrentConsumption().getMemorySize());
+    // There are three users who should have been cached
+    assertEquals(3, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .size());
+    // There are three users so each has a limit of 12/3 = 4GB
+    assertEquals(4*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_0).userLimit.getMemorySize());
+    assertEquals(4*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_1).userLimit.getMemorySize());
+    assertEquals(4*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_2).userLimit.getMemorySize());
+    // And the cache is NOT stale because no allocation was made
+    assertEquals(leafQueue.currentUserLimitCacheVersion,
+        leafQueue.getUsersManager().getLatestVersionOfUsersState());
+    // Have not made any calls to fill up the all user limit in UsersManager
+    assertEquals(0,
+        leafQueue.getUsersManager().preComputedAllUserLimit.size());
+    // But the user limit cache in leafQueue got filled up using the active
+    // user limit in UsersManager with 4GB limit (since there are three users
+    // so 12/3 = 4GB each)
+    assertEquals(1, leafQueue.getUsersManager()
+        .preComputedActiveUserLimit.size());
+    assertEquals(1, leafQueue.getUsersManager()
+        .preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL).size());
+    assertEquals(4*GB, leafQueue.getUsersManager()
+        .preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY).getMemorySize());
+
+    // submit the 4th application
+    final ApplicationAttemptId appAttemptId_3 =
+        TestUtils.getMockApplicationAttemptId(3, 0);
+    FiCaSchedulerApp app_3 =
+        new FiCaSchedulerApp(appAttemptId_3, user_3, leafQueue,
+            leafQueue.getUsersManager(), spyRMContext);
+    leafQueue.submitApplicationAttempt(app_3, user_3);
+
+    apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0,
+        app_1.getApplicationAttemptId(), app_1,
+        app_2.getApplicationAttemptId(), app_2,
+        app_3.getApplicationAttemptId(), app_3
+    );
+
+    app_3.updateResourceRequests(Arrays.asList(
+        TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
+            priority, recordFactory)));
+
+    // 4 active users now
+    assertEquals(4, leafQueue.getUsersManager().getNumActiveUsers());
+    // Check that the user limits cache has become stale
+    assertNotEquals(leafQueue.currentUserLimitCacheVersion,
+        leafQueue.getUsersManager().getLatestVersionOfUsersState());
+
+    // Even though there are no allocations, user limit cache is repopulated
+    assignment = leafQueue.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
+    // A total of 10GB have been allocated
+    assertEquals(10*GB, leafQueue.getUsedResources().getMemorySize());
+    assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize());
+    assertEquals(10*GB, app_9.getCurrentConsumption().getMemorySize());
+    // There are four users who should have been cached
+    assertEquals(4, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .size());
+    // There are four users so each has a limit of 12/4 = 3GB
+    assertEquals(3*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_0).userLimit.getMemorySize());
+    assertEquals(3*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_1).userLimit.getMemorySize());
+    assertEquals(3*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_2).userLimit.getMemorySize());
+    assertEquals(3*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_3).userLimit.getMemorySize());
+    // And the cache is NOT stale because no allocation was made
+    assertEquals(leafQueue.currentUserLimitCacheVersion,
+        leafQueue.getUsersManager().getLatestVersionOfUsersState());
+    // Have not made any calls to fill up the all user limit in UsersManager
+    assertEquals(0,
+        leafQueue.getUsersManager().preComputedAllUserLimit.size());
+    // But the user limit cache in leafQueue got filled up using the active
+    // user limit in UsersManager with 3GB limit (since there are four users
+    // so 12/4 = 3GB each)
+    assertEquals(1, leafQueue.getUsersManager()
+        .preComputedActiveUserLimit.size());
+    assertEquals(1, leafQueue.getUsersManager()
+        .preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL).size());
+    assertEquals(3*GB, leafQueue.getUsersManager()
+        .preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY).getMemorySize());
+  }
+
   @Test
   public void testUserLimits() throws Exception {
     // Mock the queue
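
For readers who want the caching scheme without the full scheduler context: the patch keys cached user limits by partition and scheduling mode, drops the whole cache whenever UsersManager's version counter moves, and uses putIfAbsent on the innermost ConcurrentMap so concurrent scheduling threads can race safely. Below is a minimal standalone sketch of that versioned-invalidation pattern. It is illustrative only, not code from the patch: the names UserLimitCacheSketch, userStateChanged, and getPartitionCache are hypothetical, and a plain Long stands in for the Resource user limit.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class UserLimitCacheSketch {
  // Stands in for UsersManager's latestVersionOfUsersState counter
  private long latestVersion = 1;
  // Version the cache was last (re)built against
  private long cachedVersion = 0;
  // Outer map is plain HashMap; only the leaf map is concurrent,
  // mirroring the "only the last level is a ConcurrentMap" comment
  private final Map<String, ConcurrentMap<String, Long>> cache =
      new HashMap<>();

  // Called whenever users or usage change; invalidation happens lazily
  // on the next lookup, via the version comparison
  synchronized void userStateChanged() {
    latestVersion++;
  }

  // Returns the per-partition map, clearing everything if the version moved
  synchronized ConcurrentMap<String, Long> getPartitionCache(String partition) {
    if (cachedVersion != latestVersion) {
      cache.clear();                 // drop all stale user limits at once
      cachedVersion = latestVersion;
    }
    return cache.computeIfAbsent(partition, p -> new ConcurrentHashMap<>());
  }

  public static void main(String[] args) {
    UserLimitCacheSketch c = new UserLimitCacheSketch();
    ConcurrentMap<String, Long> m = c.getPartitionCache("default");
    // putIfAbsent keeps the first writer's value if two threads race,
    // like assignContainers() does with CachedUserLimit above
    Long prev = m.putIfAbsent("user_0", 4096L);
    System.out.println(prev == null ? "computed and cached" : "lost the race");
    System.out.println(m.get("user_0"));   // 4096: served from the cache
    c.userStateChanged();                  // e.g. a container was allocated
    System.out.println(c.getPartitionCache("default").get("user_0")); // null
  }
}

The trade-off mirrored from the patch: staleness is detected by a single long comparison per call, invalidation is one coarse clear() under a small lock, and only the leaf map is concurrent, so scheduler threads can fill in per-user entries without holding the outer lock while limits are computed.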