YARN-5164. Use plan RLE to improve CapacityOverTimePolicy efficiency

This commit is contained in:
Chris Douglas 2016-07-25 16:37:50 -07:00
parent 703fdf86c6
commit d383bfdcd4
2 changed files with 185 additions and 134 deletions

View File

@ -1,26 +1,21 @@
/******************************************************************************* /*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one or more
* or more contributor license agreements. See the NOTICE file * contributor license agreements. See the NOTICE file distributed with this
* distributed with this work for additional information * work for additional information regarding copyright ownership. The ASF
* regarding copyright ownership. The ASF licenses this file * licenses this file to you under the Apache License, Version 2.0 (the
* to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License.
* "License"); you may not use this file except in compliance * You may obtain a copy of the License at
* with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* See the License for the specific language governing permissions and * License for the specific language governing permissions and limitations under
* limitations under the License. * the License.
*******************************************************************************/ *******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation; package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Date;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
@ -28,9 +23,12 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
/** /**
* This policy enforces a time-extended notion of Capacity. In particular it * This policy enforces a time-extended notion of Capacity. In particular it
* guarantees that the allocation received in input when combined with all * guarantees that the allocation received in input when combined with all
@ -51,7 +49,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
*/ */
@LimitedPrivate("yarn") @LimitedPrivate("yarn")
@Unstable @Unstable
public class CapacityOverTimePolicy implements SharingPolicy { public class CapacityOverTimePolicy extends NoOverCommitPolicy {
private ReservationSchedulerConfiguration conf; private ReservationSchedulerConfiguration conf;
private long validWindow; private long validWindow;
@ -68,121 +66,153 @@ public class CapacityOverTimePolicy implements SharingPolicy {
validWindow = this.conf.getReservationWindow(reservationQueuePath); validWindow = this.conf.getReservationWindow(reservationQueuePath);
maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100; maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100;
maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100; maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100;
}; }
/**
* The validation algorithm walks over the RLE encoded allocation and
* checks that for all transition points (when the start or end of the
* checking window encounters a value in the RLE). At this point it
* checkes whether the integral computed exceeds the quota limit. Note that
* this might not find the exact time of a violation, but if a violation
* exists it will find it. The advantage is a much lower number of checks
* as compared to time-slot by time-slot checks.
*
* @param plan the plan to validate against
* @param reservation the reservation allocation to test.
* @throws PlanningException if the validation fails.
*/
@Override @Override
public void validate(Plan plan, ReservationAllocation reservation) public void validate(Plan plan, ReservationAllocation reservation)
throws PlanningException { throws PlanningException {
// this is entire method invoked under a write-lock on the plan, no need
// to synchronize accesses to the plan further
// Try to verify whether there is already a reservation with this ID in // rely on NoOverCommitPolicy to check for: 1) user-match, 2) physical
// the system (remove its contribution during validation to simulate a // cluster limits, and 3) maxInst (via override of available)
// try-n-swap try {
// update). super.validate(plan, reservation);
ReservationAllocation oldReservation = } catch (PlanningException p) {
//wrap it in proper quota exception
throw new PlanningQuotaException(p);
}
//---- check for integral violations of capacity --------
// Gather a view of what to check (curr allocation of user, minus old
// version of this reservation, plus new version)
RLESparseResourceAllocation consumptionForUserOverTime =
plan.getConsumptionForUserOverTime(reservation.getUser(),
reservation.getStartTime() - validWindow,
reservation.getEndTime() + validWindow);
ReservationAllocation old =
plan.getReservationById(reservation.getReservationId()); plan.getReservationById(reservation.getReservationId());
if (old != null) {
consumptionForUserOverTime = RLESparseResourceAllocation
.merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
consumptionForUserOverTime, old.getResourcesOverTime(),
RLEOperator.add, reservation.getStartTime() - validWindow,
reservation.getEndTime() + validWindow);
}
long startTime = reservation.getStartTime(); RLESparseResourceAllocation resRLE = reservation.getResourcesOverTime();
long endTime = reservation.getEndTime();
long step = plan.getStep();
Resource planTotalCapacity = plan.getTotalCapacity(); RLESparseResourceAllocation toCheck = RLESparseResourceAllocation
.merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
consumptionForUserOverTime, resRLE, RLEOperator.add, Long.MIN_VALUE,
Long.MAX_VALUE);
Resource maxAvgRes = Resources.multiply(planTotalCapacity, maxAvg); NavigableMap<Long, Resource> integralUp = new TreeMap<>();
Resource maxInsRes = Resources.multiply(planTotalCapacity, maxInst); NavigableMap<Long, Resource> integralDown = new TreeMap<>();
// define variable that will store integral of resources (need diff class to long prevTime = toCheck.getEarliestStartTime();
// avoid overflow issues for long/large allocations) IntegralResource prevResource = new IntegralResource(0L, 0L);
IntegralResource runningTot = new IntegralResource(0L, 0L); IntegralResource runningTot = new IntegralResource(0L, 0L);
IntegralResource maxAllowed = new IntegralResource(maxAvgRes);
maxAllowed.multiplyBy(validWindow / step);
RLESparseResourceAllocation userCons = // add intermediate points
plan.getConsumptionForUserOverTime(reservation.getUser(), startTime Map<Long, Resource> temp = new TreeMap<>();
- validWindow, endTime + validWindow); for (Map.Entry<Long, Resource> pointToCheck : toCheck.getCumulative()
.entrySet()) {
// check that the resources offered to the user during any window of length Long timeToCheck = pointToCheck.getKey();
// "validWindow" overlapping this allocation are within maxAllowed Resource resourceToCheck = pointToCheck.getValue();
// also enforce instantaneous and physical constraints during this pass
for (long t = startTime - validWindow; t < endTime + validWindow; t += step) {
Resource currExistingAllocTot = plan.getTotalCommittedResources(t); Long nextPoint = toCheck.getCumulative().higherKey(timeToCheck);
Resource currExistingAllocForUser = userCons.getCapacityAtTime(t); if (nextPoint == null || toCheck.getCumulative().get(nextPoint) == null) {
Resource currNewAlloc = reservation.getResourcesAtTime(t); continue;
Resource currOldAlloc = Resources.none(); }
if (oldReservation != null) { for (int i = 1; i <= (nextPoint - timeToCheck) / validWindow; i++) {
currOldAlloc = oldReservation.getResourcesAtTime(t); temp.put(timeToCheck + (i * validWindow), resourceToCheck);
}
}
temp.putAll(toCheck.getCumulative());
// compute point-wise integral for the up-fronts and down-fronts
for (Map.Entry<Long, Resource> currPoint : temp.entrySet()) {
Long currTime = currPoint.getKey();
Resource currResource = currPoint.getValue();
//add to running total current contribution
prevResource.multiplyBy(currTime - prevTime);
runningTot.add(prevResource);
integralUp.put(currTime, normalizeToResource(runningTot, validWindow));
integralDown.put(currTime + validWindow,
normalizeToResource(runningTot, validWindow));
if (currResource != null) {
prevResource.memory = currResource.getMemorySize();
prevResource.vcores = currResource.getVirtualCores();
} else {
prevResource.memory = 0L;
prevResource.vcores = 0L;
}
prevTime = currTime;
} }
// throw exception if the cluster is overcommitted // compute final integral as delta of up minus down transitions
// tot_allocated - old + new > capacity RLESparseResourceAllocation intUp =
Resource inst = new RLESparseResourceAllocation(integralUp,
Resources.subtract(Resources.add(currExistingAllocTot, currNewAlloc), plan.getResourceCalculator());
currOldAlloc); RLESparseResourceAllocation intDown =
if (Resources.greaterThan(plan.getResourceCalculator(), new RLESparseResourceAllocation(integralDown,
planTotalCapacity, inst, planTotalCapacity)) { plan.getResourceCalculator());
throw new ResourceOverCommitException(" Resources at time " + t
+ " would be overcommitted (" + inst + " over "
+ plan.getTotalCapacity() + ") by accepting reservation: "
+ reservation.getReservationId());
}
// throw exception if instantaneous limits are violated RLESparseResourceAllocation integral = RLESparseResourceAllocation
// tot_alloc_to_this_user - old + new > inst_limit .merge(plan.getResourceCalculator(), plan.getTotalCapacity(), intUp,
if (Resources.greaterThan(plan.getResourceCalculator(), intDown, RLEOperator.subtract, Long.MIN_VALUE, Long.MAX_VALUE);
planTotalCapacity, Resources.subtract(
Resources.add(currExistingAllocForUser, currNewAlloc),
currOldAlloc), maxInsRes)) {
throw new PlanningQuotaException("Instantaneous quota capacity "
+ maxInst + " would be passed at time " + t
+ " by accepting reservation: " + reservation.getReservationId());
}
// throw exception if the running integral of utilization over validWindow // define over-time integral limit
// is violated. We perform a delta check, adding/removing instants at the // note: this is aligned with the normalization done above
// boundary of the window from runningTot. NavigableMap<Long, Resource> tlimit = new TreeMap<>();
Resource maxAvgRes = Resources.multiply(plan.getTotalCapacity(), maxAvg);
tlimit.put(toCheck.getEarliestStartTime() - validWindow, maxAvgRes);
RLESparseResourceAllocation targetLimit =
new RLESparseResourceAllocation(tlimit, plan.getResourceCalculator());
// runningTot = previous_runningTot + currExistingAllocForUser + // compare using merge() limit with integral
// currNewAlloc - currOldAlloc - pastNewAlloc - pastOldAlloc; try {
RLESparseResourceAllocation
// Where: .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
// 1) currNewAlloc, currExistingAllocForUser represent the contribution of targetLimit, integral, RLEOperator.subtractTestNonNegative,
// the instant in time added in this pass. reservation.getStartTime() - validWindow,
// 2) pastNewAlloc, pastOldAlloc are the contributions relative to time reservation.getEndTime() + validWindow);
// instants that are being retired from the the window } catch (PlanningException p) {
// 3) currOldAlloc is the contribution (if any) of the previous version of
// this reservation (the one we are updating)
runningTot.add(currExistingAllocForUser);
runningTot.add(currNewAlloc);
runningTot.subtract(currOldAlloc);
// expire contributions from instant in time before (t - validWindow)
if (t > startTime) {
Resource pastOldAlloc = userCons.getCapacityAtTime(t - validWindow);
Resource pastNewAlloc = reservation.getResourcesAtTime(t - validWindow);
// runningTot = runningTot - pastExistingAlloc - pastNewAlloc;
runningTot.subtract(pastOldAlloc);
runningTot.subtract(pastNewAlloc);
}
// check integral
// runningTot > maxAvg * validWindow
// NOTE: we need to use comparator of IntegralResource directly, as
// Resource and ResourceCalculator assume "int" amount of resources,
// which is not sufficient when comparing integrals (out-of-bound)
if (maxAllowed.compareTo(runningTot) < 0) {
throw new PlanningQuotaException( throw new PlanningQuotaException(
"Integral (avg over time) quota capacity " + maxAvg "Integral (avg over time) quota capacity " + maxAvg
+ " over a window of " + validWindow / 1000 + " seconds, " + " over a window of " + validWindow / 1000 + " seconds, "
+ " would be passed at time " + t + "(" + new Date(t) + " would be exceeded by accepting reservation: " + reservation
+ ") by accepting reservation: " .getReservationId(), p);
+ reservation.getReservationId());
} }
} }
private Resource normalizeToResource(IntegralResource runningTot,
long window) {
// normalize to fit in windows. Rounding should not impact more than
// sub 1 core average allocations. This will all be removed once
// Resource moves to long.
int memory = (int) Math.round((double) runningTot.memory / window);
int vcores = (int) Math.round((double) runningTot.vcores / window);
return Resource.newInstance(memory, vcores);
} }
@Override @Override
@ -208,21 +238,18 @@ public class CapacityOverTimePolicy implements SharingPolicy {
// add back in old reservation used resources if any // add back in old reservation used resources if any
ReservationAllocation old = plan.getReservationById(oldId); ReservationAllocation old = plan.getReservationById(oldId);
if (old != null) { if (old != null) {
used = used = RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
Resources.clone(plan.getTotalCapacity()), used, Resources.clone(plan.getTotalCapacity()), used,
old.getResourcesOverTime(), RLEOperator.subtract, start, end); old.getResourcesOverTime(), RLEOperator.subtract, start, end);
} }
instRLEQuota = instRLEQuota = RLESparseResourceAllocation
RLESparseResourceAllocation.merge(plan.getResourceCalculator(), .merge(plan.getResourceCalculator(), planTotalCapacity, instRLEQuota,
planTotalCapacity, instRLEQuota, used, RLEOperator.subtract, start, used, RLEOperator.subtract, start, end);
end);
instRLEQuota = instRLEQuota = RLESparseResourceAllocation
RLESparseResourceAllocation.merge(plan.getResourceCalculator(), .merge(plan.getResourceCalculator(), planTotalCapacity, available,
planTotalCapacity, available, instRLEQuota, RLEOperator.min, start, instRLEQuota, RLEOperator.min, start, end);
end);
return instRLEQuota; return instRLEQuota;
} }
@ -260,11 +287,20 @@ public class CapacityOverTimePolicy implements SharingPolicy {
vcores += r.getVirtualCores(); vcores += r.getVirtualCores();
} }
public void add(IntegralResource r) {
memory += r.memory;
vcores += r.vcores;
}
public void subtract(Resource r) { public void subtract(Resource r) {
memory -= r.getMemorySize(); memory -= r.getMemorySize();
vcores -= r.getVirtualCores(); vcores -= r.getVirtualCores();
} }
public IntegralResource negate() {
return new IntegralResource(-memory, -vcores);
}
public void multiplyBy(long window) { public void multiplyBy(long window) {
memory = memory * window; memory = memory * window;
vcores = vcores * window; vcores = vcores * window;
@ -282,8 +318,7 @@ public class CapacityOverTimePolicy implements SharingPolicy {
public String toString() { public String toString() {
return "<memory:" + memory + ", vCores:" + vcores + ">"; return "<memory:" + memory + ", vCores:" + vcores + ">";
} }
} }
} }

View File

@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@ -117,6 +116,23 @@ public class TestCapacityOverTimePolicy {
res, minAlloc), false)); res, minAlloc), false));
} }
@Test(expected = PlanningException.class)
public void testAllocationLargerThanValidWindow() throws IOException,
PlanningException {
// generate allocation that exceed the validWindow
int[] f = generateData(25*3600, (int) Math.ceil(0.69 * totCont));
ReservationDefinition rDef =
ReservationSystemTestUtil.createSimpleReservationDefinition(
initTime, initTime + f.length + 1, f.length);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc), false));
}
@Test @Test
public void testSimplePass2() throws IOException, PlanningException { public void testSimplePass2() throws IOException, PlanningException {
// generate allocation from single tenant that exceed avg momentarily but // generate allocation from single tenant that exceed avg momentarily but
@ -151,7 +167,7 @@ public class TestCapacityOverTimePolicy {
} }
} }
@Test(expected = ResourceOverCommitException.class) @Test(expected = PlanningQuotaException.class)
public void testMultiTenantFail() throws IOException, PlanningException { public void testMultiTenantFail() throws IOException, PlanningException {
// generate allocation from multiple tenants that exceed tot capacity // generate allocation from multiple tenants that exceed tot capacity
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont)); int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));