YARN-2315. FairScheduler: Set current capacity in addition to capacity. (Zhihai Xu via kasha)
(cherry picked from commit a9a0cc3679
)
This commit is contained in:
parent
564deb75a8
commit
7ba5bb0c5b
|
@ -94,6 +94,9 @@ Release 2.7.0 - UNRELEASED
|
|||
YARN-2865. Fixed RM to always create a new RMContext when transtions from
|
||||
StandBy to Active. (Rohith Sharmaks via jianhe)
|
||||
|
||||
YARN-2315. FairScheduler: Set current capacity in addition to capacity.
|
||||
(Zhihai Xu via kasha)
|
||||
|
||||
Release 2.6.0 - 2014-11-18
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -123,12 +123,20 @@ public abstract class FSQueue implements Queue, Schedulable {
|
|||
public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) {
|
||||
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
|
||||
queueInfo.setQueueName(getQueueName());
|
||||
// TODO: we might change these queue metrics around a little bit
|
||||
// to match the semantics of the fair scheduler.
|
||||
|
||||
if (scheduler.getClusterResource().getMemory() == 0) {
|
||||
queueInfo.setCapacity(0.0f);
|
||||
} else {
|
||||
queueInfo.setCapacity((float) getFairShare().getMemory() /
|
||||
scheduler.getClusterResource().getMemory());
|
||||
queueInfo.setCapacity((float) getResourceUsage().getMemory() /
|
||||
scheduler.getClusterResource().getMemory());
|
||||
}
|
||||
|
||||
if (getFairShare().getMemory() == 0) {
|
||||
queueInfo.setCurrentCapacity(0.0f);
|
||||
} else {
|
||||
queueInfo.setCurrentCapacity((float) getResourceUsage().getMemory() /
|
||||
getFairShare().getMemory());
|
||||
}
|
||||
|
||||
ArrayList<QueueInfo> childQueueInfos = new ArrayList<QueueInfo>();
|
||||
if (includeChildQueues) {
|
||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
||||
|
@ -505,6 +506,66 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
assertEquals(4096, queue.getFairShare().getMemory());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQueueInfo() throws IOException {
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
|
||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println("<queue name=\"queueA\">");
|
||||
out.println("<weight>.25</weight>");
|
||||
out.println("</queue>");
|
||||
out.println("<queue name=\"queueB\">");
|
||||
out.println("<weight>.75</weight>");
|
||||
out.println("</queue>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
|
||||
scheduler.init(conf);
|
||||
scheduler.start();
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
|
||||
// Add one big node (only care about aggregate capacity)
|
||||
RMNode node1 =
|
||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
||||
"127.0.0.1");
|
||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent1);
|
||||
|
||||
// Queue A wants 1 * 1024.
|
||||
createSchedulingRequest(1 * 1024, "queueA", "user1");
|
||||
// Queue B wants 6 * 1024
|
||||
createSchedulingRequest(6 * 1024, "queueB", "user1");
|
||||
|
||||
scheduler.update();
|
||||
|
||||
// Capacity should be the same as weight of Queue,
|
||||
// because the sum of all active Queues' weight are 1.
|
||||
// Before NodeUpdate Event, CurrentCapacity should be 0
|
||||
QueueInfo queueInfo = scheduler.getQueueInfo("queueA", false, false);
|
||||
Assert.assertEquals(0.25f, queueInfo.getCapacity(), 0.0f);
|
||||
Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity(), 0.0f);
|
||||
queueInfo = scheduler.getQueueInfo("queueB", false, false);
|
||||
Assert.assertEquals(0.75f, queueInfo.getCapacity(), 0.0f);
|
||||
Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity(), 0.0f);
|
||||
|
||||
// Each NodeUpdate Event will only assign one container.
|
||||
// To assign two containers, call handle NodeUpdate Event twice.
|
||||
NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent2);
|
||||
scheduler.handle(nodeEvent2);
|
||||
|
||||
// After NodeUpdate Event, CurrentCapacity for queueA should be 1/2=0.5
|
||||
// and CurrentCapacity for queueB should be 6/6=1.
|
||||
queueInfo = scheduler.getQueueInfo("queueA", false, false);
|
||||
Assert.assertEquals(0.25f, queueInfo.getCapacity(), 0.0f);
|
||||
Assert.assertEquals(0.5f, queueInfo.getCurrentCapacity(), 0.0f);
|
||||
queueInfo = scheduler.getQueueInfo("queueB", false, false);
|
||||
Assert.assertEquals(0.75f, queueInfo.getCapacity(), 0.0f);
|
||||
Assert.assertEquals(1.0f, queueInfo.getCurrentCapacity(), 0.0f);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleHierarchicalFairShareCalculation() throws IOException {
|
||||
scheduler.init(conf);
|
||||
|
|
Loading…
Reference in New Issue