YARN-1710. Logic to find allocations within a Plan that satisfy user ReservationRequest(s). Contributed by Carlo Curino and Subru Krishnan.

(cherry picked from commit aef7928899)
(cherry picked from commit f66ffcf832)
(cherry picked from commit 6a3c167175)
This commit is contained in:
carlo curino 2014-09-15 16:56:28 -07:00 committed by Chris Douglas
parent 056b7f5799
commit 4ee027b9d6
5 changed files with 1031 additions and 3 deletions

View File

@ -4,7 +4,13 @@ CapacityScheduler. (Carlo Curino and Subru Krishnan via curino)
YARN-2475. Logic for responding to capacity drops for the YARN-2475. Logic for responding to capacity drops for the
ReservationSystem. (Carlo Curino and Subru Krishnan via curino) ReservationSystem. (Carlo Curino and Subru Krishnan via curino)
YARN-1708. Public YARN APIs for creating/updating/deleting reservations. (subru) YARN-1708. Public YARN APIs for creating/updating/deleting
reservations. (Carlo Curino and Subru Krishnan via subru)
YARN-1709. In-memory data structures used to track resources over time to YARN-1709. In-memory data structures used to track resources over
enable reservations. (subru) time to enable reservations. (Carlo Curino and Subru Krishnan via
subru)
YARN-1710. Logic to find allocations within a Plan that satisfy
user ReservationRequest(s). (Carlo Curino and Subru Krishnan via
curino)

View File

@ -0,0 +1,367 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This Agent employs a simple greedy placement strategy, placing the various
* stages of a {@link ReservationRequest} from the deadline moving backward
* towards the arrival. This allows jobs with earlier deadline to be scheduled
* greedily as well. Combined with an opportunistic anticipation of work if the
* cluster is not fully utilized also seems to provide good latency for
* best-effort jobs (i.e., jobs running without a reservation).
*
* This agent does not account for locality and only consider container
* granularity for validation purposes (i.e., you can't exceed max-container
* size).
*/
public class GreedyReservationAgent implements ReservationAgent {
private static final Logger LOG = LoggerFactory
.getLogger(GreedyReservationAgent.class);
@Override
public boolean createReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException {
return computeAllocation(reservationId, user, plan, contract, null);
}
@Override
public boolean updateReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException {
return computeAllocation(reservationId, user, plan, contract,
plan.getReservationById(reservationId));
}
@Override
public boolean deleteReservation(ReservationId reservationId, String user,
Plan plan) throws PlanningException {
return plan.deleteReservation(reservationId);
}
private boolean computeAllocation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract,
ReservationAllocation oldReservation) throws PlanningException,
ContractValidationException {
LOG.info("placing the following ReservationRequest: " + contract);
Resource totalCapacity = plan.getTotalCapacity();
// Here we can addd logic to adjust the ResourceDefinition to account for
// system "imperfections" (e.g., scheduling delays for large containers).
// Align with plan step conservatively (i.e., ceil arrival, and floor
// deadline)
long earliestStart = contract.getArrival();
long step = plan.getStep();
if (earliestStart % step != 0) {
earliestStart = earliestStart + (step - (earliestStart % step));
}
long deadline =
contract.getDeadline() - contract.getDeadline() % plan.getStep();
// setup temporary variables to handle time-relations between stages and
// intermediate answers
long curDeadline = deadline;
long oldDeadline = -1;
Map<ReservationInterval, ReservationRequest> allocations =
new HashMap<ReservationInterval, ReservationRequest>();
RLESparseResourceAllocation tempAssigned =
new RLESparseResourceAllocation(plan.getResourceCalculator(),
plan.getMinimumAllocation());
List<ReservationRequest> stages = contract.getReservationRequests()
.getReservationResources();
ReservationRequestInterpreter type = contract.getReservationRequests()
.getInterpreter();
// Iterate the stages in backward from deadline
for (ListIterator<ReservationRequest> li =
stages.listIterator(stages.size()); li.hasPrevious();) {
ReservationRequest currentReservationStage = li.previous();
// validate the RR respect basic constraints
validateInput(plan, currentReservationStage, totalCapacity);
// run allocation for a single stage
Map<ReservationInterval, ReservationRequest> curAlloc =
placeSingleStage(plan, tempAssigned, currentReservationStage,
earliestStart, curDeadline, oldReservation, totalCapacity);
if (curAlloc == null) {
// if we did not find an allocation for the currentReservationStage
// return null, unless the ReservationDefinition we are placing is of
// type ANY
if (type != ReservationRequestInterpreter.R_ANY) {
throw new PlanningException("The GreedyAgent"
+ " couldn't find a valid allocation for your request");
} else {
continue;
}
} else {
// if we did find an allocation add it to the set of allocations
allocations.putAll(curAlloc);
// if this request is of type ANY we are done searching (greedy)
// and can return the current allocation (break-out of the search)
if (type == ReservationRequestInterpreter.R_ANY) {
break;
}
// if the request is of ORDER or ORDER_NO_GAP we constraint the next
// round of allocation to precede the current allocation, by setting
// curDeadline
if (type == ReservationRequestInterpreter.R_ORDER
|| type == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
curDeadline = findEarliestTime(curAlloc.keySet());
// for ORDER_NO_GAP verify that the allocation found so far has no
// gap, return null otherwise (the greedy procedure failed to find a
// no-gap
// allocation)
if (type == ReservationRequestInterpreter.R_ORDER_NO_GAP
&& oldDeadline > 0) {
if (oldDeadline - findLatestTime(curAlloc.keySet()) > plan
.getStep()) {
throw new PlanningException("The GreedyAgent"
+ " couldn't find a valid allocation for your request");
}
}
// keep the variable oldDeadline pointing to the last deadline we
// found
oldDeadline = curDeadline;
}
}
}
// / If we got here is because we failed to find an allocation for the
// ReservationDefinition give-up and report failure to the user
if (allocations.isEmpty()) {
throw new PlanningException("The GreedyAgent"
+ " couldn't find a valid allocation for your request");
}
// create reservation with above allocations if not null/empty
ReservationRequest ZERO_RES =
ReservationRequest.newInstance(Resource.newInstance(0, 0), 0);
long firstStartTime = findEarliestTime(allocations.keySet());
// add zero-padding from arrival up to the first non-null allocation
// to guarantee that the reservation exists starting at arrival
if (firstStartTime > earliestStart) {
allocations.put(new ReservationInterval(earliestStart,
firstStartTime), ZERO_RES);
firstStartTime = earliestStart;
// consider to add trailing zeros at the end for simmetry
}
// Actually add/update the reservation in the plan.
// This is subject to validation as other agents might be placing
// in parallel and there might be sharing policies the agent is not
// aware off.
ReservationAllocation capReservation =
new InMemoryReservationAllocation(reservationId, contract, user,
plan.getQueueName(), firstStartTime,
findLatestTime(allocations.keySet()), allocations,
plan.getResourceCalculator(), plan.getMinimumAllocation());
if (oldReservation != null) {
return plan.updateReservation(capReservation);
} else {
return plan.addReservation(capReservation);
}
}
private void validateInput(Plan plan, ReservationRequest rr,
Resource totalCapacity) throws ContractValidationException {
if (rr.getConcurrency() < 1) {
throw new ContractValidationException("Gang Size should be >= 1");
}
if (rr.getNumContainers() <= 0) {
throw new ContractValidationException("Num containers should be >= 0");
}
// check that gangSize and numContainers are compatible
if (rr.getNumContainers() % rr.getConcurrency() != 0) {
throw new ContractValidationException(
"Parallelism must be an exact multiple of gang size");
}
// check that the largest container request does not exceed
// the cluster-wide limit for container sizes
if (Resources.greaterThan(plan.getResourceCalculator(), totalCapacity,
rr.getCapability(), plan.getMaximumAllocation())) {
throw new ContractValidationException("Individual"
+ " capability requests should not exceed cluster's maxAlloc");
}
}
/**
* This method actually perform the placement of an atomic stage of the
* reservation. The key idea is to traverse the plan backward for a
* "lease-duration" worth of time, and compute what is the maximum multiple of
* our concurrency (gang) parameter we can fit. We do this and move towards
* previous instant in time until the time-window is exhausted or we placed
* all the user request.
*/
private Map<ReservationInterval, ReservationRequest> placeSingleStage(
Plan plan, RLESparseResourceAllocation tempAssigned,
ReservationRequest rr, long earliestStart, long curDeadline,
ReservationAllocation oldResAllocation, final Resource totalCapacity) {
Map<ReservationInterval, ReservationRequest> allocationRequests =
new HashMap<ReservationInterval, ReservationRequest>();
// compute the gang as a resource and get the duration
Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency());
long dur = rr.getDuration();
long step = plan.getStep();
// ceil the duration to the next multiple of the plan step
if (dur % step != 0) {
dur += (step - (dur % step));
}
// we know for sure that this division has no remainder (part of contract
// with user, validate before
int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
int maxGang = 0;
// loop trying to place until we are done, or we are considering
// an invalid range of times
while (gangsToPlace > 0 && curDeadline - dur >= earliestStart) {
// as we run along we remember how many gangs we can fit, and what
// was the most constraining moment in time (we will restart just
// after that to place the next batch)
maxGang = gangsToPlace;
long minPoint = curDeadline;
int curMaxGang = maxGang;
// start placing at deadline (excluded due to [,) interval semantics and
// move backward
for (long t = curDeadline - plan.getStep(); t >= curDeadline - dur
&& maxGang > 0; t = t - plan.getStep()) {
// As we run along we will logically remove the previous allocation for
// this reservation
// if one existed
Resource oldResCap = Resource.newInstance(0, 0);
if (oldResAllocation != null) {
oldResCap = oldResAllocation.getResourcesAtTime(t);
}
// compute net available resources
Resource netAvailableRes = Resources.clone(totalCapacity);
Resources.addTo(netAvailableRes, oldResCap);
Resources.subtractFrom(netAvailableRes,
plan.getTotalCommittedResources(t));
Resources.subtractFrom(netAvailableRes,
tempAssigned.getCapacityAtTime(t));
// compute maximum number of gangs we could fit
curMaxGang =
(int) Math.floor(Resources.divide(plan.getResourceCalculator(),
totalCapacity, netAvailableRes, gang));
// pick the minimum between available resources in this instant, and how
// many gangs we have to place
curMaxGang = Math.min(gangsToPlace, curMaxGang);
// compare with previous max, and set it. also remember *where* we found
// the minimum (useful for next attempts)
if (curMaxGang <= maxGang) {
maxGang = curMaxGang;
minPoint = t;
}
}
// if we were able to place any gang, record this, and decrement
// gangsToPlace
if (maxGang > 0) {
gangsToPlace -= maxGang;
ReservationInterval reservationInt =
new ReservationInterval(curDeadline - dur, curDeadline);
ReservationRequest reservationRes =
ReservationRequest.newInstance(rr.getCapability(),
rr.getConcurrency() * maxGang, rr.getConcurrency(),
rr.getDuration());
// remember occupied space (plan is read-only till we find a plausible
// allocation for the entire request). This is needed since we might be
// placing other ReservationRequest within the same
// ReservationDefinition,
// and we must avoid double-counting the available resources
tempAssigned.addInterval(reservationInt, reservationRes);
allocationRequests.put(reservationInt, reservationRes);
}
// reset our new starting point (curDeadline) to the most constraining
// point so far, we will look "left" of that to find more places where
// to schedule gangs (for sure nothing on the "right" of this point can
// fit a full gang.
curDeadline = minPoint;
}
// if no gangs are left to place we succeed and return the allocation
if (gangsToPlace == 0) {
return allocationRequests;
} else {
// If we are here is becasue we did not manage to satisfy this request.
// So we need to remove unwanted side-effect from tempAssigned (needed
// for ANY).
for (Map.Entry<ReservationInterval, ReservationRequest> tempAllocation :
allocationRequests.entrySet()) {
tempAssigned.removeInterval(tempAllocation.getKey(),
tempAllocation.getValue());
}
// and return null to signal failure in this allocation
return null;
}
}
// finds the leftmost point of this set of ReservationInterval
private long findEarliestTime(Set<ReservationInterval> resInt) {
long ret = Long.MAX_VALUE;
for (ReservationInterval s : resInt) {
if (s.getStartTime() < ret) {
ret = s.getStartTime();
}
}
return ret;
}
// finds the rightmost point of this set of ReservationIntervals
private long findLatestTime(Set<ReservationInterval> resInt) {
long ret = Long.MIN_VALUE;
for (ReservationInterval s : resInt) {
if (s.getEndTime() > ret) {
ret = s.getEndTime();
}
}
return ret;
}
}

View File

@ -0,0 +1,55 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
/**
* An entity that seeks to acquire resources to satisfy an user's contract
*/
public interface ReservationAgent {
/**
* Create a reservation for the user that abides by the specified contract
*
* @param reservationId the identifier of the reservation to be created.
* @param user the user who wants to create the reservation
* @param plan the Plan to which the reservation must be fitted
* @param contract encapsulates the resources the user requires for his
* session
*
* @return whether the create operation was successful or not
* @throws PlanningException if the session cannot be fitted into the plan
*/
public boolean createReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException;
/**
* Update a reservation for the user that abides by the specified contract
*
* @param reservationId the identifier of the reservation to be updated
* @param user the user who wants to create the session
* @param plan the Plan to which the reservation must be fitted
* @param contract encapsulates the resources the user requires for his
* reservation
*
* @return whether the update operation was successful or not
* @throws PlanningException if the reservation cannot be fitted into the plan
*/
public boolean updateReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException;
/**
* Delete an user reservation
*
* @param reservationId the identifier of the reservation to be deleted
* @param user the user who wants to create the reservation
* @param plan the Plan to which the session must be fitted
*
* @return whether the delete operation was successful or not
* @throws PlanningException if the reservation cannot be fitted into the plan
*/
public boolean deleteReservation(ReservationId reservationId, String user,
Plan plan) throws PlanningException;
}

View File

@ -0,0 +1,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
public class ContractValidationException extends PlanningException {
private static final long serialVersionUID = 1L;
public ContractValidationException(String message) {
super(message);
}
}

View File

@ -0,0 +1,588 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Test;
import org.mortbay.log.Log;
public class TestGreedyReservationAgent {
ReservationAgent agent;
InMemoryPlan plan;
Resource minAlloc = Resource.newInstance(1024, 1);
ResourceCalculator res = new DefaultResourceCalculator();
Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
Random rand = new Random();
long step;
@Before
public void setup() throws Exception {
long seed = rand.nextLong();
rand.setSeed(seed);
Log.info("Running with seed: " + seed);
// setting completely loose quotas
long timeWindow = 1000000L;
Resource clusterCapacity = Resource.newInstance(100 * 1024, 100);
step = 1000L;
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
CapacityScheduler scheduler = testUtil.mockCapacityScheduler(125);
String reservationQ = testUtil.getFullReservationQueueName();
CapacitySchedulerConfiguration capConf = scheduler.getConfiguration();
capConf.setReservationWindow(reservationQ, timeWindow);
capConf.setMaximumCapacity(reservationQ, 100);
capConf.setAverageCapacity(reservationQ, 100);
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
policy.init(reservationQ, capConf, new HashSet<String>());
agent = new GreedyReservationAgent();
QueueMetrics queueMetrics = QueueMetrics.forQueue("dedicated",
mock(ParentQueue.class), false, capConf);
plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
res, minAlloc, maxAlloc, "dedicated", null, true);
}
@SuppressWarnings("javadoc")
@Test
public void testSimple() throws PlanningException {
prepareBasicPlan();
// create a request with a single atomic ask
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(5 * step);
rr.setDeadline(20 * step);
ReservationRequest r = ReservationRequest.newInstance(
Resource.newInstance(2048, 2), 10, 5, 10 * step);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setReservationResources(Collections.singletonList(r));
rr.setReservationRequests(reqs);
ReservationId reservationID = ReservationSystemTestUtil
.getNewReservationId();
agent.createReservation(reservationID, "u1", plan, rr);
assertTrue("Agent-based allocation failed", reservationID != null);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == 3);
ReservationAllocation cs = plan.getReservationById(reservationID);
System.out.println("--------AFTER SIMPLE ALLOCATION (queue: "
+ reservationID + ")----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
for (long i = 10 * step; i < 20 * step; i++) {
assertTrue(
"Agent-based allocation unexpected",
Resources.equals(cs.getResourcesAtTime(i),
Resource.newInstance(2048 * 10, 2 * 10)));
}
}
@Test
public void testOrder() throws PlanningException {
prepareBasicPlan();
// create a completely utilized segment around time 30
int[] f = { 100, 100 };
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", 30 * step, 30 * step + f.length * step,
ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
res, minAlloc)));
// create a chain of 4 RR, mixing gang and non-gang
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(0 * step);
rr.setDeadline(70 * step);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER);
ReservationRequest r = ReservationRequest.newInstance(
Resource.newInstance(2048, 2), 10, 1, 10 * step);
ReservationRequest r2 = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 10, 10, 20 * step);
List<ReservationRequest> list = new ArrayList<ReservationRequest>();
list.add(r);
list.add(r2);
list.add(r);
list.add(r2);
reqs.setReservationResources(list);
rr.setReservationRequests(reqs);
// submit to agent
ReservationId reservationID = ReservationSystemTestUtil
.getNewReservationId();
agent.createReservation(reservationID, "u1", plan, rr);
// validate
assertTrue("Agent-based allocation failed", reservationID != null);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == 4);
ReservationAllocation cs = plan.getReservationById(reservationID);
assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1));
assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1));
System.out.println("--------AFTER ORDER ALLOCATION (queue: "
+ reservationID + ")----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
}
@Test
public void testOrderNoGapImpossible() throws PlanningException {
prepareBasicPlan();
// create a completely utilized segment at time 30
int[] f = { 100, 100 };
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", 30 * step, 30 * step + f.length * step,
ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
res, minAlloc)));
// create a chain of 4 RR, mixing gang and non-gang
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(0L);
rr.setDeadline(70L);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
ReservationRequest r = ReservationRequest.newInstance(
Resource.newInstance(2048, 2), 10, 1, 10);
ReservationRequest r2 = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 10, 10, 20);
List<ReservationRequest> list = new ArrayList<ReservationRequest>();
list.add(r);
list.add(r2);
list.add(r);
list.add(r2);
reqs.setReservationResources(list);
rr.setReservationRequests(reqs);
ReservationId reservationID = ReservationSystemTestUtil
.getNewReservationId();
boolean result = false;
try {
// submit to agent
result = agent.createReservation(reservationID, "u1", plan, rr);
fail();
} catch (PlanningException p) {
// expected
}
// validate
assertFalse("Agent-based allocation should have failed", result);
assertTrue("Agent-based allocation should have failed", plan
.getAllReservations().size() == 3);
System.out
.println("--------AFTER ORDER_NO_GAP IMPOSSIBLE ALLOCATION (queue: "
+ reservationID + ")----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
}
@Test
public void testOrderNoGap() throws PlanningException {
prepareBasicPlan();
// create a chain of 4 RR, mixing gang and non-gang
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(0 * step);
rr.setDeadline(60 * step);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
ReservationRequest r = ReservationRequest.newInstance(
Resource.newInstance(2048, 2), 10, 1, 10 * step);
ReservationRequest r2 = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 10, 10, 20 * step);
List<ReservationRequest> list = new ArrayList<ReservationRequest>();
list.add(r);
list.add(r2);
list.add(r);
list.add(r2);
reqs.setReservationResources(list);
rr.setReservationRequests(reqs);
rr.setReservationRequests(reqs);
// submit to agent
ReservationId reservationID = ReservationSystemTestUtil
.getNewReservationId();
agent.createReservation(reservationID, "u1", plan, rr);
System.out.println("--------AFTER ORDER ALLOCATION (queue: "
+ reservationID + ")----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
// validate
assertTrue("Agent-based allocation failed", reservationID != null);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == 3);
ReservationAllocation cs = plan.getReservationById(reservationID);
assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
assertTrue(cs.toString(), check(cs, 30 * step, 40 * step, 20, 1024, 1));
assertTrue(cs.toString(), check(cs, 40 * step, 60 * step, 10, 1024, 1));
}
@Test
public void testSingleSliding() throws PlanningException {
prepareBasicPlan();
// create a single request for which we need subsequent (tight) packing.
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(100 * step);
rr.setDeadline(120 * step);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
ReservationRequest r = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 200, 10, 10 * step);
List<ReservationRequest> list = new ArrayList<ReservationRequest>();
list.add(r);
reqs.setReservationResources(list);
rr.setReservationRequests(reqs);
// submit to agent
ReservationId reservationID = ReservationSystemTestUtil
.getNewReservationId();
agent.createReservation(reservationID, "u1", plan, rr);
// validate results, we expect the second one to be accepted
assertTrue("Agent-based allocation failed", reservationID != null);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == 3);
ReservationAllocation cs = plan.getReservationById(reservationID);
assertTrue(cs.toString(), check(cs, 100 * step, 120 * step, 100, 1024, 1));
System.out.println("--------AFTER packed ALLOCATION (queue: "
+ reservationID + ")----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
}
@Test
public void testAny() throws PlanningException {
prepareBasicPlan();
// create an ANY request, with an impossible step (last in list, first
// considered),
// and two satisfiable ones. We expect the second one to be returned.
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(100 * step);
rr.setDeadline(120 * step);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
ReservationRequest r = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 5, 5, 10 * step);
ReservationRequest r2 = ReservationRequest.newInstance(
Resource.newInstance(2048, 2), 10, 5, 10 * step);
ReservationRequest r3 = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 110, 110, 10 * step);
List<ReservationRequest> list = new ArrayList<ReservationRequest>();
list.add(r);
list.add(r2);
list.add(r3);
reqs.setReservationResources(list);
rr.setReservationRequests(reqs);
// submit to agent
ReservationId reservationID = ReservationSystemTestUtil
.getNewReservationId();
boolean res = agent.createReservation(reservationID, "u1", plan, rr);
// validate results, we expect the second one to be accepted
assertTrue("Agent-based allocation failed", res);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == 3);
ReservationAllocation cs = plan.getReservationById(reservationID);
assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1));
System.out.println("--------AFTER ANY ALLOCATION (queue: " + reservationID
+ ")----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
}
@Test
public void testAnyImpossible() throws PlanningException {
prepareBasicPlan();
// create an ANY request, with all impossible alternatives
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(100L);
rr.setDeadline(120L);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
// longer than arrival-deadline
ReservationRequest r1 = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 35, 5, 30);
// above max cluster size
ReservationRequest r2 = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 110, 110, 10);
List<ReservationRequest> list = new ArrayList<ReservationRequest>();
list.add(r1);
list.add(r2);
reqs.setReservationResources(list);
rr.setReservationRequests(reqs);
ReservationId reservationID = ReservationSystemTestUtil
.getNewReservationId();
boolean result = false;
try {
// submit to agent
result = agent.createReservation(reservationID, "u1", plan, rr);
fail();
} catch (PlanningException p) {
// expected
}
// validate results, we expect the second one to be accepted
assertFalse("Agent-based allocation should have failed", result);
assertTrue("Agent-based allocation should have failed", plan
.getAllReservations().size() == 2);
System.out.println("--------AFTER ANY IMPOSSIBLE ALLOCATION (queue: "
+ reservationID + ")----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
}
@Test
public void testAll() throws PlanningException {
prepareBasicPlan();
// create an ALL request
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(100 * step);
rr.setDeadline(120 * step);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
ReservationRequest r = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 5, 5, 10 * step);
ReservationRequest r2 = ReservationRequest.newInstance(
Resource.newInstance(2048, 2), 10, 10, 20 * step);
List<ReservationRequest> list = new ArrayList<ReservationRequest>();
list.add(r);
list.add(r2);
reqs.setReservationResources(list);
rr.setReservationRequests(reqs);
// submit to agent
ReservationId reservationID = ReservationSystemTestUtil
.getNewReservationId();
agent.createReservation(reservationID, "u1", plan, rr);
// validate results, we expect the second one to be accepted
assertTrue("Agent-based allocation failed", reservationID != null);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == 3);
ReservationAllocation cs = plan.getReservationById(reservationID);
assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 20, 1024, 1));
assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 25, 1024, 1));
System.out.println("--------AFTER ALL ALLOCATION (queue: " + reservationID
+ ")----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
}
@Test
public void testAllImpossible() throws PlanningException {
prepareBasicPlan();
// create an ALL request, with an impossible combination, it should be
// rejected, and allocation remain unchanged
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(100L);
rr.setDeadline(120L);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
ReservationRequest r = ReservationRequest.newInstance(
Resource.newInstance(1024, 1), 55, 5, 10);
ReservationRequest r2 = ReservationRequest.newInstance(
Resource.newInstance(2048, 2), 55, 5, 20);
List<ReservationRequest> list = new ArrayList<ReservationRequest>();
list.add(r);
list.add(r2);
reqs.setReservationResources(list);
rr.setReservationRequests(reqs);
ReservationId reservationID = ReservationSystemTestUtil
.getNewReservationId();
boolean result = false;
try {
// submit to agent
result = agent.createReservation(reservationID, "u1", plan, rr);
fail();
} catch (PlanningException p) {
// expected
}
// validate results, we expect the second one to be accepted
assertFalse("Agent-based allocation failed", result);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == 2);
System.out.println("--------AFTER ALL IMPOSSIBLE ALLOCATION (queue: "
+ reservationID + ")----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
}
private void prepareBasicPlan() throws PlanningException {
// insert in the reservation a couple of controlled reservations, to create
// conditions for assignment that are non-empty
int[] f = { 10, 10, 20, 20, 20, 10, 10 };
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", 0L, 0L + f.length * step, ReservationSystemTestUtil
.generateAllocation(0, step, f), res, minAlloc)));
int[] f2 = { 5, 5, 5, 5, 5, 5, 5 };
Map<ReservationInterval, ReservationRequest> alloc =
ReservationSystemTestUtil.generateAllocation(5000, step, f2);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", 5000, 5000 + f2.length * step, alloc, res, minAlloc)));
System.out.println("--------BEFORE AGENT----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
}
private boolean check(ReservationAllocation cs, long start, long end,
int containers, int mem, int cores) {
boolean res = true;
for (long i = start; i < end; i++) {
res = res
&& Resources.equals(cs.getResourcesAtTime(i),
Resource.newInstance(mem * containers, cores * containers));
}
return res;
}
public void testStress(int numJobs) throws PlanningException, IOException {
long timeWindow = 1000000L;
Resource clusterCapacity = Resource.newInstance(500 * 100 * 1024, 500 * 32);
step = 1000L;
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
CapacityScheduler scheduler = testUtil.mockCapacityScheduler(500 * 100);
String reservationQ = testUtil.getFullReservationQueueName();
CapacitySchedulerConfiguration capConf = scheduler.getConfiguration();
capConf.setReservationWindow(reservationQ, timeWindow);
capConf.setMaximumCapacity(reservationQ, 100);
capConf.setAverageCapacity(reservationQ, 100);
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
policy.init(reservationQ, capConf, new HashSet<String>());
plan = new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, agent,
clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true);
int acc = 0;
List<ReservationDefinition> list = new ArrayList<ReservationDefinition>();
for (long i = 0; i < numJobs; i++) {
list.add(ReservationSystemTestUtil.generateRandomRR(rand, i));
}
long start = System.currentTimeMillis();
for (int i = 0; i < numJobs; i++) {
try {
if (agent.createReservation(
ReservationSystemTestUtil.getNewReservationId(), "u" + i % 100,
plan, list.get(i))) {
acc++;
}
} catch (PlanningException p) {
// ignore exceptions
}
}
long end = System.currentTimeMillis();
System.out.println("Submitted " + numJobs + " jobs " + " accepted " + acc
+ " in " + (end - start) + "ms");
}
public static void main(String[] arg) {
// run a stress test with by default 1000 random jobs
int numJobs = 1000;
if (arg.length > 0) {
numJobs = Integer.parseInt(arg[0]);
}
try {
TestGreedyReservationAgent test = new TestGreedyReservationAgent();
test.setup();
test.testStress(numJobs);
} catch (Exception e) {
e.printStackTrace();
}
}
}