YARN-7527. Over-allocate node resource in async-scheduling mode of CapacityScheduler. (Tao Yang via wangda)

Change-Id: I51ae6c2ab7a3d1febdd7d8d0519b63a13295ac7d
This commit is contained in:
Wangda Tan 2017-11-20 11:48:15 -08:00
parent c326fc89b0
commit 0d781dd03b
2 changed files with 74 additions and 1 deletions

View File

@ -417,7 +417,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
// Common part of check container allocation regardless if it is a // Common part of check container allocation regardless if it is a
// increase container or regular container // increase container or regular container
commonCheckContainerAllocation(allocation, schedulerContainer); if (!commonCheckContainerAllocation(allocation, schedulerContainer)) {
return false;
}
} else { } else {
// Container reserved first time will be NEW, after the container // Container reserved first time will be NEW, after the container
// accepted & confirmed, it will become RESERVED state // accepted & confirmed, it will become RESERVED state

View File

@ -405,6 +405,77 @@ public class TestCapacitySchedulerAsyncScheduling {
rm.stop(); rm.stop();
} }
@Test (timeout = 30000)
public void testNodeResourceOverAllocated()
throws Exception {
// disable async-scheduling for simulating complex scene
Configuration disableAsyncConf = new Configuration(conf);
disableAsyncConf.setBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
// init RM & NMs & Nodes
final MockRM rm = new MockRM(disableAsyncConf);
rm.start();
final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
final MockNM nm2 = rm.registerNode("h2:1234", 9 * GB);
List<MockNM> nmLst = new ArrayList<>();
nmLst.add(nm1);
nmLst.add(nm2);
// init scheduler & nodes
while (
((CapacityScheduler) rm.getRMContext().getScheduler()).getNodeTracker()
.nodeCount() < 2) {
Thread.sleep(10);
}
Assert.assertEquals(2,
((AbstractYarnScheduler) rm.getRMContext().getScheduler())
.getNodeTracker().nodeCount());
CapacityScheduler scheduler =
(CapacityScheduler) rm.getRMContext().getScheduler();
SchedulerNode sn1 = scheduler.getSchedulerNode(nm1.getNodeId());
// launch app
RMApp app = rm.submitApp(200, "app", "user", null, false, "default",
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
FiCaSchedulerApp schedulerApp =
scheduler.getApplicationAttempt(am.getApplicationAttemptId());
// allocate 2 containers and running on nm1
Resource containerResource = Resources.createResource(5 * GB);
am.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.newInstance(0), "*", containerResource, 2)),
null);
// generate over-allocated proposals for nm1
for (int containerNo = 2; containerNo <= 3; containerNo++) {
Container container = Container.newInstance(
ContainerId.newContainerId(am.getApplicationAttemptId(), containerNo),
sn1.getNodeID(), sn1.getHttpAddress(), containerResource,
Priority.newInstance(0), null);
RMContainer rmContainer = new RMContainerImpl(container,
SchedulerRequestKey.create(ResourceRequest
.newInstance(Priority.newInstance(0), "*", containerResource, 1)),
am.getApplicationAttemptId(), sn1.getNodeID(), "user",
rm.getRMContext());
SchedulerContainer newContainer = new SchedulerContainer(schedulerApp,
scheduler.getNode(sn1.getNodeID()), rmContainer, "", true);
ContainerAllocationProposal newContainerProposal =
new ContainerAllocationProposal(newContainer, null, null,
NodeType.OFF_SWITCH, NodeType.OFF_SWITCH,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, containerResource);
List<ContainerAllocationProposal> newProposals = new ArrayList<>();
newProposals.add(newContainerProposal);
ResourceCommitRequest request =
new ResourceCommitRequest(newProposals, null, null);
scheduler.tryCommit(scheduler.getClusterResource(), request);
}
// make sure node resource can't be over-allocated!
Assert.assertTrue("Node resource is Over-allocated!",
sn1.getUnallocatedResource().getMemorySize() > 0);
rm.stop();
}
private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm, private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
int nContainer, Resource resource, int priority, int startContainerId) int nContainer, Resource resource, int priority, int startContainerId)
throws Exception { throws Exception {