Merge -r 1475680:1475681 from trunk to branch-2. Fixes: YARN-289. Fair scheduler allows reservations that won't fit on node. Contributed by Sandy Ryza.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1475682 13f79535-47bb-0310-9956-ffa450edef68
Thomas White 2013-04-25 09:18:24 +00:00
parent 3bed8dc7b9
commit 07ab02a982
4 changed files with 90 additions and 32 deletions
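
Summary of the change: before this patch, the Fair Scheduler could reserve space on a node for a container request larger than the node itself, leaving a reservation that could never be fulfilled. The patch adds an AppSchedulable.hasContainerForNode check that compares a request's capability against the node's total capability, skips nodes that can never satisfy the request, releases a standing reservation the application can no longer use on each node update, and unreserves when an allocation attempt finds the application no longer needs the resource.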

hadoop-yarn-project/CHANGES.txt

@@ -235,6 +235,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-605. Fix failing unit test in TestNMWebServices when versionInfo has
     parantheses like when running on a git checkout. (Hitesh Shah via vinodkv)
 
+    YARN-289. Fair scheduler allows reservations that won't fit on node.
+    (Sandy Ryza via tomwhite)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -45,6 +46,9 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
 @Private
 @Unstable
 public class AppSchedulable extends Schedulable {
+  private static final DefaultResourceCalculator RESOURCE_CALCULATOR
+      = new DefaultResourceCalculator();
+
   private FairScheduler scheduler;
   private FSSchedulerApp app;
   private Resource demand = Resources.createResource(0);
@@ -180,15 +184,15 @@ public class AppSchedulable extends Schedulable {
    * update relevant bookeeping. This dispatches ro relevant handlers
    * in the {@link FSSchedulerNode} and {@link SchedulerApp} classes.
    */
-  private void reserve(FSSchedulerApp application, Priority priority,
-      FSSchedulerNode node, Container container, boolean alreadyReserved) {
+  private void reserve(Priority priority, FSSchedulerNode node,
+      Container container, boolean alreadyReserved) {
     LOG.info("Making reservation: node=" + node.getHostName() +
         " app_id=" + app.getApplicationId());
     if (!alreadyReserved) {
-      getMetrics().reserveResource(application.getUser(), container.getResource());
-      RMContainer rmContainer = application.reserve(node, priority, null,
+      getMetrics().reserveResource(app.getUser(), container.getResource());
+      RMContainer rmContainer = app.reserve(node, priority, null,
           container);
-      node.reserveResource(application, priority, rmContainer);
+      node.reserveResource(app, priority, rmContainer);
       getMetrics().reserveResource(app.getUser(),
           container.getResource());
       scheduler.getRootQueueMetrics().reserveResource(app.getUser(),
@@ -197,25 +201,24 @@ public class AppSchedulable extends Schedulable {
     else {
       RMContainer rmContainer = node.getReservedContainer();
-      application.reserve(node, priority, rmContainer, container);
-      node.reserveResource(application, priority, rmContainer);
+      app.reserve(node, priority, rmContainer, container);
+      node.reserveResource(app, priority, rmContainer);
     }
   }
 
   /**
-   * Remove the reservation on {@code node} for {@ application} at the given
+   * Remove the reservation on {@code node} at the given
    * {@link Priority}. This dispatches to the SchedulerApp and SchedulerNode
    * handlers for an unreservation.
    */
-  private void unreserve(FSSchedulerApp application, Priority priority,
-      FSSchedulerNode node) {
+  public void unreserve(Priority priority, FSSchedulerNode node) {
     RMContainer rmContainer = node.getReservedContainer();
-    application.unreserve(node, priority);
-    node.unreserveResource(application);
+    app.unreserve(node, priority);
+    node.unreserveResource(app);
     getMetrics().unreserveResource(
-        application.getUser(), rmContainer.getContainer().getResource());
+        app.getUser(), rmContainer.getContainer().getResource());
     scheduler.getRootQueueMetrics().unreserveResource(
-        application.getUser(), rmContainer.getContainer().getResource());
+        app.getUser(), rmContainer.getContainer().getResource());
   }
 
   /**
@@ -224,8 +227,8 @@ public class AppSchedulable extends Schedulable {
    * sure the particular request should be facilitated by this node.
    */
   private Resource assignContainer(FSSchedulerNode node,
-      FSSchedulerApp application, Priority priority,
-      ResourceRequest request, NodeType type, boolean reserved) {
+      Priority priority, ResourceRequest request, NodeType type,
+      boolean reserved) {
 
     // How much does this request need?
     Resource capability = request.getCapability();
@@ -237,7 +240,7 @@ public class AppSchedulable extends Schedulable {
     if (reserved) {
       container = node.getReservedContainer().getContainer();
     } else {
-      container = createContainer(application, node, capability, priority);
+      container = createContainer(app, node, capability, priority);
     }
 
     // Can we allocate a container on this node?
@@ -247,9 +250,12 @@ public class AppSchedulable extends Schedulable {
     if (availableContainers > 0) {
       // Inform the application of the new container for this request
       RMContainer allocatedContainer =
-          application.allocate(type, node, priority, request, container);
+          app.allocate(type, node, priority, request, container);
       if (allocatedContainer == null) {
         // Did the application need this resource?
+        if (reserved) {
+          unreserve(priority, node);
+        }
         return Resources.none();
       }
       else {
@@ -262,17 +268,17 @@ public class AppSchedulable extends Schedulable {
 
         // If we had previously made a reservation, delete it
         if (reserved) {
-          unreserve(application, priority, node);
+          unreserve(priority, node);
         }
 
         // Inform the node
-        node.allocateContainer(application.getApplicationId(),
+        node.allocateContainer(app.getApplicationId(),
             allocatedContainer);
 
         return container.getResource();
       } else {
         // The desired container won't fit here, so reserve
-        reserve(application, priority, node, container, reserved);
+        reserve(priority, node, container, reserved);
 
         return FairScheduler.CONTAINER_RESERVED;
       }
@@ -287,7 +293,7 @@ public class AppSchedulable extends Schedulable {
       // Make sure the application still needs requests at this priority
       if (app.getTotalRequiredResources(priority) == 0) {
-        unreserve(app, priority, node);
+        unreserve(priority, node);
         return Resources.none();
       }
     } else {
@@ -304,7 +310,8 @@ public class AppSchedulable extends Schedulable {
       // (not scheduled) in order to promote better locality.
       synchronized (app) {
         for (Priority priority : prioritiesToTry) {
-          if (app.getTotalRequiredResources(priority) <= 0) {
+          if (app.getTotalRequiredResources(priority) <= 0 ||
+              !hasContainerForNode(priority, node)) {
             continue;
           }
@@ -321,14 +328,14 @@ public class AppSchedulable extends Schedulable {
           if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
               && localRequest != null && localRequest.getNumContainers() != 0) {
-            return assignContainer(node, app, priority,
+            return assignContainer(node, priority,
                 localRequest, NodeType.NODE_LOCAL, reserved);
           }
 
           if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
               && (allowedLocality.equals(NodeType.RACK_LOCAL) ||
                   allowedLocality.equals(NodeType.OFF_SWITCH))) {
-            return assignContainer(node, app, priority, rackLocalRequest,
+            return assignContainer(node, priority, rackLocalRequest,
                 NodeType.RACK_LOCAL, reserved);
           }
@@ -336,7 +343,7 @@ public class AppSchedulable extends Schedulable {
               ResourceRequest.ANY);
           if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0
               && allowedLocality.equals(NodeType.OFF_SWITCH)) {
-            return assignContainer(node, app, priority, offSwitchRequest,
+            return assignContainer(node, priority, offSwitchRequest,
                 NodeType.OFF_SWITCH, reserved);
           }
         }
@@ -352,4 +359,16 @@ public class AppSchedulable extends Schedulable {
   public Resource assignContainer(FSSchedulerNode node) {
     return assignContainer(node, false);
   }
+
+  /**
+   * Whether this app has containers requests that could be satisfied on the
+   * given node, if the node had full space.
+   */
+  public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
+    // TODO: add checks stuff about node specific scheduling here
+    ResourceRequest request = app.getResourceRequest(prio, ResourceRequest.ANY);
+    return request.getNumContainers() > 0 &&
+        Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
+            request.getCapability(), node.getRMNode().getTotalCapability());
+  }
 }
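
A note on the capability comparison above: DefaultResourceCalculator compares resources by memory alone, so hasContainerForNode asks whether the requested memory fits within the node's total capability, not its currently free space. A minimal sketch of that comparison, using only classes the patch itself imports (the class name FitCheckSketch and the values are illustrative, not from the commit):

```java
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;

public class FitCheckSketch {
  public static void main(String[] args) {
    DefaultResourceCalculator calc = new DefaultResourceCalculator();
    Resource request = Resources.createResource(2048);    // 2048 MB requested
    Resource nodeTotal = Resources.createResource(1024);  // 1024 MB node, as in the test below
    // Prints false: the request can never fit on this node,
    // so the scheduler should not place a reservation there.
    System.out.println(Resources.lessThanOrEqual(calc, null, request, nodeTotal));
  }
}
```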

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -805,6 +806,16 @@ public class FairScheduler implements ResourceScheduler {
     AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable();
     if (reservedAppSchedulable != null) {
+      Priority reservedPriority = node.getReservedContainer().getReservedPriority();
+      if (reservedAppSchedulable != null &&
+          !reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
+        // Don't hold the reservation if app can no longer use it
+        LOG.info("Releasing reservation that cannot be satisfied for application "
+            + reservedAppSchedulable.getApp().getApplicationAttemptId()
+            + " on node " + nm);
+        reservedAppSchedulable.unreserve(reservedPriority, node);
+        reservedAppSchedulable = null;
+      } else {
       // Reservation exists; try to fulfill the reservation
       LOG.info("Trying to fulfill reservation for application "
           + reservedAppSchedulable.getApp().getApplicationAttemptId()
@@ -812,7 +823,8 @@ public class FairScheduler implements ResourceScheduler {
       node.getReservedAppSchedulable().assignReservedContainer(node);
     }
-    else {
+    }
+    if (reservedAppSchedulable == null) {
       // No reservation, schedule at queue which is farthest below fair share
       int assignedContainers = 0;
       while (node.getReservedContainer() == null) {

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -1520,4 +1520,28 @@ public class TestFairScheduler {
     }
     assertEquals(FinalApplicationStatus.FAILED, application.getFinalApplicationStatus());
   }
+
+  @Test
+  public void testReservationThatDoesntFit() {
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1",
+        "user1", 1);
+    scheduler.update();
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+    scheduler.handle(updateEvent);
+
+    FSSchedulerApp app = scheduler.applications.get(attId);
+    assertEquals(0, app.getLiveContainers().size());
+    assertEquals(0, app.getReservedContainers().size());
+
+    createSchedulingRequestExistingApplication(1024, 2, attId);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    assertEquals(1, app.getLiveContainers().size());
+    assertEquals(0, app.getReservedContainers().size());
+  }
 }
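
The test exercises both halves of the fix: a 2048 MB request on a 1024 MB node leaves neither a live nor a reserved container after the node update, and once requests that do fit are submitted, a container is allocated normally with no reservation left behind.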