diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index c3c714a4fce..498607fc5a0 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -96,6 +96,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-422. Add a NM Client library to help application-writers. (Zhijie Shen
     via vinodkv)
 
+    YARN-392. Make it possible to specify hard locality constraints in resource
+    requests. (sandyr via tucu)
+
   IMPROVEMENTS
 
     YARN-365. Change NM heartbeat handling to not generate a scheduler event
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
index 81c418ea35a..4085ca1fdc7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
@@ -155,6 +155,44 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
   @Stable
   public abstract void setNumContainers(int numContainers);
 
+  /**
+   * Get whether locality relaxation is enabled with this
+   * ResourceRequest. Defaults to true.
+   *
+   * @return whether locality relaxation is enabled with this
+   * ResourceRequest.
+   */
+  @Public
+  @Stable
+  public abstract boolean getRelaxLocality();
+
+  /**
+   * For a request at a network hierarchy level, set whether locality can be relaxed
+   * to that level and beyond.
+   *
+   * If the flag is off on a rack-level ResourceRequest,
+   * containers at that request's priority will not be assigned to nodes on that
+   * request's rack unless requests specifically for those nodes have also been
+   * submitted.
+   *
+   * If the flag is off on an {@link ResourceRequest#ANY}-level
+   * ResourceRequest, containers at that request's priority will
+   * only be assigned on racks for which specific requests have also been
+   * submitted.
+   *
+   * For example, to request a container strictly on a specific node, the
+   * corresponding rack-level and any-level requests should have locality
+   * relaxation set to false. Similarly, to request a container strictly on a
+   * specific rack, the corresponding any-level request should have locality
+   * relaxation set to false.
+   *
+   * @param relaxLocality whether locality relaxation is enabled with this
+   * ResourceRequest.
+   */
+  @Public
+  @Stable
+  public abstract void setRelaxLocality(boolean relaxLocality);
+
   @Override
   public int hashCode() {
     final int prime = 2153;
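
As a usage illustration only (not part of the patch), the sketch below builds the
request set the javadoc above describes for strict node locality: a node-level ask
that keeps the default relaxation, plus rack-level and ANY-level asks with
relaxation turned off. The newRequest() helper, the node name, and the rack name
are hypothetical; Records, Resources, Priority, and the ResourceRequest setters are
existing YARN APIs used elsewhere in this patch, and imports are omitted.

    // Sketch, not part of the patch; newRequest() is a hypothetical helper.
    private ResourceRequest newRequest(String resourceName, int memory,
        int numContainers, boolean relaxLocality) {
      ResourceRequest req = Records.newRecord(ResourceRequest.class);
      req.setHostName(resourceName);   // a node, a rack, or ResourceRequest.ANY
      req.setCapability(Resources.createResource(memory));
      req.setNumContainers(numContainers);
      Priority prio = Records.newRecord(Priority.class);
      prio.setPriority(1);
      req.setPriority(prio);
      req.setRelaxLocality(relaxLocality);
      return req;
    }

    // Strict node locality: relaxation stays on for the node-level ask and is
    // turned off at the rack level and at ResourceRequest.ANY.
    List<ResourceRequest> asks = Arrays.asList(
        newRequest("node1.example.com", 1024, 1, true),
        newRequest("/rack1",            1024, 1, false),
        newRequest(ResourceRequest.ANY, 1024, 1, false));
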
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
index ba064c73f6b..3726daaf2a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
@@ -146,6 +146,18 @@ public class ResourceRequestPBImpl extends ResourceRequest {
     maybeInitBuilder();
     builder.setNumContainers((numContainers));
   }
+
+  @Override
+  public boolean getRelaxLocality() {
+    ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getRelaxLocality();
+  }
+
+  @Override
+  public void setRelaxLocality(boolean relaxLocality) {
+    maybeInitBuilder();
+    builder.setRelaxLocality(relaxLocality);
+  }
 
   private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
     return new PriorityPBImpl(p);
@@ -167,6 +179,7 @@ public class ResourceRequestPBImpl extends ResourceRequest {
   public String toString() {
     return "{Priority: " + getPriority() + ", Capability: " + getCapability()
         + ", # Containers: " + getNumContainers()
-        + ", Location: " + getHostName() + "}";
+        + ", Location: " + getHostName()
+        + ", Relax Locality: " + getRelaxLocality() + "}";
   }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 98d40b94b3f..84eccfb3534 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -204,6 +204,7 @@ message ResourceRequestProto {
   optional string host_name = 2;
   optional ResourceProto capability = 3;
   optional int32 num_containers = 4;
+  optional bool relax_locality = 5 [default = true];
 }
 
 ////////////////////////////////////////////////////////////////////////
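
Because the new relax_locality field is declared with [default = true], a request
serialized by a client that never calls setRelaxLocality() still deserializes as
relaxed, so existing applications keep their current behavior. A minimal sketch of
that default, for illustration only (the ResourceRequestProto accessors are the
ones protoc generates from yarn_protos.proto; nothing below is added by the patch):

    // A proto built without the field reports the declared default.
    ResourceRequestProto p = ResourceRequestProto.newBuilder()
        .setNumContainers(1)
        .build();
    assert p.getRelaxLocality();      // true, from [default = true]

    // The record API round-trips the flag through ResourceRequestPBImpl.
    ResourceRequest req = Records.newRecord(ResourceRequest.class);
    assert req.getRelaxLocality();    // defaults to true
    req.setRelaxLocality(false);
    assert !req.getRelaxLocality();
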
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
index 6b48c8f4d6f..e5a5d69ba80 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
@@ -316,6 +316,11 @@ public class AppSchedulable extends Schedulable {
       ResourceRequest localRequest = app.getResourceRequest(priority,
           node.getHostName());
 
+      if (localRequest != null && !localRequest.getRelaxLocality()) {
+        LOG.warn("Relax locality off is not supported on local request: "
+            + localRequest);
+      }
+
       NodeType allowedLocality = app.getAllowedLocalityLevel(priority,
           scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(),
           scheduler.getRackLocalityThreshold());
@@ -325,6 +330,10 @@ public class AppSchedulable extends Schedulable {
         return assignContainer(node, priority, localRequest,
             NodeType.NODE_LOCAL, reserved);
       }
+
+      if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) {
+        continue;
+      }
 
       if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
           && (allowedLocality.equals(NodeType.RACK_LOCAL) ||
@@ -335,6 +344,10 @@ public class AppSchedulable extends Schedulable {
       ResourceRequest offSwitchRequest = app.getResourceRequest(priority,
           ResourceRequest.ANY);
 
+      if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) {
+        continue;
+      }
+
       if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0
           && allowedLocality.equals(NodeType.OFF_SWITCH)) {
         return assignContainer(node, priority, offSwitchRequest,
@@ -359,10 +372,23 @@ public class AppSchedulable extends Schedulable {
    * given node, if the node had full space.
    */
   public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
-    // TODO: add checks stuff about node specific scheduling here
-    ResourceRequest request = app.getResourceRequest(prio, ResourceRequest.ANY);
-    return request.getNumContainers() > 0 &&
+    ResourceRequest anyRequest = app.getResourceRequest(prio, ResourceRequest.ANY);
+    ResourceRequest rackRequest = app.getResourceRequest(prio, node.getRackName());
+    ResourceRequest nodeRequest = app.getResourceRequest(prio, node.getHostName());
+
+    return
+        // There must be outstanding requests at the given priority:
+        anyRequest != null && anyRequest.getNumContainers() > 0 &&
+        // If locality relaxation is turned off at *-level, there must be a
+        // non-zero request for the node's rack:
+        (anyRequest.getRelaxLocality() ||
+            (rackRequest != null && rackRequest.getNumContainers() > 0)) &&
+        // If locality relaxation is turned off at rack-level, there must be a
+        // non-zero request at the node:
+        (rackRequest == null || rackRequest.getRelaxLocality() ||
+            (nodeRequest != null && nodeRequest.getNumContainers() > 0)) &&
+        // The requested container must be able to fit on the node:
         Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
-            request.getCapability(), node.getRMNode().getTotalCapability());
+            anyRequest.getCapability(), node.getRMNode().getTotalCapability());
   }
 }
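
The scheduler-side rule is easiest to see in hasContainerForNode() above: a node is
only considered when there is an outstanding ANY-level ask and, wherever relaxation
is off, a non-zero ask exists one level closer to the node. The standalone
restatement below is for illustration only; it omits the capacity check the real
method also performs, and the method name is made up.

    // Illustration: simplified restatement of the predicate added above.
    static boolean wouldConsiderNode(ResourceRequest any, ResourceRequest rack,
        ResourceRequest node) {
      return any != null && any.getNumContainers() > 0
          // strict at ANY: need an outstanding ask for this node's rack
          && (any.getRelaxLocality()
              || (rack != null && rack.getNumContainers() > 0))
          // strict at rack: need an outstanding ask for this exact node
          && (rack == null || rack.getRelaxLocality()
              || (node != null && node.getNumContainers() > 0));
    }

With the asks used in testStrictLocality below (node-level relaxed, rack-level and
ANY-level strict), this predicate holds for node1 and fails for node2, which is
exactly what that test asserts heartbeat after heartbeat.
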
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index b5074f24a70..1571121de3e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -151,7 +151,7 @@ public class TestFairScheduler {
 
   private ResourceRequest createResourceRequest(int memory, String host,
-      int priority, int numContainers) {
+      int priority, int numContainers, boolean relaxLocality) {
     ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
     request.setCapability(Resources.createResource(memory));
     request.setHostName(host);
@@ -159,6 +159,7 @@
     Priority prio = recordFactory.newRecordInstance(Priority.class);
     prio.setPriority(priority);
     request.setPriority(prio);
+    request.setRelaxLocality(relaxLocality);
     return request;
   }
 
@@ -182,7 +183,7 @@ public class TestFairScheduler {
     scheduler.addApplication(id, queueId, userId);
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
-        priority, numContainers);
+        priority, numContainers, true);
     ask.add(request);
     scheduler.allocate(id, ask, new ArrayList<ContainerId>());
     return id;
@@ -190,9 +191,14 @@
   private void createSchedulingRequestExistingApplication(int memory, int priority,
       ApplicationAttemptId attId) {
-    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
-        priority, 1);
+        priority, 1, true);
+    createSchedulingRequestExistingApplication(request, attId);
+  }
+
+  private void createSchedulingRequestExistingApplication(ResourceRequest request,
+      ApplicationAttemptId attId) {
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ask.add(request);
     scheduler.allocate(attId, ask, new ArrayList<ContainerId>());
   }
 
@@ -499,14 +505,16 @@ public class TestFairScheduler {
     // First ask, queue1 requests 1 large (minReqSize * 2).
     List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
     ResourceRequest request1 =
-        createResourceRequest(minReqSize * 2, ResourceRequest.ANY, 1, 1);
+        createResourceRequest(minReqSize * 2, ResourceRequest.ANY, 1, 1, true);
     ask1.add(request1);
     scheduler.allocate(id11, ask1, new ArrayList<ContainerId>());
 
     // Second ask, queue2 requests 1 large + (2 * minReqSize)
     List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
-    ResourceRequest request2 = createResourceRequest(2 * minReqSize, "foo", 1, 1);
-    ResourceRequest request3 = createResourceRequest(minReqSize, "bar", 1, 2);
+    ResourceRequest request2 = createResourceRequest(2 * minReqSize, "foo", 1, 1,
+        false);
+    ResourceRequest request3 = createResourceRequest(minReqSize, "bar", 1, 2,
+        false);
     ask2.add(request2);
     ask2.add(request3);
     scheduler.allocate(id21, ask2, new ArrayList<ContainerId>());
@@ -514,7 +522,7 @@
     // Third ask, queue2 requests 1 large
     List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>();
     ResourceRequest request4 =
-        createResourceRequest(2 * minReqSize, ResourceRequest.ANY, 1, 1);
+        createResourceRequest(2 * minReqSize, ResourceRequest.ANY, 1, 1, true);
     ask3.add(request4);
     scheduler.allocate(id22, ask3, new ArrayList<ContainerId>());
 
@@ -1408,12 +1416,12 @@
     // 1 request with 2 nodes on the same rack. another request with 1 node on
     // a different rack
     List<ResourceRequest> asks = new ArrayList<ResourceRequest>();
-    asks.add(createResourceRequest(1024, node1.getHostName(), 1, 1));
-    asks.add(createResourceRequest(1024, node2.getHostName(), 1, 1));
-    asks.add(createResourceRequest(1024, node3.getHostName(), 1, 1));
-    asks.add(createResourceRequest(1024, node1.getRackName(), 1, 1));
-    asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1));
-    asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2));
+    asks.add(createResourceRequest(1024, node1.getHostName(), 1, 1, true));
+    asks.add(createResourceRequest(1024, node2.getHostName(), 1, 1, true));
+    asks.add(createResourceRequest(1024, node3.getHostName(), 1, 1, true));
+    asks.add(createResourceRequest(1024, node1.getRackName(), 1, 1, true));
+    asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1, true));
+    asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2, true));
 
     scheduler.allocate(appId, asks, new ArrayList<ContainerId>());
 
@@ -1693,5 +1701,129 @@
     assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
     scheduler.update(); // update shouldn't change things
     assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
+}
+
+  @Test
+  public void testStrictLocality() {
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
+        "user1", 0);
+
+    ResourceRequest nodeRequest = createResourceRequest(1024, node1.getHostName(), 1, 1, true);
+    ResourceRequest rackRequest = createResourceRequest(1024, node1.getRackName(), 1, 1, false);
+    ResourceRequest anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
+        1, 1, false);
+    createSchedulingRequestExistingApplication(nodeRequest, attId1);
+    createSchedulingRequestExistingApplication(rackRequest, attId1);
+    createSchedulingRequestExistingApplication(anyRequest, attId1);
+
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent node1UpdateEvent = new NodeUpdateSchedulerEvent(node1);
+    NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2);
+
+    // no matter how many heartbeats, node2 should never get a container
+    FSSchedulerApp app = scheduler.applications.get(attId1);
+    for (int i = 0; i < 10; i++) {
+      scheduler.handle(node2UpdateEvent);
+      assertEquals(0, app.getLiveContainers().size());
+      assertEquals(0, app.getReservedContainers().size());
+    }
+    // then node1 should get the container
+    scheduler.handle(node1UpdateEvent);
+    assertEquals(1, app.getLiveContainers().size());
+  }
+
+  @Test
+  public void testCancelStrictLocality() {
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
+        "user1", 0);
+
+    ResourceRequest nodeRequest = createResourceRequest(1024, node1.getHostName(), 1, 1, true);
+    ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 1, false);
+    ResourceRequest anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
+        1, 1, false);
+    createSchedulingRequestExistingApplication(nodeRequest, attId1);
+    createSchedulingRequestExistingApplication(rackRequest, attId1);
+    createSchedulingRequestExistingApplication(anyRequest, attId1);
+
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2);
+
+    // no matter how many heartbeats, node2 should never get a container
+    FSSchedulerApp app = scheduler.applications.get(attId1);
+    for (int i = 0; i < 10; i++) {
+      scheduler.handle(node2UpdateEvent);
+      assertEquals(0, app.getLiveContainers().size());
+    }
+
+    // relax locality
+    List<ResourceRequest> update = Arrays.asList(
+        createResourceRequest(1024, node1.getHostName(), 1, 0, true),
+        createResourceRequest(1024, "rack1", 1, 0, true),
+        createResourceRequest(1024, ResourceRequest.ANY, 1, 1, true));
+    scheduler.allocate(attId1, update, new ArrayList<ContainerId>());
+
+    // then node2 should get the container
+    scheduler.handle(node2UpdateEvent);
+    assertEquals(1, app.getLiveContainers().size());
+  }
+
+  /**
+   * If we update our ask to strictly request a node, it doesn't make sense to keep
+   * a reservation on another.
+   */
+  @Test
+  public void testReservationsStrictLocality() {
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
+    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent2);
+
+    ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1",
+        "user1", 0);
+    FSSchedulerApp app = scheduler.applications.get(attId);
+
+    ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true);
+    ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true);
+    ResourceRequest anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
+        1, 2, false);
+    createSchedulingRequestExistingApplication(nodeRequest, attId);
+    createSchedulingRequestExistingApplication(rackRequest, attId);
+    createSchedulingRequestExistingApplication(anyRequest, attId);
+
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1);
+    scheduler.handle(nodeUpdateEvent);
+    assertEquals(1, app.getLiveContainers().size());
+    scheduler.handle(nodeUpdateEvent);
+    assertEquals(1, app.getReservedContainers().size());
+
+    // now, make our request node-specific (on a different node)
+    rackRequest = createResourceRequest(1024, "rack1", 1, 1, false);
+    anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
+        1, 1, false);
+    scheduler.allocate(attId, Arrays.asList(rackRequest, anyRequest),
+        new ArrayList<ContainerId>());
+
+    scheduler.handle(nodeUpdateEvent);
+    assertEquals(0, app.getReservedContainers().size());
   }
 }
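
As a closing illustration (again not part of the patch), the rack-level analogue of
the strict-node tests above: to pin a container to a specific rack, only the
ANY-level ask needs relaxation turned off, while the rack-level ask stays relaxed so
any node on that rack qualifies. This reuses the hypothetical newRequest() helper
sketched after the ResourceRequest change; the rack name is made up.

    // Strict rack locality: the rack-level ask stays relaxed (any node on /rack1
    // is acceptable), the ANY-level ask is strict so the scheduler never falls
    // back to other racks.
    List<ResourceRequest> rackOnlyAsks = Arrays.asList(
        newRequest("/rack1",            1024, 1, true),
        newRequest(ResourceRequest.ANY, 1024, 1, false));
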