YARN-5788. Apps not activiated and AM limit resource in UI and REST not updated after -replaceLabelsOnNode (Bibin A Chundatt via Varun Saxena)

This commit is contained in:
Varun Saxena 2016-11-01 16:26:38 +05:30
parent 2f0a101ab9
commit 91ddea5bfb
2 changed files with 133 additions and 56 deletions

View File

@ -1133,8 +1133,6 @@ private void updateNodeAndQueueResource(RMNode nm,
*/ */
private void updateLabelsOnNode(NodeId nodeId, private void updateLabelsOnNode(NodeId nodeId,
Set<String> newLabels) { Set<String> newLabels) {
try {
writeLock.lock();
FiCaSchedulerNode node = nodeTracker.getNode(nodeId); FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
if (null == node) { if (null == node) {
return; return;
@ -1175,9 +1173,6 @@ private void updateLabelsOnNode(NodeId nodeId,
// Update node labels after we've done this // Update node labels after we've done this
node.updateLabels(newLabels); node.updateLabels(newLabels);
} finally {
writeLock.unlock();
}
} }
private void updateSchedulerHealth(long now, FiCaSchedulerNode node, private void updateSchedulerHealth(long now, FiCaSchedulerNode node,
@ -1373,12 +1368,7 @@ public void handle(SchedulerEvent event) {
NodeLabelsUpdateSchedulerEvent labelUpdateEvent = NodeLabelsUpdateSchedulerEvent labelUpdateEvent =
(NodeLabelsUpdateSchedulerEvent) event; (NodeLabelsUpdateSchedulerEvent) event;
for (Entry<NodeId, Set<String>> entry : labelUpdateEvent updateNodeLabelsAndQueueResource(labelUpdateEvent);
.getUpdatedNodeToLabels().entrySet()) {
NodeId id = entry.getKey();
Set<String> labels = entry.getValue();
updateLabelsOnNode(id, labels);
}
} }
break; break;
case NODE_UPDATE: case NODE_UPDATE:
@ -1483,6 +1473,27 @@ public void handle(SchedulerEvent event) {
} }
} }
/**
* Process node labels update.
*/
private void updateNodeLabelsAndQueueResource(
NodeLabelsUpdateSchedulerEvent labelUpdateEvent) {
try {
writeLock.lock();
for (Entry<NodeId, Set<String>> entry : labelUpdateEvent
.getUpdatedNodeToLabels().entrySet()) {
NodeId id = entry.getKey();
Set<String> labels = entry.getValue();
updateLabelsOnNode(id, labels);
}
Resource clusterResource = getClusterResource();
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
} finally {
writeLock.unlock();
}
}
private void addNode(RMNode nodeManager) { private void addNode(RMNode nodeManager) {
try { try {
writeLock.lock(); writeLock.lock();

View File

@ -413,7 +413,7 @@ public RMNodeLabelsManager createNodeLabelManager() {
rm.close(); rm.close();
} }
@Test(timeout = 3000000) @Test(timeout = 300000)
public void testMoveApplicationWithLabel() throws Exception { public void testMoveApplicationWithLabel() throws Exception {
// set node -> label // set node -> label
mgr.addToCluserNodeLabelsWithDefaultExclusivity( mgr.addToCluserNodeLabelsWithDefaultExclusivity(
@ -589,7 +589,49 @@ public RMNodeLabelsManager createNodeLabelManager() {
rm.close(); rm.close();
} }
@Test (timeout = 60000) @Test
public void testAMResourceLimitNodeUpdatePartition() throws Exception {
conf.setInt("yarn.scheduler.minimum-allocation-mb", 64);
// inject node label manager
MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm.getRMContext().setNodeLabelManager(mgr);
rm.start();
rm.registerNode("h1:1234", 6400);
mgr.addToCluserNodeLabelsWithDefaultExclusivity(
ImmutableSet.of("x", "y", "z"));
// .1 percentage of 6400 will be for am
checkAMResourceLimit(rm, "a", 640, "");
checkAMResourceLimit(rm, "a", 0, "x");
checkAMResourceLimit(rm, "a", 0, "y");
checkAMResourceLimit(rm, "a", 0, "z");
mgr.replaceLabelsOnNode(
ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
rm.drainEvents();
checkAMResourceLimit(rm, "a", 640, "x");
checkAMResourceLimit(rm, "a", 0, "y");
checkAMResourceLimit(rm, "a", 0, "z");
checkAMResourceLimit(rm, "a", 0, "");
// Switch
mgr.replaceLabelsOnNode(
ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));
rm.drainEvents();
checkAMResourceLimit(rm, "a", 0, "x");
checkAMResourceLimit(rm, "a", 640, "y");
checkAMResourceLimit(rm, "a", 0, "z");
checkAMResourceLimit(rm, "a", 0, "");
}
@Test(timeout = 60000)
public void testAMResourceUsageWhenNodeUpdatesPartition() public void testAMResourceUsageWhenNodeUpdatesPartition()
throws Exception { throws Exception {
// set node -> label // set node -> label
@ -638,8 +680,8 @@ public RMNodeLabelsManager createNodeLabelManager() {
FiCaSchedulerApp app = cs.getApplicationAttempt(am1.getApplicationAttemptId()); FiCaSchedulerApp app = cs.getApplicationAttempt(am1.getApplicationAttemptId());
// change h1's label to z // change h1's label to z
cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(), cs.handle(new NodeLabelsUpdateSchedulerEvent(
toSet("z")))); ImmutableMap.of(nm1.getNodeId(), toSet("z"))));
// Now the resources also should change from x to z. Verify AM and normal // Now the resources also should change from x to z. Verify AM and normal
// used resource are successfully changed. // used resource are successfully changed.
@ -677,4 +719,28 @@ public RMNodeLabelsManager createNodeLabelManager() {
rm.close(); rm.close();
} }
private void checkAMResourceLimit(MockRM rm, String queuename, int memory,
String label) throws InterruptedException {
Assert.assertEquals(memory,
waitForResourceUpdate(rm, queuename, memory, label, 3000L));
}
private long waitForResourceUpdate(MockRM rm, String queuename, long memory,
String label, long timeout) throws InterruptedException {
long start = System.currentTimeMillis();
long memorySize = 0;
while (System.currentTimeMillis() - start < timeout) {
CapacityScheduler scheduler =
(CapacityScheduler) rm.getResourceScheduler();
CSQueue queue = scheduler.getQueue(queuename);
memorySize =
queue.getQueueResourceUsage().getAMLimit(label).getMemorySize();
if (memory == memorySize) {
return memorySize;
}
Thread.sleep(100);
}
return memorySize;
}
} }