YARN-4358. Reservation System: Improve relationship between SharingPolicy and ReservationAgent. (Carlo Curino via asuresh)
(cherry picked from commit 742632e346
)
This commit is contained in:
parent
6999e764cb
commit
78a07e99dd
|
@ -540,6 +540,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-4405. Support node label store in non-appendable file system. (Wangda
|
||||
Tan via jianhe)
|
||||
|
||||
YARN-4358. Reservation System: Improve relationship between SharingPolicy
|
||||
and ReservationAgent. (Carlo Curino via asuresh)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||
|
|
|
@ -18,10 +18,14 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
|
||||
|
@ -104,14 +108,17 @@ public class CapacityOverTimePolicy implements SharingPolicy {
|
|||
IntegralResource maxAllowed = new IntegralResource(maxAvgRes);
|
||||
maxAllowed.multiplyBy(validWindow / step);
|
||||
|
||||
RLESparseResourceAllocation userCons =
|
||||
plan.getConsumptionForUserOverTime(reservation.getUser(), startTime
|
||||
- validWindow, endTime + validWindow);
|
||||
|
||||
// check that the resources offered to the user during any window of length
|
||||
// "validWindow" overlapping this allocation are within maxAllowed
|
||||
// also enforce instantaneous and physical constraints during this pass
|
||||
for (long t = startTime - validWindow; t < endTime + validWindow; t += step) {
|
||||
|
||||
Resource currExistingAllocTot = plan.getTotalCommittedResources(t);
|
||||
Resource currExistingAllocForUser =
|
||||
plan.getConsumptionForUser(reservation.getUser(), t);
|
||||
Resource currExistingAllocForUser = userCons.getCapacityAtTime(t);
|
||||
Resource currNewAlloc = reservation.getResourcesAtTime(t);
|
||||
Resource currOldAlloc = Resources.none();
|
||||
if (oldReservation != null) {
|
||||
|
@ -163,8 +170,7 @@ public class CapacityOverTimePolicy implements SharingPolicy {
|
|||
|
||||
// expire contributions from instant in time before (t - validWindow)
|
||||
if (t > startTime) {
|
||||
Resource pastOldAlloc =
|
||||
plan.getConsumptionForUser(reservation.getUser(), t - validWindow);
|
||||
Resource pastOldAlloc = userCons.getCapacityAtTime(t - validWindow);
|
||||
Resource pastNewAlloc = reservation.getResourcesAtTime(t - validWindow);
|
||||
|
||||
// runningTot = runningTot - pastExistingAlloc - pastNewAlloc;
|
||||
|
@ -188,6 +194,39 @@ public class CapacityOverTimePolicy implements SharingPolicy {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RLESparseResourceAllocation availableResources(
|
||||
RLESparseResourceAllocation available, Plan plan, String user,
|
||||
ReservationId oldId, long start, long end) throws PlanningException {
|
||||
|
||||
// this only propagates the instantaneous maxInst properties, while
|
||||
// the time-varying one depends on the current allocation as well
|
||||
// and are not easily captured here
|
||||
Resource planTotalCapacity = plan.getTotalCapacity();
|
||||
Resource maxInsRes = Resources.multiply(planTotalCapacity, maxInst);
|
||||
NavigableMap<Long, Resource> instQuota = new TreeMap<Long, Resource>();
|
||||
instQuota.put(start, maxInsRes);
|
||||
|
||||
RLESparseResourceAllocation instRLEQuota =
|
||||
new RLESparseResourceAllocation(instQuota,
|
||||
plan.getResourceCalculator());
|
||||
|
||||
RLESparseResourceAllocation used =
|
||||
plan.getConsumptionForUserOverTime(user, start, end);
|
||||
|
||||
instRLEQuota =
|
||||
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
|
||||
planTotalCapacity, instRLEQuota, used, RLEOperator.subtract, start,
|
||||
end);
|
||||
|
||||
instRLEQuota =
|
||||
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
|
||||
planTotalCapacity, available, instRLEQuota, RLEOperator.min, start,
|
||||
end);
|
||||
|
||||
return instRLEQuota;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getValidWindow() {
|
||||
return validWindow;
|
||||
|
@ -244,4 +283,7 @@ public class CapacityOverTimePolicy implements SharingPolicy {
|
|||
return "<memory:" + memory + ", vCores:" + vcores + ">";
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -27,11 +27,13 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
|
@ -65,6 +67,9 @@ public class InMemoryPlan implements Plan {
|
|||
private Map<String, RLESparseResourceAllocation> userResourceAlloc =
|
||||
new HashMap<String, RLESparseResourceAllocation>();
|
||||
|
||||
private Map<String, RLESparseResourceAllocation> userActiveReservationCount =
|
||||
new HashMap<String, RLESparseResourceAllocation>();
|
||||
|
||||
private Map<ReservationId, InMemoryReservationAllocation> reservationTable =
|
||||
new HashMap<ReservationId, InMemoryReservationAllocation>();
|
||||
|
||||
|
@ -121,6 +126,7 @@ public class InMemoryPlan implements Plan {
|
|||
return queueMetrics;
|
||||
}
|
||||
|
||||
|
||||
private void incrementAllocation(ReservationAllocation reservation) {
|
||||
assert (readWriteLock.isWriteLockedByCurrentThread());
|
||||
Map<ReservationInterval, Resource> allocationRequests =
|
||||
|
@ -132,12 +138,28 @@ public class InMemoryPlan implements Plan {
|
|||
resAlloc = new RLESparseResourceAllocation(resCalc);
|
||||
userResourceAlloc.put(user, resAlloc);
|
||||
}
|
||||
RLESparseResourceAllocation resCount = userActiveReservationCount.get(user);
|
||||
if (resCount == null) {
|
||||
resCount = new RLESparseResourceAllocation(resCalc);
|
||||
userActiveReservationCount.put(user, resCount);
|
||||
}
|
||||
|
||||
long earliestActive = Long.MAX_VALUE;
|
||||
long latestActive = Long.MIN_VALUE;
|
||||
|
||||
for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
|
||||
.entrySet()) {
|
||||
resAlloc.addInterval(r.getKey(), r.getValue());
|
||||
rleSparseVector.addInterval(r.getKey(), r.getValue());
|
||||
if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
|
||||
ZERO_RESOURCE)) {
|
||||
earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
|
||||
latestActive = Math.max(latestActive, r.getKey().getEndTime());
|
||||
}
|
||||
}
|
||||
resCount.addInterval(new ReservationInterval(earliestActive, latestActive),
|
||||
Resource.newInstance(1, 1));
|
||||
}
|
||||
|
||||
private void decrementAllocation(ReservationAllocation reservation) {
|
||||
assert (readWriteLock.isWriteLockedByCurrentThread());
|
||||
|
@ -145,14 +167,29 @@ public class InMemoryPlan implements Plan {
|
|||
reservation.getAllocationRequests();
|
||||
String user = reservation.getUser();
|
||||
RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
|
||||
|
||||
long earliestActive = Long.MAX_VALUE;
|
||||
long latestActive = Long.MIN_VALUE;
|
||||
for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
|
||||
.entrySet()) {
|
||||
resAlloc.removeInterval(r.getKey(), r.getValue());
|
||||
rleSparseVector.removeInterval(r.getKey(), r.getValue());
|
||||
if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(),
|
||||
ZERO_RESOURCE)) {
|
||||
earliestActive = Math.min(earliestActive, r.getKey().getStartTime());
|
||||
latestActive = Math.max(latestActive, r.getKey().getEndTime());
|
||||
}
|
||||
}
|
||||
if (resAlloc.isEmpty()) {
|
||||
userResourceAlloc.remove(user);
|
||||
}
|
||||
|
||||
RLESparseResourceAllocation resCount = userActiveReservationCount.get(user);
|
||||
resCount.removeInterval(new ReservationInterval(earliestActive,
|
||||
latestActive), Resource.newInstance(1, 1));
|
||||
if (resCount.isEmpty()) {
|
||||
userActiveReservationCount.remove(user);
|
||||
}
|
||||
}
|
||||
|
||||
public Set<ReservationAllocation> getAllReservations() {
|
||||
|
@ -160,9 +197,9 @@ public class InMemoryPlan implements Plan {
|
|||
try {
|
||||
if (currentReservations != null) {
|
||||
Set<ReservationAllocation> flattenedReservations =
|
||||
new HashSet<ReservationAllocation>();
|
||||
for (Set<InMemoryReservationAllocation> reservationEntries : currentReservations
|
||||
.values()) {
|
||||
new TreeSet<ReservationAllocation>();
|
||||
for (Set<InMemoryReservationAllocation> reservationEntries :
|
||||
currentReservations.values()) {
|
||||
flattenedReservations.addAll(reservationEntries);
|
||||
}
|
||||
return flattenedReservations;
|
||||
|
@ -417,14 +454,34 @@ public class InMemoryPlan implements Plan {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Resource getConsumptionForUser(String user, long t) {
|
||||
public RLESparseResourceAllocation getReservationCountForUserOverTime(
|
||||
String user, long start, long end) {
|
||||
readLock.lock();
|
||||
try {
|
||||
RLESparseResourceAllocation userResAlloc =
|
||||
userActiveReservationCount.get(user);
|
||||
|
||||
if (userResAlloc != null) {
|
||||
return userResAlloc.getRangeOverlapping(start, end);
|
||||
} else {
|
||||
return new RLESparseResourceAllocation(resCalc);
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RLESparseResourceAllocation getConsumptionForUserOverTime(String user,
|
||||
long start, long end) {
|
||||
readLock.lock();
|
||||
try {
|
||||
RLESparseResourceAllocation userResAlloc = userResourceAlloc.get(user);
|
||||
|
||||
if (userResAlloc != null) {
|
||||
return userResAlloc.getCapacityAtTime(t);
|
||||
return userResAlloc.getRangeOverlapping(start, end);
|
||||
} else {
|
||||
return Resources.clone(ZERO_RESOURCE);
|
||||
return new RLESparseResourceAllocation(resCalc);
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
|
@ -464,6 +521,43 @@ public class InMemoryPlan implements Plan {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RLESparseResourceAllocation getAvailableResourceOverTime(String user,
|
||||
ReservationId oldId, long start, long end) throws PlanningException {
|
||||
readLock.lock();
|
||||
try {
|
||||
// create RLE of totCapacity
|
||||
TreeMap<Long, Resource> totAvailable = new TreeMap<Long, Resource>();
|
||||
totAvailable.put(start, Resources.clone(totalCapacity));
|
||||
RLESparseResourceAllocation totRLEAvail =
|
||||
new RLESparseResourceAllocation(totAvailable, resCalc);
|
||||
|
||||
// subtract used from available
|
||||
RLESparseResourceAllocation netAvailable;
|
||||
|
||||
netAvailable =
|
||||
RLESparseResourceAllocation.merge(resCalc,
|
||||
Resources.clone(totalCapacity), totRLEAvail, rleSparseVector,
|
||||
RLEOperator.subtractTestNonNegative, start, end);
|
||||
|
||||
// add back in old reservation used resources if any
|
||||
ReservationAllocation old = reservationTable.get(oldId);
|
||||
if (old != null) {
|
||||
netAvailable =
|
||||
RLESparseResourceAllocation.merge(resCalc,
|
||||
Resources.clone(totalCapacity), netAvailable,
|
||||
old.getResourcesOverTime(), RLEOperator.add, start, end);
|
||||
}
|
||||
// lower it if this is needed by the sharing policy
|
||||
netAvailable =
|
||||
getSharingPolicy().availableResources(netAvailable, this, user,
|
||||
oldId, start, end);
|
||||
return netAvailable;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getMinimumAllocation() {
|
||||
return Resources.clone(minAlloc);
|
||||
|
@ -549,4 +643,21 @@ public class InMemoryPlan implements Plan {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<ReservationAllocation> getReservationByUserAtTime(String user,
|
||||
long t) {
|
||||
readLock.lock();
|
||||
try {
|
||||
Set<ReservationAllocation> resSet = new HashSet<ReservationAllocation>();
|
||||
for (ReservationAllocation ra : getReservationsAtTime(t)) {
|
||||
String resUser = ra.getUser();
|
||||
if (resUser != null && resUser.equals(user)) {
|
||||
resSet.add(ra);
|
||||
}
|
||||
}
|
||||
return resSet;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -132,12 +132,17 @@ public class InMemoryReservationAllocation implements ReservationAllocation {
|
|||
return Resources.clone(resourcesOverTime.getCapacityAtTime(tick));
|
||||
}
|
||||
|
||||
@Override
|
||||
public RLESparseResourceAllocation getResourcesOverTime(){
|
||||
return resourcesOverTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sBuf = new StringBuilder();
|
||||
sBuf.append(getReservationId()).append(" user:").append(getUser())
|
||||
.append(" startTime: ").append(getStartTime()).append(" endTime: ")
|
||||
.append(getEndTime()).append(" alloc:[")
|
||||
.append(getEndTime()).append(" alloc:\n[")
|
||||
.append(resourcesOverTime.toString()).append("] ");
|
||||
return sBuf.toString();
|
||||
}
|
||||
|
@ -151,6 +156,12 @@ public class InMemoryReservationAllocation implements ReservationAllocation {
|
|||
if (this.getAcceptanceTime() < other.getAcceptanceTime()) {
|
||||
return 1;
|
||||
}
|
||||
if (this.getReservationId().getId() > other.getReservationId().getId()) {
|
||||
return -1;
|
||||
}
|
||||
if (this.getReservationId().getId() < other.getReservationId().getId()) {
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
|
@ -89,4 +90,11 @@ public class NoOverCommitPolicy implements SharingPolicy {
|
|||
// nothing to do for this policy
|
||||
}
|
||||
|
||||
@Override
|
||||
public RLESparseResourceAllocation availableResources(
|
||||
RLESparseResourceAllocation available, Plan plan, String user,
|
||||
ReservationId oldId, long start, long end) throws PlanningException {
|
||||
return available;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
|||
|
||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -40,6 +41,17 @@ public interface PlanView extends PlanContext {
|
|||
*/
|
||||
public ReservationAllocation getReservationById(ReservationId reservationID);
|
||||
|
||||
/**
|
||||
* Return a set of {@link ReservationAllocation} that belongs to a certain
|
||||
* user and overlaps time t.
|
||||
*
|
||||
* @param user the user being considered
|
||||
* @param t the instant in time being considered
|
||||
* @return {@link Set<ReservationAllocation>} for this user at this time
|
||||
*/
|
||||
public Set<ReservationAllocation> getReservationByUserAtTime(String user,
|
||||
long t);
|
||||
|
||||
/**
|
||||
* Gets all the active reservations at the specified point of time
|
||||
*
|
||||
|
@ -67,18 +79,6 @@ public interface PlanView extends PlanContext {
|
|||
*/
|
||||
Resource getTotalCommittedResources(long tick);
|
||||
|
||||
/**
|
||||
* Returns the total {@link Resource} reserved for a given user at the
|
||||
* specified time
|
||||
*
|
||||
* @param user the user who made the reservation(s)
|
||||
* @param tick the time (UTC in ms) for which the reserved resources are
|
||||
* requested
|
||||
* @return the total {@link Resource} reserved for a given user at the
|
||||
* specified time
|
||||
*/
|
||||
public Resource getConsumptionForUser(String user, long tick);
|
||||
|
||||
/**
|
||||
* Returns the overall capacity in terms of {@link Resource} assigned to this
|
||||
* plan (typically will correspond to the absolute capacity of the
|
||||
|
@ -103,4 +103,43 @@ public interface PlanView extends PlanContext {
|
|||
*/
|
||||
public long getLastEndTime();
|
||||
|
||||
/**
|
||||
* This method returns the amount of resources available to a given user
|
||||
* (optionally if removing a certain reservation) over the start-end time
|
||||
* range.
|
||||
*
|
||||
* @param user
|
||||
* @param oldId
|
||||
* @param start
|
||||
* @param end
|
||||
* @return a view of the plan as it is available to this user
|
||||
* @throws PlanningException
|
||||
*/
|
||||
public RLESparseResourceAllocation getAvailableResourceOverTime(String user,
|
||||
ReservationId oldId, long start, long end) throws PlanningException;
|
||||
|
||||
/**
|
||||
* This method returns a RLE encoded view of the user reservation count
|
||||
* utilization between start and end time.
|
||||
*
|
||||
* @param user
|
||||
* @param start
|
||||
* @param end
|
||||
* @return RLE encoded view of reservation used over time
|
||||
*/
|
||||
public RLESparseResourceAllocation getReservationCountForUserOverTime(
|
||||
String user, long start, long end);
|
||||
|
||||
/**
|
||||
* This method returns a RLE encoded view of the user reservation utilization
|
||||
* between start and end time.
|
||||
*
|
||||
* @param user
|
||||
* @param start
|
||||
* @param end
|
||||
* @return RLE encoded view of resources used over time
|
||||
*/
|
||||
public RLESparseResourceAllocation getConsumptionForUserOverTime(String user,
|
||||
long start, long end);
|
||||
|
||||
}
|
||||
|
|
|
@ -50,14 +50,14 @@ public interface ReservationAllocation extends
|
|||
public ReservationDefinition getReservationDefinition();
|
||||
|
||||
/**
|
||||
* Returns the time at which the reservation is activated
|
||||
* Returns the time at which the reservation is activated.
|
||||
*
|
||||
* @return the time at which the reservation is activated
|
||||
*/
|
||||
public long getStartTime();
|
||||
|
||||
/**
|
||||
* Returns the time at which the reservation terminates
|
||||
* Returns the time at which the reservation terminates.
|
||||
*
|
||||
* @return the time at which the reservation terminates
|
||||
*/
|
||||
|
@ -65,7 +65,7 @@ public interface ReservationAllocation extends
|
|||
|
||||
/**
|
||||
* Returns the map of resources requested against the time interval for which
|
||||
* they were
|
||||
* they were.
|
||||
*
|
||||
* @return the allocationRequests the map of resources requested against the
|
||||
* time interval for which they were
|
||||
|
@ -118,4 +118,10 @@ public interface ReservationAllocation extends
|
|||
*/
|
||||
public Resource getResourcesAtTime(long tick);
|
||||
|
||||
/**
|
||||
* Return a RLE representation of used resources.
|
||||
* @return a RLE encoding of resources allocated over time.
|
||||
*/
|
||||
public RLESparseResourceAllocation getResourcesOverTime();
|
||||
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
|
||||
/**
|
||||
|
@ -32,7 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan
|
|||
public interface SharingPolicy {
|
||||
|
||||
/**
|
||||
* Initialize this policy
|
||||
* Initialize this policy.
|
||||
*
|
||||
* @param planQueuePath the name of the queue for this plan
|
||||
* @param conf the system configuration
|
||||
|
@ -53,6 +54,26 @@ public interface SharingPolicy {
|
|||
public void validate(Plan plan, ReservationAllocation newAllocation)
|
||||
throws PlanningException;
|
||||
|
||||
/**
|
||||
* This method provide a (partial) instantaneous validation by applying
|
||||
* business rules (such as max number of parallel containers allowed for a
|
||||
* user). To provide the agent with more feedback the returned parameter is
|
||||
* expressed in number of containers that can be fit in this time according to
|
||||
* the business rules.
|
||||
*
|
||||
* @param available the amount of resources that would be offered if not
|
||||
* constrained by the policy
|
||||
* @param plan reference the the current Plan
|
||||
* @param user the username
|
||||
* @param start the start time for the range we are querying
|
||||
* @param end the end time for the range we are querying
|
||||
* @param oldId (optional) the id of a reservation being updated
|
||||
* @throws PlanningException throws if the request is not valid
|
||||
*/
|
||||
public RLESparseResourceAllocation availableResources(
|
||||
RLESparseResourceAllocation available, Plan plan, String user,
|
||||
ReservationId oldId, long start, long end) throws PlanningException;
|
||||
|
||||
/**
|
||||
* Returns the time range before and after the current reservation considered
|
||||
* by this policy. In particular, this informs the archival process for the
|
||||
|
@ -63,4 +84,5 @@ public interface SharingPolicy {
|
|||
*/
|
||||
public long getValidWindow();
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResour
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
/**
|
||||
|
@ -80,8 +81,8 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
|
||||
@Override
|
||||
public RLESparseResourceAllocation computeJobAllocation(Plan plan,
|
||||
ReservationId reservationId, ReservationDefinition reservation)
|
||||
throws ContractValidationException {
|
||||
ReservationId reservationId, ReservationDefinition reservation,
|
||||
String user) throws PlanningException {
|
||||
|
||||
// Initialize
|
||||
initialize(plan, reservation);
|
||||
|
@ -142,7 +143,7 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
// Compute the allocation of a single stage
|
||||
Map<ReservationInterval, Resource> curAlloc =
|
||||
computeStageAllocation(plan, currentReservationStage,
|
||||
stageArrivalTime, stageDeadline);
|
||||
stageArrivalTime, stageDeadline, user, reservationId);
|
||||
|
||||
// If we did not find an allocation, return NULL
|
||||
// (unless it's an ANY job, then we simply continue).
|
||||
|
@ -159,8 +160,8 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
}
|
||||
|
||||
// Get the start & end time of the current allocation
|
||||
Long stageStartTime = findEarliestTime(curAlloc.keySet());
|
||||
Long stageEndTime = findLatestTime(curAlloc.keySet());
|
||||
Long stageStartTime = findEarliestTime(curAlloc);
|
||||
Long stageEndTime = findLatestTime(curAlloc);
|
||||
|
||||
// If we did find an allocation for the stage, add it
|
||||
for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) {
|
||||
|
@ -310,10 +311,11 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
// Call algStageAllocator
|
||||
protected Map<ReservationInterval, Resource> computeStageAllocation(
|
||||
Plan plan, ReservationRequest rr, long stageArrivalTime,
|
||||
long stageDeadline) {
|
||||
long stageDeadline, String user, ReservationId oldId)
|
||||
throws PlanningException {
|
||||
|
||||
return algStageAllocator.computeStageAllocation(plan, planLoads,
|
||||
planModifications, rr, stageArrivalTime, stageDeadline);
|
||||
planModifications, rr, stageArrivalTime, stageDeadline, user, oldId);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
|
@ -62,7 +62,7 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
|
|||
|
||||
// Compute the job allocation
|
||||
RLESparseResourceAllocation allocation =
|
||||
computeJobAllocation(plan, reservationId, adjustedContract);
|
||||
computeJobAllocation(plan, reservationId, adjustedContract, user);
|
||||
|
||||
// If no job allocation was found, fail
|
||||
if (allocation == null) {
|
||||
|
@ -84,8 +84,8 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
|
|||
adjustedContract, // Contract
|
||||
user, // User name
|
||||
plan.getQueueName(), // Queue name
|
||||
findEarliestTime(mapAllocations.keySet()), // Earliest start time
|
||||
findLatestTime(mapAllocations.keySet()), // Latest end time
|
||||
findEarliestTime(mapAllocations), // Earliest start time
|
||||
findLatestTime(mapAllocations), // Latest end time
|
||||
mapAllocations, // Allocations
|
||||
plan.getResourceCalculator(), // Resource calculator
|
||||
plan.getMinimumAllocation()); // Minimum allocation
|
||||
|
@ -111,14 +111,14 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
|
|||
Resource zeroResource = Resource.newInstance(0, 0);
|
||||
|
||||
// Pad at the beginning
|
||||
long earliestStart = findEarliestTime(mapAllocations.keySet());
|
||||
long earliestStart = findEarliestTime(mapAllocations);
|
||||
if (jobArrival < earliestStart) {
|
||||
mapAllocations.put(new ReservationInterval(jobArrival, earliestStart),
|
||||
zeroResource);
|
||||
}
|
||||
|
||||
// Pad at the beginning
|
||||
long latestEnd = findLatestTime(mapAllocations.keySet());
|
||||
long latestEnd = findLatestTime(mapAllocations);
|
||||
if (latestEnd < jobDeadline) {
|
||||
mapAllocations.put(new ReservationInterval(latestEnd, jobDeadline),
|
||||
zeroResource);
|
||||
|
@ -129,8 +129,8 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
|
|||
}
|
||||
|
||||
public abstract RLESparseResourceAllocation computeJobAllocation(Plan plan,
|
||||
ReservationId reservationId, ReservationDefinition reservation)
|
||||
throws PlanningException, ContractValidationException;
|
||||
ReservationId reservationId, ReservationDefinition reservation,
|
||||
String user) throws PlanningException, ContractValidationException;
|
||||
|
||||
@Override
|
||||
public boolean createReservation(ReservationId reservationId, String user,
|
||||
|
@ -162,24 +162,26 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
|
|||
|
||||
}
|
||||
|
||||
protected static long findEarliestTime(Set<ReservationInterval> sesInt) {
|
||||
protected static long findEarliestTime(
|
||||
Map<ReservationInterval, Resource> sesInt) {
|
||||
|
||||
long ret = Long.MAX_VALUE;
|
||||
for (ReservationInterval s : sesInt) {
|
||||
if (s.getStartTime() < ret) {
|
||||
ret = s.getStartTime();
|
||||
for (Entry<ReservationInterval, Resource> s : sesInt.entrySet()) {
|
||||
if (s.getKey().getStartTime() < ret && s.getValue() != null) {
|
||||
ret = s.getKey().getStartTime();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
||||
}
|
||||
|
||||
protected static long findLatestTime(Set<ReservationInterval> sesInt) {
|
||||
protected static long findLatestTime(Map<ReservationInterval,
|
||||
Resource> sesInt) {
|
||||
|
||||
long ret = Long.MIN_VALUE;
|
||||
for (ReservationInterval s : sesInt) {
|
||||
if (s.getEndTime() > ret) {
|
||||
ret = s.getEndTime();
|
||||
for (Entry<ReservationInterval, Resource> s : sesInt.entrySet()) {
|
||||
if (s.getKey().getEndTime() > ret && s.getValue() != null) {
|
||||
ret = s.getKey().getEndTime();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
|
|
@ -20,11 +20,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
|
|||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
|
||||
/**
|
||||
* Interface for allocating a single stage in IterativePlanner.
|
||||
|
@ -46,10 +48,12 @@ public interface StageAllocator {
|
|||
*
|
||||
* @return The computed allocation (or null if the stage could not be
|
||||
* allocated)
|
||||
* @throws PlanningException
|
||||
*/
|
||||
Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
|
||||
Map<Long, Resource> planLoads,
|
||||
RLESparseResourceAllocation planModifications, ReservationRequest rr,
|
||||
long stageEarliestStart, long stageDeadline);
|
||||
long stageEarliestStart, long stageDeadline, String user,
|
||||
ReservationId oldId) throws PlanningException;
|
||||
|
||||
}
|
|
@ -21,11 +21,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
/**
|
||||
|
@ -40,7 +43,8 @@ public class StageAllocatorGreedy implements StageAllocator {
|
|||
public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
|
||||
Map<Long, Resource> planLoads,
|
||||
RLESparseResourceAllocation planModifications, ReservationRequest rr,
|
||||
long stageEarliestStart, long stageDeadline) {
|
||||
long stageEarliestStart, long stageDeadline, String user,
|
||||
ReservationId oldId) throws PlanningException {
|
||||
|
||||
Resource totalCapacity = plan.getTotalCapacity();
|
||||
|
||||
|
@ -63,6 +67,15 @@ public class StageAllocatorGreedy implements StageAllocator {
|
|||
|
||||
int maxGang = 0;
|
||||
|
||||
RLESparseResourceAllocation netAvailable =
|
||||
plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
|
||||
stageDeadline);
|
||||
|
||||
netAvailable =
|
||||
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
|
||||
plan.getTotalCapacity(), netAvailable, planModifications,
|
||||
RLEOperator.subtract, stageEarliestStart, stageDeadline);
|
||||
|
||||
// loop trying to place until we are done, or we are considering
|
||||
// an invalid range of times
|
||||
while (gangsToPlace > 0 && stageDeadline - dur >= stageEarliestStart) {
|
||||
|
@ -79,13 +92,7 @@ public class StageAllocatorGreedy implements StageAllocator {
|
|||
for (long t = stageDeadline - plan.getStep(); t >= stageDeadline - dur
|
||||
&& maxGang > 0; t = t - plan.getStep()) {
|
||||
|
||||
// compute net available resources
|
||||
Resource netAvailableRes = Resources.clone(totalCapacity);
|
||||
// Resources.addTo(netAvailableRes, oldResCap);
|
||||
Resources.subtractFrom(netAvailableRes,
|
||||
plan.getTotalCommittedResources(t));
|
||||
Resources.subtractFrom(netAvailableRes,
|
||||
planModifications.getCapacityAtTime(t));
|
||||
Resource netAvailableRes = netAvailable.getCapacityAtTime(t);
|
||||
|
||||
// compute maximum number of gangs we could fit
|
||||
curMaxGang =
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.Comparator;
|
|||
import java.util.Map;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
|
||||
|
@ -60,7 +61,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
|
|||
public Map<ReservationInterval, Resource> computeStageAllocation(
|
||||
Plan plan, Map<Long, Resource> planLoads,
|
||||
RLESparseResourceAllocation planModifications, ReservationRequest rr,
|
||||
long stageEarliestStart, long stageDeadline) {
|
||||
long stageEarliestStart, long stageDeadline, String user,
|
||||
ReservationId oldId) {
|
||||
|
||||
// Initialize
|
||||
ResourceCalculator resCalc = plan.getResourceCalculator();
|
||||
|
@ -136,7 +138,9 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
|
|||
DurationInterval bestDurationInterval =
|
||||
durationIntervalsSortedByCost.first();
|
||||
int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs);
|
||||
|
||||
numGangsToAllocate =
|
||||
Math.min(numGangsToAllocate,
|
||||
bestDurationInterval.numCanFit(gang, capacity, resCalc));
|
||||
// Add it
|
||||
remainingGangs -= numGangsToAllocate;
|
||||
|
||||
|
@ -355,5 +359,11 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
|
|||
this.cost = value;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(" start: " + startTime).append(" end: " + endTime)
|
||||
.append(" cost: " + cost).append(" maxLoad: " + maxLoad);
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -118,11 +118,18 @@ public class TestInMemoryPlan {
|
|||
Assert.fail(e.getMessage());
|
||||
}
|
||||
doAssertions(plan, rAllocation);
|
||||
checkAllocation(plan, alloc, start);
|
||||
}
|
||||
|
||||
private void checkAllocation(Plan plan, int[] alloc, int start) {
|
||||
RLESparseResourceAllocation userCons =
|
||||
plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
|
||||
|
||||
for (int i = 0; i < alloc.length; i++) {
|
||||
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
|
||||
plan.getTotalCommittedResources(start + i));
|
||||
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
|
||||
plan.getConsumptionForUser(user, start + i));
|
||||
userCons.getCapacityAtTime(start + i));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -180,12 +187,7 @@ public class TestInMemoryPlan {
|
|||
Assert.fail(e.getMessage());
|
||||
}
|
||||
doAssertions(plan, rAllocation);
|
||||
for (int i = 0; i < alloc.length; i++) {
|
||||
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
|
||||
plan.getTotalCommittedResources(start + i));
|
||||
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
|
||||
plan.getConsumptionForUser(user, start + i));
|
||||
}
|
||||
checkAllocation(plan, alloc, start);
|
||||
|
||||
// Try to add it again
|
||||
try {
|
||||
|
@ -226,11 +228,14 @@ public class TestInMemoryPlan {
|
|||
Assert.fail(e.getMessage());
|
||||
}
|
||||
doAssertions(plan, rAllocation);
|
||||
|
||||
RLESparseResourceAllocation userCons =
|
||||
plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
|
||||
for (int i = 0; i < alloc.length; i++) {
|
||||
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
|
||||
plan.getTotalCommittedResources(start + i));
|
||||
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
|
||||
plan.getConsumptionForUser(user, start + i));
|
||||
userCons.getCapacityAtTime(start + i));
|
||||
}
|
||||
|
||||
// Now update it
|
||||
|
@ -252,13 +257,18 @@ public class TestInMemoryPlan {
|
|||
Assert.fail(e.getMessage());
|
||||
}
|
||||
doAssertions(plan, rAllocation);
|
||||
|
||||
userCons =
|
||||
plan.getConsumptionForUserOverTime(user, start, start
|
||||
+ updatedAlloc.length);
|
||||
|
||||
for (int i = 0; i < updatedAlloc.length; i++) {
|
||||
Assert.assertEquals(
|
||||
Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
|
||||
+ i), plan.getTotalCommittedResources(start + i));
|
||||
Assert.assertEquals(
|
||||
Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
|
||||
+ i), plan.getConsumptionForUser(user, start + i));
|
||||
+ i), userCons.getCapacityAtTime(start + i));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -321,13 +331,17 @@ public class TestInMemoryPlan {
|
|||
Assert.fail(e.getMessage());
|
||||
}
|
||||
doAssertions(plan, rAllocation);
|
||||
|
||||
RLESparseResourceAllocation userCons =
|
||||
plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
|
||||
|
||||
for (int i = 0; i < alloc.length; i++) {
|
||||
Assert.assertEquals(
|
||||
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
|
||||
plan.getTotalCommittedResources(start + i));
|
||||
Assert.assertEquals(
|
||||
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
|
||||
plan.getConsumptionForUser(user, start + i));
|
||||
userCons.getCapacityAtTime(start + i));
|
||||
}
|
||||
|
||||
// Now delete it
|
||||
|
@ -337,11 +351,13 @@ public class TestInMemoryPlan {
|
|||
Assert.fail(e.getMessage());
|
||||
}
|
||||
Assert.assertNull(plan.getReservationById(reservationID));
|
||||
userCons =
|
||||
plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
|
||||
for (int i = 0; i < alloc.length; i++) {
|
||||
Assert.assertEquals(Resource.newInstance(0, 0),
|
||||
plan.getTotalCommittedResources(start + i));
|
||||
Assert.assertEquals(Resource.newInstance(0, 0),
|
||||
plan.getConsumptionForUser(user, start + i));
|
||||
userCons.getCapacityAtTime(start + i));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -393,14 +409,8 @@ public class TestInMemoryPlan {
|
|||
Assert.fail(e.getMessage());
|
||||
}
|
||||
doAssertions(plan, rAllocation);
|
||||
for (int i = 0; i < alloc1.length; i++) {
|
||||
Assert.assertEquals(
|
||||
Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
|
||||
plan.getTotalCommittedResources(start + i));
|
||||
Assert.assertEquals(
|
||||
Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
|
||||
plan.getConsumptionForUser(user, start + i));
|
||||
}
|
||||
checkAllocation(plan, alloc1, start);
|
||||
|
||||
|
||||
// Now add another one
|
||||
ReservationId reservationID2 =
|
||||
|
@ -424,13 +434,17 @@ public class TestInMemoryPlan {
|
|||
Assert.fail(e.getMessage());
|
||||
}
|
||||
Assert.assertNotNull(plan.getReservationById(reservationID2));
|
||||
|
||||
RLESparseResourceAllocation userCons =
|
||||
plan.getConsumptionForUserOverTime(user, start, start + alloc2.length);
|
||||
|
||||
for (int i = 0; i < alloc2.length; i++) {
|
||||
Assert.assertEquals(
|
||||
Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
|
||||
+ alloc2[i] + i), plan.getTotalCommittedResources(start + i));
|
||||
Assert.assertEquals(
|
||||
Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
|
||||
+ alloc2[i] + i), plan.getConsumptionForUser(user, start + i));
|
||||
+ alloc2[i] + i), userCons.getCapacityAtTime(start + i));
|
||||
}
|
||||
|
||||
// Now archive completed reservations
|
||||
|
@ -445,14 +459,8 @@ public class TestInMemoryPlan {
|
|||
}
|
||||
Assert.assertNotNull(plan.getReservationById(reservationID1));
|
||||
Assert.assertNull(plan.getReservationById(reservationID2));
|
||||
for (int i = 0; i < alloc1.length; i++) {
|
||||
Assert.assertEquals(
|
||||
Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
|
||||
plan.getTotalCommittedResources(start + i));
|
||||
Assert.assertEquals(
|
||||
Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
|
||||
plan.getConsumptionForUser(user, start + i));
|
||||
}
|
||||
checkAllocation(plan, alloc1, start);
|
||||
|
||||
when(clock.getTime()).thenReturn(107L);
|
||||
try {
|
||||
// will remove 1st reservation also as it has fallen out of the archival
|
||||
|
@ -461,12 +469,16 @@ public class TestInMemoryPlan {
|
|||
} catch (PlanningException e) {
|
||||
Assert.fail(e.getMessage());
|
||||
}
|
||||
|
||||
userCons =
|
||||
plan.getConsumptionForUserOverTime(user, start, start + alloc1.length);
|
||||
|
||||
Assert.assertNull(plan.getReservationById(reservationID1));
|
||||
for (int i = 0; i < alloc1.length; i++) {
|
||||
Assert.assertEquals(Resource.newInstance(0, 0),
|
||||
plan.getTotalCommittedResources(start + i));
|
||||
Assert.assertEquals(Resource.newInstance(0, 0),
|
||||
plan.getConsumptionForUser(user, start + i));
|
||||
userCons.getCapacityAtTime(start + i));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -18,9 +18,12 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
@ -86,6 +89,7 @@ public class TestGreedyReservationAgent {
|
|||
instConstraint, avgConstraint);
|
||||
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
|
||||
policy.init(reservationQ, conf);
|
||||
|
||||
agent = new GreedyReservationAgent();
|
||||
|
||||
QueueMetrics queueMetrics = mock(QueueMetrics.class);
|
||||
|
@ -135,6 +139,94 @@ public class TestGreedyReservationAgent {
|
|||
|
||||
}
|
||||
|
||||
@SuppressWarnings("javadoc")
|
||||
@Test
|
||||
public void testSharingPolicyFeedback() throws PlanningException {
|
||||
|
||||
prepareBasicPlan();
|
||||
|
||||
// let's constraint the instantaneous allocation and see the
|
||||
// policy kicking in during planning
|
||||
float instConstraint = 40;
|
||||
float avgConstraint = 40;
|
||||
|
||||
ReservationSchedulerConfiguration conf =
|
||||
ReservationSystemTestUtil.createConf(plan.getQueueName(), 100000,
|
||||
instConstraint, avgConstraint);
|
||||
|
||||
plan.getSharingPolicy().init(plan.getQueueName(), conf);
|
||||
|
||||
// create a request with a single atomic ask
|
||||
ReservationDefinition rr = new ReservationDefinitionPBImpl();
|
||||
rr.setArrival(5 * step);
|
||||
rr.setDeadline(100 * step);
|
||||
ReservationRequest r =
|
||||
ReservationRequest.newInstance(Resource.newInstance(2048, 2), 20, 20,
|
||||
10 * step);
|
||||
ReservationRequests reqs = new ReservationRequestsPBImpl();
|
||||
reqs.setReservationResources(Collections.singletonList(r));
|
||||
rr.setReservationRequests(reqs);
|
||||
|
||||
ReservationId reservationID =
|
||||
ReservationSystemTestUtil.getNewReservationId();
|
||||
agent.createReservation(reservationID, "u3", plan, rr);
|
||||
|
||||
ReservationId reservationID2 =
|
||||
ReservationSystemTestUtil.getNewReservationId();
|
||||
agent.createReservation(reservationID2, "u3", plan, rr);
|
||||
|
||||
ReservationDefinition rr3 = new ReservationDefinitionPBImpl();
|
||||
rr3.setArrival(5 * step);
|
||||
rr3.setDeadline(100 * step);
|
||||
ReservationRequest r3 =
|
||||
ReservationRequest.newInstance(Resource.newInstance(2048, 2), 45, 45,
|
||||
10 * step);
|
||||
ReservationRequests reqs3 = new ReservationRequestsPBImpl();
|
||||
reqs3.setReservationResources(Collections.singletonList(r3));
|
||||
rr3.setReservationRequests(reqs3);
|
||||
|
||||
ReservationId reservationID3 =
|
||||
ReservationSystemTestUtil.getNewReservationId();
|
||||
try {
|
||||
// RR3 is simply too big to fit
|
||||
agent.createReservation(reservationID3, "u3", plan, rr3);
|
||||
fail();
|
||||
} catch (PlanningException pe) {
|
||||
// expected
|
||||
}
|
||||
|
||||
assertTrue("Agent-based allocation failed", reservationID != null);
|
||||
assertTrue("Agent-based allocation failed", plan.getAllReservations()
|
||||
.size() == 4);
|
||||
|
||||
ReservationAllocation cs = plan.getReservationById(reservationID);
|
||||
ReservationAllocation cs2 = plan.getReservationById(reservationID2);
|
||||
ReservationAllocation cs3 = plan.getReservationById(reservationID3);
|
||||
|
||||
assertNotNull(cs);
|
||||
assertNotNull(cs2);
|
||||
assertNull(cs3);
|
||||
|
||||
System.out.println("--------AFTER SIMPLE ALLOCATION (queue: "
|
||||
+ reservationID + ")----------");
|
||||
System.out.println(plan.toString());
|
||||
System.out.println(plan.toCumulativeString());
|
||||
|
||||
for (long i = 90 * step; i < 100 * step; i++) {
|
||||
assertTrue(
|
||||
"Agent-based allocation unexpected",
|
||||
Resources.equals(cs.getResourcesAtTime(i),
|
||||
Resource.newInstance(2048 * 20, 2 * 20)));
|
||||
}
|
||||
// RR2 is pushed out by the presence of RR
|
||||
for (long i = 80 * step; i < 90 * step; i++) {
|
||||
assertTrue(
|
||||
"Agent-based allocation unexpected",
|
||||
Resources.equals(cs2.getResourcesAtTime(i),
|
||||
Resource.newInstance(2048 * 20, 2 * 20)));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOrder() throws PlanningException {
|
||||
prepareBasicPlan();
|
||||
|
@ -186,7 +278,6 @@ public class TestGreedyReservationAgent {
|
|||
assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
|
||||
assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1));
|
||||
assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1));
|
||||
|
||||
System.out.println("--------AFTER ORDER ALLOCATION (queue: "
|
||||
+ reservationID + ")----------");
|
||||
System.out.println(plan.toString());
|
||||
|
@ -376,7 +467,6 @@ public class TestGreedyReservationAgent {
|
|||
ReservationAllocation cs = plan.getReservationById(reservationID);
|
||||
|
||||
assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1));
|
||||
|
||||
System.out.println("--------AFTER ANY ALLOCATION (queue: " + reservationID
|
||||
+ ")----------");
|
||||
System.out.println(plan.toString());
|
||||
|
|
Loading…
Reference in New Issue