From 78a07e99ddf7c0850e67a8e6dd1eb41b9bdff247 Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Sat, 5 Dec 2015 21:26:16 -0800 Subject: [PATCH] YARN-4358. Reservation System: Improve relationship between SharingPolicy and ReservationAgent. (Carlo Curino via asuresh) (cherry picked from commit 742632e346604fd2b263bd42367165638fcf2416) --- hadoop-yarn-project/CHANGES.txt | 3 + .../reservation/CapacityOverTimePolicy.java | 52 +++++++- .../reservation/InMemoryPlan.java | 123 +++++++++++++++++- .../InMemoryReservationAllocation.java | 13 +- .../reservation/NoOverCommitPolicy.java | 8 ++ .../resourcemanager/reservation/PlanView.java | 65 +++++++-- .../reservation/ReservationAllocation.java | 12 +- .../reservation/SharingPolicy.java | 24 +++- .../planning/IterativePlanner.java | 16 ++- .../planning/PlanningAlgorithm.java | 34 ++--- .../reservation/planning/StageAllocator.java | 6 +- .../planning/StageAllocatorGreedy.java | 23 ++-- .../StageAllocatorLowCostAligned.java | 14 +- .../reservation/TestInMemoryPlan.java | 72 +++++----- .../planning/TestGreedyReservationAgent.java | 94 ++++++++++++- 15 files changed, 464 insertions(+), 95 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 7fa7f0203ac..47b804ae4d0 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -540,6 +540,9 @@ Release 2.8.0 - UNRELEASED YARN-4405. Support node label store in non-appendable file system. (Wangda Tan via jianhe) + YARN-4358. Reservation System: Improve relationship between SharingPolicy + and ReservationAgent. (Carlo Curino via asuresh) + OPTIMIZATIONS YARN-3339. TestDockerContainerExecutor should pull a single image and not diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java index afba7ea1bd9..424b54316df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java @@ -18,10 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import java.util.Date; +import java.util.NavigableMap; +import java.util.TreeMap; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; @@ -104,14 +108,17 @@ public class CapacityOverTimePolicy implements SharingPolicy { IntegralResource maxAllowed = new IntegralResource(maxAvgRes); maxAllowed.multiplyBy(validWindow / step); + RLESparseResourceAllocation userCons = + plan.getConsumptionForUserOverTime(reservation.getUser(), startTime + - validWindow, endTime + validWindow); + // check that the resources offered to the user during any window of length // "validWindow" overlapping this allocation are within maxAllowed // also enforce instantaneous and physical constraints during this pass for (long t = startTime - validWindow; t < endTime + validWindow; t += step) { Resource currExistingAllocTot = plan.getTotalCommittedResources(t); - Resource currExistingAllocForUser = - plan.getConsumptionForUser(reservation.getUser(), t); + Resource currExistingAllocForUser = userCons.getCapacityAtTime(t); Resource currNewAlloc = reservation.getResourcesAtTime(t); Resource currOldAlloc = Resources.none(); if (oldReservation != null) { @@ -163,8 +170,7 @@ public class CapacityOverTimePolicy implements SharingPolicy { // expire contributions from instant in time before (t - validWindow) if (t > startTime) { - Resource pastOldAlloc = - plan.getConsumptionForUser(reservation.getUser(), t - validWindow); + Resource pastOldAlloc = userCons.getCapacityAtTime(t - validWindow); Resource pastNewAlloc = reservation.getResourcesAtTime(t - validWindow); // runningTot = runningTot - pastExistingAlloc - pastNewAlloc; @@ -188,6 +194,39 @@ public class CapacityOverTimePolicy implements SharingPolicy { } } + @Override + public RLESparseResourceAllocation availableResources( + RLESparseResourceAllocation available, Plan plan, String user, + ReservationId oldId, long start, long end) throws PlanningException { + + // this only propagates the instantaneous maxInst properties, while + // the time-varying one depends on the current allocation as well + // and are not easily captured here + Resource planTotalCapacity = plan.getTotalCapacity(); + Resource maxInsRes = Resources.multiply(planTotalCapacity, maxInst); + NavigableMap instQuota = new TreeMap(); + instQuota.put(start, maxInsRes); + + RLESparseResourceAllocation instRLEQuota = + new RLESparseResourceAllocation(instQuota, + plan.getResourceCalculator()); + + RLESparseResourceAllocation used = + plan.getConsumptionForUserOverTime(user, start, end); + + instRLEQuota = + RLESparseResourceAllocation.merge(plan.getResourceCalculator(), + planTotalCapacity, instRLEQuota, used, RLEOperator.subtract, start, + end); + + instRLEQuota = + RLESparseResourceAllocation.merge(plan.getResourceCalculator(), + planTotalCapacity, available, instRLEQuota, RLEOperator.min, start, + end); + + return instRLEQuota; + } + @Override public long getValidWindow() { return validWindow; @@ -198,7 +237,7 @@ public class CapacityOverTimePolicy implements SharingPolicy { * long(s), as using Resource to store the "integral" of the allocation over * time leads to integer overflows for large allocations/clusters. (Evolving * Resource to use long is too disruptive at this point.) - * + * * The comparison/multiplication behaviors of IntegralResource are consistent * with the DefaultResourceCalculator. */ @@ -244,4 +283,7 @@ public class CapacityOverTimePolicy implements SharingPolicy { return ""; } } + + + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java index af42df947db..c51c3ba2de5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java @@ -27,11 +27,13 @@ import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; @@ -65,6 +67,9 @@ public class InMemoryPlan implements Plan { private Map userResourceAlloc = new HashMap(); + private Map userActiveReservationCount = + new HashMap(); + private Map reservationTable = new HashMap(); @@ -121,6 +126,7 @@ public class InMemoryPlan implements Plan { return queueMetrics; } + private void incrementAllocation(ReservationAllocation reservation) { assert (readWriteLock.isWriteLockedByCurrentThread()); Map allocationRequests = @@ -132,11 +138,27 @@ public class InMemoryPlan implements Plan { resAlloc = new RLESparseResourceAllocation(resCalc); userResourceAlloc.put(user, resAlloc); } + RLESparseResourceAllocation resCount = userActiveReservationCount.get(user); + if (resCount == null) { + resCount = new RLESparseResourceAllocation(resCalc); + userActiveReservationCount.put(user, resCount); + } + + long earliestActive = Long.MAX_VALUE; + long latestActive = Long.MIN_VALUE; + for (Map.Entry r : allocationRequests .entrySet()) { resAlloc.addInterval(r.getKey(), r.getValue()); rleSparseVector.addInterval(r.getKey(), r.getValue()); + if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(), + ZERO_RESOURCE)) { + earliestActive = Math.min(earliestActive, r.getKey().getStartTime()); + latestActive = Math.max(latestActive, r.getKey().getEndTime()); + } } + resCount.addInterval(new ReservationInterval(earliestActive, latestActive), + Resource.newInstance(1, 1)); } private void decrementAllocation(ReservationAllocation reservation) { @@ -145,14 +167,29 @@ public class InMemoryPlan implements Plan { reservation.getAllocationRequests(); String user = reservation.getUser(); RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user); + + long earliestActive = Long.MAX_VALUE; + long latestActive = Long.MIN_VALUE; for (Map.Entry r : allocationRequests .entrySet()) { resAlloc.removeInterval(r.getKey(), r.getValue()); rleSparseVector.removeInterval(r.getKey(), r.getValue()); + if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(), + ZERO_RESOURCE)) { + earliestActive = Math.min(earliestActive, r.getKey().getStartTime()); + latestActive = Math.max(latestActive, r.getKey().getEndTime()); + } } if (resAlloc.isEmpty()) { userResourceAlloc.remove(user); } + + RLESparseResourceAllocation resCount = userActiveReservationCount.get(user); + resCount.removeInterval(new ReservationInterval(earliestActive, + latestActive), Resource.newInstance(1, 1)); + if (resCount.isEmpty()) { + userActiveReservationCount.remove(user); + } } public Set getAllReservations() { @@ -160,9 +197,9 @@ public class InMemoryPlan implements Plan { try { if (currentReservations != null) { Set flattenedReservations = - new HashSet(); - for (Set reservationEntries : currentReservations - .values()) { + new TreeSet(); + for (Set reservationEntries : + currentReservations.values()) { flattenedReservations.addAll(reservationEntries); } return flattenedReservations; @@ -417,14 +454,34 @@ public class InMemoryPlan implements Plan { } @Override - public Resource getConsumptionForUser(String user, long t) { + public RLESparseResourceAllocation getReservationCountForUserOverTime( + String user, long start, long end) { + readLock.lock(); + try { + RLESparseResourceAllocation userResAlloc = + userActiveReservationCount.get(user); + + if (userResAlloc != null) { + return userResAlloc.getRangeOverlapping(start, end); + } else { + return new RLESparseResourceAllocation(resCalc); + } + } finally { + readLock.unlock(); + } + } + + @Override + public RLESparseResourceAllocation getConsumptionForUserOverTime(String user, + long start, long end) { readLock.lock(); try { RLESparseResourceAllocation userResAlloc = userResourceAlloc.get(user); + if (userResAlloc != null) { - return userResAlloc.getCapacityAtTime(t); + return userResAlloc.getRangeOverlapping(start, end); } else { - return Resources.clone(ZERO_RESOURCE); + return new RLESparseResourceAllocation(resCalc); } } finally { readLock.unlock(); @@ -464,6 +521,43 @@ public class InMemoryPlan implements Plan { } } + @Override + public RLESparseResourceAllocation getAvailableResourceOverTime(String user, + ReservationId oldId, long start, long end) throws PlanningException { + readLock.lock(); + try { + // create RLE of totCapacity + TreeMap totAvailable = new TreeMap(); + totAvailable.put(start, Resources.clone(totalCapacity)); + RLESparseResourceAllocation totRLEAvail = + new RLESparseResourceAllocation(totAvailable, resCalc); + + // subtract used from available + RLESparseResourceAllocation netAvailable; + + netAvailable = + RLESparseResourceAllocation.merge(resCalc, + Resources.clone(totalCapacity), totRLEAvail, rleSparseVector, + RLEOperator.subtractTestNonNegative, start, end); + + // add back in old reservation used resources if any + ReservationAllocation old = reservationTable.get(oldId); + if (old != null) { + netAvailable = + RLESparseResourceAllocation.merge(resCalc, + Resources.clone(totalCapacity), netAvailable, + old.getResourcesOverTime(), RLEOperator.add, start, end); + } + // lower it if this is needed by the sharing policy + netAvailable = + getSharingPolicy().availableResources(netAvailable, this, user, + oldId, start, end); + return netAvailable; + } finally { + readLock.unlock(); + } + } + @Override public Resource getMinimumAllocation() { return Resources.clone(minAlloc); @@ -549,4 +643,21 @@ public class InMemoryPlan implements Plan { } } + @Override + public Set getReservationByUserAtTime(String user, + long t) { + readLock.lock(); + try { + Set resSet = new HashSet(); + for (ReservationAllocation ra : getReservationsAtTime(t)) { + String resUser = ra.getUser(); + if (resUser != null && resUser.equals(user)) { + resSet.add(ra); + } + } + return resSet; + } finally { + readLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java index 55ab066ac16..69fd43f0363 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java @@ -132,12 +132,17 @@ public class InMemoryReservationAllocation implements ReservationAllocation { return Resources.clone(resourcesOverTime.getCapacityAtTime(tick)); } + @Override + public RLESparseResourceAllocation getResourcesOverTime(){ + return resourcesOverTime; + } + @Override public String toString() { StringBuilder sBuf = new StringBuilder(); sBuf.append(getReservationId()).append(" user:").append(getUser()) .append(" startTime: ").append(getStartTime()).append(" endTime: ") - .append(getEndTime()).append(" alloc:[") + .append(getEndTime()).append(" alloc:\n[") .append(resourcesOverTime.toString()).append("] "); return sBuf.toString(); } @@ -151,6 +156,12 @@ public class InMemoryReservationAllocation implements ReservationAllocation { if (this.getAcceptanceTime() < other.getAcceptanceTime()) { return 1; } + if (this.getReservationId().getId() > other.getReservationId().getId()) { + return -1; + } + if (this.getReservationId().getId() < other.getReservationId().getId()) { + return 1; + } return 0; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java index f87e9dc2807..119520ba423 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; @@ -89,4 +90,11 @@ public class NoOverCommitPolicy implements SharingPolicy { // nothing to do for this policy } + @Override + public RLESparseResourceAllocation availableResources( + RLESparseResourceAllocation available, Plan plan, String user, + ReservationId oldId, long start, long end) throws PlanningException { + return available; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java index 66c66cacb4d..f57c2e093a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import java.util.Set; @@ -40,6 +41,17 @@ public interface PlanView extends PlanContext { */ public ReservationAllocation getReservationById(ReservationId reservationID); + /** + * Return a set of {@link ReservationAllocation} that belongs to a certain + * user and overlaps time t. + * + * @param user the user being considered + * @param t the instant in time being considered + * @return {@link Set} for this user at this time + */ + public Set getReservationByUserAtTime(String user, + long t); + /** * Gets all the active reservations at the specified point of time * @@ -67,18 +79,6 @@ public interface PlanView extends PlanContext { */ Resource getTotalCommittedResources(long tick); - /** - * Returns the total {@link Resource} reserved for a given user at the - * specified time - * - * @param user the user who made the reservation(s) - * @param tick the time (UTC in ms) for which the reserved resources are - * requested - * @return the total {@link Resource} reserved for a given user at the - * specified time - */ - public Resource getConsumptionForUser(String user, long tick); - /** * Returns the overall capacity in terms of {@link Resource} assigned to this * plan (typically will correspond to the absolute capacity of the @@ -98,9 +98,48 @@ public interface PlanView extends PlanContext { /** * Returns the time (UTC in ms) at which the last reservation terminates - * + * * @return the time (UTC in ms) at which the last reservation terminates */ public long getLastEndTime(); + /** + * This method returns the amount of resources available to a given user + * (optionally if removing a certain reservation) over the start-end time + * range. + * + * @param user + * @param oldId + * @param start + * @param end + * @return a view of the plan as it is available to this user + * @throws PlanningException + */ + public RLESparseResourceAllocation getAvailableResourceOverTime(String user, + ReservationId oldId, long start, long end) throws PlanningException; + + /** + * This method returns a RLE encoded view of the user reservation count + * utilization between start and end time. + * + * @param user + * @param start + * @param end + * @return RLE encoded view of reservation used over time + */ + public RLESparseResourceAllocation getReservationCountForUserOverTime( + String user, long start, long end); + + /** + * This method returns a RLE encoded view of the user reservation utilization + * between start and end time. + * + * @param user + * @param start + * @param end + * @return RLE encoded view of resources used over time + */ + public RLESparseResourceAllocation getConsumptionForUserOverTime(String user, + long start, long end); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java index 0d3c692bc28..0da95acce48 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAllocation.java @@ -50,14 +50,14 @@ public interface ReservationAllocation extends public ReservationDefinition getReservationDefinition(); /** - * Returns the time at which the reservation is activated + * Returns the time at which the reservation is activated. * * @return the time at which the reservation is activated */ public long getStartTime(); /** - * Returns the time at which the reservation terminates + * Returns the time at which the reservation terminates. * * @return the time at which the reservation terminates */ @@ -65,7 +65,7 @@ public interface ReservationAllocation extends /** * Returns the map of resources requested against the time interval for which - * they were + * they were. * * @return the allocationRequests the map of resources requested against the * time interval for which they were @@ -118,4 +118,10 @@ public interface ReservationAllocation extends */ public Resource getResourcesAtTime(long tick); + /** + * Return a RLE representation of used resources. + * @return a RLE encoding of resources allocated over time. + */ + public RLESparseResourceAllocation getResourcesOverTime(); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java index 8f8d24c1cf8..e4580553068 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; /** @@ -32,7 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan public interface SharingPolicy { /** - * Initialize this policy + * Initialize this policy. * * @param planQueuePath the name of the queue for this plan * @param conf the system configuration @@ -53,6 +54,26 @@ public interface SharingPolicy { public void validate(Plan plan, ReservationAllocation newAllocation) throws PlanningException; + /** + * This method provide a (partial) instantaneous validation by applying + * business rules (such as max number of parallel containers allowed for a + * user). To provide the agent with more feedback the returned parameter is + * expressed in number of containers that can be fit in this time according to + * the business rules. + * + * @param available the amount of resources that would be offered if not + * constrained by the policy + * @param plan reference the the current Plan + * @param user the username + * @param start the start time for the range we are querying + * @param end the end time for the range we are querying + * @param oldId (optional) the id of a reservation being updated + * @throws PlanningException throws if the request is not valid + */ + public RLESparseResourceAllocation availableResources( + RLESparseResourceAllocation available, Plan plan, String user, + ReservationId oldId, long start, long end) throws PlanningException; + /** * Returns the time range before and after the current reservation considered * by this policy. In particular, this informs the archival process for the @@ -63,4 +84,5 @@ public interface SharingPolicy { */ public long getValidWindow(); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java index d05b0ef192d..77362d58290 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResour import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -80,8 +81,8 @@ public class IterativePlanner extends PlanningAlgorithm { @Override public RLESparseResourceAllocation computeJobAllocation(Plan plan, - ReservationId reservationId, ReservationDefinition reservation) - throws ContractValidationException { + ReservationId reservationId, ReservationDefinition reservation, + String user) throws PlanningException { // Initialize initialize(plan, reservation); @@ -142,7 +143,7 @@ public class IterativePlanner extends PlanningAlgorithm { // Compute the allocation of a single stage Map curAlloc = computeStageAllocation(plan, currentReservationStage, - stageArrivalTime, stageDeadline); + stageArrivalTime, stageDeadline, user, reservationId); // If we did not find an allocation, return NULL // (unless it's an ANY job, then we simply continue). @@ -159,8 +160,8 @@ public class IterativePlanner extends PlanningAlgorithm { } // Get the start & end time of the current allocation - Long stageStartTime = findEarliestTime(curAlloc.keySet()); - Long stageEndTime = findLatestTime(curAlloc.keySet()); + Long stageStartTime = findEarliestTime(curAlloc); + Long stageEndTime = findLatestTime(curAlloc); // If we did find an allocation for the stage, add it for (Entry entry : curAlloc.entrySet()) { @@ -310,10 +311,11 @@ public class IterativePlanner extends PlanningAlgorithm { // Call algStageAllocator protected Map computeStageAllocation( Plan plan, ReservationRequest rr, long stageArrivalTime, - long stageDeadline) { + long stageDeadline, String user, ReservationId oldId) + throws PlanningException { return algStageAllocator.computeStageAllocation(plan, planLoads, - planModifications, rr, stageArrivalTime, stageDeadline); + planModifications, rr, stageArrivalTime, stageDeadline, user, oldId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java index 8b72b9f413f..e1b508d22b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java @@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; import java.util.Map; -import java.util.Set; +import java.util.Map.Entry; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; @@ -62,7 +62,7 @@ public abstract class PlanningAlgorithm implements ReservationAgent { // Compute the job allocation RLESparseResourceAllocation allocation = - computeJobAllocation(plan, reservationId, adjustedContract); + computeJobAllocation(plan, reservationId, adjustedContract, user); // If no job allocation was found, fail if (allocation == null) { @@ -84,8 +84,8 @@ public abstract class PlanningAlgorithm implements ReservationAgent { adjustedContract, // Contract user, // User name plan.getQueueName(), // Queue name - findEarliestTime(mapAllocations.keySet()), // Earliest start time - findLatestTime(mapAllocations.keySet()), // Latest end time + findEarliestTime(mapAllocations), // Earliest start time + findLatestTime(mapAllocations), // Latest end time mapAllocations, // Allocations plan.getResourceCalculator(), // Resource calculator plan.getMinimumAllocation()); // Minimum allocation @@ -111,14 +111,14 @@ public abstract class PlanningAlgorithm implements ReservationAgent { Resource zeroResource = Resource.newInstance(0, 0); // Pad at the beginning - long earliestStart = findEarliestTime(mapAllocations.keySet()); + long earliestStart = findEarliestTime(mapAllocations); if (jobArrival < earliestStart) { mapAllocations.put(new ReservationInterval(jobArrival, earliestStart), zeroResource); } // Pad at the beginning - long latestEnd = findLatestTime(mapAllocations.keySet()); + long latestEnd = findLatestTime(mapAllocations); if (latestEnd < jobDeadline) { mapAllocations.put(new ReservationInterval(latestEnd, jobDeadline), zeroResource); @@ -129,8 +129,8 @@ public abstract class PlanningAlgorithm implements ReservationAgent { } public abstract RLESparseResourceAllocation computeJobAllocation(Plan plan, - ReservationId reservationId, ReservationDefinition reservation) - throws PlanningException, ContractValidationException; + ReservationId reservationId, ReservationDefinition reservation, + String user) throws PlanningException, ContractValidationException; @Override public boolean createReservation(ReservationId reservationId, String user, @@ -162,24 +162,26 @@ public abstract class PlanningAlgorithm implements ReservationAgent { } - protected static long findEarliestTime(Set sesInt) { + protected static long findEarliestTime( + Map sesInt) { long ret = Long.MAX_VALUE; - for (ReservationInterval s : sesInt) { - if (s.getStartTime() < ret) { - ret = s.getStartTime(); + for (Entry s : sesInt.entrySet()) { + if (s.getKey().getStartTime() < ret && s.getValue() != null) { + ret = s.getKey().getStartTime(); } } return ret; } - protected static long findLatestTime(Set sesInt) { + protected static long findLatestTime(Map sesInt) { long ret = Long.MIN_VALUE; - for (ReservationInterval s : sesInt) { - if (s.getEndTime() > ret) { - ret = s.getEndTime(); + for (Entry s : sesInt.entrySet()) { + if (s.getKey().getEndTime() > ret && s.getValue() != null) { + ret = s.getKey().getEndTime(); } } return ret; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java index 9df6b749462..b95f8d4e9d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java @@ -20,11 +20,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; import java.util.Map; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; /** * Interface for allocating a single stage in IterativePlanner. @@ -46,10 +48,12 @@ public interface StageAllocator { * * @return The computed allocation (or null if the stage could not be * allocated) + * @throws PlanningException */ Map computeStageAllocation(Plan plan, Map planLoads, RLESparseResourceAllocation planModifications, ReservationRequest rr, - long stageEarliestStart, long stageDeadline); + long stageEarliestStart, long stageDeadline, String user, + ReservationId oldId) throws PlanningException; } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java index 773fbdfc5a1..c8369707c2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java @@ -21,11 +21,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -40,7 +43,8 @@ public class StageAllocatorGreedy implements StageAllocator { public Map computeStageAllocation(Plan plan, Map planLoads, RLESparseResourceAllocation planModifications, ReservationRequest rr, - long stageEarliestStart, long stageDeadline) { + long stageEarliestStart, long stageDeadline, String user, + ReservationId oldId) throws PlanningException { Resource totalCapacity = plan.getTotalCapacity(); @@ -63,6 +67,15 @@ public class StageAllocatorGreedy implements StageAllocator { int maxGang = 0; + RLESparseResourceAllocation netAvailable = + plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart, + stageDeadline); + + netAvailable = + RLESparseResourceAllocation.merge(plan.getResourceCalculator(), + plan.getTotalCapacity(), netAvailable, planModifications, + RLEOperator.subtract, stageEarliestStart, stageDeadline); + // loop trying to place until we are done, or we are considering // an invalid range of times while (gangsToPlace > 0 && stageDeadline - dur >= stageEarliestStart) { @@ -79,13 +92,7 @@ public class StageAllocatorGreedy implements StageAllocator { for (long t = stageDeadline - plan.getStep(); t >= stageDeadline - dur && maxGang > 0; t = t - plan.getStep()) { - // compute net available resources - Resource netAvailableRes = Resources.clone(totalCapacity); - // Resources.addTo(netAvailableRes, oldResCap); - Resources.subtractFrom(netAvailableRes, - plan.getTotalCommittedResources(t)); - Resources.subtractFrom(netAvailableRes, - planModifications.getCapacityAtTime(t)); + Resource netAvailableRes = netAvailable.getCapacityAtTime(t); // compute maximum number of gangs we could fit curMaxGang = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java index 04cce7ba5b6..b9fd8e15959 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java @@ -22,6 +22,7 @@ import java.util.Comparator; import java.util.Map; import java.util.TreeSet; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; @@ -60,7 +61,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator { public Map computeStageAllocation( Plan plan, Map planLoads, RLESparseResourceAllocation planModifications, ReservationRequest rr, - long stageEarliestStart, long stageDeadline) { + long stageEarliestStart, long stageDeadline, String user, + ReservationId oldId) { // Initialize ResourceCalculator resCalc = plan.getResourceCalculator(); @@ -136,7 +138,9 @@ public class StageAllocatorLowCostAligned implements StageAllocator { DurationInterval bestDurationInterval = durationIntervalsSortedByCost.first(); int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs); - + numGangsToAllocate = + Math.min(numGangsToAllocate, + bestDurationInterval.numCanFit(gang, capacity, resCalc)); // Add it remainingGangs -= numGangsToAllocate; @@ -355,5 +359,11 @@ public class StageAllocatorLowCostAligned implements StageAllocator { this.cost = value; } + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(" start: " + startTime).append(" end: " + endTime) + .append(" cost: " + cost).append(" maxLoad: " + maxLoad); + return sb.toString(); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java index 2e262a070fc..1756e869404 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java @@ -118,11 +118,18 @@ public class TestInMemoryPlan { Assert.fail(e.getMessage()); } doAssertions(plan, rAllocation); + checkAllocation(plan, alloc, start); + } + + private void checkAllocation(Plan plan, int[] alloc, int start) { + RLESparseResourceAllocation userCons = + plan.getConsumptionForUserOverTime(user, start, start + alloc.length); + for (int i = 0; i < alloc.length; i++) { Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])), plan.getTotalCommittedResources(start + i)); Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])), - plan.getConsumptionForUser(user, start + i)); + userCons.getCapacityAtTime(start + i)); } } @@ -180,12 +187,7 @@ public class TestInMemoryPlan { Assert.fail(e.getMessage()); } doAssertions(plan, rAllocation); - for (int i = 0; i < alloc.length; i++) { - Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])), - plan.getTotalCommittedResources(start + i)); - Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])), - plan.getConsumptionForUser(user, start + i)); - } + checkAllocation(plan, alloc, start); // Try to add it again try { @@ -226,11 +228,14 @@ public class TestInMemoryPlan { Assert.fail(e.getMessage()); } doAssertions(plan, rAllocation); + + RLESparseResourceAllocation userCons = + plan.getConsumptionForUserOverTime(user, start, start + alloc.length); for (int i = 0; i < alloc.length; i++) { Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])), plan.getTotalCommittedResources(start + i)); Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])), - plan.getConsumptionForUser(user, start + i)); + userCons.getCapacityAtTime(start + i)); } // Now update it @@ -252,13 +257,18 @@ public class TestInMemoryPlan { Assert.fail(e.getMessage()); } doAssertions(plan, rAllocation); + + userCons = + plan.getConsumptionForUserOverTime(user, start, start + + updatedAlloc.length); + for (int i = 0; i < updatedAlloc.length; i++) { Assert.assertEquals( - Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i] + Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i] + i), plan.getTotalCommittedResources(start + i)); Assert.assertEquals( Resource.newInstance(1024 * (updatedAlloc[i] + i), updatedAlloc[i] - + i), plan.getConsumptionForUser(user, start + i)); + + i), userCons.getCapacityAtTime(start + i)); } } @@ -321,13 +331,17 @@ public class TestInMemoryPlan { Assert.fail(e.getMessage()); } doAssertions(plan, rAllocation); + + RLESparseResourceAllocation userCons = + plan.getConsumptionForUserOverTime(user, start, start + alloc.length); + for (int i = 0; i < alloc.length; i++) { Assert.assertEquals( Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)), plan.getTotalCommittedResources(start + i)); Assert.assertEquals( Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)), - plan.getConsumptionForUser(user, start + i)); + userCons.getCapacityAtTime(start + i)); } // Now delete it @@ -337,11 +351,13 @@ public class TestInMemoryPlan { Assert.fail(e.getMessage()); } Assert.assertNull(plan.getReservationById(reservationID)); + userCons = + plan.getConsumptionForUserOverTime(user, start, start + alloc.length); for (int i = 0; i < alloc.length; i++) { Assert.assertEquals(Resource.newInstance(0, 0), plan.getTotalCommittedResources(start + i)); Assert.assertEquals(Resource.newInstance(0, 0), - plan.getConsumptionForUser(user, start + i)); + userCons.getCapacityAtTime(start + i)); } } @@ -393,14 +409,8 @@ public class TestInMemoryPlan { Assert.fail(e.getMessage()); } doAssertions(plan, rAllocation); - for (int i = 0; i < alloc1.length; i++) { - Assert.assertEquals( - Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])), - plan.getTotalCommittedResources(start + i)); - Assert.assertEquals( - Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])), - plan.getConsumptionForUser(user, start + i)); - } + checkAllocation(plan, alloc1, start); + // Now add another one ReservationId reservationID2 = @@ -424,13 +434,17 @@ public class TestInMemoryPlan { Assert.fail(e.getMessage()); } Assert.assertNotNull(plan.getReservationById(reservationID2)); + + RLESparseResourceAllocation userCons = + plan.getConsumptionForUserOverTime(user, start, start + alloc2.length); + for (int i = 0; i < alloc2.length; i++) { Assert.assertEquals( Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i] + alloc2[i] + i), plan.getTotalCommittedResources(start + i)); Assert.assertEquals( Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i), alloc1[i] - + alloc2[i] + i), plan.getConsumptionForUser(user, start + i)); + + alloc2[i] + i), userCons.getCapacityAtTime(start + i)); } // Now archive completed reservations @@ -445,14 +459,8 @@ public class TestInMemoryPlan { } Assert.assertNotNull(plan.getReservationById(reservationID1)); Assert.assertNull(plan.getReservationById(reservationID2)); - for (int i = 0; i < alloc1.length; i++) { - Assert.assertEquals( - Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])), - plan.getTotalCommittedResources(start + i)); - Assert.assertEquals( - Resource.newInstance(1024 * (alloc1[i]), (alloc1[i])), - plan.getConsumptionForUser(user, start + i)); - } + checkAllocation(plan, alloc1, start); + when(clock.getTime()).thenReturn(107L); try { // will remove 1st reservation also as it has fallen out of the archival @@ -461,12 +469,16 @@ public class TestInMemoryPlan { } catch (PlanningException e) { Assert.fail(e.getMessage()); } + + userCons = + plan.getConsumptionForUserOverTime(user, start, start + alloc1.length); + Assert.assertNull(plan.getReservationById(reservationID1)); for (int i = 0; i < alloc1.length; i++) { Assert.assertEquals(Resource.newInstance(0, 0), plan.getTotalCommittedResources(start + i)); Assert.assertEquals(Resource.newInstance(0, 0), - plan.getConsumptionForUser(user, start + i)); + userCons.getCapacityAtTime(start + i)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java index cb4eaebf94e..f81e7ec3ecb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java @@ -18,9 +18,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -86,6 +89,7 @@ public class TestGreedyReservationAgent { instConstraint, avgConstraint); CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); policy.init(reservationQ, conf); + agent = new GreedyReservationAgent(); QueueMetrics queueMetrics = mock(QueueMetrics.class); @@ -135,6 +139,94 @@ public class TestGreedyReservationAgent { } + @SuppressWarnings("javadoc") + @Test + public void testSharingPolicyFeedback() throws PlanningException { + + prepareBasicPlan(); + + // let's constraint the instantaneous allocation and see the + // policy kicking in during planning + float instConstraint = 40; + float avgConstraint = 40; + + ReservationSchedulerConfiguration conf = + ReservationSystemTestUtil.createConf(plan.getQueueName(), 100000, + instConstraint, avgConstraint); + + plan.getSharingPolicy().init(plan.getQueueName(), conf); + + // create a request with a single atomic ask + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(5 * step); + rr.setDeadline(100 * step); + ReservationRequest r = + ReservationRequest.newInstance(Resource.newInstance(2048, 2), 20, 20, + 10 * step); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setReservationResources(Collections.singletonList(r)); + rr.setReservationRequests(reqs); + + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u3", plan, rr); + + ReservationId reservationID2 = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID2, "u3", plan, rr); + + ReservationDefinition rr3 = new ReservationDefinitionPBImpl(); + rr3.setArrival(5 * step); + rr3.setDeadline(100 * step); + ReservationRequest r3 = + ReservationRequest.newInstance(Resource.newInstance(2048, 2), 45, 45, + 10 * step); + ReservationRequests reqs3 = new ReservationRequestsPBImpl(); + reqs3.setReservationResources(Collections.singletonList(r3)); + rr3.setReservationRequests(reqs3); + + ReservationId reservationID3 = + ReservationSystemTestUtil.getNewReservationId(); + try { + // RR3 is simply too big to fit + agent.createReservation(reservationID3, "u3", plan, rr3); + fail(); + } catch (PlanningException pe) { + // expected + } + + assertTrue("Agent-based allocation failed", reservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == 4); + + ReservationAllocation cs = plan.getReservationById(reservationID); + ReservationAllocation cs2 = plan.getReservationById(reservationID2); + ReservationAllocation cs3 = plan.getReservationById(reservationID3); + + assertNotNull(cs); + assertNotNull(cs2); + assertNull(cs3); + + System.out.println("--------AFTER SIMPLE ALLOCATION (queue: " + + reservationID + ")----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + for (long i = 90 * step; i < 100 * step; i++) { + assertTrue( + "Agent-based allocation unexpected", + Resources.equals(cs.getResourcesAtTime(i), + Resource.newInstance(2048 * 20, 2 * 20))); + } + // RR2 is pushed out by the presence of RR + for (long i = 80 * step; i < 90 * step; i++) { + assertTrue( + "Agent-based allocation unexpected", + Resources.equals(cs2.getResourcesAtTime(i), + Resource.newInstance(2048 * 20, 2 * 20))); + } + } + @Test public void testOrder() throws PlanningException { prepareBasicPlan(); @@ -186,7 +278,6 @@ public class TestGreedyReservationAgent { assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1)); assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1)); assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1)); - System.out.println("--------AFTER ORDER ALLOCATION (queue: " + reservationID + ")----------"); System.out.println(plan.toString()); @@ -376,7 +467,6 @@ public class TestGreedyReservationAgent { ReservationAllocation cs = plan.getReservationById(reservationID); assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1)); - System.out.println("--------AFTER ANY ALLOCATION (queue: " + reservationID + ")----------"); System.out.println(plan.toString());