YARN-4699. Scheduler UI and REST o/p is not in sync when -replaceLabelsOnNode is used to change label of a node. (Sunil G via wangda)

This commit is contained in:
Wangda Tan 2016-04-05 16:24:11 -07:00
parent 9ba1e5af06
commit 21eb428448
3 changed files with 46 additions and 2 deletions

View File

@ -591,6 +591,9 @@ public abstract class AbstractCSQueue implements CSQueue {
} }
// ResourceUsage has its own lock, no addition lock needs here. // ResourceUsage has its own lock, no addition lock needs here.
queueUsage.incUsed(nodeLabel, resourceToInc); queueUsage.incUsed(nodeLabel, resourceToInc);
CSQueueUtils.updateUsedCapacity(resourceCalculator,
labelManager.getResourceByLabel(nodeLabel, Resources.none()),
minimumAllocation, queueUsage, queueCapacities, nodeLabel);
if (null != parent) { if (null != parent) {
parent.incUsedResource(nodeLabel, resourceToInc, null); parent.incUsedResource(nodeLabel, resourceToInc, null);
} }
@ -604,6 +607,9 @@ public abstract class AbstractCSQueue implements CSQueue {
} }
// ResourceUsage has its own lock, no addition lock needs here. // ResourceUsage has its own lock, no addition lock needs here.
queueUsage.decUsed(nodeLabel, resourceToDec); queueUsage.decUsed(nodeLabel, resourceToDec);
CSQueueUtils.updateUsedCapacity(resourceCalculator,
labelManager.getResourceByLabel(nodeLabel, Resources.none()),
minimumAllocation, queueUsage, queueCapacities, nodeLabel);
if (null != parent) { if (null != parent) {
parent.decUsedResource(nodeLabel, resourceToDec, null); parent.decUsedResource(nodeLabel, resourceToDec, null);
} }

View File

@ -180,7 +180,7 @@ class CSQueueUtils {
* Update partitioned resource usage, if nodePartition == null, will update * Update partitioned resource usage, if nodePartition == null, will update
* used resource for all partitions of this queue. * used resource for all partitions of this queue.
*/ */
private static void updateUsedCapacity(final ResourceCalculator rc, public static void updateUsedCapacity(final ResourceCalculator rc,
final Resource totalPartitionResource, final Resource minimumAllocation, final Resource totalPartitionResource, final Resource minimumAllocation,
ResourceUsage queueResourceUsage, QueueCapacities queueCapacities, ResourceUsage queueResourceUsage, QueueCapacities queueCapacities,
String nodePartition) { String nodePartition) {

View File

@ -100,6 +100,12 @@ public class TestCapacitySchedulerNodeLabelUpdate {
checkAMUsedResource(rm, queueName, memory, RMNodeLabelsManager.NO_LABEL); checkAMUsedResource(rm, queueName, memory, RMNodeLabelsManager.NO_LABEL);
} }
private void checkUsedCapacity(MockRM rm, String queueName, int capacity,
int total) {
checkUsedCapacity(rm, queueName, capacity, total,
RMNodeLabelsManager.NO_LABEL);
}
private void checkUsedResource(MockRM rm, String queueName, int memory, private void checkUsedResource(MockRM rm, String queueName, int memory,
String label) { String label) {
CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
@ -108,6 +114,15 @@ public class TestCapacitySchedulerNodeLabelUpdate {
.getMemory()); .getMemory());
} }
private void checkUsedCapacity(MockRM rm, String queueName, int capacity,
int total, String label) {
float epsillon = 0.0001f;
CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
CSQueue queue = scheduler.getQueue(queueName);
Assert.assertEquals((float)capacity/total,
queue.getQueueCapacities().getUsedCapacity(label), epsillon);
}
private void checkAMUsedResource(MockRM rm, String queueName, int memory, private void checkAMUsedResource(MockRM rm, String queueName, int memory,
String label) { String label) {
CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
@ -188,7 +203,7 @@ public class TestCapacitySchedulerNodeLabelUpdate {
rm.stop(); rm.stop();
} }
@Test (timeout = 60000) @Test
public void testResourceUsageWhenNodeUpdatesPartition() public void testResourceUsageWhenNodeUpdatesPartition()
throws Exception { throws Exception {
// set node -> label // set node -> label
@ -233,16 +248,23 @@ public class TestCapacitySchedulerNodeLabelUpdate {
// queue-a used x=1G, ""=1G // queue-a used x=1G, ""=1G
checkUsedResource(rm, "a", 1024, "x"); checkUsedResource(rm, "a", 1024, "x");
checkUsedResource(rm, "a", 1024); checkUsedResource(rm, "a", 1024);
checkUsedCapacity(rm, "a", 1024, 8000, "x");
checkUsedCapacity(rm, "a", 1024, 8000);
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
FiCaSchedulerApp app = cs.getApplicationAttempt(am1.getApplicationAttemptId()); FiCaSchedulerApp app = cs.getApplicationAttempt(am1.getApplicationAttemptId());
// change h1's label to z // change h1's label to z
mgr.replaceLabelsOnNode(ImmutableMap.of(nm1.getNodeId(), toSet("z")));
cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(), cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
toSet("z")))); toSet("z"))));
Thread.sleep(100);
checkUsedResource(rm, "a", 0, "x"); checkUsedResource(rm, "a", 0, "x");
checkUsedResource(rm, "a", 1024, "z"); checkUsedResource(rm, "a", 1024, "z");
checkUsedResource(rm, "a", 1024); checkUsedResource(rm, "a", 1024);
checkUsedCapacity(rm, "a", 0, 8000, "x");
checkUsedCapacity(rm, "a", 1024, 8000, "z");
checkUsedCapacity(rm, "a", 1024, 8000);
checkUsedResource(rm, "root", 0, "x"); checkUsedResource(rm, "root", 0, "x");
checkUsedResource(rm, "root", 1024, "z"); checkUsedResource(rm, "root", 1024, "z");
checkUsedResource(rm, "root", 1024); checkUsedResource(rm, "root", 1024);
@ -254,12 +276,18 @@ public class TestCapacitySchedulerNodeLabelUpdate {
app.getAppAttemptResourceUsage().getUsed("z").getMemory()); app.getAppAttemptResourceUsage().getUsed("z").getMemory());
// change h1's label to y // change h1's label to y
mgr.replaceLabelsOnNode(ImmutableMap.of(nm1.getNodeId(), toSet("y")));
cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(), cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
toSet("y")))); toSet("y"))));
Thread.sleep(100);
checkUsedResource(rm, "a", 0, "x"); checkUsedResource(rm, "a", 0, "x");
checkUsedResource(rm, "a", 1024, "y"); checkUsedResource(rm, "a", 1024, "y");
checkUsedResource(rm, "a", 0, "z"); checkUsedResource(rm, "a", 0, "z");
checkUsedResource(rm, "a", 1024); checkUsedResource(rm, "a", 1024);
checkUsedCapacity(rm, "a", 0, 8000, "x");
checkUsedCapacity(rm, "a", 1024, 16000, "y");
checkUsedCapacity(rm, "a", 0, 8000, "z");
checkUsedCapacity(rm, "a", 1024, 8000);
checkUsedResource(rm, "root", 0, "x"); checkUsedResource(rm, "root", 0, "x");
checkUsedResource(rm, "root", 1024, "y"); checkUsedResource(rm, "root", 1024, "y");
checkUsedResource(rm, "root", 0, "z"); checkUsedResource(rm, "root", 0, "z");
@ -278,11 +306,17 @@ public class TestCapacitySchedulerNodeLabelUpdate {
Set<String> emptyLabels = new HashSet<>(); Set<String> emptyLabels = new HashSet<>();
Map<NodeId,Set<String>> map = ImmutableMap.of(nm1.getNodeId(), Map<NodeId,Set<String>> map = ImmutableMap.of(nm1.getNodeId(),
emptyLabels); emptyLabels);
mgr.replaceLabelsOnNode(map);
cs.handle(new NodeLabelsUpdateSchedulerEvent(map)); cs.handle(new NodeLabelsUpdateSchedulerEvent(map));
Thread.sleep(100);
checkUsedResource(rm, "a", 0, "x"); checkUsedResource(rm, "a", 0, "x");
checkUsedResource(rm, "a", 0, "y"); checkUsedResource(rm, "a", 0, "y");
checkUsedResource(rm, "a", 0, "z"); checkUsedResource(rm, "a", 0, "z");
checkUsedResource(rm, "a", 2048); checkUsedResource(rm, "a", 2048);
checkUsedCapacity(rm, "a", 0, 8000, "x");
checkUsedCapacity(rm, "a", 0, 8000, "y");
checkUsedCapacity(rm, "a", 0, 8000, "z");
checkUsedCapacity(rm, "a", 2048, 16000);
checkUsedResource(rm, "root", 0, "x"); checkUsedResource(rm, "root", 0, "x");
checkUsedResource(rm, "root", 0, "y"); checkUsedResource(rm, "root", 0, "y");
checkUsedResource(rm, "root", 0, "z"); checkUsedResource(rm, "root", 0, "z");
@ -314,6 +348,10 @@ public class TestCapacitySchedulerNodeLabelUpdate {
checkUsedResource(rm, "a", 0, "y"); checkUsedResource(rm, "a", 0, "y");
checkUsedResource(rm, "a", 0, "z"); checkUsedResource(rm, "a", 0, "z");
checkUsedResource(rm, "a", 0); checkUsedResource(rm, "a", 0);
checkUsedCapacity(rm, "a", 0, 8000, "x");
checkUsedCapacity(rm, "a", 0, 8000, "y");
checkUsedCapacity(rm, "a", 0, 8000, "z");
checkUsedCapacity(rm, "a", 0, 16000);
checkUsedResource(rm, "root", 0, "x"); checkUsedResource(rm, "root", 0, "x");
checkUsedResource(rm, "root", 0, "y"); checkUsedResource(rm, "root", 0, "y");
checkUsedResource(rm, "root", 0, "z"); checkUsedResource(rm, "root", 0, "z");