YARN-9838. Fix resource inconsistency for queues when moving app with reserved container to another queue. Contributed by jiulongzhu.
This commit is contained in:
parent
8551c04726
commit
c95e772d5d
@ -2393,6 +2393,11 @@ public String moveApplication(ApplicationId appId,
|
|||||||
// attach the Container to another queue
|
// attach the Container to another queue
|
||||||
dest.attachContainer(getClusterResource(), app, rmContainer);
|
dest.attachContainer(getClusterResource(), app, rmContainer);
|
||||||
}
|
}
|
||||||
|
// Move all reserved containers
|
||||||
|
for (RMContainer rmContainer : app.getReservedContainers()) {
|
||||||
|
source.detachContainer(getClusterResource(), app, rmContainer);
|
||||||
|
dest.attachContainer(getClusterResource(), app, rmContainer);
|
||||||
|
}
|
||||||
if (!app.isStopped()) {
|
if (!app.isStopped()) {
|
||||||
source.finishApplicationAttempt(app, sourceQueueName);
|
source.finishApplicationAttempt(app, sourceQueueName);
|
||||||
// Submit to a new queue
|
// Submit to a new queue
|
||||||
|
@ -2026,6 +2026,7 @@ public void attachContainer(Resource clusterResource,
|
|||||||
allocateResource(clusterResource, application, rmContainer.getContainer()
|
allocateResource(clusterResource, application, rmContainer.getContainer()
|
||||||
.getResource(), node.getPartition(), rmContainer);
|
.getResource(), node.getPartition(), rmContainer);
|
||||||
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
|
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
|
||||||
|
+ " containerState="+ rmContainer.getState()
|
||||||
+ " resource=" + rmContainer.getContainer().getResource()
|
+ " resource=" + rmContainer.getContainer().getResource()
|
||||||
+ " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
|
+ " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
|
||||||
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
|
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
|
||||||
@ -2045,6 +2046,7 @@ public void detachContainer(Resource clusterResource,
|
|||||||
releaseResource(clusterResource, application, rmContainer.getContainer()
|
releaseResource(clusterResource, application, rmContainer.getContainer()
|
||||||
.getResource(), node.getPartition(), rmContainer);
|
.getResource(), node.getPartition(), rmContainer);
|
||||||
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
|
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
|
||||||
|
+ " containerState="+ rmContainer.getState()
|
||||||
+ " resource=" + rmContainer.getContainer().getResource()
|
+ " resource=" + rmContainer.getContainer().getResource()
|
||||||
+ " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
|
+ " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
|
||||||
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
|
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
|
||||||
|
@ -5212,4 +5212,94 @@ null, new RMContainerTokenSecretManager(conf),
|
|||||||
assertEquals(51200, ((CSQueueMetrics)cs.getQueue("a2").getMetrics()).getMaxCapacityMB());
|
assertEquals(51200, ((CSQueueMetrics)cs.getQueue("a2").getMetrics()).getMaxCapacityMB());
|
||||||
assertEquals(25600, ((CSQueueMetrics)cs.getQueue("a3").getMetrics()).getMaxCapacityMB());
|
assertEquals(25600, ((CSQueueMetrics)cs.getQueue("a3").getMetrics()).getMaxCapacityMB());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReservedContainerLeakWhenMoveApplication() throws Exception {
|
||||||
|
CapacitySchedulerConfiguration csConf
|
||||||
|
= new CapacitySchedulerConfiguration();
|
||||||
|
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||||
|
new String[] {"a", "b"});
|
||||||
|
csConf.setCapacity("root.a", 50);
|
||||||
|
csConf.setMaximumCapacity("root.a", 100);
|
||||||
|
csConf.setUserLimitFactor("root.a", 100);
|
||||||
|
csConf.setCapacity("root.b", 50);
|
||||||
|
csConf.setMaximumCapacity("root.b", 100);
|
||||||
|
csConf.setUserLimitFactor("root.b", 100);
|
||||||
|
|
||||||
|
YarnConfiguration conf=new YarnConfiguration(csConf);
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
RMNodeLabelsManager mgr=new NullRMNodeLabelsManager();
|
||||||
|
mgr.init(conf);
|
||||||
|
MockRM rm1 = new MockRM(csConf);
|
||||||
|
CapacityScheduler scheduler=(CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
rm1.getRMContext().setNodeLabelManager(mgr);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8 * GB);
|
||||||
|
MockNM nm2 = rm1.registerNode("127.0.0.2:1234", 8 * GB);
|
||||||
|
/*
|
||||||
|
* simulation
|
||||||
|
* app1: (1 AM,1 running container)
|
||||||
|
* app2: (1 AM,1 reserved container)
|
||||||
|
*/
|
||||||
|
// launch an app to queue, AM container should be launched in nm1
|
||||||
|
RMApp app1 = rm1.submitApp(1 * GB, "app_1", "user_1", null, "a");
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
|
// launch another app to queue, AM container should be launched in nm1
|
||||||
|
RMApp app2 = rm1.submitApp(1 * GB, "app_2", "user_1", null, "a");
|
||||||
|
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
|
||||||
|
|
||||||
|
am1.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
|
||||||
|
// this containerRequest should be reserved
|
||||||
|
am2.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
|
||||||
|
|
||||||
|
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||||
|
// Do node heartbeats 2 times
|
||||||
|
// First time will allocate container for app1, second time will reserve
|
||||||
|
// container for app2
|
||||||
|
scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
|
scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
|
|
||||||
|
FiCaSchedulerApp schedulerApp1 =
|
||||||
|
scheduler.getApplicationAttempt(am1.getApplicationAttemptId());
|
||||||
|
FiCaSchedulerApp schedulerApp2 =
|
||||||
|
scheduler.getApplicationAttempt(am2.getApplicationAttemptId());
|
||||||
|
// APP1: 1 AM, 1 allocatedContainer
|
||||||
|
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
|
||||||
|
// APP2: 1 AM,1 reservedContainer
|
||||||
|
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
|
||||||
|
Assert.assertEquals(1, schedulerApp2.getReservedContainers().size());
|
||||||
|
//before,move app2 which has one reservedContainer
|
||||||
|
LeafQueue srcQueue = (LeafQueue) scheduler.getQueue("a");
|
||||||
|
LeafQueue desQueue = (LeafQueue) scheduler.getQueue("b");
|
||||||
|
Assert.assertEquals(4, srcQueue.getNumContainers());
|
||||||
|
Assert.assertEquals(10*GB, srcQueue.getUsedResources().getMemorySize());
|
||||||
|
Assert.assertEquals(0, desQueue.getNumContainers());
|
||||||
|
Assert.assertEquals(0, desQueue.getUsedResources().getMemorySize());
|
||||||
|
//app1 ResourceUsage (0 reserved)
|
||||||
|
Assert.assertEquals(5*GB,
|
||||||
|
schedulerApp1
|
||||||
|
.getAppAttemptResourceUsage().getAllUsed().getMemorySize());
|
||||||
|
Assert.assertEquals(0,
|
||||||
|
schedulerApp1.getCurrentReservation().getMemorySize());
|
||||||
|
//app2 ResourceUsage (4GB reserved)
|
||||||
|
Assert.assertEquals(1*GB,
|
||||||
|
schedulerApp2
|
||||||
|
.getAppAttemptResourceUsage().getAllUsed().getMemorySize());
|
||||||
|
Assert.assertEquals(4*GB,
|
||||||
|
schedulerApp2.getCurrentReservation().getMemorySize());
|
||||||
|
//move app2 which has one reservedContainer
|
||||||
|
scheduler.moveApplication(app2.getApplicationId(), "b");
|
||||||
|
// keep this order
|
||||||
|
// if killing app1 first,the reservedContainer of app2 will be allocated
|
||||||
|
rm1.killApp(app2.getApplicationId());
|
||||||
|
rm1.killApp(app1.getApplicationId());
|
||||||
|
//after,moved app2 which has one reservedContainer
|
||||||
|
Assert.assertEquals(0, srcQueue.getNumContainers());
|
||||||
|
Assert.assertEquals(0, desQueue.getNumContainers());
|
||||||
|
Assert.assertEquals(0, srcQueue.getUsedResources().getMemorySize());
|
||||||
|
Assert.assertEquals(0, desQueue.getUsedResources().getMemorySize());
|
||||||
|
rm1.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user