YARN-5165. Fix NoOvercommitPolicy to take advantage of RLE representation of plan. (Carlo Curino via asuresh)

This commit is contained in:
Arun Suresh 2016-06-03 14:49:32 -07:00
parent f10ebc67f5
commit db54670e83
2 changed files with 15 additions and 25 deletions

View File

@@ -21,11 +21,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
import org.apache.hadoop.yarn.util.resource.Resources;
/** /**
* This policy enforce a simple physical cluster capacity constraints, by * This policy enforce a simple physical cluster capacity constraints, by
@@ -52,29 +50,21 @@ public class NoOverCommitPolicy implements SharingPolicy {
+ oldReservation.getUser() + " != " + reservation.getUser()); + oldReservation.getUser() + " != " + reservation.getUser());
} }
long startTime = reservation.getStartTime(); RLESparseResourceAllocation available = plan.getAvailableResourceOverTime(
long endTime = reservation.getEndTime(); reservation.getUser(), reservation.getReservationId(),
long step = plan.getStep(); reservation.getStartTime(), reservation.getEndTime());
// for every instant in time, check we are respecting cluster capacity // test the reservation does not exceed what is available
for (long t = startTime; t < endTime; t += step) { try {
Resource currExistingAllocTot = plan.getTotalCommittedResources(t); RLESparseResourceAllocation
Resource currNewAlloc = reservation.getResourcesAtTime(t); .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
Resource currOldAlloc = Resource.newInstance(0, 0); available, reservation.getResourcesOverTime(),
if (oldReservation != null) { RLESparseResourceAllocation.RLEOperator.subtractTestNonNegative,
oldReservation.getResourcesAtTime(t); reservation.getStartTime(), reservation.getEndTime());
} } catch (PlanningException p) {
// check the cluster is never over committed throw new ResourceOverCommitException(
// currExistingAllocTot + currNewAlloc - currOldAlloc > "Resources at time " + " would be overcommitted by "
// capPlan.getTotalCapacity() + "accepting reservation: " + reservation.getReservationId());
if (Resources.greaterThan(plan.getResourceCalculator(), plan
.getTotalCapacity(), Resources.subtract(
Resources.add(currExistingAllocTot, currNewAlloc), currOldAlloc),
plan.getTotalCapacity())) {
throw new ResourceOverCommitException("Resources at time " + t
+ " would be overcommitted by " + "accepting reservation: "
+ reservation.getReservationId());
}
} }
} }

View File

@@ -53,7 +53,7 @@ public class TestSimpleCapacityReplanner {
@Test @Test
public void testReplanningPlanCapacityLoss() throws PlanningException { public void testReplanningPlanCapacityLoss() throws PlanningException {
Resource clusterCapacity = Resource.newInstance(100 * 1024, 10); Resource clusterCapacity = Resource.newInstance(100 * 1024, 100);
Resource minAlloc = Resource.newInstance(1024, 1); Resource minAlloc = Resource.newInstance(1024, 1);
Resource maxAlloc = Resource.newInstance(1024 * 8, 8); Resource maxAlloc = Resource.newInstance(1024 * 8, 8);