YARN-392. Make it possible to specify hard locality constraints in resource requests. (sandyr via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1488328 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2013-05-31 17:28:10 +00:00
parent 1711348e48
commit ab4b602ca6
6 changed files with 232 additions and 19 deletions

View File

@ -76,6 +76,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-422. Add a NM Client library to help application-writers. (Zhijie Shen
via vinodkv)
YARN-392. Make it possible to specify hard locality constraints in resource
requests. (sandyr via tucu)
IMPROVEMENTS
YARN-365. Change NM heartbeat handling to not generate a scheduler event

View File

@ -155,6 +155,44 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
@Stable
public abstract void setNumContainers(int numContainers);
/**
* Get whether locality relaxation is enabled with this
* <code>ResourceRequest</code>. Defaults to true.
*
* @return whether locality relaxation is enabled with this
* <code>ResourceRequest</code>.
*/
@Public
@Stable
public abstract boolean getRelaxLocality();
/**
* For a request at a network hierarchy level, set whether locality can be relaxed
* to that level and beyond.
*
* If the flag is off on a rack-level <code>ResourceRequest</code>,
* containers at that request's priority will not be assigned to nodes on that
* request's rack unless requests specifically for those nodes have also been
* submitted.
*
* If the flag is off on an {@link ResourceRequest#ANY}-level
* <code>ResourceRequest</code>, containers at that request's priority will
* only be assigned on racks for which specific requests have also been
* submitted.
*
* For example, to request a container strictly on a specific node, the
* corresponding rack-level and any-level requests should have locality
* relaxation set to false. Similarly, to request a container strictly on a
* specific rack, the corresponding any-level request should have locality
* relaxation set to false.
*
* @param relaxLocality whether locality relaxation is enabled with this
* <code>ResourceRequest</code>.
*/
@Public
@Stable
public abstract void setRelaxLocality(boolean relaxLocality);
@Override
public int hashCode() {
final int prime = 2153;

View File

@ -146,6 +146,18 @@ public class ResourceRequestPBImpl extends ResourceRequest {
maybeInitBuilder();
builder.setNumContainers((numContainers));
}
@Override
public boolean getRelaxLocality() {
ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
return p.getRelaxLocality();
}
@Override
public void setRelaxLocality(boolean relaxLocality) {
maybeInitBuilder();
builder.setRelaxLocality(relaxLocality);
}
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
return new PriorityPBImpl(p);
@ -167,6 +179,7 @@ public class ResourceRequestPBImpl extends ResourceRequest {
public String toString() {
return "{Priority: " + getPriority() + ", Capability: " + getCapability()
+ ", # Containers: " + getNumContainers()
+ ", Location: " + getHostName() + "}";
+ ", Location: " + getHostName()
+ ", Relax Locality: " + getRelaxLocality() + "}";
}
}

View File

@ -204,6 +204,7 @@ message ResourceRequestProto {
optional string host_name = 2;
optional ResourceProto capability = 3;
optional int32 num_containers = 4;
optional bool relax_locality = 5 [default = true];
}
////////////////////////////////////////////////////////////////////////

View File

@ -316,6 +316,11 @@ public class AppSchedulable extends Schedulable {
ResourceRequest localRequest = app.getResourceRequest(priority,
node.getHostName());
if (localRequest != null && !localRequest.getRelaxLocality()) {
LOG.warn("Relax locality off is not supported on local request: "
+ localRequest);
}
NodeType allowedLocality = app.getAllowedLocalityLevel(priority,
scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(),
scheduler.getRackLocalityThreshold());
@ -325,6 +330,10 @@ public class AppSchedulable extends Schedulable {
return assignContainer(node, priority,
localRequest, NodeType.NODE_LOCAL, reserved);
}
if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) {
continue;
}
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
&& (allowedLocality.equals(NodeType.RACK_LOCAL) ||
@ -335,6 +344,10 @@ public class AppSchedulable extends Schedulable {
ResourceRequest offSwitchRequest = app.getResourceRequest(priority,
ResourceRequest.ANY);
if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) {
continue;
}
if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0
&& allowedLocality.equals(NodeType.OFF_SWITCH)) {
return assignContainer(node, priority, offSwitchRequest,
@ -359,10 +372,23 @@ public class AppSchedulable extends Schedulable {
* given node, if the node had full space.
*/
public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
// TODO: add checks stuff about node specific scheduling here
ResourceRequest request = app.getResourceRequest(prio, ResourceRequest.ANY);
return request.getNumContainers() > 0 &&
ResourceRequest anyRequest = app.getResourceRequest(prio, ResourceRequest.ANY);
ResourceRequest rackRequest = app.getResourceRequest(prio, node.getRackName());
ResourceRequest nodeRequest = app.getResourceRequest(prio, node.getHostName());
return
// There must be outstanding requests at the given priority:
anyRequest != null && anyRequest.getNumContainers() > 0 &&
// If locality relaxation is turned off at *-level, there must be a
// non-zero request for the node's rack:
(anyRequest.getRelaxLocality() ||
(rackRequest != null && rackRequest.getNumContainers() > 0)) &&
// If locality relaxation is turned off at rack-level, there must be a
// non-zero request at the node:
(rackRequest == null || rackRequest.getRelaxLocality() ||
(nodeRequest != null && nodeRequest.getNumContainers() > 0)) &&
// The requested container must be able to fit on the node:
Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
request.getCapability(), node.getRMNode().getTotalCapability());
anyRequest.getCapability(), node.getRMNode().getTotalCapability());
}
}

View File

@ -151,7 +151,7 @@ public class TestFairScheduler {
private ResourceRequest createResourceRequest(int memory, String host,
int priority, int numContainers) {
int priority, int numContainers, boolean relaxLocality) {
ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
request.setCapability(Resources.createResource(memory));
request.setHostName(host);
@ -159,6 +159,7 @@ public class TestFairScheduler {
Priority prio = recordFactory.newRecordInstance(Priority.class);
prio.setPriority(priority);
request.setPriority(prio);
request.setRelaxLocality(relaxLocality);
return request;
}
@ -182,7 +183,7 @@ public class TestFairScheduler {
scheduler.addApplication(id, queueId, userId);
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
priority, numContainers);
priority, numContainers, true);
ask.add(request);
scheduler.allocate(id, ask, new ArrayList<ContainerId>());
return id;
@ -190,9 +191,14 @@ public class TestFairScheduler {
private void createSchedulingRequestExistingApplication(int memory, int priority,
ApplicationAttemptId attId) {
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
priority, 1);
priority, 1, true);
createSchedulingRequestExistingApplication(request, attId);
}
private void createSchedulingRequestExistingApplication(ResourceRequest request,
ApplicationAttemptId attId) {
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ask.add(request);
scheduler.allocate(attId, ask, new ArrayList<ContainerId>());
}
@ -499,14 +505,16 @@ public class TestFairScheduler {
// First ask, queue1 requests 1 large (minReqSize * 2).
List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
ResourceRequest request1 =
createResourceRequest(minReqSize * 2, ResourceRequest.ANY, 1, 1);
createResourceRequest(minReqSize * 2, ResourceRequest.ANY, 1, 1, true);
ask1.add(request1);
scheduler.allocate(id11, ask1, new ArrayList<ContainerId>());
// Second ask, queue2 requests 1 large + (2 * minReqSize)
List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
ResourceRequest request2 = createResourceRequest(2 * minReqSize, "foo", 1, 1);
ResourceRequest request3 = createResourceRequest(minReqSize, "bar", 1, 2);
ResourceRequest request2 = createResourceRequest(2 * minReqSize, "foo", 1, 1,
false);
ResourceRequest request3 = createResourceRequest(minReqSize, "bar", 1, 2,
false);
ask2.add(request2);
ask2.add(request3);
scheduler.allocate(id21, ask2, new ArrayList<ContainerId>());
@ -514,7 +522,7 @@ public class TestFairScheduler {
// Third ask, queue2 requests 1 large
List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>();
ResourceRequest request4 =
createResourceRequest(2 * minReqSize, ResourceRequest.ANY, 1, 1);
createResourceRequest(2 * minReqSize, ResourceRequest.ANY, 1, 1, true);
ask3.add(request4);
scheduler.allocate(id22, ask3, new ArrayList<ContainerId>());
@ -1408,12 +1416,12 @@ public class TestFairScheduler {
// 1 request with 2 nodes on the same rack. another request with 1 node on
// a different rack
List<ResourceRequest> asks = new ArrayList<ResourceRequest>();
asks.add(createResourceRequest(1024, node1.getHostName(), 1, 1));
asks.add(createResourceRequest(1024, node2.getHostName(), 1, 1));
asks.add(createResourceRequest(1024, node3.getHostName(), 1, 1));
asks.add(createResourceRequest(1024, node1.getRackName(), 1, 1));
asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1));
asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2));
asks.add(createResourceRequest(1024, node1.getHostName(), 1, 1, true));
asks.add(createResourceRequest(1024, node2.getHostName(), 1, 1, true));
asks.add(createResourceRequest(1024, node3.getHostName(), 1, 1, true));
asks.add(createResourceRequest(1024, node1.getRackName(), 1, 1, true));
asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1, true));
asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2, true));
scheduler.allocate(appId, asks, new ArrayList<ContainerId>());
@ -1693,5 +1701,129 @@ public class TestFairScheduler {
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
scheduler.update(); // update shouldn't change things
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
}
@Test
public void testStrictLocality() {
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
"user1", 0);
ResourceRequest nodeRequest = createResourceRequest(1024, node1.getHostName(), 1, 1, true);
ResourceRequest rackRequest = createResourceRequest(1024, node1.getRackName(), 1, 1, false);
ResourceRequest anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
1, 1, false);
createSchedulingRequestExistingApplication(nodeRequest, attId1);
createSchedulingRequestExistingApplication(rackRequest, attId1);
createSchedulingRequestExistingApplication(anyRequest, attId1);
scheduler.update();
NodeUpdateSchedulerEvent node1UpdateEvent = new NodeUpdateSchedulerEvent(node1);
NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2);
// no matter how many heartbeats, node2 should never get a container
FSSchedulerApp app = scheduler.applications.get(attId1);
for (int i = 0; i < 10; i++) {
scheduler.handle(node2UpdateEvent);
assertEquals(0, app.getLiveContainers().size());
assertEquals(0, app.getReservedContainers().size());
}
// then node1 should get the container
scheduler.handle(node1UpdateEvent);
assertEquals(1, app.getLiveContainers().size());
}
@Test
public void testCancelStrictLocality() {
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
"user1", 0);
ResourceRequest nodeRequest = createResourceRequest(1024, node1.getHostName(), 1, 1, true);
ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 1, false);
ResourceRequest anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
1, 1, false);
createSchedulingRequestExistingApplication(nodeRequest, attId1);
createSchedulingRequestExistingApplication(rackRequest, attId1);
createSchedulingRequestExistingApplication(anyRequest, attId1);
scheduler.update();
NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2);
// no matter how many heartbeats, node2 should never get a container
FSSchedulerApp app = scheduler.applications.get(attId1);
for (int i = 0; i < 10; i++) {
scheduler.handle(node2UpdateEvent);
assertEquals(0, app.getLiveContainers().size());
}
// relax locality
List<ResourceRequest> update = Arrays.asList(
createResourceRequest(1024, node1.getHostName(), 1, 0, true),
createResourceRequest(1024, "rack1", 1, 0, true),
createResourceRequest(1024, ResourceRequest.ANY, 1, 1, true));
scheduler.allocate(attId1, update, new ArrayList<ContainerId>());
// then node2 should get the container
scheduler.handle(node2UpdateEvent);
assertEquals(1, app.getLiveContainers().size());
}
/**
* If we update our ask to strictly request a node, it doesn't make sense to keep
* a reservation on another.
*/
@Test
public void testReservationsStrictLocality() {
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent2);
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1",
"user1", 0);
FSSchedulerApp app = scheduler.applications.get(attId);
ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true);
ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true);
ResourceRequest anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
1, 2, false);
createSchedulingRequestExistingApplication(nodeRequest, attId);
createSchedulingRequestExistingApplication(rackRequest, attId);
createSchedulingRequestExistingApplication(anyRequest, attId);
scheduler.update();
NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(nodeUpdateEvent);
assertEquals(1, app.getLiveContainers().size());
scheduler.handle(nodeUpdateEvent);
assertEquals(1, app.getReservedContainers().size());
// now, make our request node-specific (on a different node)
rackRequest = createResourceRequest(1024, "rack1", 1, 1, false);
anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
1, 1, false);
scheduler.allocate(attId, Arrays.asList(rackRequest, anyRequest),
new ArrayList<ContainerId>());
scheduler.handle(nodeUpdateEvent);
assertEquals(0, app.getReservedContainers().size());
}
}