YARN-5342. Improve non-exclusive node partition resource allocation in Capacity Scheduler. (Sunil G via wangda)
This commit is contained in:
parent
33a87ffe10
commit
c322e749d6
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
@ -660,9 +661,16 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Non-exclusive scheduling opportunity is different: we need reset
|
// Non-exclusive scheduling opportunity is different: we need reset
|
||||||
// it every time to make sure non-labeled resource request will be
|
// it when:
|
||||||
|
// - It allocated on the default partition
|
||||||
|
//
|
||||||
|
// This is to make sure non-labeled resource request will be
|
||||||
// most likely allocated on non-labeled nodes first.
|
// most likely allocated on non-labeled nodes first.
|
||||||
application.resetMissedNonPartitionedRequestSchedulingOpportunity(priority);
|
if (StringUtils.equals(node.getPartition(),
|
||||||
|
RMNodeLabelsManager.NO_LABEL)) {
|
||||||
|
application
|
||||||
|
.resetMissedNonPartitionedRequestSchedulingOpportunity(priority);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return allocationResult;
|
return allocationResult;
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||||
|
@ -280,6 +281,7 @@ public class TestApplicationPriority {
|
||||||
// If app3 (highest priority among rest) gets active, it indicates that
|
// If app3 (highest priority among rest) gets active, it indicates that
|
||||||
// priority is working with pendingApplications.
|
// priority is working with pendingApplications.
|
||||||
rm.killApp(app1.getApplicationId());
|
rm.killApp(app1.getApplicationId());
|
||||||
|
rm.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.KILLED);
|
||||||
|
|
||||||
// kick the scheduler, app3 (high among pending) gets free space
|
// kick the scheduler, app3 (high among pending) gets free space
|
||||||
MockAM am3 = MockRM.launchAM(app3, rm, nm1);
|
MockAM am3 = MockRM.launchAM(app3, rm, nm1);
|
||||||
|
|
|
@ -691,20 +691,19 @@ public class TestNodeLabelContainerAllocation {
|
||||||
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
|
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
|
||||||
MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
|
MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
|
||||||
|
|
||||||
ContainerId nextContainerId;
|
|
||||||
|
|
||||||
// launch an app to queue b1 (label = y), AM container should be launched in nm3
|
// launch an app to queue b1 (label = y), AM container should be launched in nm3
|
||||||
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
|
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
|
||||||
|
|
||||||
// request containers from am2, priority=1 asks for "" and priority=2 asks
|
// request containers from am2, priority=1 asks for "" and priority=2 asks
|
||||||
// for "y", "y" container should be allocated first
|
// for "y", "y" container should be allocated first
|
||||||
nextContainerId =
|
|
||||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
|
||||||
am1.allocate("*", 1 * GB, 1, 1, new ArrayList<ContainerId>(), "");
|
am1.allocate("*", 1 * GB, 1, 1, new ArrayList<ContainerId>(), "");
|
||||||
am1.allocate("*", 1 * GB, 1, 2, new ArrayList<ContainerId>(), "y");
|
am1.allocate("*", 1 * GB, 1, 2, new ArrayList<ContainerId>(), "y");
|
||||||
Assert.assertTrue(rm1.waitForState(nm1, nextContainerId,
|
|
||||||
RMContainerState.ALLOCATED, 10 * 1000));
|
// Do a node heartbeat once
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(
|
||||||
|
rm1.getRMContext().getRMNodes().get(nm1.getNodeId())));
|
||||||
|
|
||||||
// Check pending resource for am2, priority=1 doesn't get allocated before
|
// Check pending resource for am2, priority=1 doesn't get allocated before
|
||||||
// priority=2 allocated
|
// priority=2 allocated
|
||||||
|
@ -1583,7 +1582,7 @@ public class TestNodeLabelContainerAllocation {
|
||||||
// Test case 7
|
// Test case 7
|
||||||
// After c allocated, d will go first because it has less used_capacity(x)
|
// After c allocated, d will go first because it has less used_capacity(x)
|
||||||
// than c
|
// than c
|
||||||
doNMHeartbeat(rm, nm1.getNodeId(), 2);
|
doNMHeartbeat(rm, nm1.getNodeId(), 1);
|
||||||
checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
|
checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
|
||||||
cs.getApplicationAttempt(am1.getApplicationAttemptId()));
|
cs.getApplicationAttempt(am1.getApplicationAttemptId()));
|
||||||
checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
|
checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
|
||||||
|
|
Loading…
Reference in New Issue