YARN-10390: LeafQueue: retain user limits cache across assignContainers() calls. Contributed by Samir Khan (samkhan).

(cherry picked from commit 9afec2ed17)
This commit is contained in:
Eric E Payne 2020-09-11 13:29:26 +00:00
parent a2bc0dcd21
commit 87ff2f5597
5 changed files with 807 additions and 76 deletions

View File

@ -224,7 +224,8 @@ public class CapacityScheduler extends
private boolean usePortForNodeName;
private boolean scheduleAsynchronously;
private List<AsyncScheduleThread> asyncSchedulerThreads;
@VisibleForTesting
protected List<AsyncScheduleThread> asyncSchedulerThreads;
private ResourceCommitterService resourceCommitterService;
private RMNodeLabelsManager labelManager;
private AppPriorityACLsManager appPriorityACLManager;

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
@ -121,6 +122,16 @@ public class LeafQueue extends AbstractCSQueue {
private volatile OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;
// Map<Partition, Map<SchedulingMode, Map<User, CachedUserLimit>>>
// Not thread safe: only the last level is a ConcurrentMap
@VisibleForTesting
Map<String, Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>>
userLimitsCache = new HashMap<>();
// Not thread safe
@VisibleForTesting
long currentUserLimitCacheVersion = 0;
// record all ignore partition exclusivityRMContainer, this will be used to do
// preemption, key is the partition of the RMContainer allocated on
private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
@ -1066,6 +1077,47 @@ public class LeafQueue extends AbstractCSQueue {
return null;
}
private ConcurrentMap<String, CachedUserLimit> getUserLimitCache(
String partition,
SchedulingMode schedulingMode) {
synchronized (userLimitsCache) {
long latestVersion = usersManager.getLatestVersionOfUsersState();
if (latestVersion != this.currentUserLimitCacheVersion) {
// User limits cache needs invalidating
this.currentUserLimitCacheVersion = latestVersion;
userLimitsCache.clear();
Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>
uLCByPartition = new HashMap<>();
userLimitsCache.put(partition, uLCByPartition);
ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode =
new ConcurrentHashMap<>();
uLCByPartition.put(schedulingMode, uLCBySchedulingMode);
return uLCBySchedulingMode;
}
// User limits cache does not need invalidating
Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>
uLCByPartition = userLimitsCache.get(partition);
if (uLCByPartition == null) {
uLCByPartition = new HashMap<>();
userLimitsCache.put(partition, uLCByPartition);
}
ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode =
uLCByPartition.get(schedulingMode);
if (uLCBySchedulingMode == null) {
uLCBySchedulingMode = new ConcurrentHashMap<>();
uLCByPartition.put(schedulingMode, uLCBySchedulingMode);
}
return uLCBySchedulingMode;
}
}
@Override
public CSAssignment assignContainers(Resource clusterResource,
CandidateNodeSet<FiCaSchedulerNode> candidates,
@ -1112,7 +1164,8 @@ public class LeafQueue extends AbstractCSQueue {
return CSAssignment.NULL_ASSIGNMENT;
}
Map<String, CachedUserLimit> userLimits = new HashMap<>();
ConcurrentMap<String, CachedUserLimit> userLimits =
this.getUserLimitCache(candidates.getPartition(), schedulingMode);
boolean needAssignToQueueCheck = true;
IteratorSelector sel = new IteratorSelector();
sel.setPartition(candidates.getPartition());
@ -1157,7 +1210,13 @@ public class LeafQueue extends AbstractCSQueue {
cachedUserLimit);
if (cul == null) {
cul = new CachedUserLimit(userLimit);
userLimits.put(application.getUser(), cul);
CachedUserLimit retVal =
userLimits.putIfAbsent(application.getUser(), cul);
if (retVal != null) {
// another thread updated the user limit cache before us
cul = retVal;
userLimit = cul.userLimit;
}
}
// Check user limit
boolean userAssignable = true;
@ -2234,8 +2293,8 @@ public class LeafQueue extends AbstractCSQueue {
static class CachedUserLimit {
final Resource userLimit;
boolean canAssign = true;
Resource reservation = Resources.none();
volatile boolean canAssign = true;
volatile Resource reservation = Resources.none();
CachedUserLimit(Resource userLimit) {
this.userLimit = userLimit;

View File

@ -24,7 +24,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@ -72,7 +71,7 @@ public class UsersManager implements AbstractUsersManager {
// To detect whether there is a change in user count for every user-limit
// calculation.
private AtomicLong latestVersionOfUsersState = new AtomicLong(0);
private long latestVersionOfUsersState = 0;
private Map<String, Map<SchedulingMode, Long>> localVersionOfActiveUsersState =
new HashMap<String, Map<SchedulingMode, Long>>();
private Map<String, Map<SchedulingMode, Long>> localVersionOfAllUsersState =
@ -91,8 +90,12 @@ public class UsersManager implements AbstractUsersManager {
new HashMap<String, Set<ApplicationId>>();
// Pre-computed list of user-limits.
Map<String, Map<SchedulingMode, Resource>> preComputedActiveUserLimit = new ConcurrentHashMap<>();
Map<String, Map<SchedulingMode, Resource>> preComputedAllUserLimit = new ConcurrentHashMap<>();
@VisibleForTesting
Map<String, Map<SchedulingMode, Resource>> preComputedActiveUserLimit =
new HashMap<>();
@VisibleForTesting
Map<String, Map<SchedulingMode, Resource>> preComputedAllUserLimit =
new HashMap<>();
private float activeUsersTimesWeights = 0.0f;
private float allUsersTimesWeights = 0.0f;
@ -361,9 +364,9 @@ public class UsersManager implements AbstractUsersManager {
writeLock.lock();
try {
long value = latestVersionOfUsersState.incrementAndGet();
long value = ++latestVersionOfUsersState;
if (value < 0) {
latestVersionOfUsersState.set(0);
latestVersionOfUsersState = 0;
}
} finally {
writeLock.unlock();
@ -581,6 +584,15 @@ public class UsersManager implements AbstractUsersManager {
return userSpecificUserLimit;
}
protected long getLatestVersionOfUsersState() {
readLock.lock();
try {
return latestVersionOfUsersState;
} finally {
readLock.unlock();
}
}
/*
* Recompute user-limit under following conditions: 1. cached user-limit does
* not exist in local map. 2. Total User count doesn't match with local cached
@ -588,8 +600,13 @@ public class UsersManager implements AbstractUsersManager {
*/
private boolean isRecomputeNeeded(SchedulingMode schedulingMode,
String nodePartition, boolean isActive) {
return (getLocalVersionOfUsersState(nodePartition, schedulingMode,
isActive) != latestVersionOfUsersState.get());
readLock.lock();
try {
return (getLocalVersionOfUsersState(nodePartition, schedulingMode,
isActive) != latestVersionOfUsersState);
} finally {
readLock.unlock();
}
}
/*
@ -610,7 +627,7 @@ public class UsersManager implements AbstractUsersManager {
localVersionOfUsersState.put(nodePartition, localVersion);
}
localVersion.put(schedulingMode, latestVersionOfUsersState.get());
localVersion.put(schedulingMode, latestVersionOfUsersState);
} finally {
writeLock.unlock();
}

View File

@ -41,11 +41,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptM
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@ -57,6 +59,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES;
import static org.junit.Assert.assertEquals;
@ -72,6 +75,29 @@ public class TestCapacitySchedulerPerf {
return "resource-" + idx;
}
public static class CapacitySchedulerPerf extends CapacityScheduler {
volatile boolean enable = false;
AtomicLong count = new AtomicLong(0);
public CapacitySchedulerPerf() {
super();
}
@Override
CSAssignment allocateContainersToNode(
CandidateNodeSet<FiCaSchedulerNode> candidates,
boolean withNodeHeartbeat) {
CSAssignment retVal = super.allocateContainersToNode(candidates,
withNodeHeartbeat);
if (enable) {
count.incrementAndGet();
}
return retVal;
}
}
// This test is run only when when -DRunCapacitySchedulerPerfTests=true is set
// on the command line. In addition, this test has tunables for the following:
// Number of queues: -DNumberOfQueues (default=100)
@ -88,6 +114,9 @@ public class TestCapacitySchedulerPerf {
throws Exception {
Assume.assumeTrue(Boolean.valueOf(
System.getProperty("RunCapacitySchedulerPerfTests")));
int numThreads = Integer.valueOf(System.getProperty(
"CapacitySchedulerPerfTestsNumThreads", "0"));
if (numOfResourceTypes > 2) {
// Initialize resource map
Map<String, ResourceInformation> riMap = new HashMap<>();
@ -112,13 +141,30 @@ public class TestCapacitySchedulerPerf {
CapacitySchedulerConfiguration csconf =
createCSConfWithManyQueues(numQueues);
if (numThreads > 0) {
csconf.setScheduleAynschronously(true);
csconf.setInt(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
numThreads);
csconf.setLong(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+ ".scheduling-interval-ms", 0);
}
YarnConfiguration conf = new YarnConfiguration(csconf);
// Don't reset resource types since we have already configured resource
// types
conf.setBoolean(TEST_CONF_RESET_RESOURCE_TYPES, false);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
if (numThreads > 0) {
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacitySchedulerPerf.class,
ResourceScheduler.class);
// avoid getting skipped (see CapacityScheduler.shouldSkipNodeSchedule)
conf.setLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 600000);
} else {
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
}
MockRM rm = new MockRM(conf);
rm.start();
@ -189,6 +235,13 @@ public class TestCapacitySchedulerPerf {
RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
if (numThreads > 0) {
// disable async scheduling threads
for (CapacityScheduler.AsyncScheduleThread t : cs.asyncSchedulerThreads) {
t.suspendSchedule();
}
}
FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[totalApps];
for (int i=0;i<totalApps;i++) {
fiCaApps[i] =
@ -213,79 +266,145 @@ public class TestCapacitySchedulerPerf {
lqs[i].setUserLimitFactor((float)0.0);
}
// allocate one container for each extra apps since
// LeafQueue.canAssignToUser() checks for used > limit, not used >= limit
cs.handle(new NodeUpdateSchedulerEvent(node));
cs.handle(new NodeUpdateSchedulerEvent(node2));
if (numThreads > 0) {
// enable async scheduling threads
for (CapacityScheduler.AsyncScheduleThread t : cs.asyncSchedulerThreads) {
t.beginSchedule();
}
// make sure only the extra apps have allocated containers
for (int i=0;i<totalApps;i++) {
boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
if (i < activeQueues) {
assertFalse(pending);
assertEquals(0,
fiCaApps[i].getTotalPendingRequestsPerPartition().size());
} else {
assertTrue(pending);
assertEquals(1*GB,
fiCaApps[i].getTotalPendingRequestsPerPartition()
.get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
// let the threads allocate resources for extra apps
while (CapacitySchedulerMetrics.getMetrics().commitSuccess.lastStat()
.numSamples() < activeQueues) {
Thread.sleep(1000);
}
// count the number of apps with allocated containers
int numNotPending = 0;
for (int i = 0; i < totalApps; i++) {
boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
if (!pending) {
numNotPending++;
assertEquals(0,
fiCaApps[i].getTotalPendingRequestsPerPartition().size());
} else {
assertEquals(1*GB,
fiCaApps[i].getTotalPendingRequestsPerPartition()
.get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
}
}
// make sure only extra apps have allocated containers
assertEquals(activeQueues, numNotPending);
} else {
// allocate one container for each extra apps since
// LeafQueue.canAssignToUser() checks for used > limit, not used >= limit
cs.handle(new NodeUpdateSchedulerEvent(node));
cs.handle(new NodeUpdateSchedulerEvent(node2));
// make sure only the extra apps have allocated containers
for (int i=0;i<totalApps;i++) {
boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
if (i < activeQueues) {
assertFalse(pending);
assertEquals(0,
fiCaApps[i].getTotalPendingRequestsPerPartition().size());
} else {
assertTrue(pending);
assertEquals(1*GB,
fiCaApps[i].getTotalPendingRequestsPerPartition()
.get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
}
}
}
// Quiet the loggers while measuring throughput
GenericTestUtils.setRootLogLevel(Level.WARN);
final int topn = 20;
final int iterations = 2000000;
final int printInterval = 20000;
final float numerator = 1000.0f * printInterval;
PriorityQueue<Long> queue = new PriorityQueue<>(topn,
Collections.reverseOrder());
long n = Time.monotonicNow();
long timespent = 0;
for (int i = 0; i < iterations; i+=2) {
if (i > 0 && i % printInterval == 0){
long ts = (Time.monotonicNow() - n);
if (queue.size() < topn) {
queue.offer(ts);
} else {
Long last = queue.peek();
if (last > ts) {
queue.poll();
if (numThreads > 0) {
System.out.println("Starting now");
((CapacitySchedulerPerf) cs).enable = true;
long start = Time.monotonicNow();
Thread.sleep(60000);
long end = Time.monotonicNow();
((CapacitySchedulerPerf) cs).enable = false;
long numOps = ((CapacitySchedulerPerf) cs).count.get();
System.out.println("Number of operations: " + numOps);
System.out.println("Time taken: " + (end - start) + " ms");
System.out.println("" + (numOps * 1000 / (end - start))
+ " ops / second");
} else {
final int topn = 20;
final int iterations = 2000000;
final int printInterval = 20000;
final float numerator = 1000.0f * printInterval;
PriorityQueue<Long> queue = new PriorityQueue<>(topn,
Collections.reverseOrder());
long n = Time.monotonicNow();
long timespent = 0;
for (int i = 0; i < iterations; i+=2) {
if (i > 0 && i % printInterval == 0){
long ts = (Time.monotonicNow() - n);
if (queue.size() < topn) {
queue.offer(ts);
} else {
Long last = queue.peek();
if (last > ts) {
queue.poll();
queue.offer(ts);
}
}
System.out.println(i + " " + (numerator / ts));
n = Time.monotonicNow();
}
System.out.println(i + " " + (numerator / ts));
n= Time.monotonicNow();
cs.handle(new NodeUpdateSchedulerEvent(node));
cs.handle(new NodeUpdateSchedulerEvent(node2));
}
cs.handle(new NodeUpdateSchedulerEvent(node));
cs.handle(new NodeUpdateSchedulerEvent(node2));
timespent = 0;
int entries = queue.size();
while (queue.size() > 0) {
long l = queue.poll();
timespent += l;
}
System.out.println("#ResourceTypes = " + numOfResourceTypes
+ ". Avg of fastest " + entries
+ ": " + numerator / (timespent / entries) + " ops/sec of "
+ appCount + " apps on " + pctActiveQueues + "% of " + numQueues
+ " queues.");
}
timespent=0;
int entries = queue.size();
while(queue.size() > 0){
long l = queue.poll();
timespent += l;
}
System.out.println(
"#ResourceTypes = " + numOfResourceTypes + ". Avg of fastest " + entries
+ ": " + numerator / (timespent / entries) + " ops/sec of "
+ appCount + " apps on " + pctActiveQueues + "% of " + numQueues
+ " queues.");
// make sure only the extra apps have allocated containers
for (int i=0;i<totalApps;i++) {
boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
if (i < activeQueues) {
assertFalse(pending);
assertEquals(0,
fiCaApps[i].getTotalPendingRequestsPerPartition().size());
} else {
assertTrue(pending);
assertEquals(1*GB,
fiCaApps[i].getTotalPendingRequestsPerPartition()
.get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
if (numThreads > 0) {
// count the number of apps with allocated containers
int numNotPending = 0;
for (int i = 0; i < totalApps; i++) {
boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
if (!pending) {
numNotPending++;
assertEquals(0,
fiCaApps[i].getTotalPendingRequestsPerPartition().size());
} else {
assertEquals(1*GB,
fiCaApps[i].getTotalPendingRequestsPerPartition()
.get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
}
}
// make sure only extra apps have allocated containers
assertEquals(activeQueues, numNotPending);
} else {
// make sure only the extra apps have allocated containers
for (int i = 0; i < totalApps; i++) {
boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
if (i < activeQueues) {
assertFalse(pending);
assertEquals(0,
fiCaApps[i].getTotalPendingRequestsPerPartition().size());
} else {
assertTrue(pending);
assertEquals(1 * GB,
fiCaApps[i].getTotalPendingRequestsPerPartition()
.get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
}
}
}

View File

@ -28,6 +28,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.capacity.CapacitySchedulerConfiguration.ROOT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
@ -45,15 +46,18 @@ import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@ -900,6 +904,537 @@ public class TestLeafQueue {
assertEquals(expectedRatio, b.getUsersManager().getUsageRatio(""), 0.001);
}
@Test
public void testUserLimitCache() throws Exception {
// Parameters
final int numNodes = 4;
final int nodeSize = 100;
final int numAllocationThreads = 2;
final int numUsers = 40;
final int containerSize = 1 * GB;
final int numContainersPerApp = 10;
final int runTime = 5000; // in ms
Random random = new Random();
// Setup nodes
FiCaSchedulerNode[] nodes = new FiCaSchedulerNode[numNodes];
Map<NodeId, FiCaSchedulerNode> nodesMap = new HashMap<>(nodes.length);
for (int i = 0; i < numNodes; i++) {
String host = "127.0.0." + i;
FiCaSchedulerNode node = TestUtils.getMockNode(host, DEFAULT_RACK, 0,
nodeSize * GB, nodeSize);
nodes[i] = node;
nodesMap.put(node.getNodeID(), node);
}
Resource clusterResource =
Resources.createResource(numNodes * (nodeSize * GB),
numNodes * nodeSize);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
when(csContext.getClusterResource()).thenReturn(clusterResource);
// working with just one queue
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{A});
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 100);
csConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + "." + A,
100);
// reinitialize queues
CSQueueStore newQueues = new CSQueueStore();
CSQueue newRoot =
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT,
newQueues, queues,
TestUtils.spyHook);
queues = newQueues;
root.reinitialize(newRoot, csContext.getClusterResource());
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Mock the queue
LeafQueue leafQueue = stubLeafQueue((LeafQueue) queues.get(A));
// Set user limit factor so some users are at their limit and the
// user limit cache has more than just a few entries
leafQueue.setUserLimitFactor(10 / nodeSize);
// Flag to let allocation threads know to stop
AtomicBoolean stopThreads = new AtomicBoolean(false);
AtomicBoolean errorInThreads = new AtomicBoolean(false);
// Set up allocation threads
Thread[] threads = new Thread[numAllocationThreads];
for (int i = 0; i < numAllocationThreads; i++) {
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
try {
boolean alwaysNull = true;
while (!stopThreads.get()) {
CSAssignment assignment = leafQueue.assignContainers(
clusterResource,
nodes[random.nextInt(numNodes)],
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
applyCSAssignment(clusterResource, assignment, leafQueue,
nodesMap, leafQueue.applicationAttemptMap);
if (assignment != CSAssignment.NULL_ASSIGNMENT) {
alwaysNull = false;
}
Thread.sleep(500);
}
// One more assignment but not committing so that the
// user limits cache is updated to the latest version
CSAssignment assignment = leafQueue.assignContainers(
clusterResource,
nodes[random.nextInt(numNodes)],
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
if (alwaysNull && assignment == CSAssignment.NULL_ASSIGNMENT) {
LOG.error("Thread only got null assignments");
errorInThreads.set(true);
}
} catch (Exception e) {
LOG.error("Thread exiting because of exception", e);
errorInThreads.set(true);
}
}
}, "Scheduling Thread " + i);
}
// Set up users and some apps
final String[] users = new String[numUsers];
for (int i = 0; i < users.length; i++) {
users[i] = "user_" + i;
}
List<ApplicationAttemptId> applicationAttemptIds =
new ArrayList<>(10);
List<FiCaSchedulerApp> apps = new ArrayList<>(10);
Priority priority = TestUtils.createMockPriority(1);
// Start up 10 apps to begin with
int appId;
for (appId = 0; appId < 10; appId++) {
String user = users[random.nextInt(users.length)];
ApplicationAttemptId applicationAttemptId =
TestUtils.getMockApplicationAttemptId(appId, 0);
FiCaSchedulerApp app = new FiCaSchedulerApp(applicationAttemptId,
user,
leafQueue, leafQueue.getUsersManager(), spyRMContext);
leafQueue.submitApplicationAttempt(app, user);
app.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, containerSize,
numContainersPerApp, true, priority, recordFactory)));
applicationAttemptIds.add(applicationAttemptId);
apps.add(app);
}
// Start threads
for (int i = 0; i < numAllocationThreads; i++) {
threads[i].start();
}
final long startTime = Time.monotonicNow();
while (true) {
// Start a new app about half the iterations and stop a random app the
// rest of the iterations
boolean startOrStopApp = random.nextBoolean();
if (startOrStopApp || (apps.size() == 1)) {
// start a new app
String user = users[random.nextInt(users.length)];
ApplicationAttemptId applicationAttemptId =
TestUtils.getMockApplicationAttemptId(appId, 0);
FiCaSchedulerApp app = new FiCaSchedulerApp(applicationAttemptId,
user,
leafQueue, leafQueue.getUsersManager(), spyRMContext);
leafQueue.submitApplicationAttempt(app, user);
app.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, containerSize,
numContainersPerApp, true, priority, recordFactory)));
applicationAttemptIds.add(applicationAttemptId);
apps.add(app);
appId++;
} else {
// stop a random app
int i = random.nextInt(apps.size());
FiCaSchedulerApp app = apps.get(i);
leafQueue.finishApplication(app.getApplicationId(), app.getUser());
leafQueue.releaseResource(clusterResource, app,
app.getCurrentConsumption(), "", null);
apps.remove(i);
applicationAttemptIds.remove(i);
}
if (errorInThreads.get() || (Time.monotonicNow() - startTime) > runTime) {
break;
}
}
// signal allocation threads to stop
stopThreads.set(true);
// wait for allocation threads to be done
for (int i = 0; i < numAllocationThreads; i++) {
threads[i].join();
}
// check if there was an error in the allocation threads
assertFalse(errorInThreads.get());
// check there is only one partition in the user limits cache
assertEquals( 1, leafQueue.userLimitsCache.size());
Map<SchedulingMode, ConcurrentMap<String, LeafQueue.CachedUserLimit>>
uLCByPartition = leafQueue.userLimitsCache.get(nodes[0].getPartition());
// check there is only one scheduling mode
assertEquals(uLCByPartition.size(), 1);
ConcurrentMap<String, LeafQueue.CachedUserLimit> uLCBySchedulingMode =
uLCByPartition.get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
// check entries in the user limits cache
for (Map.Entry<String, LeafQueue.CachedUserLimit> entry :
uLCBySchedulingMode.entrySet()) {
String user = entry.getKey();
Resource userLimit = entry.getValue().userLimit;
Resource expectedUL = leafQueue.getResourceLimitForActiveUsers(user,
clusterResource, nodes[0].getPartition(),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(expectedUL, userLimit);
}
// check the current version in the user limits cache
assertEquals(leafQueue.getUsersManager().getLatestVersionOfUsersState(),
leafQueue.currentUserLimitCacheVersion);
assertTrue(leafQueue.currentUserLimitCacheVersion > 0);
}
@Test
public void testUserLimitCacheActiveUsersChanged() throws Exception {
// Setup some nodes
String host_0 = "127.0.0.1";
FiCaSchedulerNode node_0 =
TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 6*GB);
String host_1 = "127.0.0.2";
FiCaSchedulerNode node_1 =
TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 6*GB);
String host_2 = "127.0.0.3";
FiCaSchedulerNode node_2 =
TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, 6*GB);
String host_3 = "127.0.0.4";
FiCaSchedulerNode node_3 =
TestUtils.getMockNode(host_3, DEFAULT_RACK, 0, 6*GB);
Map<NodeId, FiCaSchedulerNode> nodes =
ImmutableMap.of(
node_0.getNodeID(), node_0,
node_1.getNodeID(), node_1,
node_2.getNodeID(), node_2,
node_3.getNodeID(), node_3
);
final int numNodes = 4;
Resource clusterResource =
Resources.createResource(numNodes * (6*GB), numNodes);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
when(csContext.getClusterResource()).thenReturn(clusterResource);
// working with just one queue
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A});
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 100);
csConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + "." + A,
100);
// reinitialize queues
CSQueueStore newQueues = new CSQueueStore();
CSQueue newRoot =
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT,
newQueues, queues,
TestUtils.spyHook);
queues = newQueues;
root.reinitialize(newRoot, csContext.getClusterResource());
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Mock the queue
LeafQueue leafQueue = stubLeafQueue((LeafQueue)queues.get(A));
// initial check
assertEquals(0, leafQueue.userLimitsCache.size());
assertEquals(0,
leafQueue.getUsersManager().preComputedAllUserLimit.size());
assertEquals(0,
leafQueue.getUsersManager().preComputedActiveUserLimit.size());
// 4 users
final String user_0 = "user_0";
final String user_1 = "user_1";
final String user_2 = "user_2";
final String user_3 = "user_3";
// Set user-limit
leafQueue.setUserLimit(0);
leafQueue.setUserLimitFactor(1.0f);
Priority priority = TestUtils.createMockPriority(1);
// Fill queue because user limit is calculated as (used / #active users).
final ApplicationAttemptId appAttemptId_9 =
TestUtils.getMockApplicationAttemptId(9, 0);
FiCaSchedulerApp app_9 =
new FiCaSchedulerApp(appAttemptId_9, user_0, leafQueue,
leafQueue.getUsersManager(), spyRMContext);
leafQueue.submitApplicationAttempt(app_9, user_0);
Map<ApplicationAttemptId, FiCaSchedulerApp> apps =
ImmutableMap.of(app_9.getApplicationAttemptId(), app_9);
app_9.updateResourceRequests(Arrays.asList(
TestUtils.createResourceRequest(host_0, 1*GB, 5, true,
priority, recordFactory),
TestUtils.createResourceRequest(DEFAULT_RACK, 1*GB, 5, true,
priority, recordFactory),
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 5, true,
priority, recordFactory)));
assertEquals(1, leafQueue.getUsersManager().getNumActiveUsers());
CSAssignment assignment;
for (int i = 0; i < 5; i++) {
assignment = leafQueue.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
}
app_9.updateResourceRequests(Arrays.asList(
TestUtils.createResourceRequest(host_1, 1*GB, 5, true,
priority, recordFactory),
TestUtils.createResourceRequest(DEFAULT_RACK, 1*GB, 5, true,
priority, recordFactory),
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 5, true,
priority, recordFactory)));
for (int i = 0; i < 5; i++) {
assignment = leafQueue.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
}
// A total of 10GB have been allocated
assertEquals(10*GB, leafQueue.getUsedResources().getMemorySize());
assertEquals(10*GB, app_9.getCurrentConsumption().getMemorySize());
// For one user who should have been cached in the assignContainers call
assertEquals(1, leafQueue.userLimitsCache
.get(RMNodeLabelsManager.NO_LABEL)
.get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
.size());
// But the cache is stale because an allocation was made
assertNotEquals(leafQueue.currentUserLimitCacheVersion,
leafQueue.getUsersManager().getLatestVersionOfUsersState());
// Have not made any calls to fill up the all user limit in UsersManager
assertEquals(0,
leafQueue.getUsersManager().preComputedAllUserLimit.size());
// But the user limit cache in leafQueue got filled up using the active
// user limit in UsersManager
assertEquals(1,
leafQueue.getUsersManager().preComputedActiveUserLimit.size());
// submit 3 applications for now
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, leafQueue,
leafQueue.getUsersManager(), spyRMContext);
leafQueue.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_1, leafQueue,
leafQueue.getUsersManager(), spyRMContext);
leafQueue.submitApplicationAttempt(app_1, user_1);
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_2, leafQueue,
leafQueue.getUsersManager(), spyRMContext);
leafQueue.submitApplicationAttempt(app_2, user_2);
apps = ImmutableMap.of(
app_0.getApplicationAttemptId(), app_0,
app_1.getApplicationAttemptId(), app_1,
app_2.getApplicationAttemptId(), app_2
);
// requests from first three users (all of which will be locality delayed)
app_0.updateResourceRequests(Arrays.asList(
TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
priority, recordFactory),
TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
priority, recordFactory),
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
priority, recordFactory)));
app_1.updateResourceRequests(Arrays.asList(
TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
priority, recordFactory),
TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
priority, recordFactory),
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
priority, recordFactory)));
app_2.updateResourceRequests(Arrays.asList(
TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
priority, recordFactory),
TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
priority, recordFactory),
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
priority, recordFactory)));
// There are 3 active users right now
assertEquals(3, leafQueue.getUsersManager().getNumActiveUsers());
// fill up user limit cache
assignment = leafQueue.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
// A total of 10GB have been allocated
assertEquals(10*GB, leafQueue.getUsedResources().getMemorySize());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
assertEquals(10*GB, app_9.getCurrentConsumption().getMemorySize());
// There are three users who should have been cached
assertEquals(3, leafQueue.userLimitsCache
.get(RMNodeLabelsManager.NO_LABEL)
.get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
.size());
// There are three users so each has a limit of 12/3 = 4GB
assertEquals(4*GB, leafQueue.userLimitsCache
.get(RMNodeLabelsManager.NO_LABEL)
.get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
.get(user_0).userLimit.getMemorySize());
assertEquals(4*GB, leafQueue.userLimitsCache
.get(RMNodeLabelsManager.NO_LABEL)
.get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
.get(user_1).userLimit.getMemorySize());
assertEquals(4*GB, leafQueue.userLimitsCache
.get(RMNodeLabelsManager.NO_LABEL)
.get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
.get(user_2).userLimit.getMemorySize());
// And the cache is NOT stale because no allocation was made
assertEquals(leafQueue.currentUserLimitCacheVersion,
leafQueue.getUsersManager().getLatestVersionOfUsersState());
// Have not made any calls to fill up the all user limit in UsersManager
assertEquals(0,
leafQueue.getUsersManager().preComputedAllUserLimit.size());
// But the user limit cache in leafQueue got filled up using the active
// user limit in UsersManager with 4GB limit (since there are three users
// so 12/3 = 4GB each)
assertEquals(1, leafQueue.getUsersManager()
.preComputedActiveUserLimit.size());
assertEquals(1, leafQueue.getUsersManager()
.preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL).size());
assertEquals(4*GB, leafQueue.getUsersManager()
.preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL)
.get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY).getMemorySize());
// submit the 4th application
final ApplicationAttemptId appAttemptId_3 =
TestUtils.getMockApplicationAttemptId(3, 0);
FiCaSchedulerApp app_3 =
new FiCaSchedulerApp(appAttemptId_3, user_3, leafQueue,
leafQueue.getUsersManager(), spyRMContext);
leafQueue.submitApplicationAttempt(app_3, user_3);
apps = ImmutableMap.of(
app_0.getApplicationAttemptId(), app_0,
app_1.getApplicationAttemptId(), app_1,
app_2.getApplicationAttemptId(), app_2,
app_3.getApplicationAttemptId(), app_3
);
app_3.updateResourceRequests(Arrays.asList(
TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
priority, recordFactory),
TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
priority, recordFactory),
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
priority, recordFactory)));
// 4 active users now
assertEquals(4, leafQueue.getUsersManager().getNumActiveUsers());
// Check that the user limits cache has become stale
assertNotEquals(leafQueue.currentUserLimitCacheVersion,
leafQueue.getUsersManager().getLatestVersionOfUsersState());
// Even though there are no allocations, user limit cache is repopulated
assignment = leafQueue.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
// A total of 10GB have been allocated
assertEquals(10*GB, leafQueue.getUsedResources().getMemorySize());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize());
assertEquals(10*GB, app_9.getCurrentConsumption().getMemorySize());
// There are four users who should have been cached
assertEquals(4, leafQueue.userLimitsCache
.get(RMNodeLabelsManager.NO_LABEL)
.get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
.size());
// There are four users so each has a limit of 12/4 = 3GB
assertEquals(3*GB, leafQueue.userLimitsCache
.get(RMNodeLabelsManager.NO_LABEL)
.get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
.get(user_0).userLimit.getMemorySize());
assertEquals(3*GB, leafQueue.userLimitsCache
.get(RMNodeLabelsManager.NO_LABEL)
.get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
.get(user_1).userLimit.getMemorySize());
assertEquals(3*GB, leafQueue.userLimitsCache
.get(RMNodeLabelsManager.NO_LABEL)
.get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
.get(user_2).userLimit.getMemorySize());
assertEquals(3*GB, leafQueue.userLimitsCache
.get(RMNodeLabelsManager.NO_LABEL)
.get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
.get(user_3).userLimit.getMemorySize());
// And the cache is NOT stale because no allocation was made
assertEquals(leafQueue.currentUserLimitCacheVersion,
leafQueue.getUsersManager().getLatestVersionOfUsersState());
// Have not made any calls to fill up the all user limit in UsersManager
assertEquals(0,
leafQueue.getUsersManager().preComputedAllUserLimit.size());
// But the user limit cache in leafQueue got filled up using the active
// user limit in UsersManager with 3GB limit (since there are four users
// so 12/4 = 3GB each)
assertEquals(1, leafQueue.getUsersManager()
.preComputedActiveUserLimit.size());
assertEquals(1, leafQueue.getUsersManager()
.preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL).size());
assertEquals(3*GB, leafQueue.getUsersManager()
.preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL)
.get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY).getMemorySize());
}
@Test
public void testUserLimits() throws Exception {
// Mock the queue