YARN-5555. Scheduler UI: "% of Queue" is inaccurate if leaf queue is hierarchically nested. Contributed by Eric Payne.
This commit is contained in:
parent
1742022390
commit
d5ea508ca2
@ -29,6 +29,7 @@
|
|||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
@ -55,6 +56,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
|
||||||
@ -594,4 +596,29 @@ public void updateNodeInfoForAMDiagnostics(FiCaSchedulerNode node) {
|
|||||||
updateAMContainerDiagnostics(AMState.ACTIVATED, diagnosticMessageBldr.toString());
|
updateAMContainerDiagnostics(AMState.ACTIVATED, diagnosticMessageBldr.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Recalculates the per-app, percent of queue metric, specific to the
|
||||||
|
* Capacity Scheduler.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
|
||||||
|
ApplicationResourceUsageReport report = super.getResourceUsageReport();
|
||||||
|
Resource cluster = rmContext.getScheduler().getClusterResource();
|
||||||
|
Resource totalPartitionRes =
|
||||||
|
rmContext.getNodeLabelManager()
|
||||||
|
.getResourceByLabel(getAppAMNodePartitionName(), cluster);
|
||||||
|
ResourceCalculator calc = rmContext.getScheduler().getResourceCalculator();
|
||||||
|
if (!calc.isInvalidDivisor(totalPartitionRes)) {
|
||||||
|
float queueAbsMaxCapPerPartition =
|
||||||
|
((AbstractCSQueue)getQueue()).getQueueCapacities()
|
||||||
|
.getAbsoluteCapacity(getAppAMNodePartitionName());
|
||||||
|
float queueUsagePerc =
|
||||||
|
calc.divide(totalPartitionRes, report.getUsedResources(),
|
||||||
|
Resources.multiply(totalPartitionRes,
|
||||||
|
queueAbsMaxCapPerPartition)) * 100;
|
||||||
|
report.setQueueUsagePercentage(queueUsagePerc);
|
||||||
|
}
|
||||||
|
return report;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,8 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||||
|
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||||
|
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
@ -44,6 +46,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
||||||
@ -3284,6 +3287,90 @@ private CapacitySchedulerContext mockCSContext(
|
|||||||
return csContext;
|
return csContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testApplicationQueuePercent()
|
||||||
|
throws Exception {
|
||||||
|
Resource res = Resource.newInstance(10 * 1024, 10);
|
||||||
|
CapacityScheduler scheduler = mock(CapacityScheduler.class);
|
||||||
|
when(scheduler.getClusterResource()).thenReturn(res);
|
||||||
|
when(scheduler.getResourceCalculator())
|
||||||
|
.thenReturn(new DefaultResourceCalculator());
|
||||||
|
|
||||||
|
ApplicationAttemptId appAttId = createAppAttemptId(0, 0);
|
||||||
|
RMContext rmContext = mock(RMContext.class);
|
||||||
|
when(rmContext.getEpoch()).thenReturn(3L);
|
||||||
|
when(rmContext.getScheduler()).thenReturn(scheduler);
|
||||||
|
when(rmContext.getRMApps())
|
||||||
|
.thenReturn(new ConcurrentHashMap<ApplicationId, RMApp>());
|
||||||
|
RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class);
|
||||||
|
when(nlm.getResourceByLabel((String)any(), (Resource)any())).thenReturn(res);
|
||||||
|
when(rmContext.getNodeLabelManager()).thenReturn(nlm);
|
||||||
|
|
||||||
|
// Queue "test" consumes 100% of the cluster, so its capacity and absolute
|
||||||
|
// capacity are both 1.0f.
|
||||||
|
Queue queue = createQueue("test", null, 1.0f, 1.0f);
|
||||||
|
final String user = "user1";
|
||||||
|
FiCaSchedulerApp app =
|
||||||
|
new FiCaSchedulerApp(appAttId, user, queue,
|
||||||
|
queue.getActiveUsersManager(), rmContext);
|
||||||
|
|
||||||
|
// Resource request
|
||||||
|
Resource requestedResource = Resource.newInstance(1536, 2);
|
||||||
|
app.getAppAttemptResourceUsage().incUsed(requestedResource);
|
||||||
|
// In "test" queue, 1536 used is 15% of both the queue and the cluster
|
||||||
|
assertEquals(15.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
|
||||||
|
0.01f);
|
||||||
|
assertEquals(15.0f,
|
||||||
|
app.getResourceUsageReport().getClusterUsagePercentage(), 0.01f);
|
||||||
|
|
||||||
|
// Queue "test2" is a child of root and its capacity is 50% of root. As a
|
||||||
|
// child of root, its absolute capaicty is also 50%.
|
||||||
|
queue = createQueue("test2", null, 0.5f, 0.5f);
|
||||||
|
app = new FiCaSchedulerApp(appAttId, user, queue,
|
||||||
|
queue.getActiveUsersManager(), rmContext);
|
||||||
|
app.getAppAttemptResourceUsage().incUsed(requestedResource);
|
||||||
|
// In "test2" queue, 1536 used is 30% of "test2" and 15% of the cluster.
|
||||||
|
assertEquals(30.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
|
||||||
|
0.01f);
|
||||||
|
assertEquals(15.0f,
|
||||||
|
app.getResourceUsageReport().getClusterUsagePercentage(), 0.01f);
|
||||||
|
|
||||||
|
// Queue "test2.1" is 50% of queue "test2", which is 50% of the cluster.
|
||||||
|
// Therefore, "test2.1" capacity is 50% and absolute capacity is 25%.
|
||||||
|
AbstractCSQueue qChild = createQueue("test2.1", queue, 0.5f, 0.25f);
|
||||||
|
app = new FiCaSchedulerApp(appAttId, user, qChild,
|
||||||
|
qChild.getActiveUsersManager(), rmContext);
|
||||||
|
app.getAppAttemptResourceUsage().incUsed(requestedResource);
|
||||||
|
// In "test2.1" queue, 1536 used is 60% of "test2.1" and 15% of the cluster.
|
||||||
|
assertEquals(60.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
|
||||||
|
0.01f);
|
||||||
|
assertEquals(15.0f,
|
||||||
|
app.getResourceUsageReport().getClusterUsagePercentage(), 0.01f);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
|
||||||
|
ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
|
||||||
|
ApplicationAttemptId attId =
|
||||||
|
ApplicationAttemptId.newInstance(appIdImpl, attemptId);
|
||||||
|
return attId;
|
||||||
|
}
|
||||||
|
|
||||||
|
private AbstractCSQueue createQueue(String name, Queue parent, float capacity,
|
||||||
|
float absCap) {
|
||||||
|
CSQueueMetrics metrics = CSQueueMetrics.forQueue(name, parent, false, cs.getConf());
|
||||||
|
QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null,
|
||||||
|
null, QueueState.RUNNING, null, "", null, false);
|
||||||
|
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
|
||||||
|
AbstractCSQueue queue = mock(AbstractCSQueue.class);
|
||||||
|
when(queue.getMetrics()).thenReturn(metrics);
|
||||||
|
when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
|
||||||
|
when(queue.getQueueInfo(false, false)).thenReturn(queueInfo);
|
||||||
|
QueueCapacities qCaps = mock(QueueCapacities.class);
|
||||||
|
when(qCaps.getAbsoluteCapacity((String)any())).thenReturn(absCap);
|
||||||
|
when(queue.getQueueCapacities()).thenReturn(qCaps);
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
if (cs != null) {
|
if (cs != null) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user