Merge -r 1433483:1433484 from trunk to branch-2. Fixes: YARN-335. Fair scheduler doesn't check whether rack needs containers before assigning to node. Contributed by Sandy Ryza.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1433504 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5e826c4b28
commit
416e23ad47
|
@ -172,6 +172,9 @@ Release 2.0.3-alpha - Unreleased
|
||||||
|
|
||||||
YARN-330. Fix flakey test: TestNodeManagerShutdown#testKillContainersOnShutdown.
|
YARN-330. Fix flakey test: TestNodeManagerShutdown#testKillContainersOnShutdown.
|
||||||
(Sandy Ryza via hitesh)
|
(Sandy Ryza via hitesh)
|
||||||
|
|
||||||
|
YARN-335. Fair scheduler doesn't check whether rack needs containers
|
||||||
|
before assigning to node. (Sandy Ryza via tomwhite)
|
||||||
|
|
||||||
Release 2.0.2-alpha - 2012-09-07
|
Release 2.0.2-alpha - 2012-09-07
|
||||||
|
|
||||||
|
|
|
@ -307,20 +307,27 @@ public class AppSchedulable extends Schedulable {
|
||||||
// (not scheduled) in order to promote better locality.
|
// (not scheduled) in order to promote better locality.
|
||||||
synchronized (app) {
|
synchronized (app) {
|
||||||
for (Priority priority : prioritiesToTry) {
|
for (Priority priority : prioritiesToTry) {
|
||||||
|
if (app.getTotalRequiredResources(priority) <= 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
app.addSchedulingOpportunity(priority);
|
app.addSchedulingOpportunity(priority);
|
||||||
|
|
||||||
|
ResourceRequest rackLocalRequest = app.getResourceRequest(priority,
|
||||||
|
node.getRackName());
|
||||||
|
ResourceRequest localRequest = app.getResourceRequest(priority,
|
||||||
|
node.getHostName());
|
||||||
|
|
||||||
NodeType allowedLocality = app.getAllowedLocalityLevel(priority,
|
NodeType allowedLocality = app.getAllowedLocalityLevel(priority,
|
||||||
scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(),
|
scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(),
|
||||||
scheduler.getRackLocalityThreshold());
|
scheduler.getRackLocalityThreshold());
|
||||||
|
|
||||||
ResourceRequest localRequest = app.getResourceRequest(priority,
|
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
|
||||||
node.getHostName());
|
&& localRequest != null && localRequest.getNumContainers() != 0) {
|
||||||
if (localRequest != null && localRequest.getNumContainers() != 0) {
|
|
||||||
return assignContainer(node, app, priority,
|
return assignContainer(node, app, priority,
|
||||||
localRequest, NodeType.NODE_LOCAL, reserved);
|
localRequest, NodeType.NODE_LOCAL, reserved);
|
||||||
}
|
}
|
||||||
|
|
||||||
ResourceRequest rackLocalRequest = app.getResourceRequest(priority,
|
|
||||||
node.getRackName());
|
|
||||||
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
|
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
|
||||||
&& (allowedLocality.equals(NodeType.RACK_LOCAL) ||
|
&& (allowedLocality.equals(NodeType.RACK_LOCAL) ||
|
||||||
allowedLocality.equals(NodeType.OFF_SWITCH))) {
|
allowedLocality.equals(NodeType.OFF_SWITCH))) {
|
||||||
|
|
|
@ -1275,4 +1275,46 @@ public class TestFairScheduler {
|
||||||
FSSchedulerApp app2 = scheduler.applications.get(attId2);
|
FSSchedulerApp app2 = scheduler.applications.get(attId2);
|
||||||
assertNull("The application was allowed", app2);
|
assertNull("The application was allowed", app2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleNodesSingleRackRequest() throws Exception {
|
||||||
|
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
|
||||||
|
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
|
||||||
|
RMNode node3 = MockNodes.newNodeInfo(2, Resources.createResource(1024));
|
||||||
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||||
|
scheduler.handle(nodeEvent1);
|
||||||
|
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
||||||
|
scheduler.handle(nodeEvent2);
|
||||||
|
|
||||||
|
ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||||
|
scheduler.addApplication(appId, "queue1", "user1");
|
||||||
|
|
||||||
|
// 1 request with 2 nodes on the same rack. another request with 1 node on
|
||||||
|
// a different rack
|
||||||
|
List<ResourceRequest> asks = new ArrayList<ResourceRequest>();
|
||||||
|
asks.add(createResourceRequest(1024, node1.getHostName(), 1, 1));
|
||||||
|
asks.add(createResourceRequest(1024, node2.getHostName(), 1, 1));
|
||||||
|
asks.add(createResourceRequest(1024, node3.getHostName(), 1, 1));
|
||||||
|
asks.add(createResourceRequest(1024, node1.getRackName(), 1, 1));
|
||||||
|
asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1));
|
||||||
|
asks.add(createResourceRequest(1024, RMNode.ANY, 1, 2));
|
||||||
|
|
||||||
|
scheduler.allocate(appId, asks, new ArrayList<ContainerId>());
|
||||||
|
|
||||||
|
// node 1 checks in
|
||||||
|
scheduler.update();
|
||||||
|
NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1,
|
||||||
|
new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
|
||||||
|
scheduler.handle(updateEvent1);
|
||||||
|
// should assign node local
|
||||||
|
assertEquals(1, scheduler.applications.get(appId).getLiveContainers().size());
|
||||||
|
|
||||||
|
// node 2 checks in
|
||||||
|
scheduler.update();
|
||||||
|
NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2,
|
||||||
|
new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
|
||||||
|
scheduler.handle(updateEvent2);
|
||||||
|
// should assign rack local
|
||||||
|
assertEquals(2, scheduler.applications.get(appId).getLiveContainers().size());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue