diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt index a7c08a02cca..410d974cce0 100644 --- a/YARN-1051-CHANGES.txt +++ b/YARN-1051-CHANGES.txt @@ -5,3 +5,6 @@ YARN-2475. Logic for responding to capacity drops for the ReservationSystem. (Carlo Curino and Subru Krishnan via curino) YARN-1708. Public YARN APIs for creating/updating/deleting reservations. (subru) + +YARN-1709. In-memory data structures used to track resources over time to +enable reservations. (subru) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java new file mode 100644 index 00000000000..99231c44c5c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java @@ -0,0 +1,507 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.UTCClock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class InMemoryPlan implements Plan { + + private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class); + + private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0); + + private TreeMap> currentReservations = + new TreeMap>(); + + private RLESparseResourceAllocation rleSparseVector; + + private Map userResourceAlloc = + new HashMap(); + + private Map reservationTable = + new HashMap(); + + private final ReentrantReadWriteLock readWriteLock = + new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); + private final Lock writeLock = readWriteLock.writeLock(); + private final SharingPolicy policy; + private final ReservationAgent agent; + private final long step; + private final ResourceCalculator resCalc; + private final Resource minAlloc, maxAlloc; + private final String queueName; + private final QueueMetrics queueMetrics; + private final Planner replanner; + private final boolean getMoveOnExpiry; + private final Clock clock; + + private Resource totalCapacity; + + InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, + ReservationAgent agent, Resource totalCapacity, long step, + ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc, + String queueName, Planner replanner, boolean getMoveOnExpiry) { + this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc, + maxAlloc, queueName, replanner, getMoveOnExpiry, new UTCClock()); + } + + InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, + ReservationAgent agent, Resource totalCapacity, long step, + ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc, + String queueName, Planner replanner, boolean getMoveOnExpiry, Clock clock) { + this.queueMetrics = queueMetrics; + this.policy = policy; + this.agent = agent; + this.step = step; + this.totalCapacity = totalCapacity; + this.resCalc = resCalc; + this.minAlloc = minAlloc; + this.maxAlloc = maxAlloc; + this.rleSparseVector = new RLESparseResourceAllocation(resCalc, minAlloc); + this.queueName = queueName; + this.replanner = replanner; + this.getMoveOnExpiry = getMoveOnExpiry; + this.clock = clock; + } + + @Override + public QueueMetrics getQueueMetrics() { + return queueMetrics; + } + + private void incrementAllocation(ReservationAllocation reservation) { + assert (readWriteLock.isWriteLockedByCurrentThread()); + Map allocationRequests = + reservation.getAllocationRequests(); + // check if we have encountered the user earlier and if not add an entry + String user = reservation.getUser(); + RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user); + if (resAlloc == null) { + resAlloc = new RLESparseResourceAllocation(resCalc, minAlloc); + userResourceAlloc.put(user, resAlloc); + } + for (Map.Entry r : allocationRequests + .entrySet()) { + resAlloc.addInterval(r.getKey(), r.getValue()); + rleSparseVector.addInterval(r.getKey(), r.getValue()); + } + } + + private void decrementAllocation(ReservationAllocation reservation) { + assert (readWriteLock.isWriteLockedByCurrentThread()); + Map allocationRequests = + reservation.getAllocationRequests(); + String user = reservation.getUser(); + RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user); + for (Map.Entry r : allocationRequests + .entrySet()) { + resAlloc.removeInterval(r.getKey(), r.getValue()); + rleSparseVector.removeInterval(r.getKey(), r.getValue()); + } + if (resAlloc.isEmpty()) { + userResourceAlloc.remove(resAlloc); + } + } + + public Set getAllReservations() { + readLock.lock(); + try { + if (currentReservations != null) { + Set flattenedReservations = + new HashSet(); + for (Set reservationEntries : currentReservations + .values()) { + flattenedReservations.addAll(reservationEntries); + } + return flattenedReservations; + } else { + return null; + } + } finally { + readLock.unlock(); + } + } + + @Override + public boolean addReservation(ReservationAllocation reservation) + throws PlanningException { + // Verify the allocation is memory based otherwise it is not supported + InMemoryReservationAllocation inMemReservation = + (InMemoryReservationAllocation) reservation; + if (inMemReservation.getUser() == null) { + String errMsg = + "The specified Reservation with ID " + + inMemReservation.getReservationId() + + " is not mapped to any user"; + LOG.error(errMsg); + throw new IllegalArgumentException(errMsg); + } + writeLock.lock(); + try { + if (reservationTable.containsKey(inMemReservation.getReservationId())) { + String errMsg = + "The specified Reservation with ID " + + inMemReservation.getReservationId() + " already exists"; + LOG.error(errMsg); + throw new IllegalArgumentException(errMsg); + } + // Validate if we can accept this reservation, throws exception if + // validation fails + policy.validate(this, inMemReservation); + // we record here the time in which the allocation has been accepted + reservation.setAcceptanceTimestamp(clock.getTime()); + ReservationInterval searchInterval = + new ReservationInterval(inMemReservation.getStartTime(), + inMemReservation.getEndTime()); + Set reservations = + currentReservations.get(searchInterval); + if (reservations == null) { + reservations = new HashSet(); + } + if (!reservations.add(inMemReservation)) { + LOG.error("Unable to add reservation: {} to plan.", + inMemReservation.getReservationId()); + return false; + } + currentReservations.put(searchInterval, reservations); + reservationTable.put(inMemReservation.getReservationId(), + inMemReservation); + incrementAllocation(inMemReservation); + LOG.info("Sucessfully added reservation: {} to plan.", + inMemReservation.getReservationId()); + return true; + } finally { + writeLock.unlock(); + } + } + + @Override + public boolean updateReservation(ReservationAllocation reservation) + throws PlanningException { + writeLock.lock(); + boolean result = false; + try { + ReservationId resId = reservation.getReservationId(); + ReservationAllocation currReservation = getReservationById(resId); + if (currReservation == null) { + String errMsg = + "The specified Reservation with ID " + resId + + " does not exist in the plan"; + LOG.error(errMsg); + throw new IllegalArgumentException(errMsg); + } + if (!removeReservation(currReservation)) { + LOG.error("Unable to replace reservation: {} from plan.", + reservation.getReservationId()); + return result; + } + try { + result = addReservation(reservation); + } catch (PlanningException e) { + LOG.error("Unable to update reservation: {} from plan due to {}.", + reservation.getReservationId(), e.getMessage()); + } + if (result) { + LOG.info("Sucessfully updated reservation: {} in plan.", + reservation.getReservationId()); + return result; + } else { + // rollback delete + addReservation(currReservation); + LOG.info("Rollbacked update reservation: {} from plan.", + reservation.getReservationId()); + return result; + } + } finally { + writeLock.unlock(); + } + } + + private boolean removeReservation(ReservationAllocation reservation) { + assert (readWriteLock.isWriteLockedByCurrentThread()); + ReservationInterval searchInterval = + new ReservationInterval(reservation.getStartTime(), + reservation.getEndTime()); + Set reservations = + currentReservations.get(searchInterval); + if (reservations != null) { + if (!reservations.remove(reservation)) { + LOG.error("Unable to remove reservation: {} from plan.", + reservation.getReservationId()); + return false; + } + if (reservations.isEmpty()) { + currentReservations.remove(searchInterval); + } + } else { + String errMsg = + "The specified Reservation with ID " + reservation.getReservationId() + + " does not exist in the plan"; + LOG.error(errMsg); + throw new IllegalArgumentException(errMsg); + } + reservationTable.remove(reservation.getReservationId()); + decrementAllocation(reservation); + LOG.info("Sucessfully deleted reservation: {} in plan.", + reservation.getReservationId()); + return true; + } + + @Override + public boolean deleteReservation(ReservationId reservationID) { + writeLock.lock(); + try { + ReservationAllocation reservation = getReservationById(reservationID); + if (reservation == null) { + String errMsg = + "The specified Reservation with ID " + reservationID + + " does not exist in the plan"; + LOG.error(errMsg); + throw new IllegalArgumentException(errMsg); + } + return removeReservation(reservation); + } finally { + writeLock.unlock(); + } + } + + @Override + public void archiveCompletedReservations(long tick) { + // Since we are looking for old reservations, read lock is optimal + LOG.debug("Running archival at time: {}", tick); + readLock.lock(); + List expiredReservations = + new ArrayList(); + // archive reservations and delete the ones which are beyond + // the reservation policy "window" + try { + long archivalTime = tick - policy.getValidWindow(); + ReservationInterval searchInterval = + new ReservationInterval(archivalTime, archivalTime); + SortedMap> reservations = + currentReservations.headMap(searchInterval, true); + if (!reservations.isEmpty()) { + for (Set reservationEntries : reservations + .values()) { + for (InMemoryReservationAllocation reservation : reservationEntries) { + if (reservation.getEndTime() <= archivalTime) { + expiredReservations.add(reservation); + } + } + } + } + } finally { + readLock.unlock(); + } + if (expiredReservations.isEmpty()) { + return; + } + // Need write lock only if there are any reservations to be deleted + writeLock.lock(); + try { + for (InMemoryReservationAllocation expiredReservation : expiredReservations) { + removeReservation(expiredReservation); + } + } finally { + writeLock.unlock(); + } + } + + @Override + public Set getReservationsAtTime(long tick) { + readLock.lock(); + ReservationInterval searchInterval = + new ReservationInterval(tick, Long.MAX_VALUE); + try { + SortedMap> reservations = + currentReservations.headMap(searchInterval, true); + if (!reservations.isEmpty()) { + Set flattenedReservations = + new HashSet(); + for (Set reservationEntries : reservations + .values()) { + for (InMemoryReservationAllocation reservation : reservationEntries) { + if (reservation.getEndTime() > tick) { + flattenedReservations.add(reservation); + } + } + } + return Collections.unmodifiableSet(flattenedReservations); + } else { + return Collections.emptySet(); + } + } finally { + readLock.unlock(); + } + } + + @Override + public long getStep() { + return step; + } + + @Override + public SharingPolicy getSharingPolicy() { + return policy; + } + + @Override + public ReservationAgent getReservationAgent() { + return agent; + } + + @Override + public Resource getConsumptionForUser(String user, long t) { + readLock.lock(); + try { + RLESparseResourceAllocation userResAlloc = userResourceAlloc.get(user); + if (userResAlloc != null) { + return userResAlloc.getCapacityAtTime(t); + } else { + return Resources.clone(ZERO_RESOURCE); + } + } finally { + readLock.unlock(); + } + } + + @Override + public Resource getTotalCommittedResources(long t) { + readLock.lock(); + try { + return rleSparseVector.getCapacityAtTime(t); + } finally { + readLock.unlock(); + } + } + + @Override + public ReservationAllocation getReservationById(ReservationId reservationID) { + if (reservationID == null) { + return null; + } + readLock.lock(); + try { + return reservationTable.get(reservationID); + } finally { + readLock.unlock(); + } + } + + @Override + public Resource getTotalCapacity() { + readLock.lock(); + try { + return Resources.clone(totalCapacity); + } finally { + readLock.unlock(); + } + } + + @Override + public Resource getMinimumAllocation() { + return Resources.clone(minAlloc); + } + + @Override + public void setTotalCapacity(Resource cap) { + writeLock.lock(); + try { + totalCapacity = Resources.clone(cap); + } finally { + writeLock.unlock(); + } + } + + public long getEarliestStartTime() { + readLock.lock(); + try { + return rleSparseVector.getEarliestStartTime(); + } finally { + readLock.unlock(); + } + } + + @Override + public long getLastEndTime() { + readLock.lock(); + try { + return rleSparseVector.getLatestEndTime(); + } finally { + readLock.unlock(); + } + } + + @Override + public ResourceCalculator getResourceCalculator() { + return resCalc; + } + + @Override + public String getQueueName() { + return queueName; + } + + @Override + public Resource getMaximumAllocation() { + return Resources.clone(maxAlloc); + } + + public String toCumulativeString() { + readLock.lock(); + try { + return rleSparseVector.toString(); + } finally { + readLock.unlock(); + } + } + + @Override + public Planner getReplanner() { + return replanner; + } + + @Override + public boolean getMoveOnExpiry() { + return getMoveOnExpiry; + } + + @Override + public String toString() { + readLock.lock(); + try { + StringBuffer planStr = new StringBuffer("In-memory Plan: "); + planStr.append("Parent Queue: ").append(queueName) + .append("Total Capacity: ").append(totalCapacity).append("Step: ") + .append(step); + for (ReservationAllocation reservation : getAllReservations()) { + planStr.append(reservation); + } + return planStr.toString(); + } finally { + readLock.unlock(); + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java new file mode 100644 index 00000000000..10cc55f9c0c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java @@ -0,0 +1,151 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.Collections; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * An in memory implementation of a reservation allocation using the + * {@link RLESparseResourceAllocation} + * + */ +class InMemoryReservationAllocation implements ReservationAllocation { + + private final String planName; + private final ReservationId reservationID; + private final String user; + private final ReservationDefinition contract; + private final long startTime; + private final long endTime; + private final Map allocationRequests; + private boolean hasGang = false; + private long acceptedAt = -1; + + private RLESparseResourceAllocation resourcesOverTime; + + InMemoryReservationAllocation(ReservationId reservationID, + ReservationDefinition contract, String user, String planName, + long startTime, long endTime, + Map allocationRequests, + ResourceCalculator calculator, Resource minAlloc) { + this.contract = contract; + this.startTime = startTime; + this.endTime = endTime; + this.reservationID = reservationID; + this.user = user; + this.allocationRequests = allocationRequests; + this.planName = planName; + resourcesOverTime = new RLESparseResourceAllocation(calculator, minAlloc); + for (Map.Entry r : allocationRequests + .entrySet()) { + resourcesOverTime.addInterval(r.getKey(), r.getValue()); + if (r.getValue().getConcurrency() > 1) { + hasGang = true; + } + } + } + + @Override + public ReservationId getReservationId() { + return reservationID; + } + + @Override + public ReservationDefinition getReservationDefinition() { + return contract; + } + + @Override + public long getStartTime() { + return startTime; + } + + @Override + public long getEndTime() { + return endTime; + } + + @Override + public Map getAllocationRequests() { + return Collections.unmodifiableMap(allocationRequests); + } + + @Override + public String getPlanName() { + return planName; + } + + @Override + public String getUser() { + return user; + } + + @Override + public boolean containsGangs() { + return hasGang; + } + + @Override + public void setAcceptanceTimestamp(long acceptedAt) { + this.acceptedAt = acceptedAt; + } + + @Override + public long getAcceptanceTime() { + return acceptedAt; + } + + @Override + public Resource getResourcesAtTime(long tick) { + if (tick < startTime || tick >= endTime) { + return Resource.newInstance(0, 0); + } + return Resources.clone(resourcesOverTime.getCapacityAtTime(tick)); + } + + @Override + public String toString() { + StringBuilder sBuf = new StringBuilder(); + sBuf.append(getReservationId()).append(" user:").append(getUser()) + .append(" startTime: ").append(getStartTime()).append(" endTime: ") + .append(getEndTime()).append(" alloc:[") + .append(resourcesOverTime.toString()).append("] "); + return sBuf.toString(); + } + + @Override + public int compareTo(ReservationAllocation other) { + // reverse order of acceptance + if (this.getAcceptanceTime() > other.getAcceptanceTime()) { + return -1; + } + if (this.getAcceptanceTime() < other.getAcceptanceTime()) { + return 1; + } + return 0; + } + + @Override + public int hashCode() { + return reservationID.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + InMemoryReservationAllocation other = (InMemoryReservationAllocation) obj; + return this.reservationID.equals(other.getReservationId()); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java new file mode 100644 index 00000000000..cf2aed78eef --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java @@ -0,0 +1,32 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +/** + * A Plan represents the central data structure of a reservation system that + * maintains the "agenda" for the cluster. In particular, it maintains + * information on how a set of {@link ReservationDefinition} that have been + * previously accepted will be honored. + * + * {@link ReservationDefinition} submitted by the users through the RM public + * APIs are passed to appropriate {@link ReservationAgent}s, which in turn will + * consult the Plan (via the {@link PlanView} interface) and try to determine + * whether there are sufficient resources available in this Plan to satisfy the + * temporal and resource constraints of a {@link ReservationDefinition}. If a + * valid allocation is found the agent will try to store it in the plan (via the + * {@link PlanEdit} interface). Upon success the system return to the user a + * positive acknowledgment, and a reservation identifier to be later used to + * access the reserved resources. + * + * A {@link PlanFollower} will continuously read from the Plan and will + * affect the instantaneous allocation of resources among jobs running by + * publishing the "current" slice of the Plan to the underlying scheduler. I.e., + * the configuration of queues/weights of the scheduler are modified to reflect + * the allocations in the Plan. + * + * As this interface have several methods we decompose them into three groups: + * {@link PlanContext}: containing configuration type information, + * {@link PlanView} read-only access to the plan state, and {@link PlanEdit} + * write access to the plan state. + */ +public interface Plan extends PlanContext, PlanView, PlanEdit { + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java new file mode 100644 index 00000000000..40a25a6ff62 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java @@ -0,0 +1,101 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; + +/** + * This interface provides read-only access to configuration-type parameter for + * a plan. + * + */ +public interface PlanContext { + + /** + * Returns the configured "step" or granularity of time of the plan in millis. + * + * @return plan step in millis + */ + public long getStep(); + + /** + * Return the {@link ReservationAgent} configured for this plan that is + * responsible for optimally placing various reservation requests + * + * @return the {@link ReservationAgent} configured for this plan + */ + public ReservationAgent getReservationAgent(); + + /** + * Return an instance of a {@link Planner}, which will be invoked in response + * to unexpected reduction in the resources of this plan + * + * @return an instance of a {@link Planner}, which will be invoked in response + * to unexpected reduction in the resources of this plan + */ + public Planner getReplanner(); + + /** + * Return the configured {@link SharingPolicy} that governs the sharing of the + * resources of the plan between its various users + * + * @return the configured {@link SharingPolicy} that governs the sharing of + * the resources of the plan between its various users + */ + public SharingPolicy getSharingPolicy(); + + /** + * Returns the system {@link ResourceCalculator} + * + * @return the system {@link ResourceCalculator} + */ + public ResourceCalculator getResourceCalculator(); + + /** + * Returns the single smallest {@link Resource} allocation that can be + * reserved in this plan + * + * @return the single smallest {@link Resource} allocation that can be + * reserved in this plan + */ + public Resource getMinimumAllocation(); + + /** + * Returns the single largest {@link Resource} allocation that can be reserved + * in this plan + * + * @return the single largest {@link Resource} allocation that can be reserved + * in this plan + */ + public Resource getMaximumAllocation(); + + /** + * Return the name of the queue in the {@link ResourceScheduler} corresponding + * to this plan + * + * @return the name of the queue in the {@link ResourceScheduler} + * corresponding to this plan + */ + public String getQueueName(); + + /** + * Return the {@link QueueMetrics} for the queue in the + * {@link ResourceScheduler} corresponding to this plan + * + * @return the {@link QueueMetrics} for the queue in the + * {@link ResourceScheduler} corresponding to this plan + */ + public QueueMetrics getQueueMetrics(); + + /** + * Instructs the {@link PlanFollower} on what to do for applications + * which are still running when the reservation is expiring (move-to-default + * vs kill) + * + * @return true if remaining applications have to be killed, false if they + * have to migrated + */ + public boolean getMoveOnExpiry(); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java new file mode 100644 index 00000000000..648edba4a5e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java @@ -0,0 +1,61 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; + +/** + * This interface groups the methods used to modify the state of a Plan. + */ +public interface PlanEdit extends PlanContext, PlanView { + + /** + * Add a new {@link ReservationAllocation} to the plan + * + * @param reservation the {@link ReservationAllocation} to be added to the + * plan + * @return true if addition is successful, false otherwise + */ + public boolean addReservation(ReservationAllocation reservation) + throws PlanningException; + + /** + * Updates an existing {@link ReservationAllocation} in the plan. This is + * required for re-negotiation + * + * @param reservation the {@link ReservationAllocation} to be updated the plan + * @return true if update is successful, false otherwise + */ + public boolean updateReservation(ReservationAllocation reservation) + throws PlanningException; + + /** + * Delete an existing {@link ReservationAllocation} from the plan identified + * uniquely by its {@link ReservationId}. This will generally be used for + * garbage collection + * + * @param reservation the {@link ReservationAllocation} to be deleted from the + * plan identified uniquely by its {@link ReservationId} + * @return true if delete is successful, false otherwise + */ + public boolean deleteReservation(ReservationId reservationID) + throws PlanningException; + + /** + * Method invoked to garbage collect old reservations. It cleans up expired + * reservations that have fallen out of the sliding archival window + * + * @param tick the current time from which the archival window is computed + */ + public void archiveCompletedReservations(long tick) throws PlanningException; + + /** + * Sets the overall capacity in terms of {@link Resource} assigned to this + * plan + * + * @param capacity the overall capacity in terms of {@link Resource} assigned + * to this plan + */ + public void setTotalCapacity(Resource capacity); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java new file mode 100644 index 00000000000..6e58dde5f7f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java @@ -0,0 +1,89 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.Resource; + +/** + * This interface provides a read-only view on the allocations made in this + * plan. This methods are used for example by {@link ReservationAgent}s to + * determine the free resources in a certain point in time, and by + * PlanFollowerPolicy to publish this plan to the scheduler. + */ +public interface PlanView extends PlanContext { + + /** + * Return a {@link ReservationAllocation} identified by its + * {@link ReservationId} + * + * @param reservationID the unique id to identify the + * {@link ReservationAllocation} + * @return {@link ReservationAllocation} identified by the specified id + */ + public ReservationAllocation getReservationById(ReservationId reservationID); + + /** + * Gets all the active reservations at the specified point of time + * + * @param tick the time (UTC in ms) for which the active reservations are + * requested + * @return set of active reservations at the specified time + */ + public Set getReservationsAtTime(long tick); + + /** + * Gets all the reservations in the plan + * + * @return set of all reservations handled by this Plan + */ + public Set getAllReservations(); + + /** + * Returns the total {@link Resource} reserved for all users at the specified + * time + * + * @param tick the time (UTC in ms) for which the reserved resources are + * requested + * @return the total {@link Resource} reserved for all users at the specified + * time + */ + public Resource getTotalCommittedResources(long tick); + + /** + * Returns the total {@link Resource} reserved for a given user at the + * specified time + * + * @param user the user who made the reservation(s) + * @param tick the time (UTC in ms) for which the reserved resources are + * requested + * @return the total {@link Resource} reserved for a given user at the + * specified time + */ + public Resource getConsumptionForUser(String user, long tick); + + /** + * Returns the overall capacity in terms of {@link Resource} assigned to this + * plan (typically will correspond to the absolute capacity of the + * corresponding queue). + * + * @return the overall capacity in terms of {@link Resource} assigned to this + * plan + */ + public Resource getTotalCapacity(); + + /** + * Gets the time (UTC in ms) at which the first reservation starts + * + * @return the time (UTC in ms) at which the first reservation starts + */ + public long getEarliestStartTime(); + + /** + * Returns the time (UTC in ms) at which the last reservation terminates + * + * @return the time (UTC in ms) at which the last reservation terminates + */ + public long getLastEndTime(); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java new file mode 100644 index 00000000000..fa8db302779 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java @@ -0,0 +1,332 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import com.google.gson.stream.JsonWriter; + +/** + * This is a run length encoded sparse data structure that maintains resource + * allocations over time + */ +public class RLESparseResourceAllocation { + + private static final int THRESHOLD = 100; + private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0); + + private TreeMap cumulativeCapacity = + new TreeMap(); + + private final ReentrantReadWriteLock readWriteLock = + new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); + private final Lock writeLock = readWriteLock.writeLock(); + + private final ResourceCalculator resourceCalculator; + private final Resource minAlloc; + + public RLESparseResourceAllocation(ResourceCalculator resourceCalculator, + Resource minAlloc) { + this.resourceCalculator = resourceCalculator; + this.minAlloc = minAlloc; + } + + private boolean isSameAsPrevious(Long key, Resource capacity) { + Entry previous = cumulativeCapacity.lowerEntry(key); + return (previous != null && previous.getValue().equals(capacity)); + } + + private boolean isSameAsNext(Long key, Resource capacity) { + Entry next = cumulativeCapacity.higherEntry(key); + return (next != null && next.getValue().equals(capacity)); + } + + /** + * Add a resource for the specified interval + * + * @param reservationInterval the interval for which the resource is to be + * added + * @param capacity the resource to be added + * @return true if addition is successful, false otherwise + */ + public boolean addInterval(ReservationInterval reservationInterval, + ReservationRequest capacity) { + Resource totCap = + Resources.multiply(capacity.getCapability(), + (float) capacity.getNumContainers()); + if (totCap.equals(ZERO_RESOURCE)) { + return true; + } + writeLock.lock(); + try { + long startKey = reservationInterval.getStartTime(); + long endKey = reservationInterval.getEndTime(); + NavigableMap ticks = + cumulativeCapacity.headMap(endKey, false); + if (ticks != null && !ticks.isEmpty()) { + Resource updatedCapacity = Resource.newInstance(0, 0); + Entry lowEntry = ticks.floorEntry(startKey); + if (lowEntry == null) { + // This is the earliest starting interval + cumulativeCapacity.put(startKey, totCap); + } else { + updatedCapacity = Resources.add(lowEntry.getValue(), totCap); + // Add a new tick only if the updated value is different + // from the previous tick + if ((startKey == lowEntry.getKey()) + && (isSameAsPrevious(lowEntry.getKey(), updatedCapacity))) { + cumulativeCapacity.remove(lowEntry.getKey()); + } else { + cumulativeCapacity.put(startKey, updatedCapacity); + } + } + // Increase all the capacities of overlapping intervals + Set> overlapSet = + ticks.tailMap(startKey, false).entrySet(); + for (Entry entry : overlapSet) { + updatedCapacity = Resources.add(entry.getValue(), totCap); + entry.setValue(updatedCapacity); + } + } else { + // This is the first interval to be added + cumulativeCapacity.put(startKey, totCap); + } + Resource nextTick = cumulativeCapacity.get(endKey); + if (nextTick != null) { + // If there is overlap, remove the duplicate entry + if (isSameAsPrevious(endKey, nextTick)) { + cumulativeCapacity.remove(endKey); + } + } else { + // Decrease capacity as this is end of the interval + cumulativeCapacity.put(endKey, Resources.subtract(cumulativeCapacity + .floorEntry(endKey).getValue(), totCap)); + } + return true; + } finally { + writeLock.unlock(); + } + } + + /** + * Add multiple resources for the specified interval + * + * @param reservationInterval the interval for which the resource is to be + * added + * @param ReservationRequests the resources to be added + * @param clusterResource the total resources in the cluster + * @return true if addition is successful, false otherwise + */ + public boolean addCompositeInterval(ReservationInterval reservationInterval, + List ReservationRequests, Resource clusterResource) { + ReservationRequest aggregateReservationRequest = + Records.newRecord(ReservationRequest.class); + Resource capacity = Resource.newInstance(0, 0); + for (ReservationRequest ReservationRequest : ReservationRequests) { + Resources.addTo(capacity, Resources.multiply( + ReservationRequest.getCapability(), + ReservationRequest.getNumContainers())); + } + aggregateReservationRequest.setNumContainers((int) Math.ceil(Resources + .divide(resourceCalculator, clusterResource, capacity, minAlloc))); + aggregateReservationRequest.setCapability(minAlloc); + + return addInterval(reservationInterval, aggregateReservationRequest); + } + + /** + * Removes a resource for the specified interval + * + * @param reservationInterval the interval for which the resource is to be + * removed + * @param capacity the resource to be removed + * @return true if removal is successful, false otherwise + */ + public boolean removeInterval(ReservationInterval reservationInterval, + ReservationRequest capacity) { + Resource totCap = + Resources.multiply(capacity.getCapability(), + (float) capacity.getNumContainers()); + if (totCap.equals(ZERO_RESOURCE)) { + return true; + } + writeLock.lock(); + try { + long startKey = reservationInterval.getStartTime(); + long endKey = reservationInterval.getEndTime(); + // update the start key + NavigableMap ticks = + cumulativeCapacity.headMap(endKey, false); + // Decrease all the capacities of overlapping intervals + SortedMap overlapSet = ticks.tailMap(startKey); + if (overlapSet != null && !overlapSet.isEmpty()) { + Resource updatedCapacity = Resource.newInstance(0, 0); + long currentKey = -1; + for (Iterator> overlapEntries = + overlapSet.entrySet().iterator(); overlapEntries.hasNext();) { + Entry entry = overlapEntries.next(); + currentKey = entry.getKey(); + updatedCapacity = Resources.subtract(entry.getValue(), totCap); + // update each entry between start and end key + cumulativeCapacity.put(currentKey, updatedCapacity); + } + // Remove the first overlap entry if it is same as previous after + // updation + Long firstKey = overlapSet.firstKey(); + if (isSameAsPrevious(firstKey, overlapSet.get(firstKey))) { + cumulativeCapacity.remove(firstKey); + } + // Remove the next entry if it is same as end entry after updation + if ((currentKey != -1) && (isSameAsNext(currentKey, updatedCapacity))) { + cumulativeCapacity.remove(cumulativeCapacity.higherKey(currentKey)); + } + } + return true; + } finally { + writeLock.unlock(); + } + } + + /** + * Returns the capacity, i.e. total resources allocated at the specified point + * of time + * + * @param tick the time (UTC in ms) at which the capacity is requested + * @return the resources allocated at the specified time + */ + public Resource getCapacityAtTime(long tick) { + readLock.lock(); + try { + Entry closestStep = cumulativeCapacity.floorEntry(tick); + if (closestStep != null) { + return Resources.clone(closestStep.getValue()); + } + return Resources.clone(ZERO_RESOURCE); + } finally { + readLock.unlock(); + } + } + + /** + * Get the timestamp of the earliest resource allocation + * + * @return the timestamp of the first resource allocation + */ + public long getEarliestStartTime() { + readLock.lock(); + try { + if (cumulativeCapacity.isEmpty()) { + return -1; + } else { + return cumulativeCapacity.firstKey(); + } + } finally { + readLock.unlock(); + } + } + + /** + * Get the timestamp of the latest resource allocation + * + * @return the timestamp of the last resource allocation + */ + public long getLatestEndTime() { + readLock.lock(); + try { + if (cumulativeCapacity.isEmpty()) { + return -1; + } else { + return cumulativeCapacity.lastKey(); + } + } finally { + readLock.unlock(); + } + } + + /** + * Returns true if there are no non-zero entries + * + * @return true if there are no allocations or false otherwise + */ + public boolean isEmpty() { + readLock.lock(); + try { + if (cumulativeCapacity.isEmpty()) { + return true; + } + // Deletion leaves a single zero entry so check for that + if (cumulativeCapacity.size() == 1) { + return cumulativeCapacity.firstEntry().getValue().equals(ZERO_RESOURCE); + } + return false; + } finally { + readLock.unlock(); + } + } + + @Override + public String toString() { + StringBuilder ret = new StringBuilder(); + readLock.lock(); + try { + if (cumulativeCapacity.size() > THRESHOLD) { + ret.append("Number of steps: ").append(cumulativeCapacity.size()) + .append(" earliest entry: ").append(cumulativeCapacity.firstKey()) + .append(" latest entry: ").append(cumulativeCapacity.lastKey()); + } else { + for (Map.Entry r : cumulativeCapacity.entrySet()) { + ret.append(r.getKey()).append(": ").append(r.getValue()) + .append("\n "); + } + } + return ret.toString(); + } finally { + readLock.unlock(); + } + } + + /** + * Returns the JSON string representation of the current resources allocated + * over time + * + * @return the JSON string representation of the current resources allocated + * over time + */ + public String toMemJSONString() { + StringWriter json = new StringWriter(); + JsonWriter jsonWriter = new JsonWriter(json); + readLock.lock(); + try { + jsonWriter.beginObject(); + // jsonWriter.name("timestamp").value("resource"); + for (Map.Entry r : cumulativeCapacity.entrySet()) { + jsonWriter.name(r.getKey().toString()).value(r.getValue().toString()); + } + jsonWriter.endObject(); + jsonWriter.close(); + return json.toString(); + } catch (IOException e) { + // This should not happen + return ""; + } finally { + readLock.unlock(); + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java new file mode 100644 index 00000000000..bca3aa82883 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java @@ -0,0 +1,104 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; + +/** + * A ReservationAllocation represents a concrete allocation of resources over + * time that satisfy a certain {@link ReservationDefinition}. This is used + * internally by a {@link Plan} to store information about how each of the + * accepted {@link ReservationDefinition} have been allocated. + */ +public interface ReservationAllocation extends + Comparable { + + /** + * Returns the unique identifier {@link ReservationId} that represents the + * reservation + * + * @return reservationId the unique identifier {@link ReservationId} that + * represents the reservation + */ + public ReservationId getReservationId(); + + /** + * Returns the original {@link ReservationDefinition} submitted by the client + * + * @return + */ + public ReservationDefinition getReservationDefinition(); + + /** + * Returns the time at which the reservation is activated + * + * @return the time at which the reservation is activated + */ + public long getStartTime(); + + /** + * Returns the time at which the reservation terminates + * + * @return the time at which the reservation terminates + */ + public long getEndTime(); + + /** + * Returns the map of resources requested against the time interval for which + * they were + * + * @return the allocationRequests the map of resources requested against the + * time interval for which they were + */ + public Map getAllocationRequests(); + + /** + * Return a string identifying the plan to which the reservation belongs + * + * @return the plan to which the reservation belongs + */ + public String getPlanName(); + + /** + * Returns the user who requested the reservation + * + * @return the user who requested the reservation + */ + public String getUser(); + + /** + * Returns whether the reservation has gang semantics or not + * + * @return true if there is a gang request, false otherwise + */ + public boolean containsGangs(); + + /** + * Sets the time at which the reservation was accepted by the system + * + * @param acceptedAt the time at which the reservation was accepted by the + * system + */ + public void setAcceptanceTimestamp(long acceptedAt); + + /** + * Returns the time at which the reservation was accepted by the system + * + * @return the time at which the reservation was accepted by the system + */ + public long getAcceptanceTime(); + + /** + * Returns the capacity represented by cumulative resources reserved by the + * reservation at the specified point of time + * + * @param tick the time (UTC in ms) for which the reserved resources are + * requested + * @return the resources reserved at the specified time + */ + public Resource getResourcesAtTime(long tick); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInterval.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInterval.java new file mode 100644 index 00000000000..d3a6d516c11 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInterval.java @@ -0,0 +1,67 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +/** + * This represents the time duration of the reservation + * + */ +public class ReservationInterval implements Comparable { + + private final long startTime; + + private final long endTime; + + public ReservationInterval(long startTime, long endTime) { + this.startTime = startTime; + this.endTime = endTime; + } + + /** + * Get the start time of the reservation interval + * + * @return the startTime + */ + public long getStartTime() { + return startTime; + } + + /** + * Get the end time of the reservation interval + * + * @return the endTime + */ + public long getEndTime() { + return endTime; + } + + /** + * Returns whether the interval is active at the specified instant of time + * + * @param tick the instance of the time to check + * @return true if active, false otherwise + */ + public boolean isOverlap(long tick) { + return (startTime <= tick && tick <= endTime); + } + + @Override + public int compareTo(ReservationInterval anotherInterval) { + long diff = 0; + if (startTime == anotherInterval.getStartTime()) { + diff = endTime - anotherInterval.getEndTime(); + } else { + diff = startTime - anotherInterval.getStartTime(); + } + if (diff < 0) { + return -1; + } else if (diff > 0) { + return 1; + } else { + return 0; + } + } + + public String toString() { + return "[" + startTime + ", " + endTime + "]"; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java new file mode 100644 index 00000000000..aa9e9fbb11f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java @@ -0,0 +1,25 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions; + +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; + +/** + * Exception thrown by the admission control subsystem when there is a problem + * in trying to find an allocation for a user {@link ReservationSubmissionRequest}. + */ +public class PlanningException extends Exception { + + private static final long serialVersionUID = -684069387367879218L; + + public PlanningException(String message) { + super(message); + } + + public PlanningException(Throwable cause) { + super(cause); + } + + public PlanningException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java new file mode 100644 index 00000000000..cbca6dc5c2b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -0,0 +1,210 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.junit.Assert; +import org.mockito.Mockito; + +public class ReservationSystemTestUtil { + + private static Random rand = new Random(); + + public final static String reservationQ = "dedicated"; + + public static ReservationId getNewReservationId() { + return ReservationId.newInstance(rand.nextLong(), rand.nextLong()); + } + + public CapacityScheduler mockCapacityScheduler(int numContainers) + throws IOException { + // stolen from TestCapacityScheduler + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + + CapacityScheduler cs = Mockito.spy(new CapacityScheduler()); + cs.setConf(new YarnConfiguration()); + RMContext mockRmContext = + Mockito.spy(new RMContextImpl(null, null, null, null, null, null, + new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null)); + cs.setRMContext(mockRmContext); + try { + cs.serviceInit(conf); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + when(mockRmContext.getScheduler()).thenReturn(cs); + Resource r = Resource.newInstance(numContainers * 1024, numContainers); + doReturn(r).when(cs).getClusterResource(); + return cs; + } + + public static void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { + // Define default queue + final String defQ = CapacitySchedulerConfiguration.ROOT + ".default"; + conf.setCapacity(defQ, 10); + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { + "default", "a", reservationQ }); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + conf.setCapacity(A, 10); + + final String dedicated = + CapacitySchedulerConfiguration.ROOT + + CapacitySchedulerConfiguration.DOT + reservationQ; + conf.setCapacity(dedicated, 80); + // Set as reservation queue + conf.setReservableQueue(dedicated, true); + + // Define 2nd-level queues + final String A1 = A + ".a1"; + final String A2 = A + ".a2"; + conf.setQueues(A, new String[] { "a1", "a2" }); + conf.setCapacity(A1, 30); + conf.setCapacity(A2, 70); + } + + public String getFullReservationQueueName() { + return CapacitySchedulerConfiguration.ROOT + + CapacitySchedulerConfiguration.DOT + reservationQ; + } + + public String getreservationQueueName() { + return reservationQ; + } + + public void updateQueueConfiguration(CapacitySchedulerConfiguration conf, + String newQ) { + // Define default queue + final String prefix = + CapacitySchedulerConfiguration.ROOT + + CapacitySchedulerConfiguration.DOT; + final String defQ = prefix + "default"; + conf.setCapacity(defQ, 5); + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { + "default", "a", reservationQ, newQ }); + + final String A = prefix + "a"; + conf.setCapacity(A, 5); + + final String dedicated = prefix + reservationQ; + conf.setCapacity(dedicated, 80); + // Set as reservation queue + conf.setReservableQueue(dedicated, true); + + conf.setCapacity(prefix + newQ, 10); + // Set as reservation queue + conf.setReservableQueue(prefix + newQ, true); + + // Define 2nd-level queues + final String A1 = A + ".a1"; + final String A2 = A + ".a2"; + conf.setQueues(A, new String[] { "a1", "a2" }); + conf.setCapacity(A1, 30); + conf.setCapacity(A2, 70); + } + + public static ReservationDefinition generateRandomRR(Random rand, long i) { + rand.setSeed(i); + long now = System.currentTimeMillis(); + + // start time at random in the next 12 hours + long arrival = rand.nextInt(12 * 3600 * 1000); + // deadline at random in the next day + long deadline = arrival + rand.nextInt(24 * 3600 * 1000); + + // create a request with a single atomic ask + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(now + arrival); + rr.setDeadline(now + deadline); + + int gang = 1 + rand.nextInt(9); + int par = (rand.nextInt(1000) + 1) * gang; + long dur = rand.nextInt(2 * 3600 * 1000); // random duration within 2h + ReservationRequest r = + ReservationRequest.newInstance(Resource.newInstance(1024, 1), par, + gang, dur); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setReservationResources(Collections.singletonList(r)); + rand.nextInt(3); + ReservationRequestInterpreter[] type = + ReservationRequestInterpreter.values(); + reqs.setInterpreter(type[rand.nextInt(type.length)]); + rr.setReservationRequests(reqs); + + return rr; + + } + + public static ReservationDefinition generateBigRR(Random rand, long i) { + rand.setSeed(i); + long now = System.currentTimeMillis(); + + // start time at random in the next 2 hours + long arrival = rand.nextInt(2 * 3600 * 1000); + // deadline at random in the next day + long deadline = rand.nextInt(24 * 3600 * 1000); + + // create a request with a single atomic ask + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(now + arrival); + rr.setDeadline(now + deadline); + + int gang = 1; + int par = 100000; // 100k tasks + long dur = rand.nextInt(60 * 1000); // 1min tasks + ReservationRequest r = + ReservationRequest.newInstance(Resource.newInstance(1024, 1), par, + gang, dur); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setReservationResources(Collections.singletonList(r)); + rand.nextInt(3); + ReservationRequestInterpreter[] type = + ReservationRequestInterpreter.values(); + reqs.setInterpreter(type[rand.nextInt(type.length)]); + rr.setReservationRequests(reqs); + + return rr; + } + + public static Map generateAllocation( + long startTime, long step, int[] alloc) { + Map req = + new TreeMap(); + for (int i = 0; i < alloc.length; i++) { + req.put(new ReservationInterval(startTime + i * step, startTime + (i + 1) + * step), ReservationRequest.newInstance( + Resource.newInstance(1024, 1), alloc[i])); + } + return req; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java new file mode 100644 index 00000000000..6dcd41ff03a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java @@ -0,0 +1,477 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestInMemoryPlan { + + private String user = "yarn"; + private String planName = "test-reservation"; + private ResourceCalculator resCalc; + private Resource minAlloc; + private Resource maxAlloc; + private Resource totalCapacity; + + private Clock clock; + private QueueMetrics queueMetrics; + private SharingPolicy policy; + private ReservationAgent agent; + private Planner replanner; + + @Before + public void setUp() throws PlanningException { + resCalc = new DefaultResourceCalculator(); + minAlloc = Resource.newInstance(1024, 1); + maxAlloc = Resource.newInstance(64 * 1024, 20); + totalCapacity = Resource.newInstance(100 * 1024, 100); + + clock = mock(Clock.class); + queueMetrics = mock(QueueMetrics.class); + policy = mock(SharingPolicy.class); + replanner = mock(Planner.class); + + when(clock.getTime()).thenReturn(1L); + } + + @After + public void tearDown() { + resCalc = null; + minAlloc = null; + maxAlloc = null; + totalCapacity = null; + + clock = null; + queueMetrics = null; + policy = null; + replanner = null; + } + + @Test + public void testAddReservation() { + Plan plan = + new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L, + resCalc, minAlloc, maxAlloc, planName, replanner, true); + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + int[] alloc = { 10, 10, 10, 10, 10, 10 }; + int start = 100; + Map allocations = + generateAllocation(start, alloc, false); + ReservationDefinition rDef = + createSimpleReservationDefinition(start, start + alloc.length, + alloc.length, allocations.values()); + ReservationAllocation rAllocation = + new InMemoryReservationAllocation(reservationID, rDef, user, planName, + start, start + alloc.length, allocations, resCalc, minAlloc); + Assert.assertNull(plan.getReservationById(reservationID)); + try { + plan.addReservation(rAllocation); + } catch (PlanningException e) { + Assert.fail(e.getMessage()); + } + doAssertions(plan, rAllocation); + for (int i = 0; i < alloc.length; i++) { + Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])), + plan.getTotalCommittedResources(start + i)); + Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])), + plan.getConsumptionForUser(user, start + i)); + } + } + + @Test + public void testAddEmptyReservation() { + Plan plan = + new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L, + resCalc, minAlloc, maxAlloc, planName, replanner, true); + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + int[] alloc = {}; + int start = 100; + Map allocations = + new HashMap(); + ReservationDefinition rDef = + createSimpleReservationDefinition(start, start + alloc.length, + alloc.length, allocations.values()); + ReservationAllocation rAllocation = + new InMemoryReservationAllocation(reservationID, rDef, user, planName, + start, start + alloc.length, allocations, resCalc, minAlloc); + Assert.assertNull(plan.getReservationById(reservationID)); + try { + plan.addReservation(rAllocation); + } catch (PlanningException e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void testAddReservationAlreadyExists() { + // First add a reservation + Plan plan = + new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L, + resCalc, minAlloc, maxAlloc, planName, replanner, true); + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + int[] alloc = { 10, 10, 10, 10, 10, 10 }; + int start = 100; + Map allocations = + generateAllocation(start, alloc, false); + ReservationDefinition rDef = + createSimpleReservationDefinition(start, start + alloc.length, + alloc.length, allocations.values()); + ReservationAllocation rAllocation = + new InMemoryReservationAllocation(reservationID, rDef, user, planName, + start, start + alloc.length, allocations, resCalc, minAlloc); + Assert.assertNull(plan.getReservationById(reservationID)); + try { + plan.addReservation(rAllocation); + } catch (PlanningException e) { + Assert.fail(e.getMessage()); + } + doAssertions(plan, rAllocation); + for (int i = 0; i < alloc.length; i++) { + Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])), + plan.getTotalCommittedResources(start + i)); + Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])), + plan.getConsumptionForUser(user, start + i)); + } + + // Try to add it again + try { + plan.addReservation(rAllocation); + Assert.fail("Add should fail as it already exists"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().endsWith("already exists")); + } catch (PlanningException e) { + Assert.fail(e.getMessage()); + } + doAssertions(plan, rAllocation); + } + + @Test + public void testUpdateReservation() { + Plan plan = + new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L, + resCalc, minAlloc, maxAlloc, planName, replanner, true); + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + // First add a reservation + int[] alloc = { 10, 10, 10, 10, 10, 10 }; + int start = 100; + Map allocations = + generateAllocation(start, alloc, false); + ReservationDefinition rDef = + createSimpleReservationDefinition(start, start + alloc.length, + alloc.length, allocations.values()); + ReservationAllocation rAllocation = + new InMemoryReservationAllocation(reservationID, rDef, user, planName, + start, start + alloc.length, allocations, resCalc, minAlloc); + Assert.assertNull(plan.getReservationById(reservationID)); + try { + plan.addReservation(rAllocation); + } catch (PlanningException e) { + Assert.fail(e.getMessage()); + } + doAssertions(plan, rAllocation); + for (int i = 0; i < alloc.length; i++) { + Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])), + plan.getTotalCommittedResources(start + i)); + Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])), + plan.getConsumptionForUser(user, start + i)); + } + + // Now update it + start = 110; + int[] updatedAlloc = { 0, 5, 10, 10, 5, 0 }; + allocations = generateAllocation(start, updatedAlloc, true); + rDef = + createSimpleReservationDefinition(start, start + updatedAlloc.length, + updatedAlloc.length, allocations.values()); + rAllocation = + new InMemoryReservationAllocation(reservationID, rDef, user, planName, + start, start + updatedAlloc.length, allocations, resCalc, minAlloc); + try { + plan.updateReservation(rAllocation); + } catch (PlanningException e) { + Assert.fail(e.getMessage()); + } + doAssertions(plan, rAllocation); + for (int i = 0; i < updatedAlloc.length; i++) { + Assert.assertEquals( + Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i] + + i), plan.getTotalCommittedResources(start + i)); + Assert.assertEquals( + Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i] + + i), plan.getConsumptionForUser(user, start + i)); + } + } + + @Test + public void testUpdateNonExistingReservation() { + Plan plan = + new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L, + resCalc, minAlloc, maxAlloc, planName, replanner, true); + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + // Try to update a reservation without adding + int[] alloc = { 10, 10, 10, 10, 10, 10 }; + int start = 100; + Map allocations = + generateAllocation(start, alloc, false); + ReservationDefinition rDef = + createSimpleReservationDefinition(start, start + alloc.length, + alloc.length, allocations.values()); + ReservationAllocation rAllocation = + new InMemoryReservationAllocation(reservationID, rDef, user, planName, + start, start + alloc.length, allocations, resCalc, minAlloc); + Assert.assertNull(plan.getReservationById(reservationID)); + try { + plan.updateReservation(rAllocation); + Assert.fail("Update should fail as it does not exist in the plan"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().endsWith("does not exist in the plan")); + } catch (PlanningException e) { + Assert.fail(e.getMessage()); + } + Assert.assertNull(plan.getReservationById(reservationID)); + } + + @Test + public void testDeleteReservation() { + // First add a reservation + Plan plan = + new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L, + resCalc, minAlloc, maxAlloc, planName, replanner, true); + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + int[] alloc = { 10, 10, 10, 10, 10, 10 }; + int start = 100; + Map allocations = + generateAllocation(start, alloc, true); + ReservationDefinition rDef = + createSimpleReservationDefinition(start, start + alloc.length, + alloc.length, allocations.values()); + ReservationAllocation rAllocation = + new InMemoryReservationAllocation(reservationID, rDef, user, planName, + start, start + alloc.length, allocations, resCalc, minAlloc); + Assert.assertNull(plan.getReservationById(reservationID)); + try { + plan.addReservation(rAllocation); + } catch (PlanningException e) { + Assert.fail(e.getMessage()); + } + doAssertions(plan, rAllocation); + for (int i = 0; i < alloc.length; i++) { + Assert.assertEquals( + Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)), + plan.getTotalCommittedResources(start + i)); + Assert.assertEquals( + Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)), + plan.getConsumptionForUser(user, start + i)); + } + + // Now delete it + try { + plan.deleteReservation(reservationID); + } catch (PlanningException e) { + Assert.fail(e.getMessage()); + } + Assert.assertNull(plan.getReservationById(reservationID)); + for (int i = 0; i < alloc.length; i++) { + Assert.assertEquals(Resource.newInstance(0, 0), + plan.getTotalCommittedResources(start + i)); + Assert.assertEquals(Resource.newInstance(0, 0), + plan.getConsumptionForUser(user, start + i)); + } + } + + @Test + public void testDeleteNonExistingReservation() { + Plan plan = + new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L, + resCalc, minAlloc, maxAlloc, planName, replanner, true); + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + // Try to delete a reservation without adding + Assert.assertNull(plan.getReservationById(reservationID)); + try { + plan.deleteReservation(reservationID); + Assert.fail("Delete should fail as it does not exist in the plan"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().endsWith("does not exist in the plan")); + } catch (PlanningException e) { + Assert.fail(e.getMessage()); + } + Assert.assertNull(plan.getReservationById(reservationID)); + } + + @Test + public void testArchiveCompletedReservations() { + Plan plan = + new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L, + resCalc, minAlloc, maxAlloc, planName, replanner, true); + ReservationId reservationID1 = + ReservationSystemTestUtil.getNewReservationId(); + // First add a reservation + int[] alloc1 = { 10, 10, 10, 10, 10, 10 }; + int start = 100; + Map allocations1 = + generateAllocation(start, alloc1, false); + ReservationDefinition rDef1 = + createSimpleReservationDefinition(start, start + alloc1.length, + alloc1.length, allocations1.values()); + ReservationAllocation rAllocation = + new InMemoryReservationAllocation(reservationID1, rDef1, user, + planName, start, start + alloc1.length, allocations1, resCalc, + minAlloc); + Assert.assertNull(plan.getReservationById(reservationID1)); + try { + plan.addReservation(rAllocation); + } catch (PlanningException e) { + Assert.fail(e.getMessage()); + } + doAssertions(plan, rAllocation); + for (int i = 0; i < alloc1.length; i++) { + Assert.assertEquals( + Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])), + plan.getTotalCommittedResources(start + i)); + Assert.assertEquals( + Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])), + plan.getConsumptionForUser(user, start + i)); + } + + // Now add another one + ReservationId reservationID2 = + ReservationSystemTestUtil.getNewReservationId(); + int[] alloc2 = { 0, 5, 10, 5, 0 }; + Map allocations2 = + generateAllocation(start, alloc2, true); + ReservationDefinition rDef2 = + createSimpleReservationDefinition(start, start + alloc2.length, + alloc2.length, allocations2.values()); + rAllocation = + new InMemoryReservationAllocation(reservationID2, rDef2, user, + planName, start, start + alloc2.length, allocations2, resCalc, + minAlloc); + Assert.assertNull(plan.getReservationById(reservationID2)); + try { + plan.addReservation(rAllocation); + } catch (PlanningException e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(plan.getReservationById(reservationID2)); + for (int i = 0; i < alloc2.length; i++) { + Assert.assertEquals( + Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i] + + alloc2[i] + i), plan.getTotalCommittedResources(start + i)); + Assert.assertEquals( + Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i] + + alloc2[i] + i), plan.getConsumptionForUser(user, start + i)); + } + + // Now archive completed reservations + when(clock.getTime()).thenReturn(106L); + when(policy.getValidWindow()).thenReturn(1L); + try { + // will only remove 2nd reservation as only that has fallen out of the + // archival window + plan.archiveCompletedReservations(clock.getTime()); + } catch (PlanningException e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(plan.getReservationById(reservationID1)); + Assert.assertNull(plan.getReservationById(reservationID2)); + for (int i = 0; i < alloc1.length; i++) { + Assert.assertEquals( + Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])), + plan.getTotalCommittedResources(start + i)); + Assert.assertEquals( + Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])), + plan.getConsumptionForUser(user, start + i)); + } + when(clock.getTime()).thenReturn(107L); + try { + // will remove 1st reservation also as it has fallen out of the archival + // window + plan.archiveCompletedReservations(clock.getTime()); + } catch (PlanningException e) { + Assert.fail(e.getMessage()); + } + Assert.assertNull(plan.getReservationById(reservationID1)); + for (int i = 0; i < alloc1.length; i++) { + Assert.assertEquals(Resource.newInstance(0, 0), + plan.getTotalCommittedResources(start + i)); + Assert.assertEquals(Resource.newInstance(0, 0), + plan.getConsumptionForUser(user, start + i)); + } + } + + private void doAssertions(Plan plan, ReservationAllocation rAllocation) { + ReservationId reservationID = rAllocation.getReservationId(); + Assert.assertNotNull(plan.getReservationById(reservationID)); + Assert.assertEquals(rAllocation, plan.getReservationById(reservationID)); + Assert.assertTrue(((InMemoryPlan) plan).getAllReservations().size() == 1); + Assert.assertEquals(rAllocation.getEndTime(), plan.getLastEndTime()); + Assert.assertEquals(totalCapacity, plan.getTotalCapacity()); + Assert.assertEquals(minAlloc, plan.getMinimumAllocation()); + Assert.assertEquals(maxAlloc, plan.getMaximumAllocation()); + Assert.assertEquals(resCalc, plan.getResourceCalculator()); + Assert.assertEquals(planName, plan.getQueueName()); + Assert.assertTrue(plan.getMoveOnExpiry()); + } + + private ReservationDefinition createSimpleReservationDefinition(long arrival, + long deadline, long duration, Collection resources) { + // create a request with a single atomic ask + ReservationDefinition rDef = new ReservationDefinitionPBImpl(); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setReservationResources(new ArrayList(resources)); + reqs.setInterpreter(ReservationRequestInterpreter.R_ALL); + rDef.setReservationRequests(reqs); + rDef.setArrival(arrival); + rDef.setDeadline(deadline); + return rDef; + } + + private Map generateAllocation( + int startTime, int[] alloc, boolean isStep) { + Map req = + new HashMap(); + int numContainers = 0; + for (int i = 0; i < alloc.length; i++) { + if (isStep) { + numContainers = alloc[i] + i; + } else { + numContainers = alloc[i]; + } + ReservationRequest rr = + ReservationRequest.newInstance(Resource.newInstance(1024, 1), + (numContainers)); + req.put(new ReservationInterval(startTime + i, startTime + i + 1), rr); + } + return req; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java new file mode 100644 index 00000000000..f4c4581d52a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java @@ -0,0 +1,206 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestInMemoryReservationAllocation { + + private String user = "yarn"; + private String planName = "test-reservation"; + private ResourceCalculator resCalc; + private Resource minAlloc; + + private Random rand = new Random(); + + @Before + public void setUp() { + resCalc = new DefaultResourceCalculator(); + minAlloc = Resource.newInstance(1, 1); + } + + @After + public void tearDown() { + user = null; + planName = null; + resCalc = null; + minAlloc = null; + } + + @Test + public void testBlocks() { + ReservationId reservationID = + ReservationId.newInstance(rand.nextLong(), rand.nextLong()); + int[] alloc = { 10, 10, 10, 10, 10, 10 }; + int start = 100; + ReservationDefinition rDef = + createSimpleReservationDefinition(start, start + alloc.length + 1, + alloc.length); + Map allocations = + generateAllocation(start, alloc, false, false); + ReservationAllocation rAllocation = + new InMemoryReservationAllocation(reservationID, rDef, user, planName, + start, start + alloc.length + 1, allocations, resCalc, minAlloc); + doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc); + Assert.assertFalse(rAllocation.containsGangs()); + for (int i = 0; i < alloc.length; i++) { + Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])), + rAllocation.getResourcesAtTime(start + i)); + } + } + + @Test + public void testSteps() { + ReservationId reservationID = + ReservationId.newInstance(rand.nextLong(), rand.nextLong()); + int[] alloc = { 10, 10, 10, 10, 10, 10 }; + int start = 100; + ReservationDefinition rDef = + createSimpleReservationDefinition(start, start + alloc.length + 1, + alloc.length); + Map allocations = + generateAllocation(start, alloc, true, false); + ReservationAllocation rAllocation = + new InMemoryReservationAllocation(reservationID, rDef, user, planName, + start, start + alloc.length + 1, allocations, resCalc, minAlloc); + doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc); + Assert.assertFalse(rAllocation.containsGangs()); + for (int i = 0; i < alloc.length; i++) { + Assert.assertEquals( + Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)), + rAllocation.getResourcesAtTime(start + i)); + } + } + + @Test + public void testSkyline() { + ReservationId reservationID = + ReservationId.newInstance(rand.nextLong(), rand.nextLong()); + int[] alloc = { 0, 5, 10, 10, 5, 0 }; + int start = 100; + ReservationDefinition rDef = + createSimpleReservationDefinition(start, start + alloc.length + 1, + alloc.length); + Map allocations = + generateAllocation(start, alloc, true, false); + ReservationAllocation rAllocation = + new InMemoryReservationAllocation(reservationID, rDef, user, planName, + start, start + alloc.length + 1, allocations, resCalc, minAlloc); + doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc); + Assert.assertFalse(rAllocation.containsGangs()); + for (int i = 0; i < alloc.length; i++) { + Assert.assertEquals( + Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)), + rAllocation.getResourcesAtTime(start + i)); + } + } + + @Test + public void testZeroAlloaction() { + ReservationId reservationID = + ReservationId.newInstance(rand.nextLong(), rand.nextLong()); + int[] alloc = {}; + long start = 0; + ReservationDefinition rDef = + createSimpleReservationDefinition(start, start + alloc.length + 1, + alloc.length); + Map allocations = + new HashMap(); + ReservationAllocation rAllocation = + new InMemoryReservationAllocation(reservationID, rDef, user, planName, + start, start + alloc.length + 1, allocations, resCalc, minAlloc); + doAssertions(rAllocation, reservationID, rDef, allocations, (int) start, + alloc); + Assert.assertFalse(rAllocation.containsGangs()); + } + + @Test + public void testGangAlloaction() { + ReservationId reservationID = + ReservationId.newInstance(rand.nextLong(), rand.nextLong()); + int[] alloc = { 10, 10, 10, 10, 10, 10 }; + int start = 100; + ReservationDefinition rDef = + createSimpleReservationDefinition(start, start + alloc.length + 1, + alloc.length); + Map allocations = + generateAllocation(start, alloc, false, true); + ReservationAllocation rAllocation = + new InMemoryReservationAllocation(reservationID, rDef, user, planName, + start, start + alloc.length + 1, allocations, resCalc, minAlloc); + doAssertions(rAllocation, reservationID, rDef, allocations, start, alloc); + Assert.assertTrue(rAllocation.containsGangs()); + for (int i = 0; i < alloc.length; i++) { + Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])), + rAllocation.getResourcesAtTime(start + i)); + } + } + + private void doAssertions(ReservationAllocation rAllocation, + ReservationId reservationID, ReservationDefinition rDef, + Map allocations, int start, + int[] alloc) { + Assert.assertEquals(reservationID, rAllocation.getReservationId()); + Assert.assertEquals(rDef, rAllocation.getReservationDefinition()); + Assert.assertEquals(allocations, rAllocation.getAllocationRequests()); + Assert.assertEquals(user, rAllocation.getUser()); + Assert.assertEquals(planName, rAllocation.getPlanName()); + Assert.assertEquals(start, rAllocation.getStartTime()); + Assert.assertEquals(start + alloc.length + 1, rAllocation.getEndTime()); + } + + private ReservationDefinition createSimpleReservationDefinition(long arrival, + long deadline, long duration) { + // create a request with a single atomic ask + ReservationRequest r = + ReservationRequest.newInstance(Resource.newInstance(1024, 1), 1, 1, + duration); + ReservationDefinition rDef = new ReservationDefinitionPBImpl(); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setReservationResources(Collections.singletonList(r)); + reqs.setInterpreter(ReservationRequestInterpreter.R_ALL); + rDef.setReservationRequests(reqs); + rDef.setArrival(arrival); + rDef.setDeadline(deadline); + return rDef; + } + + private Map generateAllocation( + int startTime, int[] alloc, boolean isStep, boolean isGang) { + Map req = + new HashMap(); + int numContainers = 0; + for (int i = 0; i < alloc.length; i++) { + if (isStep) { + numContainers = alloc[i] + i; + } else { + numContainers = alloc[i]; + } + ReservationRequest rr = + ReservationRequest.newInstance(Resource.newInstance(1024, 1), + (numContainers)); + if (isGang) { + rr.setConcurrency(numContainers); + } + req.put(new ReservationInterval(startTime + i, startTime + i + 1), rr); + } + return req; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java new file mode 100644 index 00000000000..ab0de6b25dd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java @@ -0,0 +1,169 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestRLESparseResourceAllocation { + + private static final Logger LOG = LoggerFactory + .getLogger(TestRLESparseResourceAllocation.class); + + @Test + public void testBlocks() { + ResourceCalculator resCalc = new DefaultResourceCalculator(); + Resource minAlloc = Resource.newInstance(1, 1); + + RLESparseResourceAllocation rleSparseVector = + new RLESparseResourceAllocation(resCalc, minAlloc); + int[] alloc = { 10, 10, 10, 10, 10, 10 }; + int start = 100; + Set> inputs = + generateAllocation(start, alloc, false).entrySet(); + for (Entry ip : inputs) { + rleSparseVector.addInterval(ip.getKey(), ip.getValue()); + } + LOG.info(rleSparseVector.toString()); + Assert.assertFalse(rleSparseVector.isEmpty()); + Assert.assertEquals(Resource.newInstance(0, 0), + rleSparseVector.getCapacityAtTime(99)); + Assert.assertEquals(Resource.newInstance(0, 0), + rleSparseVector.getCapacityAtTime(start + alloc.length + 1)); + for (int i = 0; i < alloc.length; i++) { + Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])), + rleSparseVector.getCapacityAtTime(start + i)); + } + Assert.assertEquals(Resource.newInstance(0, 0), + rleSparseVector.getCapacityAtTime(start + alloc.length + 2)); + for (Entry ip : inputs) { + rleSparseVector.removeInterval(ip.getKey(), ip.getValue()); + } + LOG.info(rleSparseVector.toString()); + for (int i = 0; i < alloc.length; i++) { + Assert.assertEquals(Resource.newInstance(0, 0), + rleSparseVector.getCapacityAtTime(start + i)); + } + Assert.assertTrue(rleSparseVector.isEmpty()); + } + + @Test + public void testSteps() { + ResourceCalculator resCalc = new DefaultResourceCalculator(); + Resource minAlloc = Resource.newInstance(1, 1); + + RLESparseResourceAllocation rleSparseVector = + new RLESparseResourceAllocation(resCalc, minAlloc); + int[] alloc = { 10, 10, 10, 10, 10, 10 }; + int start = 100; + Set> inputs = + generateAllocation(start, alloc, true).entrySet(); + for (Entry ip : inputs) { + rleSparseVector.addInterval(ip.getKey(), ip.getValue()); + } + LOG.info(rleSparseVector.toString()); + Assert.assertFalse(rleSparseVector.isEmpty()); + Assert.assertEquals(Resource.newInstance(0, 0), + rleSparseVector.getCapacityAtTime(99)); + Assert.assertEquals(Resource.newInstance(0, 0), + rleSparseVector.getCapacityAtTime(start + alloc.length + 1)); + for (int i = 0; i < alloc.length; i++) { + Assert.assertEquals( + Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)), + rleSparseVector.getCapacityAtTime(start + i)); + } + Assert.assertEquals(Resource.newInstance(0, 0), + rleSparseVector.getCapacityAtTime(start + alloc.length + 2)); + for (Entry ip : inputs) { + rleSparseVector.removeInterval(ip.getKey(), ip.getValue()); + } + LOG.info(rleSparseVector.toString()); + for (int i = 0; i < alloc.length; i++) { + Assert.assertEquals(Resource.newInstance(0, 0), + rleSparseVector.getCapacityAtTime(start + i)); + } + Assert.assertTrue(rleSparseVector.isEmpty()); + } + + @Test + public void testSkyline() { + ResourceCalculator resCalc = new DefaultResourceCalculator(); + Resource minAlloc = Resource.newInstance(1, 1); + + RLESparseResourceAllocation rleSparseVector = + new RLESparseResourceAllocation(resCalc, minAlloc); + int[] alloc = { 0, 5, 10, 10, 5, 0 }; + int start = 100; + Set> inputs = + generateAllocation(start, alloc, true).entrySet(); + for (Entry ip : inputs) { + rleSparseVector.addInterval(ip.getKey(), ip.getValue()); + } + LOG.info(rleSparseVector.toString()); + Assert.assertFalse(rleSparseVector.isEmpty()); + Assert.assertEquals(Resource.newInstance(0, 0), + rleSparseVector.getCapacityAtTime(99)); + Assert.assertEquals(Resource.newInstance(0, 0), + rleSparseVector.getCapacityAtTime(start + alloc.length + 1)); + for (int i = 0; i < alloc.length; i++) { + Assert.assertEquals( + Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)), + rleSparseVector.getCapacityAtTime(start + i)); + } + Assert.assertEquals(Resource.newInstance(0, 0), + rleSparseVector.getCapacityAtTime(start + alloc.length + 2)); + for (Entry ip : inputs) { + rleSparseVector.removeInterval(ip.getKey(), ip.getValue()); + } + LOG.info(rleSparseVector.toString()); + for (int i = 0; i < alloc.length; i++) { + Assert.assertEquals(Resource.newInstance(0, 0), + rleSparseVector.getCapacityAtTime(start + i)); + } + Assert.assertTrue(rleSparseVector.isEmpty()); + } + + @Test + public void testZeroAlloaction() { + ResourceCalculator resCalc = new DefaultResourceCalculator(); + Resource minAlloc = Resource.newInstance(1, 1); + RLESparseResourceAllocation rleSparseVector = + new RLESparseResourceAllocation(resCalc, minAlloc); + rleSparseVector.addInterval(new ReservationInterval(0, Long.MAX_VALUE), + ReservationRequest.newInstance(Resource.newInstance(0, 0), (0))); + LOG.info(rleSparseVector.toString()); + Assert.assertEquals(Resource.newInstance(0, 0), + rleSparseVector.getCapacityAtTime(new Random().nextLong())); + Assert.assertTrue(rleSparseVector.isEmpty()); + } + + private Map generateAllocation( + int startTime, int[] alloc, boolean isStep) { + Map req = + new HashMap(); + int numContainers = 0; + for (int i = 0; i < alloc.length; i++) { + if (isStep) { + numContainers = alloc[i] + i; + } else { + numContainers = alloc[i]; + } + req.put(new ReservationInterval(startTime + i, startTime + i + 1), + + ReservationRequest.newInstance(Resource.newInstance(1024, 1), + (numContainers))); + } + return req; + } + +}