YARN-9596: QueueMetrics has incorrect metrics when labelled partitions are involved. Contributed by Muhammad Samir Khan.
(cherry picked from commit 42683aef1a)
This commit is contained in:
parent
4662b13b26
commit
36af8845de
|
@ -250,30 +250,24 @@ public class CSQueueUtils {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Resource getMaxAvailableResourceToQueue(
|
private static Resource getMaxAvailableResourceToQueuePartition(
|
||||||
final ResourceCalculator rc, RMNodeLabelsManager nlm, CSQueue queue,
|
final ResourceCalculator rc, CSQueue queue,
|
||||||
Resource cluster) {
|
Resource cluster, String partition) {
|
||||||
Set<String> nodeLabels = queue.getNodeLabelsForQueue();
|
// Calculate guaranteed resource for a label in a queue by below logic.
|
||||||
Resource totalAvailableResource = Resources.createResource(0, 0);
|
// (total label resource) * (absolute capacity of label in that queue)
|
||||||
|
Resource queueGuaranteedResource = queue.getEffectiveCapacity(partition);
|
||||||
|
|
||||||
for (String partition : nodeLabels) {
|
// Available resource in queue for a specific label will be calculated as
|
||||||
// Calculate guaranteed resource for a label in a queue by below logic.
|
// {(guaranteed resource for a label in a queue) -
|
||||||
// (total label resource) * (absolute capacity of label in that queue)
|
// (resource usage of that label in the queue)}
|
||||||
Resource queueGuranteedResource = queue.getEffectiveCapacity(partition);
|
Resource available = (Resources.greaterThan(rc, cluster,
|
||||||
|
queueGuaranteedResource,
|
||||||
|
queue.getQueueResourceUsage().getUsed(partition))) ? Resources
|
||||||
|
.componentwiseMax(Resources.subtractFrom(queueGuaranteedResource,
|
||||||
|
queue.getQueueResourceUsage().getUsed(partition)), Resources
|
||||||
|
.none()) : Resources.none();
|
||||||
|
|
||||||
// Available resource in queue for a specific label will be calculated as
|
return available;
|
||||||
// {(guaranteed resource for a label in a queue) -
|
|
||||||
// (resource usage of that label in the queue)}
|
|
||||||
// Finally accumulate this available resource to get total.
|
|
||||||
Resource available = (Resources.greaterThan(rc, cluster,
|
|
||||||
queueGuranteedResource,
|
|
||||||
queue.getQueueResourceUsage().getUsed(partition))) ? Resources
|
|
||||||
.componentwiseMax(Resources.subtractFrom(queueGuranteedResource,
|
|
||||||
queue.getQueueResourceUsage().getUsed(partition)), Resources
|
|
||||||
.none()) : Resources.none();
|
|
||||||
Resources.addTo(totalAvailableResource, available);
|
|
||||||
}
|
|
||||||
return totalAvailableResource;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -304,16 +298,27 @@ public class CSQueueUtils {
|
||||||
queueResourceUsage.getNodePartitionsSet())) {
|
queueResourceUsage.getNodePartitionsSet())) {
|
||||||
updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster),
|
updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster),
|
||||||
partition, childQueue);
|
partition, childQueue);
|
||||||
|
|
||||||
|
// Update queue metrics w.r.t node labels.
|
||||||
|
// In QueueMetrics, null label is handled the same as NO_LABEL.
|
||||||
|
// This is because queue metrics for partitions are not tracked.
|
||||||
|
// In the future, will have to change this when/if queue metrics
|
||||||
|
// for partitions also get tracked.
|
||||||
|
childQueue.getMetrics().setAvailableResourcesToQueue(
|
||||||
|
partition,
|
||||||
|
getMaxAvailableResourceToQueuePartition(rc, childQueue,
|
||||||
|
cluster, partition));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster),
|
updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster),
|
||||||
nodePartition, childQueue);
|
nodePartition, childQueue);
|
||||||
}
|
|
||||||
|
|
||||||
// Update queue metrics w.r.t node labels. In a generic way, we can
|
// Same as above.
|
||||||
// calculate available resource from all labels in cluster.
|
childQueue.getMetrics().setAvailableResourcesToQueue(
|
||||||
childQueue.getMetrics().setAvailableResourcesToQueue(nodePartition,
|
nodePartition,
|
||||||
getMaxAvailableResourceToQueue(rc, nlm, childQueue, cluster));
|
getMaxAvailableResourceToQueuePartition(rc, childQueue,
|
||||||
|
cluster, nodePartition));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1988,6 +1988,15 @@ public class TestNodeLabelContainerAllocation {
|
||||||
rm1.start();
|
rm1.start();
|
||||||
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
|
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
|
||||||
MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = y
|
MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = y
|
||||||
|
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
LeafQueue leafQueueA = (LeafQueue) cs.getQueue("a");
|
||||||
|
assertEquals(0 * GB, leafQueueA.getMetrics().getAvailableMB());
|
||||||
|
assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
|
||||||
|
LeafQueue leafQueueB = (LeafQueue) cs.getQueue("b");
|
||||||
|
assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB());
|
||||||
|
assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB());
|
||||||
|
|
||||||
// app1 -> a
|
// app1 -> a
|
||||||
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a", "x");
|
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a", "x");
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
@ -1995,7 +2004,6 @@ public class TestNodeLabelContainerAllocation {
|
||||||
// app1 asks for 5 partition=x containers
|
// app1 asks for 5 partition=x containers
|
||||||
am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "x");
|
am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "x");
|
||||||
// NM1 do 50 heartbeats
|
// NM1 do 50 heartbeats
|
||||||
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
|
||||||
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
|
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||||
|
|
||||||
SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
|
SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
|
||||||
|
@ -2019,17 +2027,23 @@ public class TestNodeLabelContainerAllocation {
|
||||||
Assert.assertEquals(10 * GB,
|
Assert.assertEquals(10 * GB,
|
||||||
reportNm2.getAvailableResource().getMemorySize());
|
reportNm2.getAvailableResource().getMemorySize());
|
||||||
|
|
||||||
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
|
assertEquals(0 * GB, leafQueueA.getMetrics().getAvailableMB());
|
||||||
assertEquals(5 * GB, leafQueue.getMetrics().getAvailableMB());
|
assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
|
||||||
assertEquals(0 * GB, leafQueue.getMetrics().getAllocatedMB());
|
assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB());
|
||||||
|
assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB());
|
||||||
|
|
||||||
|
// The total memory tracked by QueueMetrics is 0GB for the default partition
|
||||||
|
CSQueue rootQueue = cs.getRootQueue();
|
||||||
|
assertEquals(0*GB, rootQueue.getMetrics().getAvailableMB() +
|
||||||
|
rootQueue.getMetrics().getAllocatedMB());
|
||||||
|
|
||||||
// Kill all apps in queue a
|
// Kill all apps in queue a
|
||||||
cs.killAllAppsInQueue("a");
|
cs.killAllAppsInQueue("a");
|
||||||
rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
|
rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
|
||||||
rm1.waitForAppRemovedFromScheduler(app1.getApplicationId());
|
rm1.waitForAppRemovedFromScheduler(app1.getApplicationId());
|
||||||
|
|
||||||
assertEquals(0 * GB, leafQueue.getMetrics().getUsedAMResourceMB());
|
assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB());
|
||||||
assertEquals(0, leafQueue.getMetrics().getUsedAMResourceVCores());
|
assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores());
|
||||||
rm1.close();
|
rm1.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2120,10 +2134,9 @@ public class TestNodeLabelContainerAllocation {
|
||||||
reportNm2.getAvailableResource().getMemorySize());
|
reportNm2.getAvailableResource().getMemorySize());
|
||||||
|
|
||||||
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
|
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
|
||||||
double delta = 0.0001;
|
|
||||||
// 3GB is used from label x quota. 1.5 GB is remaining from default label.
|
// 3GB is used from label x quota. 1.5 GB is remaining from default label.
|
||||||
// 2GB is remaining from label x.
|
// 2GB is remaining from label x.
|
||||||
assertEquals(6.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
|
assertEquals(15 * GB / 10, leafQueue.getMetrics().getAvailableMB());
|
||||||
assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB());
|
assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB());
|
||||||
|
|
||||||
// app1 asks for 1 default partition container
|
// app1 asks for 1 default partition container
|
||||||
|
@ -2139,10 +2152,142 @@ public class TestNodeLabelContainerAllocation {
|
||||||
Assert.assertEquals(2, schedulerNode2.getNumContainers());
|
Assert.assertEquals(2, schedulerNode2.getNumContainers());
|
||||||
|
|
||||||
// 3GB is used from label x quota. 2GB used from default label.
|
// 3GB is used from label x quota. 2GB used from default label.
|
||||||
// So total 2.5 GB is remaining.
|
// So 0.5 GB is remaining from default label.
|
||||||
assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
|
assertEquals(5 * GB / 10, leafQueue.getMetrics().getAvailableMB());
|
||||||
assertEquals(2 * GB, leafQueue.getMetrics().getAllocatedMB());
|
assertEquals(2 * GB, leafQueue.getMetrics().getAllocatedMB());
|
||||||
|
|
||||||
|
// The total memory tracked by QueueMetrics is 10GB
|
||||||
|
// for the default partition
|
||||||
|
CSQueue rootQueue = cs.getRootQueue();
|
||||||
|
assertEquals(10*GB, rootQueue.getMetrics().getAvailableMB() +
|
||||||
|
rootQueue.getMetrics().getAllocatedMB());
|
||||||
|
|
||||||
|
rm1.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQueueMetricsWithMixedLabels() throws Exception {
|
||||||
|
// There is only one queue which can access both default label and label x.
|
||||||
|
// There are two nodes of 10GB label x and 12GB no label.
|
||||||
|
// The test is to make sure that the queue metrics is only tracking the
|
||||||
|
// allocations and availability from default partition
|
||||||
|
|
||||||
|
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
|
||||||
|
this.conf);
|
||||||
|
|
||||||
|
// Define top-level queues
|
||||||
|
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||||
|
new String[] {"a"});
|
||||||
|
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
|
||||||
|
|
||||||
|
final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||||
|
csConf.setCapacity(queueA, 100);
|
||||||
|
csConf.setAccessibleNodeLabels(queueA, toSet("x"));
|
||||||
|
csConf.setCapacityByLabel(queueA, "x", 100);
|
||||||
|
csConf.setMaximumCapacityByLabel(queueA, "x", 100);
|
||||||
|
|
||||||
|
// set node -> label
|
||||||
|
// label x exclusivity is set to true
|
||||||
|
mgr.addToCluserNodeLabels(
|
||||||
|
ImmutableSet.of(NodeLabel.newInstance("x", true)));
|
||||||
|
mgr.addLabelsToNode(
|
||||||
|
ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
|
||||||
|
|
||||||
|
// inject node label manager
|
||||||
|
MockRM rm1 = new MockRM(csConf) {
|
||||||
|
@Override
|
||||||
|
public RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
return mgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
rm1.getRMContext().setNodeLabelManager(mgr);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
|
||||||
|
MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB); // label = <no_label>
|
||||||
|
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
LeafQueue leafQueueA = (LeafQueue) cs.getQueue("a");
|
||||||
|
assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB());
|
||||||
|
assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
|
||||||
|
|
||||||
|
// app1 -> a
|
||||||
|
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a", "x");
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
|
// app1 asks for 5 partition=x containers
|
||||||
|
am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "x");
|
||||||
|
// NM1 do 50 heartbeats
|
||||||
|
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||||
|
|
||||||
|
SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
|
||||||
|
|
||||||
|
for (int i = 0; i < 50; i++) {
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
|
}
|
||||||
|
|
||||||
|
// app1 gets all of its containers (AM + 5 requested) in partition=x
|
||||||
|
Assert.assertEquals(6, schedulerNode1.getNumContainers());
|
||||||
|
|
||||||
|
SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
|
||||||
|
.getNodeReport(nm1.getNodeId());
|
||||||
|
Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
|
||||||
|
Assert.assertEquals(4 * GB,
|
||||||
|
reportNm1.getAvailableResource().getMemorySize());
|
||||||
|
|
||||||
|
SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
|
||||||
|
.getNodeReport(nm2.getNodeId());
|
||||||
|
Assert.assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize());
|
||||||
|
Assert.assertEquals(12 * GB,
|
||||||
|
reportNm2.getAvailableResource().getMemorySize());
|
||||||
|
|
||||||
|
assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB());
|
||||||
|
assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
|
||||||
|
|
||||||
|
// app2 -> a
|
||||||
|
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a", "");
|
||||||
|
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
|
||||||
|
|
||||||
|
// app2 asks for 5 containers in the default partition (empty label)
|
||||||
|
am2.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "");
|
||||||
|
// NM2 do 50 heartbeats
|
||||||
|
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
|
||||||
|
|
||||||
|
SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
|
||||||
|
|
||||||
|
for (int i = 0; i < 50; i++) {
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
|
||||||
|
}
|
||||||
|
|
||||||
|
// app2 gets all of its containers (AM + 5 requested) in the default partition
|
||||||
|
Assert.assertEquals(6, schedulerNode2.getNumContainers());
|
||||||
|
|
||||||
|
reportNm1 = rm1.getResourceScheduler().getNodeReport(nm1.getNodeId());
|
||||||
|
Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
|
||||||
|
Assert.assertEquals(4 * GB,
|
||||||
|
reportNm1.getAvailableResource().getMemorySize());
|
||||||
|
|
||||||
|
reportNm2 = rm1.getResourceScheduler().getNodeReport(nm2.getNodeId());
|
||||||
|
Assert.assertEquals(6 * GB, reportNm2.getUsedResource().getMemorySize());
|
||||||
|
Assert.assertEquals(6 * GB,
|
||||||
|
reportNm2.getAvailableResource().getMemorySize());
|
||||||
|
|
||||||
|
assertEquals(6 * GB, leafQueueA.getMetrics().getAvailableMB());
|
||||||
|
assertEquals(6 * GB, leafQueueA.getMetrics().getAllocatedMB());
|
||||||
|
|
||||||
|
// The total memory tracked by QueueMetrics is 12GB
|
||||||
|
// for the default partition
|
||||||
|
CSQueue rootQueue = cs.getRootQueue();
|
||||||
|
assertEquals(12*GB, rootQueue.getMetrics().getAvailableMB() +
|
||||||
|
rootQueue.getMetrics().getAllocatedMB());
|
||||||
|
|
||||||
|
// Kill all apps in queue a
|
||||||
|
cs.killAllAppsInQueue("a");
|
||||||
|
rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
|
||||||
|
rm1.waitForAppRemovedFromScheduler(app1.getApplicationId());
|
||||||
|
|
||||||
|
assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB());
|
||||||
|
assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores());
|
||||||
rm1.close();
|
rm1.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue