YARN-7527. Over-allocate node resource in async-scheduling mode of CapacityScheduler. Contributed by Tao Yang.

This commit is contained in:
Weiwei Yang 2018-04-12 10:12:46 +08:00
parent be627ccfe3
commit a48deb1552
2 changed files with 78 additions and 2 deletions

View File

@ -424,8 +424,10 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
// Common part of check container allocation regardless if it is a
// increase container or regular container
commonCheckContainerAllocation(cluster, allocation,
schedulerContainer);
if (!commonCheckContainerAllocation(cluster, allocation,
schedulerContainer)) {
return false;
}
} else {
// Container reserved first time will be NEW, after the container
// accepted & confirmed, it will become RESERVED state

View File

@ -411,6 +411,80 @@ public class TestCapacitySchedulerAsyncScheduling {
rm.stop();
}
@Test (timeout = 30000)
public void testNodeResourceOverAllocated()
throws Exception {
// disable async-scheduling for simulating complex scene
Configuration disableAsyncConf = new Configuration(conf);
disableAsyncConf.setBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
// init RM & NMs & Nodes
final MockRM rm = new MockRM(disableAsyncConf);
rm.start();
final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
final MockNM nm2 = rm.registerNode("h2:1234", 9 * GB);
List<MockNM> nmLst = new ArrayList<>();
nmLst.add(nm1);
nmLst.add(nm2);
// init scheduler & nodes
while (
((CapacityScheduler) rm.getRMContext().getScheduler()).getNodeTracker()
.nodeCount() < 2) {
Thread.sleep(10);
}
Assert.assertEquals(2,
((AbstractYarnScheduler) rm.getRMContext().getScheduler())
.getNodeTracker().nodeCount());
CapacityScheduler scheduler =
(CapacityScheduler) rm.getRMContext().getScheduler();
SchedulerNode sn1 = scheduler.getSchedulerNode(nm1.getNodeId());
// launch app
RMApp app = rm.submitApp(200, "app", "user", null, false, "default",
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
FiCaSchedulerApp schedulerApp =
scheduler.getApplicationAttempt(am.getApplicationAttemptId());
// allocate 2 containers and running on nm1
Resource containerResource = Resources.createResource(5 * GB);
am.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.newInstance(0), "*", containerResource, 2)),
null);
// generate over-allocated proposals for nm1
for (int containerNo = 2; containerNo <= 3; containerNo++) {
Container container = Container.newInstance(
ContainerId.newContainerId(am.getApplicationAttemptId(), containerNo),
sn1.getNodeID(), sn1.getHttpAddress(), containerResource,
Priority.newInstance(0), null);
RMContainer rmContainer = new RMContainerImpl(container,
SchedulerRequestKey.create(ResourceRequest
.newInstance(Priority.newInstance(0), "*", containerResource, 1)),
am.getApplicationAttemptId(), sn1.getNodeID(), "user",
rm.getRMContext());
SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> newContainer =
new SchedulerContainer<>(schedulerApp,
scheduler.getNode(sn1.getNodeID()), rmContainer, "", true);
ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
newContainerProposal =
new ContainerAllocationProposal<>(newContainer, null, null,
NodeType.OFF_SWITCH, NodeType.OFF_SWITCH,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, containerResource);
List<ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>>
newProposals = new ArrayList<>();
newProposals.add(newContainerProposal);
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request =
new ResourceCommitRequest<>(newProposals, null, null);
scheduler.tryCommit(scheduler.getClusterResource(), request);
}
// make sure node resource can't be over-allocated!
Assert.assertTrue("Node resource is Over-allocated!",
sn1.getUnallocatedResource().getMemorySize() > 0);
rm.stop();
}
private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
int nContainer, Resource resource, int priority, int startContainerId)
throws Exception {