diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt index 410d974cce0..deece7c6d96 100644 --- a/YARN-1051-CHANGES.txt +++ b/YARN-1051-CHANGES.txt @@ -4,7 +4,13 @@ CapacityScheduler. (Carlo Curino and Subru Krishnan via curino) YARN-2475. Logic for responding to capacity drops for the ReservationSystem. (Carlo Curino and Subru Krishnan via curino) -YARN-1708. Public YARN APIs for creating/updating/deleting reservations. (subru) +YARN-1708. Public YARN APIs for creating/updating/deleting +reservations. (Carlo Curino and Subru Krishnan via subru) -YARN-1709. In-memory data structures used to track resources over time to -enable reservations. (subru) +YARN-1709. In-memory data structures used to track resources over +time to enable reservations. (Carlo Curino and Subru Krishnan via +subru) + +YARN-1710. Logic to find allocations within a Plan that satisfy +user ReservationRequest(s). (Carlo Curino and Subru Krishnan via +curino) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java new file mode 100644 index 00000000000..3214f93e0ab --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java @@ -0,0 +1,367 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.HashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This Agent employs a simple greedy placement strategy, placing the various + * stages of a {@link ReservationRequest} from the deadline moving backward + * towards the arrival. This allows jobs with earlier deadline to be scheduled + * greedily as well. Combined with an opportunistic anticipation of work if the + * cluster is not fully utilized also seems to provide good latency for + * best-effort jobs (i.e., jobs running without a reservation). + * + * This agent does not account for locality and only consider container + * granularity for validation purposes (i.e., you can't exceed max-container + * size). + */ +public class GreedyReservationAgent implements ReservationAgent { + + private static final Logger LOG = LoggerFactory + .getLogger(GreedyReservationAgent.class); + + @Override + public boolean createReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + return computeAllocation(reservationId, user, plan, contract, null); + } + + @Override + public boolean updateReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + return computeAllocation(reservationId, user, plan, contract, + plan.getReservationById(reservationId)); + } + + @Override + public boolean deleteReservation(ReservationId reservationId, String user, + Plan plan) throws PlanningException { + return plan.deleteReservation(reservationId); + } + + private boolean computeAllocation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract, + ReservationAllocation oldReservation) throws PlanningException, + ContractValidationException { + LOG.info("placing the following ReservationRequest: " + contract); + + Resource totalCapacity = plan.getTotalCapacity(); + + // Here we can addd logic to adjust the ResourceDefinition to account for + // system "imperfections" (e.g., scheduling delays for large containers). + + // Align with plan step conservatively (i.e., ceil arrival, and floor + // deadline) + long earliestStart = contract.getArrival(); + long step = plan.getStep(); + if (earliestStart % step != 0) { + earliestStart = earliestStart + (step - (earliestStart % step)); + } + long deadline = + contract.getDeadline() - contract.getDeadline() % plan.getStep(); + + // setup temporary variables to handle time-relations between stages and + // intermediate answers + long curDeadline = deadline; + long oldDeadline = -1; + + Map allocations = + new HashMap(); + RLESparseResourceAllocation tempAssigned = + new RLESparseResourceAllocation(plan.getResourceCalculator(), + plan.getMinimumAllocation()); + + List stages = contract.getReservationRequests() + .getReservationResources(); + ReservationRequestInterpreter type = contract.getReservationRequests() + .getInterpreter(); + + // Iterate the stages in backward from deadline + for (ListIterator li = + stages.listIterator(stages.size()); li.hasPrevious();) { + + ReservationRequest currentReservationStage = li.previous(); + + // validate the RR respect basic constraints + validateInput(plan, currentReservationStage, totalCapacity); + + // run allocation for a single stage + Map curAlloc = + placeSingleStage(plan, tempAssigned, currentReservationStage, + earliestStart, curDeadline, oldReservation, totalCapacity); + + if (curAlloc == null) { + // if we did not find an allocation for the currentReservationStage + // return null, unless the ReservationDefinition we are placing is of + // type ANY + if (type != ReservationRequestInterpreter.R_ANY) { + throw new PlanningException("The GreedyAgent" + + " couldn't find a valid allocation for your request"); + } else { + continue; + } + } else { + + // if we did find an allocation add it to the set of allocations + allocations.putAll(curAlloc); + + // if this request is of type ANY we are done searching (greedy) + // and can return the current allocation (break-out of the search) + if (type == ReservationRequestInterpreter.R_ANY) { + break; + } + + // if the request is of ORDER or ORDER_NO_GAP we constraint the next + // round of allocation to precede the current allocation, by setting + // curDeadline + if (type == ReservationRequestInterpreter.R_ORDER + || type == ReservationRequestInterpreter.R_ORDER_NO_GAP) { + curDeadline = findEarliestTime(curAlloc.keySet()); + + // for ORDER_NO_GAP verify that the allocation found so far has no + // gap, return null otherwise (the greedy procedure failed to find a + // no-gap + // allocation) + if (type == ReservationRequestInterpreter.R_ORDER_NO_GAP + && oldDeadline > 0) { + if (oldDeadline - findLatestTime(curAlloc.keySet()) > plan + .getStep()) { + throw new PlanningException("The GreedyAgent" + + " couldn't find a valid allocation for your request"); + } + } + // keep the variable oldDeadline pointing to the last deadline we + // found + oldDeadline = curDeadline; + } + } + } + + // / If we got here is because we failed to find an allocation for the + // ReservationDefinition give-up and report failure to the user + if (allocations.isEmpty()) { + throw new PlanningException("The GreedyAgent" + + " couldn't find a valid allocation for your request"); + } + + // create reservation with above allocations if not null/empty + + ReservationRequest ZERO_RES = + ReservationRequest.newInstance(Resource.newInstance(0, 0), 0); + + long firstStartTime = findEarliestTime(allocations.keySet()); + + // add zero-padding from arrival up to the first non-null allocation + // to guarantee that the reservation exists starting at arrival + if (firstStartTime > earliestStart) { + allocations.put(new ReservationInterval(earliestStart, + firstStartTime), ZERO_RES); + firstStartTime = earliestStart; + // consider to add trailing zeros at the end for simmetry + } + + // Actually add/update the reservation in the plan. + // This is subject to validation as other agents might be placing + // in parallel and there might be sharing policies the agent is not + // aware off. + ReservationAllocation capReservation = + new InMemoryReservationAllocation(reservationId, contract, user, + plan.getQueueName(), firstStartTime, + findLatestTime(allocations.keySet()), allocations, + plan.getResourceCalculator(), plan.getMinimumAllocation()); + if (oldReservation != null) { + return plan.updateReservation(capReservation); + } else { + return plan.addReservation(capReservation); + } + } + + private void validateInput(Plan plan, ReservationRequest rr, + Resource totalCapacity) throws ContractValidationException { + + if (rr.getConcurrency() < 1) { + throw new ContractValidationException("Gang Size should be >= 1"); + } + + if (rr.getNumContainers() <= 0) { + throw new ContractValidationException("Num containers should be >= 0"); + } + + // check that gangSize and numContainers are compatible + if (rr.getNumContainers() % rr.getConcurrency() != 0) { + throw new ContractValidationException( + "Parallelism must be an exact multiple of gang size"); + } + + // check that the largest container request does not exceed + // the cluster-wide limit for container sizes + if (Resources.greaterThan(plan.getResourceCalculator(), totalCapacity, + rr.getCapability(), plan.getMaximumAllocation())) { + throw new ContractValidationException("Individual" + + " capability requests should not exceed cluster's maxAlloc"); + } + } + + /** + * This method actually perform the placement of an atomic stage of the + * reservation. The key idea is to traverse the plan backward for a + * "lease-duration" worth of time, and compute what is the maximum multiple of + * our concurrency (gang) parameter we can fit. We do this and move towards + * previous instant in time until the time-window is exhausted or we placed + * all the user request. + */ + private Map placeSingleStage( + Plan plan, RLESparseResourceAllocation tempAssigned, + ReservationRequest rr, long earliestStart, long curDeadline, + ReservationAllocation oldResAllocation, final Resource totalCapacity) { + + Map allocationRequests = + new HashMap(); + + // compute the gang as a resource and get the duration + Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency()); + long dur = rr.getDuration(); + long step = plan.getStep(); + + // ceil the duration to the next multiple of the plan step + if (dur % step != 0) { + dur += (step - (dur % step)); + } + + // we know for sure that this division has no remainder (part of contract + // with user, validate before + int gangsToPlace = rr.getNumContainers() / rr.getConcurrency(); + + int maxGang = 0; + + // loop trying to place until we are done, or we are considering + // an invalid range of times + while (gangsToPlace > 0 && curDeadline - dur >= earliestStart) { + + // as we run along we remember how many gangs we can fit, and what + // was the most constraining moment in time (we will restart just + // after that to place the next batch) + maxGang = gangsToPlace; + long minPoint = curDeadline; + int curMaxGang = maxGang; + + // start placing at deadline (excluded due to [,) interval semantics and + // move backward + for (long t = curDeadline - plan.getStep(); t >= curDeadline - dur + && maxGang > 0; t = t - plan.getStep()) { + + // As we run along we will logically remove the previous allocation for + // this reservation + // if one existed + Resource oldResCap = Resource.newInstance(0, 0); + if (oldResAllocation != null) { + oldResCap = oldResAllocation.getResourcesAtTime(t); + } + + // compute net available resources + Resource netAvailableRes = Resources.clone(totalCapacity); + Resources.addTo(netAvailableRes, oldResCap); + Resources.subtractFrom(netAvailableRes, + plan.getTotalCommittedResources(t)); + Resources.subtractFrom(netAvailableRes, + tempAssigned.getCapacityAtTime(t)); + + // compute maximum number of gangs we could fit + curMaxGang = + (int) Math.floor(Resources.divide(plan.getResourceCalculator(), + totalCapacity, netAvailableRes, gang)); + + // pick the minimum between available resources in this instant, and how + // many gangs we have to place + curMaxGang = Math.min(gangsToPlace, curMaxGang); + + // compare with previous max, and set it. also remember *where* we found + // the minimum (useful for next attempts) + if (curMaxGang <= maxGang) { + maxGang = curMaxGang; + minPoint = t; + } + } + + // if we were able to place any gang, record this, and decrement + // gangsToPlace + if (maxGang > 0) { + gangsToPlace -= maxGang; + + ReservationInterval reservationInt = + new ReservationInterval(curDeadline - dur, curDeadline); + ReservationRequest reservationRes = + ReservationRequest.newInstance(rr.getCapability(), + rr.getConcurrency() * maxGang, rr.getConcurrency(), + rr.getDuration()); + // remember occupied space (plan is read-only till we find a plausible + // allocation for the entire request). This is needed since we might be + // placing other ReservationRequest within the same + // ReservationDefinition, + // and we must avoid double-counting the available resources + tempAssigned.addInterval(reservationInt, reservationRes); + allocationRequests.put(reservationInt, reservationRes); + + } + + // reset our new starting point (curDeadline) to the most constraining + // point so far, we will look "left" of that to find more places where + // to schedule gangs (for sure nothing on the "right" of this point can + // fit a full gang. + curDeadline = minPoint; + } + + // if no gangs are left to place we succeed and return the allocation + if (gangsToPlace == 0) { + return allocationRequests; + } else { + // If we are here is becasue we did not manage to satisfy this request. + // So we need to remove unwanted side-effect from tempAssigned (needed + // for ANY). + for (Map.Entry tempAllocation : + allocationRequests.entrySet()) { + tempAssigned.removeInterval(tempAllocation.getKey(), + tempAllocation.getValue()); + } + // and return null to signal failure in this allocation + return null; + } + } + + // finds the leftmost point of this set of ReservationInterval + private long findEarliestTime(Set resInt) { + long ret = Long.MAX_VALUE; + for (ReservationInterval s : resInt) { + if (s.getStartTime() < ret) { + ret = s.getStartTime(); + } + } + return ret; + } + + // finds the rightmost point of this set of ReservationIntervals + private long findLatestTime(Set resInt) { + long ret = Long.MIN_VALUE; + for (ReservationInterval s : resInt) { + if (s.getEndTime() > ret) { + ret = s.getEndTime(); + } + } + return ret; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java new file mode 100644 index 00000000000..fe1941d0288 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java @@ -0,0 +1,55 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; + +/** + * An entity that seeks to acquire resources to satisfy an user's contract + */ +public interface ReservationAgent { + + /** + * Create a reservation for the user that abides by the specified contract + * + * @param reservationId the identifier of the reservation to be created. + * @param user the user who wants to create the reservation + * @param plan the Plan to which the reservation must be fitted + * @param contract encapsulates the resources the user requires for his + * session + * + * @return whether the create operation was successful or not + * @throws PlanningException if the session cannot be fitted into the plan + */ + public boolean createReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException; + + /** + * Update a reservation for the user that abides by the specified contract + * + * @param reservationId the identifier of the reservation to be updated + * @param user the user who wants to create the session + * @param plan the Plan to which the reservation must be fitted + * @param contract encapsulates the resources the user requires for his + * reservation + * + * @return whether the update operation was successful or not + * @throws PlanningException if the reservation cannot be fitted into the plan + */ + public boolean updateReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException; + + /** + * Delete an user reservation + * + * @param reservationId the identifier of the reservation to be deleted + * @param user the user who wants to create the reservation + * @param plan the Plan to which the session must be fitted + * + * @return whether the delete operation was successful or not + * @throws PlanningException if the reservation cannot be fitted into the plan + */ + public boolean deleteReservation(ReservationId reservationId, String user, + Plan plan) throws PlanningException; + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java new file mode 100644 index 00000000000..7ee5a76852f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java @@ -0,0 +1,12 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions; + +public class ContractValidationException extends PlanningException { + + private static final long serialVersionUID = 1L; + + public ContractValidationException(String message) { + super(message); + } + + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java new file mode 100644 index 00000000000..0b0201d6ef7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java @@ -0,0 +1,588 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Before; +import org.junit.Test; +import org.mortbay.log.Log; + +public class TestGreedyReservationAgent { + + ReservationAgent agent; + InMemoryPlan plan; + Resource minAlloc = Resource.newInstance(1024, 1); + ResourceCalculator res = new DefaultResourceCalculator(); + Resource maxAlloc = Resource.newInstance(1024 * 8, 8); + Random rand = new Random(); + long step; + + @Before + public void setup() throws Exception { + + long seed = rand.nextLong(); + rand.setSeed(seed); + Log.info("Running with seed: " + seed); + + // setting completely loose quotas + long timeWindow = 1000000L; + Resource clusterCapacity = Resource.newInstance(100 * 1024, 100); + step = 1000L; + ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); + CapacityScheduler scheduler = testUtil.mockCapacityScheduler(125); + String reservationQ = testUtil.getFullReservationQueueName(); + CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); + capConf.setReservationWindow(reservationQ, timeWindow); + capConf.setMaximumCapacity(reservationQ, 100); + capConf.setAverageCapacity(reservationQ, 100); + CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); + policy.init(reservationQ, capConf, new HashSet()); + agent = new GreedyReservationAgent(); + + QueueMetrics queueMetrics = QueueMetrics.forQueue("dedicated", + mock(ParentQueue.class), false, capConf); + + plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step, + res, minAlloc, maxAlloc, "dedicated", null, true); + } + + @SuppressWarnings("javadoc") + @Test + public void testSimple() throws PlanningException { + + prepareBasicPlan(); + + // create a request with a single atomic ask + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(5 * step); + rr.setDeadline(20 * step); + ReservationRequest r = ReservationRequest.newInstance( + Resource.newInstance(2048, 2), 10, 5, 10 * step); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setReservationResources(Collections.singletonList(r)); + rr.setReservationRequests(reqs); + + ReservationId reservationID = ReservationSystemTestUtil + .getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr); + + assertTrue("Agent-based allocation failed", reservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == 3); + + ReservationAllocation cs = plan.getReservationById(reservationID); + + System.out.println("--------AFTER SIMPLE ALLOCATION (queue: " + + reservationID + ")----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + for (long i = 10 * step; i < 20 * step; i++) { + assertTrue( + "Agent-based allocation unexpected", + Resources.equals(cs.getResourcesAtTime(i), + Resource.newInstance(2048 * 10, 2 * 10))); + } + + } + + @Test + public void testOrder() throws PlanningException { + prepareBasicPlan(); + + // create a completely utilized segment around time 30 + int[] f = { 100, 100 }; + + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", 30 * step, 30 * step + f.length * step, + ReservationSystemTestUtil.generateAllocation(30 * step, step, f), + res, minAlloc))); + + // create a chain of 4 RR, mixing gang and non-gang + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(0 * step); + rr.setDeadline(70 * step); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER); + ReservationRequest r = ReservationRequest.newInstance( + Resource.newInstance(2048, 2), 10, 1, 10 * step); + ReservationRequest r2 = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 10, 10, 20 * step); + List list = new ArrayList(); + list.add(r); + list.add(r2); + list.add(r); + list.add(r2); + reqs.setReservationResources(list); + rr.setReservationRequests(reqs); + + // submit to agent + ReservationId reservationID = ReservationSystemTestUtil + .getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr); + + // validate + assertTrue("Agent-based allocation failed", reservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == 4); + + ReservationAllocation cs = plan.getReservationById(reservationID); + + assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1)); + assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1)); + assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1)); + assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1)); + + System.out.println("--------AFTER ORDER ALLOCATION (queue: " + + reservationID + ")----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + } + + @Test + public void testOrderNoGapImpossible() throws PlanningException { + prepareBasicPlan(); + // create a completely utilized segment at time 30 + int[] f = { 100, 100 }; + + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", 30 * step, 30 * step + f.length * step, + ReservationSystemTestUtil.generateAllocation(30 * step, step, f), + res, minAlloc))); + + // create a chain of 4 RR, mixing gang and non-gang + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(0L); + + rr.setDeadline(70L); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP); + ReservationRequest r = ReservationRequest.newInstance( + Resource.newInstance(2048, 2), 10, 1, 10); + ReservationRequest r2 = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 10, 10, 20); + List list = new ArrayList(); + list.add(r); + list.add(r2); + list.add(r); + list.add(r2); + reqs.setReservationResources(list); + rr.setReservationRequests(reqs); + + ReservationId reservationID = ReservationSystemTestUtil + .getNewReservationId(); + boolean result = false; + try { + // submit to agent + result = agent.createReservation(reservationID, "u1", plan, rr); + fail(); + } catch (PlanningException p) { + // expected + } + + // validate + assertFalse("Agent-based allocation should have failed", result); + assertTrue("Agent-based allocation should have failed", plan + .getAllReservations().size() == 3); + + System.out + .println("--------AFTER ORDER_NO_GAP IMPOSSIBLE ALLOCATION (queue: " + + reservationID + ")----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + } + + @Test + public void testOrderNoGap() throws PlanningException { + prepareBasicPlan(); + // create a chain of 4 RR, mixing gang and non-gang + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(0 * step); + rr.setDeadline(60 * step); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP); + ReservationRequest r = ReservationRequest.newInstance( + Resource.newInstance(2048, 2), 10, 1, 10 * step); + ReservationRequest r2 = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 10, 10, 20 * step); + List list = new ArrayList(); + list.add(r); + list.add(r2); + list.add(r); + list.add(r2); + reqs.setReservationResources(list); + rr.setReservationRequests(reqs); + rr.setReservationRequests(reqs); + + // submit to agent + ReservationId reservationID = ReservationSystemTestUtil + .getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr); + + System.out.println("--------AFTER ORDER ALLOCATION (queue: " + + reservationID + ")----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + // validate + assertTrue("Agent-based allocation failed", reservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == 3); + + ReservationAllocation cs = plan.getReservationById(reservationID); + + assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1)); + assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1)); + assertTrue(cs.toString(), check(cs, 30 * step, 40 * step, 20, 1024, 1)); + assertTrue(cs.toString(), check(cs, 40 * step, 60 * step, 10, 1024, 1)); + + } + + @Test + public void testSingleSliding() throws PlanningException { + prepareBasicPlan(); + + // create a single request for which we need subsequent (tight) packing. + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(100 * step); + rr.setDeadline(120 * step); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setInterpreter(ReservationRequestInterpreter.R_ALL); + ReservationRequest r = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 200, 10, 10 * step); + + List list = new ArrayList(); + list.add(r); + reqs.setReservationResources(list); + rr.setReservationRequests(reqs); + + // submit to agent + ReservationId reservationID = ReservationSystemTestUtil + .getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr); + + // validate results, we expect the second one to be accepted + assertTrue("Agent-based allocation failed", reservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == 3); + + ReservationAllocation cs = plan.getReservationById(reservationID); + + assertTrue(cs.toString(), check(cs, 100 * step, 120 * step, 100, 1024, 1)); + + System.out.println("--------AFTER packed ALLOCATION (queue: " + + reservationID + ")----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + } + + @Test + public void testAny() throws PlanningException { + prepareBasicPlan(); + // create an ANY request, with an impossible step (last in list, first + // considered), + // and two satisfiable ones. We expect the second one to be returned. + + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(100 * step); + rr.setDeadline(120 * step); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setInterpreter(ReservationRequestInterpreter.R_ANY); + ReservationRequest r = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 5, 5, 10 * step); + ReservationRequest r2 = ReservationRequest.newInstance( + Resource.newInstance(2048, 2), 10, 5, 10 * step); + ReservationRequest r3 = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 110, 110, 10 * step); + + List list = new ArrayList(); + list.add(r); + list.add(r2); + list.add(r3); + reqs.setReservationResources(list); + rr.setReservationRequests(reqs); + + // submit to agent + ReservationId reservationID = ReservationSystemTestUtil + .getNewReservationId(); + boolean res = agent.createReservation(reservationID, "u1", plan, rr); + + // validate results, we expect the second one to be accepted + assertTrue("Agent-based allocation failed", res); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == 3); + + ReservationAllocation cs = plan.getReservationById(reservationID); + + assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1)); + + System.out.println("--------AFTER ANY ALLOCATION (queue: " + reservationID + + ")----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + } + + @Test + public void testAnyImpossible() throws PlanningException { + prepareBasicPlan(); + // create an ANY request, with all impossible alternatives + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(100L); + rr.setDeadline(120L); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setInterpreter(ReservationRequestInterpreter.R_ANY); + + // longer than arrival-deadline + ReservationRequest r1 = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 35, 5, 30); + // above max cluster size + ReservationRequest r2 = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 110, 110, 10); + + List list = new ArrayList(); + list.add(r1); + list.add(r2); + reqs.setReservationResources(list); + rr.setReservationRequests(reqs); + + ReservationId reservationID = ReservationSystemTestUtil + .getNewReservationId(); + boolean result = false; + try { + // submit to agent + result = agent.createReservation(reservationID, "u1", plan, rr); + fail(); + } catch (PlanningException p) { + // expected + } + // validate results, we expect the second one to be accepted + assertFalse("Agent-based allocation should have failed", result); + assertTrue("Agent-based allocation should have failed", plan + .getAllReservations().size() == 2); + + System.out.println("--------AFTER ANY IMPOSSIBLE ALLOCATION (queue: " + + reservationID + ")----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + } + + @Test + public void testAll() throws PlanningException { + prepareBasicPlan(); + // create an ALL request + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(100 * step); + rr.setDeadline(120 * step); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setInterpreter(ReservationRequestInterpreter.R_ALL); + ReservationRequest r = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 5, 5, 10 * step); + ReservationRequest r2 = ReservationRequest.newInstance( + Resource.newInstance(2048, 2), 10, 10, 20 * step); + + List list = new ArrayList(); + list.add(r); + list.add(r2); + reqs.setReservationResources(list); + rr.setReservationRequests(reqs); + + // submit to agent + ReservationId reservationID = ReservationSystemTestUtil + .getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr); + + // validate results, we expect the second one to be accepted + assertTrue("Agent-based allocation failed", reservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == 3); + + ReservationAllocation cs = plan.getReservationById(reservationID); + + assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 20, 1024, 1)); + assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 25, 1024, 1)); + + System.out.println("--------AFTER ALL ALLOCATION (queue: " + reservationID + + ")----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + } + + @Test + public void testAllImpossible() throws PlanningException { + prepareBasicPlan(); + // create an ALL request, with an impossible combination, it should be + // rejected, and allocation remain unchanged + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(100L); + rr.setDeadline(120L); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setInterpreter(ReservationRequestInterpreter.R_ALL); + ReservationRequest r = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 55, 5, 10); + ReservationRequest r2 = ReservationRequest.newInstance( + Resource.newInstance(2048, 2), 55, 5, 20); + + List list = new ArrayList(); + list.add(r); + list.add(r2); + reqs.setReservationResources(list); + rr.setReservationRequests(reqs); + + ReservationId reservationID = ReservationSystemTestUtil + .getNewReservationId(); + boolean result = false; + try { + // submit to agent + result = agent.createReservation(reservationID, "u1", plan, rr); + fail(); + } catch (PlanningException p) { + // expected + } + + // validate results, we expect the second one to be accepted + assertFalse("Agent-based allocation failed", result); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == 2); + + System.out.println("--------AFTER ALL IMPOSSIBLE ALLOCATION (queue: " + + reservationID + ")----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + } + + private void prepareBasicPlan() throws PlanningException { + + // insert in the reservation a couple of controlled reservations, to create + // conditions for assignment that are non-empty + + int[] f = { 10, 10, 20, 20, 20, 10, 10 }; + + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", 0L, 0L + f.length * step, ReservationSystemTestUtil + .generateAllocation(0, step, f), res, minAlloc))); + + int[] f2 = { 5, 5, 5, 5, 5, 5, 5 }; + Map alloc = + ReservationSystemTestUtil.generateAllocation(5000, step, f2); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", 5000, 5000 + f2.length * step, alloc, res, minAlloc))); + + System.out.println("--------BEFORE AGENT----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + } + + private boolean check(ReservationAllocation cs, long start, long end, + int containers, int mem, int cores) { + + boolean res = true; + for (long i = start; i < end; i++) { + res = res + && Resources.equals(cs.getResourcesAtTime(i), + Resource.newInstance(mem * containers, cores * containers)); + } + return res; + } + + public void testStress(int numJobs) throws PlanningException, IOException { + + long timeWindow = 1000000L; + Resource clusterCapacity = Resource.newInstance(500 * 100 * 1024, 500 * 32); + step = 1000L; + ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); + CapacityScheduler scheduler = testUtil.mockCapacityScheduler(500 * 100); + String reservationQ = testUtil.getFullReservationQueueName(); + CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); + capConf.setReservationWindow(reservationQ, timeWindow); + capConf.setMaximumCapacity(reservationQ, 100); + capConf.setAverageCapacity(reservationQ, 100); + CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); + policy.init(reservationQ, capConf, new HashSet()); + + plan = new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, agent, + clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true); + + int acc = 0; + List list = new ArrayList(); + for (long i = 0; i < numJobs; i++) { + list.add(ReservationSystemTestUtil.generateRandomRR(rand, i)); + } + + long start = System.currentTimeMillis(); + for (int i = 0; i < numJobs; i++) { + + try { + if (agent.createReservation( + ReservationSystemTestUtil.getNewReservationId(), "u" + i % 100, + plan, list.get(i))) { + acc++; + } + } catch (PlanningException p) { + // ignore exceptions + } + } + + long end = System.currentTimeMillis(); + System.out.println("Submitted " + numJobs + " jobs " + " accepted " + acc + + " in " + (end - start) + "ms"); + } + + public static void main(String[] arg) { + + // run a stress test with by default 1000 random jobs + int numJobs = 1000; + if (arg.length > 0) { + numJobs = Integer.parseInt(arg[0]); + } + + try { + TestGreedyReservationAgent test = new TestGreedyReservationAgent(); + test.setup(); + test.testStress(numJobs); + } catch (Exception e) { + e.printStackTrace(); + } + } + +}