YARN-4359. Update LowCost agents logic to take advantage of YARN-4358. (Jonathan Yaniv and Ishai Menache via Subru).

(cherry picked from commit a3a615eeab)
This commit is contained in:
Subru Krishnan 2017-05-01 16:01:07 -07:00
parent 3e7f97fb0b
commit 0eae1c6368
19 changed files with 960 additions and 436 deletions

View File

@ -687,4 +687,15 @@ public class InMemoryPlan implements Plan {
readLock.unlock(); readLock.unlock();
} }
} }
@Override
public RLESparseResourceAllocation getCumulativeLoadOverTime(
long start, long end) {
readLock.lock();
try {
return rleSparseVector.getRangeOverlapping(start, end);
} finally {
readLock.unlock();
}
}
} }

View File

@ -174,4 +174,13 @@ public interface PlanView extends PlanContext {
public RLESparseResourceAllocation getConsumptionForUserOverTime(String user, public RLESparseResourceAllocation getConsumptionForUserOverTime(String user,
long start, long end); long start, long end);
/**
* Get the cumulative load over a time interval.
*
* @param start Start of the time interval.
* @param end End of the time interval.
* @return RLE sparse allocation.
*/
RLESparseResourceAllocation getCumulativeLoadOverTime(long start, long end);
} }

View File

@ -39,6 +39,8 @@ public class AlignedPlannerWithGreedy implements ReservationAgent {
public static final int DEFAULT_SMOOTHNESS_FACTOR = 10; public static final int DEFAULT_SMOOTHNESS_FACTOR = 10;
public static final String SMOOTHNESS_FACTOR = public static final String SMOOTHNESS_FACTOR =
"yarn.resourcemanager.reservation-system.smoothness-factor"; "yarn.resourcemanager.reservation-system.smoothness-factor";
private boolean allocateLeft = false;
// Log // Log
private static final Logger LOG = LoggerFactory private static final Logger LOG = LoggerFactory
@ -49,26 +51,31 @@ public class AlignedPlannerWithGreedy implements ReservationAgent {
// Constructor // Constructor
public AlignedPlannerWithGreedy() { public AlignedPlannerWithGreedy() {
} }
@Override @Override
public void init(Configuration conf) { public void init(Configuration conf) {
int smoothnessFactor = int smoothnessFactor =
conf.getInt(SMOOTHNESS_FACTOR, DEFAULT_SMOOTHNESS_FACTOR); conf.getInt(SMOOTHNESS_FACTOR, DEFAULT_SMOOTHNESS_FACTOR);
allocateLeft = conf.getBoolean(FAVOR_EARLY_ALLOCATION,
DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION);
// List of algorithms // List of algorithms
List<ReservationAgent> listAlg = new LinkedList<ReservationAgent>(); List<ReservationAgent> listAlg = new LinkedList<ReservationAgent>();
// LowCostAligned planning algorithm // LowCostAligned planning algorithm
ReservationAgent algAligned = ReservationAgent algAligned =
new IterativePlanner(new StageEarliestStartByDemand(), new IterativePlanner(new StageExecutionIntervalByDemand(),
new StageAllocatorLowCostAligned(smoothnessFactor), false); new StageAllocatorLowCostAligned(smoothnessFactor, allocateLeft),
allocateLeft);
listAlg.add(algAligned); listAlg.add(algAligned);
// Greedy planning algorithm // Greedy planning algorithm
ReservationAgent algGreedy = ReservationAgent algGreedy =
new IterativePlanner(new StageEarliestStartByJobArrival(), new IterativePlanner(new StageExecutionIntervalUnconstrained(),
new StageAllocatorGreedy(), false); new StageAllocatorGreedyRLE(allocateLeft), allocateLeft);
listAlg.add(algGreedy); listAlg.add(algGreedy);
// Set planner: // Set planner:

View File

@ -47,9 +47,6 @@ public class GreedyReservationAgent implements ReservationAgent {
// Greedy planner // Greedy planner
private ReservationAgent planner; private ReservationAgent planner;
public final static String GREEDY_FAVOR_EARLY_ALLOCATION =
"yarn.resourcemanager.reservation-system.favor-early-allocation";
public final static boolean DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION = true;
private boolean allocateLeft; private boolean allocateLeft;
public GreedyReservationAgent() { public GreedyReservationAgent() {
@ -57,20 +54,20 @@ public class GreedyReservationAgent implements ReservationAgent {
@Override @Override
public void init(Configuration conf) { public void init(Configuration conf) {
allocateLeft = conf.getBoolean(GREEDY_FAVOR_EARLY_ALLOCATION, allocateLeft = conf.getBoolean(FAVOR_EARLY_ALLOCATION,
DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION); DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION);
if (allocateLeft) { if (allocateLeft) {
LOG.info("Initializing the GreedyReservationAgent to favor \"early\"" LOG.info("Initializing the GreedyReservationAgent to favor \"early\""
+ " (left) allocations (controlled by parameter: " + " (left) allocations (controlled by parameter: "
+ GREEDY_FAVOR_EARLY_ALLOCATION + ")"); + FAVOR_EARLY_ALLOCATION + ")");
} else { } else {
LOG.info("Initializing the GreedyReservationAgent to favor \"late\"" LOG.info("Initializing the GreedyReservationAgent to favor \"late\""
+ " (right) allocations (controlled by parameter: " + " (right) allocations (controlled by parameter: "
+ GREEDY_FAVOR_EARLY_ALLOCATION + ")"); + FAVOR_EARLY_ALLOCATION + ")");
} }
planner = planner =
new IterativePlanner(new StageEarliestStartByJobArrival(), new IterativePlanner(new StageExecutionIntervalUnconstrained(),
new StageAllocatorGreedyRLE(allocateLeft), allocateLeft); new StageAllocatorGreedyRLE(allocateLeft), allocateLeft);
} }
@ -123,4 +120,4 @@ public class GreedyReservationAgent implements ReservationAgent {
} }
} }

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.Map; import java.util.Map;
@ -32,26 +31,24 @@ import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
/** /**
* A planning algorithm consisting of two main phases. The algorithm iterates * A planning algorithm consisting of two main phases. The algorithm iterates
* over the job stages in descending order. For each stage, the algorithm: 1. * over the job stages in ascending/descending order, depending on the flag
* Determines an interval [stageArrivalTime, stageDeadline) in which the stage * allocateLeft. For each stage, the algorithm: 1. Determines an interval
* is allocated. 2. Computes an allocation for the stage inside the interval. * [stageArrival, stageDeadline) in which the stage is allocated. 2. Computes an
* * allocation for the stage inside the interval. For ANY and ALL jobs, phase 1
* For ANY and ALL jobs, phase 1 sets the allocation window of each stage to be * sets the allocation window of each stage to be [jobArrival, jobDeadline]. For
* [jobArrival, jobDeadline]. For ORDER and ORDER_NO_GAP jobs, the deadline of * ORDER and ORDER_NO_GAP jobs, the deadline of each stage is set as
* each stage is set as succcessorStartTime - the starting time of its * succcessorStartTime - the starting time of its succeeding stage (or
* succeeding stage (or jobDeadline if it is the last stage). * jobDeadline if it is the last stage). The phases are set using the two
* * functions: 1. setAlgStageExecutionInterval 2.setAlgStageAllocator
* The phases are set using the two functions: 1. setAlgEarliestStartTime 2.
* setAlgComputeStageAllocation
*/ */
public class IterativePlanner extends PlanningAlgorithm { public class IterativePlanner extends PlanningAlgorithm {
@ -60,7 +57,7 @@ public class IterativePlanner extends PlanningAlgorithm {
private RLESparseResourceAllocation planModifications; private RLESparseResourceAllocation planModifications;
// Data extracted from plan // Data extracted from plan
private Map<Long, Resource> planLoads; private RLESparseResourceAllocation planLoads;
private Resource capacity; private Resource capacity;
private long step; private long step;
@ -70,16 +67,16 @@ public class IterativePlanner extends PlanningAlgorithm {
private long jobDeadline; private long jobDeadline;
// Phase algorithms // Phase algorithms
private StageEarliestStart algStageEarliestStart = null; private StageExecutionInterval algStageExecutionInterval = null;
private StageAllocator algStageAllocator = null; private StageAllocator algStageAllocator = null;
private final boolean allocateLeft; private final boolean allocateLeft;
// Constructor // Constructor
public IterativePlanner(StageEarliestStart algEarliestStartTime, public IterativePlanner(StageExecutionInterval algStageExecutionInterval,
StageAllocator algStageAllocator, boolean allocateLeft) { StageAllocator algStageAllocator, boolean allocateLeft) {
this.allocateLeft = allocateLeft; this.allocateLeft = allocateLeft;
setAlgStageEarliestStart(algEarliestStartTime); setAlgStageExecutionInterval(algStageExecutionInterval);
setAlgStageAllocator(algStageAllocator); setAlgStageAllocator(algStageAllocator);
} }
@ -101,12 +98,6 @@ public class IterativePlanner extends PlanningAlgorithm {
// Current stage // Current stage
ReservationRequest currentReservationStage; ReservationRequest currentReservationStage;
// Stage deadlines
long stageDeadline = stepRoundDown(reservation.getDeadline(), step);
long successorStartingTime = -1;
long predecessorEndTime = stepRoundDown(reservation.getArrival(), step);
long stageArrivalTime = -1;
// Iterate the stages in reverse order // Iterate the stages in reverse order
while (stageProvider.hasNext()) { while (stageProvider.hasNext()) {
@ -116,27 +107,17 @@ public class IterativePlanner extends PlanningAlgorithm {
// Validate that the ReservationRequest respects basic constraints // Validate that the ReservationRequest respects basic constraints
validateInputStage(plan, currentReservationStage); validateInputStage(plan, currentReservationStage);
// Compute an adjusted earliestStart for this resource // Set the stageArrival and stageDeadline
// (we need this to provision some space for the ORDER contracts) ReservationInterval stageInterval =
setStageExecutionInterval(plan, reservation, currentReservationStage,
allocations);
Long stageArrival = stageInterval.getStartTime();
Long stageDeadline = stageInterval.getEndTime();
if (allocateLeft) { // Compute stage allocation
stageArrivalTime = predecessorEndTime;
} else {
stageArrivalTime = reservation.getArrival();
if (jobType == ReservationRequestInterpreter.R_ORDER
|| jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
stageArrivalTime =
computeEarliestStartingTime(plan, reservation,
stageProvider.getCurrentIndex(), currentReservationStage,
stageDeadline);
}
stageArrivalTime = stepRoundUp(stageArrivalTime, step);
stageArrivalTime = Math.max(stageArrivalTime, reservation.getArrival());
}
// Compute the allocation of a single stage
Map<ReservationInterval, Resource> curAlloc = Map<ReservationInterval, Resource> curAlloc =
computeStageAllocation(plan, currentReservationStage, computeStageAllocation(plan, currentReservationStage, stageArrival,
stageArrivalTime, stageDeadline, user, reservationId); stageDeadline, user, reservationId);
// If we did not find an allocation, return NULL // If we did not find an allocation, return NULL
// (unless it's an ANY job, then we simply continue). // (unless it's an ANY job, then we simply continue).
@ -152,9 +133,13 @@ public class IterativePlanner extends PlanningAlgorithm {
} }
// Get the start & end time of the current allocation // Validate ORDER_NO_GAP
Long stageStartTime = findEarliestTime(curAlloc); if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
Long stageEndTime = findLatestTime(curAlloc); if (!validateOrderNoGap(allocations, curAlloc, allocateLeft)) {
throw new PlanningException(
"The allocation found does not respect ORDER_NO_GAP");
}
}
// If we did find an allocation for the stage, add it // If we did find an allocation for the stage, add it
for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) { for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) {
@ -165,33 +150,6 @@ public class IterativePlanner extends PlanningAlgorithm {
if (jobType == ReservationRequestInterpreter.R_ANY) { if (jobType == ReservationRequestInterpreter.R_ANY) {
break; break;
} }
// If ORDER job, set the stageDeadline of the next stage to be processed
if (jobType == ReservationRequestInterpreter.R_ORDER
|| jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
// CHECK ORDER_NO_GAP
// Verify that there is no gap, in case the job is ORDER_NO_GAP
// note that the test is different left-to-right and right-to-left
if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP
&& successorStartingTime != -1
&& ((allocateLeft && predecessorEndTime < stageStartTime) ||
(!allocateLeft && (stageEndTime < successorStartingTime))
)
|| (!isNonPreemptiveAllocation(curAlloc))) {
throw new PlanningException(
"The allocation found does not respect ORDER_NO_GAP");
}
if (allocateLeft) {
// Store the stageStartTime and set the new stageDeadline
predecessorEndTime = stageEndTime;
} else {
// Store the stageStartTime and set the new stageDeadline
successorStartingTime = stageStartTime;
stageDeadline = stageStartTime;
}
}
} }
// If the allocation is empty, return an error // If the allocation is empty, return an error
@ -200,7 +158,39 @@ public class IterativePlanner extends PlanningAlgorithm {
} }
return allocations; return allocations;
}
protected static boolean validateOrderNoGap(
RLESparseResourceAllocation allocations,
Map<ReservationInterval, Resource> curAlloc, boolean allocateLeft) {
// Left to right
if (allocateLeft) {
Long stageStartTime = findEarliestTime(curAlloc);
Long allocationEndTime = allocations.getLatestNonNullTime();
// Check that there is no gap between stages
if ((allocationEndTime != -1) && (allocationEndTime < stageStartTime)) {
return false;
}
// Right to left
} else {
Long stageEndTime = findLatestTime(curAlloc);
Long allocationStartTime = allocations.getEarliestStartTime();
// Check that there is no gap between stages
if ((allocationStartTime != -1) && (stageEndTime < allocationStartTime)) {
return false;
}
}
// Check that the stage allocation does not violate ORDER_NO_GAP
if (!isNonPreemptiveAllocation(curAlloc)) {
return false;
}
// The allocation is legal
return true;
} }
protected void initialize(Plan plan, ReservationId reservationId, protected void initialize(Plan plan, ReservationId reservationId,
@ -223,35 +213,15 @@ public class IterativePlanner extends PlanningAlgorithm {
// planLoads are not used by other StageAllocators... and don't deal // planLoads are not used by other StageAllocators... and don't deal
// well with huge reservation ranges // well with huge reservation ranges
if (this.algStageAllocator instanceof StageAllocatorLowCostAligned) { planLoads = plan.getCumulativeLoadOverTime(jobArrival, jobDeadline);
planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline); ReservationAllocation oldRes = plan.getReservationById(reservationId);
ReservationAllocation oldRes = plan.getReservationById(reservationId); if (oldRes != null) {
if (oldRes != null) { planLoads =
planModifications = RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
RLESparseResourceAllocation.merge(plan.getResourceCalculator(), plan.getTotalCapacity(), planLoads,
plan.getTotalCapacity(), planModifications, oldRes.getResourcesOverTime(), RLEOperator.subtract, jobArrival,
oldRes.getResourcesOverTime(), RLEOperator.subtract, jobDeadline);
jobArrival, jobDeadline);
}
} }
}
private Map<Long, Resource> getAllLoadsInInterval(Plan plan, long startTime,
long endTime) {
// Create map
Map<Long, Resource> loads = new HashMap<Long, Resource>();
// Calculate the load for every time slot between [start,end)
for (long t = startTime; t < endTime; t += step) {
Resource load = plan.getTotalCommittedResources(t);
loads.put(t, load);
}
// Return map
return loads;
} }
private void validateInputStage(Plan plan, ReservationRequest rr) private void validateInputStage(Plan plan, ReservationRequest rr)
@ -286,7 +256,7 @@ public class IterativePlanner extends PlanningAlgorithm {
} }
private boolean isNonPreemptiveAllocation( private static boolean isNonPreemptiveAllocation(
Map<ReservationInterval, Resource> curAlloc) { Map<ReservationInterval, Resource> curAlloc) {
// Checks whether a stage allocation is non preemptive or not. // Checks whether a stage allocation is non preemptive or not.
@ -329,14 +299,13 @@ public class IterativePlanner extends PlanningAlgorithm {
} }
// Call algEarliestStartTime() // Call setStageExecutionInterval()
protected long computeEarliestStartingTime(Plan plan, protected ReservationInterval setStageExecutionInterval(Plan plan,
ReservationDefinition reservation, int index, ReservationDefinition reservation,
ReservationRequest currentReservationStage, long stageDeadline) { ReservationRequest currentReservationStage,
RLESparseResourceAllocation allocations) {
return algStageEarliestStart.setEarliestStartTime(plan, reservation, index, return algStageExecutionInterval.computeExecutionInterval(plan,
currentReservationStage, stageDeadline); reservation, currentReservationStage, allocateLeft, allocations);
} }
// Call algStageAllocator // Call algStageAllocator
@ -350,10 +319,11 @@ public class IterativePlanner extends PlanningAlgorithm {
} }
// Set the algorithm: algStageEarliestStart // Set the algorithm: algStageExecutionInterval
public IterativePlanner setAlgStageEarliestStart(StageEarliestStart alg) { public IterativePlanner setAlgStageExecutionInterval(
StageExecutionInterval alg) {
this.algStageEarliestStart = alg; this.algStageExecutionInterval = alg;
return this; // To allow concatenation of setAlg() functions return this; // To allow concatenation of setAlg() functions
} }
@ -375,7 +345,7 @@ public class IterativePlanner extends PlanningAlgorithm {
private final boolean allocateLeft; private final boolean allocateLeft;
private ListIterator<ReservationRequest> li; private final ListIterator<ReservationRequest> li;
public StageProvider(boolean allocateLeft, public StageProvider(boolean allocateLeft,
ReservationDefinition reservation) { ReservationDefinition reservation) {

View File

@ -28,15 +28,26 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan
*/ */
public interface ReservationAgent { public interface ReservationAgent {
/**
* Constant defining the preferential treatment of time for equally valid
* allocations.
*/
final static String FAVOR_EARLY_ALLOCATION =
"yarn.resourcemanager.reservation-system.favor-early-allocation";
/**
* By default favor early allocations.
*/
final static boolean DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION = true;
/** /**
* Create a reservation for the user that abides by the specified contract * Create a reservation for the user that abides by the specified contract
* *
* @param reservationId the identifier of the reservation to be created. * @param reservationId the identifier of the reservation to be created.
* @param user the user who wants to create the reservation * @param user the user who wants to create the reservation
* @param plan the Plan to which the reservation must be fitted * @param plan the Plan to which the reservation must be fitted
* @param contract encapsulates the resources the user requires for his * @param contract encapsulates the resources the user requires for his
* session * session
* *
* @return whether the create operation was successful or not * @return whether the create operation was successful or not
* @throws PlanningException if the session cannot be fitted into the plan * @throws PlanningException if the session cannot be fitted into the plan
*/ */
@ -45,13 +56,13 @@ public interface ReservationAgent {
/** /**
* Update a reservation for the user that abides by the specified contract * Update a reservation for the user that abides by the specified contract
* *
* @param reservationId the identifier of the reservation to be updated * @param reservationId the identifier of the reservation to be updated
* @param user the user who wants to create the session * @param user the user who wants to create the session
* @param plan the Plan to which the reservation must be fitted * @param plan the Plan to which the reservation must be fitted
* @param contract encapsulates the resources the user requires for his * @param contract encapsulates the resources the user requires for his
* reservation * reservation
* *
* @return whether the update operation was successful or not * @return whether the update operation was successful or not
* @throws PlanningException if the reservation cannot be fitted into the plan * @throws PlanningException if the reservation cannot be fitted into the plan
*/ */
@ -60,11 +71,11 @@ public interface ReservationAgent {
/** /**
* Delete an user reservation * Delete an user reservation
* *
* @param reservationId the identifier of the reservation to be deleted * @param reservationId the identifier of the reservation to be deleted
* @param user the user who wants to create the reservation * @param user the user who wants to create the reservation
* @param plan the Plan to which the session must be fitted * @param plan the Plan to which the session must be fitted
* *
* @return whether the delete operation was successful or not * @return whether the delete operation was successful or not
* @throws PlanningException if the reservation cannot be fitted into the plan * @throws PlanningException if the reservation cannot be fitted into the plan
*/ */

View File

@ -42,7 +42,7 @@ import com.google.common.annotations.VisibleForTesting;
* This (re)planner scan a period of time from now to a maximum time window (or * This (re)planner scan a period of time from now to a maximum time window (or
* the end of the last session, whichever comes first) checking the overall * the end of the last session, whichever comes first) checking the overall
* capacity is not violated. * capacity is not violated.
* *
* It greedily removes sessions in reversed order of acceptance (latest accepted * It greedily removes sessions in reversed order of acceptance (latest accepted
* is the first removed). * is the first removed).
*/ */
@ -90,8 +90,8 @@ public class SimpleCapacityReplanner implements Planner {
// loop on all moment in time from now to the end of the check Zone // loop on all moment in time from now to the end of the check Zone
// or the end of the planned sessions whichever comes first // or the end of the planned sessions whichever comes first
for (long t = now; for (long t = now;
(t < plan.getLastEndTime() && t < (now + lengthOfCheckZone)); (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone));
t += plan.getStep()) { t += plan.getStep()) {
Resource excessCap = Resource excessCap =
Resources.subtract(plan.getTotalCommittedResources(t), totCap); Resources.subtract(plan.getTotalCommittedResources(t), totCap);
@ -102,7 +102,7 @@ public class SimpleCapacityReplanner implements Planner {
new TreeSet<ReservationAllocation>(plan.getReservationsAtTime(t)); new TreeSet<ReservationAllocation>(plan.getReservationsAtTime(t));
for (Iterator<ReservationAllocation> resIter = for (Iterator<ReservationAllocation> resIter =
curReservations.iterator(); resIter.hasNext() curReservations.iterator(); resIter.hasNext()
&& Resources.greaterThan(resCalc, totCap, excessCap, && Resources.greaterThan(resCalc, totCap, excessCap,
ZERO_RESOURCE);) { ZERO_RESOURCE);) {
ReservationAllocation reservation = resIter.next(); ReservationAllocation reservation = resIter.next();
plan.deleteReservation(reservation.getReservationId()); plan.deleteReservation(reservation.getReservationId());

View File

@ -41,19 +41,21 @@ public interface StageAllocator {
* @param planModifications the allocations performed by the planning * @param planModifications the allocations performed by the planning
* algorithm which are not yet reflected by plan * algorithm which are not yet reflected by plan
* @param rr the stage * @param rr the stage
* @param stageEarliestStart the arrival time (earliest starting time) set for * @param stageArrival the arrival time (earliest starting time) set for
* the stage by the two phase planning algorithm * the stage by the two phase planning algorithm
* @param stageDeadline the deadline of the stage set by the two phase * @param stageDeadline the deadline of the stage set by the two phase
* planning algorithm * planning algorithm
* @param user name of the user
* @param oldId identifier of the old reservation
* *
* @return The computed allocation (or null if the stage could not be * @return The computed allocation (or null if the stage could not be
* allocated) * allocated)
* @throws PlanningException * @throws PlanningException
*/ */
Map<ReservationInterval, Resource> computeStageAllocation(Plan plan, Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
Map<Long, Resource> planLoads, RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr, RLESparseResourceAllocation planModifications, ReservationRequest rr,
long stageEarliestStart, long stageDeadline, String user, long stageArrival, long stageDeadline, String user,
ReservationId oldId) throws PlanningException; ReservationId oldId) throws PlanningException;
} }

View File

@ -26,8 +26,8 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -41,7 +41,7 @@ public class StageAllocatorGreedy implements StageAllocator {
@Override @Override
public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan, public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
Map<Long, Resource> planLoads, RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr, RLESparseResourceAllocation planModifications, ReservationRequest rr,
long stageEarliestStart, long stageDeadline, String user, long stageEarliestStart, long stageDeadline, String user,
ReservationId oldId) throws PlanningException { ReservationId oldId) throws PlanningException {

View File

@ -29,8 +29,8 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -52,7 +52,7 @@ public class StageAllocatorGreedyRLE implements StageAllocator {
@Override @Override
public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan, public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
Map<Long, Resource> planLoads, RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr, RLESparseResourceAllocation planModifications, ReservationRequest rr,
long stageEarliestStart, long stageDeadline, String user, long stageEarliestStart, long stageDeadline, String user,
ReservationId oldId) throws PlanningException { ReservationId oldId) throws PlanningException {

View File

@ -18,8 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.TreeSet; import java.util.TreeSet;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
@ -27,46 +31,55 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
/** /**
* A stage allocator that iteratively allocates containers in the * A stage allocator that iteratively allocates containers in the
* {@link DurationInterval} with lowest overall cost. The algorithm only * {@link DurationInterval} with lowest overall cost. The algorithm only
* considers intervals of the form: [stageDeadline - (n+1)*duration, * considers non-overlapping intervals of length 'duration'. This guarantees
* stageDeadline - n*duration) for an integer n. This guarantees that the * that the allocations are aligned. If 'allocateLeft == true', the intervals
* allocations are aligned (as opposed to overlapping duration intervals). * considered by the algorithm are aligned to stageArrival; otherwise, they are
* * aligned to stageDeadline. The smoothnessFactor parameter controls the number
* The smoothnessFactor parameter controls the number of containers that are * of containers that are simultaneously allocated in each iteration of the
* simultaneously allocated in each iteration of the algorithm. * algorithm.
*/ */
public class StageAllocatorLowCostAligned implements StageAllocator { public class StageAllocatorLowCostAligned implements StageAllocator {
private final boolean allocateLeft;
// Smoothness factor // Smoothness factor
private int smoothnessFactor = 10; private int smoothnessFactor = 10;
// Constructor // Constructor
public StageAllocatorLowCostAligned() { public StageAllocatorLowCostAligned(boolean allocateLeft) {
this.allocateLeft = allocateLeft;
} }
// Constructor // Constructor
public StageAllocatorLowCostAligned(int smoothnessFactor) { public StageAllocatorLowCostAligned(int smoothnessFactor,
boolean allocateLeft) {
this.allocateLeft = allocateLeft;
this.smoothnessFactor = smoothnessFactor; this.smoothnessFactor = smoothnessFactor;
} }
// computeJobAllocation()
@Override @Override
public Map<ReservationInterval, Resource> computeStageAllocation( public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
Plan plan, Map<Long, Resource> planLoads, RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr, RLESparseResourceAllocation planModifications, ReservationRequest rr,
long stageEarliestStart, long stageDeadline, String user, long stageArrival, long stageDeadline, String user, ReservationId oldId)
ReservationId oldId) { throws PlanningException {
// Initialize // Initialize
ResourceCalculator resCalc = plan.getResourceCalculator(); ResourceCalculator resCalc = plan.getResourceCalculator();
Resource capacity = plan.getTotalCapacity(); Resource capacity = plan.getTotalCapacity();
RLESparseResourceAllocation netRLERes = plan
.getAvailableResourceOverTime(user, oldId, stageArrival, stageDeadline);
long step = plan.getStep(); long step = plan.getStep();
// Create allocationRequestsearlies // Create allocationRequestsearlies
@ -76,16 +89,15 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
// Initialize parameters // Initialize parameters
long duration = stepRoundUp(rr.getDuration(), step); long duration = stepRoundUp(rr.getDuration(), step);
int windowSizeInDurations = int windowSizeInDurations =
(int) ((stageDeadline - stageEarliestStart) / duration); (int) ((stageDeadline - stageArrival) / duration);
int totalGangs = rr.getNumContainers() / rr.getConcurrency(); int totalGangs = rr.getNumContainers() / rr.getConcurrency();
int numContainersPerGang = rr.getConcurrency(); int numContainersPerGang = rr.getConcurrency();
Resource gang = Resource gang =
Resources.multiply(rr.getCapability(), numContainersPerGang); Resources.multiply(rr.getCapability(), numContainersPerGang);
// Set maxGangsPerUnit // Set maxGangsPerUnit
int maxGangsPerUnit = int maxGangsPerUnit = (int) Math
(int) Math.max( .max(Math.floor(((double) totalGangs) / windowSizeInDurations), 1);
Math.floor(((double) totalGangs) / windowSizeInDurations), 1);
maxGangsPerUnit = Math.max(maxGangsPerUnit / smoothnessFactor, 1); maxGangsPerUnit = Math.max(maxGangsPerUnit / smoothnessFactor, 1);
// If window size is too small, return null // If window size is too small, return null
@ -93,6 +105,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
return null; return null;
} }
final int preferLeft = allocateLeft ? 1 : -1;
// Initialize tree sorted by costs // Initialize tree sorted by costs
TreeSet<DurationInterval> durationIntervalsSortedByCost = TreeSet<DurationInterval> durationIntervalsSortedByCost =
new TreeSet<DurationInterval>(new Comparator<DurationInterval>() { new TreeSet<DurationInterval>(new Comparator<DurationInterval>() {
@ -104,23 +118,26 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
return cmp; return cmp;
} }
return (-1) * Long.compare(val1.getEndTime(), val2.getEndTime()); return preferLeft
* Long.compare(val1.getEndTime(), val2.getEndTime());
} }
}); });
List<Long> intervalEndTimes =
computeIntervalEndTimes(stageArrival, stageDeadline, duration);
// Add durationIntervals that end at (endTime - n*duration) for some n. // Add durationIntervals that end at (endTime - n*duration) for some n.
for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart for (long intervalEnd : intervalEndTimes) {
+ duration; intervalEnd -= duration) {
long intervalStart = intervalEnd - duration; long intervalStart = intervalEnd - duration;
// Get duration interval [intervalStart,intervalEnd) // Get duration interval [intervalStart,intervalEnd)
DurationInterval durationInterval = DurationInterval durationInterval =
getDurationInterval(intervalStart, intervalEnd, planLoads, getDurationInterval(intervalStart, intervalEnd, planLoads,
planModifications, capacity, resCalc, step); planModifications, capacity, netRLERes, resCalc, step, gang);
// If the interval can fit a gang, add it to the tree // If the interval can fit a gang, add it to the tree
if (durationInterval.canAllocate(gang, capacity, resCalc)) { if (durationInterval.canAllocate()) {
durationIntervalsSortedByCost.add(durationInterval); durationIntervalsSortedByCost.add(durationInterval);
} }
} }
@ -139,8 +156,7 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
durationIntervalsSortedByCost.first(); durationIntervalsSortedByCost.first();
int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs); int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs);
numGangsToAllocate = numGangsToAllocate =
Math.min(numGangsToAllocate, Math.min(numGangsToAllocate, bestDurationInterval.numCanFit());
bestDurationInterval.numCanFit(gang, capacity, resCalc));
// Add it // Add it
remainingGangs -= numGangsToAllocate; remainingGangs -= numGangsToAllocate;
@ -148,9 +164,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
new ReservationInterval(bestDurationInterval.getStartTime(), new ReservationInterval(bestDurationInterval.getStartTime(),
bestDurationInterval.getEndTime()); bestDurationInterval.getEndTime());
Resource reservationRes = Resource reservationRes = Resources.multiply(rr.getCapability(),
Resources.multiply(rr.getCapability(), rr.getConcurrency() rr.getConcurrency() * numGangsToAllocate);
* numGangsToAllocate);
planModifications.addInterval(reservationInt, reservationRes); planModifications.addInterval(reservationInt, reservationRes);
allocationRequests.addInterval(reservationInt, reservationRes); allocationRequests.addInterval(reservationInt, reservationRes);
@ -162,10 +177,10 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
DurationInterval updatedDurationInterval = DurationInterval updatedDurationInterval =
getDurationInterval(bestDurationInterval.getStartTime(), getDurationInterval(bestDurationInterval.getStartTime(),
bestDurationInterval.getStartTime() + duration, planLoads, bestDurationInterval.getStartTime() + duration, planLoads,
planModifications, capacity, resCalc, step); planModifications, capacity, netRLERes, resCalc, step, gang);
// Add to tree, if possible // Add to tree, if possible
if (updatedDurationInterval.canAllocate(gang, capacity, resCalc)) { if (updatedDurationInterval.canAllocate()) {
durationIntervalsSortedByCost.add(updatedDurationInterval); durationIntervalsSortedByCost.add(updatedDurationInterval);
} }
@ -180,10 +195,12 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
return allocations; return allocations;
} else { } else {
// If we are here is because we did not manage to satisfy this request. // If we are here is because we did not manage to satisfy this
// We remove unwanted side-effect from planModifications (needed for ANY). // request.
for (Map.Entry<ReservationInterval, Resource> tempAllocation // We remove unwanted side-effect from planModifications (needed for
: allocations.entrySet()) { // ANY).
for (Map.Entry<ReservationInterval, Resource> tempAllocation : allocations
.entrySet()) {
planModifications.removeInterval(tempAllocation.getKey(), planModifications.removeInterval(tempAllocation.getKey(),
tempAllocation.getValue()); tempAllocation.getValue());
@ -196,37 +213,144 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
} }
protected DurationInterval getDurationInterval(long startTime, long endTime, private List<Long> computeIntervalEndTimes(long stageEarliestStart,
Map<Long, Resource> planLoads, long stageDeadline, long duration) {
RLESparseResourceAllocation planModifications, Resource capacity,
ResourceCalculator resCalc, long step) {
// Initialize the dominant loads structure
Resource dominantResources = Resource.newInstance(0, 0);
// Calculate totalCost and maxLoad
double totalCost = 0.0;
for (long t = startTime; t < endTime; t += step) {
// Get the load
Resource load = getLoadAtTime(t, planLoads, planModifications);
// Increase the total cost
totalCost += calcCostOfLoad(load, capacity, resCalc);
// Update the dominant resources
dominantResources = Resources.componentwiseMax(dominantResources, load);
List<Long> intervalEndTimes = new ArrayList<Long>();
if (!allocateLeft) {
for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart
+ duration; intervalEnd -= duration) {
intervalEndTimes.add(intervalEnd);
}
} else {
for (long intervalStart =
stageEarliestStart; intervalStart <= stageDeadline
- duration; intervalStart += duration) {
intervalEndTimes.add(intervalStart + duration);
}
} }
// Return the corresponding durationInterval return intervalEndTimes;
return new DurationInterval(startTime, endTime, totalCost, }
dominantResources);
protected static DurationInterval getDurationInterval(long startTime,
long endTime, RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, Resource capacity,
RLESparseResourceAllocation netRLERes, ResourceCalculator resCalc,
long step, Resource requestedResources) throws PlanningException {
// Get the total cost associated with the duration interval
double totalCost = getDurationIntervalTotalCost(startTime, endTime,
planLoads, planModifications, capacity, resCalc, step);
// Calculate how many gangs can fit, i.e., how many times can 'capacity'
// be allocated within the duration interval [startTime, endTime)
int gangsCanFit = getDurationIntervalGangsCanFit(startTime, endTime,
planModifications, capacity, netRLERes, resCalc, requestedResources);
// Return the desired durationInterval
return new DurationInterval(startTime, endTime, totalCost, gangsCanFit);
} }
protected static double getDurationIntervalTotalCost(long startTime,
long endTime, RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, Resource capacity,
ResourceCalculator resCalc, long step) throws PlanningException {
// Compute the current resource load within the interval [startTime,endTime)
// by adding planLoads (existing load) and planModifications (load that
// corresponds to the current job).
RLESparseResourceAllocation currentLoad =
RLESparseResourceAllocation.merge(resCalc, capacity, planLoads,
planModifications, RLEOperator.add, startTime, endTime);
// Convert load from RLESparseResourceAllocation to a Map representation
NavigableMap<Long, Resource> mapCurrentLoad = currentLoad.getCumulative();
// Initialize auxiliary variables
double totalCost = 0.0;
Long tPrev = -1L;
Resource loadPrev = Resources.none();
double cost = 0.0;
// Iterate over time points. For each point 't', accumulate the total cost
// that corresponds to the interval [tPrev, t). The cost associated within
// this interval is fixed for each of the time steps, therefore the cost of
// a single step is multiplied by (t - tPrev) / step.
for (Entry<Long, Resource> e : mapCurrentLoad.entrySet()) {
Long t = e.getKey();
Resource load = e.getValue();
if (tPrev != -1L) {
tPrev = Math.max(tPrev, startTime);
cost = calcCostOfLoad(loadPrev, capacity, resCalc);
totalCost = totalCost + cost * (t - tPrev) / step;
}
tPrev = t;
loadPrev = load;
}
// Add the cost associated with the last interval (the for loop does not
// calculate it).
if (loadPrev != null) {
// This takes care of the corner case of a single entry
tPrev = Math.max(tPrev, startTime);
cost = calcCostOfLoad(loadPrev, capacity, resCalc);
totalCost = totalCost + cost * (endTime - tPrev) / step;
}
// Return the overall cost
return totalCost;
}
protected static int getDurationIntervalGangsCanFit(long startTime,
long endTime, RLESparseResourceAllocation planModifications,
Resource capacity, RLESparseResourceAllocation netRLERes,
ResourceCalculator resCalc, Resource requestedResources)
throws PlanningException {
// Initialize auxiliary variables
int gangsCanFit = Integer.MAX_VALUE;
int curGangsCanFit;
// Calculate the total amount of available resources between startTime
// and endTime, by subtracting planModifications from netRLERes
RLESparseResourceAllocation netAvailableResources =
RLESparseResourceAllocation.merge(resCalc, capacity, netRLERes,
planModifications, RLEOperator.subtractTestNonNegative, startTime,
endTime);
// Convert result to a map
NavigableMap<Long, Resource> mapAvailableCapacity =
netAvailableResources.getCumulative();
// Iterate over the map representation.
// At each point, calculate how many times does 'requestedResources' fit.
// The result is the minimum over all time points.
for (Entry<Long, Resource> e : mapAvailableCapacity.entrySet()) {
Long t = e.getKey();
Resource curAvailable = e.getValue();
if (t >= endTime) {
break;
}
if (curAvailable == null) {
gangsCanFit = 0;
} else {
curGangsCanFit = (int) Math.floor(Resources.divide(resCalc, capacity,
curAvailable, requestedResources));
if (curGangsCanFit < gangsCanFit) {
gangsCanFit = curGangsCanFit;
}
}
}
return gangsCanFit;
}
protected double calcCostOfInterval(long startTime, long endTime, protected double calcCostOfInterval(long startTime, long endTime,
Map<Long, Resource> planLoads, RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, Resource capacity, RLESparseResourceAllocation planModifications, Resource capacity,
ResourceCalculator resCalc, long step) { ResourceCalculator resCalc, long step) {
@ -242,7 +366,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
} }
protected double calcCostOfTimeSlot(long t, Map<Long, Resource> planLoads, protected double calcCostOfTimeSlot(long t,
RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, Resource capacity, RLESparseResourceAllocation planModifications, Resource capacity,
ResourceCalculator resCalc) { ResourceCalculator resCalc) {
@ -254,17 +379,17 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
} }
protected Resource getLoadAtTime(long t, Map<Long, Resource> planLoads, protected Resource getLoadAtTime(long t,
RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications) { RLESparseResourceAllocation planModifications) {
Resource planLoad = planLoads.get(t); Resource planLoad = planLoads.getCapacityAtTime(t);
planLoad = (planLoad == null) ? Resource.newInstance(0, 0) : planLoad;
return Resources.add(planLoad, planModifications.getCapacityAtTime(t)); return Resources.add(planLoad, planModifications.getCapacityAtTime(t));
} }
protected double calcCostOfLoad(Resource load, Resource capacity, protected static double calcCostOfLoad(Resource load, Resource capacity,
ResourceCalculator resCalc) { ResourceCalculator resCalc) {
return resCalc.ratio(load, capacity); return resCalc.ratio(load, capacity);
@ -289,42 +414,30 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
private long startTime; private long startTime;
private long endTime; private long endTime;
private double cost; private double cost;
private Resource maxLoad; private final int gangsCanFit;
// Constructor // Constructor
public DurationInterval(long startTime, long endTime, double cost, public DurationInterval(long startTime, long endTime, double cost,
Resource maxLoad) { int gangsCanfit) {
this.startTime = startTime; this.startTime = startTime;
this.endTime = endTime; this.endTime = endTime;
this.cost = cost; this.cost = cost;
this.maxLoad = maxLoad; this.gangsCanFit = gangsCanfit;
} }
// canAllocate() - boolean function, returns whether requestedResources // canAllocate() - boolean function, returns whether requestedResources
// can be allocated during the durationInterval without // can be allocated during the durationInterval without
// violating capacity constraints // violating capacity constraints
public boolean canAllocate(Resource requestedResources, Resource capacity, public boolean canAllocate() {
ResourceCalculator resCalc) { return (gangsCanFit > 0);
Resource updatedMaxLoad = Resources.add(maxLoad, requestedResources);
return (resCalc.compare(capacity, updatedMaxLoad, capacity) <= 0);
} }
// numCanFit() - returns the maximal number of requestedResources can be // numCanFit() - returns the maximal number of requestedResources can be
// allocated during the durationInterval without violating // allocated during the durationInterval without violating
// capacity constraints // capacity constraints
public int numCanFit(Resource requestedResources, Resource capacity,
ResourceCalculator resCalc) {
// Represents the largest resource demand that can be satisfied throughout
// the entire DurationInterval (i.e., during [startTime,endTime))
Resource availableResources = Resources.subtract(capacity, maxLoad);
// Maximal number of requestedResources that fit inside the interval
return (int) Math.floor(Resources.divide(resCalc, capacity,
availableResources, requestedResources));
public int numCanFit() {
return gangsCanFit;
} }
public long getStartTime() { public long getStartTime() {
@ -343,14 +456,6 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
this.endTime = value; this.endTime = value;
} }
public Resource getMaxLoad() {
return this.maxLoad;
}
public void setMaxLoad(Resource value) {
this.maxLoad = value;
}
public double getTotalCost() { public double getTotalCost() {
return this.cost; return this.cost;
} }
@ -359,11 +464,17 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
this.cost = value; this.cost = value;
} }
@Override
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append(" start: " + startTime).append(" end: " + endTime) sb.append(" start: " + startTime).append(" end: " + endTime)
.append(" cost: " + cost).append(" maxLoad: " + maxLoad); .append(" cost: " + cost).append(" gangsCanFit: " + gangsCanFit);
return sb.toString(); return sb.toString();
} }
} }
} }

View File

@ -1,106 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.ListIterator;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
/**
* Sets the earliest start time of a stage proportional to the job weight. The
* interval [jobArrival, stageDeadline) is divided as follows. First, each stage
* is guaranteed at least its requested duration. Then, the stage receives a
* fraction of the remaining time. The fraction is calculated as the ratio
* between the weight (total requested resources) of the stage and the total
* weight of all proceeding stages.
*/
public class StageEarliestStartByDemand implements StageEarliestStart {
private long step;
@Override
public long setEarliestStartTime(Plan plan,
ReservationDefinition reservation, int index, ReservationRequest current,
long stageDeadline) {
step = plan.getStep();
// If this is the first stage, don't bother with the computation.
if (index < 1) {
return reservation.getArrival();
}
// Get iterator
ListIterator<ReservationRequest> li =
reservation.getReservationRequests().getReservationResources()
.listIterator(index);
ReservationRequest rr;
// Calculate the total weight & total duration
double totalWeight = calcWeight(current);
long totalDuration = getRoundedDuration(current, plan);
while (li.hasPrevious()) {
rr = li.previous();
totalWeight += calcWeight(rr);
totalDuration += getRoundedDuration(rr, plan);
}
// Compute the weight of the current stage as compared to remaining ones
double ratio = calcWeight(current) / totalWeight;
// Estimate an early start time, such that:
// 1. Every stage is guaranteed to receive at least its duration
// 2. The remainder of the window is divided between stages
// proportionally to its workload (total memory consumption)
long window = stageDeadline - reservation.getArrival();
long windowRemainder = window - totalDuration;
long earlyStart =
(long) (stageDeadline - getRoundedDuration(current, plan)
- (windowRemainder * ratio));
// Realign if necessary (since we did some arithmetic)
earlyStart = stepRoundUp(earlyStart, step);
// Return
return earlyStart;
}
// Weight = total memory consumption of stage
protected double calcWeight(ReservationRequest stage) {
return (stage.getDuration() * stage.getCapability().getMemorySize())
* (stage.getNumContainers());
}
protected long getRoundedDuration(ReservationRequest stage, Plan plan) {
return stepRoundUp(stage.getDuration(), step);
}
protected static long stepRoundDown(long t, long step) {
return (t / step) * step;
}
protected static long stepRoundUp(long t, long step) {
return ((t + step - 1) / step) * step;
}
}

View File

@ -1,39 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
/**
* Sets the earliest start time of a stage as the job arrival time.
*/
public class StageEarliestStartByJobArrival implements StageEarliestStart {
@Override
public long setEarliestStartTime(Plan plan,
ReservationDefinition reservation, int index, ReservationRequest current,
long stageDeadline) {
return reservation.getArrival();
}
}

View File

@ -21,26 +21,27 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
/** /**
* Interface for setting the earliest start time of a stage in IterativePlanner. * An auxiliary class used to compute the time interval in which the stage can
* be allocated resources by {@link IterativePlanner}.
*/ */
public interface StageEarliestStart { public interface StageExecutionInterval {
/** /**
* Computes the earliest allowed starting time for a given stage. * Computes the earliest allowed starting time for a given stage.
* *
* @param plan the Plan to which the reservation must be fitted * @param plan the Plan to which the reservation must be fitted
* @param reservation the job contract * @param reservation the job contract
* @param index the index of the stage in the job contract
* @param currentReservationStage the stage * @param currentReservationStage the stage
* @param stageDeadline the deadline of the stage set by the two phase * @param allocateLeft is the job allocated from left to right
* planning algorithm * @param allocations Existing resource assignments for the job
* * @return the time interval in which the stage can get resources.
* @return the earliest allowed starting time for the stage.
*/ */
long setEarliestStartTime(Plan plan, ReservationDefinition reservation, ReservationInterval computeExecutionInterval(Plan plan,
int index, ReservationRequest currentReservationStage, ReservationDefinition reservation,
long stageDeadline); ReservationRequest currentReservationStage, boolean allocateLeft,
RLESparseResourceAllocation allocations);
} }

View File

@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.IterativePlanner.StageProvider;
/**
* An implementation of {@link StageExecutionInterval}, which sets the execution
* interval of the stage. For ANY and ALL jobs, the interval is
* [jobArrival,jobDeadline]. For ORDER jobs, the the maximal possible time
* interval is divided as follows: First, each stage is guaranteed at least its
* requested duration. Then, the stage receives a fraction of the remaining
* time. The fraction is calculated as the ratio between the weight (total
* requested resources) of the stage and the total weight of all remaining
* stages.
*/
public class StageExecutionIntervalByDemand implements StageExecutionInterval {
private long step;
@Override
public ReservationInterval computeExecutionInterval(Plan plan,
ReservationDefinition reservation,
ReservationRequest currentReservationStage, boolean allocateLeft,
RLESparseResourceAllocation allocations) {
// Use StageExecutionIntervalUnconstrained to get the maximal interval
ReservationInterval maxInterval =
(new StageExecutionIntervalUnconstrained()).computeExecutionInterval(
plan, reservation, currentReservationStage, allocateLeft,
allocations);
ReservationRequestInterpreter jobType =
reservation.getReservationRequests().getInterpreter();
// For unconstrained jobs, such as ALL & ANY, we can use the unconstrained
// version
if ((jobType != ReservationRequestInterpreter.R_ORDER)
&& (jobType != ReservationRequestInterpreter.R_ORDER_NO_GAP)) {
return maxInterval;
}
// For ORDER and ORDER_NO_GAP, take a sub-interval of maxInterval
step = plan.getStep();
double totalWeight = 0.0;
long totalDuration = 0;
// Iterate over the stages that haven't been allocated.
// For allocateLeft == True, we iterate in reverse order, starting from the
// last
// stage, until we reach the current stage.
// For allocateLeft == False, we do the opposite.
StageProvider stageProvider = new StageProvider(!allocateLeft, reservation);
while (stageProvider.hasNext()) {
ReservationRequest rr = stageProvider.next();
totalWeight += calcWeight(rr);
totalDuration += getRoundedDuration(rr, step);
// Stop once we reach current
if (rr == currentReservationStage) {
break;
}
}
// Compute the weight of the current stage as compared to remaining ones
double ratio = calcWeight(currentReservationStage) / totalWeight;
// Estimate an early start time, such that:
// 1. Every stage is guaranteed to receive at least its duration
// 2. The remainder of the window is divided between stages
// proportionally to its workload (total memory consumption)
long maxIntervalArrival = maxInterval.getStartTime();
long maxIntervalDeadline = maxInterval.getEndTime();
long window = maxIntervalDeadline - maxIntervalArrival;
long windowRemainder = window - totalDuration;
if (allocateLeft) {
long latestEnd =
(long) (maxIntervalArrival
+ getRoundedDuration(currentReservationStage, step)
+ (windowRemainder * ratio));
// Realign if necessary (since we did some arithmetic)
latestEnd = stepRoundDown(latestEnd, step);
// Return new interval
return new ReservationInterval(maxIntervalArrival, latestEnd);
} else {
long earlyStart =
(long) (maxIntervalDeadline
- getRoundedDuration(currentReservationStage, step)
- (windowRemainder * ratio));
// Realign if necessary (since we did some arithmetic)
earlyStart = stepRoundUp(earlyStart, step);
// Return new interval
return new ReservationInterval(earlyStart, maxIntervalDeadline);
}
}
// Weight = total memory consumption of stage
protected double calcWeight(ReservationRequest stage) {
return (stage.getDuration() * stage.getCapability().getMemorySize())
* (stage.getNumContainers());
}
protected long getRoundedDuration(ReservationRequest stage, Long s) {
return stepRoundUp(stage.getDuration(), s);
}
protected static long stepRoundDown(long t, long s) {
return (t / s) * s;
}
protected static long stepRoundUp(long t, long s) {
return ((t + s - 1) / s) * s;
}
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
/**
* An implementation of {@link StageExecutionInterval} which gives each stage
* the maximal possible time interval, given the job constraints. Specifically,
* for ANY and ALL jobs, the interval would be [jobArrival, jobDeadline). For
* ORDER jobs, the stage cannot start before its predecessors (if allocateLeft
* == true) or cannot end before its successors (if allocateLeft == false)
*/
public class StageExecutionIntervalUnconstrained implements
StageExecutionInterval {
@Override
public ReservationInterval computeExecutionInterval(Plan plan,
ReservationDefinition reservation,
ReservationRequest currentReservationStage, boolean allocateLeft,
RLESparseResourceAllocation allocations) {
Long stageArrival = reservation.getArrival();
Long stageDeadline = reservation.getDeadline();
ReservationRequestInterpreter jobType =
reservation.getReservationRequests().getInterpreter();
// Left to right
if (allocateLeft) {
// If ORDER job, change the stage arrival time
if ((jobType == ReservationRequestInterpreter.R_ORDER)
|| (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP)) {
Long allocationEndTime = allocations.getLatestNonNullTime();
if (allocationEndTime != -1) {
stageArrival = allocationEndTime;
}
}
// Right to left
} else {
// If ORDER job, change the stage deadline
if ((jobType == ReservationRequestInterpreter.R_ORDER)
|| (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP)) {
Long allocationStartTime = allocations.getEarliestStartTime();
if (allocationStartTime != -1) {
stageDeadline = allocationStartTime;
}
}
}
return new ReservationInterval(stageArrival, stageDeadline);
}
}

View File

@ -18,13 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Random; import java.util.Random;
import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationDefinition;
@ -37,29 +41,32 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy; import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.StageAllocatorLowCostAligned.DurationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.eclipse.jetty.util.log.Log;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mortbay.log.Log;
public class TestAlignedPlanner { public class TestAlignedPlanner {
ReservationAgent agent; private ReservationAgent agentRight;
InMemoryPlan plan; private ReservationAgent agentLeft;
Resource minAlloc = Resource.newInstance(1024, 1); private InMemoryPlan plan;
ResourceCalculator res = new DefaultResourceCalculator(); private final Resource minAlloc = Resource.newInstance(1024, 1);
Resource maxAlloc = Resource.newInstance(1024 * 8, 8); private final ResourceCalculator res = new DefaultResourceCalculator();
Random rand = new Random(); private final Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
long step; private final Random rand = new Random();
private Resource clusterCapacity;
private long step;
@Test @Test
public void testSingleReservationAccept() throws PlanningException { public void testSingleReservationAccept() throws PlanningException {
@ -82,7 +89,7 @@ public class TestAlignedPlanner {
// Add reservation // Add reservation
ReservationId reservationID = ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId(); ReservationSystemTestUtil.getNewReservationId();
agent.createReservation(reservationID, "u1", plan, rr1); agentRight.createReservation(reservationID, "u1", plan, rr1);
// CHECK: allocation was accepted // CHECK: allocation was accepted
assertTrue("Agent-based allocation failed", reservationID != null); assertTrue("Agent-based allocation failed", reservationID != null);
@ -107,7 +114,7 @@ public class TestAlignedPlanner {
// Create reservation // Create reservation
ReservationDefinition rr1 = ReservationDefinition rr1 =
createReservationDefinition( createReservationDefinition(
10L, // Job arrival time 10 * step, // Job arrival time
15 * step, // Job deadline 15 * step, // Job deadline
new ReservationRequest[] { new ReservationRequest[] {
ReservationRequest.newInstance( ReservationRequest.newInstance(
@ -126,7 +133,7 @@ public class TestAlignedPlanner {
try { try {
ReservationId reservationID = ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId(); ReservationSystemTestUtil.getNewReservationId();
agent.createReservation(reservationID, "u1", plan, rr1); agentRight.createReservation(reservationID, "u1", plan, rr1);
fail(); fail();
} catch (PlanningException e) { } catch (PlanningException e) {
// Expected failure // Expected failure
@ -166,7 +173,7 @@ public class TestAlignedPlanner {
try { try {
ReservationId reservationID = ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId(); ReservationSystemTestUtil.getNewReservationId();
agent.createReservation(reservationID, "u1", plan, rr1); agentRight.createReservation(reservationID, "u1", plan, rr1);
fail(); fail();
} catch (PlanningException e) { } catch (PlanningException e) {
// Expected failure // Expected failure
@ -206,7 +213,7 @@ public class TestAlignedPlanner {
try { try {
ReservationId reservationID = ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId(); ReservationSystemTestUtil.getNewReservationId();
agent.createReservation(reservationID, "u1", plan, rr1); agentRight.createReservation(reservationID, "u1", plan, rr1);
fail(); fail();
} catch (PlanningException e) { } catch (PlanningException e) {
// Expected failure // Expected failure
@ -246,7 +253,7 @@ public class TestAlignedPlanner {
try { try {
ReservationId reservationID = ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId(); ReservationSystemTestUtil.getNewReservationId();
agent.createReservation(reservationID, "u1", plan, rr1); agentRight.createReservation(reservationID, "u1", plan, rr1);
fail(); fail();
} catch (PlanningException e) { } catch (PlanningException e) {
// Expected failure // Expected failure
@ -285,7 +292,7 @@ public class TestAlignedPlanner {
// Add reservation // Add reservation
ReservationId reservationID = ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId(); ReservationSystemTestUtil.getNewReservationId();
agent.createReservation(reservationID, "u1", plan, rr1); agentRight.createReservation(reservationID, "u1", plan, rr1);
// CHECK: allocation was accepted // CHECK: allocation was accepted
assertTrue("Agent-based allocation failed", reservationID != null); assertTrue("Agent-based allocation failed", reservationID != null);
@ -328,7 +335,7 @@ public class TestAlignedPlanner {
// Add reservation // Add reservation
ReservationId reservationID = ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId(); ReservationSystemTestUtil.getNewReservationId();
agent.createReservation(reservationID, "u1", plan, rr1); agentRight.createReservation(reservationID, "u1", plan, rr1);
// CHECK: allocation was accepted // CHECK: allocation was accepted
assertTrue("Agent-based allocation failed", reservationID != null); assertTrue("Agent-based allocation failed", reservationID != null);
@ -374,7 +381,7 @@ public class TestAlignedPlanner {
try { try {
ReservationId reservationID = ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId(); ReservationSystemTestUtil.getNewReservationId();
agent.createReservation(reservationID, "u1", plan, rr1); agentRight.createReservation(reservationID, "u1", plan, rr1);
fail(); fail();
} catch (PlanningException e) { } catch (PlanningException e) {
// Expected failure // Expected failure
@ -420,10 +427,10 @@ public class TestAlignedPlanner {
ReservationSystemTestUtil.getNewReservationId(); ReservationSystemTestUtil.getNewReservationId();
// Add block, add flex, remove block, update flex // Add block, add flex, remove block, update flex
agent.createReservation(blockReservationID, "uBlock", plan, rrBlock); agentRight.createReservation(blockReservationID, "uBlock", plan, rrBlock);
agent.createReservation(flexReservationID, "uFlex", plan, rrFlex); agentRight.createReservation(flexReservationID, "uFlex", plan, rrFlex);
agent.deleteReservation(blockReservationID, "uBlock", plan); agentRight.deleteReservation(blockReservationID, "uBlock", plan);
agent.updateReservation(flexReservationID, "uFlex", plan, rrFlex); agentRight.updateReservation(flexReservationID, "uFlex", plan, rrFlex);
// CHECK: allocation was accepted // CHECK: allocation was accepted
assertTrue("Agent-based allocation failed", flexReservationID != null); assertTrue("Agent-based allocation failed", flexReservationID != null);
@ -458,7 +465,7 @@ public class TestAlignedPlanner {
try { try {
ReservationId reservationID = ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId(); ReservationSystemTestUtil.getNewReservationId();
agent.createReservation(reservationID, "u1", plan, rr1); agentRight.createReservation(reservationID, "u1", plan, rr1);
fail(); fail();
} catch (PlanningException e) { } catch (PlanningException e) {
// Expected failure // Expected failure
@ -490,7 +497,7 @@ public class TestAlignedPlanner {
// Add reservation // Add reservation
ReservationId reservationID = ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId(); ReservationSystemTestUtil.getNewReservationId();
agent.createReservation(reservationID, "u1", plan, rr1); agentRight.createReservation(reservationID, "u1", plan, rr1);
// CHECK: allocation was accepted // CHECK: allocation was accepted
assertTrue("Agent-based allocation failed", reservationID != null); assertTrue("Agent-based allocation failed", reservationID != null);
@ -557,9 +564,9 @@ public class TestAlignedPlanner {
ReservationSystemTestUtil.getNewReservationId(); ReservationSystemTestUtil.getNewReservationId();
// Add all // Add all
agent.createReservation(reservationID1, "u1", plan, rr7Mem1Core); agentRight.createReservation(reservationID1, "u1", plan, rr7Mem1Core);
agent.createReservation(reservationID2, "u2", plan, rr6Mem6Cores); agentRight.createReservation(reservationID2, "u2", plan, rr6Mem6Cores);
agent.createReservation(reservationID3, "u3", plan, rr); agentRight.createReservation(reservationID3, "u3", plan, rr);
// Get reservation // Get reservation
ReservationAllocation alloc3 = plan.getReservationById(reservationID3); ReservationAllocation alloc3 = plan.getReservationById(reservationID3);
@ -684,8 +691,8 @@ public class TestAlignedPlanner {
for (ReservationDefinition rr : list) { for (ReservationDefinition rr : list) {
ReservationId reservationID = ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId(); ReservationSystemTestUtil.getNewReservationId();
agent.createReservation(reservationID, "u" + Integer.toString(i), plan, agentRight.createReservation(reservationID, "u" + Integer.toString(i),
rr); plan, rr);
++i; ++i;
} }
@ -695,13 +702,335 @@ public class TestAlignedPlanner {
} }
@Test
public void testSingleReservationAcceptAllocateLeft()
throws PlanningException {
// Create reservation
ReservationDefinition rr1 =
createReservationDefinition(
10 * step, // Job arrival time
35 * step, // Job deadline
new ReservationRequest[] {
ReservationRequest.newInstance(
Resource.newInstance(1024, 1), // Capability
20, // Num containers
20, // Concurrency
10 * step), // Duration
ReservationRequest.newInstance(
Resource.newInstance(1024, 1), // Capability
20, // Num containers
20, // Concurrency
10 * step) }, // Duration
ReservationRequestInterpreter.R_ORDER, "u1");
// Add reservation
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
agentLeft.createReservation(reservationID, "u1", plan, rr1);
// CHECK: allocation was accepted
assertTrue("Agent-based allocation failed", reservationID != null);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == 1);
// Get reservation
ReservationAllocation alloc1 = plan.getReservationById(reservationID);
// Verify allocation
assertTrue(alloc1.toString(),
check(alloc1, 10 * step, 30 * step, 20, 1024, 1));
}
@Test
public void testLeftSucceedsRightFails() throws PlanningException {
// Prepare basic plan
int numJobsInScenario = initializeScenario2();
// Create reservation
ReservationDefinition rr1 =
createReservationDefinition(
7 * step, // Job arrival time
16 * step, // Job deadline
new ReservationRequest[] {
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
20, // Num containers
20, // Concurrency
2 * step), // Duration
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
20, // Num containers
20, // Concurrency
2 * step) }, // Duration
ReservationRequestInterpreter.R_ORDER, "u1");
ReservationDefinition rr2 =
createReservationDefinition(
14 * step, // Job arrival time
16 * step, // Job deadline
new ReservationRequest[] {
ReservationRequest.newInstance(
Resource.newInstance(1024, 1), // Capability
100, // Num containers
100, // Concurrency
2 * step) }, // Duration
ReservationRequestInterpreter.R_ORDER, "u2");
// Add 1st reservation
ReservationId reservationID1 =
ReservationSystemTestUtil.getNewReservationId();
agentLeft.createReservation(reservationID1, "u1", plan, rr1);
// CHECK: allocation was accepted
assertTrue("Agent-based allocation failed", reservationID1 != null);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == numJobsInScenario + 1);
// Get reservation
ReservationAllocation alloc1 = plan.getReservationById(reservationID1);
// Verify allocation
assertTrue(alloc1.toString(),
check(alloc1, 7 * step, 11 * step, 20, 1024, 1));
// Add second reservation
ReservationId reservationID2 =
ReservationSystemTestUtil.getNewReservationId();
agentLeft.createReservation(reservationID2, "u2", plan, rr2);
// CHECK: allocation was accepted
assertTrue("Agent-based allocation failed", reservationID2 != null);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == numJobsInScenario + 2);
// Get reservation
ReservationAllocation alloc2 = plan.getReservationById(reservationID2);
// Verify allocation
assertTrue(alloc2.toString(),
check(alloc2, 14 * step, 16 * step, 100, 1024, 1));
agentLeft.deleteReservation(reservationID1, "u1", plan);
agentLeft.deleteReservation(reservationID2, "u2", plan);
// Now try to allocate the same jobs with agentRight. The second
// job should fail
// Add 1st reservation
ReservationId reservationID3 =
ReservationSystemTestUtil.getNewReservationId();
agentRight.createReservation(reservationID3, "u1", plan, rr1);
// CHECK: allocation was accepted
assertTrue("Agent-based allocation failed", reservationID3 != null);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == numJobsInScenario + 1);
// Add 2nd reservation
try {
ReservationId reservationID4 =
ReservationSystemTestUtil.getNewReservationId();
agentRight.createReservation(reservationID4, "u2", plan, rr2);
fail();
} catch (PlanningException e) {
// Expected failure
}
}
@Test
public void testValidateOrderNoGap() {
//
// Initialize allocations
//
RLESparseResourceAllocation allocation =
new RLESparseResourceAllocation(res);
allocation.addInterval(new ReservationInterval(10 * step, 13 * step),
Resource.newInstance(1024, 1));
// curAlloc
Map<ReservationInterval, Resource> curAlloc =
new HashMap<ReservationInterval, Resource>();
//
// Check cases
//
// 1. allocateLeft = false, succeed when there is no gap
curAlloc.clear();
curAlloc.put(new ReservationInterval(9 * step, 10 * step),
Resource.newInstance(1024, 1));
assertTrue("validateOrderNoFap() should have suceeded",
IterativePlanner.validateOrderNoGap(allocation, curAlloc, false));
// 2. allocateLeft = false, fail when curAlloc has a gap
curAlloc.put(new ReservationInterval(7 * step, 8 * step),
Resource.newInstance(1024, 1));
assertFalse("validateOrderNoGap() failed to identify a gap in curAlloc",
IterativePlanner.validateOrderNoGap(allocation, curAlloc, false));
// 3. allocateLeft = false, fail when there is a gap between curAlloc and
// allocations
curAlloc.clear();
curAlloc.put(new ReservationInterval(8 * step, 9 * step),
Resource.newInstance(1024, 1));
assertFalse("validateOrderNoGap() failed to identify a gap between "
+ "allocations and curAlloc",
IterativePlanner.validateOrderNoGap(allocation, curAlloc, false));
// 4. allocateLeft = true, succeed when there is no gap
curAlloc.clear();
curAlloc.put(new ReservationInterval(13 * step, 14 * step),
Resource.newInstance(1024, 1));
assertTrue("validateOrderNoFap() should have suceeded",
IterativePlanner.validateOrderNoGap(allocation, curAlloc, true));
// 5. allocateLeft = true, fail when there is a gap between curAlloc and
// allocations
curAlloc.put(new ReservationInterval(15 * step, 16 * step),
Resource.newInstance(1024, 1));
assertFalse("validateOrderNoGap() failed to identify a gap in curAlloc",
IterativePlanner.validateOrderNoGap(allocation, curAlloc, true));
// 6. allocateLeft = true, fail when curAlloc has a gap
curAlloc.clear();
curAlloc.put(new ReservationInterval(14 * step, 15 * step),
Resource.newInstance(1024, 1));
assertFalse("validateOrderNoGap() failed to identify a gap between "
+ "allocations and curAlloc",
IterativePlanner.validateOrderNoGap(allocation, curAlloc, true));
}
@Test
public void testGetDurationInterval() throws PlanningException {
DurationInterval durationInterval = null;
// Create netRLERes:
// - 4GB & 4VC between [10,20) and [30,40)
// - 8GB & 8VC between [20,30)
RLESparseResourceAllocation netRLERes =
new RLESparseResourceAllocation(res);
netRLERes.addInterval(
new ReservationInterval(10 * step, 40 * step),
Resource.newInstance(4096, 4)
);
netRLERes.addInterval(
new ReservationInterval(20 * step, 30 * step),
Resource.newInstance(4096, 4)
);
// Create planLoads:
// - 5GB & 5VC between [20,30)
RLESparseResourceAllocation planLoads =
new RLESparseResourceAllocation(res);
planLoads.addInterval(
new ReservationInterval(20 * step, 30 * step),
Resource.newInstance(5120, 5)
);
// Create planModifications:
// - 1GB & 1VC between [25,35)
RLESparseResourceAllocation planModifications =
new RLESparseResourceAllocation(res);
planModifications.addInterval(
new ReservationInterval(25 * step, 35 * step),
Resource.newInstance(1024, 1)
);
// Set requested resources
Resource requestedResources = Resource.newInstance(1024, 1);
// 1.
// currLoad: should start at 20*step, end at 30*step with a null value
// (in getTotalCost(), after the for loop we will have loadPrev == null
// netAvailableResources: should start exactly at startTime (10*step),
// end exactly at endTime (30*step) with a null value
durationInterval =
StageAllocatorLowCostAligned.getDurationInterval(10*step, 30*step,
planLoads, planModifications, clusterCapacity, netRLERes, res, step,
requestedResources);
assertEquals(durationInterval.numCanFit(), 4);
assertEquals(durationInterval.getTotalCost(), 0.55, 0.00001);
// 2.
// currLoad: should start at 20*step, end at 31*step with a null value
// (in getTotalCost, after the for loop we will have loadPrev == null)
// netAvailableResources: should start exactly at startTime (10*step),
// end exactly at endTime (31*step) with a null value
durationInterval =
StageAllocatorLowCostAligned.getDurationInterval(10*step, 31*step,
planLoads, planModifications, clusterCapacity, netRLERes, res, step,
requestedResources);
System.out.println(durationInterval);
assertEquals(durationInterval.numCanFit(), 3);
assertEquals(durationInterval.getTotalCost(), 0.56, 0.00001);
// 3.
// currLoad: should start at 20*step, end at 30*step with a null value
// (in getTotalCost, after the for loop we will have loadPrev == null)
// netAvailableResources: should start exactly startTime (15*step),
// end exactly at endTime (30*step) with a null value
durationInterval =
StageAllocatorLowCostAligned.getDurationInterval(15*step, 30*step,
planLoads, planModifications, clusterCapacity, netRLERes, res, step,
requestedResources);
assertEquals(durationInterval.numCanFit(), 4);
assertEquals(durationInterval.getTotalCost(), 0.55, 0.00001);
// 4.
// currLoad: should start at 20*step, end at 31*step with a null value
// (in getTotalCost, after the for loop we will have loadPrev == null)
// netAvailableResources: should start exactly at startTime (15*step),
// end exactly at endTime (31*step) with a value other than null
durationInterval =
StageAllocatorLowCostAligned.getDurationInterval(15*step, 31*step,
planLoads, planModifications, clusterCapacity, netRLERes, res, step,
requestedResources);
System.out.println(durationInterval);
assertEquals(durationInterval.numCanFit(), 3);
assertEquals(durationInterval.getTotalCost(), 0.56, 0.00001);
// 5.
// currLoad: should only contain one entry at startTime
// (22*step), therefore loadPrev != null and we should enter the if
// condition after the for loop in getTotalCost
// netAvailableResources: should only contain one entry at startTime
// (22*step)
durationInterval =
StageAllocatorLowCostAligned.getDurationInterval(22*step, 23*step,
planLoads, planModifications, clusterCapacity, netRLERes, res, step,
requestedResources);
System.out.println(durationInterval);
assertEquals(durationInterval.numCanFit(), 8);
assertEquals(durationInterval.getTotalCost(), 0.05, 0.00001);
// 6.
// currLoad: should start at 39*step, end at 41*step with a null value
// (in getTotalCost, after the for loop we will have loadPrev == null)
// netAvailableResources: should start exactly at startTime (39*step),
// end exactly at endTime (41*step) with a null value
durationInterval =
StageAllocatorLowCostAligned.getDurationInterval(39*step, 41*step,
planLoads, planModifications, clusterCapacity, netRLERes, res, step,
requestedResources);
System.out.println(durationInterval);
assertEquals(durationInterval.numCanFit(), 0);
assertEquals(durationInterval.getTotalCost(), 0, 0.00001);
}
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
// Initialize random seed // Initialize random seed
long seed = rand.nextLong(); long seed = rand.nextLong();
rand.setSeed(seed); rand.setSeed(seed);
Log.info("Running with seed: " + seed); Log.getLog().info("Running with seed: " + seed);
// Set cluster parameters // Set cluster parameters
long timeWindow = 1000000L; long timeWindow = 1000000L;
@ -709,16 +1038,15 @@ public class TestAlignedPlanner {
int capacityCores = 100; int capacityCores = 100;
step = 60000L; step = 60000L;
Resource clusterCapacity = Resource.newInstance(capacityMem, capacityCores); clusterCapacity = Resource.newInstance(capacityMem, capacityCores);
String reservationQ = String reservationQ =
ReservationSystemTestUtil.getFullReservationQueueName(); ReservationSystemTestUtil.getFullReservationQueueName();
float instConstraint = 100; float instConstraint = 100;
float avgConstraint = 100; float avgConstraint = 100;
ReservationSchedulerConfiguration conf = ReservationSchedulerConfiguration conf = ReservationSystemTestUtil
ReservationSystemTestUtil.createConf(reservationQ, timeWindow, .createConf(reservationQ, timeWindow, instConstraint, avgConstraint);
instConstraint, avgConstraint);
CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
policy.init(reservationQ, conf); policy.init(reservationQ, conf);
@ -728,14 +1056,19 @@ public class TestAlignedPlanner {
conf.setInt(AlignedPlannerWithGreedy.SMOOTHNESS_FACTOR, conf.setInt(AlignedPlannerWithGreedy.SMOOTHNESS_FACTOR,
AlignedPlannerWithGreedy.DEFAULT_SMOOTHNESS_FACTOR); AlignedPlannerWithGreedy.DEFAULT_SMOOTHNESS_FACTOR);
conf.setBoolean(ReservationAgent.FAVOR_EARLY_ALLOCATION, false);
// Set planning agent // Set planning agent
agent = new AlignedPlannerWithGreedy(); agentRight = new AlignedPlannerWithGreedy();
agent.init(conf); agentRight.init(conf);
conf.setBoolean(ReservationAgent.FAVOR_EARLY_ALLOCATION, true);
agentLeft = new AlignedPlannerWithGreedy();
agentLeft.init(conf);
// Create Plan // Create Plan
plan = plan = new InMemoryPlan(queueMetrics, policy, agentRight, clusterCapacity,
new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step, step, res, minAlloc, maxAlloc, "dedicated", null, true, context);
res, minAlloc, maxAlloc, "dedicated", null, true, context);
} }
private int initializeScenario1() throws PlanningException { private int initializeScenario1() throws PlanningException {

View File

@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -55,12 +55,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.eclipse.jetty.util.log.Log;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters; import org.junit.runners.Parameterized.Parameters;
import org.mortbay.log.Log;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class TestGreedyReservationAgent { public class TestGreedyReservationAgent {
@ -89,7 +89,7 @@ public class TestGreedyReservationAgent {
long seed = rand.nextLong(); long seed = rand.nextLong();
rand.setSeed(seed); rand.setSeed(seed);
Log.info("Running with seed: " + seed); Log.getLog().info("Running with seed: " + seed);
// setting completely loose quotas // setting completely loose quotas
long timeWindow = 1000000L; long timeWindow = 1000000L;
@ -108,7 +108,7 @@ public class TestGreedyReservationAgent {
policy.init(reservationQ, conf); policy.init(reservationQ, conf);
// setting conf to // setting conf to
conf.setBoolean(GreedyReservationAgent.GREEDY_FAVOR_EARLY_ALLOCATION, conf.setBoolean(GreedyReservationAgent.FAVOR_EARLY_ALLOCATION,
allocateLeft); allocateLeft);
agent = new GreedyReservationAgent(); agent = new GreedyReservationAgent();
agent.init(conf); agent.init(conf);

View File

@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.