diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java index 3afcd47820a..783fd098f9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java @@ -687,4 +687,15 @@ public class InMemoryPlan implements Plan { readLock.unlock(); } } + + @Override + public RLESparseResourceAllocation getCumulativeLoadOverTime( + long start, long end) { + readLock.lock(); + try { + return rleSparseVector.getRangeOverlapping(start, end); + } finally { + readLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java index 699f461a52f..27679932a5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java @@ -174,4 +174,13 @@ public interface PlanView extends PlanContext { public RLESparseResourceAllocation getConsumptionForUserOverTime(String user, long start, long end); + /** + * Get the cumulative load over a time interval. + * + * @param start Start of the time interval. + * @param end End of the time interval. + * @return RLE sparse allocation. + */ + RLESparseResourceAllocation getCumulativeLoadOverTime(long start, long end); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java index 00c233389d9..3853f411350 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java @@ -39,6 +39,8 @@ public class AlignedPlannerWithGreedy implements ReservationAgent { public static final int DEFAULT_SMOOTHNESS_FACTOR = 10; public static final String SMOOTHNESS_FACTOR = "yarn.resourcemanager.reservation-system.smoothness-factor"; + private boolean allocateLeft = false; + // Log private static final Logger LOG = LoggerFactory @@ -49,26 +51,31 @@ public class AlignedPlannerWithGreedy implements ReservationAgent { // Constructor public AlignedPlannerWithGreedy() { + } @Override public void init(Configuration conf) { int smoothnessFactor = conf.getInt(SMOOTHNESS_FACTOR, DEFAULT_SMOOTHNESS_FACTOR); + allocateLeft = conf.getBoolean(FAVOR_EARLY_ALLOCATION, + DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION); // List of algorithms List listAlg = new LinkedList(); // LowCostAligned planning algorithm ReservationAgent algAligned = - new IterativePlanner(new StageEarliestStartByDemand(), - new StageAllocatorLowCostAligned(smoothnessFactor), false); + new IterativePlanner(new StageExecutionIntervalByDemand(), + new StageAllocatorLowCostAligned(smoothnessFactor, allocateLeft), + allocateLeft); + listAlg.add(algAligned); // Greedy planning algorithm ReservationAgent algGreedy = - new IterativePlanner(new StageEarliestStartByJobArrival(), - new StageAllocatorGreedy(), false); + new IterativePlanner(new StageExecutionIntervalUnconstrained(), + new StageAllocatorGreedyRLE(allocateLeft), allocateLeft); listAlg.add(algGreedy); // Set planner: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java index 1559b973f51..637a17b0e12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java @@ -47,9 +47,6 @@ public class GreedyReservationAgent implements ReservationAgent { // Greedy planner private ReservationAgent planner; - public final static String GREEDY_FAVOR_EARLY_ALLOCATION = - "yarn.resourcemanager.reservation-system.favor-early-allocation"; - public final static boolean DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION = true; private boolean allocateLeft; public GreedyReservationAgent() { @@ -57,20 +54,20 @@ public class GreedyReservationAgent implements ReservationAgent { @Override public void init(Configuration conf) { - allocateLeft = conf.getBoolean(GREEDY_FAVOR_EARLY_ALLOCATION, + allocateLeft = conf.getBoolean(FAVOR_EARLY_ALLOCATION, DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION); if (allocateLeft) { LOG.info("Initializing the GreedyReservationAgent to favor \"early\"" + " (left) allocations (controlled by parameter: " - + GREEDY_FAVOR_EARLY_ALLOCATION + ")"); + + FAVOR_EARLY_ALLOCATION + ")"); } else { LOG.info("Initializing the GreedyReservationAgent to favor \"late\"" + " (right) allocations (controlled by parameter: " - + GREEDY_FAVOR_EARLY_ALLOCATION + ")"); + + FAVOR_EARLY_ALLOCATION + ")"); } planner = - new IterativePlanner(new StageEarliestStartByJobArrival(), + new IterativePlanner(new StageExecutionIntervalUnconstrained(), new StageAllocatorGreedyRLE(allocateLeft), allocateLeft); } @@ -123,4 +120,4 @@ public class GreedyReservationAgent implements ReservationAgent { } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java index 24d237a7998..83f272e451e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; -import java.util.HashMap; import java.util.HashSet; import java.util.ListIterator; import java.util.Map; @@ -32,26 +31,24 @@ import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.util.resource.Resources; /** * A planning algorithm consisting of two main phases. The algorithm iterates - * over the job stages in descending order. For each stage, the algorithm: 1. - * Determines an interval [stageArrivalTime, stageDeadline) in which the stage - * is allocated. 2. Computes an allocation for the stage inside the interval. - * - * For ANY and ALL jobs, phase 1 sets the allocation window of each stage to be - * [jobArrival, jobDeadline]. For ORDER and ORDER_NO_GAP jobs, the deadline of - * each stage is set as succcessorStartTime - the starting time of its - * succeeding stage (or jobDeadline if it is the last stage). - * - * The phases are set using the two functions: 1. setAlgEarliestStartTime 2. - * setAlgComputeStageAllocation + * over the job stages in ascending/descending order, depending on the flag + * allocateLeft. For each stage, the algorithm: 1. Determines an interval + * [stageArrival, stageDeadline) in which the stage is allocated. 2. Computes an + * allocation for the stage inside the interval. For ANY and ALL jobs, phase 1 + * sets the allocation window of each stage to be [jobArrival, jobDeadline]. For + * ORDER and ORDER_NO_GAP jobs, the deadline of each stage is set as + * succcessorStartTime - the starting time of its succeeding stage (or + * jobDeadline if it is the last stage). The phases are set using the two + * functions: 1. setAlgStageExecutionInterval 2.setAlgStageAllocator */ public class IterativePlanner extends PlanningAlgorithm { @@ -60,7 +57,7 @@ public class IterativePlanner extends PlanningAlgorithm { private RLESparseResourceAllocation planModifications; // Data extracted from plan - private Map planLoads; + private RLESparseResourceAllocation planLoads; private Resource capacity; private long step; @@ -70,16 +67,16 @@ public class IterativePlanner extends PlanningAlgorithm { private long jobDeadline; // Phase algorithms - private StageEarliestStart algStageEarliestStart = null; + private StageExecutionInterval algStageExecutionInterval = null; private StageAllocator algStageAllocator = null; private final boolean allocateLeft; // Constructor - public IterativePlanner(StageEarliestStart algEarliestStartTime, + public IterativePlanner(StageExecutionInterval algStageExecutionInterval, StageAllocator algStageAllocator, boolean allocateLeft) { this.allocateLeft = allocateLeft; - setAlgStageEarliestStart(algEarliestStartTime); + setAlgStageExecutionInterval(algStageExecutionInterval); setAlgStageAllocator(algStageAllocator); } @@ -101,12 +98,6 @@ public class IterativePlanner extends PlanningAlgorithm { // Current stage ReservationRequest currentReservationStage; - // Stage deadlines - long stageDeadline = stepRoundDown(reservation.getDeadline(), step); - long successorStartingTime = -1; - long predecessorEndTime = stepRoundDown(reservation.getArrival(), step); - long stageArrivalTime = -1; - // Iterate the stages in reverse order while (stageProvider.hasNext()) { @@ -116,27 +107,17 @@ public class IterativePlanner extends PlanningAlgorithm { // Validate that the ReservationRequest respects basic constraints validateInputStage(plan, currentReservationStage); - // Compute an adjusted earliestStart for this resource - // (we need this to provision some space for the ORDER contracts) + // Set the stageArrival and stageDeadline + ReservationInterval stageInterval = + setStageExecutionInterval(plan, reservation, currentReservationStage, + allocations); + Long stageArrival = stageInterval.getStartTime(); + Long stageDeadline = stageInterval.getEndTime(); - if (allocateLeft) { - stageArrivalTime = predecessorEndTime; - } else { - stageArrivalTime = reservation.getArrival(); - if (jobType == ReservationRequestInterpreter.R_ORDER - || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) { - stageArrivalTime = - computeEarliestStartingTime(plan, reservation, - stageProvider.getCurrentIndex(), currentReservationStage, - stageDeadline); - } - stageArrivalTime = stepRoundUp(stageArrivalTime, step); - stageArrivalTime = Math.max(stageArrivalTime, reservation.getArrival()); - } - // Compute the allocation of a single stage + // Compute stage allocation Map curAlloc = - computeStageAllocation(plan, currentReservationStage, - stageArrivalTime, stageDeadline, user, reservationId); + computeStageAllocation(plan, currentReservationStage, stageArrival, + stageDeadline, user, reservationId); // If we did not find an allocation, return NULL // (unless it's an ANY job, then we simply continue). @@ -152,9 +133,13 @@ public class IterativePlanner extends PlanningAlgorithm { } - // Get the start & end time of the current allocation - Long stageStartTime = findEarliestTime(curAlloc); - Long stageEndTime = findLatestTime(curAlloc); + // Validate ORDER_NO_GAP + if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) { + if (!validateOrderNoGap(allocations, curAlloc, allocateLeft)) { + throw new PlanningException( + "The allocation found does not respect ORDER_NO_GAP"); + } + } // If we did find an allocation for the stage, add it for (Entry entry : curAlloc.entrySet()) { @@ -165,33 +150,6 @@ public class IterativePlanner extends PlanningAlgorithm { if (jobType == ReservationRequestInterpreter.R_ANY) { break; } - - // If ORDER job, set the stageDeadline of the next stage to be processed - if (jobType == ReservationRequestInterpreter.R_ORDER - || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) { - - // CHECK ORDER_NO_GAP - // Verify that there is no gap, in case the job is ORDER_NO_GAP - // note that the test is different left-to-right and right-to-left - if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP - && successorStartingTime != -1 - && ((allocateLeft && predecessorEndTime < stageStartTime) || - (!allocateLeft && (stageEndTime < successorStartingTime)) - ) - || (!isNonPreemptiveAllocation(curAlloc))) { - throw new PlanningException( - "The allocation found does not respect ORDER_NO_GAP"); - } - - if (allocateLeft) { - // Store the stageStartTime and set the new stageDeadline - predecessorEndTime = stageEndTime; - } else { - // Store the stageStartTime and set the new stageDeadline - successorStartingTime = stageStartTime; - stageDeadline = stageStartTime; - } - } } // If the allocation is empty, return an error @@ -200,7 +158,39 @@ public class IterativePlanner extends PlanningAlgorithm { } return allocations; + } + protected static boolean validateOrderNoGap( + RLESparseResourceAllocation allocations, + Map curAlloc, boolean allocateLeft) { + + // Left to right + if (allocateLeft) { + Long stageStartTime = findEarliestTime(curAlloc); + Long allocationEndTime = allocations.getLatestNonNullTime(); + + // Check that there is no gap between stages + if ((allocationEndTime != -1) && (allocationEndTime < stageStartTime)) { + return false; + } + // Right to left + } else { + Long stageEndTime = findLatestTime(curAlloc); + Long allocationStartTime = allocations.getEarliestStartTime(); + + // Check that there is no gap between stages + if ((allocationStartTime != -1) && (stageEndTime < allocationStartTime)) { + return false; + } + } + + // Check that the stage allocation does not violate ORDER_NO_GAP + if (!isNonPreemptiveAllocation(curAlloc)) { + return false; + } + + // The allocation is legal + return true; } protected void initialize(Plan plan, ReservationId reservationId, @@ -223,35 +213,15 @@ public class IterativePlanner extends PlanningAlgorithm { // planLoads are not used by other StageAllocators... and don't deal // well with huge reservation ranges - if (this.algStageAllocator instanceof StageAllocatorLowCostAligned) { - planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline); - ReservationAllocation oldRes = plan.getReservationById(reservationId); - if (oldRes != null) { - planModifications = - RLESparseResourceAllocation.merge(plan.getResourceCalculator(), - plan.getTotalCapacity(), planModifications, - oldRes.getResourcesOverTime(), RLEOperator.subtract, - jobArrival, jobDeadline); - } + planLoads = plan.getCumulativeLoadOverTime(jobArrival, jobDeadline); + ReservationAllocation oldRes = plan.getReservationById(reservationId); + if (oldRes != null) { + planLoads = + RLESparseResourceAllocation.merge(plan.getResourceCalculator(), + plan.getTotalCapacity(), planLoads, + oldRes.getResourcesOverTime(), RLEOperator.subtract, jobArrival, + jobDeadline); } - - } - - private Map getAllLoadsInInterval(Plan plan, long startTime, - long endTime) { - - // Create map - Map loads = new HashMap(); - - // Calculate the load for every time slot between [start,end) - for (long t = startTime; t < endTime; t += step) { - Resource load = plan.getTotalCommittedResources(t); - loads.put(t, load); - } - - // Return map - return loads; - } private void validateInputStage(Plan plan, ReservationRequest rr) @@ -286,7 +256,7 @@ public class IterativePlanner extends PlanningAlgorithm { } - private boolean isNonPreemptiveAllocation( + private static boolean isNonPreemptiveAllocation( Map curAlloc) { // Checks whether a stage allocation is non preemptive or not. @@ -329,14 +299,13 @@ public class IterativePlanner extends PlanningAlgorithm { } - // Call algEarliestStartTime() - protected long computeEarliestStartingTime(Plan plan, - ReservationDefinition reservation, int index, - ReservationRequest currentReservationStage, long stageDeadline) { - - return algStageEarliestStart.setEarliestStartTime(plan, reservation, index, - currentReservationStage, stageDeadline); - + // Call setStageExecutionInterval() + protected ReservationInterval setStageExecutionInterval(Plan plan, + ReservationDefinition reservation, + ReservationRequest currentReservationStage, + RLESparseResourceAllocation allocations) { + return algStageExecutionInterval.computeExecutionInterval(plan, + reservation, currentReservationStage, allocateLeft, allocations); } // Call algStageAllocator @@ -350,10 +319,11 @@ public class IterativePlanner extends PlanningAlgorithm { } - // Set the algorithm: algStageEarliestStart - public IterativePlanner setAlgStageEarliestStart(StageEarliestStart alg) { + // Set the algorithm: algStageExecutionInterval + public IterativePlanner setAlgStageExecutionInterval( + StageExecutionInterval alg) { - this.algStageEarliestStart = alg; + this.algStageExecutionInterval = alg; return this; // To allow concatenation of setAlg() functions } @@ -375,7 +345,7 @@ public class IterativePlanner extends PlanningAlgorithm { private final boolean allocateLeft; - private ListIterator li; + private final ListIterator li; public StageProvider(boolean allocateLeft, ReservationDefinition reservation) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java index 52e7055e0fe..3c448b36088 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java @@ -28,15 +28,26 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan */ public interface ReservationAgent { + /** + * Constant defining the preferential treatment of time for equally valid + * allocations. + */ + final static String FAVOR_EARLY_ALLOCATION = + "yarn.resourcemanager.reservation-system.favor-early-allocation"; + /** + * By default favor early allocations. + */ + final static boolean DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION = true; + /** * Create a reservation for the user that abides by the specified contract - * + * * @param reservationId the identifier of the reservation to be created. * @param user the user who wants to create the reservation * @param plan the Plan to which the reservation must be fitted * @param contract encapsulates the resources the user requires for his * session - * + * * @return whether the create operation was successful or not * @throws PlanningException if the session cannot be fitted into the plan */ @@ -45,13 +56,13 @@ public interface ReservationAgent { /** * Update a reservation for the user that abides by the specified contract - * + * * @param reservationId the identifier of the reservation to be updated * @param user the user who wants to create the session * @param plan the Plan to which the reservation must be fitted * @param contract encapsulates the resources the user requires for his * reservation - * + * * @return whether the update operation was successful or not * @throws PlanningException if the reservation cannot be fitted into the plan */ @@ -60,11 +71,11 @@ public interface ReservationAgent { /** * Delete an user reservation - * + * * @param reservationId the identifier of the reservation to be deleted * @param user the user who wants to create the reservation * @param plan the Plan to which the session must be fitted - * + * * @return whether the delete operation was successful or not * @throws PlanningException if the reservation cannot be fitted into the plan */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java index 750778320b9..7bfc730eb9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java @@ -42,7 +42,7 @@ import com.google.common.annotations.VisibleForTesting; * This (re)planner scan a period of time from now to a maximum time window (or * the end of the last session, whichever comes first) checking the overall * capacity is not violated. - * + * * It greedily removes sessions in reversed order of acceptance (latest accepted * is the first removed). */ @@ -90,8 +90,8 @@ public class SimpleCapacityReplanner implements Planner { // loop on all moment in time from now to the end of the check Zone // or the end of the planned sessions whichever comes first - for (long t = now; - (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone)); + for (long t = now; + (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone)); t += plan.getStep()) { Resource excessCap = Resources.subtract(plan.getTotalCommittedResources(t), totCap); @@ -102,7 +102,7 @@ public class SimpleCapacityReplanner implements Planner { new TreeSet(plan.getReservationsAtTime(t)); for (Iterator resIter = curReservations.iterator(); resIter.hasNext() - && Resources.greaterThan(resCalc, totCap, excessCap, + && Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE);) { ReservationAllocation reservation = resIter.next(); plan.deleteReservation(reservation.getReservationId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java index b95f8d4e9d1..ec6d9c0ae7d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java @@ -41,19 +41,21 @@ public interface StageAllocator { * @param planModifications the allocations performed by the planning * algorithm which are not yet reflected by plan * @param rr the stage - * @param stageEarliestStart the arrival time (earliest starting time) set for + * @param stageArrival the arrival time (earliest starting time) set for * the stage by the two phase planning algorithm * @param stageDeadline the deadline of the stage set by the two phase * planning algorithm + * @param user name of the user + * @param oldId identifier of the old reservation * * @return The computed allocation (or null if the stage could not be * allocated) * @throws PlanningException */ Map computeStageAllocation(Plan plan, - Map planLoads, + RLESparseResourceAllocation planLoads, RLESparseResourceAllocation planModifications, ReservationRequest rr, - long stageEarliestStart, long stageDeadline, String user, + long stageArrival, long stageDeadline, String user, ReservationId oldId) throws PlanningException; -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java index c8369707c2d..da04336fd20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java @@ -26,8 +26,8 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.util.resource.Resources; @@ -41,7 +41,7 @@ public class StageAllocatorGreedy implements StageAllocator { @Override public Map computeStageAllocation(Plan plan, - Map planLoads, + RLESparseResourceAllocation planLoads, RLESparseResourceAllocation planModifications, ReservationRequest rr, long stageEarliestStart, long stageDeadline, String user, ReservationId oldId) throws PlanningException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java index 5e748fc5553..ec83e02124f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java @@ -29,8 +29,8 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.util.resource.Resources; @@ -52,7 +52,7 @@ public class StageAllocatorGreedyRLE implements StageAllocator { @Override public Map computeStageAllocation(Plan plan, - Map planLoads, + RLESparseResourceAllocation planLoads, RLESparseResourceAllocation planModifications, ReservationRequest rr, long stageEarliestStart, long stageDeadline, String user, ReservationId oldId) throws PlanningException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java index b9fd8e15959..e45f58cef41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java @@ -18,8 +18,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; +import java.util.ArrayList; import java.util.Comparator; +import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; import java.util.TreeSet; import org.apache.hadoop.yarn.api.records.ReservationId; @@ -27,46 +31,55 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; /** * A stage allocator that iteratively allocates containers in the * {@link DurationInterval} with lowest overall cost. The algorithm only - * considers intervals of the form: [stageDeadline - (n+1)*duration, - * stageDeadline - n*duration) for an integer n. This guarantees that the - * allocations are aligned (as opposed to overlapping duration intervals). - * - * The smoothnessFactor parameter controls the number of containers that are - * simultaneously allocated in each iteration of the algorithm. + * considers non-overlapping intervals of length 'duration'. This guarantees + * that the allocations are aligned. If 'allocateLeft == true', the intervals + * considered by the algorithm are aligned to stageArrival; otherwise, they are + * aligned to stageDeadline. The smoothnessFactor parameter controls the number + * of containers that are simultaneously allocated in each iteration of the + * algorithm. */ public class StageAllocatorLowCostAligned implements StageAllocator { + private final boolean allocateLeft; // Smoothness factor private int smoothnessFactor = 10; // Constructor - public StageAllocatorLowCostAligned() { + public StageAllocatorLowCostAligned(boolean allocateLeft) { + this.allocateLeft = allocateLeft; } // Constructor - public StageAllocatorLowCostAligned(int smoothnessFactor) { + public StageAllocatorLowCostAligned(int smoothnessFactor, + boolean allocateLeft) { + this.allocateLeft = allocateLeft; this.smoothnessFactor = smoothnessFactor; } - // computeJobAllocation() @Override - public Map computeStageAllocation( - Plan plan, Map planLoads, + public Map computeStageAllocation(Plan plan, + RLESparseResourceAllocation planLoads, RLESparseResourceAllocation planModifications, ReservationRequest rr, - long stageEarliestStart, long stageDeadline, String user, - ReservationId oldId) { + long stageArrival, long stageDeadline, String user, ReservationId oldId) + throws PlanningException { // Initialize ResourceCalculator resCalc = plan.getResourceCalculator(); Resource capacity = plan.getTotalCapacity(); + + RLESparseResourceAllocation netRLERes = plan + .getAvailableResourceOverTime(user, oldId, stageArrival, stageDeadline); + long step = plan.getStep(); // Create allocationRequestsearlies @@ -76,16 +89,15 @@ public class StageAllocatorLowCostAligned implements StageAllocator { // Initialize parameters long duration = stepRoundUp(rr.getDuration(), step); int windowSizeInDurations = - (int) ((stageDeadline - stageEarliestStart) / duration); + (int) ((stageDeadline - stageArrival) / duration); int totalGangs = rr.getNumContainers() / rr.getConcurrency(); int numContainersPerGang = rr.getConcurrency(); Resource gang = Resources.multiply(rr.getCapability(), numContainersPerGang); // Set maxGangsPerUnit - int maxGangsPerUnit = - (int) Math.max( - Math.floor(((double) totalGangs) / windowSizeInDurations), 1); + int maxGangsPerUnit = (int) Math + .max(Math.floor(((double) totalGangs) / windowSizeInDurations), 1); maxGangsPerUnit = Math.max(maxGangsPerUnit / smoothnessFactor, 1); // If window size is too small, return null @@ -93,6 +105,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator { return null; } + final int preferLeft = allocateLeft ? 1 : -1; + // Initialize tree sorted by costs TreeSet durationIntervalsSortedByCost = new TreeSet(new Comparator() { @@ -104,23 +118,26 @@ public class StageAllocatorLowCostAligned implements StageAllocator { return cmp; } - return (-1) * Long.compare(val1.getEndTime(), val2.getEndTime()); + return preferLeft + * Long.compare(val1.getEndTime(), val2.getEndTime()); } }); + List intervalEndTimes = + computeIntervalEndTimes(stageArrival, stageDeadline, duration); + // Add durationIntervals that end at (endTime - n*duration) for some n. - for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart - + duration; intervalEnd -= duration) { + for (long intervalEnd : intervalEndTimes) { long intervalStart = intervalEnd - duration; // Get duration interval [intervalStart,intervalEnd) DurationInterval durationInterval = getDurationInterval(intervalStart, intervalEnd, planLoads, - planModifications, capacity, resCalc, step); + planModifications, capacity, netRLERes, resCalc, step, gang); // If the interval can fit a gang, add it to the tree - if (durationInterval.canAllocate(gang, capacity, resCalc)) { + if (durationInterval.canAllocate()) { durationIntervalsSortedByCost.add(durationInterval); } } @@ -139,8 +156,7 @@ public class StageAllocatorLowCostAligned implements StageAllocator { durationIntervalsSortedByCost.first(); int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs); numGangsToAllocate = - Math.min(numGangsToAllocate, - bestDurationInterval.numCanFit(gang, capacity, resCalc)); + Math.min(numGangsToAllocate, bestDurationInterval.numCanFit()); // Add it remainingGangs -= numGangsToAllocate; @@ -148,9 +164,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator { new ReservationInterval(bestDurationInterval.getStartTime(), bestDurationInterval.getEndTime()); - Resource reservationRes = - Resources.multiply(rr.getCapability(), rr.getConcurrency() - * numGangsToAllocate); + Resource reservationRes = Resources.multiply(rr.getCapability(), + rr.getConcurrency() * numGangsToAllocate); planModifications.addInterval(reservationInt, reservationRes); allocationRequests.addInterval(reservationInt, reservationRes); @@ -162,10 +177,10 @@ public class StageAllocatorLowCostAligned implements StageAllocator { DurationInterval updatedDurationInterval = getDurationInterval(bestDurationInterval.getStartTime(), bestDurationInterval.getStartTime() + duration, planLoads, - planModifications, capacity, resCalc, step); + planModifications, capacity, netRLERes, resCalc, step, gang); // Add to tree, if possible - if (updatedDurationInterval.canAllocate(gang, capacity, resCalc)) { + if (updatedDurationInterval.canAllocate()) { durationIntervalsSortedByCost.add(updatedDurationInterval); } @@ -180,10 +195,12 @@ public class StageAllocatorLowCostAligned implements StageAllocator { return allocations; } else { - // If we are here is because we did not manage to satisfy this request. - // We remove unwanted side-effect from planModifications (needed for ANY). - for (Map.Entry tempAllocation - : allocations.entrySet()) { + // If we are here is because we did not manage to satisfy this + // request. + // We remove unwanted side-effect from planModifications (needed for + // ANY). + for (Map.Entry tempAllocation : allocations + .entrySet()) { planModifications.removeInterval(tempAllocation.getKey(), tempAllocation.getValue()); @@ -196,37 +213,144 @@ public class StageAllocatorLowCostAligned implements StageAllocator { } - protected DurationInterval getDurationInterval(long startTime, long endTime, - Map planLoads, - RLESparseResourceAllocation planModifications, Resource capacity, - ResourceCalculator resCalc, long step) { - - // Initialize the dominant loads structure - Resource dominantResources = Resource.newInstance(0, 0); - - // Calculate totalCost and maxLoad - double totalCost = 0.0; - for (long t = startTime; t < endTime; t += step) { - - // Get the load - Resource load = getLoadAtTime(t, planLoads, planModifications); - - // Increase the total cost - totalCost += calcCostOfLoad(load, capacity, resCalc); - - // Update the dominant resources - dominantResources = Resources.componentwiseMax(dominantResources, load); + private List computeIntervalEndTimes(long stageEarliestStart, + long stageDeadline, long duration) { + List intervalEndTimes = new ArrayList(); + if (!allocateLeft) { + for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart + + duration; intervalEnd -= duration) { + intervalEndTimes.add(intervalEnd); + } + } else { + for (long intervalStart = + stageEarliestStart; intervalStart <= stageDeadline + - duration; intervalStart += duration) { + intervalEndTimes.add(intervalStart + duration); + } } - // Return the corresponding durationInterval - return new DurationInterval(startTime, endTime, totalCost, - dominantResources); + return intervalEndTimes; + } + + protected static DurationInterval getDurationInterval(long startTime, + long endTime, RLESparseResourceAllocation planLoads, + RLESparseResourceAllocation planModifications, Resource capacity, + RLESparseResourceAllocation netRLERes, ResourceCalculator resCalc, + long step, Resource requestedResources) throws PlanningException { + + // Get the total cost associated with the duration interval + double totalCost = getDurationIntervalTotalCost(startTime, endTime, + planLoads, planModifications, capacity, resCalc, step); + + // Calculate how many gangs can fit, i.e., how many times can 'capacity' + // be allocated within the duration interval [startTime, endTime) + int gangsCanFit = getDurationIntervalGangsCanFit(startTime, endTime, + planModifications, capacity, netRLERes, resCalc, requestedResources); + + // Return the desired durationInterval + return new DurationInterval(startTime, endTime, totalCost, gangsCanFit); } + protected static double getDurationIntervalTotalCost(long startTime, + long endTime, RLESparseResourceAllocation planLoads, + RLESparseResourceAllocation planModifications, Resource capacity, + ResourceCalculator resCalc, long step) throws PlanningException { + + // Compute the current resource load within the interval [startTime,endTime) + // by adding planLoads (existing load) and planModifications (load that + // corresponds to the current job). + RLESparseResourceAllocation currentLoad = + RLESparseResourceAllocation.merge(resCalc, capacity, planLoads, + planModifications, RLEOperator.add, startTime, endTime); + + // Convert load from RLESparseResourceAllocation to a Map representation + NavigableMap mapCurrentLoad = currentLoad.getCumulative(); + + // Initialize auxiliary variables + double totalCost = 0.0; + Long tPrev = -1L; + Resource loadPrev = Resources.none(); + double cost = 0.0; + + // Iterate over time points. For each point 't', accumulate the total cost + // that corresponds to the interval [tPrev, t). The cost associated within + // this interval is fixed for each of the time steps, therefore the cost of + // a single step is multiplied by (t - tPrev) / step. + for (Entry e : mapCurrentLoad.entrySet()) { + Long t = e.getKey(); + Resource load = e.getValue(); + if (tPrev != -1L) { + tPrev = Math.max(tPrev, startTime); + cost = calcCostOfLoad(loadPrev, capacity, resCalc); + totalCost = totalCost + cost * (t - tPrev) / step; + } + + tPrev = t; + loadPrev = load; + } + + // Add the cost associated with the last interval (the for loop does not + // calculate it). + if (loadPrev != null) { + + // This takes care of the corner case of a single entry + tPrev = Math.max(tPrev, startTime); + cost = calcCostOfLoad(loadPrev, capacity, resCalc); + totalCost = totalCost + cost * (endTime - tPrev) / step; + } + + // Return the overall cost + return totalCost; + } + + protected static int getDurationIntervalGangsCanFit(long startTime, + long endTime, RLESparseResourceAllocation planModifications, + Resource capacity, RLESparseResourceAllocation netRLERes, + ResourceCalculator resCalc, Resource requestedResources) + throws PlanningException { + + // Initialize auxiliary variables + int gangsCanFit = Integer.MAX_VALUE; + int curGangsCanFit; + + // Calculate the total amount of available resources between startTime + // and endTime, by subtracting planModifications from netRLERes + RLESparseResourceAllocation netAvailableResources = + RLESparseResourceAllocation.merge(resCalc, capacity, netRLERes, + planModifications, RLEOperator.subtractTestNonNegative, startTime, + endTime); + + // Convert result to a map + NavigableMap mapAvailableCapacity = + netAvailableResources.getCumulative(); + + // Iterate over the map representation. + // At each point, calculate how many times does 'requestedResources' fit. + // The result is the minimum over all time points. + for (Entry e : mapAvailableCapacity.entrySet()) { + Long t = e.getKey(); + Resource curAvailable = e.getValue(); + if (t >= endTime) { + break; + } + + if (curAvailable == null) { + gangsCanFit = 0; + } else { + curGangsCanFit = (int) Math.floor(Resources.divide(resCalc, capacity, + curAvailable, requestedResources)); + if (curGangsCanFit < gangsCanFit) { + gangsCanFit = curGangsCanFit; + } + } + } + return gangsCanFit; + } + protected double calcCostOfInterval(long startTime, long endTime, - Map planLoads, + RLESparseResourceAllocation planLoads, RLESparseResourceAllocation planModifications, Resource capacity, ResourceCalculator resCalc, long step) { @@ -242,7 +366,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator { } - protected double calcCostOfTimeSlot(long t, Map planLoads, + protected double calcCostOfTimeSlot(long t, + RLESparseResourceAllocation planLoads, RLESparseResourceAllocation planModifications, Resource capacity, ResourceCalculator resCalc) { @@ -254,17 +379,17 @@ public class StageAllocatorLowCostAligned implements StageAllocator { } - protected Resource getLoadAtTime(long t, Map planLoads, + protected Resource getLoadAtTime(long t, + RLESparseResourceAllocation planLoads, RLESparseResourceAllocation planModifications) { - Resource planLoad = planLoads.get(t); - planLoad = (planLoad == null) ? Resource.newInstance(0, 0) : planLoad; + Resource planLoad = planLoads.getCapacityAtTime(t); return Resources.add(planLoad, planModifications.getCapacityAtTime(t)); } - protected double calcCostOfLoad(Resource load, Resource capacity, + protected static double calcCostOfLoad(Resource load, Resource capacity, ResourceCalculator resCalc) { return resCalc.ratio(load, capacity); @@ -289,42 +414,30 @@ public class StageAllocatorLowCostAligned implements StageAllocator { private long startTime; private long endTime; private double cost; - private Resource maxLoad; + private final int gangsCanFit; // Constructor public DurationInterval(long startTime, long endTime, double cost, - Resource maxLoad) { + int gangsCanfit) { this.startTime = startTime; this.endTime = endTime; this.cost = cost; - this.maxLoad = maxLoad; + this.gangsCanFit = gangsCanfit; } // canAllocate() - boolean function, returns whether requestedResources // can be allocated during the durationInterval without // violating capacity constraints - public boolean canAllocate(Resource requestedResources, Resource capacity, - ResourceCalculator resCalc) { - - Resource updatedMaxLoad = Resources.add(maxLoad, requestedResources); - return (resCalc.compare(capacity, updatedMaxLoad, capacity) <= 0); - + public boolean canAllocate() { + return (gangsCanFit > 0); } // numCanFit() - returns the maximal number of requestedResources can be // allocated during the durationInterval without violating // capacity constraints - public int numCanFit(Resource requestedResources, Resource capacity, - ResourceCalculator resCalc) { - - // Represents the largest resource demand that can be satisfied throughout - // the entire DurationInterval (i.e., during [startTime,endTime)) - Resource availableResources = Resources.subtract(capacity, maxLoad); - - // Maximal number of requestedResources that fit inside the interval - return (int) Math.floor(Resources.divide(resCalc, capacity, - availableResources, requestedResources)); + public int numCanFit() { + return gangsCanFit; } public long getStartTime() { @@ -343,14 +456,6 @@ public class StageAllocatorLowCostAligned implements StageAllocator { this.endTime = value; } - public Resource getMaxLoad() { - return this.maxLoad; - } - - public void setMaxLoad(Resource value) { - this.maxLoad = value; - } - public double getTotalCost() { return this.cost; } @@ -359,11 +464,17 @@ public class StageAllocatorLowCostAligned implements StageAllocator { this.cost = value; } + @Override public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(" start: " + startTime).append(" end: " + endTime) - .append(" cost: " + cost).append(" maxLoad: " + maxLoad); + .append(" cost: " + cost).append(" gangsCanFit: " + gangsCanFit); + return sb.toString(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java deleted file mode 100644 index 43d6584c10f..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; - -import java.util.ListIterator; - -import org.apache.hadoop.yarn.api.records.ReservationDefinition; -import org.apache.hadoop.yarn.api.records.ReservationRequest; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; - -/** - * Sets the earliest start time of a stage proportional to the job weight. The - * interval [jobArrival, stageDeadline) is divided as follows. First, each stage - * is guaranteed at least its requested duration. Then, the stage receives a - * fraction of the remaining time. The fraction is calculated as the ratio - * between the weight (total requested resources) of the stage and the total - * weight of all proceeding stages. - */ - -public class StageEarliestStartByDemand implements StageEarliestStart { - - private long step; - - @Override - public long setEarliestStartTime(Plan plan, - ReservationDefinition reservation, int index, ReservationRequest current, - long stageDeadline) { - - step = plan.getStep(); - - // If this is the first stage, don't bother with the computation. - if (index < 1) { - return reservation.getArrival(); - } - - // Get iterator - ListIterator li = - reservation.getReservationRequests().getReservationResources() - .listIterator(index); - ReservationRequest rr; - - // Calculate the total weight & total duration - double totalWeight = calcWeight(current); - long totalDuration = getRoundedDuration(current, plan); - - while (li.hasPrevious()) { - rr = li.previous(); - totalWeight += calcWeight(rr); - totalDuration += getRoundedDuration(rr, plan); - } - - // Compute the weight of the current stage as compared to remaining ones - double ratio = calcWeight(current) / totalWeight; - - // Estimate an early start time, such that: - // 1. Every stage is guaranteed to receive at least its duration - // 2. The remainder of the window is divided between stages - // proportionally to its workload (total memory consumption) - long window = stageDeadline - reservation.getArrival(); - long windowRemainder = window - totalDuration; - long earlyStart = - (long) (stageDeadline - getRoundedDuration(current, plan) - - (windowRemainder * ratio)); - - // Realign if necessary (since we did some arithmetic) - earlyStart = stepRoundUp(earlyStart, step); - - // Return - return earlyStart; - - } - - // Weight = total memory consumption of stage - protected double calcWeight(ReservationRequest stage) { - return (stage.getDuration() * stage.getCapability().getMemorySize()) - * (stage.getNumContainers()); - } - - protected long getRoundedDuration(ReservationRequest stage, Plan plan) { - return stepRoundUp(stage.getDuration(), step); - } - - protected static long stepRoundDown(long t, long step) { - return (t / step) * step; - } - - protected static long stepRoundUp(long t, long step) { - return ((t + step - 1) / step) * step; - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java deleted file mode 100644 index 8347816808f..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; - -import org.apache.hadoop.yarn.api.records.ReservationDefinition; -import org.apache.hadoop.yarn.api.records.ReservationRequest; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; - -/** - * Sets the earliest start time of a stage as the job arrival time. - */ -public class StageEarliestStartByJobArrival implements StageEarliestStart { - - @Override - public long setEarliestStartTime(Plan plan, - ReservationDefinition reservation, int index, ReservationRequest current, - long stageDeadline) { - - return reservation.getArrival(); - - } - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionInterval.java similarity index 63% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionInterval.java index 547616a0d55..8f7f5f758a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionInterval.java @@ -21,26 +21,27 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; /** - * Interface for setting the earliest start time of a stage in IterativePlanner. + * An auxiliary class used to compute the time interval in which the stage can + * be allocated resources by {@link IterativePlanner}. */ -public interface StageEarliestStart { - +public interface StageExecutionInterval { /** * Computes the earliest allowed starting time for a given stage. * * @param plan the Plan to which the reservation must be fitted * @param reservation the job contract - * @param index the index of the stage in the job contract * @param currentReservationStage the stage - * @param stageDeadline the deadline of the stage set by the two phase - * planning algorithm - * - * @return the earliest allowed starting time for the stage. + * @param allocateLeft is the job allocated from left to right + * @param allocations Existing resource assignments for the job + * @return the time interval in which the stage can get resources. */ - long setEarliestStartTime(Plan plan, ReservationDefinition reservation, - int index, ReservationRequest currentReservationStage, - long stageDeadline); + ReservationInterval computeExecutionInterval(Plan plan, + ReservationDefinition reservation, + ReservationRequest currentReservationStage, boolean allocateLeft, + RLESparseResourceAllocation allocations); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalByDemand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalByDemand.java new file mode 100644 index 00000000000..95f1d4bc1de --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalByDemand.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.IterativePlanner.StageProvider; + +/** + * An implementation of {@link StageExecutionInterval}, which sets the execution + * interval of the stage. For ANY and ALL jobs, the interval is + * [jobArrival,jobDeadline]. For ORDER jobs, the the maximal possible time + * interval is divided as follows: First, each stage is guaranteed at least its + * requested duration. Then, the stage receives a fraction of the remaining + * time. The fraction is calculated as the ratio between the weight (total + * requested resources) of the stage and the total weight of all remaining + * stages. + */ + +public class StageExecutionIntervalByDemand implements StageExecutionInterval { + + private long step; + + @Override + public ReservationInterval computeExecutionInterval(Plan plan, + ReservationDefinition reservation, + ReservationRequest currentReservationStage, boolean allocateLeft, + RLESparseResourceAllocation allocations) { + + // Use StageExecutionIntervalUnconstrained to get the maximal interval + ReservationInterval maxInterval = + (new StageExecutionIntervalUnconstrained()).computeExecutionInterval( + plan, reservation, currentReservationStage, allocateLeft, + allocations); + + ReservationRequestInterpreter jobType = + reservation.getReservationRequests().getInterpreter(); + + // For unconstrained jobs, such as ALL & ANY, we can use the unconstrained + // version + if ((jobType != ReservationRequestInterpreter.R_ORDER) + && (jobType != ReservationRequestInterpreter.R_ORDER_NO_GAP)) { + return maxInterval; + } + + // For ORDER and ORDER_NO_GAP, take a sub-interval of maxInterval + step = plan.getStep(); + + double totalWeight = 0.0; + long totalDuration = 0; + + // Iterate over the stages that haven't been allocated. + // For allocateLeft == True, we iterate in reverse order, starting from the + // last + // stage, until we reach the current stage. + // For allocateLeft == False, we do the opposite. + StageProvider stageProvider = new StageProvider(!allocateLeft, reservation); + + while (stageProvider.hasNext()) { + ReservationRequest rr = stageProvider.next(); + totalWeight += calcWeight(rr); + totalDuration += getRoundedDuration(rr, step); + + // Stop once we reach current + if (rr == currentReservationStage) { + break; + } + } + + // Compute the weight of the current stage as compared to remaining ones + double ratio = calcWeight(currentReservationStage) / totalWeight; + + // Estimate an early start time, such that: + // 1. Every stage is guaranteed to receive at least its duration + // 2. The remainder of the window is divided between stages + // proportionally to its workload (total memory consumption) + long maxIntervalArrival = maxInterval.getStartTime(); + long maxIntervalDeadline = maxInterval.getEndTime(); + long window = maxIntervalDeadline - maxIntervalArrival; + long windowRemainder = window - totalDuration; + + if (allocateLeft) { + long latestEnd = + (long) (maxIntervalArrival + + getRoundedDuration(currentReservationStage, step) + + (windowRemainder * ratio)); + + // Realign if necessary (since we did some arithmetic) + latestEnd = stepRoundDown(latestEnd, step); + + // Return new interval + return new ReservationInterval(maxIntervalArrival, latestEnd); + } else { + long earlyStart = + (long) (maxIntervalDeadline + - getRoundedDuration(currentReservationStage, step) + - (windowRemainder * ratio)); + + // Realign if necessary (since we did some arithmetic) + earlyStart = stepRoundUp(earlyStart, step); + + // Return new interval + return new ReservationInterval(earlyStart, maxIntervalDeadline); + } + } + + // Weight = total memory consumption of stage + protected double calcWeight(ReservationRequest stage) { + return (stage.getDuration() * stage.getCapability().getMemorySize()) + * (stage.getNumContainers()); + } + + protected long getRoundedDuration(ReservationRequest stage, Long s) { + return stepRoundUp(stage.getDuration(), s); + } + + protected static long stepRoundDown(long t, long s) { + return (t / s) * s; + } + + protected static long stepRoundUp(long t, long s) { + return ((t + s - 1) / s) * s; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalUnconstrained.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalUnconstrained.java new file mode 100644 index 00000000000..cccd9d88e28 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalUnconstrained.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; + +/** + * An implementation of {@link StageExecutionInterval} which gives each stage + * the maximal possible time interval, given the job constraints. Specifically, + * for ANY and ALL jobs, the interval would be [jobArrival, jobDeadline). For + * ORDER jobs, the stage cannot start before its predecessors (if allocateLeft + * == true) or cannot end before its successors (if allocateLeft == false) + */ +public class StageExecutionIntervalUnconstrained implements + StageExecutionInterval { + + @Override + public ReservationInterval computeExecutionInterval(Plan plan, + ReservationDefinition reservation, + ReservationRequest currentReservationStage, boolean allocateLeft, + RLESparseResourceAllocation allocations) { + + Long stageArrival = reservation.getArrival(); + Long stageDeadline = reservation.getDeadline(); + + ReservationRequestInterpreter jobType = + reservation.getReservationRequests().getInterpreter(); + + // Left to right + if (allocateLeft) { + // If ORDER job, change the stage arrival time + if ((jobType == ReservationRequestInterpreter.R_ORDER) + || (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP)) { + Long allocationEndTime = allocations.getLatestNonNullTime(); + if (allocationEndTime != -1) { + stageArrival = allocationEndTime; + } + } + // Right to left + } else { + // If ORDER job, change the stage deadline + if ((jobType == ReservationRequestInterpreter.R_ORDER) + || (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP)) { + Long allocationStartTime = allocations.getEarliestStartTime(); + if (allocationStartTime != -1) { + stageDeadline = allocationStartTime; + } + } + } + return new ReservationInterval(stageArrival, stageDeadline); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java index 5566dc7ad33..7207d7169af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java @@ -18,13 +18,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import org.apache.hadoop.yarn.api.records.ReservationDefinition; @@ -37,29 +41,32 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy; import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.StageAllocatorLowCostAligned.DurationInterval; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import org.eclipse.jetty.util.log.Log; import org.junit.Before; import org.junit.Test; -import org.mortbay.log.Log; public class TestAlignedPlanner { - ReservationAgent agent; - InMemoryPlan plan; - Resource minAlloc = Resource.newInstance(1024, 1); - ResourceCalculator res = new DefaultResourceCalculator(); - Resource maxAlloc = Resource.newInstance(1024 * 8, 8); - Random rand = new Random(); - long step; + private ReservationAgent agentRight; + private ReservationAgent agentLeft; + private InMemoryPlan plan; + private final Resource minAlloc = Resource.newInstance(1024, 1); + private final ResourceCalculator res = new DefaultResourceCalculator(); + private final Resource maxAlloc = Resource.newInstance(1024 * 8, 8); + private final Random rand = new Random(); + private Resource clusterCapacity; + private long step; @Test public void testSingleReservationAccept() throws PlanningException { @@ -82,7 +89,7 @@ public class TestAlignedPlanner { // Add reservation ReservationId reservationID = ReservationSystemTestUtil.getNewReservationId(); - agent.createReservation(reservationID, "u1", plan, rr1); + agentRight.createReservation(reservationID, "u1", plan, rr1); // CHECK: allocation was accepted assertTrue("Agent-based allocation failed", reservationID != null); @@ -107,7 +114,7 @@ public class TestAlignedPlanner { // Create reservation ReservationDefinition rr1 = createReservationDefinition( - 10L, // Job arrival time + 10 * step, // Job arrival time 15 * step, // Job deadline new ReservationRequest[] { ReservationRequest.newInstance( @@ -126,7 +133,7 @@ public class TestAlignedPlanner { try { ReservationId reservationID = ReservationSystemTestUtil.getNewReservationId(); - agent.createReservation(reservationID, "u1", plan, rr1); + agentRight.createReservation(reservationID, "u1", plan, rr1); fail(); } catch (PlanningException e) { // Expected failure @@ -166,7 +173,7 @@ public class TestAlignedPlanner { try { ReservationId reservationID = ReservationSystemTestUtil.getNewReservationId(); - agent.createReservation(reservationID, "u1", plan, rr1); + agentRight.createReservation(reservationID, "u1", plan, rr1); fail(); } catch (PlanningException e) { // Expected failure @@ -206,7 +213,7 @@ public class TestAlignedPlanner { try { ReservationId reservationID = ReservationSystemTestUtil.getNewReservationId(); - agent.createReservation(reservationID, "u1", plan, rr1); + agentRight.createReservation(reservationID, "u1", plan, rr1); fail(); } catch (PlanningException e) { // Expected failure @@ -246,7 +253,7 @@ public class TestAlignedPlanner { try { ReservationId reservationID = ReservationSystemTestUtil.getNewReservationId(); - agent.createReservation(reservationID, "u1", plan, rr1); + agentRight.createReservation(reservationID, "u1", plan, rr1); fail(); } catch (PlanningException e) { // Expected failure @@ -285,7 +292,7 @@ public class TestAlignedPlanner { // Add reservation ReservationId reservationID = ReservationSystemTestUtil.getNewReservationId(); - agent.createReservation(reservationID, "u1", plan, rr1); + agentRight.createReservation(reservationID, "u1", plan, rr1); // CHECK: allocation was accepted assertTrue("Agent-based allocation failed", reservationID != null); @@ -328,7 +335,7 @@ public class TestAlignedPlanner { // Add reservation ReservationId reservationID = ReservationSystemTestUtil.getNewReservationId(); - agent.createReservation(reservationID, "u1", plan, rr1); + agentRight.createReservation(reservationID, "u1", plan, rr1); // CHECK: allocation was accepted assertTrue("Agent-based allocation failed", reservationID != null); @@ -374,7 +381,7 @@ public class TestAlignedPlanner { try { ReservationId reservationID = ReservationSystemTestUtil.getNewReservationId(); - agent.createReservation(reservationID, "u1", plan, rr1); + agentRight.createReservation(reservationID, "u1", plan, rr1); fail(); } catch (PlanningException e) { // Expected failure @@ -420,10 +427,10 @@ public class TestAlignedPlanner { ReservationSystemTestUtil.getNewReservationId(); // Add block, add flex, remove block, update flex - agent.createReservation(blockReservationID, "uBlock", plan, rrBlock); - agent.createReservation(flexReservationID, "uFlex", plan, rrFlex); - agent.deleteReservation(blockReservationID, "uBlock", plan); - agent.updateReservation(flexReservationID, "uFlex", plan, rrFlex); + agentRight.createReservation(blockReservationID, "uBlock", plan, rrBlock); + agentRight.createReservation(flexReservationID, "uFlex", plan, rrFlex); + agentRight.deleteReservation(blockReservationID, "uBlock", plan); + agentRight.updateReservation(flexReservationID, "uFlex", plan, rrFlex); // CHECK: allocation was accepted assertTrue("Agent-based allocation failed", flexReservationID != null); @@ -458,7 +465,7 @@ public class TestAlignedPlanner { try { ReservationId reservationID = ReservationSystemTestUtil.getNewReservationId(); - agent.createReservation(reservationID, "u1", plan, rr1); + agentRight.createReservation(reservationID, "u1", plan, rr1); fail(); } catch (PlanningException e) { // Expected failure @@ -490,7 +497,7 @@ public class TestAlignedPlanner { // Add reservation ReservationId reservationID = ReservationSystemTestUtil.getNewReservationId(); - agent.createReservation(reservationID, "u1", plan, rr1); + agentRight.createReservation(reservationID, "u1", plan, rr1); // CHECK: allocation was accepted assertTrue("Agent-based allocation failed", reservationID != null); @@ -557,9 +564,9 @@ public class TestAlignedPlanner { ReservationSystemTestUtil.getNewReservationId(); // Add all - agent.createReservation(reservationID1, "u1", plan, rr7Mem1Core); - agent.createReservation(reservationID2, "u2", plan, rr6Mem6Cores); - agent.createReservation(reservationID3, "u3", plan, rr); + agentRight.createReservation(reservationID1, "u1", plan, rr7Mem1Core); + agentRight.createReservation(reservationID2, "u2", plan, rr6Mem6Cores); + agentRight.createReservation(reservationID3, "u3", plan, rr); // Get reservation ReservationAllocation alloc3 = plan.getReservationById(reservationID3); @@ -684,8 +691,8 @@ public class TestAlignedPlanner { for (ReservationDefinition rr : list) { ReservationId reservationID = ReservationSystemTestUtil.getNewReservationId(); - agent.createReservation(reservationID, "u" + Integer.toString(i), plan, - rr); + agentRight.createReservation(reservationID, "u" + Integer.toString(i), + plan, rr); ++i; } @@ -695,13 +702,335 @@ public class TestAlignedPlanner { } + @Test + public void testSingleReservationAcceptAllocateLeft() + throws PlanningException { + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 10 * step, // Job arrival time + 35 * step, // Job deadline + new ReservationRequest[] { + ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + 10 * step), // Duration + ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + 10 * step) }, // Duration + ReservationRequestInterpreter.R_ORDER, "u1"); + + // Add reservation + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agentLeft.createReservation(reservationID, "u1", plan, rr1); + + // CHECK: allocation was accepted + assertTrue("Agent-based allocation failed", reservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == 1); + + // Get reservation + ReservationAllocation alloc1 = plan.getReservationById(reservationID); + + // Verify allocation + assertTrue(alloc1.toString(), + check(alloc1, 10 * step, 30 * step, 20, 1024, 1)); + + } + + @Test + public void testLeftSucceedsRightFails() throws PlanningException { + + // Prepare basic plan + int numJobsInScenario = initializeScenario2(); + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 7 * step, // Job arrival time + 16 * step, // Job deadline + new ReservationRequest[] { + ReservationRequest.newInstance(Resource.newInstance(1024, 1), + 20, // Num containers + 20, // Concurrency + 2 * step), // Duration + ReservationRequest.newInstance(Resource.newInstance(1024, 1), + 20, // Num containers + 20, // Concurrency + 2 * step) }, // Duration + ReservationRequestInterpreter.R_ORDER, "u1"); + + ReservationDefinition rr2 = + createReservationDefinition( + 14 * step, // Job arrival time + 16 * step, // Job deadline + new ReservationRequest[] { + ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 100, // Num containers + 100, // Concurrency + 2 * step) }, // Duration + ReservationRequestInterpreter.R_ORDER, "u2"); + + // Add 1st reservation + ReservationId reservationID1 = + ReservationSystemTestUtil.getNewReservationId(); + agentLeft.createReservation(reservationID1, "u1", plan, rr1); + + // CHECK: allocation was accepted + assertTrue("Agent-based allocation failed", reservationID1 != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == numJobsInScenario + 1); + + // Get reservation + ReservationAllocation alloc1 = plan.getReservationById(reservationID1); + + // Verify allocation + assertTrue(alloc1.toString(), + check(alloc1, 7 * step, 11 * step, 20, 1024, 1)); + + // Add second reservation + ReservationId reservationID2 = + ReservationSystemTestUtil.getNewReservationId(); + agentLeft.createReservation(reservationID2, "u2", plan, rr2); + + // CHECK: allocation was accepted + assertTrue("Agent-based allocation failed", reservationID2 != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == numJobsInScenario + 2); + + // Get reservation + ReservationAllocation alloc2 = plan.getReservationById(reservationID2); + + // Verify allocation + assertTrue(alloc2.toString(), + check(alloc2, 14 * step, 16 * step, 100, 1024, 1)); + + agentLeft.deleteReservation(reservationID1, "u1", plan); + agentLeft.deleteReservation(reservationID2, "u2", plan); + + // Now try to allocate the same jobs with agentRight. The second + // job should fail + // Add 1st reservation + ReservationId reservationID3 = + ReservationSystemTestUtil.getNewReservationId(); + agentRight.createReservation(reservationID3, "u1", plan, rr1); + + // CHECK: allocation was accepted + assertTrue("Agent-based allocation failed", reservationID3 != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == numJobsInScenario + 1); + + // Add 2nd reservation + try { + ReservationId reservationID4 = + ReservationSystemTestUtil.getNewReservationId(); + agentRight.createReservation(reservationID4, "u2", plan, rr2); + fail(); + } catch (PlanningException e) { + // Expected failure + } + + } + + @Test + public void testValidateOrderNoGap() { + + // + // Initialize allocations + // + + RLESparseResourceAllocation allocation = + new RLESparseResourceAllocation(res); + allocation.addInterval(new ReservationInterval(10 * step, 13 * step), + Resource.newInstance(1024, 1)); + + // curAlloc + Map curAlloc = + new HashMap(); + + // + // Check cases + // + + // 1. allocateLeft = false, succeed when there is no gap + curAlloc.clear(); + curAlloc.put(new ReservationInterval(9 * step, 10 * step), + Resource.newInstance(1024, 1)); + assertTrue("validateOrderNoFap() should have suceeded", + IterativePlanner.validateOrderNoGap(allocation, curAlloc, false)); + + // 2. allocateLeft = false, fail when curAlloc has a gap + curAlloc.put(new ReservationInterval(7 * step, 8 * step), + Resource.newInstance(1024, 1)); + assertFalse("validateOrderNoGap() failed to identify a gap in curAlloc", + IterativePlanner.validateOrderNoGap(allocation, curAlloc, false)); + + // 3. allocateLeft = false, fail when there is a gap between curAlloc and + // allocations + curAlloc.clear(); + curAlloc.put(new ReservationInterval(8 * step, 9 * step), + Resource.newInstance(1024, 1)); + assertFalse("validateOrderNoGap() failed to identify a gap between " + + "allocations and curAlloc", + IterativePlanner.validateOrderNoGap(allocation, curAlloc, false)); + + // 4. allocateLeft = true, succeed when there is no gap + curAlloc.clear(); + curAlloc.put(new ReservationInterval(13 * step, 14 * step), + Resource.newInstance(1024, 1)); + assertTrue("validateOrderNoFap() should have suceeded", + IterativePlanner.validateOrderNoGap(allocation, curAlloc, true)); + + // 5. allocateLeft = true, fail when there is a gap between curAlloc and + // allocations + curAlloc.put(new ReservationInterval(15 * step, 16 * step), + Resource.newInstance(1024, 1)); + assertFalse("validateOrderNoGap() failed to identify a gap in curAlloc", + IterativePlanner.validateOrderNoGap(allocation, curAlloc, true)); + + // 6. allocateLeft = true, fail when curAlloc has a gap + curAlloc.clear(); + curAlloc.put(new ReservationInterval(14 * step, 15 * step), + Resource.newInstance(1024, 1)); + assertFalse("validateOrderNoGap() failed to identify a gap between " + + "allocations and curAlloc", + IterativePlanner.validateOrderNoGap(allocation, curAlloc, true)); + + } + + @Test + public void testGetDurationInterval() throws PlanningException { + + DurationInterval durationInterval = null; + + // Create netRLERes: + // - 4GB & 4VC between [10,20) and [30,40) + // - 8GB & 8VC between [20,30) + RLESparseResourceAllocation netRLERes = + new RLESparseResourceAllocation(res); + netRLERes.addInterval( + new ReservationInterval(10 * step, 40 * step), + Resource.newInstance(4096, 4) + ); + netRLERes.addInterval( + new ReservationInterval(20 * step, 30 * step), + Resource.newInstance(4096, 4) + ); + + // Create planLoads: + // - 5GB & 5VC between [20,30) + RLESparseResourceAllocation planLoads = + new RLESparseResourceAllocation(res); + planLoads.addInterval( + new ReservationInterval(20 * step, 30 * step), + Resource.newInstance(5120, 5) + ); + + // Create planModifications: + // - 1GB & 1VC between [25,35) + RLESparseResourceAllocation planModifications = + new RLESparseResourceAllocation(res); + planModifications.addInterval( + new ReservationInterval(25 * step, 35 * step), + Resource.newInstance(1024, 1) + ); + + // Set requested resources + Resource requestedResources = Resource.newInstance(1024, 1); + + + // 1. + // currLoad: should start at 20*step, end at 30*step with a null value + // (in getTotalCost(), after the for loop we will have loadPrev == null + // netAvailableResources: should start exactly at startTime (10*step), + // end exactly at endTime (30*step) with a null value + durationInterval = + StageAllocatorLowCostAligned.getDurationInterval(10*step, 30*step, + planLoads, planModifications, clusterCapacity, netRLERes, res, step, + requestedResources); + assertEquals(durationInterval.numCanFit(), 4); + assertEquals(durationInterval.getTotalCost(), 0.55, 0.00001); + + // 2. + // currLoad: should start at 20*step, end at 31*step with a null value + // (in getTotalCost, after the for loop we will have loadPrev == null) + // netAvailableResources: should start exactly at startTime (10*step), + // end exactly at endTime (31*step) with a null value + durationInterval = + StageAllocatorLowCostAligned.getDurationInterval(10*step, 31*step, + planLoads, planModifications, clusterCapacity, netRLERes, res, step, + requestedResources); + System.out.println(durationInterval); + assertEquals(durationInterval.numCanFit(), 3); + assertEquals(durationInterval.getTotalCost(), 0.56, 0.00001); + + // 3. + // currLoad: should start at 20*step, end at 30*step with a null value + // (in getTotalCost, after the for loop we will have loadPrev == null) + // netAvailableResources: should start exactly startTime (15*step), + // end exactly at endTime (30*step) with a null value + durationInterval = + StageAllocatorLowCostAligned.getDurationInterval(15*step, 30*step, + planLoads, planModifications, clusterCapacity, netRLERes, res, step, + requestedResources); + assertEquals(durationInterval.numCanFit(), 4); + assertEquals(durationInterval.getTotalCost(), 0.55, 0.00001); + + // 4. + // currLoad: should start at 20*step, end at 31*step with a null value + // (in getTotalCost, after the for loop we will have loadPrev == null) + // netAvailableResources: should start exactly at startTime (15*step), + // end exactly at endTime (31*step) with a value other than null + durationInterval = + StageAllocatorLowCostAligned.getDurationInterval(15*step, 31*step, + planLoads, planModifications, clusterCapacity, netRLERes, res, step, + requestedResources); + System.out.println(durationInterval); + assertEquals(durationInterval.numCanFit(), 3); + assertEquals(durationInterval.getTotalCost(), 0.56, 0.00001); + + // 5. + // currLoad: should only contain one entry at startTime + // (22*step), therefore loadPrev != null and we should enter the if + // condition after the for loop in getTotalCost + // netAvailableResources: should only contain one entry at startTime + // (22*step) + durationInterval = + StageAllocatorLowCostAligned.getDurationInterval(22*step, 23*step, + planLoads, planModifications, clusterCapacity, netRLERes, res, step, + requestedResources); + System.out.println(durationInterval); + assertEquals(durationInterval.numCanFit(), 8); + assertEquals(durationInterval.getTotalCost(), 0.05, 0.00001); + + // 6. + // currLoad: should start at 39*step, end at 41*step with a null value + // (in getTotalCost, after the for loop we will have loadPrev == null) + // netAvailableResources: should start exactly at startTime (39*step), + // end exactly at endTime (41*step) with a null value + durationInterval = + StageAllocatorLowCostAligned.getDurationInterval(39*step, 41*step, + planLoads, planModifications, clusterCapacity, netRLERes, res, step, + requestedResources); + System.out.println(durationInterval); + assertEquals(durationInterval.numCanFit(), 0); + assertEquals(durationInterval.getTotalCost(), 0, 0.00001); + + } + @Before public void setup() throws Exception { // Initialize random seed long seed = rand.nextLong(); rand.setSeed(seed); - Log.info("Running with seed: " + seed); + Log.getLog().info("Running with seed: " + seed); // Set cluster parameters long timeWindow = 1000000L; @@ -709,16 +1038,15 @@ public class TestAlignedPlanner { int capacityCores = 100; step = 60000L; - Resource clusterCapacity = Resource.newInstance(capacityMem, capacityCores); + clusterCapacity = Resource.newInstance(capacityMem, capacityCores); String reservationQ = ReservationSystemTestUtil.getFullReservationQueueName(); float instConstraint = 100; float avgConstraint = 100; - ReservationSchedulerConfiguration conf = - ReservationSystemTestUtil.createConf(reservationQ, timeWindow, - instConstraint, avgConstraint); + ReservationSchedulerConfiguration conf = ReservationSystemTestUtil + .createConf(reservationQ, timeWindow, instConstraint, avgConstraint); CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); policy.init(reservationQ, conf); @@ -728,14 +1056,19 @@ public class TestAlignedPlanner { conf.setInt(AlignedPlannerWithGreedy.SMOOTHNESS_FACTOR, AlignedPlannerWithGreedy.DEFAULT_SMOOTHNESS_FACTOR); + conf.setBoolean(ReservationAgent.FAVOR_EARLY_ALLOCATION, false); + // Set planning agent - agent = new AlignedPlannerWithGreedy(); - agent.init(conf); + agentRight = new AlignedPlannerWithGreedy(); + agentRight.init(conf); + + conf.setBoolean(ReservationAgent.FAVOR_EARLY_ALLOCATION, true); + agentLeft = new AlignedPlannerWithGreedy(); + agentLeft.init(conf); // Create Plan - plan = - new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step, - res, minAlloc, maxAlloc, "dedicated", null, true, context); + plan = new InMemoryPlan(queueMetrics, policy, agentRight, clusterCapacity, + step, res, minAlloc, maxAlloc, "dedicated", null, true, context); } private int initializeScenario1() throws PlanningException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java index 7988706f0ca..46bfa80e23a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -55,12 +55,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import org.eclipse.jetty.util.log.Log; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -import org.mortbay.log.Log; @RunWith(Parameterized.class) public class TestGreedyReservationAgent { @@ -89,7 +89,7 @@ public class TestGreedyReservationAgent { long seed = rand.nextLong(); rand.setSeed(seed); - Log.info("Running with seed: " + seed); + Log.getLog().info("Running with seed: " + seed); // setting completely loose quotas long timeWindow = 1000000L; @@ -108,7 +108,7 @@ public class TestGreedyReservationAgent { policy.init(reservationQ, conf); // setting conf to - conf.setBoolean(GreedyReservationAgent.GREEDY_FAVOR_EARLY_ALLOCATION, + conf.setBoolean(GreedyReservationAgent.FAVOR_EARLY_ALLOCATION, allocateLeft); agent = new GreedyReservationAgent(); agent.init(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java index e01608c9651..c4f94c281eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.