YARN-3873. PendingApplications in LeafQueue should also use OrderingPolicy. (Sunil G via wangda)

This commit is contained in:
Wangda Tan 2015-08-10 14:54:55 -07:00
parent 8f73bdd06b
commit cf9d3c9256
11 changed files with 105 additions and 133 deletions

View File

@ -161,6 +161,9 @@ Release 2.8.0 - UNRELEASED
YARN-3948. Display Application Priority in RM Web UI.(Sunil G via rohithsharmaks)
YARN-3873. PendingApplications in LeafQueue should also use OrderingPolicy.
(Sunil G via wangda)
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -156,17 +156,6 @@ public class CapacityScheduler extends
static final PartitionedQueueComparator partitionedQueueComparator =
new PartitionedQueueComparator();
public static final Comparator<FiCaSchedulerApp> applicationComparator =
new Comparator<FiCaSchedulerApp>() {
@Override
public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
if (!a1.getPriority().equals(a2.getPriority())) {
return a1.getPriority().compareTo(a2.getPriority());
}
return a1.getApplicationId().compareTo(a2.getApplicationId());
}
};
@Override
public void setConf(Configuration conf) {
yarnConf = conf;
@ -274,11 +263,6 @@ public class CapacityScheduler extends
return this.rmContext.getContainerTokenSecretManager();
}
@Override
public Comparator<FiCaSchedulerApp> getApplicationComparator() {
return applicationComparator;
}
@Override
public ResourceCalculator getResourceCalculator() {
return calculator;
@ -1633,7 +1617,7 @@ public class CapacityScheduler extends
if (disposableLeafQueue.getNumApplications() > 0) {
throw new SchedulerDynamicEditException("The queue " + queueName
+ " is not empty " + disposableLeafQueue.getApplications().size()
+ " active apps " + disposableLeafQueue.pendingApplications.size()
+ " active apps " + disposableLeafQueue.getPendingApplications().size()
+ " pending apps");
}

View File

@ -54,8 +54,6 @@ public interface CapacitySchedulerContext {
*/
Configuration getConf();
Comparator<FiCaSchedulerApp> getApplicationComparator();
ResourceCalculator getResourceCalculator();
Comparator<CSQueue> getNonPartitionedQueueComparator();

View File

@ -93,7 +93,7 @@ public class LeafQueue extends AbstractCSQueue {
private Priority defaultAppPriorityPerQueue;
Set<FiCaSchedulerApp> pendingApplications;
private OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy = null;
private volatile float minimumAllocationFactor;
@ -117,8 +117,7 @@ public class LeafQueue extends AbstractCSQueue {
private volatile ResourceLimits cachedResourceLimitsForHeadroom = null;
private OrderingPolicy<FiCaSchedulerApp>
orderingPolicy = new FifoOrderingPolicy<FiCaSchedulerApp>();
private OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;
// record all ignore partition exclusivityRMContainer, this will be used to do
// preemption, key is the partition of the RMContainer allocated on
@ -136,11 +135,6 @@ public class LeafQueue extends AbstractCSQueue {
LOG.debug("LeafQueue:" + " name=" + queueName
+ ", fullname=" + getQueuePath());
}
Comparator<FiCaSchedulerApp> applicationComparator =
cs.getApplicationComparator();
this.pendingApplications =
new TreeSet<FiCaSchedulerApp>(applicationComparator);
setupQueueConfigs(cs.getClusterResource());
}
@ -164,6 +158,8 @@ public class LeafQueue extends AbstractCSQueue {
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
setPendingAppsOrderingPolicy(conf
.<FiCaSchedulerApp> getOrderingPolicy(getQueuePath()));
userLimit = conf.getUserLimit(getQueuePath());
userLimitFactor = conf.getUserLimitFactor(getQueuePath());
@ -328,7 +324,7 @@ public class LeafQueue extends AbstractCSQueue {
}
public synchronized int getNumPendingApplications() {
return pendingApplications.size();
return pendingOrderingPolicy.getNumSchedulableEntities();
}
public synchronized int getNumActiveApplications() {
@ -594,8 +590,8 @@ public class LeafQueue extends AbstractCSQueue {
Resource amLimit = getAMResourceLimit();
Resource userAMLimit = getUserAMResourceLimit();
for (Iterator<FiCaSchedulerApp> i=pendingApplications.iterator();
i.hasNext(); ) {
for (Iterator<FiCaSchedulerApp> i = getPendingAppsOrderingPolicy()
.getAssignmentIterator(); i.hasNext();) {
FiCaSchedulerApp application = i.next();
ApplicationId applicationId = application.getApplicationId();
// Check am resource limit
@ -662,7 +658,7 @@ public class LeafQueue extends AbstractCSQueue {
User user) {
// Accept
user.submitApplication();
pendingApplications.add(application);
getPendingAppsOrderingPolicy().addSchedulableEntity(application);
applicationAttemptMap.put(application.getApplicationAttemptId(), application);
// Activate applications
@ -701,7 +697,7 @@ public class LeafQueue extends AbstractCSQueue {
boolean wasActive =
orderingPolicy.removeSchedulableEntity(application);
if (!wasActive) {
pendingApplications.remove(application);
pendingOrderingPolicy.removeSchedulableEntity(application);
} else {
queueUsage.decAMUsed(application.getAMResource());
user.getResourceUsage().decAMUsed(application.getAMResource());
@ -1354,7 +1350,14 @@ public class LeafQueue extends AbstractCSQueue {
}
getParent().recoverContainer(clusterResource, attempt, rmContainer);
}
/**
* Obtain (read-only) collection of pending applications.
*/
public Collection<FiCaSchedulerApp> getPendingApplications() {
return pendingOrderingPolicy.getSchedulableEntities();
}
/**
* Obtain (read-only) collection of active applications.
*/
@ -1375,7 +1378,8 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public synchronized void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
for (FiCaSchedulerApp pendingApp : pendingApplications) {
for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy
.getSchedulableEntities()) {
apps.add(pendingApp.getApplicationAttemptId());
}
for (FiCaSchedulerApp app :
@ -1450,9 +1454,10 @@ public class LeafQueue extends AbstractCSQueue {
public synchronized void setOrderingPolicy(
OrderingPolicy<FiCaSchedulerApp> orderingPolicy) {
orderingPolicy.addAllSchedulableEntities(
this.orderingPolicy.getSchedulableEntities()
);
if (null != this.orderingPolicy) {
orderingPolicy.addAllSchedulableEntities(this.orderingPolicy
.getSchedulableEntities());
}
this.orderingPolicy = orderingPolicy;
}
@ -1461,6 +1466,20 @@ public class LeafQueue extends AbstractCSQueue {
return defaultAppPriorityPerQueue;
}
public synchronized OrderingPolicy<FiCaSchedulerApp>
getPendingAppsOrderingPolicy() {
return pendingOrderingPolicy;
}
public synchronized void setPendingAppsOrderingPolicy(
OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy) {
if (null != this.pendingOrderingPolicy) {
pendingOrderingPolicy
.addAllSchedulableEntities(this.pendingOrderingPolicy
.getSchedulableEntities());
}
this.pendingOrderingPolicy = pendingOrderingPolicy;
}
/*
* Holds shared values used by all applications in
* the queue to calculate headroom on demand

View File

@ -34,6 +34,7 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -1127,8 +1128,13 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
when(parentQueue.getChildQueues()).thenReturn(children);
} else {
LeafQueue leafQueue = mock(LeafQueue.class);
final TreeSet<FiCaSchedulerApp> apps =
new TreeSet<>(CapacityScheduler.applicationComparator);
final TreeSet<FiCaSchedulerApp> apps = new TreeSet<>(
new Comparator<FiCaSchedulerApp>() {
@Override
public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
return a1.getApplicationId().compareTo(a2.getApplicationId());
}
});
when(leafQueue.getApplications()).thenReturn(apps);
OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
when(so.getPreemptionIterator()).thenAnswer(new Answer() {

View File

@ -93,8 +93,6 @@ public class TestApplicationLimits {
thenReturn(Resources.createResource(16*GB, 32));
when(csContext.getClusterResource()).
thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).
@ -255,8 +253,6 @@ public class TestApplicationLimits {
thenReturn(Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB, 16));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
@ -499,7 +495,7 @@ public class TestApplicationLimits {
assertEquals(1, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
assertEquals(1, queue.getNumPendingApplications(user_0));
assertTrue(queue.pendingApplications.contains(app_2));
assertTrue(queue.getPendingApplications().contains(app_2));
// Submit fourth application, should remain pending
FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0,
@ -509,7 +505,7 @@ public class TestApplicationLimits {
assertEquals(2, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
assertEquals(2, queue.getNumPendingApplications(user_0));
assertTrue(queue.pendingApplications.contains(app_3));
assertTrue(queue.getPendingApplications().contains(app_3));
// Kill 3rd pending application
queue.finishApplicationAttempt(app_2, A);
@ -517,7 +513,7 @@ public class TestApplicationLimits {
assertEquals(1, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
assertEquals(1, queue.getNumPendingApplications(user_0));
assertFalse(queue.pendingApplications.contains(app_2));
assertFalse(queue.getPendingApplications().contains(app_2));
assertFalse(queue.getApplications().contains(app_2));
// Finish 1st application, app_3 should become active
@ -527,7 +523,7 @@ public class TestApplicationLimits {
assertEquals(2, queue.getNumActiveApplications(user_0));
assertEquals(0, queue.getNumPendingApplications(user_0));
assertTrue(queue.getApplications().contains(app_3));
assertFalse(queue.pendingApplications.contains(app_3));
assertFalse(queue.getPendingApplications().contains(app_3));
assertFalse(queue.getApplications().contains(app_0));
// Finish 2nd application
@ -562,8 +558,6 @@ public class TestApplicationLimits {
thenReturn(Resources.createResource(GB));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);

View File

@ -889,61 +889,36 @@ public class TestCapacityScheduler {
0, alloc1Response.getAllocatedContainers().size());
rm.stop();
}
@Test (timeout = 5000)
public void testApplicationComparator()
{
CapacityScheduler cs = new CapacityScheduler();
Comparator<FiCaSchedulerApp> appComparator= cs.getApplicationComparator();
ApplicationId id1 = ApplicationId.newInstance(1, 1);
ApplicationId id2 = ApplicationId.newInstance(1, 2);
ApplicationId id3 = ApplicationId.newInstance(2, 1);
Priority priority = Priority.newInstance(0);
//same clusterId
FiCaSchedulerApp app1 = Mockito.mock(FiCaSchedulerApp.class);
when(app1.getApplicationId()).thenReturn(id1);
when(app1.getPriority()).thenReturn(priority);
FiCaSchedulerApp app2 = Mockito.mock(FiCaSchedulerApp.class);
when(app2.getApplicationId()).thenReturn(id2);
when(app2.getPriority()).thenReturn(priority);
FiCaSchedulerApp app3 = Mockito.mock(FiCaSchedulerApp.class);
when(app3.getApplicationId()).thenReturn(id3);
when(app3.getPriority()).thenReturn(priority);
assertTrue(appComparator.compare(app1, app2) < 0);
//different clusterId
assertTrue(appComparator.compare(app1, app3) < 0);
assertTrue(appComparator.compare(app2, app3) < 0);
}
@Test
public void testGetAppsInQueue() throws Exception {
Application application_0 = new Application("user_0", "a1", resourceManager);
application_0.submit();
Application application_1 = new Application("user_0", "a2", resourceManager);
application_1.submit();
Application application_2 = new Application("user_0", "b2", resourceManager);
application_2.submit();
ResourceScheduler scheduler = resourceManager.getResourceScheduler();
List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
assertEquals(1, appsInA1.size());
List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
assertTrue(appsInA.contains(application_0.getApplicationAttemptId()));
assertTrue(appsInA.contains(application_1.getApplicationAttemptId()));
assertEquals(2, appsInA.size());
@Test
public void testGetAppsInQueue() throws Exception {
Application application_0 = new Application("user_0", "a1", resourceManager);
application_0.submit();
List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
assertTrue(appsInRoot.contains(application_0.getApplicationAttemptId()));
assertTrue(appsInRoot.contains(application_1.getApplicationAttemptId()));
assertTrue(appsInRoot.contains(application_2.getApplicationAttemptId()));
assertEquals(3, appsInRoot.size());
Assert.assertNull(scheduler.getAppsInQueue("nonexistentqueue"));
}
Application application_1 = new Application("user_0", "a2", resourceManager);
application_1.submit();
Application application_2 = new Application("user_0", "b2", resourceManager);
application_2.submit();
ResourceScheduler scheduler = resourceManager.getResourceScheduler();
List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
assertEquals(1, appsInA1.size());
List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
assertTrue(appsInA.contains(application_0.getApplicationAttemptId()));
assertTrue(appsInA.contains(application_1.getApplicationAttemptId()));
assertEquals(2, appsInA.size());
List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
assertTrue(appsInRoot.contains(application_0.getApplicationAttemptId()));
assertTrue(appsInRoot.contains(application_1.getApplicationAttemptId()));
assertTrue(appsInRoot.contains(application_2.getApplicationAttemptId()));
assertEquals(3, appsInRoot.size());
Assert.assertNull(scheduler.getAppsInQueue("nonexistentqueue"));
}
@Test
public void testAddAndRemoveAppFromCapacityScheduler() throws Exception {

View File

@ -94,8 +94,6 @@ public class TestChildQueueOrder {
Resources.createResource(16*GB, 32));
when(csContext.getClusterResource()).
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).

View File

@ -144,8 +144,6 @@ public class TestLeafQueue {
thenReturn(Resources.createResource(16*GB, 32));
when(csContext.getClusterResource()).
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).
@ -1910,7 +1908,7 @@ public class TestLeafQueue {
// before reinitialization
assertEquals(2, e.getNumActiveApplications());
assertEquals(1, e.pendingApplications.size());
assertEquals(1, e.getNumPendingApplications());
csConf.setDouble(CapacitySchedulerConfiguration
.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
@ -1927,7 +1925,7 @@ public class TestLeafQueue {
// after reinitialization
assertEquals(3, e.getNumActiveApplications());
assertEquals(0, e.pendingApplications.size());
assertEquals(0, e.getNumPendingApplications());
}
@Test (timeout = 30000)
@ -1991,7 +1989,7 @@ public class TestLeafQueue {
// before updating cluster resource
assertEquals(2, e.getNumActiveApplications());
assertEquals(1, e.pendingApplications.size());
assertEquals(1, e.getNumPendingApplications());
Resource clusterResource = Resources.createResource(200 * 16 * GB, 100 * 32);
e.updateClusterResource(clusterResource,
@ -1999,7 +1997,7 @@ public class TestLeafQueue {
// after updating cluster resource
assertEquals(3, e.getNumActiveApplications());
assertEquals(0, e.pendingApplications.size());
assertEquals(0, e.getNumPendingApplications());
}
public boolean hasQueueACL(List<QueueUserACLInfo> aclInfos, QueueACL acl) {
@ -2350,8 +2348,9 @@ public class TestLeafQueue {
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
a.setOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>());
a.setOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>());
a.setPendingAppsOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>());
String host_0_0 = "127.0.0.1";
String rack_0 = "rack_0";
FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 16*GB);
@ -2367,14 +2366,14 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), spyRMContext));
mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3)));
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), spyRMContext));
mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5)));
a.submitApplicationAttempt(app_1, user_0);
Priority priority = TestUtils.createMockPriority(1);
@ -2392,36 +2391,34 @@ public class TestLeafQueue {
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
true, priority, recordFactory));
app_1.updateResourceRequests(app_1_requests_0);
a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
// app_1 will get containers as it has high priority
a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
app_0_requests_0.clear();
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
app_1_requests_0.clear();
app_1_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
true, priority, recordFactory));
app_1.updateResourceRequests(app_1_requests_0);
//Even thought it already has more resources, app_0 will still get
//assigned first
a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
//and only then will app_1
//app_1 will still get assigned first as priority is more.
a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
}
//and only then will app_2
a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
}
@Test
public void testConcurrentAccess() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
@ -2500,6 +2497,7 @@ public class TestLeafQueue {
new FairOrderingPolicy<FiCaSchedulerApp>();
a.setOrderingPolicy(schedulingOrder);
a.setPendingAppsOrderingPolicy(new FairOrderingPolicy<FiCaSchedulerApp>());
String host_0_0 = "127.0.0.1";
String rack_0 = "rack_0";
@ -2542,6 +2540,7 @@ public class TestLeafQueue {
true, priority, recordFactory));
app_1.updateResourceRequests(app_1_requests_0);
// app_0 will get containers as its submitted first.
a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);

View File

@ -90,8 +90,6 @@ public class TestParentQueue {
Resources.createResource(16*GB, 32));
when(csContext.getClusterResource()).
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).

View File

@ -112,8 +112,6 @@ public class TestReservations {
Resources.createResource(16 * GB, 12));
when(csContext.getClusterResource()).thenReturn(
Resources.createResource(100 * 16 * GB, 100 * 12));
when(csContext.getApplicationComparator()).thenReturn(
CapacityScheduler.applicationComparator);
when(csContext.getNonPartitionedQueueComparator()).thenReturn(
CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);