YARN-3454. Add efficient merge operation to RLESparseResourceAllocation (Carlo Curino via asuresh)

This commit is contained in:
Arun Suresh 2015-11-21 09:59:41 -08:00
parent a30eccb38c
commit da1016365a
7 changed files with 627 additions and 128 deletions

View File

@ -563,6 +563,9 @@ Release 2.8.0 - UNRELEASED
YARN-4279. Mark ApplicationId and ApplicationAttemptId static methods as @Public, YARN-4279. Mark ApplicationId and ApplicationAttemptId static methods as @Public,
@Unstable. (stevel) @Unstable. (stevel)
YARN-3454. Add efficient merge operation to RLESparseResourceAllocation
(Carlo Curino via asuresh)
OPTIMIZATIONS OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -108,7 +108,7 @@ public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
this.resCalc = resCalc; this.resCalc = resCalc;
this.minAlloc = minAlloc; this.minAlloc = minAlloc;
this.maxAlloc = maxAlloc; this.maxAlloc = maxAlloc;
this.rleSparseVector = new RLESparseResourceAllocation(resCalc, minAlloc); this.rleSparseVector = new RLESparseResourceAllocation(resCalc);
this.queueName = queueName; this.queueName = queueName;
this.replanner = replanner; this.replanner = replanner;
this.getMoveOnExpiry = getMoveOnExpiry; this.getMoveOnExpiry = getMoveOnExpiry;
@ -129,7 +129,7 @@ private void incrementAllocation(ReservationAllocation reservation) {
String user = reservation.getUser(); String user = reservation.getUser();
RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user); RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
if (resAlloc == null) { if (resAlloc == null) {
resAlloc = new RLESparseResourceAllocation(resCalc, minAlloc); resAlloc = new RLESparseResourceAllocation(resCalc);
userResourceAlloc.put(user, resAlloc); userResourceAlloc.put(user, resAlloc);
} }
for (Map.Entry<ReservationInterval, Resource> r : allocationRequests for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
@ -492,7 +492,7 @@ public long getEarliestStartTime() {
public long getLastEndTime() { public long getLastEndTime() {
readLock.lock(); readLock.lock();
try { try {
return rleSparseVector.getLatestEndTime(); return rleSparseVector.getLatestNonNullTime();
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }

View File

@ -67,7 +67,7 @@ public InMemoryReservationAllocation(ReservationId reservationID,
this.allocationRequests = allocations; this.allocationRequests = allocations;
this.planName = planName; this.planName = planName;
this.hasGang = hasGang; this.hasGang = hasGang;
resourcesOverTime = new RLESparseResourceAllocation(calculator, minAlloc); resourcesOverTime = new RLESparseResourceAllocation(calculator);
for (Map.Entry<ReservationInterval, Resource> r : allocations for (Map.Entry<ReservationInterval, Resource> r : allocations
.entrySet()) { .entrySet()) {
resourcesOverTime.addInterval(r.getKey(), r.getValue()); resourcesOverTime.addInterval(r.getKey(), r.getValue());

View File

@ -24,13 +24,12 @@
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -43,9 +42,9 @@
public class RLESparseResourceAllocation { public class RLESparseResourceAllocation {
private static final int THRESHOLD = 100; private static final int THRESHOLD = 100;
private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0); private static final Resource ZERO_RESOURCE = Resources.none();
private TreeMap<Long, Resource> cumulativeCapacity = private NavigableMap<Long, Resource> cumulativeCapacity =
new TreeMap<Long, Resource>(); new TreeMap<Long, Resource>();
private final ReentrantReadWriteLock readWriteLock = private final ReentrantReadWriteLock readWriteLock =
@ -54,26 +53,20 @@ public class RLESparseResourceAllocation {
private final Lock writeLock = readWriteLock.writeLock(); private final Lock writeLock = readWriteLock.writeLock();
private final ResourceCalculator resourceCalculator; private final ResourceCalculator resourceCalculator;
private final Resource minAlloc;
public RLESparseResourceAllocation(ResourceCalculator resourceCalculator, public RLESparseResourceAllocation(ResourceCalculator resourceCalculator) {
Resource minAlloc) {
this.resourceCalculator = resourceCalculator; this.resourceCalculator = resourceCalculator;
this.minAlloc = minAlloc;
} }
private boolean isSameAsPrevious(Long key, Resource capacity) { public RLESparseResourceAllocation(NavigableMap<Long, Resource> out,
Entry<Long, Resource> previous = cumulativeCapacity.lowerEntry(key); ResourceCalculator resourceCalculator) {
return (previous != null && previous.getValue().equals(capacity)); // miss check for repeated entries
} this.cumulativeCapacity = out;
this.resourceCalculator = resourceCalculator;
private boolean isSameAsNext(Long key, Resource capacity) {
Entry<Long, Resource> next = cumulativeCapacity.higherEntry(key);
return (next != null && next.getValue().equals(capacity));
} }
/** /**
* Add a resource for the specified interval * Add a resource for the specified interval.
* *
* @param reservationInterval the interval for which the resource is to be * @param reservationInterval the interval for which the resource is to be
* added * added
@ -87,48 +80,15 @@ public boolean addInterval(ReservationInterval reservationInterval,
} }
writeLock.lock(); writeLock.lock();
try { try {
long startKey = reservationInterval.getStartTime(); NavigableMap<Long, Resource> addInt = new TreeMap<Long, Resource>();
long endKey = reservationInterval.getEndTime(); addInt.put(reservationInterval.getStartTime(), totCap);
NavigableMap<Long, Resource> ticks = addInt.put(reservationInterval.getEndTime(), ZERO_RESOURCE);
cumulativeCapacity.headMap(endKey, false); try {
if (ticks != null && !ticks.isEmpty()) { cumulativeCapacity =
Resource updatedCapacity = Resource.newInstance(0, 0); merge(resourceCalculator, totCap, cumulativeCapacity, addInt,
Entry<Long, Resource> lowEntry = ticks.floorEntry(startKey); Long.MIN_VALUE, Long.MAX_VALUE, RLEOperator.add);
if (lowEntry == null) { } catch (PlanningException e) {
// This is the earliest starting interval // never happens for add
cumulativeCapacity.put(startKey, totCap);
} else {
updatedCapacity = Resources.add(lowEntry.getValue(), totCap);
// Add a new tick only if the updated value is different
// from the previous tick
if ((startKey == lowEntry.getKey())
&& (isSameAsPrevious(lowEntry.getKey(), updatedCapacity))) {
cumulativeCapacity.remove(lowEntry.getKey());
} else {
cumulativeCapacity.put(startKey, updatedCapacity);
}
}
// Increase all the capacities of overlapping intervals
Set<Entry<Long, Resource>> overlapSet =
ticks.tailMap(startKey, false).entrySet();
for (Entry<Long, Resource> entry : overlapSet) {
updatedCapacity = Resources.add(entry.getValue(), totCap);
entry.setValue(updatedCapacity);
}
} else {
// This is the first interval to be added
cumulativeCapacity.put(startKey, totCap);
}
Resource nextTick = cumulativeCapacity.get(endKey);
if (nextTick != null) {
// If there is overlap, remove the duplicate entry
if (isSameAsPrevious(endKey, nextTick)) {
cumulativeCapacity.remove(endKey);
}
} else {
// Decrease capacity as this is end of the interval
cumulativeCapacity.put(endKey, Resources.subtract(cumulativeCapacity
.floorEntry(endKey).getValue(), totCap));
} }
return true; return true;
} finally { } finally {
@ -137,7 +97,7 @@ public boolean addInterval(ReservationInterval reservationInterval,
} }
/** /**
* Removes a resource for the specified interval * Removes a resource for the specified interval.
* *
* @param reservationInterval the interval for which the resource is to be * @param reservationInterval the interval for which the resource is to be
* removed * removed
@ -151,34 +111,16 @@ public boolean removeInterval(ReservationInterval reservationInterval,
} }
writeLock.lock(); writeLock.lock();
try { try {
long startKey = reservationInterval.getStartTime();
long endKey = reservationInterval.getEndTime(); NavigableMap<Long, Resource> removeInt = new TreeMap<Long, Resource>();
// update the start key removeInt.put(reservationInterval.getStartTime(), totCap);
NavigableMap<Long, Resource> ticks = removeInt.put(reservationInterval.getEndTime(), ZERO_RESOURCE);
cumulativeCapacity.headMap(endKey, false); try {
// Decrease all the capacities of overlapping intervals cumulativeCapacity =
SortedMap<Long, Resource> overlapSet = ticks.tailMap(startKey); merge(resourceCalculator, totCap, cumulativeCapacity, removeInt,
if (overlapSet != null && !overlapSet.isEmpty()) { Long.MIN_VALUE, Long.MAX_VALUE, RLEOperator.subtract);
Resource updatedCapacity = Resource.newInstance(0, 0); } catch (PlanningException e) {
long currentKey = -1; // never happens for subtract
for (Iterator<Entry<Long, Resource>> overlapEntries =
overlapSet.entrySet().iterator(); overlapEntries.hasNext();) {
Entry<Long, Resource> entry = overlapEntries.next();
currentKey = entry.getKey();
updatedCapacity = Resources.subtract(entry.getValue(), totCap);
// update each entry between start and end key
cumulativeCapacity.put(currentKey, updatedCapacity);
}
// Remove the first overlap entry if it is same as previous after
// updation
Long firstKey = overlapSet.firstKey();
if (isSameAsPrevious(firstKey, overlapSet.get(firstKey))) {
cumulativeCapacity.remove(firstKey);
}
// Remove the next entry if it is same as end entry after updation
if ((currentKey != -1) && (isSameAsNext(currentKey, updatedCapacity))) {
cumulativeCapacity.remove(cumulativeCapacity.higherKey(currentKey));
}
} }
return true; return true;
} finally { } finally {
@ -188,9 +130,8 @@ public boolean removeInterval(ReservationInterval reservationInterval,
/** /**
* Returns the capacity, i.e. total resources allocated at the specified point * Returns the capacity, i.e. total resources allocated at the specified point
* of time * of time.
* *
* @param tick the time (UTC in ms) at which the capacity is requested
* @return the resources allocated at the specified time * @return the resources allocated at the specified time
*/ */
public Resource getCapacityAtTime(long tick) { public Resource getCapacityAtTime(long tick) {
@ -207,7 +148,7 @@ public Resource getCapacityAtTime(long tick) {
} }
/** /**
* Get the timestamp of the earliest resource allocation * Get the timestamp of the earliest resource allocation.
* *
* @return the timestamp of the first resource allocation * @return the timestamp of the first resource allocation
*/ */
@ -225,17 +166,24 @@ public long getEarliestStartTime() {
} }
/** /**
* Get the timestamp of the latest resource allocation * Get the timestamp of the latest non-null resource allocation.
* *
* @return the timestamp of the last resource allocation * @return the timestamp of the last resource allocation
*/ */
public long getLatestEndTime() { public long getLatestNonNullTime() {
readLock.lock(); readLock.lock();
try { try {
if (cumulativeCapacity.isEmpty()) { if (cumulativeCapacity.isEmpty()) {
return -1; return -1;
} else { } else {
return cumulativeCapacity.lastKey(); // the last entry might contain null (to terminate
// the sequence)... return previous one.
Entry<Long, Resource> last = cumulativeCapacity.lastEntry();
if (last.getValue() == null) {
return cumulativeCapacity.floorKey(last.getKey() - 1);
} else {
return last.getKey();
}
} }
} finally { } finally {
readLock.unlock(); readLock.unlock();
@ -243,7 +191,7 @@ public long getLatestEndTime() {
} }
/** /**
* Returns true if there are no non-zero entries * Returns true if there are no non-zero entries.
* *
* @return true if there are no allocations or false otherwise * @return true if there are no allocations or false otherwise
*/ */
@ -253,9 +201,11 @@ public boolean isEmpty() {
if (cumulativeCapacity.isEmpty()) { if (cumulativeCapacity.isEmpty()) {
return true; return true;
} }
// Deletion leaves a single zero entry so check for that // Deletion leaves a single zero entry with a null at the end so check for
if (cumulativeCapacity.size() == 1) { // that
return cumulativeCapacity.firstEntry().getValue().equals(ZERO_RESOURCE); if (cumulativeCapacity.size() == 2) {
return cumulativeCapacity.firstEntry().getValue().equals(ZERO_RESOURCE)
&& cumulativeCapacity.lastEntry().getValue() == null;
} }
return false; return false;
} finally { } finally {
@ -286,7 +236,7 @@ public String toString() {
/** /**
* Returns the JSON string representation of the current resources allocated * Returns the JSON string representation of the current resources allocated
* over time * over time.
* *
* @return the JSON string representation of the current resources allocated * @return the JSON string representation of the current resources allocated
* over time * over time
@ -314,7 +264,7 @@ public String toMemJSONString() {
/** /**
* Returns the representation of the current resources allocated over time as * Returns the representation of the current resources allocated over time as
* an interval map. * an interval map (in the defined non-null range).
* *
* @return the representation of the current resources allocated over time as * @return the representation of the current resources allocated over time as
* an interval map. * an interval map.
@ -334,7 +284,7 @@ public Map<ReservationInterval, Resource> toIntervalMap() {
Map.Entry<Long, Resource> lastEntry = null; Map.Entry<Long, Resource> lastEntry = null;
for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) { for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) {
if (lastEntry != null) { if (lastEntry != null && entry.getValue() != null) {
ReservationInterval interval = ReservationInterval interval =
new ReservationInterval(lastEntry.getKey(), entry.getKey()); new ReservationInterval(lastEntry.getKey(), entry.getKey());
Resource resource = lastEntry.getValue(); Resource resource = lastEntry.getValue();
@ -348,7 +298,235 @@ public Map<ReservationInterval, Resource> toIntervalMap() {
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
}
public NavigableMap<Long, Resource> getCumulative() {
readLock.lock();
try {
return cumulativeCapacity;
} finally {
readLock.unlock();
}
}
/**
* Merges the range start to end of two {@code RLESparseResourceAllocation}
* using a given {@code RLEOperator}.
*
* @param resCalc the resource calculator
* @param clusterResource the total cluster resources (for DRF)
* @param a the left operand
* @param b the right operand
* @param operator the operator to be applied during merge
* @param start the start-time of the range to be considered
* @param end the end-time of the range to be considered
* @return the a merged RLESparseResourceAllocation, produced by applying
* "operator" to "a" and "b"
* @throws PlanningException in case the operator is subtractTestPositive and
* the result would contain a negative value
*/
public static RLESparseResourceAllocation merge(ResourceCalculator resCalc,
Resource clusterResource, RLESparseResourceAllocation a,
RLESparseResourceAllocation b, RLEOperator operator, long start, long end)
throws PlanningException {
NavigableMap<Long, Resource> cumA =
a.getRangeOverlapping(start, end).getCumulative();
NavigableMap<Long, Resource> cumB =
b.getRangeOverlapping(start, end).getCumulative();
NavigableMap<Long, Resource> out =
merge(resCalc, clusterResource, cumA, cumB, start, end, operator);
return new RLESparseResourceAllocation(out, resCalc);
}
private static NavigableMap<Long, Resource> merge(ResourceCalculator resCalc,
Resource clusterResource, NavigableMap<Long, Resource> a,
NavigableMap<Long, Resource> b, long start, long end,
RLEOperator operator) throws PlanningException {
// handle special cases of empty input
if (a == null || a.isEmpty()) {
if (operator == RLEOperator.subtract
|| operator == RLEOperator.subtractTestNonNegative) {
return negate(operator, b);
} else {
return b;
}
}
if (b == null || b.isEmpty()) {
return a;
}
// define iterators and support variables
Iterator<Entry<Long, Resource>> aIt = a.entrySet().iterator();
Iterator<Entry<Long, Resource>> bIt = b.entrySet().iterator();
Entry<Long, Resource> curA = aIt.next();
Entry<Long, Resource> curB = bIt.next();
Entry<Long, Resource> lastA = null;
Entry<Long, Resource> lastB = null;
boolean aIsDone = false;
boolean bIsDone = false;
TreeMap<Long, Resource> out = new TreeMap<Long, Resource>();
while (!(curA.equals(lastA) && curB.equals(lastB))) {
Resource outRes;
long time = -1;
// curA is smaller than curB
if (bIsDone || (curA.getKey() < curB.getKey() && !aIsDone)) {
outRes = combineValue(operator, resCalc, clusterResource, curA, lastB);
time = (curA.getKey() < start) ? start : curA.getKey();
lastA = curA;
if (aIt.hasNext()) {
curA = aIt.next();
} else {
aIsDone = true;
}
} else {
// curB is smaller than curA
if (aIsDone || (curA.getKey() > curB.getKey() && !bIsDone)) {
outRes =
combineValue(operator, resCalc, clusterResource, lastA, curB);
time = (curB.getKey() < start) ? start : curB.getKey();
lastB = curB;
if (bIt.hasNext()) {
curB = bIt.next();
} else {
bIsDone = true;
}
} else {
// curA is equal to curB
outRes = combineValue(operator, resCalc, clusterResource, curA, curB);
time = (curA.getKey() < start) ? start : curA.getKey();
lastA = curA;
if (aIt.hasNext()) {
curA = aIt.next();
} else {
aIsDone = true;
}
lastB = curB;
if (bIt.hasNext()) {
curB = bIt.next();
} else {
bIsDone = true;
}
}
}
// add to out if not redundant
addIfNeeded(out, time, outRes);
}
addIfNeeded(out, end, null);
return out;
}
private static NavigableMap<Long, Resource> negate(RLEOperator operator,
NavigableMap<Long, Resource> a) throws PlanningException {
TreeMap<Long, Resource> out = new TreeMap<Long, Resource>();
for (Entry<Long, Resource> e : a.entrySet()) {
Resource val = Resources.negate(e.getValue());
// test for negative value and throws
if (operator == RLEOperator.subtractTestNonNegative
&& (Resources.fitsIn(val, ZERO_RESOURCE) &&
!Resources.equals(val, ZERO_RESOURCE))) {
throw new PlanningException(
"RLESparseResourceAllocation: merge failed as the "
+ "resulting RLESparseResourceAllocation would be negative");
}
out.put(e.getKey(), val);
}
return out;
}
private static void addIfNeeded(TreeMap<Long, Resource> out, long time,
Resource outRes) {
if (out.isEmpty() || (out.lastEntry() != null && outRes == null)
|| !Resources.equals(out.lastEntry().getValue(), outRes)) {
out.put(time, outRes);
}
}
private static Resource combineValue(RLEOperator op,
ResourceCalculator resCalc, Resource clusterResource,
Entry<Long, Resource> eA, Entry<Long, Resource> eB)
throws PlanningException {
// deal with nulls
if (eA == null || eA.getValue() == null) {
if (eB == null || eB.getValue() == null) {
return null;
}
if (op == RLEOperator.subtract) {
return Resources.negate(eB.getValue());
} else {
return eB.getValue();
}
}
if (eB == null || eB.getValue() == null) {
return eA.getValue();
}
Resource a = eA.getValue();
Resource b = eB.getValue();
switch (op) {
case add:
return Resources.add(a, b);
case subtract:
return Resources.subtract(a, b);
case subtractTestNonNegative:
if (!Resources.fitsIn(b, a)) {
throw new PlanningException(
"RLESparseResourceAllocation: merge failed as the "
+ "resulting RLESparseResourceAllocation would be negative");
} else {
return Resources.subtract(a, b);
}
case min:
return Resources.min(resCalc, clusterResource, a, b);
case max:
return Resources.max(resCalc, clusterResource, a, b);
default:
return null;
}
}
public RLESparseResourceAllocation getRangeOverlapping(long start, long end) {
readLock.lock();
try {
NavigableMap<Long, Resource> a = this.getCumulative();
if (a != null && !a.isEmpty()) {
// include the portion of previous entry that overlaps start
if (start > a.firstKey()) {
long previous = a.floorKey(start);
a = a.tailMap(previous, true);
}
a = a.headMap(end, true);
}
RLESparseResourceAllocation ret =
new RLESparseResourceAllocation(a, resourceCalculator);
return ret;
} finally {
readLock.unlock();
}
}
/**
* The set of operators that can be applied to two
* {@code RLESparseResourceAllocation} during a merge operation.
*/
public enum RLEOperator {
add, subtract, min, max, subtractTestNonNegative
} }
} }

View File

@ -95,8 +95,7 @@ public RLESparseResourceAllocation computeJobAllocation(Plan plan,
// Create the allocations data structure // Create the allocations data structure
RLESparseResourceAllocation allocations = RLESparseResourceAllocation allocations =
new RLESparseResourceAllocation(plan.getResourceCalculator(), new RLESparseResourceAllocation(plan.getResourceCalculator());
plan.getMinimumAllocation());
// Get a reverse iterator for the set of stages // Get a reverse iterator for the set of stages
ListIterator<ReservationRequest> li = ListIterator<ReservationRequest> li =
@ -219,8 +218,7 @@ protected void initialize(Plan plan, ReservationDefinition reservation) {
// Initialize the plan modifications // Initialize the plan modifications
planModifications = planModifications =
new RLESparseResourceAllocation(plan.getResourceCalculator(), new RLESparseResourceAllocation(plan.getResourceCalculator());
plan.getMinimumAllocation());
} }

View File

@ -69,8 +69,7 @@ public Map<ReservationInterval, Resource> computeStageAllocation(
// Create allocationRequestsearlies // Create allocationRequestsearlies
RLESparseResourceAllocation allocationRequests = RLESparseResourceAllocation allocationRequests =
new RLESparseResourceAllocation(plan.getResourceCalculator(), new RLESparseResourceAllocation(plan.getResourceCalculator());
plan.getMinimumAllocation());
// Initialize parameters // Initialize parameters
long duration = stepRoundUp(rr.getDuration(), step); long duration = stepRoundUp(rr.getDuration(), step);

View File

@ -17,17 +17,25 @@
*******************************************************************************/ *******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation; package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -37,13 +45,249 @@ public class TestRLESparseResourceAllocation {
private static final Logger LOG = LoggerFactory private static final Logger LOG = LoggerFactory
.getLogger(TestRLESparseResourceAllocation.class); .getLogger(TestRLESparseResourceAllocation.class);
@Test
public void testMergeAdd() throws PlanningException {
TreeMap<Long, Resource> a = new TreeMap<>();
TreeMap<Long, Resource> b = new TreeMap<>();
setupArrays(a, b);
RLESparseResourceAllocation rleA =
new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
RLESparseResourceAllocation rleB =
new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
RLESparseResourceAllocation out =
RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
RLEOperator.add, 18, 45);
System.out.println(out);
long[] time = { 18, 20, 22, 30, 33, 40, 43, 45 };
int[] alloc = { 10, 15, 20, 25, 30, 40, 30 };
validate(out, time, alloc);
}
@Test
public void testMergeMin() throws PlanningException {
TreeMap<Long, Resource> a = new TreeMap<>();
TreeMap<Long, Resource> b = new TreeMap<>();
setupArrays(a, b);
RLESparseResourceAllocation rleA =
new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
RLESparseResourceAllocation rleB =
new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
RLESparseResourceAllocation out =
RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
RLEOperator.min, 0, 60);
System.out.println(out);
long[] time = { 10, 22, 33, 40, 43, 50, 60 };
int[] alloc = { 5, 10, 15, 20, 10, 0 };
validate(out, time, alloc);
}
@Test
public void testMergeMax() throws PlanningException {
TreeMap<Long, Resource> a = new TreeMap<>();
TreeMap<Long, Resource> b = new TreeMap<>();
setupArrays(a, b);
RLESparseResourceAllocation rleA =
new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
RLESparseResourceAllocation rleB =
new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
RLESparseResourceAllocation out =
RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
RLEOperator.max, 0, 60);
System.out.println(out);
long[] time = { 10, 20, 30, 40, 50, 60 };
int[] alloc = { 5, 10, 15, 20, 10 };
validate(out, time, alloc);
}
@Test
public void testMergeSubtract() throws PlanningException {
TreeMap<Long, Resource> a = new TreeMap<>();
TreeMap<Long, Resource> b = new TreeMap<>();
setupArrays(a, b);
RLESparseResourceAllocation rleA =
new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
RLESparseResourceAllocation rleB =
new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
RLESparseResourceAllocation out =
RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
RLEOperator.subtract, 0, 60);
System.out.println(out);
long[] time = { 10, 11, 20, 22, 30, 33, 43, 50, 60 };
int[] alloc = { 5, 0, 5, 0, 5, 0, 10, -10 };
validate(out, time, alloc);
}
@Test
public void testMergesubtractTestNonNegative() throws PlanningException {
// starting with default array example
TreeMap<Long, Resource> a = new TreeMap<>();
TreeMap<Long, Resource> b = new TreeMap<>();
setupArrays(a, b);
RLESparseResourceAllocation rleA =
new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
RLESparseResourceAllocation rleB =
new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
try {
RLESparseResourceAllocation out =
RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
RLEOperator.subtractTestNonNegative, 0, 60);
fail();
} catch (PlanningException pe) {
// Expected!
}
// NOTE a is empty!! so the subtraction is implicitly considered negative
// and the test should fail
a = new TreeMap<>();
b = new TreeMap<>();
b.put(11L, Resource.newInstance(5, 6));
rleA = new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
rleB = new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
try {
RLESparseResourceAllocation out =
RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
RLEOperator.subtractTestNonNegative, 0, 60);
fail();
} catch (PlanningException pe) {
// Expected!
}
// Testing that the subtractTestNonNegative detects problems even if only one
// of the resource dimensions is "<0"
a.put(10L, Resource.newInstance(10, 5));
b.put(11L, Resource.newInstance(5, 6));
rleA = new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
rleB = new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
try {
RLESparseResourceAllocation out =
RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
RLEOperator.subtractTestNonNegative, 0, 60);
fail();
} catch (PlanningException pe) {
// Expected!
}
// try with reverse setting
a.put(10L, Resource.newInstance(5, 10));
b.put(11L, Resource.newInstance(6, 5));
rleA = new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
rleB = new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
try {
RLESparseResourceAllocation out =
RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
RLEOperator.subtractTestNonNegative, 0, 60);
fail();
} catch (PlanningException pe) {
// Expected!
}
// trying a case that should work
a.put(10L, Resource.newInstance(10, 6));
b.put(11L, Resource.newInstance(5, 6));
rleA = new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
rleB = new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
RLESparseResourceAllocation out =
RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
RLEOperator.subtractTestNonNegative, 0, 60);
}
@Test
@Ignore
public void testMergeSpeed() throws PlanningException {
for (int j = 0; j < 100; j++) {
TreeMap<Long, Resource> a = new TreeMap<>();
TreeMap<Long, Resource> b = new TreeMap<>();
Random rand = new Random();
long startA = 0;
long startB = 0;
for (int i = 0; i < 1000 + rand.nextInt(9000); i++) {
startA += rand.nextInt(100);
startB += rand.nextInt(100);
a.put(startA,
Resource.newInstance(rand.nextInt(10240), rand.nextInt(10)));
b.put(startB,
Resource.newInstance(rand.nextInt(10240), rand.nextInt(10)));
}
RLESparseResourceAllocation rleA =
new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
RLESparseResourceAllocation rleB =
new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
long start = System.currentTimeMillis();
RLESparseResourceAllocation out =
RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
RLEOperator.add, Long.MIN_VALUE, Long.MAX_VALUE);
long end = System.currentTimeMillis();
System.out.println(" Took: " + (end - start) + "ms ");
}
}
@Test @Test
public void testBlocks() { public void testBlocks() {
ResourceCalculator resCalc = new DefaultResourceCalculator(); ResourceCalculator resCalc = new DefaultResourceCalculator();
Resource minAlloc = Resource.newInstance(1, 1);
RLESparseResourceAllocation rleSparseVector = RLESparseResourceAllocation rleSparseVector =
new RLESparseResourceAllocation(resCalc, minAlloc); new RLESparseResourceAllocation(resCalc);
int[] alloc = { 10, 10, 10, 10, 10, 10 }; int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100; int start = 100;
Set<Entry<ReservationInterval, Resource>> inputs = Set<Entry<ReservationInterval, Resource>> inputs =
@ -75,12 +319,63 @@ public void testBlocks() {
} }
@Test @Test
public void testSteps() { public void testPartialRemoval() {
ResourceCalculator resCalc = new DefaultResourceCalculator(); ResourceCalculator resCalc = new DefaultResourceCalculator();
Resource minAlloc = Resource.newInstance(1, 1);
RLESparseResourceAllocation rleSparseVector = RLESparseResourceAllocation rleSparseVector =
new RLESparseResourceAllocation(resCalc, minAlloc); new RLESparseResourceAllocation(resCalc);
ReservationInterval riAdd = new ReservationInterval(10, 20);
Resource rr = Resource.newInstance(1024 * 100, 100);
ReservationInterval riAdd2 = new ReservationInterval(20, 30);
Resource rr2 = Resource.newInstance(1024 * 200, 200);
ReservationInterval riRemove = new ReservationInterval(12, 25);
// same if we use this
// ReservationRequest rrRemove =
// ReservationRequest.newInstance(Resource.newInstance(1024, 1), 100, 1,6);
LOG.info(rleSparseVector.toString());
rleSparseVector.addInterval(riAdd, rr);
rleSparseVector.addInterval(riAdd2, rr2);
LOG.info(rleSparseVector.toString());
rleSparseVector.removeInterval(riRemove, rr);
LOG.info(rleSparseVector.toString());
// Current bug prevents this to pass. The RLESparseResourceAllocation
// does not handle removal of "partial"
// allocations correctly.
Assert.assertEquals(102400, rleSparseVector.getCapacityAtTime(10)
.getMemory());
Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(13).getMemory());
Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(19).getMemory());
Assert.assertEquals(102400, rleSparseVector.getCapacityAtTime(21)
.getMemory());
Assert.assertEquals(2 * 102400, rleSparseVector.getCapacityAtTime(26)
.getMemory());
ReservationInterval riRemove2 = new ReservationInterval(9, 13);
rleSparseVector.removeInterval(riRemove2, rr);
LOG.info(rleSparseVector.toString());
Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(11).getMemory());
Assert.assertEquals(-102400, rleSparseVector.getCapacityAtTime(9)
.getMemory());
Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(13).getMemory());
Assert.assertEquals(102400, rleSparseVector.getCapacityAtTime(20)
.getMemory());
}
@Test
public void testSteps() {
ResourceCalculator resCalc = new DefaultResourceCalculator();
RLESparseResourceAllocation rleSparseVector =
new RLESparseResourceAllocation(resCalc);
int[] alloc = { 10, 10, 10, 10, 10, 10 }; int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100; int start = 100;
Set<Entry<ReservationInterval, Resource>> inputs = Set<Entry<ReservationInterval, Resource>> inputs =
@ -115,10 +410,9 @@ public void testSteps() {
@Test @Test
public void testSkyline() { public void testSkyline() {
ResourceCalculator resCalc = new DefaultResourceCalculator(); ResourceCalculator resCalc = new DefaultResourceCalculator();
Resource minAlloc = Resource.newInstance(1, 1);
RLESparseResourceAllocation rleSparseVector = RLESparseResourceAllocation rleSparseVector =
new RLESparseResourceAllocation(resCalc, minAlloc); new RLESparseResourceAllocation(resCalc);
int[] alloc = { 0, 5, 10, 10, 5, 0 }; int[] alloc = { 0, 5, 10, 10, 5, 0 };
int start = 100; int start = 100;
Set<Entry<ReservationInterval, Resource>> inputs = Set<Entry<ReservationInterval, Resource>> inputs =
@ -151,11 +445,10 @@ public void testSkyline() {
} }
@Test @Test
public void testZeroAlloaction() { public void testZeroAllocation() {
ResourceCalculator resCalc = new DefaultResourceCalculator(); ResourceCalculator resCalc = new DefaultResourceCalculator();
Resource minAlloc = Resource.newInstance(1, 1);
RLESparseResourceAllocation rleSparseVector = RLESparseResourceAllocation rleSparseVector =
new RLESparseResourceAllocation(resCalc, minAlloc); new RLESparseResourceAllocation(resCalc);
rleSparseVector.addInterval(new ReservationInterval(0, Long.MAX_VALUE), rleSparseVector.addInterval(new ReservationInterval(0, Long.MAX_VALUE),
Resource.newInstance(0, 0)); Resource.newInstance(0, 0));
LOG.info(rleSparseVector.toString()); LOG.info(rleSparseVector.toString());
@ -167,9 +460,8 @@ public void testZeroAlloaction() {
@Test @Test
public void testToIntervalMap() { public void testToIntervalMap() {
ResourceCalculator resCalc = new DefaultResourceCalculator(); ResourceCalculator resCalc = new DefaultResourceCalculator();
Resource minAlloc = Resource.newInstance(1, 1);
RLESparseResourceAllocation rleSparseVector = RLESparseResourceAllocation rleSparseVector =
new RLESparseResourceAllocation(resCalc, minAlloc); new RLESparseResourceAllocation(resCalc);
Map<ReservationInterval, Resource> mapAllocations; Map<ReservationInterval, Resource> mapAllocations;
// Check empty // Check empty
@ -186,8 +478,7 @@ public void testToIntervalMap() {
} }
mapAllocations = rleSparseVector.toIntervalMap(); mapAllocations = rleSparseVector.toIntervalMap();
Assert.assertTrue(mapAllocations.size() == 5); Assert.assertTrue(mapAllocations.size() == 5);
for (Entry<ReservationInterval, Resource> entry : mapAllocations for (Entry<ReservationInterval, Resource> entry : mapAllocations.entrySet()) {
.entrySet()) {
ReservationInterval interval = entry.getKey(); ReservationInterval interval = entry.getKey();
Resource resource = entry.getValue(); Resource resource = entry.getValue();
if (interval.getStartTime() == 101L) { if (interval.getStartTime() == 101L) {
@ -211,8 +502,38 @@ public void testToIntervalMap() {
} }
} }
private Map<ReservationInterval, Resource> generateAllocation( private void setupArrays(TreeMap<Long, Resource> a, TreeMap<Long, Resource> b) {
int startTime, int[] alloc, boolean isStep) { a.put(10L, Resource.newInstance(5, 5));
a.put(20L, Resource.newInstance(10, 10));
a.put(30L, Resource.newInstance(15, 15));
a.put(40L, Resource.newInstance(20, 20));
a.put(50L, Resource.newInstance(0, 0));
b.put(11L, Resource.newInstance(5, 5));
b.put(22L, Resource.newInstance(10, 10));
b.put(33L, Resource.newInstance(15, 15));
b.put(40L, Resource.newInstance(20, 20));
b.put(42L, Resource.newInstance(20, 20));
b.put(43L, Resource.newInstance(10, 10));
}
private void validate(RLESparseResourceAllocation out, long[] time,
int[] alloc) {
int i = 0;
for (Entry<Long, Resource> res : out.getCumulative().entrySet()) {
assertEquals(time[i], ((long) res.getKey()));
if (i > alloc.length - 1) {
assertNull(res.getValue());
} else {
assertEquals(alloc[i], res.getValue().getVirtualCores());
}
i++;
}
assertEquals(time.length, i);
}
private Map<ReservationInterval, Resource> generateAllocation(int startTime,
int[] alloc, boolean isStep) {
Map<ReservationInterval, Resource> req = Map<ReservationInterval, Resource> req =
new HashMap<ReservationInterval, Resource>(); new HashMap<ReservationInterval, Resource>();
int numContainers = 0; int numContainers = 0;