Merge -r 1163767:1163768 from trunk to branch-0.23 to fix MAPREDUCE-2917.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1163769 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-08-31 19:54:16 +00:00
parent 464486bee3
commit 2572c7c434
6 changed files with 224 additions and 20 deletions

View File

@ -1179,6 +1179,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2916. Ivy build for MRv1 fails with bad organization for
common daemon. (mahadev)
MAPREDUCE-2917. Fixed corner case in container reservation which led to
starvation and hung jobs. (acmurthy)
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -73,7 +73,11 @@ public class SchedulerApp {
final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
new HashMap<Priority, Map<NodeId, RMContainer>>();
Map<Priority, Integer> schedulingOpportunities = new HashMap<Priority, Integer>();
Map<Priority, Integer> schedulingOpportunities =
new HashMap<Priority, Integer>();
Map<Priority, Integer> reReservations =
new HashMap<Priority, Integer>();
Resource currentReservation = recordFactory
.newRecordInstance(Resource.class);
@ -265,15 +269,15 @@ public class SchedulerApp {
}
synchronized public void resetSchedulingOpportunities(Priority priority) {
Integer schedulingOpportunities = this.schedulingOpportunities
.get(priority);
Integer schedulingOpportunities =
this.schedulingOpportunities.get(priority);
schedulingOpportunities = 0;
this.schedulingOpportunities.put(priority, schedulingOpportunities);
}
synchronized public void addSchedulingOpportunity(Priority priority) {
Integer schedulingOpportunities = this.schedulingOpportunities
.get(priority);
Integer schedulingOpportunities =
this.schedulingOpportunities.get(priority);
if (schedulingOpportunities == null) {
schedulingOpportunities = 0;
}
@ -282,8 +286,8 @@ public class SchedulerApp {
}
synchronized public int getSchedulingOpportunities(Priority priority) {
Integer schedulingOpportunities = this.schedulingOpportunities
.get(priority);
Integer schedulingOpportunities =
this.schedulingOpportunities.get(priority);
if (schedulingOpportunities == null) {
schedulingOpportunities = 0;
this.schedulingOpportunities.put(priority, schedulingOpportunities);
@ -291,6 +295,30 @@ public class SchedulerApp {
return schedulingOpportunities;
}
synchronized void resetReReservations(Priority priority) {
Integer reReservations = this.reReservations.get(priority);
reReservations = 0;
this.reReservations.put(priority, reReservations);
}
synchronized void addReReservation(Priority priority) {
Integer reReservations = this.reReservations.get(priority);
if (reReservations == null) {
reReservations = 0;
}
++reReservations;
this.reReservations.put(priority, reReservations);
}
synchronized public int getReReservations(Priority priority) {
Integer reReservations = this.reReservations.get(priority);
if (reReservations == null) {
reReservations = 0;
this.reReservations.put(priority, reReservations);
}
return reReservations;
}
public synchronized int getNumReservedContainers(Priority priority) {
Map<NodeId, RMContainer> reservedContainers =
this.reservedContainers.get(priority);
@ -318,6 +346,12 @@ public class SchedulerApp {
rmContext.getContainerAllocationExpirer());
Resources.addTo(currentReservation, container.getResource());
// Reset the re-reservation count
resetReReservations(priority);
} else {
// Note down the re-reservation
addReReservation(priority);
}
rmContainer.handle(new RMContainerReservedEvent(container.getId(),
container.getResource(), node.getNodeID(), priority));
@ -347,6 +381,9 @@ public class SchedulerApp {
this.reservedContainers.remove(priority);
}
// Reset the re-reservation count
resetReReservations(priority);
Resource resource = reservedContainer.getContainer().getResource();
Resources.subtractFrom(currentReservation, resource);

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;

View File

@ -86,7 +86,9 @@ public class LeafQueue implements Queue {
Map<ApplicationAttemptId, SchedulerApp> applicationsMap =
new HashMap<ApplicationAttemptId, SchedulerApp>();
public final Resource minimumAllocation;
private final Resource minimumAllocation;
private final Resource maximumAllocation;
private final float minimumAllocationFactor;
private ContainerTokenSecretManager containerTokenSecretManager;
@ -118,6 +120,10 @@ public class LeafQueue implements Queue {
cs.getConfiguration().getEnableUserMetrics());
this.minimumAllocation = cs.getMinimumResourceCapability();
this.maximumAllocation = cs.getMaximumResourceCapability();
this.minimumAllocationFactor =
(float)(maximumAllocation.getMemory() - minimumAllocation.getMemory()) /
maximumAllocation.getMemory();
this.containerTokenSecretManager = cs.getContainerTokenSecretManager();
float capacity =
@ -239,6 +245,30 @@ public class LeafQueue implements Queue {
return parent.getQueuePath() + "." + getQueueName();
}
/**
* Used only by tests.
*/
@Private
public Resource getMinimumAllocation() {
return minimumAllocation;
}
/**
* Used only by tests.
*/
@Private
public Resource getMaximumAllocation() {
return maximumAllocation;
}
/**
* Used only by tests.
*/
@Private
public float getMinimumAllocationFactor() {
return minimumAllocationFactor;
}
@Override
public synchronized float getUsedCapacity() {
return usedCapacity;
@ -536,25 +566,24 @@ public class LeafQueue implements Queue {
setUserResourceLimit(application, userLimit);
for (Priority priority : application.getPriorities()) {
// Required resource
Resource required =
application.getResourceRequest(priority, RMNode.ANY).getCapability();
// Do we need containers at this 'priority'?
if (!needContainers(application, priority)) {
if (!needContainers(application, priority, required)) {
continue;
}
// Are we going over limits by allocating to this application?
ResourceRequest required =
application.getResourceRequest(priority, RMNode.ANY);
// Maximum Capacity of the queue
if (!assignToQueue(clusterResource, required.getCapability())) {
if (!assignToQueue(clusterResource, required)) {
return Resources.none();
}
// User limits
userLimit =
computeUserLimit(application, clusterResource,
required.getCapability());
computeUserLimit(application, clusterResource, required);
if (!assignToUser(application.getUser(), userLimit)) {
break;
}
@ -732,10 +761,32 @@ public class LeafQueue implements Queue {
return (a + (b - 1)) / b;
}
boolean needContainers(SchedulerApp application, Priority priority) {
boolean needContainers(SchedulerApp application, Priority priority, Resource required) {
int requiredContainers = application.getTotalRequiredResources(priority);
int reservedContainers = application.getNumReservedContainers(priority);
return ((requiredContainers - reservedContainers) > 0);
int starvation = 0;
if (reservedContainers > 0) {
float nodeFactor =
((float)required.getMemory() / getMaximumAllocation().getMemory());
// Use percentage of node required to bias against large containers...
// Protect against corner case where you need the whole node with
// Math.min(nodeFactor, minimumAllocationFactor)
starvation =
(int)((application.getReReservations(priority) / reservedContainers) *
(1.0f - (Math.min(nodeFactor, getMinimumAllocationFactor())))
);
if (LOG.isDebugEnabled()) {
LOG.debug("needsContainers:" +
" app.#re-reserve=" + application.getReReservations(priority) +
" reserved=" + reservedContainers +
" nodeFactor=" + nodeFactor +
" minAllocFactor=" + minimumAllocationFactor +
" starvation=" + starvation);
}
}
return (((starvation + requiredContainers) - reservedContainers) > 0);
}
private Resource assignContainersOnNode(Resource clusterResource,

View File

@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;

View File

@ -447,7 +447,7 @@ public class TestLeafQueue {
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
final int numNodes = 1;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
Resource clusterResource = Resources.createResource(numNodes * (4*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Setup resource-requests
@ -504,6 +504,121 @@ public class TestLeafQueue {
assertEquals(4*GB, node_0.getUsedResource().getMemory());
}
@Test
public void testReservationExchange() throws Exception {
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
a.setUserLimitFactor(10);
// Users
final String user_0 = "user_0";
final String user_1 = "user_1";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_1, a, rmContext, null);
a.submitApplication(app_1, user_1, A);
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
String host_1 = "host_1";
SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (4*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
when(csContext.getMaximumResourceCapability()).thenReturn(
Resources.createResource(4*GB));
when(a.getMaximumAllocation()).thenReturn(Resources.createResource(4*GB));
when(a.getMinimumAllocationFactor()).thenReturn(0.25f); // 1G / 4G
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 4*GB, 1, priority,
recordFactory)));
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
assertEquals(2*GB, node_0.getUsedResource().getMemory());
// Now free 1 container from app_0 i.e. 1G, and re-reserve it
a.completedContainer(clusterResource, app_0, node_0,
app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL);
a.assignContainers(clusterResource, node_0);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
assertEquals(1*GB, node_0.getUsedResource().getMemory());
assertEquals(1, app_1.getReReservations(priority));
// Re-reserve
a.assignContainers(clusterResource, node_0);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
assertEquals(1*GB, node_0.getUsedResource().getMemory());
assertEquals(2, app_1.getReReservations(priority));
// Try to schedule on node_1 now, should *move* the reservation
a.assignContainers(clusterResource, node_1);
assertEquals(9*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
assertEquals(4*GB, node_1.getUsedResource().getMemory());
// Doesn't change yet... only when reservation is cancelled or a different
// container is reserved
assertEquals(2, app_1.getReReservations(priority));
// Now finish another container from app_0 and see the reservation cancelled
a.completedContainer(clusterResource, app_0, node_0,
app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL);
a.assignContainers(clusterResource, node_0);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentReservation().getMemory());
assertEquals(0*GB, node_0.getUsedResource().getMemory());
}
@Test
public void testLocalityScheduling() throws Exception {