YARN-4359. Update LowCost agents logic to take advantage of YARN-4358. (Jonathan Yaniv and Ishai Menache via Subru).
This commit is contained in:
parent
14b5c93f3c
commit
a3a615eeab
|
@ -687,4 +687,15 @@ public class InMemoryPlan implements Plan {
|
|||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RLESparseResourceAllocation getCumulativeLoadOverTime(
|
||||
long start, long end) {
|
||||
readLock.lock();
|
||||
try {
|
||||
return rleSparseVector.getRangeOverlapping(start, end);
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -174,4 +174,13 @@ public interface PlanView extends PlanContext {
|
|||
public RLESparseResourceAllocation getConsumptionForUserOverTime(String user,
|
||||
long start, long end);
|
||||
|
||||
/**
|
||||
* Get the cumulative load over a time interval.
|
||||
*
|
||||
* @param start Start of the time interval.
|
||||
* @param end End of the time interval.
|
||||
* @return RLE sparse allocation.
|
||||
*/
|
||||
RLESparseResourceAllocation getCumulativeLoadOverTime(long start, long end);
|
||||
|
||||
}
|
||||
|
|
|
@ -39,6 +39,8 @@ public class AlignedPlannerWithGreedy implements ReservationAgent {
|
|||
public static final int DEFAULT_SMOOTHNESS_FACTOR = 10;
|
||||
public static final String SMOOTHNESS_FACTOR =
|
||||
"yarn.resourcemanager.reservation-system.smoothness-factor";
|
||||
private boolean allocateLeft = false;
|
||||
|
||||
|
||||
// Log
|
||||
private static final Logger LOG = LoggerFactory
|
||||
|
@ -49,26 +51,31 @@ public class AlignedPlannerWithGreedy implements ReservationAgent {
|
|||
|
||||
// Constructor
|
||||
public AlignedPlannerWithGreedy() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Configuration conf) {
|
||||
int smoothnessFactor =
|
||||
conf.getInt(SMOOTHNESS_FACTOR, DEFAULT_SMOOTHNESS_FACTOR);
|
||||
allocateLeft = conf.getBoolean(FAVOR_EARLY_ALLOCATION,
|
||||
DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION);
|
||||
|
||||
// List of algorithms
|
||||
List<ReservationAgent> listAlg = new LinkedList<ReservationAgent>();
|
||||
|
||||
// LowCostAligned planning algorithm
|
||||
ReservationAgent algAligned =
|
||||
new IterativePlanner(new StageEarliestStartByDemand(),
|
||||
new StageAllocatorLowCostAligned(smoothnessFactor), false);
|
||||
new IterativePlanner(new StageExecutionIntervalByDemand(),
|
||||
new StageAllocatorLowCostAligned(smoothnessFactor, allocateLeft),
|
||||
allocateLeft);
|
||||
|
||||
listAlg.add(algAligned);
|
||||
|
||||
// Greedy planning algorithm
|
||||
ReservationAgent algGreedy =
|
||||
new IterativePlanner(new StageEarliestStartByJobArrival(),
|
||||
new StageAllocatorGreedy(), false);
|
||||
new IterativePlanner(new StageExecutionIntervalUnconstrained(),
|
||||
new StageAllocatorGreedyRLE(allocateLeft), allocateLeft);
|
||||
listAlg.add(algGreedy);
|
||||
|
||||
// Set planner:
|
||||
|
|
|
@ -47,9 +47,6 @@ public class GreedyReservationAgent implements ReservationAgent {
|
|||
|
||||
// Greedy planner
|
||||
private ReservationAgent planner;
|
||||
public final static String GREEDY_FAVOR_EARLY_ALLOCATION =
|
||||
"yarn.resourcemanager.reservation-system.favor-early-allocation";
|
||||
public final static boolean DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION = true;
|
||||
private boolean allocateLeft;
|
||||
|
||||
public GreedyReservationAgent() {
|
||||
|
@ -57,20 +54,20 @@ public class GreedyReservationAgent implements ReservationAgent {
|
|||
|
||||
@Override
|
||||
public void init(Configuration conf) {
|
||||
allocateLeft = conf.getBoolean(GREEDY_FAVOR_EARLY_ALLOCATION,
|
||||
allocateLeft = conf.getBoolean(FAVOR_EARLY_ALLOCATION,
|
||||
DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION);
|
||||
if (allocateLeft) {
|
||||
LOG.info("Initializing the GreedyReservationAgent to favor \"early\""
|
||||
+ " (left) allocations (controlled by parameter: "
|
||||
+ GREEDY_FAVOR_EARLY_ALLOCATION + ")");
|
||||
+ FAVOR_EARLY_ALLOCATION + ")");
|
||||
} else {
|
||||
LOG.info("Initializing the GreedyReservationAgent to favor \"late\""
|
||||
+ " (right) allocations (controlled by parameter: "
|
||||
+ GREEDY_FAVOR_EARLY_ALLOCATION + ")");
|
||||
+ FAVOR_EARLY_ALLOCATION + ")");
|
||||
}
|
||||
|
||||
planner =
|
||||
new IterativePlanner(new StageEarliestStartByJobArrival(),
|
||||
new IterativePlanner(new StageExecutionIntervalUnconstrained(),
|
||||
new StageAllocatorGreedyRLE(allocateLeft), allocateLeft);
|
||||
}
|
||||
|
||||
|
@ -123,4 +120,4 @@ public class GreedyReservationAgent implements ReservationAgent {
|
|||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.ListIterator;
|
||||
import java.util.Map;
|
||||
|
@ -32,26 +31,24 @@ import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
/**
|
||||
* A planning algorithm consisting of two main phases. The algorithm iterates
|
||||
* over the job stages in descending order. For each stage, the algorithm: 1.
|
||||
* Determines an interval [stageArrivalTime, stageDeadline) in which the stage
|
||||
* is allocated. 2. Computes an allocation for the stage inside the interval.
|
||||
*
|
||||
* For ANY and ALL jobs, phase 1 sets the allocation window of each stage to be
|
||||
* [jobArrival, jobDeadline]. For ORDER and ORDER_NO_GAP jobs, the deadline of
|
||||
* each stage is set as succcessorStartTime - the starting time of its
|
||||
* succeeding stage (or jobDeadline if it is the last stage).
|
||||
*
|
||||
* The phases are set using the two functions: 1. setAlgEarliestStartTime 2.
|
||||
* setAlgComputeStageAllocation
|
||||
* over the job stages in ascending/descending order, depending on the flag
|
||||
* allocateLeft. For each stage, the algorithm: 1. Determines an interval
|
||||
* [stageArrival, stageDeadline) in which the stage is allocated. 2. Computes an
|
||||
* allocation for the stage inside the interval. For ANY and ALL jobs, phase 1
|
||||
* sets the allocation window of each stage to be [jobArrival, jobDeadline]. For
|
||||
* ORDER and ORDER_NO_GAP jobs, the deadline of each stage is set as
|
||||
* succcessorStartTime - the starting time of its succeeding stage (or
|
||||
* jobDeadline if it is the last stage). The phases are set using the two
|
||||
* functions: 1. setAlgStageExecutionInterval 2.setAlgStageAllocator
|
||||
*/
|
||||
public class IterativePlanner extends PlanningAlgorithm {
|
||||
|
||||
|
@ -60,7 +57,7 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
private RLESparseResourceAllocation planModifications;
|
||||
|
||||
// Data extracted from plan
|
||||
private Map<Long, Resource> planLoads;
|
||||
private RLESparseResourceAllocation planLoads;
|
||||
private Resource capacity;
|
||||
private long step;
|
||||
|
||||
|
@ -70,16 +67,16 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
private long jobDeadline;
|
||||
|
||||
// Phase algorithms
|
||||
private StageEarliestStart algStageEarliestStart = null;
|
||||
private StageExecutionInterval algStageExecutionInterval = null;
|
||||
private StageAllocator algStageAllocator = null;
|
||||
private final boolean allocateLeft;
|
||||
|
||||
// Constructor
|
||||
public IterativePlanner(StageEarliestStart algEarliestStartTime,
|
||||
public IterativePlanner(StageExecutionInterval algStageExecutionInterval,
|
||||
StageAllocator algStageAllocator, boolean allocateLeft) {
|
||||
|
||||
this.allocateLeft = allocateLeft;
|
||||
setAlgStageEarliestStart(algEarliestStartTime);
|
||||
setAlgStageExecutionInterval(algStageExecutionInterval);
|
||||
setAlgStageAllocator(algStageAllocator);
|
||||
|
||||
}
|
||||
|
@ -101,12 +98,6 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
// Current stage
|
||||
ReservationRequest currentReservationStage;
|
||||
|
||||
// Stage deadlines
|
||||
long stageDeadline = stepRoundDown(reservation.getDeadline(), step);
|
||||
long successorStartingTime = -1;
|
||||
long predecessorEndTime = stepRoundDown(reservation.getArrival(), step);
|
||||
long stageArrivalTime = -1;
|
||||
|
||||
// Iterate the stages in reverse order
|
||||
while (stageProvider.hasNext()) {
|
||||
|
||||
|
@ -116,27 +107,17 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
// Validate that the ReservationRequest respects basic constraints
|
||||
validateInputStage(plan, currentReservationStage);
|
||||
|
||||
// Compute an adjusted earliestStart for this resource
|
||||
// (we need this to provision some space for the ORDER contracts)
|
||||
// Set the stageArrival and stageDeadline
|
||||
ReservationInterval stageInterval =
|
||||
setStageExecutionInterval(plan, reservation, currentReservationStage,
|
||||
allocations);
|
||||
Long stageArrival = stageInterval.getStartTime();
|
||||
Long stageDeadline = stageInterval.getEndTime();
|
||||
|
||||
if (allocateLeft) {
|
||||
stageArrivalTime = predecessorEndTime;
|
||||
} else {
|
||||
stageArrivalTime = reservation.getArrival();
|
||||
if (jobType == ReservationRequestInterpreter.R_ORDER
|
||||
|| jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
|
||||
stageArrivalTime =
|
||||
computeEarliestStartingTime(plan, reservation,
|
||||
stageProvider.getCurrentIndex(), currentReservationStage,
|
||||
stageDeadline);
|
||||
}
|
||||
stageArrivalTime = stepRoundUp(stageArrivalTime, step);
|
||||
stageArrivalTime = Math.max(stageArrivalTime, reservation.getArrival());
|
||||
}
|
||||
// Compute the allocation of a single stage
|
||||
// Compute stage allocation
|
||||
Map<ReservationInterval, Resource> curAlloc =
|
||||
computeStageAllocation(plan, currentReservationStage,
|
||||
stageArrivalTime, stageDeadline, user, reservationId);
|
||||
computeStageAllocation(plan, currentReservationStage, stageArrival,
|
||||
stageDeadline, user, reservationId);
|
||||
|
||||
// If we did not find an allocation, return NULL
|
||||
// (unless it's an ANY job, then we simply continue).
|
||||
|
@ -152,9 +133,13 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
|
||||
}
|
||||
|
||||
// Get the start & end time of the current allocation
|
||||
Long stageStartTime = findEarliestTime(curAlloc);
|
||||
Long stageEndTime = findLatestTime(curAlloc);
|
||||
// Validate ORDER_NO_GAP
|
||||
if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
|
||||
if (!validateOrderNoGap(allocations, curAlloc, allocateLeft)) {
|
||||
throw new PlanningException(
|
||||
"The allocation found does not respect ORDER_NO_GAP");
|
||||
}
|
||||
}
|
||||
|
||||
// If we did find an allocation for the stage, add it
|
||||
for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) {
|
||||
|
@ -165,33 +150,6 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
if (jobType == ReservationRequestInterpreter.R_ANY) {
|
||||
break;
|
||||
}
|
||||
|
||||
// If ORDER job, set the stageDeadline of the next stage to be processed
|
||||
if (jobType == ReservationRequestInterpreter.R_ORDER
|
||||
|| jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
|
||||
|
||||
// CHECK ORDER_NO_GAP
|
||||
// Verify that there is no gap, in case the job is ORDER_NO_GAP
|
||||
// note that the test is different left-to-right and right-to-left
|
||||
if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP
|
||||
&& successorStartingTime != -1
|
||||
&& ((allocateLeft && predecessorEndTime < stageStartTime) ||
|
||||
(!allocateLeft && (stageEndTime < successorStartingTime))
|
||||
)
|
||||
|| (!isNonPreemptiveAllocation(curAlloc))) {
|
||||
throw new PlanningException(
|
||||
"The allocation found does not respect ORDER_NO_GAP");
|
||||
}
|
||||
|
||||
if (allocateLeft) {
|
||||
// Store the stageStartTime and set the new stageDeadline
|
||||
predecessorEndTime = stageEndTime;
|
||||
} else {
|
||||
// Store the stageStartTime and set the new stageDeadline
|
||||
successorStartingTime = stageStartTime;
|
||||
stageDeadline = stageStartTime;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If the allocation is empty, return an error
|
||||
|
@ -200,7 +158,39 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
}
|
||||
|
||||
return allocations;
|
||||
}
|
||||
|
||||
protected static boolean validateOrderNoGap(
|
||||
RLESparseResourceAllocation allocations,
|
||||
Map<ReservationInterval, Resource> curAlloc, boolean allocateLeft) {
|
||||
|
||||
// Left to right
|
||||
if (allocateLeft) {
|
||||
Long stageStartTime = findEarliestTime(curAlloc);
|
||||
Long allocationEndTime = allocations.getLatestNonNullTime();
|
||||
|
||||
// Check that there is no gap between stages
|
||||
if ((allocationEndTime != -1) && (allocationEndTime < stageStartTime)) {
|
||||
return false;
|
||||
}
|
||||
// Right to left
|
||||
} else {
|
||||
Long stageEndTime = findLatestTime(curAlloc);
|
||||
Long allocationStartTime = allocations.getEarliestStartTime();
|
||||
|
||||
// Check that there is no gap between stages
|
||||
if ((allocationStartTime != -1) && (stageEndTime < allocationStartTime)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Check that the stage allocation does not violate ORDER_NO_GAP
|
||||
if (!isNonPreemptiveAllocation(curAlloc)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// The allocation is legal
|
||||
return true;
|
||||
}
|
||||
|
||||
protected void initialize(Plan plan, ReservationId reservationId,
|
||||
|
@ -223,35 +213,15 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
|
||||
// planLoads are not used by other StageAllocators... and don't deal
|
||||
// well with huge reservation ranges
|
||||
if (this.algStageAllocator instanceof StageAllocatorLowCostAligned) {
|
||||
planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline);
|
||||
ReservationAllocation oldRes = plan.getReservationById(reservationId);
|
||||
if (oldRes != null) {
|
||||
planModifications =
|
||||
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
|
||||
plan.getTotalCapacity(), planModifications,
|
||||
oldRes.getResourcesOverTime(), RLEOperator.subtract,
|
||||
jobArrival, jobDeadline);
|
||||
}
|
||||
planLoads = plan.getCumulativeLoadOverTime(jobArrival, jobDeadline);
|
||||
ReservationAllocation oldRes = plan.getReservationById(reservationId);
|
||||
if (oldRes != null) {
|
||||
planLoads =
|
||||
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
|
||||
plan.getTotalCapacity(), planLoads,
|
||||
oldRes.getResourcesOverTime(), RLEOperator.subtract, jobArrival,
|
||||
jobDeadline);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private Map<Long, Resource> getAllLoadsInInterval(Plan plan, long startTime,
|
||||
long endTime) {
|
||||
|
||||
// Create map
|
||||
Map<Long, Resource> loads = new HashMap<Long, Resource>();
|
||||
|
||||
// Calculate the load for every time slot between [start,end)
|
||||
for (long t = startTime; t < endTime; t += step) {
|
||||
Resource load = plan.getTotalCommittedResources(t);
|
||||
loads.put(t, load);
|
||||
}
|
||||
|
||||
// Return map
|
||||
return loads;
|
||||
|
||||
}
|
||||
|
||||
private void validateInputStage(Plan plan, ReservationRequest rr)
|
||||
|
@ -286,7 +256,7 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
|
||||
}
|
||||
|
||||
private boolean isNonPreemptiveAllocation(
|
||||
private static boolean isNonPreemptiveAllocation(
|
||||
Map<ReservationInterval, Resource> curAlloc) {
|
||||
|
||||
// Checks whether a stage allocation is non preemptive or not.
|
||||
|
@ -329,14 +299,13 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
|
||||
}
|
||||
|
||||
// Call algEarliestStartTime()
|
||||
protected long computeEarliestStartingTime(Plan plan,
|
||||
ReservationDefinition reservation, int index,
|
||||
ReservationRequest currentReservationStage, long stageDeadline) {
|
||||
|
||||
return algStageEarliestStart.setEarliestStartTime(plan, reservation, index,
|
||||
currentReservationStage, stageDeadline);
|
||||
|
||||
// Call setStageExecutionInterval()
|
||||
protected ReservationInterval setStageExecutionInterval(Plan plan,
|
||||
ReservationDefinition reservation,
|
||||
ReservationRequest currentReservationStage,
|
||||
RLESparseResourceAllocation allocations) {
|
||||
return algStageExecutionInterval.computeExecutionInterval(plan,
|
||||
reservation, currentReservationStage, allocateLeft, allocations);
|
||||
}
|
||||
|
||||
// Call algStageAllocator
|
||||
|
@ -350,10 +319,11 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
|
||||
}
|
||||
|
||||
// Set the algorithm: algStageEarliestStart
|
||||
public IterativePlanner setAlgStageEarliestStart(StageEarliestStart alg) {
|
||||
// Set the algorithm: algStageExecutionInterval
|
||||
public IterativePlanner setAlgStageExecutionInterval(
|
||||
StageExecutionInterval alg) {
|
||||
|
||||
this.algStageEarliestStart = alg;
|
||||
this.algStageExecutionInterval = alg;
|
||||
return this; // To allow concatenation of setAlg() functions
|
||||
|
||||
}
|
||||
|
@ -375,7 +345,7 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
|
||||
private final boolean allocateLeft;
|
||||
|
||||
private ListIterator<ReservationRequest> li;
|
||||
private final ListIterator<ReservationRequest> li;
|
||||
|
||||
public StageProvider(boolean allocateLeft,
|
||||
ReservationDefinition reservation) {
|
||||
|
|
|
@ -28,15 +28,26 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan
|
|||
*/
|
||||
public interface ReservationAgent {
|
||||
|
||||
/**
|
||||
* Constant defining the preferential treatment of time for equally valid
|
||||
* allocations.
|
||||
*/
|
||||
final static String FAVOR_EARLY_ALLOCATION =
|
||||
"yarn.resourcemanager.reservation-system.favor-early-allocation";
|
||||
/**
|
||||
* By default favor early allocations.
|
||||
*/
|
||||
final static boolean DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION = true;
|
||||
|
||||
/**
|
||||
* Create a reservation for the user that abides by the specified contract
|
||||
*
|
||||
*
|
||||
* @param reservationId the identifier of the reservation to be created.
|
||||
* @param user the user who wants to create the reservation
|
||||
* @param plan the Plan to which the reservation must be fitted
|
||||
* @param contract encapsulates the resources the user requires for his
|
||||
* session
|
||||
*
|
||||
*
|
||||
* @return whether the create operation was successful or not
|
||||
* @throws PlanningException if the session cannot be fitted into the plan
|
||||
*/
|
||||
|
@ -45,13 +56,13 @@ public interface ReservationAgent {
|
|||
|
||||
/**
|
||||
* Update a reservation for the user that abides by the specified contract
|
||||
*
|
||||
*
|
||||
* @param reservationId the identifier of the reservation to be updated
|
||||
* @param user the user who wants to create the session
|
||||
* @param plan the Plan to which the reservation must be fitted
|
||||
* @param contract encapsulates the resources the user requires for his
|
||||
* reservation
|
||||
*
|
||||
*
|
||||
* @return whether the update operation was successful or not
|
||||
* @throws PlanningException if the reservation cannot be fitted into the plan
|
||||
*/
|
||||
|
@ -60,11 +71,11 @@ public interface ReservationAgent {
|
|||
|
||||
/**
|
||||
* Delete an user reservation
|
||||
*
|
||||
*
|
||||
* @param reservationId the identifier of the reservation to be deleted
|
||||
* @param user the user who wants to create the reservation
|
||||
* @param plan the Plan to which the session must be fitted
|
||||
*
|
||||
*
|
||||
* @return whether the delete operation was successful or not
|
||||
* @throws PlanningException if the reservation cannot be fitted into the plan
|
||||
*/
|
||||
|
|
|
@ -42,7 +42,7 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
* This (re)planner scan a period of time from now to a maximum time window (or
|
||||
* the end of the last session, whichever comes first) checking the overall
|
||||
* capacity is not violated.
|
||||
*
|
||||
*
|
||||
* It greedily removes sessions in reversed order of acceptance (latest accepted
|
||||
* is the first removed).
|
||||
*/
|
||||
|
@ -90,8 +90,8 @@ public class SimpleCapacityReplanner implements Planner {
|
|||
|
||||
// loop on all moment in time from now to the end of the check Zone
|
||||
// or the end of the planned sessions whichever comes first
|
||||
for (long t = now;
|
||||
(t < plan.getLastEndTime() && t < (now + lengthOfCheckZone));
|
||||
for (long t = now;
|
||||
(t < plan.getLastEndTime() && t < (now + lengthOfCheckZone));
|
||||
t += plan.getStep()) {
|
||||
Resource excessCap =
|
||||
Resources.subtract(plan.getTotalCommittedResources(t), totCap);
|
||||
|
@ -102,7 +102,7 @@ public class SimpleCapacityReplanner implements Planner {
|
|||
new TreeSet<ReservationAllocation>(plan.getReservationsAtTime(t));
|
||||
for (Iterator<ReservationAllocation> resIter =
|
||||
curReservations.iterator(); resIter.hasNext()
|
||||
&& Resources.greaterThan(resCalc, totCap, excessCap,
|
||||
&& Resources.greaterThan(resCalc, totCap, excessCap,
|
||||
ZERO_RESOURCE);) {
|
||||
ReservationAllocation reservation = resIter.next();
|
||||
plan.deleteReservation(reservation.getReservationId());
|
||||
|
|
|
@ -41,19 +41,21 @@ public interface StageAllocator {
|
|||
* @param planModifications the allocations performed by the planning
|
||||
* algorithm which are not yet reflected by plan
|
||||
* @param rr the stage
|
||||
* @param stageEarliestStart the arrival time (earliest starting time) set for
|
||||
* @param stageArrival the arrival time (earliest starting time) set for
|
||||
* the stage by the two phase planning algorithm
|
||||
* @param stageDeadline the deadline of the stage set by the two phase
|
||||
* planning algorithm
|
||||
* @param user name of the user
|
||||
* @param oldId identifier of the old reservation
|
||||
*
|
||||
* @return The computed allocation (or null if the stage could not be
|
||||
* allocated)
|
||||
* @throws PlanningException
|
||||
*/
|
||||
Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
|
||||
Map<Long, Resource> planLoads,
|
||||
RLESparseResourceAllocation planLoads,
|
||||
RLESparseResourceAllocation planModifications, ReservationRequest rr,
|
||||
long stageEarliestStart, long stageDeadline, String user,
|
||||
long stageArrival, long stageDeadline, String user,
|
||||
ReservationId oldId) throws PlanningException;
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,8 +26,8 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
|
@ -41,7 +41,7 @@ public class StageAllocatorGreedy implements StageAllocator {
|
|||
|
||||
@Override
|
||||
public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
|
||||
Map<Long, Resource> planLoads,
|
||||
RLESparseResourceAllocation planLoads,
|
||||
RLESparseResourceAllocation planModifications, ReservationRequest rr,
|
||||
long stageEarliestStart, long stageDeadline, String user,
|
||||
ReservationId oldId) throws PlanningException {
|
||||
|
|
|
@ -29,8 +29,8 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
|
@ -52,7 +52,7 @@ public class StageAllocatorGreedyRLE implements StageAllocator {
|
|||
|
||||
@Override
|
||||
public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
|
||||
Map<Long, Resource> planLoads,
|
||||
RLESparseResourceAllocation planLoads,
|
||||
RLESparseResourceAllocation planModifications, ReservationRequest rr,
|
||||
long stageEarliestStart, long stageDeadline, String user,
|
||||
ReservationId oldId) throws PlanningException {
|
||||
|
|
|
@ -18,8 +18,12 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
|
@ -27,46 +31,55 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
/**
|
||||
* A stage allocator that iteratively allocates containers in the
|
||||
* {@link DurationInterval} with lowest overall cost. The algorithm only
|
||||
* considers intervals of the form: [stageDeadline - (n+1)*duration,
|
||||
* stageDeadline - n*duration) for an integer n. This guarantees that the
|
||||
* allocations are aligned (as opposed to overlapping duration intervals).
|
||||
*
|
||||
* The smoothnessFactor parameter controls the number of containers that are
|
||||
* simultaneously allocated in each iteration of the algorithm.
|
||||
* considers non-overlapping intervals of length 'duration'. This guarantees
|
||||
* that the allocations are aligned. If 'allocateLeft == true', the intervals
|
||||
* considered by the algorithm are aligned to stageArrival; otherwise, they are
|
||||
* aligned to stageDeadline. The smoothnessFactor parameter controls the number
|
||||
* of containers that are simultaneously allocated in each iteration of the
|
||||
* algorithm.
|
||||
*/
|
||||
|
||||
public class StageAllocatorLowCostAligned implements StageAllocator {
|
||||
|
||||
private final boolean allocateLeft;
|
||||
// Smoothness factor
|
||||
private int smoothnessFactor = 10;
|
||||
|
||||
// Constructor
|
||||
public StageAllocatorLowCostAligned() {
|
||||
public StageAllocatorLowCostAligned(boolean allocateLeft) {
|
||||
this.allocateLeft = allocateLeft;
|
||||
}
|
||||
|
||||
// Constructor
|
||||
public StageAllocatorLowCostAligned(int smoothnessFactor) {
|
||||
public StageAllocatorLowCostAligned(int smoothnessFactor,
|
||||
boolean allocateLeft) {
|
||||
this.allocateLeft = allocateLeft;
|
||||
this.smoothnessFactor = smoothnessFactor;
|
||||
}
|
||||
|
||||
// computeJobAllocation()
|
||||
@Override
|
||||
public Map<ReservationInterval, Resource> computeStageAllocation(
|
||||
Plan plan, Map<Long, Resource> planLoads,
|
||||
public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
|
||||
RLESparseResourceAllocation planLoads,
|
||||
RLESparseResourceAllocation planModifications, ReservationRequest rr,
|
||||
long stageEarliestStart, long stageDeadline, String user,
|
||||
ReservationId oldId) {
|
||||
long stageArrival, long stageDeadline, String user, ReservationId oldId)
|
||||
throws PlanningException {
|
||||
|
||||
// Initialize
|
||||
ResourceCalculator resCalc = plan.getResourceCalculator();
|
||||
Resource capacity = plan.getTotalCapacity();
|
||||
|
||||
RLESparseResourceAllocation netRLERes = plan
|
||||
.getAvailableResourceOverTime(user, oldId, stageArrival, stageDeadline);
|
||||
|
||||
long step = plan.getStep();
|
||||
|
||||
// Create allocationRequestsearlies
|
||||
|
@ -76,16 +89,15 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
|
|||
// Initialize parameters
|
||||
long duration = stepRoundUp(rr.getDuration(), step);
|
||||
int windowSizeInDurations =
|
||||
(int) ((stageDeadline - stageEarliestStart) / duration);
|
||||
(int) ((stageDeadline - stageArrival) / duration);
|
||||
int totalGangs = rr.getNumContainers() / rr.getConcurrency();
|
||||
int numContainersPerGang = rr.getConcurrency();
|
||||
Resource gang =
|
||||
Resources.multiply(rr.getCapability(), numContainersPerGang);
|
||||
|
||||
// Set maxGangsPerUnit
|
||||
int maxGangsPerUnit =
|
||||
(int) Math.max(
|
||||
Math.floor(((double) totalGangs) / windowSizeInDurations), 1);
|
||||
int maxGangsPerUnit = (int) Math
|
||||
.max(Math.floor(((double) totalGangs) / windowSizeInDurations), 1);
|
||||
maxGangsPerUnit = Math.max(maxGangsPerUnit / smoothnessFactor, 1);
|
||||
|
||||
// If window size is too small, return null
|
||||
|
@ -93,6 +105,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
|
|||
return null;
|
||||
}
|
||||
|
||||
final int preferLeft = allocateLeft ? 1 : -1;
|
||||
|
||||
// Initialize tree sorted by costs
|
||||
TreeSet<DurationInterval> durationIntervalsSortedByCost =
|
||||
new TreeSet<DurationInterval>(new Comparator<DurationInterval>() {
|
||||
|
@ -104,23 +118,26 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
|
|||
return cmp;
|
||||
}
|
||||
|
||||
return (-1) * Long.compare(val1.getEndTime(), val2.getEndTime());
|
||||
return preferLeft
|
||||
* Long.compare(val1.getEndTime(), val2.getEndTime());
|
||||
}
|
||||
});
|
||||
|
||||
List<Long> intervalEndTimes =
|
||||
computeIntervalEndTimes(stageArrival, stageDeadline, duration);
|
||||
|
||||
// Add durationIntervals that end at (endTime - n*duration) for some n.
|
||||
for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart
|
||||
+ duration; intervalEnd -= duration) {
|
||||
for (long intervalEnd : intervalEndTimes) {
|
||||
|
||||
long intervalStart = intervalEnd - duration;
|
||||
|
||||
// Get duration interval [intervalStart,intervalEnd)
|
||||
DurationInterval durationInterval =
|
||||
getDurationInterval(intervalStart, intervalEnd, planLoads,
|
||||
planModifications, capacity, resCalc, step);
|
||||
planModifications, capacity, netRLERes, resCalc, step, gang);
|
||||
|
||||
// If the interval can fit a gang, add it to the tree
|
||||
if (durationInterval.canAllocate(gang, capacity, resCalc)) {
|
||||
if (durationInterval.canAllocate()) {
|
||||
durationIntervalsSortedByCost.add(durationInterval);
|
||||
}
|
||||
}
|
||||
|
@ -139,8 +156,7 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
|
|||
durationIntervalsSortedByCost.first();
|
||||
int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs);
|
||||
numGangsToAllocate =
|
||||
Math.min(numGangsToAllocate,
|
||||
bestDurationInterval.numCanFit(gang, capacity, resCalc));
|
||||
Math.min(numGangsToAllocate, bestDurationInterval.numCanFit());
|
||||
// Add it
|
||||
remainingGangs -= numGangsToAllocate;
|
||||
|
||||
|
@ -148,9 +164,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
|
|||
new ReservationInterval(bestDurationInterval.getStartTime(),
|
||||
bestDurationInterval.getEndTime());
|
||||
|
||||
Resource reservationRes =
|
||||
Resources.multiply(rr.getCapability(), rr.getConcurrency()
|
||||
* numGangsToAllocate);
|
||||
Resource reservationRes = Resources.multiply(rr.getCapability(),
|
||||
rr.getConcurrency() * numGangsToAllocate);
|
||||
|
||||
planModifications.addInterval(reservationInt, reservationRes);
|
||||
allocationRequests.addInterval(reservationInt, reservationRes);
|
||||
|
@ -162,10 +177,10 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
|
|||
DurationInterval updatedDurationInterval =
|
||||
getDurationInterval(bestDurationInterval.getStartTime(),
|
||||
bestDurationInterval.getStartTime() + duration, planLoads,
|
||||
planModifications, capacity, resCalc, step);
|
||||
planModifications, capacity, netRLERes, resCalc, step, gang);
|
||||
|
||||
// Add to tree, if possible
|
||||
if (updatedDurationInterval.canAllocate(gang, capacity, resCalc)) {
|
||||
if (updatedDurationInterval.canAllocate()) {
|
||||
durationIntervalsSortedByCost.add(updatedDurationInterval);
|
||||
}
|
||||
|
||||
|
@ -180,10 +195,12 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
|
|||
return allocations;
|
||||
} else {
|
||||
|
||||
// If we are here is because we did not manage to satisfy this request.
|
||||
// We remove unwanted side-effect from planModifications (needed for ANY).
|
||||
for (Map.Entry<ReservationInterval, Resource> tempAllocation
|
||||
: allocations.entrySet()) {
|
||||
// If we are here is because we did not manage to satisfy this
|
||||
// request.
|
||||
// We remove unwanted side-effect from planModifications (needed for
|
||||
// ANY).
|
||||
for (Map.Entry<ReservationInterval, Resource> tempAllocation : allocations
|
||||
.entrySet()) {
|
||||
|
||||
planModifications.removeInterval(tempAllocation.getKey(),
|
||||
tempAllocation.getValue());
|
||||
|
@ -196,37 +213,144 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
|
|||
|
||||
}
|
||||
|
||||
protected DurationInterval getDurationInterval(long startTime, long endTime,
|
||||
Map<Long, Resource> planLoads,
|
||||
RLESparseResourceAllocation planModifications, Resource capacity,
|
||||
ResourceCalculator resCalc, long step) {
|
||||
|
||||
// Initialize the dominant loads structure
|
||||
Resource dominantResources = Resource.newInstance(0, 0);
|
||||
|
||||
// Calculate totalCost and maxLoad
|
||||
double totalCost = 0.0;
|
||||
for (long t = startTime; t < endTime; t += step) {
|
||||
|
||||
// Get the load
|
||||
Resource load = getLoadAtTime(t, planLoads, planModifications);
|
||||
|
||||
// Increase the total cost
|
||||
totalCost += calcCostOfLoad(load, capacity, resCalc);
|
||||
|
||||
// Update the dominant resources
|
||||
dominantResources = Resources.componentwiseMax(dominantResources, load);
|
||||
private List<Long> computeIntervalEndTimes(long stageEarliestStart,
|
||||
long stageDeadline, long duration) {
|
||||
|
||||
List<Long> intervalEndTimes = new ArrayList<Long>();
|
||||
if (!allocateLeft) {
|
||||
for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart
|
||||
+ duration; intervalEnd -= duration) {
|
||||
intervalEndTimes.add(intervalEnd);
|
||||
}
|
||||
} else {
|
||||
for (long intervalStart =
|
||||
stageEarliestStart; intervalStart <= stageDeadline
|
||||
- duration; intervalStart += duration) {
|
||||
intervalEndTimes.add(intervalStart + duration);
|
||||
}
|
||||
}
|
||||
|
||||
// Return the corresponding durationInterval
|
||||
return new DurationInterval(startTime, endTime, totalCost,
|
||||
dominantResources);
|
||||
return intervalEndTimes;
|
||||
}
|
||||
|
||||
protected static DurationInterval getDurationInterval(long startTime,
|
||||
long endTime, RLESparseResourceAllocation planLoads,
|
||||
RLESparseResourceAllocation planModifications, Resource capacity,
|
||||
RLESparseResourceAllocation netRLERes, ResourceCalculator resCalc,
|
||||
long step, Resource requestedResources) throws PlanningException {
|
||||
|
||||
// Get the total cost associated with the duration interval
|
||||
double totalCost = getDurationIntervalTotalCost(startTime, endTime,
|
||||
planLoads, planModifications, capacity, resCalc, step);
|
||||
|
||||
// Calculate how many gangs can fit, i.e., how many times can 'capacity'
|
||||
// be allocated within the duration interval [startTime, endTime)
|
||||
int gangsCanFit = getDurationIntervalGangsCanFit(startTime, endTime,
|
||||
planModifications, capacity, netRLERes, resCalc, requestedResources);
|
||||
|
||||
// Return the desired durationInterval
|
||||
return new DurationInterval(startTime, endTime, totalCost, gangsCanFit);
|
||||
|
||||
}
|
||||
|
||||
protected static double getDurationIntervalTotalCost(long startTime,
|
||||
long endTime, RLESparseResourceAllocation planLoads,
|
||||
RLESparseResourceAllocation planModifications, Resource capacity,
|
||||
ResourceCalculator resCalc, long step) throws PlanningException {
|
||||
|
||||
// Compute the current resource load within the interval [startTime,endTime)
|
||||
// by adding planLoads (existing load) and planModifications (load that
|
||||
// corresponds to the current job).
|
||||
RLESparseResourceAllocation currentLoad =
|
||||
RLESparseResourceAllocation.merge(resCalc, capacity, planLoads,
|
||||
planModifications, RLEOperator.add, startTime, endTime);
|
||||
|
||||
// Convert load from RLESparseResourceAllocation to a Map representation
|
||||
NavigableMap<Long, Resource> mapCurrentLoad = currentLoad.getCumulative();
|
||||
|
||||
// Initialize auxiliary variables
|
||||
double totalCost = 0.0;
|
||||
Long tPrev = -1L;
|
||||
Resource loadPrev = Resources.none();
|
||||
double cost = 0.0;
|
||||
|
||||
// Iterate over time points. For each point 't', accumulate the total cost
|
||||
// that corresponds to the interval [tPrev, t). The cost associated within
|
||||
// this interval is fixed for each of the time steps, therefore the cost of
|
||||
// a single step is multiplied by (t - tPrev) / step.
|
||||
for (Entry<Long, Resource> e : mapCurrentLoad.entrySet()) {
|
||||
Long t = e.getKey();
|
||||
Resource load = e.getValue();
|
||||
if (tPrev != -1L) {
|
||||
tPrev = Math.max(tPrev, startTime);
|
||||
cost = calcCostOfLoad(loadPrev, capacity, resCalc);
|
||||
totalCost = totalCost + cost * (t - tPrev) / step;
|
||||
}
|
||||
|
||||
tPrev = t;
|
||||
loadPrev = load;
|
||||
}
|
||||
|
||||
// Add the cost associated with the last interval (the for loop does not
|
||||
// calculate it).
|
||||
if (loadPrev != null) {
|
||||
|
||||
// This takes care of the corner case of a single entry
|
||||
tPrev = Math.max(tPrev, startTime);
|
||||
cost = calcCostOfLoad(loadPrev, capacity, resCalc);
|
||||
totalCost = totalCost + cost * (endTime - tPrev) / step;
|
||||
}
|
||||
|
||||
// Return the overall cost
|
||||
return totalCost;
|
||||
}
|
||||
|
||||
protected static int getDurationIntervalGangsCanFit(long startTime,
|
||||
long endTime, RLESparseResourceAllocation planModifications,
|
||||
Resource capacity, RLESparseResourceAllocation netRLERes,
|
||||
ResourceCalculator resCalc, Resource requestedResources)
|
||||
throws PlanningException {
|
||||
|
||||
// Initialize auxiliary variables
|
||||
int gangsCanFit = Integer.MAX_VALUE;
|
||||
int curGangsCanFit;
|
||||
|
||||
// Calculate the total amount of available resources between startTime
|
||||
// and endTime, by subtracting planModifications from netRLERes
|
||||
RLESparseResourceAllocation netAvailableResources =
|
||||
RLESparseResourceAllocation.merge(resCalc, capacity, netRLERes,
|
||||
planModifications, RLEOperator.subtractTestNonNegative, startTime,
|
||||
endTime);
|
||||
|
||||
// Convert result to a map
|
||||
NavigableMap<Long, Resource> mapAvailableCapacity =
|
||||
netAvailableResources.getCumulative();
|
||||
|
||||
// Iterate over the map representation.
|
||||
// At each point, calculate how many times does 'requestedResources' fit.
|
||||
// The result is the minimum over all time points.
|
||||
for (Entry<Long, Resource> e : mapAvailableCapacity.entrySet()) {
|
||||
Long t = e.getKey();
|
||||
Resource curAvailable = e.getValue();
|
||||
if (t >= endTime) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (curAvailable == null) {
|
||||
gangsCanFit = 0;
|
||||
} else {
|
||||
curGangsCanFit = (int) Math.floor(Resources.divide(resCalc, capacity,
|
||||
curAvailable, requestedResources));
|
||||
if (curGangsCanFit < gangsCanFit) {
|
||||
gangsCanFit = curGangsCanFit;
|
||||
}
|
||||
}
|
||||
}
|
||||
return gangsCanFit;
|
||||
}
|
||||
|
||||
protected double calcCostOfInterval(long startTime, long endTime,
|
||||
Map<Long, Resource> planLoads,
|
||||
RLESparseResourceAllocation planLoads,
|
||||
RLESparseResourceAllocation planModifications, Resource capacity,
|
||||
ResourceCalculator resCalc, long step) {
|
||||
|
||||
|
@ -242,7 +366,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
|
|||
|
||||
}
|
||||
|
||||
protected double calcCostOfTimeSlot(long t, Map<Long, Resource> planLoads,
|
||||
protected double calcCostOfTimeSlot(long t,
|
||||
RLESparseResourceAllocation planLoads,
|
||||
RLESparseResourceAllocation planModifications, Resource capacity,
|
||||
ResourceCalculator resCalc) {
|
||||
|
||||
|
@ -254,17 +379,17 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
|
|||
|
||||
}
|
||||
|
||||
protected Resource getLoadAtTime(long t, Map<Long, Resource> planLoads,
|
||||
protected Resource getLoadAtTime(long t,
|
||||
RLESparseResourceAllocation planLoads,
|
||||
RLESparseResourceAllocation planModifications) {
|
||||
|
||||
Resource planLoad = planLoads.get(t);
|
||||
planLoad = (planLoad == null) ? Resource.newInstance(0, 0) : planLoad;
|
||||
Resource planLoad = planLoads.getCapacityAtTime(t);
|
||||
|
||||
return Resources.add(planLoad, planModifications.getCapacityAtTime(t));
|
||||
|
||||
}
|
||||
|
||||
protected double calcCostOfLoad(Resource load, Resource capacity,
|
||||
protected static double calcCostOfLoad(Resource load, Resource capacity,
|
||||
ResourceCalculator resCalc) {
|
||||
|
||||
return resCalc.ratio(load, capacity);
|
||||
|
@ -289,42 +414,30 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
|
|||
private long startTime;
|
||||
private long endTime;
|
||||
private double cost;
|
||||
private Resource maxLoad;
|
||||
private final int gangsCanFit;
|
||||
|
||||
// Constructor
|
||||
public DurationInterval(long startTime, long endTime, double cost,
|
||||
Resource maxLoad) {
|
||||
int gangsCanfit) {
|
||||
this.startTime = startTime;
|
||||
this.endTime = endTime;
|
||||
this.cost = cost;
|
||||
this.maxLoad = maxLoad;
|
||||
this.gangsCanFit = gangsCanfit;
|
||||
}
|
||||
|
||||
// canAllocate() - boolean function, returns whether requestedResources
|
||||
// can be allocated during the durationInterval without
|
||||
// violating capacity constraints
|
||||
public boolean canAllocate(Resource requestedResources, Resource capacity,
|
||||
ResourceCalculator resCalc) {
|
||||
|
||||
Resource updatedMaxLoad = Resources.add(maxLoad, requestedResources);
|
||||
return (resCalc.compare(capacity, updatedMaxLoad, capacity) <= 0);
|
||||
|
||||
public boolean canAllocate() {
|
||||
return (gangsCanFit > 0);
|
||||
}
|
||||
|
||||
// numCanFit() - returns the maximal number of requestedResources can be
|
||||
// allocated during the durationInterval without violating
|
||||
// capacity constraints
|
||||
public int numCanFit(Resource requestedResources, Resource capacity,
|
||||
ResourceCalculator resCalc) {
|
||||
|
||||
// Represents the largest resource demand that can be satisfied throughout
|
||||
// the entire DurationInterval (i.e., during [startTime,endTime))
|
||||
Resource availableResources = Resources.subtract(capacity, maxLoad);
|
||||
|
||||
// Maximal number of requestedResources that fit inside the interval
|
||||
return (int) Math.floor(Resources.divide(resCalc, capacity,
|
||||
availableResources, requestedResources));
|
||||
|
||||
public int numCanFit() {
|
||||
return gangsCanFit;
|
||||
}
|
||||
|
||||
public long getStartTime() {
|
||||
|
@ -343,14 +456,6 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
|
|||
this.endTime = value;
|
||||
}
|
||||
|
||||
public Resource getMaxLoad() {
|
||||
return this.maxLoad;
|
||||
}
|
||||
|
||||
public void setMaxLoad(Resource value) {
|
||||
this.maxLoad = value;
|
||||
}
|
||||
|
||||
public double getTotalCost() {
|
||||
return this.cost;
|
||||
}
|
||||
|
@ -359,11 +464,17 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
|
|||
this.cost = value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
||||
sb.append(" start: " + startTime).append(" end: " + endTime)
|
||||
.append(" cost: " + cost).append(" maxLoad: " + maxLoad);
|
||||
.append(" cost: " + cost).append(" gangsCanFit: " + gangsCanFit);
|
||||
|
||||
return sb.toString();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,106 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
|
||||
|
||||
import java.util.ListIterator;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
|
||||
|
||||
/**
|
||||
* Sets the earliest start time of a stage proportional to the job weight. The
|
||||
* interval [jobArrival, stageDeadline) is divided as follows. First, each stage
|
||||
* is guaranteed at least its requested duration. Then, the stage receives a
|
||||
* fraction of the remaining time. The fraction is calculated as the ratio
|
||||
* between the weight (total requested resources) of the stage and the total
|
||||
* weight of all proceeding stages.
|
||||
*/
|
||||
|
||||
public class StageEarliestStartByDemand implements StageEarliestStart {
|
||||
|
||||
private long step;
|
||||
|
||||
@Override
|
||||
public long setEarliestStartTime(Plan plan,
|
||||
ReservationDefinition reservation, int index, ReservationRequest current,
|
||||
long stageDeadline) {
|
||||
|
||||
step = plan.getStep();
|
||||
|
||||
// If this is the first stage, don't bother with the computation.
|
||||
if (index < 1) {
|
||||
return reservation.getArrival();
|
||||
}
|
||||
|
||||
// Get iterator
|
||||
ListIterator<ReservationRequest> li =
|
||||
reservation.getReservationRequests().getReservationResources()
|
||||
.listIterator(index);
|
||||
ReservationRequest rr;
|
||||
|
||||
// Calculate the total weight & total duration
|
||||
double totalWeight = calcWeight(current);
|
||||
long totalDuration = getRoundedDuration(current, plan);
|
||||
|
||||
while (li.hasPrevious()) {
|
||||
rr = li.previous();
|
||||
totalWeight += calcWeight(rr);
|
||||
totalDuration += getRoundedDuration(rr, plan);
|
||||
}
|
||||
|
||||
// Compute the weight of the current stage as compared to remaining ones
|
||||
double ratio = calcWeight(current) / totalWeight;
|
||||
|
||||
// Estimate an early start time, such that:
|
||||
// 1. Every stage is guaranteed to receive at least its duration
|
||||
// 2. The remainder of the window is divided between stages
|
||||
// proportionally to its workload (total memory consumption)
|
||||
long window = stageDeadline - reservation.getArrival();
|
||||
long windowRemainder = window - totalDuration;
|
||||
long earlyStart =
|
||||
(long) (stageDeadline - getRoundedDuration(current, plan)
|
||||
- (windowRemainder * ratio));
|
||||
|
||||
// Realign if necessary (since we did some arithmetic)
|
||||
earlyStart = stepRoundUp(earlyStart, step);
|
||||
|
||||
// Return
|
||||
return earlyStart;
|
||||
|
||||
}
|
||||
|
||||
// Weight = total memory consumption of stage
|
||||
protected double calcWeight(ReservationRequest stage) {
|
||||
return (stage.getDuration() * stage.getCapability().getMemorySize())
|
||||
* (stage.getNumContainers());
|
||||
}
|
||||
|
||||
protected long getRoundedDuration(ReservationRequest stage, Plan plan) {
|
||||
return stepRoundUp(stage.getDuration(), step);
|
||||
}
|
||||
|
||||
protected static long stepRoundDown(long t, long step) {
|
||||
return (t / step) * step;
|
||||
}
|
||||
|
||||
protected static long stepRoundUp(long t, long step) {
|
||||
return ((t + step - 1) / step) * step;
|
||||
}
|
||||
}
|
|
@ -1,39 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
|
||||
|
||||
/**
|
||||
* Sets the earliest start time of a stage as the job arrival time.
|
||||
*/
|
||||
public class StageEarliestStartByJobArrival implements StageEarliestStart {
|
||||
|
||||
@Override
|
||||
public long setEarliestStartTime(Plan plan,
|
||||
ReservationDefinition reservation, int index, ReservationRequest current,
|
||||
long stageDeadline) {
|
||||
|
||||
return reservation.getArrival();
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -21,26 +21,27 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
|
|||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
|
||||
|
||||
/**
|
||||
* Interface for setting the earliest start time of a stage in IterativePlanner.
|
||||
* An auxiliary class used to compute the time interval in which the stage can
|
||||
* be allocated resources by {@link IterativePlanner}.
|
||||
*/
|
||||
public interface StageEarliestStart {
|
||||
|
||||
public interface StageExecutionInterval {
|
||||
/**
|
||||
* Computes the earliest allowed starting time for a given stage.
|
||||
*
|
||||
* @param plan the Plan to which the reservation must be fitted
|
||||
* @param reservation the job contract
|
||||
* @param index the index of the stage in the job contract
|
||||
* @param currentReservationStage the stage
|
||||
* @param stageDeadline the deadline of the stage set by the two phase
|
||||
* planning algorithm
|
||||
*
|
||||
* @return the earliest allowed starting time for the stage.
|
||||
* @param allocateLeft is the job allocated from left to right
|
||||
* @param allocations Existing resource assignments for the job
|
||||
* @return the time interval in which the stage can get resources.
|
||||
*/
|
||||
long setEarliestStartTime(Plan plan, ReservationDefinition reservation,
|
||||
int index, ReservationRequest currentReservationStage,
|
||||
long stageDeadline);
|
||||
ReservationInterval computeExecutionInterval(Plan plan,
|
||||
ReservationDefinition reservation,
|
||||
ReservationRequest currentReservationStage, boolean allocateLeft,
|
||||
RLESparseResourceAllocation allocations);
|
||||
|
||||
}
|
|
@ -0,0 +1,144 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.IterativePlanner.StageProvider;
|
||||
|
||||
/**
|
||||
* An implementation of {@link StageExecutionInterval}, which sets the execution
|
||||
* interval of the stage. For ANY and ALL jobs, the interval is
|
||||
* [jobArrival,jobDeadline]. For ORDER jobs, the the maximal possible time
|
||||
* interval is divided as follows: First, each stage is guaranteed at least its
|
||||
* requested duration. Then, the stage receives a fraction of the remaining
|
||||
* time. The fraction is calculated as the ratio between the weight (total
|
||||
* requested resources) of the stage and the total weight of all remaining
|
||||
* stages.
|
||||
*/
|
||||
|
||||
public class StageExecutionIntervalByDemand implements StageExecutionInterval {
|
||||
|
||||
private long step;
|
||||
|
||||
@Override
|
||||
public ReservationInterval computeExecutionInterval(Plan plan,
|
||||
ReservationDefinition reservation,
|
||||
ReservationRequest currentReservationStage, boolean allocateLeft,
|
||||
RLESparseResourceAllocation allocations) {
|
||||
|
||||
// Use StageExecutionIntervalUnconstrained to get the maximal interval
|
||||
ReservationInterval maxInterval =
|
||||
(new StageExecutionIntervalUnconstrained()).computeExecutionInterval(
|
||||
plan, reservation, currentReservationStage, allocateLeft,
|
||||
allocations);
|
||||
|
||||
ReservationRequestInterpreter jobType =
|
||||
reservation.getReservationRequests().getInterpreter();
|
||||
|
||||
// For unconstrained jobs, such as ALL & ANY, we can use the unconstrained
|
||||
// version
|
||||
if ((jobType != ReservationRequestInterpreter.R_ORDER)
|
||||
&& (jobType != ReservationRequestInterpreter.R_ORDER_NO_GAP)) {
|
||||
return maxInterval;
|
||||
}
|
||||
|
||||
// For ORDER and ORDER_NO_GAP, take a sub-interval of maxInterval
|
||||
step = plan.getStep();
|
||||
|
||||
double totalWeight = 0.0;
|
||||
long totalDuration = 0;
|
||||
|
||||
// Iterate over the stages that haven't been allocated.
|
||||
// For allocateLeft == True, we iterate in reverse order, starting from the
|
||||
// last
|
||||
// stage, until we reach the current stage.
|
||||
// For allocateLeft == False, we do the opposite.
|
||||
StageProvider stageProvider = new StageProvider(!allocateLeft, reservation);
|
||||
|
||||
while (stageProvider.hasNext()) {
|
||||
ReservationRequest rr = stageProvider.next();
|
||||
totalWeight += calcWeight(rr);
|
||||
totalDuration += getRoundedDuration(rr, step);
|
||||
|
||||
// Stop once we reach current
|
||||
if (rr == currentReservationStage) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Compute the weight of the current stage as compared to remaining ones
|
||||
double ratio = calcWeight(currentReservationStage) / totalWeight;
|
||||
|
||||
// Estimate an early start time, such that:
|
||||
// 1. Every stage is guaranteed to receive at least its duration
|
||||
// 2. The remainder of the window is divided between stages
|
||||
// proportionally to its workload (total memory consumption)
|
||||
long maxIntervalArrival = maxInterval.getStartTime();
|
||||
long maxIntervalDeadline = maxInterval.getEndTime();
|
||||
long window = maxIntervalDeadline - maxIntervalArrival;
|
||||
long windowRemainder = window - totalDuration;
|
||||
|
||||
if (allocateLeft) {
|
||||
long latestEnd =
|
||||
(long) (maxIntervalArrival
|
||||
+ getRoundedDuration(currentReservationStage, step)
|
||||
+ (windowRemainder * ratio));
|
||||
|
||||
// Realign if necessary (since we did some arithmetic)
|
||||
latestEnd = stepRoundDown(latestEnd, step);
|
||||
|
||||
// Return new interval
|
||||
return new ReservationInterval(maxIntervalArrival, latestEnd);
|
||||
} else {
|
||||
long earlyStart =
|
||||
(long) (maxIntervalDeadline
|
||||
- getRoundedDuration(currentReservationStage, step)
|
||||
- (windowRemainder * ratio));
|
||||
|
||||
// Realign if necessary (since we did some arithmetic)
|
||||
earlyStart = stepRoundUp(earlyStart, step);
|
||||
|
||||
// Return new interval
|
||||
return new ReservationInterval(earlyStart, maxIntervalDeadline);
|
||||
}
|
||||
}
|
||||
|
||||
// Weight = total memory consumption of stage
|
||||
protected double calcWeight(ReservationRequest stage) {
|
||||
return (stage.getDuration() * stage.getCapability().getMemorySize())
|
||||
* (stage.getNumContainers());
|
||||
}
|
||||
|
||||
protected long getRoundedDuration(ReservationRequest stage, Long s) {
|
||||
return stepRoundUp(stage.getDuration(), s);
|
||||
}
|
||||
|
||||
protected static long stepRoundDown(long t, long s) {
|
||||
return (t / s) * s;
|
||||
}
|
||||
|
||||
protected static long stepRoundUp(long t, long s) {
|
||||
return ((t + s - 1) / s) * s;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
|
||||
|
||||
/**
|
||||
* An implementation of {@link StageExecutionInterval} which gives each stage
|
||||
* the maximal possible time interval, given the job constraints. Specifically,
|
||||
* for ANY and ALL jobs, the interval would be [jobArrival, jobDeadline). For
|
||||
* ORDER jobs, the stage cannot start before its predecessors (if allocateLeft
|
||||
* == true) or cannot end before its successors (if allocateLeft == false)
|
||||
*/
|
||||
public class StageExecutionIntervalUnconstrained implements
|
||||
StageExecutionInterval {
|
||||
|
||||
@Override
|
||||
public ReservationInterval computeExecutionInterval(Plan plan,
|
||||
ReservationDefinition reservation,
|
||||
ReservationRequest currentReservationStage, boolean allocateLeft,
|
||||
RLESparseResourceAllocation allocations) {
|
||||
|
||||
Long stageArrival = reservation.getArrival();
|
||||
Long stageDeadline = reservation.getDeadline();
|
||||
|
||||
ReservationRequestInterpreter jobType =
|
||||
reservation.getReservationRequests().getInterpreter();
|
||||
|
||||
// Left to right
|
||||
if (allocateLeft) {
|
||||
// If ORDER job, change the stage arrival time
|
||||
if ((jobType == ReservationRequestInterpreter.R_ORDER)
|
||||
|| (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP)) {
|
||||
Long allocationEndTime = allocations.getLatestNonNullTime();
|
||||
if (allocationEndTime != -1) {
|
||||
stageArrival = allocationEndTime;
|
||||
}
|
||||
}
|
||||
// Right to left
|
||||
} else {
|
||||
// If ORDER job, change the stage deadline
|
||||
if ((jobType == ReservationRequestInterpreter.R_ORDER)
|
||||
|| (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP)) {
|
||||
Long allocationStartTime = allocations.getEarliestStartTime();
|
||||
if (allocationStartTime != -1) {
|
||||
stageDeadline = allocationStartTime;
|
||||
}
|
||||
}
|
||||
}
|
||||
return new ReservationInterval(stageArrival, stageDeadline);
|
||||
}
|
||||
}
|
|
@ -18,13 +18,17 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||
|
@ -37,29 +41,32 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.StageAllocatorLowCostAligned.DurationInterval;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
|
||||
public class TestAlignedPlanner {
|
||||
|
||||
ReservationAgent agent;
|
||||
InMemoryPlan plan;
|
||||
Resource minAlloc = Resource.newInstance(1024, 1);
|
||||
ResourceCalculator res = new DefaultResourceCalculator();
|
||||
Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
|
||||
Random rand = new Random();
|
||||
long step;
|
||||
private ReservationAgent agentRight;
|
||||
private ReservationAgent agentLeft;
|
||||
private InMemoryPlan plan;
|
||||
private final Resource minAlloc = Resource.newInstance(1024, 1);
|
||||
private final ResourceCalculator res = new DefaultResourceCalculator();
|
||||
private final Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
|
||||
private final Random rand = new Random();
|
||||
private Resource clusterCapacity;
|
||||
private long step;
|
||||
|
||||
@Test
|
||||
public void testSingleReservationAccept() throws PlanningException {
|
||||
|
@ -82,7 +89,7 @@ public class TestAlignedPlanner {
|
|||
// Add reservation
|
||||
ReservationId reservationID =
|
||||
ReservationSystemTestUtil.getNewReservationId();
|
||||
agent.createReservation(reservationID, "u1", plan, rr1);
|
||||
agentRight.createReservation(reservationID, "u1", plan, rr1);
|
||||
|
||||
// CHECK: allocation was accepted
|
||||
assertTrue("Agent-based allocation failed", reservationID != null);
|
||||
|
@ -107,7 +114,7 @@ public class TestAlignedPlanner {
|
|||
// Create reservation
|
||||
ReservationDefinition rr1 =
|
||||
createReservationDefinition(
|
||||
10L, // Job arrival time
|
||||
10 * step, // Job arrival time
|
||||
15 * step, // Job deadline
|
||||
new ReservationRequest[] {
|
||||
ReservationRequest.newInstance(
|
||||
|
@ -126,7 +133,7 @@ public class TestAlignedPlanner {
|
|||
try {
|
||||
ReservationId reservationID =
|
||||
ReservationSystemTestUtil.getNewReservationId();
|
||||
agent.createReservation(reservationID, "u1", plan, rr1);
|
||||
agentRight.createReservation(reservationID, "u1", plan, rr1);
|
||||
fail();
|
||||
} catch (PlanningException e) {
|
||||
// Expected failure
|
||||
|
@ -166,7 +173,7 @@ public class TestAlignedPlanner {
|
|||
try {
|
||||
ReservationId reservationID =
|
||||
ReservationSystemTestUtil.getNewReservationId();
|
||||
agent.createReservation(reservationID, "u1", plan, rr1);
|
||||
agentRight.createReservation(reservationID, "u1", plan, rr1);
|
||||
fail();
|
||||
} catch (PlanningException e) {
|
||||
// Expected failure
|
||||
|
@ -206,7 +213,7 @@ public class TestAlignedPlanner {
|
|||
try {
|
||||
ReservationId reservationID =
|
||||
ReservationSystemTestUtil.getNewReservationId();
|
||||
agent.createReservation(reservationID, "u1", plan, rr1);
|
||||
agentRight.createReservation(reservationID, "u1", plan, rr1);
|
||||
fail();
|
||||
} catch (PlanningException e) {
|
||||
// Expected failure
|
||||
|
@ -246,7 +253,7 @@ public class TestAlignedPlanner {
|
|||
try {
|
||||
ReservationId reservationID =
|
||||
ReservationSystemTestUtil.getNewReservationId();
|
||||
agent.createReservation(reservationID, "u1", plan, rr1);
|
||||
agentRight.createReservation(reservationID, "u1", plan, rr1);
|
||||
fail();
|
||||
} catch (PlanningException e) {
|
||||
// Expected failure
|
||||
|
@ -285,7 +292,7 @@ public class TestAlignedPlanner {
|
|||
// Add reservation
|
||||
ReservationId reservationID =
|
||||
ReservationSystemTestUtil.getNewReservationId();
|
||||
agent.createReservation(reservationID, "u1", plan, rr1);
|
||||
agentRight.createReservation(reservationID, "u1", plan, rr1);
|
||||
|
||||
// CHECK: allocation was accepted
|
||||
assertTrue("Agent-based allocation failed", reservationID != null);
|
||||
|
@ -328,7 +335,7 @@ public class TestAlignedPlanner {
|
|||
// Add reservation
|
||||
ReservationId reservationID =
|
||||
ReservationSystemTestUtil.getNewReservationId();
|
||||
agent.createReservation(reservationID, "u1", plan, rr1);
|
||||
agentRight.createReservation(reservationID, "u1", plan, rr1);
|
||||
|
||||
// CHECK: allocation was accepted
|
||||
assertTrue("Agent-based allocation failed", reservationID != null);
|
||||
|
@ -374,7 +381,7 @@ public class TestAlignedPlanner {
|
|||
try {
|
||||
ReservationId reservationID =
|
||||
ReservationSystemTestUtil.getNewReservationId();
|
||||
agent.createReservation(reservationID, "u1", plan, rr1);
|
||||
agentRight.createReservation(reservationID, "u1", plan, rr1);
|
||||
fail();
|
||||
} catch (PlanningException e) {
|
||||
// Expected failure
|
||||
|
@ -420,10 +427,10 @@ public class TestAlignedPlanner {
|
|||
ReservationSystemTestUtil.getNewReservationId();
|
||||
|
||||
// Add block, add flex, remove block, update flex
|
||||
agent.createReservation(blockReservationID, "uBlock", plan, rrBlock);
|
||||
agent.createReservation(flexReservationID, "uFlex", plan, rrFlex);
|
||||
agent.deleteReservation(blockReservationID, "uBlock", plan);
|
||||
agent.updateReservation(flexReservationID, "uFlex", plan, rrFlex);
|
||||
agentRight.createReservation(blockReservationID, "uBlock", plan, rrBlock);
|
||||
agentRight.createReservation(flexReservationID, "uFlex", plan, rrFlex);
|
||||
agentRight.deleteReservation(blockReservationID, "uBlock", plan);
|
||||
agentRight.updateReservation(flexReservationID, "uFlex", plan, rrFlex);
|
||||
|
||||
// CHECK: allocation was accepted
|
||||
assertTrue("Agent-based allocation failed", flexReservationID != null);
|
||||
|
@ -458,7 +465,7 @@ public class TestAlignedPlanner {
|
|||
try {
|
||||
ReservationId reservationID =
|
||||
ReservationSystemTestUtil.getNewReservationId();
|
||||
agent.createReservation(reservationID, "u1", plan, rr1);
|
||||
agentRight.createReservation(reservationID, "u1", plan, rr1);
|
||||
fail();
|
||||
} catch (PlanningException e) {
|
||||
// Expected failure
|
||||
|
@ -490,7 +497,7 @@ public class TestAlignedPlanner {
|
|||
// Add reservation
|
||||
ReservationId reservationID =
|
||||
ReservationSystemTestUtil.getNewReservationId();
|
||||
agent.createReservation(reservationID, "u1", plan, rr1);
|
||||
agentRight.createReservation(reservationID, "u1", plan, rr1);
|
||||
|
||||
// CHECK: allocation was accepted
|
||||
assertTrue("Agent-based allocation failed", reservationID != null);
|
||||
|
@ -557,9 +564,9 @@ public class TestAlignedPlanner {
|
|||
ReservationSystemTestUtil.getNewReservationId();
|
||||
|
||||
// Add all
|
||||
agent.createReservation(reservationID1, "u1", plan, rr7Mem1Core);
|
||||
agent.createReservation(reservationID2, "u2", plan, rr6Mem6Cores);
|
||||
agent.createReservation(reservationID3, "u3", plan, rr);
|
||||
agentRight.createReservation(reservationID1, "u1", plan, rr7Mem1Core);
|
||||
agentRight.createReservation(reservationID2, "u2", plan, rr6Mem6Cores);
|
||||
agentRight.createReservation(reservationID3, "u3", plan, rr);
|
||||
|
||||
// Get reservation
|
||||
ReservationAllocation alloc3 = plan.getReservationById(reservationID3);
|
||||
|
@ -684,8 +691,8 @@ public class TestAlignedPlanner {
|
|||
for (ReservationDefinition rr : list) {
|
||||
ReservationId reservationID =
|
||||
ReservationSystemTestUtil.getNewReservationId();
|
||||
agent.createReservation(reservationID, "u" + Integer.toString(i), plan,
|
||||
rr);
|
||||
agentRight.createReservation(reservationID, "u" + Integer.toString(i),
|
||||
plan, rr);
|
||||
++i;
|
||||
}
|
||||
|
||||
|
@ -695,6 +702,328 @@ public class TestAlignedPlanner {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleReservationAcceptAllocateLeft()
|
||||
throws PlanningException {
|
||||
|
||||
// Create reservation
|
||||
ReservationDefinition rr1 =
|
||||
createReservationDefinition(
|
||||
10 * step, // Job arrival time
|
||||
35 * step, // Job deadline
|
||||
new ReservationRequest[] {
|
||||
ReservationRequest.newInstance(
|
||||
Resource.newInstance(1024, 1), // Capability
|
||||
20, // Num containers
|
||||
20, // Concurrency
|
||||
10 * step), // Duration
|
||||
ReservationRequest.newInstance(
|
||||
Resource.newInstance(1024, 1), // Capability
|
||||
20, // Num containers
|
||||
20, // Concurrency
|
||||
10 * step) }, // Duration
|
||||
ReservationRequestInterpreter.R_ORDER, "u1");
|
||||
|
||||
// Add reservation
|
||||
ReservationId reservationID =
|
||||
ReservationSystemTestUtil.getNewReservationId();
|
||||
agentLeft.createReservation(reservationID, "u1", plan, rr1);
|
||||
|
||||
// CHECK: allocation was accepted
|
||||
assertTrue("Agent-based allocation failed", reservationID != null);
|
||||
assertTrue("Agent-based allocation failed", plan.getAllReservations()
|
||||
.size() == 1);
|
||||
|
||||
// Get reservation
|
||||
ReservationAllocation alloc1 = plan.getReservationById(reservationID);
|
||||
|
||||
// Verify allocation
|
||||
assertTrue(alloc1.toString(),
|
||||
check(alloc1, 10 * step, 30 * step, 20, 1024, 1));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLeftSucceedsRightFails() throws PlanningException {
|
||||
|
||||
// Prepare basic plan
|
||||
int numJobsInScenario = initializeScenario2();
|
||||
|
||||
// Create reservation
|
||||
ReservationDefinition rr1 =
|
||||
createReservationDefinition(
|
||||
7 * step, // Job arrival time
|
||||
16 * step, // Job deadline
|
||||
new ReservationRequest[] {
|
||||
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
|
||||
20, // Num containers
|
||||
20, // Concurrency
|
||||
2 * step), // Duration
|
||||
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
|
||||
20, // Num containers
|
||||
20, // Concurrency
|
||||
2 * step) }, // Duration
|
||||
ReservationRequestInterpreter.R_ORDER, "u1");
|
||||
|
||||
ReservationDefinition rr2 =
|
||||
createReservationDefinition(
|
||||
14 * step, // Job arrival time
|
||||
16 * step, // Job deadline
|
||||
new ReservationRequest[] {
|
||||
ReservationRequest.newInstance(
|
||||
Resource.newInstance(1024, 1), // Capability
|
||||
100, // Num containers
|
||||
100, // Concurrency
|
||||
2 * step) }, // Duration
|
||||
ReservationRequestInterpreter.R_ORDER, "u2");
|
||||
|
||||
// Add 1st reservation
|
||||
ReservationId reservationID1 =
|
||||
ReservationSystemTestUtil.getNewReservationId();
|
||||
agentLeft.createReservation(reservationID1, "u1", plan, rr1);
|
||||
|
||||
// CHECK: allocation was accepted
|
||||
assertTrue("Agent-based allocation failed", reservationID1 != null);
|
||||
assertTrue("Agent-based allocation failed", plan.getAllReservations()
|
||||
.size() == numJobsInScenario + 1);
|
||||
|
||||
// Get reservation
|
||||
ReservationAllocation alloc1 = plan.getReservationById(reservationID1);
|
||||
|
||||
// Verify allocation
|
||||
assertTrue(alloc1.toString(),
|
||||
check(alloc1, 7 * step, 11 * step, 20, 1024, 1));
|
||||
|
||||
// Add second reservation
|
||||
ReservationId reservationID2 =
|
||||
ReservationSystemTestUtil.getNewReservationId();
|
||||
agentLeft.createReservation(reservationID2, "u2", plan, rr2);
|
||||
|
||||
// CHECK: allocation was accepted
|
||||
assertTrue("Agent-based allocation failed", reservationID2 != null);
|
||||
assertTrue("Agent-based allocation failed", plan.getAllReservations()
|
||||
.size() == numJobsInScenario + 2);
|
||||
|
||||
// Get reservation
|
||||
ReservationAllocation alloc2 = plan.getReservationById(reservationID2);
|
||||
|
||||
// Verify allocation
|
||||
assertTrue(alloc2.toString(),
|
||||
check(alloc2, 14 * step, 16 * step, 100, 1024, 1));
|
||||
|
||||
agentLeft.deleteReservation(reservationID1, "u1", plan);
|
||||
agentLeft.deleteReservation(reservationID2, "u2", plan);
|
||||
|
||||
// Now try to allocate the same jobs with agentRight. The second
|
||||
// job should fail
|
||||
// Add 1st reservation
|
||||
ReservationId reservationID3 =
|
||||
ReservationSystemTestUtil.getNewReservationId();
|
||||
agentRight.createReservation(reservationID3, "u1", plan, rr1);
|
||||
|
||||
// CHECK: allocation was accepted
|
||||
assertTrue("Agent-based allocation failed", reservationID3 != null);
|
||||
assertTrue("Agent-based allocation failed", plan.getAllReservations()
|
||||
.size() == numJobsInScenario + 1);
|
||||
|
||||
// Add 2nd reservation
|
||||
try {
|
||||
ReservationId reservationID4 =
|
||||
ReservationSystemTestUtil.getNewReservationId();
|
||||
agentRight.createReservation(reservationID4, "u2", plan, rr2);
|
||||
fail();
|
||||
} catch (PlanningException e) {
|
||||
// Expected failure
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateOrderNoGap() {
|
||||
|
||||
//
|
||||
// Initialize allocations
|
||||
//
|
||||
|
||||
RLESparseResourceAllocation allocation =
|
||||
new RLESparseResourceAllocation(res);
|
||||
allocation.addInterval(new ReservationInterval(10 * step, 13 * step),
|
||||
Resource.newInstance(1024, 1));
|
||||
|
||||
// curAlloc
|
||||
Map<ReservationInterval, Resource> curAlloc =
|
||||
new HashMap<ReservationInterval, Resource>();
|
||||
|
||||
//
|
||||
// Check cases
|
||||
//
|
||||
|
||||
// 1. allocateLeft = false, succeed when there is no gap
|
||||
curAlloc.clear();
|
||||
curAlloc.put(new ReservationInterval(9 * step, 10 * step),
|
||||
Resource.newInstance(1024, 1));
|
||||
assertTrue("validateOrderNoFap() should have suceeded",
|
||||
IterativePlanner.validateOrderNoGap(allocation, curAlloc, false));
|
||||
|
||||
// 2. allocateLeft = false, fail when curAlloc has a gap
|
||||
curAlloc.put(new ReservationInterval(7 * step, 8 * step),
|
||||
Resource.newInstance(1024, 1));
|
||||
assertFalse("validateOrderNoGap() failed to identify a gap in curAlloc",
|
||||
IterativePlanner.validateOrderNoGap(allocation, curAlloc, false));
|
||||
|
||||
// 3. allocateLeft = false, fail when there is a gap between curAlloc and
|
||||
// allocations
|
||||
curAlloc.clear();
|
||||
curAlloc.put(new ReservationInterval(8 * step, 9 * step),
|
||||
Resource.newInstance(1024, 1));
|
||||
assertFalse("validateOrderNoGap() failed to identify a gap between "
|
||||
+ "allocations and curAlloc",
|
||||
IterativePlanner.validateOrderNoGap(allocation, curAlloc, false));
|
||||
|
||||
// 4. allocateLeft = true, succeed when there is no gap
|
||||
curAlloc.clear();
|
||||
curAlloc.put(new ReservationInterval(13 * step, 14 * step),
|
||||
Resource.newInstance(1024, 1));
|
||||
assertTrue("validateOrderNoFap() should have suceeded",
|
||||
IterativePlanner.validateOrderNoGap(allocation, curAlloc, true));
|
||||
|
||||
// 5. allocateLeft = true, fail when there is a gap between curAlloc and
|
||||
// allocations
|
||||
curAlloc.put(new ReservationInterval(15 * step, 16 * step),
|
||||
Resource.newInstance(1024, 1));
|
||||
assertFalse("validateOrderNoGap() failed to identify a gap in curAlloc",
|
||||
IterativePlanner.validateOrderNoGap(allocation, curAlloc, true));
|
||||
|
||||
// 6. allocateLeft = true, fail when curAlloc has a gap
|
||||
curAlloc.clear();
|
||||
curAlloc.put(new ReservationInterval(14 * step, 15 * step),
|
||||
Resource.newInstance(1024, 1));
|
||||
assertFalse("validateOrderNoGap() failed to identify a gap between "
|
||||
+ "allocations and curAlloc",
|
||||
IterativePlanner.validateOrderNoGap(allocation, curAlloc, true));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetDurationInterval() throws PlanningException {
|
||||
|
||||
DurationInterval durationInterval = null;
|
||||
|
||||
// Create netRLERes:
|
||||
// - 4GB & 4VC between [10,20) and [30,40)
|
||||
// - 8GB & 8VC between [20,30)
|
||||
RLESparseResourceAllocation netRLERes =
|
||||
new RLESparseResourceAllocation(res);
|
||||
netRLERes.addInterval(
|
||||
new ReservationInterval(10 * step, 40 * step),
|
||||
Resource.newInstance(4096, 4)
|
||||
);
|
||||
netRLERes.addInterval(
|
||||
new ReservationInterval(20 * step, 30 * step),
|
||||
Resource.newInstance(4096, 4)
|
||||
);
|
||||
|
||||
// Create planLoads:
|
||||
// - 5GB & 5VC between [20,30)
|
||||
RLESparseResourceAllocation planLoads =
|
||||
new RLESparseResourceAllocation(res);
|
||||
planLoads.addInterval(
|
||||
new ReservationInterval(20 * step, 30 * step),
|
||||
Resource.newInstance(5120, 5)
|
||||
);
|
||||
|
||||
// Create planModifications:
|
||||
// - 1GB & 1VC between [25,35)
|
||||
RLESparseResourceAllocation planModifications =
|
||||
new RLESparseResourceAllocation(res);
|
||||
planModifications.addInterval(
|
||||
new ReservationInterval(25 * step, 35 * step),
|
||||
Resource.newInstance(1024, 1)
|
||||
);
|
||||
|
||||
// Set requested resources
|
||||
Resource requestedResources = Resource.newInstance(1024, 1);
|
||||
|
||||
|
||||
// 1.
|
||||
// currLoad: should start at 20*step, end at 30*step with a null value
|
||||
// (in getTotalCost(), after the for loop we will have loadPrev == null
|
||||
// netAvailableResources: should start exactly at startTime (10*step),
|
||||
// end exactly at endTime (30*step) with a null value
|
||||
durationInterval =
|
||||
StageAllocatorLowCostAligned.getDurationInterval(10*step, 30*step,
|
||||
planLoads, planModifications, clusterCapacity, netRLERes, res, step,
|
||||
requestedResources);
|
||||
assertEquals(durationInterval.numCanFit(), 4);
|
||||
assertEquals(durationInterval.getTotalCost(), 0.55, 0.00001);
|
||||
|
||||
// 2.
|
||||
// currLoad: should start at 20*step, end at 31*step with a null value
|
||||
// (in getTotalCost, after the for loop we will have loadPrev == null)
|
||||
// netAvailableResources: should start exactly at startTime (10*step),
|
||||
// end exactly at endTime (31*step) with a null value
|
||||
durationInterval =
|
||||
StageAllocatorLowCostAligned.getDurationInterval(10*step, 31*step,
|
||||
planLoads, planModifications, clusterCapacity, netRLERes, res, step,
|
||||
requestedResources);
|
||||
System.out.println(durationInterval);
|
||||
assertEquals(durationInterval.numCanFit(), 3);
|
||||
assertEquals(durationInterval.getTotalCost(), 0.56, 0.00001);
|
||||
|
||||
// 3.
|
||||
// currLoad: should start at 20*step, end at 30*step with a null value
|
||||
// (in getTotalCost, after the for loop we will have loadPrev == null)
|
||||
// netAvailableResources: should start exactly startTime (15*step),
|
||||
// end exactly at endTime (30*step) with a null value
|
||||
durationInterval =
|
||||
StageAllocatorLowCostAligned.getDurationInterval(15*step, 30*step,
|
||||
planLoads, planModifications, clusterCapacity, netRLERes, res, step,
|
||||
requestedResources);
|
||||
assertEquals(durationInterval.numCanFit(), 4);
|
||||
assertEquals(durationInterval.getTotalCost(), 0.55, 0.00001);
|
||||
|
||||
// 4.
|
||||
// currLoad: should start at 20*step, end at 31*step with a null value
|
||||
// (in getTotalCost, after the for loop we will have loadPrev == null)
|
||||
// netAvailableResources: should start exactly at startTime (15*step),
|
||||
// end exactly at endTime (31*step) with a value other than null
|
||||
durationInterval =
|
||||
StageAllocatorLowCostAligned.getDurationInterval(15*step, 31*step,
|
||||
planLoads, planModifications, clusterCapacity, netRLERes, res, step,
|
||||
requestedResources);
|
||||
System.out.println(durationInterval);
|
||||
assertEquals(durationInterval.numCanFit(), 3);
|
||||
assertEquals(durationInterval.getTotalCost(), 0.56, 0.00001);
|
||||
|
||||
// 5.
|
||||
// currLoad: should only contain one entry at startTime
|
||||
// (22*step), therefore loadPrev != null and we should enter the if
|
||||
// condition after the for loop in getTotalCost
|
||||
// netAvailableResources: should only contain one entry at startTime
|
||||
// (22*step)
|
||||
durationInterval =
|
||||
StageAllocatorLowCostAligned.getDurationInterval(22*step, 23*step,
|
||||
planLoads, planModifications, clusterCapacity, netRLERes, res, step,
|
||||
requestedResources);
|
||||
System.out.println(durationInterval);
|
||||
assertEquals(durationInterval.numCanFit(), 8);
|
||||
assertEquals(durationInterval.getTotalCost(), 0.05, 0.00001);
|
||||
|
||||
// 6.
|
||||
// currLoad: should start at 39*step, end at 41*step with a null value
|
||||
// (in getTotalCost, after the for loop we will have loadPrev == null)
|
||||
// netAvailableResources: should start exactly at startTime (39*step),
|
||||
// end exactly at endTime (41*step) with a null value
|
||||
durationInterval =
|
||||
StageAllocatorLowCostAligned.getDurationInterval(39*step, 41*step,
|
||||
planLoads, planModifications, clusterCapacity, netRLERes, res, step,
|
||||
requestedResources);
|
||||
System.out.println(durationInterval);
|
||||
assertEquals(durationInterval.numCanFit(), 0);
|
||||
assertEquals(durationInterval.getTotalCost(), 0, 0.00001);
|
||||
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
|
||||
|
@ -709,16 +1038,15 @@ public class TestAlignedPlanner {
|
|||
int capacityCores = 100;
|
||||
step = 60000L;
|
||||
|
||||
Resource clusterCapacity = Resource.newInstance(capacityMem, capacityCores);
|
||||
clusterCapacity = Resource.newInstance(capacityMem, capacityCores);
|
||||
|
||||
String reservationQ =
|
||||
ReservationSystemTestUtil.getFullReservationQueueName();
|
||||
float instConstraint = 100;
|
||||
float avgConstraint = 100;
|
||||
|
||||
ReservationSchedulerConfiguration conf =
|
||||
ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
|
||||
instConstraint, avgConstraint);
|
||||
ReservationSchedulerConfiguration conf = ReservationSystemTestUtil
|
||||
.createConf(reservationQ, timeWindow, instConstraint, avgConstraint);
|
||||
|
||||
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
|
||||
policy.init(reservationQ, conf);
|
||||
|
@ -728,14 +1056,19 @@ public class TestAlignedPlanner {
|
|||
|
||||
conf.setInt(AlignedPlannerWithGreedy.SMOOTHNESS_FACTOR,
|
||||
AlignedPlannerWithGreedy.DEFAULT_SMOOTHNESS_FACTOR);
|
||||
conf.setBoolean(ReservationAgent.FAVOR_EARLY_ALLOCATION, false);
|
||||
|
||||
// Set planning agent
|
||||
agent = new AlignedPlannerWithGreedy();
|
||||
agent.init(conf);
|
||||
agentRight = new AlignedPlannerWithGreedy();
|
||||
agentRight.init(conf);
|
||||
|
||||
conf.setBoolean(ReservationAgent.FAVOR_EARLY_ALLOCATION, true);
|
||||
agentLeft = new AlignedPlannerWithGreedy();
|
||||
agentLeft.init(conf);
|
||||
|
||||
// Create Plan
|
||||
plan =
|
||||
new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
|
||||
res, minAlloc, maxAlloc, "dedicated", null, true, context);
|
||||
plan = new InMemoryPlan(queueMetrics, policy, agentRight, clusterCapacity,
|
||||
step, res, minAlloc, maxAlloc, "dedicated", null, true, context);
|
||||
}
|
||||
|
||||
private int initializeScenario1() throws PlanningException {
|
||||
|
|
|
@ -6,9 +6,9 @@
|
|||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
|
@ -55,12 +55,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
|
|||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestGreedyReservationAgent {
|
||||
|
@ -108,7 +108,7 @@ public class TestGreedyReservationAgent {
|
|||
policy.init(reservationQ, conf);
|
||||
|
||||
// setting conf to
|
||||
conf.setBoolean(GreedyReservationAgent.GREEDY_FAVOR_EARLY_ALLOCATION,
|
||||
conf.setBoolean(GreedyReservationAgent.FAVOR_EARLY_ALLOCATION,
|
||||
allocateLeft);
|
||||
agent = new GreedyReservationAgent();
|
||||
agent.init(conf);
|
||||
|
|
|
@ -6,9 +6,9 @@
|
|||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
|
|
Loading…
Reference in New Issue