From da1016365aba1cee9c06771ab142d077379f27af Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Sat, 21 Nov 2015 09:59:41 -0800 Subject: [PATCH] YARN-3454. Add efficient merge operation to RLESparseResourceAllocation (Carlo Curino via asuresh) --- hadoop-yarn-project/CHANGES.txt | 3 + .../reservation/InMemoryPlan.java | 6 +- .../InMemoryReservationAllocation.java | 2 +- .../RLESparseResourceAllocation.java | 380 +++++++++++++----- .../planning/IterativePlanner.java | 6 +- .../StageAllocatorLowCostAligned.java | 3 +- .../TestRLESparseResourceAllocation.java | 355 +++++++++++++++- 7 files changed, 627 insertions(+), 128 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 9de331c701a..29eca0c6df0 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -563,6 +563,9 @@ Release 2.8.0 - UNRELEASED YARN-4279. Mark ApplicationId and ApplicationAttemptId static methods as @Public, @Unstable. (stevel) + YARN-3454. Add efficient merge operation to RLESparseResourceAllocation + (Carlo Curino via asuresh) + OPTIMIZATIONS YARN-3339. TestDockerContainerExecutor should pull a single image and not diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java index 7e2567ba572..af42df947db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java @@ -108,7 +108,7 @@ public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, this.resCalc = resCalc; this.minAlloc = minAlloc; this.maxAlloc = maxAlloc; - this.rleSparseVector = new RLESparseResourceAllocation(resCalc, minAlloc); + this.rleSparseVector = new RLESparseResourceAllocation(resCalc); this.queueName = queueName; this.replanner = replanner; this.getMoveOnExpiry = getMoveOnExpiry; @@ -129,7 +129,7 @@ private void incrementAllocation(ReservationAllocation reservation) { String user = reservation.getUser(); RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user); if (resAlloc == null) { - resAlloc = new RLESparseResourceAllocation(resCalc, minAlloc); + resAlloc = new RLESparseResourceAllocation(resCalc); userResourceAlloc.put(user, resAlloc); } for (Map.Entry r : allocationRequests @@ -492,7 +492,7 @@ public long getEarliestStartTime() { public long getLastEndTime() { readLock.lock(); try { - return rleSparseVector.getLatestEndTime(); + return rleSparseVector.getLatestNonNullTime(); } finally { readLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java index 42a2243e557..55ab066ac16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java @@ -67,7 +67,7 @@ public InMemoryReservationAllocation(ReservationId reservationID, this.allocationRequests = allocations; this.planName = planName; this.hasGang = hasGang; - resourcesOverTime = new RLESparseResourceAllocation(calculator, minAlloc); + resourcesOverTime = new RLESparseResourceAllocation(calculator); for (Map.Entry r : allocations .entrySet()) { resourcesOverTime.addInterval(r.getKey(), r.getValue()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java index 80f2ff7b1dc..63defb5474b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java @@ -24,13 +24,12 @@ import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; -import java.util.Set; -import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -43,9 +42,9 @@ public class RLESparseResourceAllocation { private static final int THRESHOLD = 100; - private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0); + private static final Resource ZERO_RESOURCE = Resources.none(); - private TreeMap cumulativeCapacity = + private NavigableMap cumulativeCapacity = new TreeMap(); private final ReentrantReadWriteLock readWriteLock = @@ -54,26 +53,20 @@ public class RLESparseResourceAllocation { private final Lock writeLock = readWriteLock.writeLock(); private final ResourceCalculator resourceCalculator; - private final Resource minAlloc; - public RLESparseResourceAllocation(ResourceCalculator resourceCalculator, - Resource minAlloc) { + public RLESparseResourceAllocation(ResourceCalculator resourceCalculator) { this.resourceCalculator = resourceCalculator; - this.minAlloc = minAlloc; } - private boolean isSameAsPrevious(Long key, Resource capacity) { - Entry previous = cumulativeCapacity.lowerEntry(key); - return (previous != null && previous.getValue().equals(capacity)); - } - - private boolean isSameAsNext(Long key, Resource capacity) { - Entry next = cumulativeCapacity.higherEntry(key); - return (next != null && next.getValue().equals(capacity)); + public RLESparseResourceAllocation(NavigableMap out, + ResourceCalculator resourceCalculator) { + // miss check for repeated entries + this.cumulativeCapacity = out; + this.resourceCalculator = resourceCalculator; } /** - * Add a resource for the specified interval + * Add a resource for the specified interval. * * @param reservationInterval the interval for which the resource is to be * added @@ -87,48 +80,15 @@ public boolean addInterval(ReservationInterval reservationInterval, } writeLock.lock(); try { - long startKey = reservationInterval.getStartTime(); - long endKey = reservationInterval.getEndTime(); - NavigableMap ticks = - cumulativeCapacity.headMap(endKey, false); - if (ticks != null && !ticks.isEmpty()) { - Resource updatedCapacity = Resource.newInstance(0, 0); - Entry lowEntry = ticks.floorEntry(startKey); - if (lowEntry == null) { - // This is the earliest starting interval - cumulativeCapacity.put(startKey, totCap); - } else { - updatedCapacity = Resources.add(lowEntry.getValue(), totCap); - // Add a new tick only if the updated value is different - // from the previous tick - if ((startKey == lowEntry.getKey()) - && (isSameAsPrevious(lowEntry.getKey(), updatedCapacity))) { - cumulativeCapacity.remove(lowEntry.getKey()); - } else { - cumulativeCapacity.put(startKey, updatedCapacity); - } - } - // Increase all the capacities of overlapping intervals - Set> overlapSet = - ticks.tailMap(startKey, false).entrySet(); - for (Entry entry : overlapSet) { - updatedCapacity = Resources.add(entry.getValue(), totCap); - entry.setValue(updatedCapacity); - } - } else { - // This is the first interval to be added - cumulativeCapacity.put(startKey, totCap); - } - Resource nextTick = cumulativeCapacity.get(endKey); - if (nextTick != null) { - // If there is overlap, remove the duplicate entry - if (isSameAsPrevious(endKey, nextTick)) { - cumulativeCapacity.remove(endKey); - } - } else { - // Decrease capacity as this is end of the interval - cumulativeCapacity.put(endKey, Resources.subtract(cumulativeCapacity - .floorEntry(endKey).getValue(), totCap)); + NavigableMap addInt = new TreeMap(); + addInt.put(reservationInterval.getStartTime(), totCap); + addInt.put(reservationInterval.getEndTime(), ZERO_RESOURCE); + try { + cumulativeCapacity = + merge(resourceCalculator, totCap, cumulativeCapacity, addInt, + Long.MIN_VALUE, Long.MAX_VALUE, RLEOperator.add); + } catch (PlanningException e) { + // never happens for add } return true; } finally { @@ -137,7 +97,7 @@ public boolean addInterval(ReservationInterval reservationInterval, } /** - * Removes a resource for the specified interval + * Removes a resource for the specified interval. * * @param reservationInterval the interval for which the resource is to be * removed @@ -151,34 +111,16 @@ public boolean removeInterval(ReservationInterval reservationInterval, } writeLock.lock(); try { - long startKey = reservationInterval.getStartTime(); - long endKey = reservationInterval.getEndTime(); - // update the start key - NavigableMap ticks = - cumulativeCapacity.headMap(endKey, false); - // Decrease all the capacities of overlapping intervals - SortedMap overlapSet = ticks.tailMap(startKey); - if (overlapSet != null && !overlapSet.isEmpty()) { - Resource updatedCapacity = Resource.newInstance(0, 0); - long currentKey = -1; - for (Iterator> overlapEntries = - overlapSet.entrySet().iterator(); overlapEntries.hasNext();) { - Entry entry = overlapEntries.next(); - currentKey = entry.getKey(); - updatedCapacity = Resources.subtract(entry.getValue(), totCap); - // update each entry between start and end key - cumulativeCapacity.put(currentKey, updatedCapacity); - } - // Remove the first overlap entry if it is same as previous after - // updation - Long firstKey = overlapSet.firstKey(); - if (isSameAsPrevious(firstKey, overlapSet.get(firstKey))) { - cumulativeCapacity.remove(firstKey); - } - // Remove the next entry if it is same as end entry after updation - if ((currentKey != -1) && (isSameAsNext(currentKey, updatedCapacity))) { - cumulativeCapacity.remove(cumulativeCapacity.higherKey(currentKey)); - } + + NavigableMap removeInt = new TreeMap(); + removeInt.put(reservationInterval.getStartTime(), totCap); + removeInt.put(reservationInterval.getEndTime(), ZERO_RESOURCE); + try { + cumulativeCapacity = + merge(resourceCalculator, totCap, cumulativeCapacity, removeInt, + Long.MIN_VALUE, Long.MAX_VALUE, RLEOperator.subtract); + } catch (PlanningException e) { + // never happens for subtract } return true; } finally { @@ -188,9 +130,8 @@ public boolean removeInterval(ReservationInterval reservationInterval, /** * Returns the capacity, i.e. total resources allocated at the specified point - * of time + * of time. * - * @param tick the time (UTC in ms) at which the capacity is requested * @return the resources allocated at the specified time */ public Resource getCapacityAtTime(long tick) { @@ -207,7 +148,7 @@ public Resource getCapacityAtTime(long tick) { } /** - * Get the timestamp of the earliest resource allocation + * Get the timestamp of the earliest resource allocation. * * @return the timestamp of the first resource allocation */ @@ -225,17 +166,24 @@ public long getEarliestStartTime() { } /** - * Get the timestamp of the latest resource allocation + * Get the timestamp of the latest non-null resource allocation. * * @return the timestamp of the last resource allocation */ - public long getLatestEndTime() { + public long getLatestNonNullTime() { readLock.lock(); try { if (cumulativeCapacity.isEmpty()) { return -1; } else { - return cumulativeCapacity.lastKey(); + // the last entry might contain null (to terminate + // the sequence)... return previous one. + Entry last = cumulativeCapacity.lastEntry(); + if (last.getValue() == null) { + return cumulativeCapacity.floorKey(last.getKey() - 1); + } else { + return last.getKey(); + } } } finally { readLock.unlock(); @@ -243,7 +191,7 @@ public long getLatestEndTime() { } /** - * Returns true if there are no non-zero entries + * Returns true if there are no non-zero entries. * * @return true if there are no allocations or false otherwise */ @@ -253,9 +201,11 @@ public boolean isEmpty() { if (cumulativeCapacity.isEmpty()) { return true; } - // Deletion leaves a single zero entry so check for that - if (cumulativeCapacity.size() == 1) { - return cumulativeCapacity.firstEntry().getValue().equals(ZERO_RESOURCE); + // Deletion leaves a single zero entry with a null at the end so check for + // that + if (cumulativeCapacity.size() == 2) { + return cumulativeCapacity.firstEntry().getValue().equals(ZERO_RESOURCE) + && cumulativeCapacity.lastEntry().getValue() == null; } return false; } finally { @@ -286,7 +236,7 @@ public String toString() { /** * Returns the JSON string representation of the current resources allocated - * over time + * over time. * * @return the JSON string representation of the current resources allocated * over time @@ -314,7 +264,7 @@ public String toMemJSONString() { /** * Returns the representation of the current resources allocated over time as - * an interval map. + * an interval map (in the defined non-null range). * * @return the representation of the current resources allocated over time as * an interval map. @@ -334,7 +284,7 @@ public Map toIntervalMap() { Map.Entry lastEntry = null; for (Map.Entry entry : cumulativeCapacity.entrySet()) { - if (lastEntry != null) { + if (lastEntry != null && entry.getValue() != null) { ReservationInterval interval = new ReservationInterval(lastEntry.getKey(), entry.getKey()); Resource resource = lastEntry.getValue(); @@ -348,7 +298,235 @@ public Map toIntervalMap() { } finally { readLock.unlock(); } + } + + public NavigableMap getCumulative() { + readLock.lock(); + try { + return cumulativeCapacity; + } finally { + readLock.unlock(); + } + } + + /** + * Merges the range start to end of two {@code RLESparseResourceAllocation} + * using a given {@code RLEOperator}. + * + * @param resCalc the resource calculator + * @param clusterResource the total cluster resources (for DRF) + * @param a the left operand + * @param b the right operand + * @param operator the operator to be applied during merge + * @param start the start-time of the range to be considered + * @param end the end-time of the range to be considered + * @return the a merged RLESparseResourceAllocation, produced by applying + * "operator" to "a" and "b" + * @throws PlanningException in case the operator is subtractTestPositive and + * the result would contain a negative value + */ + public static RLESparseResourceAllocation merge(ResourceCalculator resCalc, + Resource clusterResource, RLESparseResourceAllocation a, + RLESparseResourceAllocation b, RLEOperator operator, long start, long end) + throws PlanningException { + NavigableMap cumA = + a.getRangeOverlapping(start, end).getCumulative(); + NavigableMap cumB = + b.getRangeOverlapping(start, end).getCumulative(); + NavigableMap out = + merge(resCalc, clusterResource, cumA, cumB, start, end, operator); + return new RLESparseResourceAllocation(out, resCalc); + } + + private static NavigableMap merge(ResourceCalculator resCalc, + Resource clusterResource, NavigableMap a, + NavigableMap b, long start, long end, + RLEOperator operator) throws PlanningException { + + // handle special cases of empty input + if (a == null || a.isEmpty()) { + if (operator == RLEOperator.subtract + || operator == RLEOperator.subtractTestNonNegative) { + return negate(operator, b); + } else { + return b; + } + } + if (b == null || b.isEmpty()) { + return a; + } + + // define iterators and support variables + Iterator> aIt = a.entrySet().iterator(); + Iterator> bIt = b.entrySet().iterator(); + Entry curA = aIt.next(); + Entry curB = bIt.next(); + Entry lastA = null; + Entry lastB = null; + boolean aIsDone = false; + boolean bIsDone = false; + + TreeMap out = new TreeMap(); + + while (!(curA.equals(lastA) && curB.equals(lastB))) { + + Resource outRes; + long time = -1; + + // curA is smaller than curB + if (bIsDone || (curA.getKey() < curB.getKey() && !aIsDone)) { + outRes = combineValue(operator, resCalc, clusterResource, curA, lastB); + time = (curA.getKey() < start) ? start : curA.getKey(); + lastA = curA; + if (aIt.hasNext()) { + curA = aIt.next(); + } else { + aIsDone = true; + } + + } else { + // curB is smaller than curA + if (aIsDone || (curA.getKey() > curB.getKey() && !bIsDone)) { + outRes = + combineValue(operator, resCalc, clusterResource, lastA, curB); + time = (curB.getKey() < start) ? start : curB.getKey(); + lastB = curB; + if (bIt.hasNext()) { + curB = bIt.next(); + } else { + bIsDone = true; + } + + } else { + // curA is equal to curB + outRes = combineValue(operator, resCalc, clusterResource, curA, curB); + time = (curA.getKey() < start) ? start : curA.getKey(); + lastA = curA; + if (aIt.hasNext()) { + curA = aIt.next(); + } else { + aIsDone = true; + } + lastB = curB; + if (bIt.hasNext()) { + curB = bIt.next(); + } else { + bIsDone = true; + } + } + } + + // add to out if not redundant + addIfNeeded(out, time, outRes); + } + addIfNeeded(out, end, null); + + return out; + } + + private static NavigableMap negate(RLEOperator operator, + NavigableMap a) throws PlanningException { + + TreeMap out = new TreeMap(); + for (Entry e : a.entrySet()) { + Resource val = Resources.negate(e.getValue()); + // test for negative value and throws + if (operator == RLEOperator.subtractTestNonNegative + && (Resources.fitsIn(val, ZERO_RESOURCE) && + !Resources.equals(val, ZERO_RESOURCE))) { + throw new PlanningException( + "RLESparseResourceAllocation: merge failed as the " + + "resulting RLESparseResourceAllocation would be negative"); + } + out.put(e.getKey(), val); + } + + return out; + } + + private static void addIfNeeded(TreeMap out, long time, + Resource outRes) { + + if (out.isEmpty() || (out.lastEntry() != null && outRes == null) + || !Resources.equals(out.lastEntry().getValue(), outRes)) { + out.put(time, outRes); + } } + private static Resource combineValue(RLEOperator op, + ResourceCalculator resCalc, Resource clusterResource, + Entry eA, Entry eB) + throws PlanningException { + + // deal with nulls + if (eA == null || eA.getValue() == null) { + if (eB == null || eB.getValue() == null) { + return null; + } + if (op == RLEOperator.subtract) { + return Resources.negate(eB.getValue()); + } else { + return eB.getValue(); + } + } + if (eB == null || eB.getValue() == null) { + return eA.getValue(); + } + + Resource a = eA.getValue(); + Resource b = eB.getValue(); + switch (op) { + case add: + return Resources.add(a, b); + case subtract: + return Resources.subtract(a, b); + case subtractTestNonNegative: + if (!Resources.fitsIn(b, a)) { + throw new PlanningException( + "RLESparseResourceAllocation: merge failed as the " + + "resulting RLESparseResourceAllocation would be negative"); + } else { + return Resources.subtract(a, b); + } + case min: + return Resources.min(resCalc, clusterResource, a, b); + case max: + return Resources.max(resCalc, clusterResource, a, b); + default: + return null; + } + + } + + public RLESparseResourceAllocation getRangeOverlapping(long start, long end) { + readLock.lock(); + try { + NavigableMap a = this.getCumulative(); + + if (a != null && !a.isEmpty()) { + // include the portion of previous entry that overlaps start + if (start > a.firstKey()) { + long previous = a.floorKey(start); + a = a.tailMap(previous, true); + } + a = a.headMap(end, true); + } + RLESparseResourceAllocation ret = + new RLESparseResourceAllocation(a, resourceCalculator); + return ret; + } finally { + readLock.unlock(); + } + + } + + /** + * The set of operators that can be applied to two + * {@code RLESparseResourceAllocation} during a merge operation. + */ + public enum RLEOperator { + add, subtract, min, max, subtractTestNonNegative + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java index 342c2e7a504..d05b0ef192d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java @@ -95,8 +95,7 @@ public RLESparseResourceAllocation computeJobAllocation(Plan plan, // Create the allocations data structure RLESparseResourceAllocation allocations = - new RLESparseResourceAllocation(plan.getResourceCalculator(), - plan.getMinimumAllocation()); + new RLESparseResourceAllocation(plan.getResourceCalculator()); // Get a reverse iterator for the set of stages ListIterator li = @@ -219,8 +218,7 @@ protected void initialize(Plan plan, ReservationDefinition reservation) { // Initialize the plan modifications planModifications = - new RLESparseResourceAllocation(plan.getResourceCalculator(), - plan.getMinimumAllocation()); + new RLESparseResourceAllocation(plan.getResourceCalculator()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java index 4b5763d9200..04cce7ba5b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java @@ -69,8 +69,7 @@ public Map computeStageAllocation( // Create allocationRequestsearlies RLESparseResourceAllocation allocationRequests = - new RLESparseResourceAllocation(plan.getResourceCalculator(), - plan.getMinimumAllocation()); + new RLESparseResourceAllocation(plan.getResourceCalculator()); // Initialize parameters long duration = stepRoundUp(rr.getDuration(), step); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java index f0cc49ca671..85fafa7ecec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java @@ -17,17 +17,25 @@ *******************************************************************************/ package org.apache.hadoop.yarn.server.resourcemanager.reservation; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Random; import java.util.Set; +import java.util.TreeMap; import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,13 +45,249 @@ public class TestRLESparseResourceAllocation { private static final Logger LOG = LoggerFactory .getLogger(TestRLESparseResourceAllocation.class); + @Test + public void testMergeAdd() throws PlanningException { + + TreeMap a = new TreeMap<>(); + TreeMap b = new TreeMap<>(); + + setupArrays(a, b); + + RLESparseResourceAllocation rleA = + new RLESparseResourceAllocation(a, new DefaultResourceCalculator()); + RLESparseResourceAllocation rleB = + new RLESparseResourceAllocation(b, new DefaultResourceCalculator()); + + RLESparseResourceAllocation out = + RLESparseResourceAllocation.merge(new DefaultResourceCalculator(), + Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB, + RLEOperator.add, 18, 45); + + System.out.println(out); + + long[] time = { 18, 20, 22, 30, 33, 40, 43, 45 }; + int[] alloc = { 10, 15, 20, 25, 30, 40, 30 }; + + validate(out, time, alloc); + } + + @Test + public void testMergeMin() throws PlanningException { + + TreeMap a = new TreeMap<>(); + TreeMap b = new TreeMap<>(); + + setupArrays(a, b); + + RLESparseResourceAllocation rleA = + new RLESparseResourceAllocation(a, new DefaultResourceCalculator()); + RLESparseResourceAllocation rleB = + new RLESparseResourceAllocation(b, new DefaultResourceCalculator()); + + RLESparseResourceAllocation out = + RLESparseResourceAllocation.merge(new DefaultResourceCalculator(), + Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB, + RLEOperator.min, 0, 60); + + System.out.println(out); + + long[] time = { 10, 22, 33, 40, 43, 50, 60 }; + int[] alloc = { 5, 10, 15, 20, 10, 0 }; + + validate(out, time, alloc); + + } + + @Test + public void testMergeMax() throws PlanningException { + + TreeMap a = new TreeMap<>(); + TreeMap b = new TreeMap<>(); + + setupArrays(a, b); + + RLESparseResourceAllocation rleA = + new RLESparseResourceAllocation(a, new DefaultResourceCalculator()); + RLESparseResourceAllocation rleB = + new RLESparseResourceAllocation(b, new DefaultResourceCalculator()); + + RLESparseResourceAllocation out = + RLESparseResourceAllocation.merge(new DefaultResourceCalculator(), + Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB, + RLEOperator.max, 0, 60); + + System.out.println(out); + + long[] time = { 10, 20, 30, 40, 50, 60 }; + int[] alloc = { 5, 10, 15, 20, 10 }; + + validate(out, time, alloc); + + } + + @Test + public void testMergeSubtract() throws PlanningException { + + TreeMap a = new TreeMap<>(); + TreeMap b = new TreeMap<>(); + + setupArrays(a, b); + + RLESparseResourceAllocation rleA = + new RLESparseResourceAllocation(a, new DefaultResourceCalculator()); + RLESparseResourceAllocation rleB = + new RLESparseResourceAllocation(b, new DefaultResourceCalculator()); + + RLESparseResourceAllocation out = + RLESparseResourceAllocation.merge(new DefaultResourceCalculator(), + Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB, + RLEOperator.subtract, 0, 60); + + System.out.println(out); + + long[] time = { 10, 11, 20, 22, 30, 33, 43, 50, 60 }; + int[] alloc = { 5, 0, 5, 0, 5, 0, 10, -10 }; + + validate(out, time, alloc); + + } + + @Test + public void testMergesubtractTestNonNegative() throws PlanningException { + + // starting with default array example + TreeMap a = new TreeMap<>(); + TreeMap b = new TreeMap<>(); + + setupArrays(a, b); + + RLESparseResourceAllocation rleA = + new RLESparseResourceAllocation(a, new DefaultResourceCalculator()); + RLESparseResourceAllocation rleB = + new RLESparseResourceAllocation(b, new DefaultResourceCalculator()); + + try { + RLESparseResourceAllocation out = + RLESparseResourceAllocation.merge(new DefaultResourceCalculator(), + Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB, + RLEOperator.subtractTestNonNegative, 0, 60); + fail(); + } catch (PlanningException pe) { + // Expected! + } + + // NOTE a is empty!! so the subtraction is implicitly considered negative + // and the test should fail + + a = new TreeMap<>(); + b = new TreeMap<>(); + b.put(11L, Resource.newInstance(5, 6)); + + rleA = new RLESparseResourceAllocation(a, new DefaultResourceCalculator()); + rleB = new RLESparseResourceAllocation(b, new DefaultResourceCalculator()); + + try { + RLESparseResourceAllocation out = + RLESparseResourceAllocation.merge(new DefaultResourceCalculator(), + Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB, + RLEOperator.subtractTestNonNegative, 0, 60); + fail(); + } catch (PlanningException pe) { + // Expected! + } + + // Testing that the subtractTestNonNegative detects problems even if only one + // of the resource dimensions is "<0" + a.put(10L, Resource.newInstance(10, 5)); + b.put(11L, Resource.newInstance(5, 6)); + + rleA = new RLESparseResourceAllocation(a, new DefaultResourceCalculator()); + rleB = new RLESparseResourceAllocation(b, new DefaultResourceCalculator()); + + try { + RLESparseResourceAllocation out = + RLESparseResourceAllocation.merge(new DefaultResourceCalculator(), + Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB, + RLEOperator.subtractTestNonNegative, 0, 60); + fail(); + } catch (PlanningException pe) { + // Expected! + } + + // try with reverse setting + a.put(10L, Resource.newInstance(5, 10)); + b.put(11L, Resource.newInstance(6, 5)); + + rleA = new RLESparseResourceAllocation(a, new DefaultResourceCalculator()); + rleB = new RLESparseResourceAllocation(b, new DefaultResourceCalculator()); + + try { + RLESparseResourceAllocation out = + RLESparseResourceAllocation.merge(new DefaultResourceCalculator(), + Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB, + RLEOperator.subtractTestNonNegative, 0, 60); + fail(); + } catch (PlanningException pe) { + // Expected! + } + + // trying a case that should work + a.put(10L, Resource.newInstance(10, 6)); + b.put(11L, Resource.newInstance(5, 6)); + + rleA = new RLESparseResourceAllocation(a, new DefaultResourceCalculator()); + rleB = new RLESparseResourceAllocation(b, new DefaultResourceCalculator()); + + RLESparseResourceAllocation out = + RLESparseResourceAllocation.merge(new DefaultResourceCalculator(), + Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB, + RLEOperator.subtractTestNonNegative, 0, 60); + + } + + @Test + @Ignore + public void testMergeSpeed() throws PlanningException { + + for (int j = 0; j < 100; j++) { + TreeMap a = new TreeMap<>(); + TreeMap b = new TreeMap<>(); + Random rand = new Random(); + long startA = 0; + long startB = 0; + + for (int i = 0; i < 1000 + rand.nextInt(9000); i++) { + startA += rand.nextInt(100); + startB += rand.nextInt(100); + a.put(startA, + Resource.newInstance(rand.nextInt(10240), rand.nextInt(10))); + b.put(startB, + Resource.newInstance(rand.nextInt(10240), rand.nextInt(10))); + } + + RLESparseResourceAllocation rleA = + new RLESparseResourceAllocation(a, new DefaultResourceCalculator()); + RLESparseResourceAllocation rleB = + new RLESparseResourceAllocation(b, new DefaultResourceCalculator()); + + long start = System.currentTimeMillis(); + RLESparseResourceAllocation out = + RLESparseResourceAllocation.merge(new DefaultResourceCalculator(), + Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB, + RLEOperator.add, Long.MIN_VALUE, Long.MAX_VALUE); + long end = System.currentTimeMillis(); + + System.out.println(" Took: " + (end - start) + "ms "); + } + + } + @Test public void testBlocks() { ResourceCalculator resCalc = new DefaultResourceCalculator(); - Resource minAlloc = Resource.newInstance(1, 1); RLESparseResourceAllocation rleSparseVector = - new RLESparseResourceAllocation(resCalc, minAlloc); + new RLESparseResourceAllocation(resCalc); int[] alloc = { 10, 10, 10, 10, 10, 10 }; int start = 100; Set> inputs = @@ -75,12 +319,63 @@ public void testBlocks() { } @Test - public void testSteps() { + public void testPartialRemoval() { ResourceCalculator resCalc = new DefaultResourceCalculator(); - Resource minAlloc = Resource.newInstance(1, 1); RLESparseResourceAllocation rleSparseVector = - new RLESparseResourceAllocation(resCalc, minAlloc); + new RLESparseResourceAllocation(resCalc); + + ReservationInterval riAdd = new ReservationInterval(10, 20); + Resource rr = Resource.newInstance(1024 * 100, 100); + + ReservationInterval riAdd2 = new ReservationInterval(20, 30); + + Resource rr2 = Resource.newInstance(1024 * 200, 200); + + ReservationInterval riRemove = new ReservationInterval(12, 25); + // same if we use this + // ReservationRequest rrRemove = + // ReservationRequest.newInstance(Resource.newInstance(1024, 1), 100, 1,6); + LOG.info(rleSparseVector.toString()); + + rleSparseVector.addInterval(riAdd, rr); + rleSparseVector.addInterval(riAdd2, rr2); + LOG.info(rleSparseVector.toString()); + + rleSparseVector.removeInterval(riRemove, rr); + LOG.info(rleSparseVector.toString()); + + // Current bug prevents this to pass. The RLESparseResourceAllocation + // does not handle removal of "partial" + // allocations correctly. + Assert.assertEquals(102400, rleSparseVector.getCapacityAtTime(10) + .getMemory()); + Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(13).getMemory()); + Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(19).getMemory()); + Assert.assertEquals(102400, rleSparseVector.getCapacityAtTime(21) + .getMemory()); + Assert.assertEquals(2 * 102400, rleSparseVector.getCapacityAtTime(26) + .getMemory()); + + ReservationInterval riRemove2 = new ReservationInterval(9, 13); + rleSparseVector.removeInterval(riRemove2, rr); + LOG.info(rleSparseVector.toString()); + + Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(11).getMemory()); + Assert.assertEquals(-102400, rleSparseVector.getCapacityAtTime(9) + .getMemory()); + Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(13).getMemory()); + Assert.assertEquals(102400, rleSparseVector.getCapacityAtTime(20) + .getMemory()); + + } + + @Test + public void testSteps() { + ResourceCalculator resCalc = new DefaultResourceCalculator(); + + RLESparseResourceAllocation rleSparseVector = + new RLESparseResourceAllocation(resCalc); int[] alloc = { 10, 10, 10, 10, 10, 10 }; int start = 100; Set> inputs = @@ -102,7 +397,7 @@ public void testSteps() { Assert.assertEquals(Resource.newInstance(0, 0), rleSparseVector.getCapacityAtTime(start + alloc.length + 2)); for (Entry ip : inputs) { - rleSparseVector.removeInterval(ip.getKey(),ip.getValue()); + rleSparseVector.removeInterval(ip.getKey(), ip.getValue()); } LOG.info(rleSparseVector.toString()); for (int i = 0; i < alloc.length; i++) { @@ -115,10 +410,9 @@ public void testSteps() { @Test public void testSkyline() { ResourceCalculator resCalc = new DefaultResourceCalculator(); - Resource minAlloc = Resource.newInstance(1, 1); RLESparseResourceAllocation rleSparseVector = - new RLESparseResourceAllocation(resCalc, minAlloc); + new RLESparseResourceAllocation(resCalc); int[] alloc = { 0, 5, 10, 10, 5, 0 }; int start = 100; Set> inputs = @@ -151,11 +445,10 @@ public void testSkyline() { } @Test - public void testZeroAlloaction() { + public void testZeroAllocation() { ResourceCalculator resCalc = new DefaultResourceCalculator(); - Resource minAlloc = Resource.newInstance(1, 1); RLESparseResourceAllocation rleSparseVector = - new RLESparseResourceAllocation(resCalc, minAlloc); + new RLESparseResourceAllocation(resCalc); rleSparseVector.addInterval(new ReservationInterval(0, Long.MAX_VALUE), Resource.newInstance(0, 0)); LOG.info(rleSparseVector.toString()); @@ -167,9 +460,8 @@ public void testZeroAlloaction() { @Test public void testToIntervalMap() { ResourceCalculator resCalc = new DefaultResourceCalculator(); - Resource minAlloc = Resource.newInstance(1, 1); RLESparseResourceAllocation rleSparseVector = - new RLESparseResourceAllocation(resCalc, minAlloc); + new RLESparseResourceAllocation(resCalc); Map mapAllocations; // Check empty @@ -186,8 +478,7 @@ public void testToIntervalMap() { } mapAllocations = rleSparseVector.toIntervalMap(); Assert.assertTrue(mapAllocations.size() == 5); - for (Entry entry : mapAllocations - .entrySet()) { + for (Entry entry : mapAllocations.entrySet()) { ReservationInterval interval = entry.getKey(); Resource resource = entry.getValue(); if (interval.getStartTime() == 101L) { @@ -211,8 +502,38 @@ public void testToIntervalMap() { } } - private Map generateAllocation( - int startTime, int[] alloc, boolean isStep) { + private void setupArrays(TreeMap a, TreeMap b) { + a.put(10L, Resource.newInstance(5, 5)); + a.put(20L, Resource.newInstance(10, 10)); + a.put(30L, Resource.newInstance(15, 15)); + a.put(40L, Resource.newInstance(20, 20)); + a.put(50L, Resource.newInstance(0, 0)); + + b.put(11L, Resource.newInstance(5, 5)); + b.put(22L, Resource.newInstance(10, 10)); + b.put(33L, Resource.newInstance(15, 15)); + b.put(40L, Resource.newInstance(20, 20)); + b.put(42L, Resource.newInstance(20, 20)); + b.put(43L, Resource.newInstance(10, 10)); + } + + private void validate(RLESparseResourceAllocation out, long[] time, + int[] alloc) { + int i = 0; + for (Entry res : out.getCumulative().entrySet()) { + assertEquals(time[i], ((long) res.getKey())); + if (i > alloc.length - 1) { + assertNull(res.getValue()); + } else { + assertEquals(alloc[i], res.getValue().getVirtualCores()); + } + i++; + } + assertEquals(time.length, i); + } + + private Map generateAllocation(int startTime, + int[] alloc, boolean isStep) { Map req = new HashMap(); int numContainers = 0;