YARN-1710. Logic to find allocations within a Plan that satisfy user ReservationRequest(s). Contributed by Carlo Curino and Subru Krishnan.

(cherry picked from commit aef7928899)
This commit is contained in:
carlo curino 2014-09-15 16:56:28 -07:00 committed by Chris Douglas
parent cf4b34282a
commit f66ffcf832
5 changed files with 1031 additions and 3 deletions

View File

@ -4,7 +4,13 @@ CapacityScheduler. (Carlo Curino and Subru Krishnan via curino)
YARN-2475. Logic for responding to capacity drops for the YARN-2475. Logic for responding to capacity drops for the
ReservationSystem. (Carlo Curino and Subru Krishnan via curino) ReservationSystem. (Carlo Curino and Subru Krishnan via curino)
YARN-1708. Public YARN APIs for creating/updating/deleting reservations. (subru) YARN-1708. Public YARN APIs for creating/updating/deleting
reservations. (Carlo Curino and Subru Krishnan via subru)
YARN-1709. In-memory data structures used to track resources over time to YARN-1709. In-memory data structures used to track resources over
enable reservations. (subru) time to enable reservations. (Carlo Curino and Subru Krishnan via
subru)
YARN-1710. Logic to find allocations within a Plan that satisfy
user ReservationRequest(s). (Carlo Curino and Subru Krishnan via
curino)

View File

@ -0,0 +1,367 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This Agent employs a simple greedy placement strategy, placing the various
* stages of a {@link ReservationRequest} from the deadline moving backward
* towards the arrival. This allows jobs with earlier deadline to be scheduled
* greedily as well. Combined with an opportunistic anticipation of work if the
* cluster is not fully utilized also seems to provide good latency for
* best-effort jobs (i.e., jobs running without a reservation).
*
* This agent does not account for locality and only consider container
* granularity for validation purposes (i.e., you can't exceed max-container
* size).
*/
public class GreedyReservationAgent implements ReservationAgent {
private static final Logger LOG = LoggerFactory
.getLogger(GreedyReservationAgent.class);
@Override
public boolean createReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException {
return computeAllocation(reservationId, user, plan, contract, null);
}
@Override
public boolean updateReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException {
return computeAllocation(reservationId, user, plan, contract,
plan.getReservationById(reservationId));
}
@Override
public boolean deleteReservation(ReservationId reservationId, String user,
Plan plan) throws PlanningException {
return plan.deleteReservation(reservationId);
}
private boolean computeAllocation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract,
ReservationAllocation oldReservation) throws PlanningException,
ContractValidationException {
LOG.info("placing the following ReservationRequest: " + contract);
Resource totalCapacity = plan.getTotalCapacity();
// Here we can addd logic to adjust the ResourceDefinition to account for
// system "imperfections" (e.g., scheduling delays for large containers).
// Align with plan step conservatively (i.e., ceil arrival, and floor
// deadline)
long earliestStart = contract.getArrival();
long step = plan.getStep();
if (earliestStart % step != 0) {
earliestStart = earliestStart + (step - (earliestStart % step));
}
long deadline =
contract.getDeadline() - contract.getDeadline() % plan.getStep();
// setup temporary variables to handle time-relations between stages and
// intermediate answers
long curDeadline = deadline;
long oldDeadline = -1;
Map<ReservationInterval, ReservationRequest> allocations =
new HashMap<ReservationInterval, ReservationRequest>();
RLESparseResourceAllocation tempAssigned =
new RLESparseResourceAllocation(plan.getResourceCalculator(),
plan.getMinimumAllocation());
List<ReservationRequest> stages = contract.getReservationRequests()
.getReservationResources();
ReservationRequestInterpreter type = contract.getReservationRequests()
.getInterpreter();
// Iterate the stages in backward from deadline
for (ListIterator<ReservationRequest> li =
stages.listIterator(stages.size()); li.hasPrevious();) {
ReservationRequest currentReservationStage = li.previous();
// validate the RR respect basic constraints
validateInput(plan, currentReservationStage, totalCapacity);
// run allocation for a single stage
Map<ReservationInterval, ReservationRequest> curAlloc =
placeSingleStage(plan, tempAssigned, currentReservationStage,
earliestStart, curDeadline, oldReservation, totalCapacity);
if (curAlloc == null) {
// if we did not find an allocation for the currentReservationStage
// return null, unless the ReservationDefinition we are placing is of
// type ANY
if (type != ReservationRequestInterpreter.R_ANY) {
throw new PlanningException("The GreedyAgent"
+ " couldn't find a valid allocation for your request");
} else {
continue;
}
} else {
// if we did find an allocation add it to the set of allocations
allocations.putAll(curAlloc);
// if this request is of type ANY we are done searching (greedy)
// and can return the current allocation (break-out of the search)
if (type == ReservationRequestInterpreter.R_ANY) {
break;
}
// if the request is of ORDER or ORDER_NO_GAP we constraint the next
// round of allocation to precede the current allocation, by setting
// curDeadline
if (type == ReservationRequestInterpreter.R_ORDER
|| type == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
curDeadline = findEarliestTime(curAlloc.keySet());
// for ORDER_NO_GAP verify that the allocation found so far has no
// gap, return null otherwise (the greedy procedure failed to find a
// no-gap
// allocation)
if (type == ReservationRequestInterpreter.R_ORDER_NO_GAP
&& oldDeadline > 0) {
if (oldDeadline - findLatestTime(curAlloc.keySet()) > plan
.getStep()) {
throw new PlanningException("The GreedyAgent"
+ " couldn't find a valid allocation for your request");
}
}
// keep the variable oldDeadline pointing to the last deadline we
// found
oldDeadline = curDeadline;
}
}
}
// / If we got here is because we failed to find an allocation for the
// ReservationDefinition give-up and report failure to the user
if (allocations.isEmpty()) {
throw new PlanningException("The GreedyAgent"
+ " couldn't find a valid allocation for your request");
}
// create reservation with above allocations if not null/empty
ReservationRequest ZERO_RES =
ReservationRequest.newInstance(Resource.newInstance(0, 0), 0);
long firstStartTime = findEarliestTime(allocations.keySet());
// add zero-padding from arrival up to the first non-null allocation
// to guarantee that the reservation exists starting at arrival
if (firstStartTime > earliestStart) {
allocations.put(new ReservationInterval(earliestStart,
firstStartTime), ZERO_RES);
firstStartTime = earliestStart;
// consider to add trailing zeros at the end for simmetry
}
// Actually add/update the reservation in the plan.
// This is subject to validation as other agents might be placing
// in parallel and there might be sharing policies the agent is not
// aware off.
ReservationAllocation capReservation =
new InMemoryReservationAllocation(reservationId, contract, user,
plan.getQueueName(), firstStartTime,
findLatestTime(allocations.keySet()), allocations,
plan.getResourceCalculator(), plan.getMinimumAllocation());
if (oldReservation != null) {
return plan.updateReservation(capReservation);
} else {
return plan.addReservation(capReservation);
}
}
private void validateInput(Plan plan, ReservationRequest rr,
Resource totalCapacity) throws ContractValidationException {
if (rr.getConcurrency() < 1) {
throw new ContractValidationException("Gang Size should be >= 1");
}
if (rr.getNumContainers() <= 0) {
throw new ContractValidationException("Num containers should be >= 0");
}
// check that gangSize and numContainers are compatible
if (rr.getNumContainers() % rr.getConcurrency() != 0) {
throw new ContractValidationException(
"Parallelism must be an exact multiple of gang size");
}
// check that the largest container request does not exceed
// the cluster-wide limit for container sizes
if (Resources.greaterThan(plan.getResourceCalculator(), totalCapacity,
rr.getCapability(), plan.getMaximumAllocation())) {
throw new ContractValidationException("Individual"
+ " capability requests should not exceed cluster's maxAlloc");
}
}
/**
* This method actually perform the placement of an atomic stage of the
* reservation. The key idea is to traverse the plan backward for a
* "lease-duration" worth of time, and compute what is the maximum multiple of
* our concurrency (gang) parameter we can fit. We do this and move towards
* previous instant in time until the time-window is exhausted or we placed
* all the user request.
*/
private Map<ReservationInterval, ReservationRequest> placeSingleStage(
Plan plan, RLESparseResourceAllocation tempAssigned,
ReservationRequest rr, long earliestStart, long curDeadline,
ReservationAllocation oldResAllocation, final Resource totalCapacity) {
Map<ReservationInterval, ReservationRequest> allocationRequests =
new HashMap<ReservationInterval, ReservationRequest>();
// compute the gang as a resource and get the duration
Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency());
long dur = rr.getDuration();
long step = plan.getStep();
// ceil the duration to the next multiple of the plan step
if (dur % step != 0) {
dur += (step - (dur % step));
}
// we know for sure that this division has no remainder (part of contract
// with user, validate before
int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
int maxGang = 0;
// loop trying to place until we are done, or we are considering
// an invalid range of times
while (gangsToPlace > 0 && curDeadline - dur >= earliestStart) {
// as we run along we remember how many gangs we can fit, and what
// was the most constraining moment in time (we will restart just
// after that to place the next batch)
maxGang = gangsToPlace;
long minPoint = curDeadline;
int curMaxGang = maxGang;
// start placing at deadline (excluded due to [,) interval semantics and
// move backward
for (long t = curDeadline - plan.getStep(); t >= curDeadline - dur
&& maxGang > 0; t = t - plan.getStep()) {
// As we run along we will logically remove the previous allocation for
// this reservation
// if one existed
Resource oldResCap = Resource.newInstance(0, 0);
if (oldResAllocation != null) {
oldResCap = oldResAllocation.getResourcesAtTime(t);
}
// compute net available resources
Resource netAvailableRes = Resources.clone(totalCapacity);
Resources.addTo(netAvailableRes, oldResCap);
Resources.subtractFrom(netAvailableRes,
plan.getTotalCommittedResources(t));
Resources.subtractFrom(netAvailableRes,
tempAssigned.getCapacityAtTime(t));
// compute maximum number of gangs we could fit
curMaxGang =
(int) Math.floor(Resources.divide(plan.getResourceCalculator(),
totalCapacity, netAvailableRes, gang));
// pick the minimum between available resources in this instant, and how
// many gangs we have to place
curMaxGang = Math.min(gangsToPlace, curMaxGang);
// compare with previous max, and set it. also remember *where* we found
// the minimum (useful for next attempts)
if (curMaxGang <= maxGang) {
maxGang = curMaxGang;
minPoint = t;
}
}
// if we were able to place any gang, record this, and decrement
// gangsToPlace
if (maxGang > 0) {
gangsToPlace -= maxGang;
ReservationInterval reservationInt =
new ReservationInterval(curDeadline - dur, curDeadline);
ReservationRequest reservationRes =
ReservationRequest.newInstance(rr.getCapability(),
rr.getConcurrency() * maxGang, rr.getConcurrency(),
rr.getDuration());
// remember occupied space (plan is read-only till we find a plausible
// allocation for the entire request). This is needed since we might be
// placing other ReservationRequest within the same
// ReservationDefinition,
// and we must avoid double-counting the available resources
tempAssigned.addInterval(reservationInt, reservationRes);
allocationRequests.put(reservationInt, reservationRes);
}
// reset our new starting point (curDeadline) to the most constraining
// point so far, we will look "left" of that to find more places where
// to schedule gangs (for sure nothing on the "right" of this point can
// fit a full gang.
curDeadline = minPoint;
}
// if no gangs are left to place we succeed and return the allocation
if (gangsToPlace == 0) {
return allocationRequests;
} else {
// If we are here is becasue we did not manage to satisfy this request.
// So we need to remove unwanted side-effect from tempAssigned (needed
// for ANY).
for (Map.Entry<ReservationInterval, ReservationRequest> tempAllocation :
allocationRequests.entrySet()) {
tempAssigned.removeInterval(tempAllocation.getKey(),
tempAllocation.getValue());
}
// and return null to signal failure in this allocation
return null;
}
}
// finds the leftmost point of this set of ReservationInterval
private long findEarliestTime(Set<ReservationInterval> resInt) {
long ret = Long.MAX_VALUE;
for (ReservationInterval s : resInt) {
if (s.getStartTime() < ret) {
ret = s.getStartTime();
}
}
return ret;
}
// finds the rightmost point of this set of ReservationIntervals
private long findLatestTime(Set<ReservationInterval> resInt) {
long ret = Long.MIN_VALUE;
for (ReservationInterval s : resInt) {
if (s.getEndTime() > ret) {
ret = s.getEndTime();
}
}
return ret;
}
}

View File

@ -0,0 +1,55 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
/**
* An entity that seeks to acquire resources to satisfy an user's contract
*/
public interface ReservationAgent {
/**
* Create a reservation for the user that abides by the specified contract
*
* @param reservationId the identifier of the reservation to be created.
* @param user the user who wants to create the reservation
* @param plan the Plan to which the reservation must be fitted
* @param contract encapsulates the resources the user requires for his
* session
*
* @return whether the create operation was successful or not
* @throws PlanningException if the session cannot be fitted into the plan
*/
public boolean createReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException;
/**
* Update a reservation for the user that abides by the specified contract
*
* @param reservationId the identifier of the reservation to be updated
* @param user the user who wants to create the session
* @param plan the Plan to which the reservation must be fitted
* @param contract encapsulates the resources the user requires for his
* reservation
*
* @return whether the update operation was successful or not
* @throws PlanningException if the reservation cannot be fitted into the plan
*/
public boolean updateReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException;
/**
* Delete an user reservation
*
* @param reservationId the identifier of the reservation to be deleted
* @param user the user who wants to create the reservation
* @param plan the Plan to which the session must be fitted
*
* @return whether the delete operation was successful or not
* @throws PlanningException if the reservation cannot be fitted into the plan
*/
public boolean deleteReservation(ReservationId reservationId, String user,
Plan plan) throws PlanningException;
}

View File

@ -0,0 +1,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
public class ContractValidationException extends PlanningException {
private static final long serialVersionUID = 1L;
public ContractValidationException(String message) {
super(message);
}
}

View File

@ -0,0 +1,588 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Test;
import org.mortbay.log.Log;
public class TestGreedyReservationAgent {
ReservationAgent agent;
InMemoryPlan plan;
Resource minAlloc = Resource.newInstance(1024, 1);
ResourceCalculator res = new DefaultResourceCalculator();
Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
Random rand = new Random();
long step;
@Before
public void setup() throws Exception {
long seed = rand.nextLong();
rand.setSeed(seed);
Log.info("Running with seed: " + seed);
// setting completely loose quotas
long timeWindow = 1000000L;
Resource clusterCapacity = Resource.newInstance(100 * 1024, 100);
step = 1000L;
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
CapacityScheduler scheduler = testUtil.mockCapacityScheduler(125);
String reservationQ = testUtil.getFullReservationQueueName();
CapacitySchedulerConfiguration capConf = scheduler.getConfiguration();
capConf.setReservationWindow(reservationQ, timeWindow);
capConf.setMaximumCapacity(reservationQ, 100);
capConf.setAverageCapacity(reservationQ, 100);
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
policy.init(reservationQ, capConf, new HashSet<String>());
agent = new GreedyReservationAgent();
QueueMetrics queueMetrics = QueueMetrics.forQueue("dedicated",
mock(ParentQueue.class), false, capConf);
plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
res, minAlloc, maxAlloc, "dedicated", null, true);
}
@SuppressWarnings("javadoc")
@Test
public void testSimple() throws PlanningException {
prepareBasicPlan();
// create a request with a single atomic ask
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(5 * step);
rr.setDeadline(20 * step);
ReservationRequest r = ReservationRequest.newInstance(
Resource.newInstance(2048, 2), 10, 5, 10 * step);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setReservationResources(Collections.singletonList(r));
rr.setReservationRequests(reqs);
ReservationId reservationID = ReservationSystemTestUtil
.getNewReservationId();
agent.createReservation(reservationID, "u1", plan, rr);
assertTrue("Agent-based allocation failed", reservationID != null);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == 3);
ReservationAllocation cs = plan.getReservationById(reservationID);
System.out.println("--------AFTER SIMPLE ALLOCATION (queue: "
+ reservationID + ")----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
for (long i = 10 * step; i < 20 * step; i++) {
assertTrue(
"Agent-based allocation unexpected",
Resources.equals(cs.getResourcesAtTime(i),
Resource.newInstance(2048 * 10, 2 * 10)));
}
}
@Test
public void testOrder() throws PlanningException {
prepareBasicPlan();
// create a completely utilized segment around time 30
int[] f = { 100, 100 };
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", 30 * step, 30 * step + f.length * step,
ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
res, minAlloc)));
// create a chain of 4 RR, mixing gang and non-gang
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(0 * step);
rr.setDeadline(70 * step);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER);
ReservationRequest r = ReservationRequest.newInstance(
Resource.newInstance(2048, 2), 10, 1, 10 * step);
ReservationRequest r2 = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 10, 10, 20 * step);
List<ReservationRequest> list = new ArrayList<ReservationRequest>();
list.add(r);
list.add(r2);
list.add(r);
list.add(r2);
reqs.setReservationResources(list);
rr.setReservationRequests(reqs);
// submit to agent
ReservationId reservationID = ReservationSystemTestUtil
.getNewReservationId();
agent.createReservation(reservationID, "u1", plan, rr);
// validate
assertTrue("Agent-based allocation failed", reservationID != null);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == 4);
ReservationAllocation cs = plan.getReservationById(reservationID);
assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1));
assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1));
System.out.println("--------AFTER ORDER ALLOCATION (queue: "
+ reservationID + ")----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
}
@Test
public void testOrderNoGapImpossible() throws PlanningException {
prepareBasicPlan();
// create a completely utilized segment at time 30
int[] f = { 100, 100 };
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", 30 * step, 30 * step + f.length * step,
ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
res, minAlloc)));
// create a chain of 4 RR, mixing gang and non-gang
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(0L);
rr.setDeadline(70L);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
ReservationRequest r = ReservationRequest.newInstance(
Resource.newInstance(2048, 2), 10, 1, 10);
ReservationRequest r2 = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 10, 10, 20);
List<ReservationRequest> list = new ArrayList<ReservationRequest>();
list.add(r);
list.add(r2);
list.add(r);
list.add(r2);
reqs.setReservationResources(list);
rr.setReservationRequests(reqs);
ReservationId reservationID = ReservationSystemTestUtil
.getNewReservationId();
boolean result = false;
try {
// submit to agent
result = agent.createReservation(reservationID, "u1", plan, rr);
fail();
} catch (PlanningException p) {
// expected
}
// validate
assertFalse("Agent-based allocation should have failed", result);
assertTrue("Agent-based allocation should have failed", plan
.getAllReservations().size() == 3);
System.out
.println("--------AFTER ORDER_NO_GAP IMPOSSIBLE ALLOCATION (queue: "
+ reservationID + ")----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
}
@Test
public void testOrderNoGap() throws PlanningException {
prepareBasicPlan();
// create a chain of 4 RR, mixing gang and non-gang
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(0 * step);
rr.setDeadline(60 * step);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
ReservationRequest r = ReservationRequest.newInstance(
Resource.newInstance(2048, 2), 10, 1, 10 * step);
ReservationRequest r2 = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 10, 10, 20 * step);
List<ReservationRequest> list = new ArrayList<ReservationRequest>();
list.add(r);
list.add(r2);
list.add(r);
list.add(r2);
reqs.setReservationResources(list);
rr.setReservationRequests(reqs);
rr.setReservationRequests(reqs);
// submit to agent
ReservationId reservationID = ReservationSystemTestUtil
.getNewReservationId();
agent.createReservation(reservationID, "u1", plan, rr);
System.out.println("--------AFTER ORDER ALLOCATION (queue: "
+ reservationID + ")----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
// validate
assertTrue("Agent-based allocation failed", reservationID != null);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == 3);
ReservationAllocation cs = plan.getReservationById(reservationID);
assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
assertTrue(cs.toString(), check(cs, 30 * step, 40 * step, 20, 1024, 1));
assertTrue(cs.toString(), check(cs, 40 * step, 60 * step, 10, 1024, 1));
}
@Test
public void testSingleSliding() throws PlanningException {
prepareBasicPlan();
// create a single request for which we need subsequent (tight) packing.
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(100 * step);
rr.setDeadline(120 * step);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
ReservationRequest r = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 200, 10, 10 * step);
List<ReservationRequest> list = new ArrayList<ReservationRequest>();
list.add(r);
reqs.setReservationResources(list);
rr.setReservationRequests(reqs);
// submit to agent
ReservationId reservationID = ReservationSystemTestUtil
.getNewReservationId();
agent.createReservation(reservationID, "u1", plan, rr);
// validate results, we expect the second one to be accepted
assertTrue("Agent-based allocation failed", reservationID != null);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == 3);
ReservationAllocation cs = plan.getReservationById(reservationID);
assertTrue(cs.toString(), check(cs, 100 * step, 120 * step, 100, 1024, 1));
System.out.println("--------AFTER packed ALLOCATION (queue: "
+ reservationID + ")----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
}
@Test
public void testAny() throws PlanningException {
prepareBasicPlan();
// create an ANY request, with an impossible step (last in list, first
// considered),
// and two satisfiable ones. We expect the second one to be returned.
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(100 * step);
rr.setDeadline(120 * step);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
ReservationRequest r = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 5, 5, 10 * step);
ReservationRequest r2 = ReservationRequest.newInstance(
Resource.newInstance(2048, 2), 10, 5, 10 * step);
ReservationRequest r3 = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 110, 110, 10 * step);
List<ReservationRequest> list = new ArrayList<ReservationRequest>();
list.add(r);
list.add(r2);
list.add(r3);
reqs.setReservationResources(list);
rr.setReservationRequests(reqs);
// submit to agent
ReservationId reservationID = ReservationSystemTestUtil
.getNewReservationId();
boolean res = agent.createReservation(reservationID, "u1", plan, rr);
// validate results, we expect the second one to be accepted
assertTrue("Agent-based allocation failed", res);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == 3);
ReservationAllocation cs = plan.getReservationById(reservationID);
assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1));
System.out.println("--------AFTER ANY ALLOCATION (queue: " + reservationID
+ ")----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
}
@Test
public void testAnyImpossible() throws PlanningException {
prepareBasicPlan();
// create an ANY request, with all impossible alternatives
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(100L);
rr.setDeadline(120L);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
// longer than arrival-deadline
ReservationRequest r1 = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 35, 5, 30);
// above max cluster size
ReservationRequest r2 = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 110, 110, 10);
List<ReservationRequest> list = new ArrayList<ReservationRequest>();
list.add(r1);
list.add(r2);
reqs.setReservationResources(list);
rr.setReservationRequests(reqs);
ReservationId reservationID = ReservationSystemTestUtil
.getNewReservationId();
boolean result = false;
try {
// submit to agent
result = agent.createReservation(reservationID, "u1", plan, rr);
fail();
} catch (PlanningException p) {
// expected
}
// validate results, we expect the second one to be accepted
assertFalse("Agent-based allocation should have failed", result);
assertTrue("Agent-based allocation should have failed", plan
.getAllReservations().size() == 2);
System.out.println("--------AFTER ANY IMPOSSIBLE ALLOCATION (queue: "
+ reservationID + ")----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
}
@Test
public void testAll() throws PlanningException {
prepareBasicPlan();
// create an ALL request
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(100 * step);
rr.setDeadline(120 * step);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
ReservationRequest r = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 5, 5, 10 * step);
ReservationRequest r2 = ReservationRequest.newInstance(
Resource.newInstance(2048, 2), 10, 10, 20 * step);
List<ReservationRequest> list = new ArrayList<ReservationRequest>();
list.add(r);
list.add(r2);
reqs.setReservationResources(list);
rr.setReservationRequests(reqs);
// submit to agent
ReservationId reservationID = ReservationSystemTestUtil
.getNewReservationId();
agent.createReservation(reservationID, "u1", plan, rr);
// validate results, we expect the second one to be accepted
assertTrue("Agent-based allocation failed", reservationID != null);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == 3);
ReservationAllocation cs = plan.getReservationById(reservationID);
assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 20, 1024, 1));
assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 25, 1024, 1));
System.out.println("--------AFTER ALL ALLOCATION (queue: " + reservationID
+ ")----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
}
@Test
public void testAllImpossible() throws PlanningException {
prepareBasicPlan();
// create an ALL request, with an impossible combination, it should be
// rejected, and allocation remain unchanged
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(100L);
rr.setDeadline(120L);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
ReservationRequest r = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 55, 5, 10);
ReservationRequest r2 = ReservationRequest.newInstance(
Resource.newInstance(2048, 2), 55, 5, 20);
List<ReservationRequest> list = new ArrayList<ReservationRequest>();
list.add(r);
list.add(r2);
reqs.setReservationResources(list);
rr.setReservationRequests(reqs);
ReservationId reservationID = ReservationSystemTestUtil
.getNewReservationId();
boolean result = false;
try {
// submit to agent
result = agent.createReservation(reservationID, "u1", plan, rr);
fail();
} catch (PlanningException p) {
// expected
}
// validate results, we expect the second one to be accepted
assertFalse("Agent-based allocation failed", result);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == 2);
System.out.println("--------AFTER ALL IMPOSSIBLE ALLOCATION (queue: "
+ reservationID + ")----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
}
private void prepareBasicPlan() throws PlanningException {
// insert in the reservation a couple of controlled reservations, to create
// conditions for assignment that are non-empty
int[] f = { 10, 10, 20, 20, 20, 10, 10 };
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", 0L, 0L + f.length * step, ReservationSystemTestUtil
.generateAllocation(0, step, f), res, minAlloc)));
int[] f2 = { 5, 5, 5, 5, 5, 5, 5 };
Map<ReservationInterval, ReservationRequest> alloc =
ReservationSystemTestUtil.generateAllocation(5000, step, f2);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", 5000, 5000 + f2.length * step, alloc, res, minAlloc)));
System.out.println("--------BEFORE AGENT----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
}
private boolean check(ReservationAllocation cs, long start, long end,
int containers, int mem, int cores) {
boolean res = true;
for (long i = start; i < end; i++) {
res = res
&& Resources.equals(cs.getResourcesAtTime(i),
Resource.newInstance(mem * containers, cores * containers));
}
return res;
}
public void testStress(int numJobs) throws PlanningException, IOException {
long timeWindow = 1000000L;
Resource clusterCapacity = Resource.newInstance(500 * 100 * 1024, 500 * 32);
step = 1000L;
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
CapacityScheduler scheduler = testUtil.mockCapacityScheduler(500 * 100);
String reservationQ = testUtil.getFullReservationQueueName();
CapacitySchedulerConfiguration capConf = scheduler.getConfiguration();
capConf.setReservationWindow(reservationQ, timeWindow);
capConf.setMaximumCapacity(reservationQ, 100);
capConf.setAverageCapacity(reservationQ, 100);
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
policy.init(reservationQ, capConf, new HashSet<String>());
plan = new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, agent,
clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true);
int acc = 0;
List<ReservationDefinition> list = new ArrayList<ReservationDefinition>();
for (long i = 0; i < numJobs; i++) {
list.add(ReservationSystemTestUtil.generateRandomRR(rand, i));
}
long start = System.currentTimeMillis();
for (int i = 0; i < numJobs; i++) {
try {
if (agent.createReservation(
ReservationSystemTestUtil.getNewReservationId(), "u" + i % 100,
plan, list.get(i))) {
acc++;
}
} catch (PlanningException p) {
// ignore exceptions
}
}
long end = System.currentTimeMillis();
System.out.println("Submitted " + numJobs + " jobs " + " accepted " + acc
+ " in " + (end - start) + "ms");
}
public static void main(String[] arg) {
// run a stress test with by default 1000 random jobs
int numJobs = 1000;
if (arg.length > 0) {
numJobs = Integer.parseInt(arg[0]);
}
try {
TestGreedyReservationAgent test = new TestGreedyReservationAgent();
test.setup();
test.testStress(numJobs);
} catch (Exception e) {
e.printStackTrace();
}
}
}