YARN-1709. In-memory data structures used to track resources over time to enable reservations.

(cherry picked from commit 0d8b2cd88b)
(cherry picked from commit cf4b34282a)
(cherry picked from commit 63250ef9d6)
This commit is contained in:
subru 2014-09-12 17:22:08 -07:00 committed by Chris Douglas
parent f4522fd987
commit 056b7f5799
15 changed files with 2534 additions and 0 deletions

View File

@ -5,3 +5,6 @@ YARN-2475. Logic for responding to capacity drops for the
ReservationSystem. (Carlo Curino and Subru Krishnan via curino) ReservationSystem. (Carlo Curino and Subru Krishnan via curino)
YARN-1708. Public YARN APIs for creating/updating/deleting reservations. (subru) YARN-1708. Public YARN APIs for creating/updating/deleting reservations. (subru)
YARN-1709. In-memory data structures used to track resources over time to
enable reservations. (subru)

View File

@ -0,0 +1,507 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class InMemoryPlan implements Plan {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class);
private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
private TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>> currentReservations =
new TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>>();
private RLESparseResourceAllocation rleSparseVector;
private Map<String, RLESparseResourceAllocation> userResourceAlloc =
new HashMap<String, RLESparseResourceAllocation>();
private Map<ReservationId, InMemoryReservationAllocation> reservationTable =
new HashMap<ReservationId, InMemoryReservationAllocation>();
private final ReentrantReadWriteLock readWriteLock =
new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
private final SharingPolicy policy;
private final ReservationAgent agent;
private final long step;
private final ResourceCalculator resCalc;
private final Resource minAlloc, maxAlloc;
private final String queueName;
private final QueueMetrics queueMetrics;
private final Planner replanner;
private final boolean getMoveOnExpiry;
private final Clock clock;
private Resource totalCapacity;
InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
ReservationAgent agent, Resource totalCapacity, long step,
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
String queueName, Planner replanner, boolean getMoveOnExpiry) {
this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc,
maxAlloc, queueName, replanner, getMoveOnExpiry, new UTCClock());
}
InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
ReservationAgent agent, Resource totalCapacity, long step,
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
String queueName, Planner replanner, boolean getMoveOnExpiry, Clock clock) {
this.queueMetrics = queueMetrics;
this.policy = policy;
this.agent = agent;
this.step = step;
this.totalCapacity = totalCapacity;
this.resCalc = resCalc;
this.minAlloc = minAlloc;
this.maxAlloc = maxAlloc;
this.rleSparseVector = new RLESparseResourceAllocation(resCalc, minAlloc);
this.queueName = queueName;
this.replanner = replanner;
this.getMoveOnExpiry = getMoveOnExpiry;
this.clock = clock;
}
@Override
public QueueMetrics getQueueMetrics() {
return queueMetrics;
}
private void incrementAllocation(ReservationAllocation reservation) {
assert (readWriteLock.isWriteLockedByCurrentThread());
Map<ReservationInterval, ReservationRequest> allocationRequests =
reservation.getAllocationRequests();
// check if we have encountered the user earlier and if not add an entry
String user = reservation.getUser();
RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
if (resAlloc == null) {
resAlloc = new RLESparseResourceAllocation(resCalc, minAlloc);
userResourceAlloc.put(user, resAlloc);
}
for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
.entrySet()) {
resAlloc.addInterval(r.getKey(), r.getValue());
rleSparseVector.addInterval(r.getKey(), r.getValue());
}
}
private void decrementAllocation(ReservationAllocation reservation) {
assert (readWriteLock.isWriteLockedByCurrentThread());
Map<ReservationInterval, ReservationRequest> allocationRequests =
reservation.getAllocationRequests();
String user = reservation.getUser();
RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
.entrySet()) {
resAlloc.removeInterval(r.getKey(), r.getValue());
rleSparseVector.removeInterval(r.getKey(), r.getValue());
}
if (resAlloc.isEmpty()) {
userResourceAlloc.remove(resAlloc);
}
}
public Set<ReservationAllocation> getAllReservations() {
readLock.lock();
try {
if (currentReservations != null) {
Set<ReservationAllocation> flattenedReservations =
new HashSet<ReservationAllocation>();
for (Set<InMemoryReservationAllocation> reservationEntries : currentReservations
.values()) {
flattenedReservations.addAll(reservationEntries);
}
return flattenedReservations;
} else {
return null;
}
} finally {
readLock.unlock();
}
}
@Override
public boolean addReservation(ReservationAllocation reservation)
throws PlanningException {
// Verify the allocation is memory based otherwise it is not supported
InMemoryReservationAllocation inMemReservation =
(InMemoryReservationAllocation) reservation;
if (inMemReservation.getUser() == null) {
String errMsg =
"The specified Reservation with ID "
+ inMemReservation.getReservationId()
+ " is not mapped to any user";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
writeLock.lock();
try {
if (reservationTable.containsKey(inMemReservation.getReservationId())) {
String errMsg =
"The specified Reservation with ID "
+ inMemReservation.getReservationId() + " already exists";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
// Validate if we can accept this reservation, throws exception if
// validation fails
policy.validate(this, inMemReservation);
// we record here the time in which the allocation has been accepted
reservation.setAcceptanceTimestamp(clock.getTime());
ReservationInterval searchInterval =
new ReservationInterval(inMemReservation.getStartTime(),
inMemReservation.getEndTime());
Set<InMemoryReservationAllocation> reservations =
currentReservations.get(searchInterval);
if (reservations == null) {
reservations = new HashSet<InMemoryReservationAllocation>();
}
if (!reservations.add(inMemReservation)) {
LOG.error("Unable to add reservation: {} to plan.",
inMemReservation.getReservationId());
return false;
}
currentReservations.put(searchInterval, reservations);
reservationTable.put(inMemReservation.getReservationId(),
inMemReservation);
incrementAllocation(inMemReservation);
LOG.info("Sucessfully added reservation: {} to plan.",
inMemReservation.getReservationId());
return true;
} finally {
writeLock.unlock();
}
}
@Override
public boolean updateReservation(ReservationAllocation reservation)
throws PlanningException {
writeLock.lock();
boolean result = false;
try {
ReservationId resId = reservation.getReservationId();
ReservationAllocation currReservation = getReservationById(resId);
if (currReservation == null) {
String errMsg =
"The specified Reservation with ID " + resId
+ " does not exist in the plan";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
if (!removeReservation(currReservation)) {
LOG.error("Unable to replace reservation: {} from plan.",
reservation.getReservationId());
return result;
}
try {
result = addReservation(reservation);
} catch (PlanningException e) {
LOG.error("Unable to update reservation: {} from plan due to {}.",
reservation.getReservationId(), e.getMessage());
}
if (result) {
LOG.info("Sucessfully updated reservation: {} in plan.",
reservation.getReservationId());
return result;
} else {
// rollback delete
addReservation(currReservation);
LOG.info("Rollbacked update reservation: {} from plan.",
reservation.getReservationId());
return result;
}
} finally {
writeLock.unlock();
}
}
private boolean removeReservation(ReservationAllocation reservation) {
assert (readWriteLock.isWriteLockedByCurrentThread());
ReservationInterval searchInterval =
new ReservationInterval(reservation.getStartTime(),
reservation.getEndTime());
Set<InMemoryReservationAllocation> reservations =
currentReservations.get(searchInterval);
if (reservations != null) {
if (!reservations.remove(reservation)) {
LOG.error("Unable to remove reservation: {} from plan.",
reservation.getReservationId());
return false;
}
if (reservations.isEmpty()) {
currentReservations.remove(searchInterval);
}
} else {
String errMsg =
"The specified Reservation with ID " + reservation.getReservationId()
+ " does not exist in the plan";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
reservationTable.remove(reservation.getReservationId());
decrementAllocation(reservation);
LOG.info("Sucessfully deleted reservation: {} in plan.",
reservation.getReservationId());
return true;
}
@Override
public boolean deleteReservation(ReservationId reservationID) {
writeLock.lock();
try {
ReservationAllocation reservation = getReservationById(reservationID);
if (reservation == null) {
String errMsg =
"The specified Reservation with ID " + reservationID
+ " does not exist in the plan";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
return removeReservation(reservation);
} finally {
writeLock.unlock();
}
}
@Override
public void archiveCompletedReservations(long tick) {
// Since we are looking for old reservations, read lock is optimal
LOG.debug("Running archival at time: {}", tick);
readLock.lock();
List<InMemoryReservationAllocation> expiredReservations =
new ArrayList<InMemoryReservationAllocation>();
// archive reservations and delete the ones which are beyond
// the reservation policy "window"
try {
long archivalTime = tick - policy.getValidWindow();
ReservationInterval searchInterval =
new ReservationInterval(archivalTime, archivalTime);
SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> reservations =
currentReservations.headMap(searchInterval, true);
if (!reservations.isEmpty()) {
for (Set<InMemoryReservationAllocation> reservationEntries : reservations
.values()) {
for (InMemoryReservationAllocation reservation : reservationEntries) {
if (reservation.getEndTime() <= archivalTime) {
expiredReservations.add(reservation);
}
}
}
}
} finally {
readLock.unlock();
}
if (expiredReservations.isEmpty()) {
return;
}
// Need write lock only if there are any reservations to be deleted
writeLock.lock();
try {
for (InMemoryReservationAllocation expiredReservation : expiredReservations) {
removeReservation(expiredReservation);
}
} finally {
writeLock.unlock();
}
}
@Override
public Set<ReservationAllocation> getReservationsAtTime(long tick) {
readLock.lock();
ReservationInterval searchInterval =
new ReservationInterval(tick, Long.MAX_VALUE);
try {
SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> reservations =
currentReservations.headMap(searchInterval, true);
if (!reservations.isEmpty()) {
Set<ReservationAllocation> flattenedReservations =
new HashSet<ReservationAllocation>();
for (Set<InMemoryReservationAllocation> reservationEntries : reservations
.values()) {
for (InMemoryReservationAllocation reservation : reservationEntries) {
if (reservation.getEndTime() > tick) {
flattenedReservations.add(reservation);
}
}
}
return Collections.unmodifiableSet(flattenedReservations);
} else {
return Collections.emptySet();
}
} finally {
readLock.unlock();
}
}
@Override
public long getStep() {
return step;
}
@Override
public SharingPolicy getSharingPolicy() {
return policy;
}
@Override
public ReservationAgent getReservationAgent() {
return agent;
}
@Override
public Resource getConsumptionForUser(String user, long t) {
readLock.lock();
try {
RLESparseResourceAllocation userResAlloc = userResourceAlloc.get(user);
if (userResAlloc != null) {
return userResAlloc.getCapacityAtTime(t);
} else {
return Resources.clone(ZERO_RESOURCE);
}
} finally {
readLock.unlock();
}
}
@Override
public Resource getTotalCommittedResources(long t) {
readLock.lock();
try {
return rleSparseVector.getCapacityAtTime(t);
} finally {
readLock.unlock();
}
}
@Override
public ReservationAllocation getReservationById(ReservationId reservationID) {
if (reservationID == null) {
return null;
}
readLock.lock();
try {
return reservationTable.get(reservationID);
} finally {
readLock.unlock();
}
}
@Override
public Resource getTotalCapacity() {
readLock.lock();
try {
return Resources.clone(totalCapacity);
} finally {
readLock.unlock();
}
}
@Override
public Resource getMinimumAllocation() {
return Resources.clone(minAlloc);
}
@Override
public void setTotalCapacity(Resource cap) {
writeLock.lock();
try {
totalCapacity = Resources.clone(cap);
} finally {
writeLock.unlock();
}
}
public long getEarliestStartTime() {
readLock.lock();
try {
return rleSparseVector.getEarliestStartTime();
} finally {
readLock.unlock();
}
}
@Override
public long getLastEndTime() {
readLock.lock();
try {
return rleSparseVector.getLatestEndTime();
} finally {
readLock.unlock();
}
}
@Override
public ResourceCalculator getResourceCalculator() {
return resCalc;
}
@Override
public String getQueueName() {
return queueName;
}
@Override
public Resource getMaximumAllocation() {
return Resources.clone(maxAlloc);
}
public String toCumulativeString() {
readLock.lock();
try {
return rleSparseVector.toString();
} finally {
readLock.unlock();
}
}
@Override
public Planner getReplanner() {
return replanner;
}
@Override
public boolean getMoveOnExpiry() {
return getMoveOnExpiry;
}
@Override
public String toString() {
readLock.lock();
try {
StringBuffer planStr = new StringBuffer("In-memory Plan: ");
planStr.append("Parent Queue: ").append(queueName)
.append("Total Capacity: ").append(totalCapacity).append("Step: ")
.append(step);
for (ReservationAllocation reservation : getAllReservations()) {
planStr.append(reservation);
}
return planStr.toString();
} finally {
readLock.unlock();
}
}
}

View File

@ -0,0 +1,151 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Collections;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* An in memory implementation of a reservation allocation using the
* {@link RLESparseResourceAllocation}
*
*/
class InMemoryReservationAllocation implements ReservationAllocation {
private final String planName;
private final ReservationId reservationID;
private final String user;
private final ReservationDefinition contract;
private final long startTime;
private final long endTime;
private final Map<ReservationInterval, ReservationRequest> allocationRequests;
private boolean hasGang = false;
private long acceptedAt = -1;
private RLESparseResourceAllocation resourcesOverTime;
InMemoryReservationAllocation(ReservationId reservationID,
ReservationDefinition contract, String user, String planName,
long startTime, long endTime,
Map<ReservationInterval, ReservationRequest> allocationRequests,
ResourceCalculator calculator, Resource minAlloc) {
this.contract = contract;
this.startTime = startTime;
this.endTime = endTime;
this.reservationID = reservationID;
this.user = user;
this.allocationRequests = allocationRequests;
this.planName = planName;
resourcesOverTime = new RLESparseResourceAllocation(calculator, minAlloc);
for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
.entrySet()) {
resourcesOverTime.addInterval(r.getKey(), r.getValue());
if (r.getValue().getConcurrency() > 1) {
hasGang = true;
}
}
}
@Override
public ReservationId getReservationId() {
return reservationID;
}
@Override
public ReservationDefinition getReservationDefinition() {
return contract;
}
@Override
public long getStartTime() {
return startTime;
}
@Override
public long getEndTime() {
return endTime;
}
@Override
public Map<ReservationInterval, ReservationRequest> getAllocationRequests() {
return Collections.unmodifiableMap(allocationRequests);
}
@Override
public String getPlanName() {
return planName;
}
@Override
public String getUser() {
return user;
}
@Override
public boolean containsGangs() {
return hasGang;
}
@Override
public void setAcceptanceTimestamp(long acceptedAt) {
this.acceptedAt = acceptedAt;
}
@Override
public long getAcceptanceTime() {
return acceptedAt;
}
@Override
public Resource getResourcesAtTime(long tick) {
if (tick < startTime || tick >= endTime) {
return Resource.newInstance(0, 0);
}
return Resources.clone(resourcesOverTime.getCapacityAtTime(tick));
}
@Override
public String toString() {
StringBuilder sBuf = new StringBuilder();
sBuf.append(getReservationId()).append(" user:").append(getUser())
.append(" startTime: ").append(getStartTime()).append(" endTime: ")
.append(getEndTime()).append(" alloc:[")
.append(resourcesOverTime.toString()).append("] ");
return sBuf.toString();
}
@Override
public int compareTo(ReservationAllocation other) {
// reverse order of acceptance
if (this.getAcceptanceTime() > other.getAcceptanceTime()) {
return -1;
}
if (this.getAcceptanceTime() < other.getAcceptanceTime()) {
return 1;
}
return 0;
}
@Override
public int hashCode() {
return reservationID.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
InMemoryReservationAllocation other = (InMemoryReservationAllocation) obj;
return this.reservationID.equals(other.getReservationId());
}
}

View File

@ -0,0 +1,32 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
/**
* A Plan represents the central data structure of a reservation system that
* maintains the "agenda" for the cluster. In particular, it maintains
* information on how a set of {@link ReservationDefinition} that have been
* previously accepted will be honored.
*
* {@link ReservationDefinition} submitted by the users through the RM public
* APIs are passed to appropriate {@link ReservationAgent}s, which in turn will
* consult the Plan (via the {@link PlanView} interface) and try to determine
* whether there are sufficient resources available in this Plan to satisfy the
* temporal and resource constraints of a {@link ReservationDefinition}. If a
* valid allocation is found the agent will try to store it in the plan (via the
* {@link PlanEdit} interface). Upon success the system return to the user a
* positive acknowledgment, and a reservation identifier to be later used to
* access the reserved resources.
*
* A {@link PlanFollower} will continuously read from the Plan and will
* affect the instantaneous allocation of resources among jobs running by
* publishing the "current" slice of the Plan to the underlying scheduler. I.e.,
* the configuration of queues/weights of the scheduler are modified to reflect
* the allocations in the Plan.
*
* As this interface have several methods we decompose them into three groups:
* {@link PlanContext}: containing configuration type information,
* {@link PlanView} read-only access to the plan state, and {@link PlanEdit}
* write access to the plan state.
*/
public interface Plan extends PlanContext, PlanView, PlanEdit {
}

View File

@ -0,0 +1,101 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
/**
* This interface provides read-only access to configuration-type parameter for
* a plan.
*
*/
public interface PlanContext {
/**
* Returns the configured "step" or granularity of time of the plan in millis.
*
* @return plan step in millis
*/
public long getStep();
/**
* Return the {@link ReservationAgent} configured for this plan that is
* responsible for optimally placing various reservation requests
*
* @return the {@link ReservationAgent} configured for this plan
*/
public ReservationAgent getReservationAgent();
/**
* Return an instance of a {@link Planner}, which will be invoked in response
* to unexpected reduction in the resources of this plan
*
* @return an instance of a {@link Planner}, which will be invoked in response
* to unexpected reduction in the resources of this plan
*/
public Planner getReplanner();
/**
* Return the configured {@link SharingPolicy} that governs the sharing of the
* resources of the plan between its various users
*
* @return the configured {@link SharingPolicy} that governs the sharing of
* the resources of the plan between its various users
*/
public SharingPolicy getSharingPolicy();
/**
* Returns the system {@link ResourceCalculator}
*
* @return the system {@link ResourceCalculator}
*/
public ResourceCalculator getResourceCalculator();
/**
* Returns the single smallest {@link Resource} allocation that can be
* reserved in this plan
*
* @return the single smallest {@link Resource} allocation that can be
* reserved in this plan
*/
public Resource getMinimumAllocation();
/**
* Returns the single largest {@link Resource} allocation that can be reserved
* in this plan
*
* @return the single largest {@link Resource} allocation that can be reserved
* in this plan
*/
public Resource getMaximumAllocation();
/**
* Return the name of the queue in the {@link ResourceScheduler} corresponding
* to this plan
*
* @return the name of the queue in the {@link ResourceScheduler}
* corresponding to this plan
*/
public String getQueueName();
/**
* Return the {@link QueueMetrics} for the queue in the
* {@link ResourceScheduler} corresponding to this plan
*
* @return the {@link QueueMetrics} for the queue in the
* {@link ResourceScheduler} corresponding to this plan
*/
public QueueMetrics getQueueMetrics();
/**
* Instructs the {@link PlanFollower} on what to do for applications
* which are still running when the reservation is expiring (move-to-default
* vs kill)
*
* @return true if remaining applications have to be killed, false if they
* have to migrated
*/
public boolean getMoveOnExpiry();
}

View File

@ -0,0 +1,61 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
/**
* This interface groups the methods used to modify the state of a Plan.
*/
public interface PlanEdit extends PlanContext, PlanView {
/**
* Add a new {@link ReservationAllocation} to the plan
*
* @param reservation the {@link ReservationAllocation} to be added to the
* plan
* @return true if addition is successful, false otherwise
*/
public boolean addReservation(ReservationAllocation reservation)
throws PlanningException;
/**
* Updates an existing {@link ReservationAllocation} in the plan. This is
* required for re-negotiation
*
* @param reservation the {@link ReservationAllocation} to be updated the plan
* @return true if update is successful, false otherwise
*/
public boolean updateReservation(ReservationAllocation reservation)
throws PlanningException;
/**
* Delete an existing {@link ReservationAllocation} from the plan identified
* uniquely by its {@link ReservationId}. This will generally be used for
* garbage collection
*
* @param reservation the {@link ReservationAllocation} to be deleted from the
* plan identified uniquely by its {@link ReservationId}
* @return true if delete is successful, false otherwise
*/
public boolean deleteReservation(ReservationId reservationID)
throws PlanningException;
/**
* Method invoked to garbage collect old reservations. It cleans up expired
* reservations that have fallen out of the sliding archival window
*
* @param tick the current time from which the archival window is computed
*/
public void archiveCompletedReservations(long tick) throws PlanningException;
/**
* Sets the overall capacity in terms of {@link Resource} assigned to this
* plan
*
* @param capacity the overall capacity in terms of {@link Resource} assigned
* to this plan
*/
public void setTotalCapacity(Resource capacity);
}

View File

@ -0,0 +1,89 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
/**
* This interface provides a read-only view on the allocations made in this
* plan. This methods are used for example by {@link ReservationAgent}s to
* determine the free resources in a certain point in time, and by
* PlanFollowerPolicy to publish this plan to the scheduler.
*/
public interface PlanView extends PlanContext {
/**
* Return a {@link ReservationAllocation} identified by its
* {@link ReservationId}
*
* @param reservationID the unique id to identify the
* {@link ReservationAllocation}
* @return {@link ReservationAllocation} identified by the specified id
*/
public ReservationAllocation getReservationById(ReservationId reservationID);
/**
* Gets all the active reservations at the specified point of time
*
* @param tick the time (UTC in ms) for which the active reservations are
* requested
* @return set of active reservations at the specified time
*/
public Set<ReservationAllocation> getReservationsAtTime(long tick);
/**
* Gets all the reservations in the plan
*
* @return set of all reservations handled by this Plan
*/
public Set<ReservationAllocation> getAllReservations();
/**
* Returns the total {@link Resource} reserved for all users at the specified
* time
*
* @param tick the time (UTC in ms) for which the reserved resources are
* requested
* @return the total {@link Resource} reserved for all users at the specified
* time
*/
public Resource getTotalCommittedResources(long tick);
/**
* Returns the total {@link Resource} reserved for a given user at the
* specified time
*
* @param user the user who made the reservation(s)
* @param tick the time (UTC in ms) for which the reserved resources are
* requested
* @return the total {@link Resource} reserved for a given user at the
* specified time
*/
public Resource getConsumptionForUser(String user, long tick);
/**
* Returns the overall capacity in terms of {@link Resource} assigned to this
* plan (typically will correspond to the absolute capacity of the
* corresponding queue).
*
* @return the overall capacity in terms of {@link Resource} assigned to this
* plan
*/
public Resource getTotalCapacity();
/**
* Gets the time (UTC in ms) at which the first reservation starts
*
* @return the time (UTC in ms) at which the first reservation starts
*/
public long getEarliestStartTime();
/**
* Returns the time (UTC in ms) at which the last reservation terminates
*
* @return the time (UTC in ms) at which the last reservation terminates
*/
public long getLastEndTime();
}

View File

@ -0,0 +1,332 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.gson.stream.JsonWriter;
/**
* This is a run length encoded sparse data structure that maintains resource
* allocations over time
*/
public class RLESparseResourceAllocation {
private static final int THRESHOLD = 100;
private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
private TreeMap<Long, Resource> cumulativeCapacity =
new TreeMap<Long, Resource>();
private final ReentrantReadWriteLock readWriteLock =
new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
private final ResourceCalculator resourceCalculator;
private final Resource minAlloc;
public RLESparseResourceAllocation(ResourceCalculator resourceCalculator,
Resource minAlloc) {
this.resourceCalculator = resourceCalculator;
this.minAlloc = minAlloc;
}
private boolean isSameAsPrevious(Long key, Resource capacity) {
Entry<Long, Resource> previous = cumulativeCapacity.lowerEntry(key);
return (previous != null && previous.getValue().equals(capacity));
}
private boolean isSameAsNext(Long key, Resource capacity) {
Entry<Long, Resource> next = cumulativeCapacity.higherEntry(key);
return (next != null && next.getValue().equals(capacity));
}
/**
* Add a resource for the specified interval
*
* @param reservationInterval the interval for which the resource is to be
* added
* @param capacity the resource to be added
* @return true if addition is successful, false otherwise
*/
public boolean addInterval(ReservationInterval reservationInterval,
ReservationRequest capacity) {
Resource totCap =
Resources.multiply(capacity.getCapability(),
(float) capacity.getNumContainers());
if (totCap.equals(ZERO_RESOURCE)) {
return true;
}
writeLock.lock();
try {
long startKey = reservationInterval.getStartTime();
long endKey = reservationInterval.getEndTime();
NavigableMap<Long, Resource> ticks =
cumulativeCapacity.headMap(endKey, false);
if (ticks != null && !ticks.isEmpty()) {
Resource updatedCapacity = Resource.newInstance(0, 0);
Entry<Long, Resource> lowEntry = ticks.floorEntry(startKey);
if (lowEntry == null) {
// This is the earliest starting interval
cumulativeCapacity.put(startKey, totCap);
} else {
updatedCapacity = Resources.add(lowEntry.getValue(), totCap);
// Add a new tick only if the updated value is different
// from the previous tick
if ((startKey == lowEntry.getKey())
&& (isSameAsPrevious(lowEntry.getKey(), updatedCapacity))) {
cumulativeCapacity.remove(lowEntry.getKey());
} else {
cumulativeCapacity.put(startKey, updatedCapacity);
}
}
// Increase all the capacities of overlapping intervals
Set<Entry<Long, Resource>> overlapSet =
ticks.tailMap(startKey, false).entrySet();
for (Entry<Long, Resource> entry : overlapSet) {
updatedCapacity = Resources.add(entry.getValue(), totCap);
entry.setValue(updatedCapacity);
}
} else {
// This is the first interval to be added
cumulativeCapacity.put(startKey, totCap);
}
Resource nextTick = cumulativeCapacity.get(endKey);
if (nextTick != null) {
// If there is overlap, remove the duplicate entry
if (isSameAsPrevious(endKey, nextTick)) {
cumulativeCapacity.remove(endKey);
}
} else {
// Decrease capacity as this is end of the interval
cumulativeCapacity.put(endKey, Resources.subtract(cumulativeCapacity
.floorEntry(endKey).getValue(), totCap));
}
return true;
} finally {
writeLock.unlock();
}
}
/**
* Add multiple resources for the specified interval
*
* @param reservationInterval the interval for which the resource is to be
* added
* @param ReservationRequests the resources to be added
* @param clusterResource the total resources in the cluster
* @return true if addition is successful, false otherwise
*/
public boolean addCompositeInterval(ReservationInterval reservationInterval,
List<ReservationRequest> ReservationRequests, Resource clusterResource) {
ReservationRequest aggregateReservationRequest =
Records.newRecord(ReservationRequest.class);
Resource capacity = Resource.newInstance(0, 0);
for (ReservationRequest ReservationRequest : ReservationRequests) {
Resources.addTo(capacity, Resources.multiply(
ReservationRequest.getCapability(),
ReservationRequest.getNumContainers()));
}
aggregateReservationRequest.setNumContainers((int) Math.ceil(Resources
.divide(resourceCalculator, clusterResource, capacity, minAlloc)));
aggregateReservationRequest.setCapability(minAlloc);
return addInterval(reservationInterval, aggregateReservationRequest);
}
/**
* Removes a resource for the specified interval
*
* @param reservationInterval the interval for which the resource is to be
* removed
* @param capacity the resource to be removed
* @return true if removal is successful, false otherwise
*/
public boolean removeInterval(ReservationInterval reservationInterval,
ReservationRequest capacity) {
Resource totCap =
Resources.multiply(capacity.getCapability(),
(float) capacity.getNumContainers());
if (totCap.equals(ZERO_RESOURCE)) {
return true;
}
writeLock.lock();
try {
long startKey = reservationInterval.getStartTime();
long endKey = reservationInterval.getEndTime();
// update the start key
NavigableMap<Long, Resource> ticks =
cumulativeCapacity.headMap(endKey, false);
// Decrease all the capacities of overlapping intervals
SortedMap<Long, Resource> overlapSet = ticks.tailMap(startKey);
if (overlapSet != null && !overlapSet.isEmpty()) {
Resource updatedCapacity = Resource.newInstance(0, 0);
long currentKey = -1;
for (Iterator<Entry<Long, Resource>> overlapEntries =
overlapSet.entrySet().iterator(); overlapEntries.hasNext();) {
Entry<Long, Resource> entry = overlapEntries.next();
currentKey = entry.getKey();
updatedCapacity = Resources.subtract(entry.getValue(), totCap);
// update each entry between start and end key
cumulativeCapacity.put(currentKey, updatedCapacity);
}
// Remove the first overlap entry if it is same as previous after
// updation
Long firstKey = overlapSet.firstKey();
if (isSameAsPrevious(firstKey, overlapSet.get(firstKey))) {
cumulativeCapacity.remove(firstKey);
}
// Remove the next entry if it is same as end entry after updation
if ((currentKey != -1) && (isSameAsNext(currentKey, updatedCapacity))) {
cumulativeCapacity.remove(cumulativeCapacity.higherKey(currentKey));
}
}
return true;
} finally {
writeLock.unlock();
}
}
/**
* Returns the capacity, i.e. total resources allocated at the specified point
* of time
*
* @param tick the time (UTC in ms) at which the capacity is requested
* @return the resources allocated at the specified time
*/
public Resource getCapacityAtTime(long tick) {
readLock.lock();
try {
Entry<Long, Resource> closestStep = cumulativeCapacity.floorEntry(tick);
if (closestStep != null) {
return Resources.clone(closestStep.getValue());
}
return Resources.clone(ZERO_RESOURCE);
} finally {
readLock.unlock();
}
}
/**
* Get the timestamp of the earliest resource allocation
*
* @return the timestamp of the first resource allocation
*/
public long getEarliestStartTime() {
readLock.lock();
try {
if (cumulativeCapacity.isEmpty()) {
return -1;
} else {
return cumulativeCapacity.firstKey();
}
} finally {
readLock.unlock();
}
}
/**
* Get the timestamp of the latest resource allocation
*
* @return the timestamp of the last resource allocation
*/
public long getLatestEndTime() {
readLock.lock();
try {
if (cumulativeCapacity.isEmpty()) {
return -1;
} else {
return cumulativeCapacity.lastKey();
}
} finally {
readLock.unlock();
}
}
/**
* Returns true if there are no non-zero entries
*
* @return true if there are no allocations or false otherwise
*/
public boolean isEmpty() {
readLock.lock();
try {
if (cumulativeCapacity.isEmpty()) {
return true;
}
// Deletion leaves a single zero entry so check for that
if (cumulativeCapacity.size() == 1) {
return cumulativeCapacity.firstEntry().getValue().equals(ZERO_RESOURCE);
}
return false;
} finally {
readLock.unlock();
}
}
@Override
public String toString() {
StringBuilder ret = new StringBuilder();
readLock.lock();
try {
if (cumulativeCapacity.size() > THRESHOLD) {
ret.append("Number of steps: ").append(cumulativeCapacity.size())
.append(" earliest entry: ").append(cumulativeCapacity.firstKey())
.append(" latest entry: ").append(cumulativeCapacity.lastKey());
} else {
for (Map.Entry<Long, Resource> r : cumulativeCapacity.entrySet()) {
ret.append(r.getKey()).append(": ").append(r.getValue())
.append("\n ");
}
}
return ret.toString();
} finally {
readLock.unlock();
}
}
/**
* Returns the JSON string representation of the current resources allocated
* over time
*
* @return the JSON string representation of the current resources allocated
* over time
*/
public String toMemJSONString() {
StringWriter json = new StringWriter();
JsonWriter jsonWriter = new JsonWriter(json);
readLock.lock();
try {
jsonWriter.beginObject();
// jsonWriter.name("timestamp").value("resource");
for (Map.Entry<Long, Resource> r : cumulativeCapacity.entrySet()) {
jsonWriter.name(r.getKey().toString()).value(r.getValue().toString());
}
jsonWriter.endObject();
jsonWriter.close();
return json.toString();
} catch (IOException e) {
// This should not happen
return "";
} finally {
readLock.unlock();
}
}
}

View File

@ -0,0 +1,104 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
/**
* A ReservationAllocation represents a concrete allocation of resources over
* time that satisfy a certain {@link ReservationDefinition}. This is used
* internally by a {@link Plan} to store information about how each of the
* accepted {@link ReservationDefinition} have been allocated.
*/
public interface ReservationAllocation extends
Comparable<ReservationAllocation> {
/**
* Returns the unique identifier {@link ReservationId} that represents the
* reservation
*
* @return reservationId the unique identifier {@link ReservationId} that
* represents the reservation
*/
public ReservationId getReservationId();
/**
* Returns the original {@link ReservationDefinition} submitted by the client
*
* @return
*/
public ReservationDefinition getReservationDefinition();
/**
* Returns the time at which the reservation is activated
*
* @return the time at which the reservation is activated
*/
public long getStartTime();
/**
* Returns the time at which the reservation terminates
*
* @return the time at which the reservation terminates
*/
public long getEndTime();
/**
* Returns the map of resources requested against the time interval for which
* they were
*
* @return the allocationRequests the map of resources requested against the
* time interval for which they were
*/
public Map<ReservationInterval, ReservationRequest> getAllocationRequests();
/**
* Return a string identifying the plan to which the reservation belongs
*
* @return the plan to which the reservation belongs
*/
public String getPlanName();
/**
* Returns the user who requested the reservation
*
* @return the user who requested the reservation
*/
public String getUser();
/**
* Returns whether the reservation has gang semantics or not
*
* @return true if there is a gang request, false otherwise
*/
public boolean containsGangs();
/**
* Sets the time at which the reservation was accepted by the system
*
* @param acceptedAt the time at which the reservation was accepted by the
* system
*/
public void setAcceptanceTimestamp(long acceptedAt);
/**
* Returns the time at which the reservation was accepted by the system
*
* @return the time at which the reservation was accepted by the system
*/
public long getAcceptanceTime();
/**
* Returns the capacity represented by cumulative resources reserved by the
* reservation at the specified point of time
*
* @param tick the time (UTC in ms) for which the reserved resources are
* requested
* @return the resources reserved at the specified time
*/
public Resource getResourcesAtTime(long tick);
}

View File

@ -0,0 +1,67 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
/**
* This represents the time duration of the reservation
*
*/
public class ReservationInterval implements Comparable<ReservationInterval> {
private final long startTime;
private final long endTime;
public ReservationInterval(long startTime, long endTime) {
this.startTime = startTime;
this.endTime = endTime;
}
/**
* Get the start time of the reservation interval
*
* @return the startTime
*/
public long getStartTime() {
return startTime;
}
/**
* Get the end time of the reservation interval
*
* @return the endTime
*/
public long getEndTime() {
return endTime;
}
/**
* Returns whether the interval is active at the specified instant of time
*
* @param tick the instance of the time to check
* @return true if active, false otherwise
*/
public boolean isOverlap(long tick) {
return (startTime <= tick && tick <= endTime);
}
@Override
public int compareTo(ReservationInterval anotherInterval) {
long diff = 0;
if (startTime == anotherInterval.getStartTime()) {
diff = endTime - anotherInterval.getEndTime();
} else {
diff = startTime - anotherInterval.getStartTime();
}
if (diff < 0) {
return -1;
} else if (diff > 0) {
return 1;
} else {
return 0;
}
}
public String toString() {
return "[" + startTime + ", " + endTime + "]";
}
}

View File

@ -0,0 +1,25 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
/**
* Exception thrown by the admission control subsystem when there is a problem
* in trying to find an allocation for a user {@link ReservationSubmissionRequest}.
*/
public class PlanningException extends Exception {
private static final long serialVersionUID = -684069387367879218L;
public PlanningException(String message) {
super(message);
}
public PlanningException(Throwable cause) {
super(cause);
}
public PlanningException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,210 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.junit.Assert;
import org.mockito.Mockito;
public class ReservationSystemTestUtil {
private static Random rand = new Random();
public final static String reservationQ = "dedicated";
public static ReservationId getNewReservationId() {
return ReservationId.newInstance(rand.nextLong(), rand.nextLong());
}
public CapacityScheduler mockCapacityScheduler(int numContainers)
throws IOException {
// stolen from TestCapacityScheduler
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
CapacityScheduler cs = Mockito.spy(new CapacityScheduler());
cs.setConf(new YarnConfiguration());
RMContext mockRmContext =
Mockito.spy(new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null));
cs.setRMContext(mockRmContext);
try {
cs.serviceInit(conf);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
when(mockRmContext.getScheduler()).thenReturn(cs);
Resource r = Resource.newInstance(numContainers * 1024, numContainers);
doReturn(r).when(cs).getClusterResource();
return cs;
}
public static void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
// Define default queue
final String defQ = CapacitySchedulerConfiguration.ROOT + ".default";
conf.setCapacity(defQ, 10);
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {
"default", "a", reservationQ });
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
conf.setCapacity(A, 10);
final String dedicated =
CapacitySchedulerConfiguration.ROOT
+ CapacitySchedulerConfiguration.DOT + reservationQ;
conf.setCapacity(dedicated, 80);
// Set as reservation queue
conf.setReservableQueue(dedicated, true);
// Define 2nd-level queues
final String A1 = A + ".a1";
final String A2 = A + ".a2";
conf.setQueues(A, new String[] { "a1", "a2" });
conf.setCapacity(A1, 30);
conf.setCapacity(A2, 70);
}
public String getFullReservationQueueName() {
return CapacitySchedulerConfiguration.ROOT
+ CapacitySchedulerConfiguration.DOT + reservationQ;
}
public String getreservationQueueName() {
return reservationQ;
}
public void updateQueueConfiguration(CapacitySchedulerConfiguration conf,
String newQ) {
// Define default queue
final String prefix =
CapacitySchedulerConfiguration.ROOT
+ CapacitySchedulerConfiguration.DOT;
final String defQ = prefix + "default";
conf.setCapacity(defQ, 5);
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {
"default", "a", reservationQ, newQ });
final String A = prefix + "a";
conf.setCapacity(A, 5);
final String dedicated = prefix + reservationQ;
conf.setCapacity(dedicated, 80);
// Set as reservation queue
conf.setReservableQueue(dedicated, true);
conf.setCapacity(prefix + newQ, 10);
// Set as reservation queue
conf.setReservableQueue(prefix + newQ, true);
// Define 2nd-level queues
final String A1 = A + ".a1";
final String A2 = A + ".a2";
conf.setQueues(A, new String[] { "a1", "a2" });
conf.setCapacity(A1, 30);
conf.setCapacity(A2, 70);
}
public static ReservationDefinition generateRandomRR(Random rand, long i) {
rand.setSeed(i);
long now = System.currentTimeMillis();
// start time at random in the next 12 hours
long arrival = rand.nextInt(12 * 3600 * 1000);
// deadline at random in the next day
long deadline = arrival + rand.nextInt(24 * 3600 * 1000);
// create a request with a single atomic ask
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(now + arrival);
rr.setDeadline(now + deadline);
int gang = 1 + rand.nextInt(9);
int par = (rand.nextInt(1000) + 1) * gang;
long dur = rand.nextInt(2 * 3600 * 1000); // random duration within 2h
ReservationRequest r =
ReservationRequest.newInstance(Resource.newInstance(1024, 1), par,
gang, dur);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setReservationResources(Collections.singletonList(r));
rand.nextInt(3);
ReservationRequestInterpreter[] type =
ReservationRequestInterpreter.values();
reqs.setInterpreter(type[rand.nextInt(type.length)]);
rr.setReservationRequests(reqs);
return rr;
}
public static ReservationDefinition generateBigRR(Random rand, long i) {
rand.setSeed(i);
long now = System.currentTimeMillis();
// start time at random in the next 2 hours
long arrival = rand.nextInt(2 * 3600 * 1000);
// deadline at random in the next day
long deadline = rand.nextInt(24 * 3600 * 1000);
// create a request with a single atomic ask
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(now + arrival);
rr.setDeadline(now + deadline);
int gang = 1;
int par = 100000; // 100k tasks
long dur = rand.nextInt(60 * 1000); // 1min tasks
ReservationRequest r =
ReservationRequest.newInstance(Resource.newInstance(1024, 1), par,
gang, dur);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setReservationResources(Collections.singletonList(r));
rand.nextInt(3);
ReservationRequestInterpreter[] type =
ReservationRequestInterpreter.values();
reqs.setInterpreter(type[rand.nextInt(type.length)]);
rr.setReservationRequests(reqs);
return rr;
}
public static Map<ReservationInterval, ReservationRequest> generateAllocation(
long startTime, long step, int[] alloc) {
Map<ReservationInterval, ReservationRequest> req =
new TreeMap<ReservationInterval, ReservationRequest>();
for (int i = 0; i < alloc.length; i++) {
req.put(new ReservationInterval(startTime + i * step, startTime + (i + 1)
* step), ReservationRequest.newInstance(
Resource.newInstance(1024, 1), alloc[i]));
}
return req;
}
}

View File

@ -0,0 +1,477 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestInMemoryPlan {
private String user = "yarn";
private String planName = "test-reservation";
private ResourceCalculator resCalc;
private Resource minAlloc;
private Resource maxAlloc;
private Resource totalCapacity;
private Clock clock;
private QueueMetrics queueMetrics;
private SharingPolicy policy;
private ReservationAgent agent;
private Planner replanner;
@Before
public void setUp() throws PlanningException {
resCalc = new DefaultResourceCalculator();
minAlloc = Resource.newInstance(1024, 1);
maxAlloc = Resource.newInstance(64 * 1024, 20);
totalCapacity = Resource.newInstance(100 * 1024, 100);
clock = mock(Clock.class);
queueMetrics = mock(QueueMetrics.class);
policy = mock(SharingPolicy.class);
replanner = mock(Planner.class);
when(clock.getTime()).thenReturn(1L);
}
@After
public void tearDown() {
resCalc = null;
minAlloc = null;
maxAlloc = null;
totalCapacity = null;
clock = null;
queueMetrics = null;
policy = null;
replanner = null;
}
@Test
public void testAddReservation() {
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, false);
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length, allocations, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getConsumptionForUser(user, start + i));
}
}
@Test
public void testAddEmptyReservation() {
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
int[] alloc = {};
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations =
new HashMap<ReservationInterval, ReservationRequest>();
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length, allocations, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
}
@Test
public void testAddReservationAlreadyExists() {
// First add a reservation
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, false);
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length, allocations, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getConsumptionForUser(user, start + i));
}
// Try to add it again
try {
plan.addReservation(rAllocation);
Assert.fail("Add should fail as it already exists");
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().endsWith("already exists"));
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
}
@Test
public void testUpdateReservation() {
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
// First add a reservation
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, false);
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length, allocations, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getConsumptionForUser(user, start + i));
}
// Now update it
start = 110;
int[] updatedAlloc = { 0, 5, 10, 10, 5, 0 };
allocations = generateAllocation(start, updatedAlloc, true);
rDef =
createSimpleReservationDefinition(start, start + updatedAlloc.length,
updatedAlloc.length, allocations.values());
rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + updatedAlloc.length, allocations, resCalc, minAlloc);
try {
plan.updateReservation(rAllocation);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
for (int i = 0; i < updatedAlloc.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
+ i), plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
+ i), plan.getConsumptionForUser(user, start + i));
}
}
@Test
public void testUpdateNonExistingReservation() {
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
// Try to update a reservation without adding
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, false);
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length, allocations, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.updateReservation(rAllocation);
Assert.fail("Update should fail as it does not exist in the plan");
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().endsWith("does not exist in the plan"));
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
Assert.assertNull(plan.getReservationById(reservationID));
}
@Test
public void testDeleteReservation() {
// First add a reservation
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, true);
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length, allocations, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
plan.getConsumptionForUser(user, start + i));
}
// Now delete it
try {
plan.deleteReservation(reservationID);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
Assert.assertNull(plan.getReservationById(reservationID));
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(0, 0),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(0, 0),
plan.getConsumptionForUser(user, start + i));
}
}
@Test
public void testDeleteNonExistingReservation() {
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
// Try to delete a reservation without adding
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.deleteReservation(reservationID);
Assert.fail("Delete should fail as it does not exist in the plan");
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().endsWith("does not exist in the plan"));
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
Assert.assertNull(plan.getReservationById(reservationID));
}
@Test
public void testArchiveCompletedReservations() {
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true);
ReservationId reservationID1 =
ReservationSystemTestUtil.getNewReservationId();
// First add a reservation
int[] alloc1 = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations1 =
generateAllocation(start, alloc1, false);
ReservationDefinition rDef1 =
createSimpleReservationDefinition(start, start + alloc1.length,
alloc1.length, allocations1.values());
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID1, rDef1, user,
planName, start, start + alloc1.length, allocations1, resCalc,
minAlloc);
Assert.assertNull(plan.getReservationById(reservationID1));
try {
plan.addReservation(rAllocation);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
for (int i = 0; i < alloc1.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
plan.getConsumptionForUser(user, start + i));
}
// Now add another one
ReservationId reservationID2 =
ReservationSystemTestUtil.getNewReservationId();
int[] alloc2 = { 0, 5, 10, 5, 0 };
Map<ReservationInterval, ReservationRequest> allocations2 =
generateAllocation(start, alloc2, true);
ReservationDefinition rDef2 =
createSimpleReservationDefinition(start, start + alloc2.length,
alloc2.length, allocations2.values());
rAllocation =
new InMemoryReservationAllocation(reservationID2, rDef2, user,
planName, start, start + alloc2.length, allocations2, resCalc,
minAlloc);
Assert.assertNull(plan.getReservationById(reservationID2));
try {
plan.addReservation(rAllocation);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(plan.getReservationById(reservationID2));
for (int i = 0; i < alloc2.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
+ alloc2[i] + i), plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
+ alloc2[i] + i), plan.getConsumptionForUser(user, start + i));
}
// Now archive completed reservations
when(clock.getTime()).thenReturn(106L);
when(policy.getValidWindow()).thenReturn(1L);
try {
// will only remove 2nd reservation as only that has fallen out of the
// archival window
plan.archiveCompletedReservations(clock.getTime());
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(plan.getReservationById(reservationID1));
Assert.assertNull(plan.getReservationById(reservationID2));
for (int i = 0; i < alloc1.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
plan.getConsumptionForUser(user, start + i));
}
when(clock.getTime()).thenReturn(107L);
try {
// will remove 1st reservation also as it has fallen out of the archival
// window
plan.archiveCompletedReservations(clock.getTime());
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
Assert.assertNull(plan.getReservationById(reservationID1));
for (int i = 0; i < alloc1.length; i++) {
Assert.assertEquals(Resource.newInstance(0, 0),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(0, 0),
plan.getConsumptionForUser(user, start + i));
}
}
private void doAssertions(Plan plan, ReservationAllocation rAllocation) {
ReservationId reservationID = rAllocation.getReservationId();
Assert.assertNotNull(plan.getReservationById(reservationID));
Assert.assertEquals(rAllocation, plan.getReservationById(reservationID));
Assert.assertTrue(((InMemoryPlan) plan).getAllReservations().size() == 1);
Assert.assertEquals(rAllocation.getEndTime(), plan.getLastEndTime());
Assert.assertEquals(totalCapacity, plan.getTotalCapacity());
Assert.assertEquals(minAlloc, plan.getMinimumAllocation());
Assert.assertEquals(maxAlloc, plan.getMaximumAllocation());
Assert.assertEquals(resCalc, plan.getResourceCalculator());
Assert.assertEquals(planName, plan.getQueueName());
Assert.assertTrue(plan.getMoveOnExpiry());
}
private ReservationDefinition createSimpleReservationDefinition(long arrival,
long deadline, long duration, Collection<ReservationRequest> resources) {
// create a request with a single atomic ask
ReservationDefinition rDef = new ReservationDefinitionPBImpl();
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setReservationResources(new ArrayList<ReservationRequest>(resources));
reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
rDef.setReservationRequests(reqs);
rDef.setArrival(arrival);
rDef.setDeadline(deadline);
return rDef;
}
private Map<ReservationInterval, ReservationRequest> generateAllocation(
int startTime, int[] alloc, boolean isStep) {
Map<ReservationInterval, ReservationRequest> req =
new HashMap<ReservationInterval, ReservationRequest>();
int numContainers = 0;
for (int i = 0; i < alloc.length; i++) {
if (isStep) {
numContainers = alloc[i] + i;
} else {
numContainers = alloc[i];
}
ReservationRequest rr =
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
(numContainers));
req.put(new ReservationInterval(startTime + i, startTime + i + 1), rr);
}
return req;
}
}

View File

@ -0,0 +1,206 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestInMemoryReservationAllocation {
private String user = "yarn";
private String planName = "test-reservation";
private ResourceCalculator resCalc;
private Resource minAlloc;
private Random rand = new Random();
@Before
public void setUp() {
resCalc = new DefaultResourceCalculator();
minAlloc = Resource.newInstance(1, 1);
}
@After
public void tearDown() {
user = null;
planName = null;
resCalc = null;
minAlloc = null;
}
@Test
public void testBlocks() {
ReservationId reservationID =
ReservationId.newInstance(rand.nextLong(), rand.nextLong());
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length + 1,
alloc.length);
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, false, false);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length + 1, allocations, resCalc, minAlloc);
doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
Assert.assertFalse(rAllocation.containsGangs());
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
rAllocation.getResourcesAtTime(start + i));
}
}
@Test
public void testSteps() {
ReservationId reservationID =
ReservationId.newInstance(rand.nextLong(), rand.nextLong());
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length + 1,
alloc.length);
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, true, false);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length + 1, allocations, resCalc, minAlloc);
doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
Assert.assertFalse(rAllocation.containsGangs());
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
rAllocation.getResourcesAtTime(start + i));
}
}
@Test
public void testSkyline() {
ReservationId reservationID =
ReservationId.newInstance(rand.nextLong(), rand.nextLong());
int[] alloc = { 0, 5, 10, 10, 5, 0 };
int start = 100;
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length + 1,
alloc.length);
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, true, false);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length + 1, allocations, resCalc, minAlloc);
doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
Assert.assertFalse(rAllocation.containsGangs());
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
rAllocation.getResourcesAtTime(start + i));
}
}
@Test
public void testZeroAlloaction() {
ReservationId reservationID =
ReservationId.newInstance(rand.nextLong(), rand.nextLong());
int[] alloc = {};
long start = 0;
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length + 1,
alloc.length);
Map<ReservationInterval, ReservationRequest> allocations =
new HashMap<ReservationInterval, ReservationRequest>();
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length + 1, allocations, resCalc, minAlloc);
doAssertions(rAllocation, reservationID, rDef, allocations, (int) start,
alloc);
Assert.assertFalse(rAllocation.containsGangs());
}
@Test
public void testGangAlloaction() {
ReservationId reservationID =
ReservationId.newInstance(rand.nextLong(), rand.nextLong());
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length + 1,
alloc.length);
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, false, true);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length + 1, allocations, resCalc, minAlloc);
doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
Assert.assertTrue(rAllocation.containsGangs());
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
rAllocation.getResourcesAtTime(start + i));
}
}
private void doAssertions(ReservationAllocation rAllocation,
ReservationId reservationID, ReservationDefinition rDef,
Map<ReservationInterval, ReservationRequest> allocations, int start,
int[] alloc) {
Assert.assertEquals(reservationID, rAllocation.getReservationId());
Assert.assertEquals(rDef, rAllocation.getReservationDefinition());
Assert.assertEquals(allocations, rAllocation.getAllocationRequests());
Assert.assertEquals(user, rAllocation.getUser());
Assert.assertEquals(planName, rAllocation.getPlanName());
Assert.assertEquals(start, rAllocation.getStartTime());
Assert.assertEquals(start + alloc.length + 1, rAllocation.getEndTime());
}
private ReservationDefinition createSimpleReservationDefinition(long arrival,
long deadline, long duration) {
// create a request with a single atomic ask
ReservationRequest r =
ReservationRequest.newInstance(Resource.newInstance(1024, 1), 1, 1,
duration);
ReservationDefinition rDef = new ReservationDefinitionPBImpl();
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setReservationResources(Collections.singletonList(r));
reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
rDef.setReservationRequests(reqs);
rDef.setArrival(arrival);
rDef.setDeadline(deadline);
return rDef;
}
private Map<ReservationInterval, ReservationRequest> generateAllocation(
int startTime, int[] alloc, boolean isStep, boolean isGang) {
Map<ReservationInterval, ReservationRequest> req =
new HashMap<ReservationInterval, ReservationRequest>();
int numContainers = 0;
for (int i = 0; i < alloc.length; i++) {
if (isStep) {
numContainers = alloc[i] + i;
} else {
numContainers = alloc[i];
}
ReservationRequest rr =
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
(numContainers));
if (isGang) {
rr.setConcurrency(numContainers);
}
req.put(new ReservationInterval(startTime + i, startTime + i + 1), rr);
}
return req;
}
}

View File

@ -0,0 +1,169 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestRLESparseResourceAllocation {
private static final Logger LOG = LoggerFactory
.getLogger(TestRLESparseResourceAllocation.class);
@Test
public void testBlocks() {
ResourceCalculator resCalc = new DefaultResourceCalculator();
Resource minAlloc = Resource.newInstance(1, 1);
RLESparseResourceAllocation rleSparseVector =
new RLESparseResourceAllocation(resCalc, minAlloc);
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Set<Entry<ReservationInterval, ReservationRequest>> inputs =
generateAllocation(start, alloc, false).entrySet();
for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
rleSparseVector.addInterval(ip.getKey(), ip.getValue());
}
LOG.info(rleSparseVector.toString());
Assert.assertFalse(rleSparseVector.isEmpty());
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(99));
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + alloc.length + 1));
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
rleSparseVector.getCapacityAtTime(start + i));
}
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
}
LOG.info(rleSparseVector.toString());
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + i));
}
Assert.assertTrue(rleSparseVector.isEmpty());
}
@Test
public void testSteps() {
ResourceCalculator resCalc = new DefaultResourceCalculator();
Resource minAlloc = Resource.newInstance(1, 1);
RLESparseResourceAllocation rleSparseVector =
new RLESparseResourceAllocation(resCalc, minAlloc);
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Set<Entry<ReservationInterval, ReservationRequest>> inputs =
generateAllocation(start, alloc, true).entrySet();
for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
rleSparseVector.addInterval(ip.getKey(), ip.getValue());
}
LOG.info(rleSparseVector.toString());
Assert.assertFalse(rleSparseVector.isEmpty());
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(99));
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + alloc.length + 1));
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
rleSparseVector.getCapacityAtTime(start + i));
}
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
}
LOG.info(rleSparseVector.toString());
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + i));
}
Assert.assertTrue(rleSparseVector.isEmpty());
}
@Test
public void testSkyline() {
ResourceCalculator resCalc = new DefaultResourceCalculator();
Resource minAlloc = Resource.newInstance(1, 1);
RLESparseResourceAllocation rleSparseVector =
new RLESparseResourceAllocation(resCalc, minAlloc);
int[] alloc = { 0, 5, 10, 10, 5, 0 };
int start = 100;
Set<Entry<ReservationInterval, ReservationRequest>> inputs =
generateAllocation(start, alloc, true).entrySet();
for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
rleSparseVector.addInterval(ip.getKey(), ip.getValue());
}
LOG.info(rleSparseVector.toString());
Assert.assertFalse(rleSparseVector.isEmpty());
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(99));
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + alloc.length + 1));
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
rleSparseVector.getCapacityAtTime(start + i));
}
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
}
LOG.info(rleSparseVector.toString());
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + i));
}
Assert.assertTrue(rleSparseVector.isEmpty());
}
@Test
public void testZeroAlloaction() {
ResourceCalculator resCalc = new DefaultResourceCalculator();
Resource minAlloc = Resource.newInstance(1, 1);
RLESparseResourceAllocation rleSparseVector =
new RLESparseResourceAllocation(resCalc, minAlloc);
rleSparseVector.addInterval(new ReservationInterval(0, Long.MAX_VALUE),
ReservationRequest.newInstance(Resource.newInstance(0, 0), (0)));
LOG.info(rleSparseVector.toString());
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(new Random().nextLong()));
Assert.assertTrue(rleSparseVector.isEmpty());
}
private Map<ReservationInterval, ReservationRequest> generateAllocation(
int startTime, int[] alloc, boolean isStep) {
Map<ReservationInterval, ReservationRequest> req =
new HashMap<ReservationInterval, ReservationRequest>();
int numContainers = 0;
for (int i = 0; i < alloc.length; i++) {
if (isStep) {
numContainers = alloc[i] + i;
} else {
numContainers = alloc[i];
}
req.put(new ReservationInterval(startTime + i, startTime + i + 1),
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
(numContainers)));
}
return req;
}
}