YARN-4287. Capacity Scheduler: Rack Locality improvement (Nathan Roberts via wangda)
This commit is contained in:
parent
0ca8df716a
commit
796638d9bc
|
@ -554,6 +554,8 @@ Release 2.8.0 - UNRELEASED
|
||||||
|
|
||||||
YARN-4310. FairScheduler: Log skipping reservation messages at DEBUG level (asuresh)
|
YARN-4310. FairScheduler: Log skipping reservation messages at DEBUG level (asuresh)
|
||||||
|
|
||||||
|
YARN-4287. Capacity Scheduler: Rack Locality improvement (Nathan Roberts via wangda)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||||
|
|
|
@ -595,9 +595,11 @@ public class AppSchedulingInfo {
|
||||||
+ " container=" + container.getId()
|
+ " container=" + container.getId()
|
||||||
+ " host=" + container.getNodeId().toString()
|
+ " host=" + container.getNodeId().toString()
|
||||||
+ " user=" + user
|
+ " user=" + user
|
||||||
+ " resource=" + request.getCapability());
|
+ " resource=" + request.getCapability()
|
||||||
|
+ " type=" + type);
|
||||||
}
|
}
|
||||||
metrics.allocateResources(user, 1, request.getCapability(), true);
|
metrics.allocateResources(user, 1, request.getCapability(), true);
|
||||||
|
metrics.incrNodeTypeAggregations(user, type);
|
||||||
return resourceRequests;
|
return resourceRequests;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -64,6 +64,12 @@ public class QueueMetrics implements MetricsSource {
|
||||||
@Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
|
@Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
|
||||||
@Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
|
@Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
|
||||||
@Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated;
|
@Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated;
|
||||||
|
@Metric("Aggregate # of allocated node-local containers")
|
||||||
|
MutableCounterLong aggregateNodeLocalContainersAllocated;
|
||||||
|
@Metric("Aggregate # of allocated rack-local containers")
|
||||||
|
MutableCounterLong aggregateRackLocalContainersAllocated;
|
||||||
|
@Metric("Aggregate # of allocated off-switch containers")
|
||||||
|
MutableCounterLong aggregateOffSwitchContainersAllocated;
|
||||||
@Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased;
|
@Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased;
|
||||||
@Metric("Available memory in MB") MutableGaugeInt availableMB;
|
@Metric("Available memory in MB") MutableGaugeInt availableMB;
|
||||||
@Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores;
|
@Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores;
|
||||||
|
@ -379,6 +385,25 @@ public class QueueMetrics implements MetricsSource {
|
||||||
pendingVCores.decr(res.getVirtualCores() * Math.max(containers, 1));
|
pendingVCores.decr(res.getVirtualCores() * Math.max(containers, 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incrNodeTypeAggregations(String user, NodeType type) {
|
||||||
|
if (type == NodeType.NODE_LOCAL) {
|
||||||
|
aggregateNodeLocalContainersAllocated.incr();
|
||||||
|
} else if (type == NodeType.RACK_LOCAL) {
|
||||||
|
aggregateRackLocalContainersAllocated.incr();
|
||||||
|
} else if (type == NodeType.OFF_SWITCH) {
|
||||||
|
aggregateOffSwitchContainersAllocated.incr();
|
||||||
|
} else {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
QueueMetrics userMetrics = getUserMetrics(user);
|
||||||
|
if (userMetrics != null) {
|
||||||
|
userMetrics.incrNodeTypeAggregations(user, type);
|
||||||
|
}
|
||||||
|
if (parent != null) {
|
||||||
|
parent.incrNodeTypeAggregations(user, type);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void allocateResources(String user, int containers, Resource res,
|
public void allocateResources(String user, int containers, Resource res,
|
||||||
boolean decrPending) {
|
boolean decrPending) {
|
||||||
// if #containers = 0, means change container resource
|
// if #containers = 0, means change container resource
|
||||||
|
@ -562,6 +587,18 @@ public class QueueMetrics implements MetricsSource {
|
||||||
return aggregateContainersAllocated.value();
|
return aggregateContainersAllocated.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getAggregateNodeLocalContainersAllocated() {
|
||||||
|
return aggregateNodeLocalContainersAllocated.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getAggregateRackLocalContainersAllocated() {
|
||||||
|
return aggregateRackLocalContainersAllocated.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getAggregateOffSwitchContainersAllocated() {
|
||||||
|
return aggregateOffSwitchContainersAllocated.value();
|
||||||
|
}
|
||||||
|
|
||||||
public long getAggegatedReleasedContainers() {
|
public long getAggegatedReleasedContainers() {
|
||||||
return aggregateContainersReleased.value();
|
return aggregateContainersReleased.value();
|
||||||
}
|
}
|
||||||
|
|
|
@ -184,6 +184,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
||||||
@Private
|
@Private
|
||||||
public static final int DEFAULT_NODE_LOCALITY_DELAY = 40;
|
public static final int DEFAULT_NODE_LOCALITY_DELAY = 40;
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String RACK_LOCALITY_FULL_RESET =
|
||||||
|
PREFIX + "rack-locality-full-reset";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final boolean DEFAULT_RACK_LOCALITY_FULL_RESET = true;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public static final String SCHEDULE_ASYNCHRONOUSLY_PREFIX =
|
public static final String SCHEDULE_ASYNCHRONOUSLY_PREFIX =
|
||||||
PREFIX + "schedule-asynchronously";
|
PREFIX + "schedule-asynchronously";
|
||||||
|
@ -665,6 +672,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
||||||
return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY);
|
return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean getRackLocalityFullReset() {
|
||||||
|
return getBoolean(RACK_LOCALITY_FULL_RESET,
|
||||||
|
DEFAULT_RACK_LOCALITY_FULL_RESET);
|
||||||
|
}
|
||||||
|
|
||||||
public ResourceCalculator getResourceCalculator() {
|
public ResourceCalculator getResourceCalculator() {
|
||||||
return ReflectionUtils.newInstance(
|
return ReflectionUtils.newInstance(
|
||||||
getClass(
|
getClass(
|
||||||
|
|
|
@ -86,6 +86,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
private float maxAMResourcePerQueuePercent;
|
private float maxAMResourcePerQueuePercent;
|
||||||
|
|
||||||
private volatile int nodeLocalityDelay;
|
private volatile int nodeLocalityDelay;
|
||||||
|
private volatile boolean rackLocalityFullReset;
|
||||||
|
|
||||||
Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
|
Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
|
||||||
new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
|
new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
|
||||||
|
@ -190,6 +191,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeLocalityDelay = conf.getNodeLocalityDelay();
|
nodeLocalityDelay = conf.getNodeLocalityDelay();
|
||||||
|
rackLocalityFullReset = conf.getRackLocalityFullReset();
|
||||||
|
|
||||||
// re-init this since max allocation could have changed
|
// re-init this since max allocation could have changed
|
||||||
this.minimumAllocationFactor =
|
this.minimumAllocationFactor =
|
||||||
|
@ -1002,6 +1004,11 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
return nodeLocalityDelay;
|
return nodeLocalityDelay;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Lock(NoLock.class)
|
||||||
|
public boolean getRackLocalityFullReset() {
|
||||||
|
return rackLocalityFullReset;
|
||||||
|
}
|
||||||
|
|
||||||
@Lock(NoLock.class)
|
@Lock(NoLock.class)
|
||||||
private Resource computeUserLimit(FiCaSchedulerApp application,
|
private Resource computeUserLimit(FiCaSchedulerApp application,
|
||||||
Resource clusterResource, User user,
|
Resource clusterResource, User user,
|
||||||
|
|
|
@ -89,7 +89,7 @@ public abstract class AbstractContainerAllocator {
|
||||||
LOG.info("assignedContainer" + " application attempt="
|
LOG.info("assignedContainer" + " application attempt="
|
||||||
+ application.getApplicationAttemptId() + " container="
|
+ application.getApplicationAttemptId() + " container="
|
||||||
+ updatedContainer.getId() + " queue=" + this + " clusterResource="
|
+ updatedContainer.getId() + " queue=" + this + " clusterResource="
|
||||||
+ clusterResource);
|
+ clusterResource + " type=" + assignment.getType());
|
||||||
|
|
||||||
application
|
application
|
||||||
.getCSLeafQueue()
|
.getCSLeafQueue()
|
||||||
|
|
|
@ -246,8 +246,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
||||||
float localityWaitFactor =
|
float localityWaitFactor =
|
||||||
getLocalityWaitFactor(priority, rmContext.getScheduler()
|
getLocalityWaitFactor(priority, rmContext.getScheduler()
|
||||||
.getNumClusterNodes());
|
.getNumClusterNodes());
|
||||||
|
// Cap the delay by the number of nodes in the cluster. Under most conditions
|
||||||
return ((requiredContainers * localityWaitFactor) < missedOpportunities);
|
// this means we will consider each node in the cluster before
|
||||||
|
// accepting an off-switch assignment.
|
||||||
|
return (Math.min(rmContext.getScheduler().getNumClusterNodes(),
|
||||||
|
(requiredContainers * localityWaitFactor)) < missedOpportunities);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if we need containers on this rack
|
// Check if we need containers on this rack
|
||||||
|
@ -643,7 +646,13 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Resetting scheduling opportunities");
|
LOG.debug("Resetting scheduling opportunities");
|
||||||
}
|
}
|
||||||
application.resetSchedulingOpportunities(priority);
|
// Only reset scheduling opportunities for RACK_LOCAL if configured
|
||||||
|
// to do so. Not resetting means we will continue to schedule
|
||||||
|
// RACK_LOCAL without delay.
|
||||||
|
if (allocationResult.containerNodeType == NodeType.NODE_LOCAL
|
||||||
|
|| application.getCSLeafQueue().getRackLocalityFullReset()) {
|
||||||
|
application.resetSchedulingOpportunities(priority);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Non-exclusive scheduling opportunity is different: we need reset
|
// Non-exclusive scheduling opportunity is different: we need reset
|
||||||
|
|
|
@ -198,6 +198,53 @@ public class TestQueueMetrics {
|
||||||
checkApps(userSource, 1, 0, 0, 1, 0, 0, true);
|
checkApps(userSource, 1, 0, 0, 1, 0, 0, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test public void testNodeTypeMetrics() {
|
||||||
|
String parentQueueName = "root";
|
||||||
|
String leafQueueName = "root.leaf";
|
||||||
|
String user = "alice";
|
||||||
|
|
||||||
|
QueueMetrics parentMetrics =
|
||||||
|
QueueMetrics.forQueue(ms, parentQueueName, null, true, conf);
|
||||||
|
Queue parentQueue = make(stub(Queue.class).returning(parentMetrics).
|
||||||
|
from.getMetrics());
|
||||||
|
QueueMetrics metrics =
|
||||||
|
QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, conf);
|
||||||
|
MetricsSource parentQueueSource = queueSource(ms, parentQueueName);
|
||||||
|
MetricsSource queueSource = queueSource(ms, leafQueueName);
|
||||||
|
//AppSchedulingInfo app = mockApp(user);
|
||||||
|
|
||||||
|
metrics.submitApp(user);
|
||||||
|
MetricsSource userSource = userSource(ms, leafQueueName, user);
|
||||||
|
MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
|
||||||
|
|
||||||
|
metrics.incrNodeTypeAggregations(user, NodeType.NODE_LOCAL);
|
||||||
|
checkAggregatedNodeTypes(queueSource,1L,0L,0L);
|
||||||
|
checkAggregatedNodeTypes(parentQueueSource,1L,0L,0L);
|
||||||
|
checkAggregatedNodeTypes(userSource,1L,0L,0L);
|
||||||
|
checkAggregatedNodeTypes(parentUserSource,1L,0L,0L);
|
||||||
|
|
||||||
|
metrics.incrNodeTypeAggregations(user, NodeType.RACK_LOCAL);
|
||||||
|
checkAggregatedNodeTypes(queueSource,1L,1L,0L);
|
||||||
|
checkAggregatedNodeTypes(parentQueueSource,1L,1L,0L);
|
||||||
|
checkAggregatedNodeTypes(userSource,1L,1L,0L);
|
||||||
|
checkAggregatedNodeTypes(parentUserSource,1L,1L,0L);
|
||||||
|
|
||||||
|
metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH);
|
||||||
|
checkAggregatedNodeTypes(queueSource,1L,1L,1L);
|
||||||
|
checkAggregatedNodeTypes(parentQueueSource,1L,1L,1L);
|
||||||
|
checkAggregatedNodeTypes(userSource,1L,1L,1L);
|
||||||
|
checkAggregatedNodeTypes(parentUserSource,1L,1L,1L);
|
||||||
|
|
||||||
|
metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH);
|
||||||
|
checkAggregatedNodeTypes(queueSource,1L,1L,2L);
|
||||||
|
checkAggregatedNodeTypes(parentQueueSource,1L,1L,2L);
|
||||||
|
checkAggregatedNodeTypes(userSource,1L,1L,2L);
|
||||||
|
checkAggregatedNodeTypes(parentUserSource,1L,1L,2L);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test public void testTwoLevelWithUserMetrics() {
|
@Test public void testTwoLevelWithUserMetrics() {
|
||||||
String parentQueueName = "root";
|
String parentQueueName = "root";
|
||||||
String leafQueueName = "root.leaf";
|
String leafQueueName = "root.leaf";
|
||||||
|
@ -367,6 +414,14 @@ public class TestQueueMetrics {
|
||||||
assertGauge("ReservedContainers", reservedCtnrs, rb);
|
assertGauge("ReservedContainers", reservedCtnrs, rb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void checkAggregatedNodeTypes(MetricsSource source,
|
||||||
|
long nodeLocal, long rackLocal, long offSwitch) {
|
||||||
|
MetricsRecordBuilder rb = getMetrics(source);
|
||||||
|
assertCounter("AggregateNodeLocalContainersAllocated", nodeLocal, rb);
|
||||||
|
assertCounter("AggregateRackLocalContainersAllocated", rackLocal, rb);
|
||||||
|
assertCounter("AggregateOffSwitchContainersAllocated", offSwitch, rb);
|
||||||
|
}
|
||||||
|
|
||||||
private static AppSchedulingInfo mockApp(String user) {
|
private static AppSchedulingInfo mockApp(String user) {
|
||||||
AppSchedulingInfo app = mock(AppSchedulingInfo.class);
|
AppSchedulingInfo app = mock(AppSchedulingInfo.class);
|
||||||
when(app.getUser()).thenReturn(user);
|
when(app.getUser()).thenReturn(user);
|
||||||
|
|
|
@ -1506,8 +1506,8 @@ public class TestLeafQueue {
|
||||||
@Test
|
@Test
|
||||||
public void testLocalityScheduling() throws Exception {
|
public void testLocalityScheduling() throws Exception {
|
||||||
|
|
||||||
// Manipulate queue 'a'
|
// Manipulate queue 'b'
|
||||||
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
|
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(B));
|
||||||
|
|
||||||
// User
|
// User
|
||||||
String user_0 = "user_0";
|
String user_0 = "user_0";
|
||||||
|
@ -1614,32 +1614,85 @@ public class TestLeafQueue {
|
||||||
TestUtils.createResourceRequest(host_1, 1*GB, 1,
|
TestUtils.createResourceRequest(host_1, 1*GB, 1,
|
||||||
true, priority, recordFactory));
|
true, priority, recordFactory));
|
||||||
app_0_requests_0.add(
|
app_0_requests_0.add(
|
||||||
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
|
TestUtils.createResourceRequest(rack_1, 1*GB, 3,
|
||||||
true, priority, recordFactory));
|
true, priority, recordFactory));
|
||||||
app_0_requests_0.add(
|
app_0_requests_0.add(
|
||||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, // one extra
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 4, // one extra
|
||||||
true, priority, recordFactory));
|
true, priority, recordFactory));
|
||||||
app_0.updateResourceRequests(app_0_requests_0);
|
app_0.updateResourceRequests(app_0_requests_0);
|
||||||
assertEquals(2, app_0.getTotalRequiredResources(priority));
|
assertEquals(4, app_0.getTotalRequiredResources(priority));
|
||||||
|
|
||||||
String host_3 = "127.0.0.4"; // on rack_1
|
String host_3 = "127.0.0.4"; // on rack_1
|
||||||
FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
|
FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
|
||||||
|
|
||||||
// Rack-delay
|
// Rack-delay
|
||||||
|
doReturn(true).when(a).getRackLocalityFullReset();
|
||||||
doReturn(1).when(a).getNodeLocalityDelay();
|
doReturn(1).when(a).getNodeLocalityDelay();
|
||||||
|
|
||||||
// Shouldn't assign RACK_LOCAL yet
|
// Shouldn't assign RACK_LOCAL yet
|
||||||
assignment = a.assignContainers(clusterResource, node_3,
|
assignment = a.assignContainers(clusterResource, node_3,
|
||||||
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
assertEquals(1, app_0.getSchedulingOpportunities(priority));
|
assertEquals(1, app_0.getSchedulingOpportunities(priority));
|
||||||
assertEquals(2, app_0.getTotalRequiredResources(priority));
|
assertEquals(4, app_0.getTotalRequiredResources(priority));
|
||||||
|
|
||||||
// Should assign RACK_LOCAL now
|
// Should assign RACK_LOCAL now
|
||||||
assignment = a.assignContainers(clusterResource, node_3,
|
assignment = a.assignContainers(clusterResource, node_3,
|
||||||
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
|
verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
|
||||||
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
|
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
|
||||||
|
assertEquals(3, app_0.getTotalRequiredResources(priority));
|
||||||
|
|
||||||
|
// Shouldn't assign RACK_LOCAL because schedulingOpportunities should have gotten reset.
|
||||||
|
assignment = a.assignContainers(clusterResource, node_3,
|
||||||
|
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
assertEquals(1, app_0.getSchedulingOpportunities(priority));
|
||||||
|
assertEquals(3, app_0.getTotalRequiredResources(priority));
|
||||||
|
|
||||||
|
// Next time we schedule RACK_LOCAL, don't reset
|
||||||
|
doReturn(false).when(a).getRackLocalityFullReset();
|
||||||
|
|
||||||
|
// Should assign RACK_LOCAL now
|
||||||
|
assignment = a.assignContainers(clusterResource, node_3,
|
||||||
|
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
|
||||||
|
assertEquals(2, app_0.getSchedulingOpportunities(priority)); // should NOT reset
|
||||||
|
assertEquals(2, app_0.getTotalRequiredResources(priority));
|
||||||
|
|
||||||
|
// Another RACK_LOCAL since schedulingOpportunities not reset
|
||||||
|
assignment = a.assignContainers(clusterResource, node_3,
|
||||||
|
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
|
||||||
|
assertEquals(3, app_0.getSchedulingOpportunities(priority)); // should NOT reset
|
||||||
assertEquals(1, app_0.getTotalRequiredResources(priority));
|
assertEquals(1, app_0.getTotalRequiredResources(priority));
|
||||||
|
|
||||||
|
// Add a request larger than cluster size to verify
|
||||||
|
// OFF_SWITCH delay is capped by cluster size
|
||||||
|
app_0.resetSchedulingOpportunities(priority);
|
||||||
|
app_0_requests_0.clear();
|
||||||
|
app_0_requests_0.add(
|
||||||
|
TestUtils.createResourceRequest(host_0, 1*GB, 100,
|
||||||
|
true, priority, recordFactory));
|
||||||
|
app_0_requests_0.add(
|
||||||
|
TestUtils.createResourceRequest(rack_0, 1*GB, 100,
|
||||||
|
true, priority, recordFactory));
|
||||||
|
app_0_requests_0.add(
|
||||||
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 100,
|
||||||
|
true, priority, recordFactory));
|
||||||
|
app_0.updateResourceRequests(app_0_requests_0);
|
||||||
|
|
||||||
|
// Start with off switch. 3 nodes in cluster so shouldn't allocate first 3
|
||||||
|
for (int i = 0; i < numNodes; i++) {
|
||||||
|
assignment =
|
||||||
|
a.assignContainers(clusterResource, node_2, new ResourceLimits(
|
||||||
|
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
verifyNoContainerAllocated(assignment);
|
||||||
|
assertEquals(i+1, app_0.getSchedulingOpportunities(priority));
|
||||||
|
}
|
||||||
|
// delay should be capped at numNodes so next one should allocate
|
||||||
|
assignment = a.assignContainers(clusterResource, node_2,
|
||||||
|
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
|
||||||
|
assertEquals(numNodes+1, app_0.getSchedulingOpportunities(priority));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue