YARN-1709. In-memory data structures used to track resources over time to enable reservations.

(cherry picked from commit 0d8b2cd88b)
This commit is contained in:
subru 2014-09-12 17:22:08 -07:00 committed by Chris Douglas
parent c9266df404
commit cf4b34282a
15 changed files with 2534 additions and 0 deletions

View File

@ -5,3 +5,6 @@ YARN-2475. Logic for responding to capacity drops for the
ReservationSystem. (Carlo Curino and Subru Krishnan via curino)
YARN-1708. Public YARN APIs for creating/updating/deleting reservations. (subru)
YARN-1709. In-memory data structures used to track resources over time to
enable reservations. (subru)

View File

@ -0,0 +1,507 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class InMemoryPlan implements Plan {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class);
private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
private TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>> currentReservations =
new TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>>();
private RLESparseResourceAllocation rleSparseVector;
private Map<String, RLESparseResourceAllocation> userResourceAlloc =
new HashMap<String, RLESparseResourceAllocation>();
private Map<ReservationId, InMemoryReservationAllocation> reservationTable =
new HashMap<ReservationId, InMemoryReservationAllocation>();
private final ReentrantReadWriteLock readWriteLock =
new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
private final SharingPolicy policy;
private final ReservationAgent agent;
private final long step;
private final ResourceCalculator resCalc;
private final Resource minAlloc, maxAlloc;
private final String queueName;
private final QueueMetrics queueMetrics;
private final Planner replanner;
private final boolean getMoveOnExpiry;
private final Clock clock;
private Resource totalCapacity;
InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
ReservationAgent agent, Resource totalCapacity, long step,
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
String queueName, Planner replanner, boolean getMoveOnExpiry) {
this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc,
maxAlloc, queueName, replanner, getMoveOnExpiry, new UTCClock());
}
InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
ReservationAgent agent, Resource totalCapacity, long step,
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
String queueName, Planner replanner, boolean getMoveOnExpiry, Clock clock) {
this.queueMetrics = queueMetrics;
this.policy = policy;
this.agent = agent;
this.step = step;
this.totalCapacity = totalCapacity;
this.resCalc = resCalc;
this.minAlloc = minAlloc;
this.maxAlloc = maxAlloc;
this.rleSparseVector = new RLESparseResourceAllocation(resCalc, minAlloc);
this.queueName = queueName;
this.replanner = replanner;
this.getMoveOnExpiry = getMoveOnExpiry;
this.clock = clock;
}
@Override
public QueueMetrics getQueueMetrics() {
return queueMetrics;
}
private void incrementAllocation(ReservationAllocation reservation) {
assert (readWriteLock.isWriteLockedByCurrentThread());
Map<ReservationInterval, ReservationRequest> allocationRequests =
reservation.getAllocationRequests();
// check if we have encountered the user earlier and if not add an entry
String user = reservation.getUser();
RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
if (resAlloc == null) {
resAlloc = new RLESparseResourceAllocation(resCalc, minAlloc);
userResourceAlloc.put(user, resAlloc);
}
for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
.entrySet()) {
resAlloc.addInterval(r.getKey(), r.getValue());
rleSparseVector.addInterval(r.getKey(), r.getValue());
}
}
private void decrementAllocation(ReservationAllocation reservation) {
assert (readWriteLock.isWriteLockedByCurrentThread());
Map<ReservationInterval, ReservationRequest> allocationRequests =
reservation.getAllocationRequests();
String user = reservation.getUser();
RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
.entrySet()) {
resAlloc.removeInterval(r.getKey(), r.getValue());
rleSparseVector.removeInterval(r.getKey(), r.getValue());
}
if (resAlloc.isEmpty()) {
userResourceAlloc.remove(resAlloc);
}
}
public Set<ReservationAllocation> getAllReservations() {
readLock.lock();
try {
if (currentReservations != null) {
Set<ReservationAllocation> flattenedReservations =
new HashSet<ReservationAllocation>();
for (Set<InMemoryReservationAllocation> reservationEntries : currentReservations
.values()) {
flattenedReservations.addAll(reservationEntries);
}
return flattenedReservations;
} else {
return null;
}
} finally {
readLock.unlock();
}
}
@Override
public boolean addReservation(ReservationAllocation reservation)
throws PlanningException {
// Verify the allocation is memory based otherwise it is not supported
InMemoryReservationAllocation inMemReservation =
(InMemoryReservationAllocation) reservation;
if (inMemReservation.getUser() == null) {
String errMsg =
"The specified Reservation with ID "
+ inMemReservation.getReservationId()
+ " is not mapped to any user";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
writeLock.lock();
try {
if (reservationTable.containsKey(inMemReservation.getReservationId())) {
String errMsg =
"The specified Reservation with ID "
+ inMemReservation.getReservationId() + " already exists";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
// Validate if we can accept this reservation, throws exception if
// validation fails
policy.validate(this, inMemReservation);
// we record here the time in which the allocation has been accepted
reservation.setAcceptanceTimestamp(clock.getTime());
ReservationInterval searchInterval =
new ReservationInterval(inMemReservation.getStartTime(),
inMemReservation.getEndTime());
Set<InMemoryReservationAllocation> reservations =
currentReservations.get(searchInterval);
if (reservations == null) {
reservations = new HashSet<InMemoryReservationAllocation>();
}
if (!reservations.add(inMemReservation)) {
LOG.error("Unable to add reservation: {} to plan.",
inMemReservation.getReservationId());
return false;
}
currentReservations.put(searchInterval, reservations);
reservationTable.put(inMemReservation.getReservationId(),
inMemReservation);
incrementAllocation(inMemReservation);
LOG.info("Sucessfully added reservation: {} to plan.",
inMemReservation.getReservationId());
return true;
} finally {
writeLock.unlock();
}
}
@Override
public boolean updateReservation(ReservationAllocation reservation)
throws PlanningException {
writeLock.lock();
boolean result = false;
try {
ReservationId resId = reservation.getReservationId();
ReservationAllocation currReservation = getReservationById(resId);
if (currReservation == null) {
String errMsg =
"The specified Reservation with ID " + resId
+ " does not exist in the plan";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
if (!removeReservation(currReservation)) {
LOG.error("Unable to replace reservation: {} from plan.",
reservation.getReservationId());
return result;
}
try {
result = addReservation(reservation);
} catch (PlanningException e) {
LOG.error("Unable to update reservation: {} from plan due to {}.",
reservation.getReservationId(), e.getMessage());
}
if (result) {
LOG.info("Sucessfully updated reservation: {} in plan.",
reservation.getReservationId());
return result;
} else {
// rollback delete
addReservation(currReservation);
LOG.info("Rollbacked update reservation: {} from plan.",
reservation.getReservationId());
return result;
}
} finally {
writeLock.unlock();
}
}
private boolean removeReservation(ReservationAllocation reservation) {
assert (readWriteLock.isWriteLockedByCurrentThread());
ReservationInterval searchInterval =
new ReservationInterval(reservation.getStartTime(),
reservation.getEndTime());
Set<InMemoryReservationAllocation> reservations =
currentReservations.get(searchInterval);
if (reservations != null) {
if (!reservations.remove(reservation)) {
LOG.error("Unable to remove reservation: {} from plan.",
reservation.getReservationId());
return false;
}
if (reservations.isEmpty()) {
currentReservations.remove(searchInterval);
}
} else {
String errMsg =
"The specified Reservation with ID " + reservation.getReservationId()
+ " does not exist in the plan";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
reservationTable.remove(reservation.getReservationId());
decrementAllocation(reservation);
LOG.info("Sucessfully deleted reservation: {} in plan.",
reservation.getReservationId());
return true;
}
@Override
public boolean deleteReservation(ReservationId reservationID) {
writeLock.lock();
try {
ReservationAllocation reservation = getReservationById(reservationID);
if (reservation == null) {
String errMsg =
"The specified Reservation with ID " + reservationID
+ " does not exist in the plan";
LOG.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
return removeReservation(reservation);
} finally {
writeLock.unlock();
}
}
@Override
public void archiveCompletedReservations(long tick) {
// Since we are looking for old reservations, read lock is optimal
LOG.debug("Running archival at time: {}", tick);
readLock.lock();
List<InMemoryReservationAllocation> expiredReservations =
new ArrayList<InMemoryReservationAllocation>();
// archive reservations and delete the ones which are beyond
// the reservation policy "window"
try {
long archivalTime = tick - policy.getValidWindow();
ReservationInterval searchInterval =
new ReservationInterval(archivalTime, archivalTime);
SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> reservations =
currentReservations.headMap(searchInterval, true);
if (!reservations.isEmpty()) {
for (Set<InMemoryReservationAllocation> reservationEntries : reservations
.values()) {
for (InMemoryReservationAllocation reservation : reservationEntries) {
if (reservation.getEndTime() <= archivalTime) {
expiredReservations.add(reservation);
}
}
}
}
} finally {
readLock.unlock();
}
if (expiredReservations.isEmpty()) {
return;
}
// Need write lock only if there are any reservations to be deleted
writeLock.lock();
try {
for (InMemoryReservationAllocation expiredReservation : expiredReservations) {
removeReservation(expiredReservation);
}
} finally {
writeLock.unlock();
}
}
@Override
public Set<ReservationAllocation> getReservationsAtTime(long tick) {
readLock.lock();
ReservationInterval searchInterval =
new ReservationInterval(tick, Long.MAX_VALUE);
try {
SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> reservations =
currentReservations.headMap(searchInterval, true);
if (!reservations.isEmpty()) {
Set<ReservationAllocation> flattenedReservations =
new HashSet<ReservationAllocation>();
for (Set<InMemoryReservationAllocation> reservationEntries : reservations
.values()) {
for (InMemoryReservationAllocation reservation : reservationEntries) {
if (reservation.getEndTime() > tick) {
flattenedReservations.add(reservation);
}
}
}
return Collections.unmodifiableSet(flattenedReservations);
} else {
return Collections.emptySet();
}
} finally {
readLock.unlock();
}
}
@Override
public long getStep() {
return step;
}
@Override
public SharingPolicy getSharingPolicy() {
return policy;
}
@Override
public ReservationAgent getReservationAgent() {
return agent;
}
@Override
public Resource getConsumptionForUser(String user, long t) {
readLock.lock();
try {
RLESparseResourceAllocation userResAlloc = userResourceAlloc.get(user);
if (userResAlloc != null) {
return userResAlloc.getCapacityAtTime(t);
} else {
return Resources.clone(ZERO_RESOURCE);
}
} finally {
readLock.unlock();
}
}
@Override
public Resource getTotalCommittedResources(long t) {
readLock.lock();
try {
return rleSparseVector.getCapacityAtTime(t);
} finally {
readLock.unlock();
}
}
@Override
public ReservationAllocation getReservationById(ReservationId reservationID) {
if (reservationID == null) {
return null;
}
readLock.lock();
try {
return reservationTable.get(reservationID);
} finally {
readLock.unlock();
}
}
@Override
public Resource getTotalCapacity() {
readLock.lock();
try {
return Resources.clone(totalCapacity);
} finally {
readLock.unlock();
}
}
@Override
public Resource getMinimumAllocation() {
return Resources.clone(minAlloc);
}
@Override
public void setTotalCapacity(Resource cap) {
writeLock.lock();
try {
totalCapacity = Resources.clone(cap);
} finally {
writeLock.unlock();
}
}
public long getEarliestStartTime() {
readLock.lock();
try {
return rleSparseVector.getEarliestStartTime();
} finally {
readLock.unlock();
}
}
@Override
public long getLastEndTime() {
readLock.lock();
try {
return rleSparseVector.getLatestEndTime();
} finally {
readLock.unlock();
}
}
@Override
public ResourceCalculator getResourceCalculator() {
return resCalc;
}
@Override
public String getQueueName() {
return queueName;
}
@Override
public Resource getMaximumAllocation() {
return Resources.clone(maxAlloc);
}
public String toCumulativeString() {
readLock.lock();
try {
return rleSparseVector.toString();
} finally {
readLock.unlock();
}
}
@Override
public Planner getReplanner() {
return replanner;
}
@Override
public boolean getMoveOnExpiry() {
return getMoveOnExpiry;
}
@Override
public String toString() {
readLock.lock();
try {
StringBuffer planStr = new StringBuffer("In-memory Plan: ");
planStr.append("Parent Queue: ").append(queueName)
.append("Total Capacity: ").append(totalCapacity).append("Step: ")
.append(step);
for (ReservationAllocation reservation : getAllReservations()) {
planStr.append(reservation);
}
return planStr.toString();
} finally {
readLock.unlock();
}
}
}

View File

@ -0,0 +1,151 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Collections;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* An in memory implementation of a reservation allocation using the
* {@link RLESparseResourceAllocation}
*
*/
class InMemoryReservationAllocation implements ReservationAllocation {
private final String planName;
private final ReservationId reservationID;
private final String user;
private final ReservationDefinition contract;
private final long startTime;
private final long endTime;
private final Map<ReservationInterval, ReservationRequest> allocationRequests;
private boolean hasGang = false;
private long acceptedAt = -1;
private RLESparseResourceAllocation resourcesOverTime;
InMemoryReservationAllocation(ReservationId reservationID,
ReservationDefinition contract, String user, String planName,
long startTime, long endTime,
Map<ReservationInterval, ReservationRequest> allocationRequests,
ResourceCalculator calculator, Resource minAlloc) {
this.contract = contract;
this.startTime = startTime;
this.endTime = endTime;
this.reservationID = reservationID;
this.user = user;
this.allocationRequests = allocationRequests;
this.planName = planName;
resourcesOverTime = new RLESparseResourceAllocation(calculator, minAlloc);
for (Map.Entry<ReservationInterval, ReservationRequest> r : allocationRequests
.entrySet()) {
resourcesOverTime.addInterval(r.getKey(), r.getValue());
if (r.getValue().getConcurrency() > 1) {
hasGang = true;
}
}
}
@Override
public ReservationId getReservationId() {
return reservationID;
}
@Override
public ReservationDefinition getReservationDefinition() {
return contract;
}
@Override
public long getStartTime() {
return startTime;
}
@Override
public long getEndTime() {
return endTime;
}
@Override
public Map<ReservationInterval, ReservationRequest> getAllocationRequests() {
return Collections.unmodifiableMap(allocationRequests);
}
@Override
public String getPlanName() {
return planName;
}
@Override
public String getUser() {
return user;
}
@Override
public boolean containsGangs() {
return hasGang;
}
@Override
public void setAcceptanceTimestamp(long acceptedAt) {
this.acceptedAt = acceptedAt;
}
@Override
public long getAcceptanceTime() {
return acceptedAt;
}
@Override
public Resource getResourcesAtTime(long tick) {
if (tick < startTime || tick >= endTime) {
return Resource.newInstance(0, 0);
}
return Resources.clone(resourcesOverTime.getCapacityAtTime(tick));
}
@Override
public String toString() {
StringBuilder sBuf = new StringBuilder();
sBuf.append(getReservationId()).append(" user:").append(getUser())
.append(" startTime: ").append(getStartTime()).append(" endTime: ")
.append(getEndTime()).append(" alloc:[")
.append(resourcesOverTime.toString()).append("] ");
return sBuf.toString();
}
@Override
public int compareTo(ReservationAllocation other) {
// reverse order of acceptance
if (this.getAcceptanceTime() > other.getAcceptanceTime()) {
return -1;
}
if (this.getAcceptanceTime() < other.getAcceptanceTime()) {
return 1;
}
return 0;
}
@Override
public int hashCode() {
return reservationID.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
InMemoryReservationAllocation other = (InMemoryReservationAllocation) obj;
return this.reservationID.equals(other.getReservationId());
}
}

View File

@ -0,0 +1,32 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
/**
* A Plan represents the central data structure of a reservation system that
* maintains the "agenda" for the cluster. In particular, it maintains
* information on how a set of {@link ReservationDefinition} that have been
* previously accepted will be honored.
*
* {@link ReservationDefinition} submitted by the users through the RM public
* APIs are passed to appropriate {@link ReservationAgent}s, which in turn will
* consult the Plan (via the {@link PlanView} interface) and try to determine
* whether there are sufficient resources available in this Plan to satisfy the
* temporal and resource constraints of a {@link ReservationDefinition}. If a
* valid allocation is found the agent will try to store it in the plan (via the
* {@link PlanEdit} interface). Upon success the system return to the user a
* positive acknowledgment, and a reservation identifier to be later used to
* access the reserved resources.
*
* A {@link PlanFollower} will continuously read from the Plan and will
* affect the instantaneous allocation of resources among jobs running by
* publishing the "current" slice of the Plan to the underlying scheduler. I.e.,
* the configuration of queues/weights of the scheduler are modified to reflect
* the allocations in the Plan.
*
* As this interface have several methods we decompose them into three groups:
* {@link PlanContext}: containing configuration type information,
* {@link PlanView} read-only access to the plan state, and {@link PlanEdit}
* write access to the plan state.
*/
public interface Plan extends PlanContext, PlanView, PlanEdit {
}

View File

@ -0,0 +1,101 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
/**
* This interface provides read-only access to configuration-type parameter for
* a plan.
*
*/
public interface PlanContext {
/**
* Returns the configured "step" or granularity of time of the plan in millis.
*
* @return plan step in millis
*/
public long getStep();
/**
* Return the {@link ReservationAgent} configured for this plan that is
* responsible for optimally placing various reservation requests
*
* @return the {@link ReservationAgent} configured for this plan
*/
public ReservationAgent getReservationAgent();
/**
* Return an instance of a {@link Planner}, which will be invoked in response
* to unexpected reduction in the resources of this plan
*
* @return an instance of a {@link Planner}, which will be invoked in response
* to unexpected reduction in the resources of this plan
*/
public Planner getReplanner();
/**
* Return the configured {@link SharingPolicy} that governs the sharing of the
* resources of the plan between its various users
*
* @return the configured {@link SharingPolicy} that governs the sharing of
* the resources of the plan between its various users
*/
public SharingPolicy getSharingPolicy();
/**
* Returns the system {@link ResourceCalculator}
*
* @return the system {@link ResourceCalculator}
*/
public ResourceCalculator getResourceCalculator();
/**
* Returns the single smallest {@link Resource} allocation that can be
* reserved in this plan
*
* @return the single smallest {@link Resource} allocation that can be
* reserved in this plan
*/
public Resource getMinimumAllocation();
/**
* Returns the single largest {@link Resource} allocation that can be reserved
* in this plan
*
* @return the single largest {@link Resource} allocation that can be reserved
* in this plan
*/
public Resource getMaximumAllocation();
/**
* Return the name of the queue in the {@link ResourceScheduler} corresponding
* to this plan
*
* @return the name of the queue in the {@link ResourceScheduler}
* corresponding to this plan
*/
public String getQueueName();
/**
* Return the {@link QueueMetrics} for the queue in the
* {@link ResourceScheduler} corresponding to this plan
*
* @return the {@link QueueMetrics} for the queue in the
* {@link ResourceScheduler} corresponding to this plan
*/
public QueueMetrics getQueueMetrics();
/**
* Instructs the {@link PlanFollower} on what to do for applications
* which are still running when the reservation is expiring (move-to-default
* vs kill)
*
* @return true if remaining applications have to be killed, false if they
* have to migrated
*/
public boolean getMoveOnExpiry();
}

View File

@ -0,0 +1,61 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
/**
* This interface groups the methods used to modify the state of a Plan.
*/
public interface PlanEdit extends PlanContext, PlanView {
/**
* Add a new {@link ReservationAllocation} to the plan
*
* @param reservation the {@link ReservationAllocation} to be added to the
* plan
* @return true if addition is successful, false otherwise
*/
public boolean addReservation(ReservationAllocation reservation)
throws PlanningException;
/**
* Updates an existing {@link ReservationAllocation} in the plan. This is
* required for re-negotiation
*
* @param reservation the {@link ReservationAllocation} to be updated the plan
* @return true if update is successful, false otherwise
*/
public boolean updateReservation(ReservationAllocation reservation)
throws PlanningException;
/**
* Delete an existing {@link ReservationAllocation} from the plan identified
* uniquely by its {@link ReservationId}. This will generally be used for
* garbage collection
*
* @param reservation the {@link ReservationAllocation} to be deleted from the
* plan identified uniquely by its {@link ReservationId}
* @return true if delete is successful, false otherwise
*/
public boolean deleteReservation(ReservationId reservationID)
throws PlanningException;
/**
* Method invoked to garbage collect old reservations. It cleans up expired
* reservations that have fallen out of the sliding archival window
*
* @param tick the current time from which the archival window is computed
*/
public void archiveCompletedReservations(long tick) throws PlanningException;
/**
* Sets the overall capacity in terms of {@link Resource} assigned to this
* plan
*
* @param capacity the overall capacity in terms of {@link Resource} assigned
* to this plan
*/
public void setTotalCapacity(Resource capacity);
}

View File

@ -0,0 +1,89 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
/**
* This interface provides a read-only view on the allocations made in this
* plan. This methods are used for example by {@link ReservationAgent}s to
* determine the free resources in a certain point in time, and by
* PlanFollowerPolicy to publish this plan to the scheduler.
*/
public interface PlanView extends PlanContext {
/**
* Return a {@link ReservationAllocation} identified by its
* {@link ReservationId}
*
* @param reservationID the unique id to identify the
* {@link ReservationAllocation}
* @return {@link ReservationAllocation} identified by the specified id
*/
public ReservationAllocation getReservationById(ReservationId reservationID);
/**
* Gets all the active reservations at the specified point of time
*
* @param tick the time (UTC in ms) for which the active reservations are
* requested
* @return set of active reservations at the specified time
*/
public Set<ReservationAllocation> getReservationsAtTime(long tick);
/**
* Gets all the reservations in the plan
*
* @return set of all reservations handled by this Plan
*/
public Set<ReservationAllocation> getAllReservations();
/**
* Returns the total {@link Resource} reserved for all users at the specified
* time
*
* @param tick the time (UTC in ms) for which the reserved resources are
* requested
* @return the total {@link Resource} reserved for all users at the specified
* time
*/
public Resource getTotalCommittedResources(long tick);
/**
* Returns the total {@link Resource} reserved for a given user at the
* specified time
*
* @param user the user who made the reservation(s)
* @param tick the time (UTC in ms) for which the reserved resources are
* requested
* @return the total {@link Resource} reserved for a given user at the
* specified time
*/
public Resource getConsumptionForUser(String user, long tick);
/**
* Returns the overall capacity in terms of {@link Resource} assigned to this
* plan (typically will correspond to the absolute capacity of the
* corresponding queue).
*
* @return the overall capacity in terms of {@link Resource} assigned to this
* plan
*/
public Resource getTotalCapacity();
/**
* Gets the time (UTC in ms) at which the first reservation starts
*
* @return the time (UTC in ms) at which the first reservation starts
*/
public long getEarliestStartTime();
/**
* Returns the time (UTC in ms) at which the last reservation terminates
*
* @return the time (UTC in ms) at which the last reservation terminates
*/
public long getLastEndTime();
}

View File

@ -0,0 +1,332 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.gson.stream.JsonWriter;
/**
* This is a run length encoded sparse data structure that maintains resource
* allocations over time
*/
public class RLESparseResourceAllocation {
private static final int THRESHOLD = 100;
private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
private TreeMap<Long, Resource> cumulativeCapacity =
new TreeMap<Long, Resource>();
private final ReentrantReadWriteLock readWriteLock =
new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
private final ResourceCalculator resourceCalculator;
private final Resource minAlloc;
public RLESparseResourceAllocation(ResourceCalculator resourceCalculator,
Resource minAlloc) {
this.resourceCalculator = resourceCalculator;
this.minAlloc = minAlloc;
}
private boolean isSameAsPrevious(Long key, Resource capacity) {
Entry<Long, Resource> previous = cumulativeCapacity.lowerEntry(key);
return (previous != null && previous.getValue().equals(capacity));
}
private boolean isSameAsNext(Long key, Resource capacity) {
Entry<Long, Resource> next = cumulativeCapacity.higherEntry(key);
return (next != null && next.getValue().equals(capacity));
}
/**
* Add a resource for the specified interval
*
* @param reservationInterval the interval for which the resource is to be
* added
* @param capacity the resource to be added
* @return true if addition is successful, false otherwise
*/
public boolean addInterval(ReservationInterval reservationInterval,
ReservationRequest capacity) {
Resource totCap =
Resources.multiply(capacity.getCapability(),
(float) capacity.getNumContainers());
if (totCap.equals(ZERO_RESOURCE)) {
return true;
}
writeLock.lock();
try {
long startKey = reservationInterval.getStartTime();
long endKey = reservationInterval.getEndTime();
NavigableMap<Long, Resource> ticks =
cumulativeCapacity.headMap(endKey, false);
if (ticks != null && !ticks.isEmpty()) {
Resource updatedCapacity = Resource.newInstance(0, 0);
Entry<Long, Resource> lowEntry = ticks.floorEntry(startKey);
if (lowEntry == null) {
// This is the earliest starting interval
cumulativeCapacity.put(startKey, totCap);
} else {
updatedCapacity = Resources.add(lowEntry.getValue(), totCap);
// Add a new tick only if the updated value is different
// from the previous tick
if ((startKey == lowEntry.getKey())
&& (isSameAsPrevious(lowEntry.getKey(), updatedCapacity))) {
cumulativeCapacity.remove(lowEntry.getKey());
} else {
cumulativeCapacity.put(startKey, updatedCapacity);
}
}
// Increase all the capacities of overlapping intervals
Set<Entry<Long, Resource>> overlapSet =
ticks.tailMap(startKey, false).entrySet();
for (Entry<Long, Resource> entry : overlapSet) {
updatedCapacity = Resources.add(entry.getValue(), totCap);
entry.setValue(updatedCapacity);
}
} else {
// This is the first interval to be added
cumulativeCapacity.put(startKey, totCap);
}
Resource nextTick = cumulativeCapacity.get(endKey);
if (nextTick != null) {
// If there is overlap, remove the duplicate entry
if (isSameAsPrevious(endKey, nextTick)) {
cumulativeCapacity.remove(endKey);
}
} else {
// Decrease capacity as this is end of the interval
cumulativeCapacity.put(endKey, Resources.subtract(cumulativeCapacity
.floorEntry(endKey).getValue(), totCap));
}
return true;
} finally {
writeLock.unlock();
}
}
/**
* Add multiple resources for the specified interval
*
* @param reservationInterval the interval for which the resource is to be
* added
* @param ReservationRequests the resources to be added
* @param clusterResource the total resources in the cluster
* @return true if addition is successful, false otherwise
*/
public boolean addCompositeInterval(ReservationInterval reservationInterval,
List<ReservationRequest> ReservationRequests, Resource clusterResource) {
ReservationRequest aggregateReservationRequest =
Records.newRecord(ReservationRequest.class);
Resource capacity = Resource.newInstance(0, 0);
for (ReservationRequest ReservationRequest : ReservationRequests) {
Resources.addTo(capacity, Resources.multiply(
ReservationRequest.getCapability(),
ReservationRequest.getNumContainers()));
}
aggregateReservationRequest.setNumContainers((int) Math.ceil(Resources
.divide(resourceCalculator, clusterResource, capacity, minAlloc)));
aggregateReservationRequest.setCapability(minAlloc);
return addInterval(reservationInterval, aggregateReservationRequest);
}
/**
* Removes a resource for the specified interval
*
* @param reservationInterval the interval for which the resource is to be
* removed
* @param capacity the resource to be removed
* @return true if removal is successful, false otherwise
*/
public boolean removeInterval(ReservationInterval reservationInterval,
ReservationRequest capacity) {
Resource totCap =
Resources.multiply(capacity.getCapability(),
(float) capacity.getNumContainers());
if (totCap.equals(ZERO_RESOURCE)) {
return true;
}
writeLock.lock();
try {
long startKey = reservationInterval.getStartTime();
long endKey = reservationInterval.getEndTime();
// update the start key
NavigableMap<Long, Resource> ticks =
cumulativeCapacity.headMap(endKey, false);
// Decrease all the capacities of overlapping intervals
SortedMap<Long, Resource> overlapSet = ticks.tailMap(startKey);
if (overlapSet != null && !overlapSet.isEmpty()) {
Resource updatedCapacity = Resource.newInstance(0, 0);
long currentKey = -1;
for (Iterator<Entry<Long, Resource>> overlapEntries =
overlapSet.entrySet().iterator(); overlapEntries.hasNext();) {
Entry<Long, Resource> entry = overlapEntries.next();
currentKey = entry.getKey();
updatedCapacity = Resources.subtract(entry.getValue(), totCap);
// update each entry between start and end key
cumulativeCapacity.put(currentKey, updatedCapacity);
}
// Remove the first overlap entry if it is same as previous after
// updation
Long firstKey = overlapSet.firstKey();
if (isSameAsPrevious(firstKey, overlapSet.get(firstKey))) {
cumulativeCapacity.remove(firstKey);
}
// Remove the next entry if it is same as end entry after updation
if ((currentKey != -1) && (isSameAsNext(currentKey, updatedCapacity))) {
cumulativeCapacity.remove(cumulativeCapacity.higherKey(currentKey));
}
}
return true;
} finally {
writeLock.unlock();
}
}
/**
* Returns the capacity, i.e. total resources allocated at the specified point
* of time
*
* @param tick the time (UTC in ms) at which the capacity is requested
* @return the resources allocated at the specified time
*/
public Resource getCapacityAtTime(long tick) {
readLock.lock();
try {
Entry<Long, Resource> closestStep = cumulativeCapacity.floorEntry(tick);
if (closestStep != null) {
return Resources.clone(closestStep.getValue());
}
return Resources.clone(ZERO_RESOURCE);
} finally {
readLock.unlock();
}
}
/**
* Get the timestamp of the earliest resource allocation
*
* @return the timestamp of the first resource allocation
*/
public long getEarliestStartTime() {
readLock.lock();
try {
if (cumulativeCapacity.isEmpty()) {
return -1;
} else {
return cumulativeCapacity.firstKey();
}
} finally {
readLock.unlock();
}
}
/**
* Get the timestamp of the latest resource allocation
*
* @return the timestamp of the last resource allocation
*/
public long getLatestEndTime() {
readLock.lock();
try {
if (cumulativeCapacity.isEmpty()) {
return -1;
} else {
return cumulativeCapacity.lastKey();
}
} finally {
readLock.unlock();
}
}
/**
* Returns true if there are no non-zero entries
*
* @return true if there are no allocations or false otherwise
*/
public boolean isEmpty() {
readLock.lock();
try {
if (cumulativeCapacity.isEmpty()) {
return true;
}
// Deletion leaves a single zero entry so check for that
if (cumulativeCapacity.size() == 1) {
return cumulativeCapacity.firstEntry().getValue().equals(ZERO_RESOURCE);
}
return false;
} finally {
readLock.unlock();
}
}
@Override
public String toString() {
StringBuilder ret = new StringBuilder();
readLock.lock();
try {
if (cumulativeCapacity.size() > THRESHOLD) {
ret.append("Number of steps: ").append(cumulativeCapacity.size())
.append(" earliest entry: ").append(cumulativeCapacity.firstKey())
.append(" latest entry: ").append(cumulativeCapacity.lastKey());
} else {
for (Map.Entry<Long, Resource> r : cumulativeCapacity.entrySet()) {
ret.append(r.getKey()).append(": ").append(r.getValue())
.append("\n ");
}
}
return ret.toString();
} finally {
readLock.unlock();
}
}
/**
* Returns the JSON string representation of the current resources allocated
* over time
*
* @return the JSON string representation of the current resources allocated
* over time
*/
public String toMemJSONString() {
StringWriter json = new StringWriter();
JsonWriter jsonWriter = new JsonWriter(json);
readLock.lock();
try {
jsonWriter.beginObject();
// jsonWriter.name("timestamp").value("resource");
for (Map.Entry<Long, Resource> r : cumulativeCapacity.entrySet()) {
jsonWriter.name(r.getKey().toString()).value(r.getValue().toString());
}
jsonWriter.endObject();
jsonWriter.close();
return json.toString();
} catch (IOException e) {
// This should not happen
return "";
} finally {
readLock.unlock();
}
}
}

View File

@ -0,0 +1,104 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
/**
* A ReservationAllocation represents a concrete allocation of resources over
* time that satisfy a certain {@link ReservationDefinition}. This is used
* internally by a {@link Plan} to store information about how each of the
* accepted {@link ReservationDefinition} have been allocated.
*/
public interface ReservationAllocation extends
Comparable<ReservationAllocation> {
/**
* Returns the unique identifier {@link ReservationId} that represents the
* reservation
*
* @return reservationId the unique identifier {@link ReservationId} that
* represents the reservation
*/
public ReservationId getReservationId();
/**
* Returns the original {@link ReservationDefinition} submitted by the client
*
* @return
*/
public ReservationDefinition getReservationDefinition();
/**
* Returns the time at which the reservation is activated
*
* @return the time at which the reservation is activated
*/
public long getStartTime();
/**
* Returns the time at which the reservation terminates
*
* @return the time at which the reservation terminates
*/
public long getEndTime();
/**
* Returns the map of resources requested against the time interval for which
* they were
*
* @return the allocationRequests the map of resources requested against the
* time interval for which they were
*/
public Map<ReservationInterval, ReservationRequest> getAllocationRequests();
/**
* Return a string identifying the plan to which the reservation belongs
*
* @return the plan to which the reservation belongs
*/
public String getPlanName();
/**
* Returns the user who requested the reservation
*
* @return the user who requested the reservation
*/
public String getUser();
/**
* Returns whether the reservation has gang semantics or not
*
* @return true if there is a gang request, false otherwise
*/
public boolean containsGangs();
/**
* Sets the time at which the reservation was accepted by the system
*
* @param acceptedAt the time at which the reservation was accepted by the
* system
*/
public void setAcceptanceTimestamp(long acceptedAt);
/**
* Returns the time at which the reservation was accepted by the system
*
* @return the time at which the reservation was accepted by the system
*/
public long getAcceptanceTime();
/**
* Returns the capacity represented by cumulative resources reserved by the
* reservation at the specified point of time
*
* @param tick the time (UTC in ms) for which the reserved resources are
* requested
* @return the resources reserved at the specified time
*/
public Resource getResourcesAtTime(long tick);
}

View File

@ -0,0 +1,67 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
/**
* This represents the time duration of the reservation
*
*/
public class ReservationInterval implements Comparable<ReservationInterval> {
private final long startTime;
private final long endTime;
public ReservationInterval(long startTime, long endTime) {
this.startTime = startTime;
this.endTime = endTime;
}
/**
* Get the start time of the reservation interval
*
* @return the startTime
*/
public long getStartTime() {
return startTime;
}
/**
* Get the end time of the reservation interval
*
* @return the endTime
*/
public long getEndTime() {
return endTime;
}
/**
* Returns whether the interval is active at the specified instant of time
*
* @param tick the instance of the time to check
* @return true if active, false otherwise
*/
public boolean isOverlap(long tick) {
return (startTime <= tick && tick <= endTime);
}
@Override
public int compareTo(ReservationInterval anotherInterval) {
long diff = 0;
if (startTime == anotherInterval.getStartTime()) {
diff = endTime - anotherInterval.getEndTime();
} else {
diff = startTime - anotherInterval.getStartTime();
}
if (diff < 0) {
return -1;
} else if (diff > 0) {
return 1;
} else {
return 0;
}
}
public String toString() {
return "[" + startTime + ", " + endTime + "]";
}
}

View File

@ -0,0 +1,25 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
/**
* Exception thrown by the admission control subsystem when there is a problem
* in trying to find an allocation for a user {@link ReservationSubmissionRequest}.
*/
public class PlanningException extends Exception {
private static final long serialVersionUID = -684069387367879218L;
public PlanningException(String message) {
super(message);
}
public PlanningException(Throwable cause) {
super(cause);
}
public PlanningException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,210 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.junit.Assert;
import org.mockito.Mockito;
public class ReservationSystemTestUtil {
private static Random rand = new Random();
public final static String reservationQ = "dedicated";
public static ReservationId getNewReservationId() {
return ReservationId.newInstance(rand.nextLong(), rand.nextLong());
}
public CapacityScheduler mockCapacityScheduler(int numContainers)
throws IOException {
// stolen from TestCapacityScheduler
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
CapacityScheduler cs = Mockito.spy(new CapacityScheduler());
cs.setConf(new YarnConfiguration());
RMContext mockRmContext =
Mockito.spy(new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null));
cs.setRMContext(mockRmContext);
try {
cs.serviceInit(conf);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
when(mockRmContext.getScheduler()).thenReturn(cs);
Resource r = Resource.newInstance(numContainers * 1024, numContainers);
doReturn(r).when(cs).getClusterResource();
return cs;
}
public static void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
// Define default queue
final String defQ = CapacitySchedulerConfiguration.ROOT + ".default";
conf.setCapacity(defQ, 10);
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {
"default", "a", reservationQ });
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
conf.setCapacity(A, 10);
final String dedicated =
CapacitySchedulerConfiguration.ROOT
+ CapacitySchedulerConfiguration.DOT + reservationQ;
conf.setCapacity(dedicated, 80);
// Set as reservation queue
conf.setReservableQueue(dedicated, true);
// Define 2nd-level queues
final String A1 = A + ".a1";
final String A2 = A + ".a2";
conf.setQueues(A, new String[] { "a1", "a2" });
conf.setCapacity(A1, 30);
conf.setCapacity(A2, 70);
}
public String getFullReservationQueueName() {
return CapacitySchedulerConfiguration.ROOT
+ CapacitySchedulerConfiguration.DOT + reservationQ;
}
public String getreservationQueueName() {
return reservationQ;
}
public void updateQueueConfiguration(CapacitySchedulerConfiguration conf,
String newQ) {
// Define default queue
final String prefix =
CapacitySchedulerConfiguration.ROOT
+ CapacitySchedulerConfiguration.DOT;
final String defQ = prefix + "default";
conf.setCapacity(defQ, 5);
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {
"default", "a", reservationQ, newQ });
final String A = prefix + "a";
conf.setCapacity(A, 5);
final String dedicated = prefix + reservationQ;
conf.setCapacity(dedicated, 80);
// Set as reservation queue
conf.setReservableQueue(dedicated, true);
conf.setCapacity(prefix + newQ, 10);
// Set as reservation queue
conf.setReservableQueue(prefix + newQ, true);
// Define 2nd-level queues
final String A1 = A + ".a1";
final String A2 = A + ".a2";
conf.setQueues(A, new String[] { "a1", "a2" });
conf.setCapacity(A1, 30);
conf.setCapacity(A2, 70);
}
public static ReservationDefinition generateRandomRR(Random rand, long i) {
rand.setSeed(i);
long now = System.currentTimeMillis();
// start time at random in the next 12 hours
long arrival = rand.nextInt(12 * 3600 * 1000);
// deadline at random in the next day
long deadline = arrival + rand.nextInt(24 * 3600 * 1000);
// create a request with a single atomic ask
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(now + arrival);
rr.setDeadline(now + deadline);
int gang = 1 + rand.nextInt(9);
int par = (rand.nextInt(1000) + 1) * gang;
long dur = rand.nextInt(2 * 3600 * 1000); // random duration within 2h
ReservationRequest r =
ReservationRequest.newInstance(Resource.newInstance(1024, 1), par,
gang, dur);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setReservationResources(Collections.singletonList(r));
rand.nextInt(3);
ReservationRequestInterpreter[] type =
ReservationRequestInterpreter.values();
reqs.setInterpreter(type[rand.nextInt(type.length)]);
rr.setReservationRequests(reqs);
return rr;
}
public static ReservationDefinition generateBigRR(Random rand, long i) {
rand.setSeed(i);
long now = System.currentTimeMillis();
// start time at random in the next 2 hours
long arrival = rand.nextInt(2 * 3600 * 1000);
// deadline at random in the next day
long deadline = rand.nextInt(24 * 3600 * 1000);
// create a request with a single atomic ask
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(now + arrival);
rr.setDeadline(now + deadline);
int gang = 1;
int par = 100000; // 100k tasks
long dur = rand.nextInt(60 * 1000); // 1min tasks
ReservationRequest r =
ReservationRequest.newInstance(Resource.newInstance(1024, 1), par,
gang, dur);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setReservationResources(Collections.singletonList(r));
rand.nextInt(3);
ReservationRequestInterpreter[] type =
ReservationRequestInterpreter.values();
reqs.setInterpreter(type[rand.nextInt(type.length)]);
rr.setReservationRequests(reqs);
return rr;
}
public static Map<ReservationInterval, ReservationRequest> generateAllocation(
long startTime, long step, int[] alloc) {
Map<ReservationInterval, ReservationRequest> req =
new TreeMap<ReservationInterval, ReservationRequest>();
for (int i = 0; i < alloc.length; i++) {
req.put(new ReservationInterval(startTime + i * step, startTime + (i + 1)
* step), ReservationRequest.newInstance(
Resource.newInstance(1024, 1), alloc[i]));
}
return req;
}
}

View File

@ -0,0 +1,477 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestInMemoryPlan {
private String user = "yarn";
private String planName = "test-reservation";
private ResourceCalculator resCalc;
private Resource minAlloc;
private Resource maxAlloc;
private Resource totalCapacity;
private Clock clock;
private QueueMetrics queueMetrics;
private SharingPolicy policy;
private ReservationAgent agent;
private Planner replanner;
@Before
public void setUp() throws PlanningException {
resCalc = new DefaultResourceCalculator();
minAlloc = Resource.newInstance(1024, 1);
maxAlloc = Resource.newInstance(64 * 1024, 20);
totalCapacity = Resource.newInstance(100 * 1024, 100);
clock = mock(Clock.class);
queueMetrics = mock(QueueMetrics.class);
policy = mock(SharingPolicy.class);
replanner = mock(Planner.class);
when(clock.getTime()).thenReturn(1L);
}
@After
public void tearDown() {
resCalc = null;
minAlloc = null;
maxAlloc = null;
totalCapacity = null;
clock = null;
queueMetrics = null;
policy = null;
replanner = null;
}
@Test
public void testAddReservation() {
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, false);
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length, allocations, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getConsumptionForUser(user, start + i));
}
}
@Test
public void testAddEmptyReservation() {
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
int[] alloc = {};
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations =
new HashMap<ReservationInterval, ReservationRequest>();
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length, allocations, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
}
@Test
public void testAddReservationAlreadyExists() {
// First add a reservation
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, false);
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length, allocations, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getConsumptionForUser(user, start + i));
}
// Try to add it again
try {
plan.addReservation(rAllocation);
Assert.fail("Add should fail as it already exists");
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().endsWith("already exists"));
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
}
@Test
public void testUpdateReservation() {
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
// First add a reservation
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, false);
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length, allocations, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getConsumptionForUser(user, start + i));
}
// Now update it
start = 110;
int[] updatedAlloc = { 0, 5, 10, 10, 5, 0 };
allocations = generateAllocation(start, updatedAlloc, true);
rDef =
createSimpleReservationDefinition(start, start + updatedAlloc.length,
updatedAlloc.length, allocations.values());
rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + updatedAlloc.length, allocations, resCalc, minAlloc);
try {
plan.updateReservation(rAllocation);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
for (int i = 0; i < updatedAlloc.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
+ i), plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i]
+ i), plan.getConsumptionForUser(user, start + i));
}
}
@Test
public void testUpdateNonExistingReservation() {
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
// Try to update a reservation without adding
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, false);
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length, allocations, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.updateReservation(rAllocation);
Assert.fail("Update should fail as it does not exist in the plan");
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().endsWith("does not exist in the plan"));
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
Assert.assertNull(plan.getReservationById(reservationID));
}
@Test
public void testDeleteReservation() {
// First add a reservation
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, true);
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length, allocations, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
plan.getConsumptionForUser(user, start + i));
}
// Now delete it
try {
plan.deleteReservation(reservationID);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
Assert.assertNull(plan.getReservationById(reservationID));
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(0, 0),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(0, 0),
plan.getConsumptionForUser(user, start + i));
}
}
@Test
public void testDeleteNonExistingReservation() {
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
// Try to delete a reservation without adding
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.deleteReservation(reservationID);
Assert.fail("Delete should fail as it does not exist in the plan");
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().endsWith("does not exist in the plan"));
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
Assert.assertNull(plan.getReservationById(reservationID));
}
@Test
public void testArchiveCompletedReservations() {
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true);
ReservationId reservationID1 =
ReservationSystemTestUtil.getNewReservationId();
// First add a reservation
int[] alloc1 = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations1 =
generateAllocation(start, alloc1, false);
ReservationDefinition rDef1 =
createSimpleReservationDefinition(start, start + alloc1.length,
alloc1.length, allocations1.values());
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID1, rDef1, user,
planName, start, start + alloc1.length, allocations1, resCalc,
minAlloc);
Assert.assertNull(plan.getReservationById(reservationID1));
try {
plan.addReservation(rAllocation);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
doAssertions(plan, rAllocation);
for (int i = 0; i < alloc1.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
plan.getConsumptionForUser(user, start + i));
}
// Now add another one
ReservationId reservationID2 =
ReservationSystemTestUtil.getNewReservationId();
int[] alloc2 = { 0, 5, 10, 5, 0 };
Map<ReservationInterval, ReservationRequest> allocations2 =
generateAllocation(start, alloc2, true);
ReservationDefinition rDef2 =
createSimpleReservationDefinition(start, start + alloc2.length,
alloc2.length, allocations2.values());
rAllocation =
new InMemoryReservationAllocation(reservationID2, rDef2, user,
planName, start, start + alloc2.length, allocations2, resCalc,
minAlloc);
Assert.assertNull(plan.getReservationById(reservationID2));
try {
plan.addReservation(rAllocation);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(plan.getReservationById(reservationID2));
for (int i = 0; i < alloc2.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
+ alloc2[i] + i), plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i]
+ alloc2[i] + i), plan.getConsumptionForUser(user, start + i));
}
// Now archive completed reservations
when(clock.getTime()).thenReturn(106L);
when(policy.getValidWindow()).thenReturn(1L);
try {
// will only remove 2nd reservation as only that has fallen out of the
// archival window
plan.archiveCompletedReservations(clock.getTime());
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(plan.getReservationById(reservationID1));
Assert.assertNull(plan.getReservationById(reservationID2));
for (int i = 0; i < alloc1.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])),
plan.getConsumptionForUser(user, start + i));
}
when(clock.getTime()).thenReturn(107L);
try {
// will remove 1st reservation also as it has fallen out of the archival
// window
plan.archiveCompletedReservations(clock.getTime());
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
Assert.assertNull(plan.getReservationById(reservationID1));
for (int i = 0; i < alloc1.length; i++) {
Assert.assertEquals(Resource.newInstance(0, 0),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(0, 0),
plan.getConsumptionForUser(user, start + i));
}
}
private void doAssertions(Plan plan, ReservationAllocation rAllocation) {
ReservationId reservationID = rAllocation.getReservationId();
Assert.assertNotNull(plan.getReservationById(reservationID));
Assert.assertEquals(rAllocation, plan.getReservationById(reservationID));
Assert.assertTrue(((InMemoryPlan) plan).getAllReservations().size() == 1);
Assert.assertEquals(rAllocation.getEndTime(), plan.getLastEndTime());
Assert.assertEquals(totalCapacity, plan.getTotalCapacity());
Assert.assertEquals(minAlloc, plan.getMinimumAllocation());
Assert.assertEquals(maxAlloc, plan.getMaximumAllocation());
Assert.assertEquals(resCalc, plan.getResourceCalculator());
Assert.assertEquals(planName, plan.getQueueName());
Assert.assertTrue(plan.getMoveOnExpiry());
}
private ReservationDefinition createSimpleReservationDefinition(long arrival,
long deadline, long duration, Collection<ReservationRequest> resources) {
// create a request with a single atomic ask
ReservationDefinition rDef = new ReservationDefinitionPBImpl();
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setReservationResources(new ArrayList<ReservationRequest>(resources));
reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
rDef.setReservationRequests(reqs);
rDef.setArrival(arrival);
rDef.setDeadline(deadline);
return rDef;
}
private Map<ReservationInterval, ReservationRequest> generateAllocation(
int startTime, int[] alloc, boolean isStep) {
Map<ReservationInterval, ReservationRequest> req =
new HashMap<ReservationInterval, ReservationRequest>();
int numContainers = 0;
for (int i = 0; i < alloc.length; i++) {
if (isStep) {
numContainers = alloc[i] + i;
} else {
numContainers = alloc[i];
}
ReservationRequest rr =
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
(numContainers));
req.put(new ReservationInterval(startTime + i, startTime + i + 1), rr);
}
return req;
}
}

View File

@ -0,0 +1,206 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestInMemoryReservationAllocation {
private String user = "yarn";
private String planName = "test-reservation";
private ResourceCalculator resCalc;
private Resource minAlloc;
private Random rand = new Random();
@Before
public void setUp() {
resCalc = new DefaultResourceCalculator();
minAlloc = Resource.newInstance(1, 1);
}
@After
public void tearDown() {
user = null;
planName = null;
resCalc = null;
minAlloc = null;
}
@Test
public void testBlocks() {
ReservationId reservationID =
ReservationId.newInstance(rand.nextLong(), rand.nextLong());
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length + 1,
alloc.length);
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, false, false);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length + 1, allocations, resCalc, minAlloc);
doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
Assert.assertFalse(rAllocation.containsGangs());
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
rAllocation.getResourcesAtTime(start + i));
}
}
@Test
public void testSteps() {
ReservationId reservationID =
ReservationId.newInstance(rand.nextLong(), rand.nextLong());
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length + 1,
alloc.length);
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, true, false);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length + 1, allocations, resCalc, minAlloc);
doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
Assert.assertFalse(rAllocation.containsGangs());
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
rAllocation.getResourcesAtTime(start + i));
}
}
@Test
public void testSkyline() {
ReservationId reservationID =
ReservationId.newInstance(rand.nextLong(), rand.nextLong());
int[] alloc = { 0, 5, 10, 10, 5, 0 };
int start = 100;
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length + 1,
alloc.length);
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, true, false);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length + 1, allocations, resCalc, minAlloc);
doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
Assert.assertFalse(rAllocation.containsGangs());
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
rAllocation.getResourcesAtTime(start + i));
}
}
@Test
public void testZeroAlloaction() {
ReservationId reservationID =
ReservationId.newInstance(rand.nextLong(), rand.nextLong());
int[] alloc = {};
long start = 0;
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length + 1,
alloc.length);
Map<ReservationInterval, ReservationRequest> allocations =
new HashMap<ReservationInterval, ReservationRequest>();
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length + 1, allocations, resCalc, minAlloc);
doAssertions(rAllocation, reservationID, rDef, allocations, (int) start,
alloc);
Assert.assertFalse(rAllocation.containsGangs());
}
@Test
public void testGangAlloaction() {
ReservationId reservationID =
ReservationId.newInstance(rand.nextLong(), rand.nextLong());
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length + 1,
alloc.length);
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, false, true);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length + 1, allocations, resCalc, minAlloc);
doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc);
Assert.assertTrue(rAllocation.containsGangs());
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
rAllocation.getResourcesAtTime(start + i));
}
}
private void doAssertions(ReservationAllocation rAllocation,
ReservationId reservationID, ReservationDefinition rDef,
Map<ReservationInterval, ReservationRequest> allocations, int start,
int[] alloc) {
Assert.assertEquals(reservationID, rAllocation.getReservationId());
Assert.assertEquals(rDef, rAllocation.getReservationDefinition());
Assert.assertEquals(allocations, rAllocation.getAllocationRequests());
Assert.assertEquals(user, rAllocation.getUser());
Assert.assertEquals(planName, rAllocation.getPlanName());
Assert.assertEquals(start, rAllocation.getStartTime());
Assert.assertEquals(start + alloc.length + 1, rAllocation.getEndTime());
}
private ReservationDefinition createSimpleReservationDefinition(long arrival,
long deadline, long duration) {
// create a request with a single atomic ask
ReservationRequest r =
ReservationRequest.newInstance(Resource.newInstance(1024, 1), 1, 1,
duration);
ReservationDefinition rDef = new ReservationDefinitionPBImpl();
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setReservationResources(Collections.singletonList(r));
reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
rDef.setReservationRequests(reqs);
rDef.setArrival(arrival);
rDef.setDeadline(deadline);
return rDef;
}
private Map<ReservationInterval, ReservationRequest> generateAllocation(
int startTime, int[] alloc, boolean isStep, boolean isGang) {
Map<ReservationInterval, ReservationRequest> req =
new HashMap<ReservationInterval, ReservationRequest>();
int numContainers = 0;
for (int i = 0; i < alloc.length; i++) {
if (isStep) {
numContainers = alloc[i] + i;
} else {
numContainers = alloc[i];
}
ReservationRequest rr =
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
(numContainers));
if (isGang) {
rr.setConcurrency(numContainers);
}
req.put(new ReservationInterval(startTime + i, startTime + i + 1), rr);
}
return req;
}
}

View File

@ -0,0 +1,169 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestRLESparseResourceAllocation {
private static final Logger LOG = LoggerFactory
.getLogger(TestRLESparseResourceAllocation.class);
@Test
public void testBlocks() {
ResourceCalculator resCalc = new DefaultResourceCalculator();
Resource minAlloc = Resource.newInstance(1, 1);
RLESparseResourceAllocation rleSparseVector =
new RLESparseResourceAllocation(resCalc, minAlloc);
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Set<Entry<ReservationInterval, ReservationRequest>> inputs =
generateAllocation(start, alloc, false).entrySet();
for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
rleSparseVector.addInterval(ip.getKey(), ip.getValue());
}
LOG.info(rleSparseVector.toString());
Assert.assertFalse(rleSparseVector.isEmpty());
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(99));
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + alloc.length + 1));
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
rleSparseVector.getCapacityAtTime(start + i));
}
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
}
LOG.info(rleSparseVector.toString());
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + i));
}
Assert.assertTrue(rleSparseVector.isEmpty());
}
@Test
public void testSteps() {
ResourceCalculator resCalc = new DefaultResourceCalculator();
Resource minAlloc = Resource.newInstance(1, 1);
RLESparseResourceAllocation rleSparseVector =
new RLESparseResourceAllocation(resCalc, minAlloc);
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Set<Entry<ReservationInterval, ReservationRequest>> inputs =
generateAllocation(start, alloc, true).entrySet();
for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
rleSparseVector.addInterval(ip.getKey(), ip.getValue());
}
LOG.info(rleSparseVector.toString());
Assert.assertFalse(rleSparseVector.isEmpty());
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(99));
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + alloc.length + 1));
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
rleSparseVector.getCapacityAtTime(start + i));
}
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
}
LOG.info(rleSparseVector.toString());
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + i));
}
Assert.assertTrue(rleSparseVector.isEmpty());
}
@Test
public void testSkyline() {
ResourceCalculator resCalc = new DefaultResourceCalculator();
Resource minAlloc = Resource.newInstance(1, 1);
RLESparseResourceAllocation rleSparseVector =
new RLESparseResourceAllocation(resCalc, minAlloc);
int[] alloc = { 0, 5, 10, 10, 5, 0 };
int start = 100;
Set<Entry<ReservationInterval, ReservationRequest>> inputs =
generateAllocation(start, alloc, true).entrySet();
for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
rleSparseVector.addInterval(ip.getKey(), ip.getValue());
}
LOG.info(rleSparseVector.toString());
Assert.assertFalse(rleSparseVector.isEmpty());
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(99));
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + alloc.length + 1));
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
rleSparseVector.getCapacityAtTime(start + i));
}
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
for (Entry<ReservationInterval, ReservationRequest> ip : inputs) {
rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
}
LOG.info(rleSparseVector.toString());
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(start + i));
}
Assert.assertTrue(rleSparseVector.isEmpty());
}
@Test
public void testZeroAlloaction() {
ResourceCalculator resCalc = new DefaultResourceCalculator();
Resource minAlloc = Resource.newInstance(1, 1);
RLESparseResourceAllocation rleSparseVector =
new RLESparseResourceAllocation(resCalc, minAlloc);
rleSparseVector.addInterval(new ReservationInterval(0, Long.MAX_VALUE),
ReservationRequest.newInstance(Resource.newInstance(0, 0), (0)));
LOG.info(rleSparseVector.toString());
Assert.assertEquals(Resource.newInstance(0, 0),
rleSparseVector.getCapacityAtTime(new Random().nextLong()));
Assert.assertTrue(rleSparseVector.isEmpty());
}
private Map<ReservationInterval, ReservationRequest> generateAllocation(
int startTime, int[] alloc, boolean isStep) {
Map<ReservationInterval, ReservationRequest> req =
new HashMap<ReservationInterval, ReservationRequest>();
int numContainers = 0;
for (int i = 0; i < alloc.length; i++) {
if (isStep) {
numContainers = alloc[i] + i;
} else {
numContainers = alloc[i];
}
req.put(new ReservationInterval(startTime + i, startTime + i + 1),
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
(numContainers)));
}
return req;
}
}