YARN-2475. Logic for responding to capacity drops for the ReservationSystem. Contributed by Carlo Curino and Subru Krishnan.

(cherry picked from commit f83a07f266)
(cherry picked from commit 1c6950354f)
This commit is contained in:
carlo curino 2014-09-12 16:52:54 -07:00 committed by Chris Douglas
parent cf5ef00b96
commit b81f571e60
3 changed files with 249 additions and 0 deletions

5
YARN-1051-CHANGES.txt Normal file
View File

@ -0,0 +1,5 @@
YARN-1707. Introduce APIs to add/remove/resize queues in the
CapacityScheduler. (Carlo Curino and Subru Krishnan via curino)
YARN-2475. Logic for responding to capacity drops for the
ReservationSystem. (Carlo Curino and Subru Krishnan via curino)

View File

@ -0,0 +1,95 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
/**
* This (re)planner scan a period of time from now to a maximum time window (or
* the end of the last session, whichever comes first) checking the overall
* capacity is not violated.
*
* It greedily removes sessions in reversed order of acceptance (latest accepted
* is the first removed).
*/
public class SimpleCapacityReplanner implements Planner {
private static final Log LOG = LogFactory
.getLog(SimpleCapacityReplanner.class);
private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
private final Clock clock;
// this allows to control to time-span of this replanning
// far into the future time instants might be worth replanning for
// later on
private long lengthOfCheckZone;
public SimpleCapacityReplanner() {
this(new UTCClock());
}
@VisibleForTesting
SimpleCapacityReplanner(Clock clock) {
this.clock = clock;
}
@Override
public void init(String planQueueName, CapacitySchedulerConfiguration conf) {
this.lengthOfCheckZone = conf.getEnforcementWindow(planQueueName);
}
@Override
public void plan(Plan plan, List<ReservationDefinition> contracts)
throws PlanningException {
if (contracts != null) {
throw new RuntimeException(
"SimpleCapacityReplanner cannot handle new reservation contracts");
}
ResourceCalculator resCalc = plan.getResourceCalculator();
Resource totCap = plan.getTotalCapacity();
long now = clock.getTime();
// loop on all moment in time from now to the end of the check Zone
// or the end of the planned sessions whichever comes first
for (long t = now; (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone)); t +=
plan.getStep()) {
Resource excessCap =
Resources.subtract(plan.getTotalCommittedResources(t), totCap);
// if we are violating
if (Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE)) {
// sorted on reverse order of acceptance, so newest reservations first
Set<ReservationAllocation> curReservations =
new TreeSet<ReservationAllocation>(plan.getReservationsAtTime(t));
for (Iterator<ReservationAllocation> resIter =
curReservations.iterator(); resIter.hasNext()
&& Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE);) {
ReservationAllocation reservation = resIter.next();
plan.deleteReservation(reservation.getReservationId());
excessCap =
Resources.subtract(excessCap, reservation.getResourcesAtTime(t));
LOG.info("Removing reservation " + reservation.getReservationId()
+ " to repair physical-resource constraints in the plan: "
+ plan.getQueueName());
}
}
}
}
}

View File

@ -0,0 +1,149 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashSet;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Test;
public class TestSimpleCapacityReplanner {
@Test
public void testReplanningPlanCapacityLoss() throws PlanningException {
Resource clusterCapacity = Resource.newInstance(100 * 1024, 10);
Resource minAlloc = Resource.newInstance(1024, 1);
Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
ResourceCalculator res = new DefaultResourceCalculator();
long step = 1L;
Clock clock = mock(Clock.class);
ReservationAgent agent = mock(ReservationAgent.class);
SharingPolicy policy = new NoOverCommitPolicy();
policy.init("root.dedicated", null, new HashSet<String>());
QueueMetrics queueMetrics = mock(QueueMetrics.class);
when(clock.getTime()).thenReturn(0L);
SimpleCapacityReplanner enf = new SimpleCapacityReplanner(clock);
CapacitySchedulerConfiguration conf =
mock(CapacitySchedulerConfiguration.class);
when(conf.getEnforcementWindow(any(String.class))).thenReturn(6L);
conf.setLong(CapacitySchedulerConfiguration.PREFIX + "blah"
+ CapacitySchedulerConfiguration.DOT
+ CapacitySchedulerConfiguration.RESERVATION_ENFORCEMENT_WINDOW, 6);
enf.init("blah", conf);
// Initialize the plan with more resources
InMemoryPlan plan =
new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
res, minAlloc, maxAlloc, "dedicated", enf, true, clock);
// add reservation filling the plan (separating them 1ms, so we are sure
// s2 follows s1 on acceptance
long ts = System.currentTimeMillis();
ReservationId r1 = ReservationId.newInstance(ts, 1);
int[] f5 = { 20, 20, 20, 20, 20 };
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
"dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
minAlloc)));
when(clock.getTime()).thenReturn(1L);
ReservationId r2 = ReservationId.newInstance(ts, 2);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r2, null, "u4",
"dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
minAlloc)));
when(clock.getTime()).thenReturn(2L);
ReservationId r3 = ReservationId.newInstance(ts, 3);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r3, null, "u5",
"dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
minAlloc)));
when(clock.getTime()).thenReturn(3L);
ReservationId r4 = ReservationId.newInstance(ts, 4);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r4, null, "u6",
"dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
minAlloc)));
when(clock.getTime()).thenReturn(4L);
ReservationId r5 = ReservationId.newInstance(ts, 5);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r5, null, "u7",
"dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
minAlloc)));
int[] f6 = { 50, 50, 50, 50, 50 };
ReservationId r6 = ReservationId.newInstance(ts, 6);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r6, null, "u3",
"dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
minAlloc)));
when(clock.getTime()).thenReturn(6L);
ReservationId r7 = ReservationId.newInstance(ts, 7);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r7, null, "u4",
"dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
minAlloc)));
// remove some of the resources (requires replanning)
plan.setTotalCapacity(Resource.newInstance(70 * 1024, 70));
when(clock.getTime()).thenReturn(0L);
// run the replanner
enf.plan(plan, null);
// check which reservation are still present
assertNotNull(plan.getReservationById(r1));
assertNotNull(plan.getReservationById(r2));
assertNotNull(plan.getReservationById(r3));
assertNotNull(plan.getReservationById(r6));
assertNotNull(plan.getReservationById(r7));
// and which ones are removed
assertNull(plan.getReservationById(r4));
assertNull(plan.getReservationById(r5));
// check resources at each moment in time no more exceed capacity
for (int i = 0; i < 20; i++) {
int tot = 0;
for (ReservationAllocation r : plan.getReservationsAtTime(i)) {
tot = r.getResourcesAtTime(i).getMemory();
}
assertTrue(tot <= 70 * 1024);
}
}
private Map<ReservationInterval, ReservationRequest> generateAllocation(
int startTime, int[] alloc) {
Map<ReservationInterval, ReservationRequest> req =
new TreeMap<ReservationInterval, ReservationRequest>();
for (int i = 0; i < alloc.length; i++) {
req.put(new ReservationInterval(startTime + i, startTime + i + 1),
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
alloc[i]));
}
return req;
}
}