YARN-4358. Reservation System: Improve relationship between SharingPolicy and ReservationAgent. (Carlo Curino via asuresh)

This commit is contained in:
Arun Suresh 2015-12-05 21:26:16 -08:00
parent 42d49016d4
commit 742632e346
15 changed files with 464 additions and 95 deletions

View File

@ -592,6 +592,9 @@ Release 2.8.0 - UNRELEASED
YARN-4405. Support node label store in non-appendable file system. (Wangda
Tan via jianhe)
YARN-4358. Reservation System: Improve relationship between SharingPolicy
and ReservationAgent. (Carlo Curino via asuresh)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -18,10 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Date;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
@ -104,14 +108,17 @@ public class CapacityOverTimePolicy implements SharingPolicy {
IntegralResource maxAllowed = new IntegralResource(maxAvgRes);
maxAllowed.multiplyBy(validWindow / step);
RLESparseResourceAllocation userCons =
plan.getConsumptionForUserOverTime(reservation.getUser(), startTime
- validWindow, endTime + validWindow);
// check that the resources offered to the user during any window of length
// "validWindow" overlapping this allocation are within maxAllowed
// also enforce instantaneous and physical constraints during this pass
for (long t = startTime - validWindow; t < endTime + validWindow; t += step) {
Resource currExistingAllocTot = plan.getTotalCommittedResources(t);
Resource currExistingAllocForUser =
plan.getConsumptionForUser(reservation.getUser(), t);
Resource currExistingAllocForUser = userCons.getCapacityAtTime(t);
Resource currNewAlloc = reservation.getResourcesAtTime(t);
Resource currOldAlloc = Resources.none();
if (oldReservation != null) {
@ -163,8 +170,7 @@ public class CapacityOverTimePolicy implements SharingPolicy {
// expire contributions from instant in time before (t - validWindow)
if (t > startTime) {
Resource pastOldAlloc =
plan.getConsumptionForUser(reservation.getUser(), t - validWindow);
Resource pastOldAlloc = userCons.getCapacityAtTime(t - validWindow);
Resource pastNewAlloc = reservation.getResourcesAtTime(t - validWindow);
// runningTot = runningTot - pastExistingAlloc - pastNewAlloc;
@ -188,6 +194,39 @@ public class CapacityOverTimePolicy implements SharingPolicy {
}
}
@Override
public RLESparseResourceAllocation availableResources(
RLESparseResourceAllocation available, Plan plan, String user,
ReservationId oldId, long start, long end) throws PlanningException {
// this only propagates the instantaneous maxInst properties, while
// the time-varying one depends on the current allocation as well
// and are not easily captured here
Resource planTotalCapacity = plan.getTotalCapacity();
Resource maxInsRes = Resources.multiply(planTotalCapacity, maxInst);
NavigableMap<Long, Resource> instQuota = new TreeMap<Long, Resource>();
instQuota.put(start, maxInsRes);
RLESparseResourceAllocation instRLEQuota =
new RLESparseResourceAllocation(instQuota,
plan.getResourceCalculator());
RLESparseResourceAllocation used =
plan.getConsumptionForUserOverTime(user, start, end);
instRLEQuota =
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
planTotalCapacity, instRLEQuota, used, RLEOperator.subtract, start,
end);
instRLEQuota =
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
planTotalCapacity, available, instRLEQuota, RLEOperator.min, start,
end);
return instRLEQuota;
}
@Override
public long getValidWindow() {
return validWindow;
@ -198,7 +237,7 @@ public class CapacityOverTimePolicy implements SharingPolicy {
* long(s), as using Resource to store the "integral" of the allocation over
* time leads to integer overflows for large allocations/clusters. (Evolving
* Resource to use long is too disruptive at this point.)
*
*
* The comparison/multiplication behaviors of IntegralResource are consistent
* with the DefaultResourceCalculator.
*/
@ -244,4 +283,7 @@ public class CapacityOverTimePolicy implements SharingPolicy {
return "<memory:" + memory + ", vCores:" + vcores + ">";
}
}
}

View File

@ -27,11 +27,13 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
@ -65,6 +67,9 @@ public class InMemoryPlan implements Plan {
private Map<String, RLESparseResourceAllocation> userResourceAlloc =
new HashMap<String, RLESparseResourceAllocation>();
private Map<String, RLESparseResourceAllocation> userActiveReservationCount =
new HashMap<String, RLESparseResourceAllocation>();
private Map<ReservationId, InMemoryReservationAllocation> reservationTable =
new HashMap<ReservationId, InMemoryReservationAllocation>();
@ -121,6 +126,7 @@ public class InMemoryPlan implements Plan {
return queueMetrics;
}
private void incrementAllocation(ReservationAllocation reservation) {
assert (readWriteLock.isWriteLockedByCurrentThread());
Map<ReservationInterval, Resource> allocationRequests =
@ -132,11 +138,27 @@ public class InMemoryPlan implements Plan {
resAlloc = new RLESparseResourceAllocation(resCalc);
userResourceAlloc.put(user, resAlloc);
}
RLESparseResourceAllocation resCount = userActiveReservationCount.get(user);
if (resCount == null) {
resCount = new RLESparseResourceAllocation(resCalc);
userActiveReservationCount.put(user, resCount);
}
long earliestActive = Long.MAX_VALUE;
long latestActive = Long.MIN_VALUE;
for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
.entrySet()) {
resAlloc.addInterval(r.getKey(), r.getValue());
rleSparseVector.addInterval(r.getKey(), r.getValue());
if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
ZERO_RESOURCE)) {
earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
latestActive = Math.max(latestActive, r.getKey().getEndTime());
}
}
resCount.addInterval(new ReservationInterval(earliestActive, latestActive),
Resource.newInstance(1, 1));
}
private void decrementAllocation(ReservationAllocation reservation) {
@ -145,14 +167,29 @@ public class InMemoryPlan implements Plan {
reservation.getAllocationRequests();
String user = reservation.getUser();
RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
long earliestActive = Long.MAX_VALUE;
long latestActive = Long.MIN_VALUE;
for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
.entrySet()) {
resAlloc.removeInterval(r.getKey(), r.getValue());
rleSparseVector.removeInterval(r.getKey(), r.getValue());
if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
ZERO_RESOURCE)) {
earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
latestActive = Math.max(latestActive, r.getKey().getEndTime());
}
}
if (resAlloc.isEmpty()) {
userResourceAlloc.remove(user);
}
RLESparseResourceAllocation resCount = userActiveReservationCount.get(user);
resCount.removeInterval(new ReservationInterval(earliestActive,
latestActive), Resource.newInstance(1, 1));
if (resCount.isEmpty()) {
userActiveReservationCount.remove(user);
}
}
public Set<ReservationAllocation> getAllReservations() {
@ -160,9 +197,9 @@ public class InMemoryPlan implements Plan {
try {
if (currentReservations != null) {
Set<ReservationAllocation> flattenedReservations =
new HashSet<ReservationAllocation>();
for (Set<InMemoryReservationAllocation> reservationEntries : currentReservations
.values()) {
new TreeSet<ReservationAllocation>();
for (Set<InMemoryReservationAllocation> reservationEntries :
currentReservations.values()) {
flattenedReservations.addAll(reservationEntries);
}
return flattenedReservations;
@ -417,14 +454,34 @@ public class InMemoryPlan implements Plan {
}
@Override
public Resource getConsumptionForUser(String user, long t) {
public RLESparseResourceAllocation getReservationCountForUserOverTime(
String user, long start, long end) {
readLock.lock();
try {
RLESparseResourceAllocation userResAlloc =
userActiveReservationCount.get(user);
if (userResAlloc != null) {
return userResAlloc.getRangeOverlapping(start, end);
} else {
return new RLESparseResourceAllocation(resCalc);
}
} finally {
readLock.unlock();
}
}
@Override
public RLESparseResourceAllocation getConsumptionForUserOverTime(String user,
long start, long end) {
readLock.lock();
try {
RLESparseResourceAllocation userResAlloc = userResourceAlloc.get(user);
if (userResAlloc != null) {
return userResAlloc.getCapacityAtTime(t);
return userResAlloc.getRangeOverlapping(start, end);
} else {
return Resources.clone(ZERO_RESOURCE);
return new RLESparseResourceAllocation(resCalc);
}
} finally {
readLock.unlock();
@ -464,6 +521,43 @@ public class InMemoryPlan implements Plan {
}
}
@Override
public RLESparseResourceAllocation getAvailableResourceOverTime(String user,
ReservationId oldId, long start, long end) throws PlanningException {
readLock.lock();
try {
// create RLE of totCapacity
TreeMap<Long, Resource> totAvailable = new TreeMap<Long, Resource>();
totAvailable.put(start, Resources.clone(totalCapacity));
RLESparseResourceAllocation totRLEAvail =
new RLESparseResourceAllocation(totAvailable, resCalc);
// subtract used from available
RLESparseResourceAllocation netAvailable;
netAvailable =
RLESparseResourceAllocation.merge(resCalc,
Resources.clone(totalCapacity), totRLEAvail, rleSparseVector,
RLEOperator.subtractTestNonNegative, start, end);
// add back in old reservation used resources if any
ReservationAllocation old = reservationTable.get(oldId);
if (old != null) {
netAvailable =
RLESparseResourceAllocation.merge(resCalc,
Resources.clone(totalCapacity), netAvailable,
old.getResourcesOverTime(), RLEOperator.add, start, end);
}
// lower it if this is needed by the sharing policy
netAvailable =
getSharingPolicy().availableResources(netAvailable, this, user,
oldId, start, end);
return netAvailable;
} finally {
readLock.unlock();
}
}
@Override
public Resource getMinimumAllocation() {
return Resources.clone(minAlloc);
@ -549,4 +643,21 @@ public class InMemoryPlan implements Plan {
}
}
@Override
public Set<ReservationAllocation> getReservationByUserAtTime(String user,
long t) {
readLock.lock();
try {
Set<ReservationAllocation> resSet = new HashSet<ReservationAllocation>();
for (ReservationAllocation ra : getReservationsAtTime(t)) {
String resUser = ra.getUser();
if (resUser != null && resUser.equals(user)) {
resSet.add(ra);
}
}
return resSet;
} finally {
readLock.unlock();
}
}
}

View File

@ -132,12 +132,17 @@ public class InMemoryReservationAllocation implements ReservationAllocation {
return Resources.clone(resourcesOverTime.getCapacityAtTime(tick));
}
@Override
public RLESparseResourceAllocation getResourcesOverTime(){
return resourcesOverTime;
}
@Override
public String toString() {
StringBuilder sBuf = new StringBuilder();
sBuf.append(getReservationId()).append(" user:").append(getUser())
.append(" startTime: ").append(getStartTime()).append(" endTime: ")
.append(getEndTime()).append(" alloc:[")
.append(getEndTime()).append(" alloc:\n[")
.append(resourcesOverTime.toString()).append("] ");
return sBuf.toString();
}
@ -151,6 +156,12 @@ public class InMemoryReservationAllocation implements ReservationAllocation {
if (this.getAcceptanceTime() < other.getAcceptanceTime()) {
return 1;
}
if (this.getReservationId().getId() > other.getReservationId().getId()) {
return -1;
}
if (this.getReservationId().getId() < other.getReservationId().getId()) {
return 1;
}
return 0;
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
@ -89,4 +90,11 @@ public class NoOverCommitPolicy implements SharingPolicy {
// nothing to do for this policy
}
@Override
public RLESparseResourceAllocation availableResources(
RLESparseResourceAllocation available, Plan plan, String user,
ReservationId oldId, long start, long end) throws PlanningException {
return available;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import java.util.Set;
@ -40,6 +41,17 @@ public interface PlanView extends PlanContext {
*/
public ReservationAllocation getReservationById(ReservationId reservationID);
/**
* Return a set of {@link ReservationAllocation} that belongs to a certain
* user and overlaps time t.
*
* @param user the user being considered
* @param t the instant in time being considered
* @return {@link Set<ReservationAllocation>} for this user at this time
*/
public Set<ReservationAllocation> getReservationByUserAtTime(String user,
long t);
/**
* Gets all the active reservations at the specified point of time
*
@ -67,18 +79,6 @@ public interface PlanView extends PlanContext {
*/
Resource getTotalCommittedResources(long tick);
/**
* Returns the total {@link Resource} reserved for a given user at the
* specified time
*
* @param user the user who made the reservation(s)
* @param tick the time (UTC in ms) for which the reserved resources are
* requested
* @return the total {@link Resource} reserved for a given user at the
* specified time
*/
public Resource getConsumptionForUser(String user, long tick);
/**
* Returns the overall capacity in terms of {@link Resource} assigned to this
* plan (typically will correspond to the absolute capacity of the
@ -98,9 +98,48 @@ public interface PlanView extends PlanContext {
/**
* Returns the time (UTC in ms) at which the last reservation terminates
*
*
* @return the time (UTC in ms) at which the last reservation terminates
*/
public long getLastEndTime();
/**
* This method returns the amount of resources available to a given user
* (optionally if removing a certain reservation) over the start-end time
* range.
*
* @param user
* @param oldId
* @param start
* @param end
* @return a view of the plan as it is available to this user
* @throws PlanningException
*/
public RLESparseResourceAllocation getAvailableResourceOverTime(String user,
ReservationId oldId, long start, long end) throws PlanningException;
/**
* This method returns a RLE encoded view of the user reservation count
* utilization between start and end time.
*
* @param user
* @param start
* @param end
* @return RLE encoded view of reservation used over time
*/
public RLESparseResourceAllocation getReservationCountForUserOverTime(
String user, long start, long end);
/**
* This method returns a RLE encoded view of the user reservation utilization
* between start and end time.
*
* @param user
* @param start
* @param end
* @return RLE encoded view of resources used over time
*/
public RLESparseResourceAllocation getConsumptionForUserOverTime(String user,
long start, long end);
}

View File

@ -50,14 +50,14 @@ public interface ReservationAllocation extends
public ReservationDefinition getReservationDefinition();
/**
* Returns the time at which the reservation is activated
* Returns the time at which the reservation is activated.
*
* @return the time at which the reservation is activated
*/
public long getStartTime();
/**
* Returns the time at which the reservation terminates
* Returns the time at which the reservation terminates.
*
* @return the time at which the reservation terminates
*/
@ -65,7 +65,7 @@ public interface ReservationAllocation extends
/**
* Returns the map of resources requested against the time interval for which
* they were
* they were.
*
* @return the allocationRequests the map of resources requested against the
* time interval for which they were
@ -118,4 +118,10 @@ public interface ReservationAllocation extends
*/
public Resource getResourcesAtTime(long tick);
/**
* Return a RLE representation of used resources.
* @return a RLE encoding of resources allocated over time.
*/
public RLESparseResourceAllocation getResourcesOverTime();
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
/**
@ -32,7 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan
public interface SharingPolicy {
/**
* Initialize this policy
* Initialize this policy.
*
* @param planQueuePath the name of the queue for this plan
* @param conf the system configuration
@ -53,6 +54,26 @@ public interface SharingPolicy {
public void validate(Plan plan, ReservationAllocation newAllocation)
throws PlanningException;
/**
* This method provide a (partial) instantaneous validation by applying
* business rules (such as max number of parallel containers allowed for a
* user). To provide the agent with more feedback the returned parameter is
* expressed in number of containers that can be fit in this time according to
* the business rules.
*
* @param available the amount of resources that would be offered if not
* constrained by the policy
* @param plan reference the the current Plan
* @param user the username
* @param start the start time for the range we are querying
* @param end the end time for the range we are querying
* @param oldId (optional) the id of a reservation being updated
* @throws PlanningException throws if the request is not valid
*/
public RLESparseResourceAllocation availableResources(
RLESparseResourceAllocation available, Plan plan, String user,
ReservationId oldId, long start, long end) throws PlanningException;
/**
* Returns the time range before and after the current reservation considered
* by this policy. In particular, this informs the archival process for the
@ -63,4 +84,5 @@ public interface SharingPolicy {
*/
public long getValidWindow();
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResour
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@ -80,8 +81,8 @@ public class IterativePlanner extends PlanningAlgorithm {
@Override
public RLESparseResourceAllocation computeJobAllocation(Plan plan,
ReservationId reservationId, ReservationDefinition reservation)
throws ContractValidationException {
ReservationId reservationId, ReservationDefinition reservation,
String user) throws PlanningException {
// Initialize
initialize(plan, reservation);
@ -142,7 +143,7 @@ public class IterativePlanner extends PlanningAlgorithm {
// Compute the allocation of a single stage
Map<ReservationInterval, Resource> curAlloc =
computeStageAllocation(plan, currentReservationStage,
stageArrivalTime, stageDeadline);
stageArrivalTime, stageDeadline, user, reservationId);
// If we did not find an allocation, return NULL
// (unless it's an ANY job, then we simply continue).
@ -159,8 +160,8 @@ public class IterativePlanner extends PlanningAlgorithm {
}
// Get the start & end time of the current allocation
Long stageStartTime = findEarliestTime(curAlloc.keySet());
Long stageEndTime = findLatestTime(curAlloc.keySet());
Long stageStartTime = findEarliestTime(curAlloc);
Long stageEndTime = findLatestTime(curAlloc);
// If we did find an allocation for the stage, add it
for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) {
@ -310,10 +311,11 @@ public class IterativePlanner extends PlanningAlgorithm {
// Call algStageAllocator
protected Map<ReservationInterval, Resource> computeStageAllocation(
Plan plan, ReservationRequest rr, long stageArrivalTime,
long stageDeadline) {
long stageDeadline, String user, ReservationId oldId)
throws PlanningException {
return algStageAllocator.computeStageAllocation(plan, planLoads,
planModifications, rr, stageArrivalTime, stageDeadline);
planModifications, rr, stageArrivalTime, stageDeadline, user, oldId);
}

View File

@ -19,7 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
@ -62,7 +62,7 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
// Compute the job allocation
RLESparseResourceAllocation allocation =
computeJobAllocation(plan, reservationId, adjustedContract);
computeJobAllocation(plan, reservationId, adjustedContract, user);
// If no job allocation was found, fail
if (allocation == null) {
@ -84,8 +84,8 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
adjustedContract, // Contract
user, // User name
plan.getQueueName(), // Queue name
findEarliestTime(mapAllocations.keySet()), // Earliest start time
findLatestTime(mapAllocations.keySet()), // Latest end time
findEarliestTime(mapAllocations), // Earliest start time
findLatestTime(mapAllocations), // Latest end time
mapAllocations, // Allocations
plan.getResourceCalculator(), // Resource calculator
plan.getMinimumAllocation()); // Minimum allocation
@ -111,14 +111,14 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
Resource zeroResource = Resource.newInstance(0, 0);
// Pad at the beginning
long earliestStart = findEarliestTime(mapAllocations.keySet());
long earliestStart = findEarliestTime(mapAllocations);
if (jobArrival < earliestStart) {
mapAllocations.put(new ReservationInterval(jobArrival, earliestStart),
zeroResource);
}
// Pad at the beginning
long latestEnd = findLatestTime(mapAllocations.keySet());
long latestEnd = findLatestTime(mapAllocations);
if (latestEnd < jobDeadline) {
mapAllocations.put(new ReservationInterval(latestEnd, jobDeadline),
zeroResource);
@ -129,8 +129,8 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
}
public abstract RLESparseResourceAllocation computeJobAllocation(Plan plan,
ReservationId reservationId, ReservationDefinition reservation)
throws PlanningException, ContractValidationException;
ReservationId reservationId, ReservationDefinition reservation,
String user) throws PlanningException, ContractValidationException;
@Override
public boolean createReservation(ReservationId reservationId, String user,
@ -162,24 +162,26 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
}
protected static long findEarliestTime(Set<ReservationInterval> sesInt) {
protected static long findEarliestTime(
Map<ReservationInterval, Resource> sesInt) {
long ret = Long.MAX_VALUE;
for (ReservationInterval s : sesInt) {
if (s.getStartTime() < ret) {
ret = s.getStartTime();
for (Entry<ReservationInterval, Resource> s : sesInt.entrySet()) {
if (s.getKey().getStartTime() < ret && s.getValue() != null) {
ret = s.getKey().getStartTime();
}
}
return ret;
}
protected static long findLatestTime(Set<ReservationInterval> sesInt) {
protected static long findLatestTime(Map<ReservationInterval,
Resource> sesInt) {
long ret = Long.MIN_VALUE;
for (ReservationInterval s : sesInt) {
if (s.getEndTime() > ret) {
ret = s.getEndTime();
for (Entry<ReservationInterval, Resource> s : sesInt.entrySet()) {
if (s.getKey().getEndTime() > ret && s.getValue() != null) {
ret = s.getKey().getEndTime();
}
}
return ret;

View File

@ -20,11 +20,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
/**
* Interface for allocating a single stage in IterativePlanner.
@ -46,10 +48,12 @@ public interface StageAllocator {
*
* @return The computed allocation (or null if the stage could not be
* allocated)
* @throws PlanningException
*/
Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
Map<Long, Resource> planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr,
long stageEarliestStart, long stageDeadline);
long stageEarliestStart, long stageDeadline, String user,
ReservationId oldId) throws PlanningException;
}

View File

@ -21,11 +21,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@ -40,7 +43,8 @@ public class StageAllocatorGreedy implements StageAllocator {
public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
Map<Long, Resource> planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr,
long stageEarliestStart, long stageDeadline) {
long stageEarliestStart, long stageDeadline, String user,
ReservationId oldId) throws PlanningException {
Resource totalCapacity = plan.getTotalCapacity();
@ -63,6 +67,15 @@ public class StageAllocatorGreedy implements StageAllocator {
int maxGang = 0;
RLESparseResourceAllocation netAvailable =
plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
stageDeadline);
netAvailable =
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
plan.getTotalCapacity(), netAvailable, planModifications,
RLEOperator.subtract, stageEarliestStart, stageDeadline);
// loop trying to place until we are done, or we are considering
// an invalid range of times
while (gangsToPlace > 0 && stageDeadline - dur >= stageEarliestStart) {
@ -79,13 +92,7 @@ public class StageAllocatorGreedy implements StageAllocator {
for (long t = stageDeadline - plan.getStep(); t >= stageDeadline - dur
&& maxGang > 0; t = t - plan.getStep()) {
// compute net available resources
Resource netAvailableRes = Resources.clone(totalCapacity);
// Resources.addTo(netAvailableRes, oldResCap);
Resources.subtractFrom(netAvailableRes,
plan.getTotalCommittedResources(t));
Resources.subtractFrom(netAvailableRes,
planModifications.getCapacityAtTime(t));
Resource netAvailableRes = netAvailable.getCapacityAtTime(t);
// compute maximum number of gangs we could fit
curMaxGang =

View File

@ -22,6 +22,7 @@ import java.util.Comparator;
import java.util.Map;
import java.util.TreeSet;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
@ -60,7 +61,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
public Map<ReservationInterval, Resource> computeStageAllocation(
Plan plan, Map<Long, Resource> planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr,
long stageEarliestStart, long stageDeadline) {
long stageEarliestStart, long stageDeadline, String user,
ReservationId oldId) {
// Initialize
ResourceCalculator resCalc = plan.getResourceCalculator();
@ -136,7 +138,9 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
DurationInterval bestDurationInterval =
durationIntervalsSortedByCost.first();
int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs);
numGangsToAllocate =
Math.min(numGangsToAllocate,
bestDurationInterval.numCanFit(gang, capacity, resCalc));
// Add it
remainingGangs -= numGangsToAllocate;
@ -355,5 +359,11 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
this.cost = value;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(" start: " + startTime).append(" end: " + endTime)
.append(" cost: " + cost).append(" maxLoad: " + maxLoad);
return sb.toString();
}
}
}

View File

@ -118,11 +118,18 @@ public class TestInMemoryPlan {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
checkAllocation(plan, alloc, start);
}
private void checkAllocation(Plan plan, int[] alloc, int start) {
RLESparseResourceAllocation userCons =
plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getConsumptionForUser(user, start + i));
userCons.getCapacityAtTime(start + i));
}
}
@ -180,12 +187,7 @@ public class TestInMemoryPlan {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getConsumptionForUser(user, start + i));
}
checkAllocation(plan, alloc, start);
// Try to add it again
try {
@ -226,11 +228,14 @@ public class TestInMemoryPlan {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
RLESparseResourceAllocation userCons =
plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getConsumptionForUser(user, start + i));
userCons.getCapacityAtTime(start + i));
}
// Now update it
@ -252,13 +257,18 @@ public class TestInMemoryPlan {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
userCons =
plan.getConsumptionForUserOverTime(user, start, start
+ updatedAlloc.length);
for (int i = 0; i < updatedAlloc.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
+ i), plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
+ i), plan.getConsumptionForUser(user, start + i));
+ i), userCons.getCapacityAtTime(start + i));
}
}
@ -321,13 +331,17 @@ public class TestInMemoryPlan {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
RLESparseResourceAllocation userCons =
plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
plan.getConsumptionForUser(user, start + i));
userCons.getCapacityAtTime(start + i));
}
// Now delete it
@ -337,11 +351,13 @@ public class TestInMemoryPlan {
Assert.fail(e.getMessage());
}
Assert.assertNull(plan.getReservationById(reservationID));
userCons =
plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(0, 0),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(0, 0),
plan.getConsumptionForUser(user, start + i));
userCons.getCapacityAtTime(start + i));
}
}
@ -393,14 +409,8 @@ public class TestInMemoryPlan {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
for (int i = 0; i < alloc1.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
plan.getConsumptionForUser(user, start + i));
}
checkAllocation(plan, alloc1, start);
// Now add another one
ReservationId reservationID2 =
@ -424,13 +434,17 @@ public class TestInMemoryPlan {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(plan.getReservationById(reservationID2));
RLESparseResourceAllocation userCons =
plan.getConsumptionForUserOverTime(user, start, start + alloc2.length);
for (int i = 0; i < alloc2.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
+ alloc2[i] + i), plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
+ alloc2[i] + i), plan.getConsumptionForUser(user, start + i));
+ alloc2[i] + i), userCons.getCapacityAtTime(start + i));
}
// Now archive completed reservations
@ -445,14 +459,8 @@ public class TestInMemoryPlan {
}
Assert.assertNotNull(plan.getReservationById(reservationID1));
Assert.assertNull(plan.getReservationById(reservationID2));
for (int i = 0; i < alloc1.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
plan.getConsumptionForUser(user, start + i));
}
checkAllocation(plan, alloc1, start);
when(clock.getTime()).thenReturn(107L);
try {
// will remove 1st reservation also as it has fallen out of the archival
@ -461,12 +469,16 @@ public class TestInMemoryPlan {
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
userCons =
plan.getConsumptionForUserOverTime(user, start, start + alloc1.length);
Assert.assertNull(plan.getReservationById(reservationID1));
for (int i = 0; i < alloc1.length; i++) {
Assert.assertEquals(Resource.newInstance(0, 0),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(0, 0),
plan.getConsumptionForUser(user, start + i));
userCons.getCapacityAtTime(start + i));
}
}

View File

@ -18,9 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -86,6 +89,7 @@ public class TestGreedyReservationAgent {
instConstraint, avgConstraint);
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
policy.init(reservationQ, conf);
agent = new GreedyReservationAgent();
QueueMetrics queueMetrics = mock(QueueMetrics.class);
@ -135,6 +139,94 @@ public class TestGreedyReservationAgent {
}
@SuppressWarnings("javadoc")
@Test
public void testSharingPolicyFeedback() throws PlanningException {
prepareBasicPlan();
// let's constraint the instantaneous allocation and see the
// policy kicking in during planning
float instConstraint = 40;
float avgConstraint = 40;
ReservationSchedulerConfiguration conf =
ReservationSystemTestUtil.createConf(plan.getQueueName(), 100000,
instConstraint, avgConstraint);
plan.getSharingPolicy().init(plan.getQueueName(), conf);
// create a request with a single atomic ask
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(5 * step);
rr.setDeadline(100 * step);
ReservationRequest r =
ReservationRequest.newInstance(Resource.newInstance(2048, 2), 20, 20,
10 * step);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setReservationResources(Collections.singletonList(r));
rr.setReservationRequests(reqs);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
agent.createReservation(reservationID, "u3", plan, rr);
ReservationId reservationID2 =
ReservationSystemTestUtil.getNewReservationId();
agent.createReservation(reservationID2, "u3", plan, rr);
ReservationDefinition rr3 = new ReservationDefinitionPBImpl();
rr3.setArrival(5 * step);
rr3.setDeadline(100 * step);
ReservationRequest r3 =
ReservationRequest.newInstance(Resource.newInstance(2048, 2), 45, 45,
10 * step);
ReservationRequests reqs3 = new ReservationRequestsPBImpl();
reqs3.setReservationResources(Collections.singletonList(r3));
rr3.setReservationRequests(reqs3);
ReservationId reservationID3 =
ReservationSystemTestUtil.getNewReservationId();
try {
// RR3 is simply too big to fit
agent.createReservation(reservationID3, "u3", plan, rr3);
fail();
} catch (PlanningException pe) {
// expected
}
assertTrue("Agent-based allocation failed", reservationID != null);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == 4);
ReservationAllocation cs = plan.getReservationById(reservationID);
ReservationAllocation cs2 = plan.getReservationById(reservationID2);
ReservationAllocation cs3 = plan.getReservationById(reservationID3);
assertNotNull(cs);
assertNotNull(cs2);
assertNull(cs3);
System.out.println("--------AFTER SIMPLE ALLOCATION (queue: "
+ reservationID + ")----------");
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
for (long i = 90 * step; i < 100 * step; i++) {
assertTrue(
"Agent-based allocation unexpected",
Resources.equals(cs.getResourcesAtTime(i),
Resource.newInstance(2048 * 20, 2 * 20)));
}
// RR2 is pushed out by the presence of RR
for (long i = 80 * step; i < 90 * step; i++) {
assertTrue(
"Agent-based allocation unexpected",
Resources.equals(cs2.getResourcesAtTime(i),
Resource.newInstance(2048 * 20, 2 * 20)));
}
}
@Test
public void testOrder() throws PlanningException {
prepareBasicPlan();
@ -186,7 +278,6 @@ public class TestGreedyReservationAgent {
assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1));
assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1));
System.out.println("--------AFTER ORDER ALLOCATION (queue: "
+ reservationID + ")----------");
System.out.println(plan.toString());
@ -376,7 +467,6 @@ public class TestGreedyReservationAgent {
ReservationAllocation cs = plan.getReservationById(reservationID);
assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1));
System.out.println("--------AFTER ANY ALLOCATION (queue: " + reservationID
+ ")----------");
System.out.println(plan.toString());