YARN-3454. Add efficient merge operation to RLESparseResourceAllocation (Carlo Curino via asuresh)
(cherry picked from commit da1016365a
)
This commit is contained in:
parent
08727287de
commit
cbbdbe3cdd
|
@ -511,6 +511,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-4279. Mark ApplicationId and ApplicationAttemptId static methods as @Public,
|
||||
@Unstable. (stevel)
|
||||
|
||||
YARN-3454. Add efficient merge operation to RLESparseResourceAllocation
|
||||
(Carlo Curino via asuresh)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||
|
|
|
@ -108,7 +108,7 @@ public class InMemoryPlan implements Plan {
|
|||
this.resCalc = resCalc;
|
||||
this.minAlloc = minAlloc;
|
||||
this.maxAlloc = maxAlloc;
|
||||
this.rleSparseVector = new RLESparseResourceAllocation(resCalc, minAlloc);
|
||||
this.rleSparseVector = new RLESparseResourceAllocation(resCalc);
|
||||
this.queueName = queueName;
|
||||
this.replanner = replanner;
|
||||
this.getMoveOnExpiry = getMoveOnExpiry;
|
||||
|
@ -129,7 +129,7 @@ public class InMemoryPlan implements Plan {
|
|||
String user = reservation.getUser();
|
||||
RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user);
|
||||
if (resAlloc == null) {
|
||||
resAlloc = new RLESparseResourceAllocation(resCalc, minAlloc);
|
||||
resAlloc = new RLESparseResourceAllocation(resCalc);
|
||||
userResourceAlloc.put(user, resAlloc);
|
||||
}
|
||||
for (Map.Entry<ReservationInterval, Resource> r : allocationRequests
|
||||
|
@ -492,7 +492,7 @@ public class InMemoryPlan implements Plan {
|
|||
public long getLastEndTime() {
|
||||
readLock.lock();
|
||||
try {
|
||||
return rleSparseVector.getLatestEndTime();
|
||||
return rleSparseVector.getLatestNonNullTime();
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
|
|
|
@ -67,7 +67,7 @@ public class InMemoryReservationAllocation implements ReservationAllocation {
|
|||
this.allocationRequests = allocations;
|
||||
this.planName = planName;
|
||||
this.hasGang = hasGang;
|
||||
resourcesOverTime = new RLESparseResourceAllocation(calculator, minAlloc);
|
||||
resourcesOverTime = new RLESparseResourceAllocation(calculator);
|
||||
for (Map.Entry<ReservationInterval, Resource> r : allocations
|
||||
.entrySet()) {
|
||||
resourcesOverTime.addInterval(r.getKey(), r.getValue());
|
||||
|
|
|
@ -24,13 +24,12 @@ import java.util.Iterator;
|
|||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
|
@ -43,9 +42,9 @@ import com.google.gson.stream.JsonWriter;
|
|||
public class RLESparseResourceAllocation {
|
||||
|
||||
private static final int THRESHOLD = 100;
|
||||
private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
|
||||
private static final Resource ZERO_RESOURCE = Resources.none();
|
||||
|
||||
private TreeMap<Long, Resource> cumulativeCapacity =
|
||||
private NavigableMap<Long, Resource> cumulativeCapacity =
|
||||
new TreeMap<Long, Resource>();
|
||||
|
||||
private final ReentrantReadWriteLock readWriteLock =
|
||||
|
@ -54,26 +53,20 @@ public class RLESparseResourceAllocation {
|
|||
private final Lock writeLock = readWriteLock.writeLock();
|
||||
|
||||
private final ResourceCalculator resourceCalculator;
|
||||
private final Resource minAlloc;
|
||||
|
||||
public RLESparseResourceAllocation(ResourceCalculator resourceCalculator,
|
||||
Resource minAlloc) {
|
||||
public RLESparseResourceAllocation(ResourceCalculator resourceCalculator) {
|
||||
this.resourceCalculator = resourceCalculator;
|
||||
this.minAlloc = minAlloc;
|
||||
}
|
||||
|
||||
private boolean isSameAsPrevious(Long key, Resource capacity) {
|
||||
Entry<Long, Resource> previous = cumulativeCapacity.lowerEntry(key);
|
||||
return (previous != null && previous.getValue().equals(capacity));
|
||||
}
|
||||
|
||||
private boolean isSameAsNext(Long key, Resource capacity) {
|
||||
Entry<Long, Resource> next = cumulativeCapacity.higherEntry(key);
|
||||
return (next != null && next.getValue().equals(capacity));
|
||||
public RLESparseResourceAllocation(NavigableMap<Long, Resource> out,
|
||||
ResourceCalculator resourceCalculator) {
|
||||
// miss check for repeated entries
|
||||
this.cumulativeCapacity = out;
|
||||
this.resourceCalculator = resourceCalculator;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a resource for the specified interval
|
||||
* Add a resource for the specified interval.
|
||||
*
|
||||
* @param reservationInterval the interval for which the resource is to be
|
||||
* added
|
||||
|
@ -87,48 +80,15 @@ public class RLESparseResourceAllocation {
|
|||
}
|
||||
writeLock.lock();
|
||||
try {
|
||||
long startKey = reservationInterval.getStartTime();
|
||||
long endKey = reservationInterval.getEndTime();
|
||||
NavigableMap<Long, Resource> ticks =
|
||||
cumulativeCapacity.headMap(endKey, false);
|
||||
if (ticks != null && !ticks.isEmpty()) {
|
||||
Resource updatedCapacity = Resource.newInstance(0, 0);
|
||||
Entry<Long, Resource> lowEntry = ticks.floorEntry(startKey);
|
||||
if (lowEntry == null) {
|
||||
// This is the earliest starting interval
|
||||
cumulativeCapacity.put(startKey, totCap);
|
||||
} else {
|
||||
updatedCapacity = Resources.add(lowEntry.getValue(), totCap);
|
||||
// Add a new tick only if the updated value is different
|
||||
// from the previous tick
|
||||
if ((startKey == lowEntry.getKey())
|
||||
&& (isSameAsPrevious(lowEntry.getKey(), updatedCapacity))) {
|
||||
cumulativeCapacity.remove(lowEntry.getKey());
|
||||
} else {
|
||||
cumulativeCapacity.put(startKey, updatedCapacity);
|
||||
}
|
||||
}
|
||||
// Increase all the capacities of overlapping intervals
|
||||
Set<Entry<Long, Resource>> overlapSet =
|
||||
ticks.tailMap(startKey, false).entrySet();
|
||||
for (Entry<Long, Resource> entry : overlapSet) {
|
||||
updatedCapacity = Resources.add(entry.getValue(), totCap);
|
||||
entry.setValue(updatedCapacity);
|
||||
}
|
||||
} else {
|
||||
// This is the first interval to be added
|
||||
cumulativeCapacity.put(startKey, totCap);
|
||||
}
|
||||
Resource nextTick = cumulativeCapacity.get(endKey);
|
||||
if (nextTick != null) {
|
||||
// If there is overlap, remove the duplicate entry
|
||||
if (isSameAsPrevious(endKey, nextTick)) {
|
||||
cumulativeCapacity.remove(endKey);
|
||||
}
|
||||
} else {
|
||||
// Decrease capacity as this is end of the interval
|
||||
cumulativeCapacity.put(endKey, Resources.subtract(cumulativeCapacity
|
||||
.floorEntry(endKey).getValue(), totCap));
|
||||
NavigableMap<Long, Resource> addInt = new TreeMap<Long, Resource>();
|
||||
addInt.put(reservationInterval.getStartTime(), totCap);
|
||||
addInt.put(reservationInterval.getEndTime(), ZERO_RESOURCE);
|
||||
try {
|
||||
cumulativeCapacity =
|
||||
merge(resourceCalculator, totCap, cumulativeCapacity, addInt,
|
||||
Long.MIN_VALUE, Long.MAX_VALUE, RLEOperator.add);
|
||||
} catch (PlanningException e) {
|
||||
// never happens for add
|
||||
}
|
||||
return true;
|
||||
} finally {
|
||||
|
@ -137,7 +97,7 @@ public class RLESparseResourceAllocation {
|
|||
}
|
||||
|
||||
/**
|
||||
* Removes a resource for the specified interval
|
||||
* Removes a resource for the specified interval.
|
||||
*
|
||||
* @param reservationInterval the interval for which the resource is to be
|
||||
* removed
|
||||
|
@ -151,34 +111,16 @@ public class RLESparseResourceAllocation {
|
|||
}
|
||||
writeLock.lock();
|
||||
try {
|
||||
long startKey = reservationInterval.getStartTime();
|
||||
long endKey = reservationInterval.getEndTime();
|
||||
// update the start key
|
||||
NavigableMap<Long, Resource> ticks =
|
||||
cumulativeCapacity.headMap(endKey, false);
|
||||
// Decrease all the capacities of overlapping intervals
|
||||
SortedMap<Long, Resource> overlapSet = ticks.tailMap(startKey);
|
||||
if (overlapSet != null && !overlapSet.isEmpty()) {
|
||||
Resource updatedCapacity = Resource.newInstance(0, 0);
|
||||
long currentKey = -1;
|
||||
for (Iterator<Entry<Long, Resource>> overlapEntries =
|
||||
overlapSet.entrySet().iterator(); overlapEntries.hasNext();) {
|
||||
Entry<Long, Resource> entry = overlapEntries.next();
|
||||
currentKey = entry.getKey();
|
||||
updatedCapacity = Resources.subtract(entry.getValue(), totCap);
|
||||
// update each entry between start and end key
|
||||
cumulativeCapacity.put(currentKey, updatedCapacity);
|
||||
}
|
||||
// Remove the first overlap entry if it is same as previous after
|
||||
// updation
|
||||
Long firstKey = overlapSet.firstKey();
|
||||
if (isSameAsPrevious(firstKey, overlapSet.get(firstKey))) {
|
||||
cumulativeCapacity.remove(firstKey);
|
||||
}
|
||||
// Remove the next entry if it is same as end entry after updation
|
||||
if ((currentKey != -1) && (isSameAsNext(currentKey, updatedCapacity))) {
|
||||
cumulativeCapacity.remove(cumulativeCapacity.higherKey(currentKey));
|
||||
}
|
||||
|
||||
NavigableMap<Long, Resource> removeInt = new TreeMap<Long, Resource>();
|
||||
removeInt.put(reservationInterval.getStartTime(), totCap);
|
||||
removeInt.put(reservationInterval.getEndTime(), ZERO_RESOURCE);
|
||||
try {
|
||||
cumulativeCapacity =
|
||||
merge(resourceCalculator, totCap, cumulativeCapacity, removeInt,
|
||||
Long.MIN_VALUE, Long.MAX_VALUE, RLEOperator.subtract);
|
||||
} catch (PlanningException e) {
|
||||
// never happens for subtract
|
||||
}
|
||||
return true;
|
||||
} finally {
|
||||
|
@ -188,9 +130,8 @@ public class RLESparseResourceAllocation {
|
|||
|
||||
/**
|
||||
* Returns the capacity, i.e. total resources allocated at the specified point
|
||||
* of time
|
||||
* of time.
|
||||
*
|
||||
* @param tick the time (UTC in ms) at which the capacity is requested
|
||||
* @return the resources allocated at the specified time
|
||||
*/
|
||||
public Resource getCapacityAtTime(long tick) {
|
||||
|
@ -207,7 +148,7 @@ public class RLESparseResourceAllocation {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the timestamp of the earliest resource allocation
|
||||
* Get the timestamp of the earliest resource allocation.
|
||||
*
|
||||
* @return the timestamp of the first resource allocation
|
||||
*/
|
||||
|
@ -225,17 +166,24 @@ public class RLESparseResourceAllocation {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the timestamp of the latest resource allocation
|
||||
* Get the timestamp of the latest non-null resource allocation.
|
||||
*
|
||||
* @return the timestamp of the last resource allocation
|
||||
*/
|
||||
public long getLatestEndTime() {
|
||||
public long getLatestNonNullTime() {
|
||||
readLock.lock();
|
||||
try {
|
||||
if (cumulativeCapacity.isEmpty()) {
|
||||
return -1;
|
||||
} else {
|
||||
return cumulativeCapacity.lastKey();
|
||||
// the last entry might contain null (to terminate
|
||||
// the sequence)... return previous one.
|
||||
Entry<Long, Resource> last = cumulativeCapacity.lastEntry();
|
||||
if (last.getValue() == null) {
|
||||
return cumulativeCapacity.floorKey(last.getKey() - 1);
|
||||
} else {
|
||||
return last.getKey();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
|
@ -243,7 +191,7 @@ public class RLESparseResourceAllocation {
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns true if there are no non-zero entries
|
||||
* Returns true if there are no non-zero entries.
|
||||
*
|
||||
* @return true if there are no allocations or false otherwise
|
||||
*/
|
||||
|
@ -253,9 +201,11 @@ public class RLESparseResourceAllocation {
|
|||
if (cumulativeCapacity.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
// Deletion leaves a single zero entry so check for that
|
||||
if (cumulativeCapacity.size() == 1) {
|
||||
return cumulativeCapacity.firstEntry().getValue().equals(ZERO_RESOURCE);
|
||||
// Deletion leaves a single zero entry with a null at the end so check for
|
||||
// that
|
||||
if (cumulativeCapacity.size() == 2) {
|
||||
return cumulativeCapacity.firstEntry().getValue().equals(ZERO_RESOURCE)
|
||||
&& cumulativeCapacity.lastEntry().getValue() == null;
|
||||
}
|
||||
return false;
|
||||
} finally {
|
||||
|
@ -286,7 +236,7 @@ public class RLESparseResourceAllocation {
|
|||
|
||||
/**
|
||||
* Returns the JSON string representation of the current resources allocated
|
||||
* over time
|
||||
* over time.
|
||||
*
|
||||
* @return the JSON string representation of the current resources allocated
|
||||
* over time
|
||||
|
@ -314,7 +264,7 @@ public class RLESparseResourceAllocation {
|
|||
|
||||
/**
|
||||
* Returns the representation of the current resources allocated over time as
|
||||
* an interval map.
|
||||
* an interval map (in the defined non-null range).
|
||||
*
|
||||
* @return the representation of the current resources allocated over time as
|
||||
* an interval map.
|
||||
|
@ -334,7 +284,7 @@ public class RLESparseResourceAllocation {
|
|||
Map.Entry<Long, Resource> lastEntry = null;
|
||||
for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) {
|
||||
|
||||
if (lastEntry != null) {
|
||||
if (lastEntry != null && entry.getValue() != null) {
|
||||
ReservationInterval interval =
|
||||
new ReservationInterval(lastEntry.getKey(), entry.getKey());
|
||||
Resource resource = lastEntry.getValue();
|
||||
|
@ -348,7 +298,235 @@ public class RLESparseResourceAllocation {
|
|||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public NavigableMap<Long, Resource> getCumulative() {
|
||||
readLock.lock();
|
||||
try {
|
||||
return cumulativeCapacity;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Merges the range start to end of two {@code RLESparseResourceAllocation}
|
||||
* using a given {@code RLEOperator}.
|
||||
*
|
||||
* @param resCalc the resource calculator
|
||||
* @param clusterResource the total cluster resources (for DRF)
|
||||
* @param a the left operand
|
||||
* @param b the right operand
|
||||
* @param operator the operator to be applied during merge
|
||||
* @param start the start-time of the range to be considered
|
||||
* @param end the end-time of the range to be considered
|
||||
* @return the a merged RLESparseResourceAllocation, produced by applying
|
||||
* "operator" to "a" and "b"
|
||||
* @throws PlanningException in case the operator is subtractTestPositive and
|
||||
* the result would contain a negative value
|
||||
*/
|
||||
public static RLESparseResourceAllocation merge(ResourceCalculator resCalc,
|
||||
Resource clusterResource, RLESparseResourceAllocation a,
|
||||
RLESparseResourceAllocation b, RLEOperator operator, long start, long end)
|
||||
throws PlanningException {
|
||||
NavigableMap<Long, Resource> cumA =
|
||||
a.getRangeOverlapping(start, end).getCumulative();
|
||||
NavigableMap<Long, Resource> cumB =
|
||||
b.getRangeOverlapping(start, end).getCumulative();
|
||||
NavigableMap<Long, Resource> out =
|
||||
merge(resCalc, clusterResource, cumA, cumB, start, end, operator);
|
||||
return new RLESparseResourceAllocation(out, resCalc);
|
||||
}
|
||||
|
||||
private static NavigableMap<Long, Resource> merge(ResourceCalculator resCalc,
|
||||
Resource clusterResource, NavigableMap<Long, Resource> a,
|
||||
NavigableMap<Long, Resource> b, long start, long end,
|
||||
RLEOperator operator) throws PlanningException {
|
||||
|
||||
// handle special cases of empty input
|
||||
if (a == null || a.isEmpty()) {
|
||||
if (operator == RLEOperator.subtract
|
||||
|| operator == RLEOperator.subtractTestNonNegative) {
|
||||
return negate(operator, b);
|
||||
} else {
|
||||
return b;
|
||||
}
|
||||
}
|
||||
if (b == null || b.isEmpty()) {
|
||||
return a;
|
||||
}
|
||||
|
||||
// define iterators and support variables
|
||||
Iterator<Entry<Long, Resource>> aIt = a.entrySet().iterator();
|
||||
Iterator<Entry<Long, Resource>> bIt = b.entrySet().iterator();
|
||||
Entry<Long, Resource> curA = aIt.next();
|
||||
Entry<Long, Resource> curB = bIt.next();
|
||||
Entry<Long, Resource> lastA = null;
|
||||
Entry<Long, Resource> lastB = null;
|
||||
boolean aIsDone = false;
|
||||
boolean bIsDone = false;
|
||||
|
||||
TreeMap<Long, Resource> out = new TreeMap<Long, Resource>();
|
||||
|
||||
while (!(curA.equals(lastA) && curB.equals(lastB))) {
|
||||
|
||||
Resource outRes;
|
||||
long time = -1;
|
||||
|
||||
// curA is smaller than curB
|
||||
if (bIsDone || (curA.getKey() < curB.getKey() && !aIsDone)) {
|
||||
outRes = combineValue(operator, resCalc, clusterResource, curA, lastB);
|
||||
time = (curA.getKey() < start) ? start : curA.getKey();
|
||||
lastA = curA;
|
||||
if (aIt.hasNext()) {
|
||||
curA = aIt.next();
|
||||
} else {
|
||||
aIsDone = true;
|
||||
}
|
||||
|
||||
} else {
|
||||
// curB is smaller than curA
|
||||
if (aIsDone || (curA.getKey() > curB.getKey() && !bIsDone)) {
|
||||
outRes =
|
||||
combineValue(operator, resCalc, clusterResource, lastA, curB);
|
||||
time = (curB.getKey() < start) ? start : curB.getKey();
|
||||
lastB = curB;
|
||||
if (bIt.hasNext()) {
|
||||
curB = bIt.next();
|
||||
} else {
|
||||
bIsDone = true;
|
||||
}
|
||||
|
||||
} else {
|
||||
// curA is equal to curB
|
||||
outRes = combineValue(operator, resCalc, clusterResource, curA, curB);
|
||||
time = (curA.getKey() < start) ? start : curA.getKey();
|
||||
lastA = curA;
|
||||
if (aIt.hasNext()) {
|
||||
curA = aIt.next();
|
||||
} else {
|
||||
aIsDone = true;
|
||||
}
|
||||
lastB = curB;
|
||||
if (bIt.hasNext()) {
|
||||
curB = bIt.next();
|
||||
} else {
|
||||
bIsDone = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// add to out if not redundant
|
||||
addIfNeeded(out, time, outRes);
|
||||
}
|
||||
addIfNeeded(out, end, null);
|
||||
|
||||
return out;
|
||||
}
|
||||
|
||||
private static NavigableMap<Long, Resource> negate(RLEOperator operator,
|
||||
NavigableMap<Long, Resource> a) throws PlanningException {
|
||||
|
||||
TreeMap<Long, Resource> out = new TreeMap<Long, Resource>();
|
||||
for (Entry<Long, Resource> e : a.entrySet()) {
|
||||
Resource val = Resources.negate(e.getValue());
|
||||
// test for negative value and throws
|
||||
if (operator == RLEOperator.subtractTestNonNegative
|
||||
&& (Resources.fitsIn(val, ZERO_RESOURCE) &&
|
||||
!Resources.equals(val, ZERO_RESOURCE))) {
|
||||
throw new PlanningException(
|
||||
"RLESparseResourceAllocation: merge failed as the "
|
||||
+ "resulting RLESparseResourceAllocation would be negative");
|
||||
}
|
||||
out.put(e.getKey(), val);
|
||||
}
|
||||
|
||||
return out;
|
||||
}
|
||||
|
||||
private static void addIfNeeded(TreeMap<Long, Resource> out, long time,
|
||||
Resource outRes) {
|
||||
|
||||
if (out.isEmpty() || (out.lastEntry() != null && outRes == null)
|
||||
|| !Resources.equals(out.lastEntry().getValue(), outRes)) {
|
||||
out.put(time, outRes);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static Resource combineValue(RLEOperator op,
|
||||
ResourceCalculator resCalc, Resource clusterResource,
|
||||
Entry<Long, Resource> eA, Entry<Long, Resource> eB)
|
||||
throws PlanningException {
|
||||
|
||||
// deal with nulls
|
||||
if (eA == null || eA.getValue() == null) {
|
||||
if (eB == null || eB.getValue() == null) {
|
||||
return null;
|
||||
}
|
||||
if (op == RLEOperator.subtract) {
|
||||
return Resources.negate(eB.getValue());
|
||||
} else {
|
||||
return eB.getValue();
|
||||
}
|
||||
}
|
||||
if (eB == null || eB.getValue() == null) {
|
||||
return eA.getValue();
|
||||
}
|
||||
|
||||
Resource a = eA.getValue();
|
||||
Resource b = eB.getValue();
|
||||
switch (op) {
|
||||
case add:
|
||||
return Resources.add(a, b);
|
||||
case subtract:
|
||||
return Resources.subtract(a, b);
|
||||
case subtractTestNonNegative:
|
||||
if (!Resources.fitsIn(b, a)) {
|
||||
throw new PlanningException(
|
||||
"RLESparseResourceAllocation: merge failed as the "
|
||||
+ "resulting RLESparseResourceAllocation would be negative");
|
||||
} else {
|
||||
return Resources.subtract(a, b);
|
||||
}
|
||||
case min:
|
||||
return Resources.min(resCalc, clusterResource, a, b);
|
||||
case max:
|
||||
return Resources.max(resCalc, clusterResource, a, b);
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public RLESparseResourceAllocation getRangeOverlapping(long start, long end) {
|
||||
readLock.lock();
|
||||
try {
|
||||
NavigableMap<Long, Resource> a = this.getCumulative();
|
||||
|
||||
if (a != null && !a.isEmpty()) {
|
||||
// include the portion of previous entry that overlaps start
|
||||
if (start > a.firstKey()) {
|
||||
long previous = a.floorKey(start);
|
||||
a = a.tailMap(previous, true);
|
||||
}
|
||||
a = a.headMap(end, true);
|
||||
}
|
||||
RLESparseResourceAllocation ret =
|
||||
new RLESparseResourceAllocation(a, resourceCalculator);
|
||||
return ret;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* The set of operators that can be applied to two
|
||||
* {@code RLESparseResourceAllocation} during a merge operation.
|
||||
*/
|
||||
public enum RLEOperator {
|
||||
add, subtract, min, max, subtractTestNonNegative
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -95,8 +95,7 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
|
||||
// Create the allocations data structure
|
||||
RLESparseResourceAllocation allocations =
|
||||
new RLESparseResourceAllocation(plan.getResourceCalculator(),
|
||||
plan.getMinimumAllocation());
|
||||
new RLESparseResourceAllocation(plan.getResourceCalculator());
|
||||
|
||||
// Get a reverse iterator for the set of stages
|
||||
ListIterator<ReservationRequest> li =
|
||||
|
@ -219,8 +218,7 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
|
||||
// Initialize the plan modifications
|
||||
planModifications =
|
||||
new RLESparseResourceAllocation(plan.getResourceCalculator(),
|
||||
plan.getMinimumAllocation());
|
||||
new RLESparseResourceAllocation(plan.getResourceCalculator());
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -69,8 +69,7 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
|
|||
|
||||
// Create allocationRequestsearlies
|
||||
RLESparseResourceAllocation allocationRequests =
|
||||
new RLESparseResourceAllocation(plan.getResourceCalculator(),
|
||||
plan.getMinimumAllocation());
|
||||
new RLESparseResourceAllocation(plan.getResourceCalculator());
|
||||
|
||||
// Initialize parameters
|
||||
long duration = stepRoundUp(rr.getDuration(), step);
|
||||
|
|
|
@ -17,17 +17,25 @@
|
|||
*******************************************************************************/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -37,13 +45,249 @@ public class TestRLESparseResourceAllocation {
|
|||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(TestRLESparseResourceAllocation.class);
|
||||
|
||||
@Test
|
||||
public void testMergeAdd() throws PlanningException {
|
||||
|
||||
TreeMap<Long, Resource> a = new TreeMap<>();
|
||||
TreeMap<Long, Resource> b = new TreeMap<>();
|
||||
|
||||
setupArrays(a, b);
|
||||
|
||||
RLESparseResourceAllocation rleA =
|
||||
new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
|
||||
RLESparseResourceAllocation rleB =
|
||||
new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
|
||||
|
||||
RLESparseResourceAllocation out =
|
||||
RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
|
||||
Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
|
||||
RLEOperator.add, 18, 45);
|
||||
|
||||
System.out.println(out);
|
||||
|
||||
long[] time = { 18, 20, 22, 30, 33, 40, 43, 45 };
|
||||
int[] alloc = { 10, 15, 20, 25, 30, 40, 30 };
|
||||
|
||||
validate(out, time, alloc);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeMin() throws PlanningException {
|
||||
|
||||
TreeMap<Long, Resource> a = new TreeMap<>();
|
||||
TreeMap<Long, Resource> b = new TreeMap<>();
|
||||
|
||||
setupArrays(a, b);
|
||||
|
||||
RLESparseResourceAllocation rleA =
|
||||
new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
|
||||
RLESparseResourceAllocation rleB =
|
||||
new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
|
||||
|
||||
RLESparseResourceAllocation out =
|
||||
RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
|
||||
Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
|
||||
RLEOperator.min, 0, 60);
|
||||
|
||||
System.out.println(out);
|
||||
|
||||
long[] time = { 10, 22, 33, 40, 43, 50, 60 };
|
||||
int[] alloc = { 5, 10, 15, 20, 10, 0 };
|
||||
|
||||
validate(out, time, alloc);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeMax() throws PlanningException {
|
||||
|
||||
TreeMap<Long, Resource> a = new TreeMap<>();
|
||||
TreeMap<Long, Resource> b = new TreeMap<>();
|
||||
|
||||
setupArrays(a, b);
|
||||
|
||||
RLESparseResourceAllocation rleA =
|
||||
new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
|
||||
RLESparseResourceAllocation rleB =
|
||||
new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
|
||||
|
||||
RLESparseResourceAllocation out =
|
||||
RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
|
||||
Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
|
||||
RLEOperator.max, 0, 60);
|
||||
|
||||
System.out.println(out);
|
||||
|
||||
long[] time = { 10, 20, 30, 40, 50, 60 };
|
||||
int[] alloc = { 5, 10, 15, 20, 10 };
|
||||
|
||||
validate(out, time, alloc);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeSubtract() throws PlanningException {
|
||||
|
||||
TreeMap<Long, Resource> a = new TreeMap<>();
|
||||
TreeMap<Long, Resource> b = new TreeMap<>();
|
||||
|
||||
setupArrays(a, b);
|
||||
|
||||
RLESparseResourceAllocation rleA =
|
||||
new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
|
||||
RLESparseResourceAllocation rleB =
|
||||
new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
|
||||
|
||||
RLESparseResourceAllocation out =
|
||||
RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
|
||||
Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
|
||||
RLEOperator.subtract, 0, 60);
|
||||
|
||||
System.out.println(out);
|
||||
|
||||
long[] time = { 10, 11, 20, 22, 30, 33, 43, 50, 60 };
|
||||
int[] alloc = { 5, 0, 5, 0, 5, 0, 10, -10 };
|
||||
|
||||
validate(out, time, alloc);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergesubtractTestNonNegative() throws PlanningException {
|
||||
|
||||
// starting with default array example
|
||||
TreeMap<Long, Resource> a = new TreeMap<>();
|
||||
TreeMap<Long, Resource> b = new TreeMap<>();
|
||||
|
||||
setupArrays(a, b);
|
||||
|
||||
RLESparseResourceAllocation rleA =
|
||||
new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
|
||||
RLESparseResourceAllocation rleB =
|
||||
new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
|
||||
|
||||
try {
|
||||
RLESparseResourceAllocation out =
|
||||
RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
|
||||
Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
|
||||
RLEOperator.subtractTestNonNegative, 0, 60);
|
||||
fail();
|
||||
} catch (PlanningException pe) {
|
||||
// Expected!
|
||||
}
|
||||
|
||||
// NOTE a is empty!! so the subtraction is implicitly considered negative
|
||||
// and the test should fail
|
||||
|
||||
a = new TreeMap<>();
|
||||
b = new TreeMap<>();
|
||||
b.put(11L, Resource.newInstance(5, 6));
|
||||
|
||||
rleA = new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
|
||||
rleB = new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
|
||||
|
||||
try {
|
||||
RLESparseResourceAllocation out =
|
||||
RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
|
||||
Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
|
||||
RLEOperator.subtractTestNonNegative, 0, 60);
|
||||
fail();
|
||||
} catch (PlanningException pe) {
|
||||
// Expected!
|
||||
}
|
||||
|
||||
// Testing that the subtractTestNonNegative detects problems even if only one
|
||||
// of the resource dimensions is "<0"
|
||||
a.put(10L, Resource.newInstance(10, 5));
|
||||
b.put(11L, Resource.newInstance(5, 6));
|
||||
|
||||
rleA = new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
|
||||
rleB = new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
|
||||
|
||||
try {
|
||||
RLESparseResourceAllocation out =
|
||||
RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
|
||||
Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
|
||||
RLEOperator.subtractTestNonNegative, 0, 60);
|
||||
fail();
|
||||
} catch (PlanningException pe) {
|
||||
// Expected!
|
||||
}
|
||||
|
||||
// try with reverse setting
|
||||
a.put(10L, Resource.newInstance(5, 10));
|
||||
b.put(11L, Resource.newInstance(6, 5));
|
||||
|
||||
rleA = new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
|
||||
rleB = new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
|
||||
|
||||
try {
|
||||
RLESparseResourceAllocation out =
|
||||
RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
|
||||
Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
|
||||
RLEOperator.subtractTestNonNegative, 0, 60);
|
||||
fail();
|
||||
} catch (PlanningException pe) {
|
||||
// Expected!
|
||||
}
|
||||
|
||||
// trying a case that should work
|
||||
a.put(10L, Resource.newInstance(10, 6));
|
||||
b.put(11L, Resource.newInstance(5, 6));
|
||||
|
||||
rleA = new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
|
||||
rleB = new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
|
||||
|
||||
RLESparseResourceAllocation out =
|
||||
RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
|
||||
Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
|
||||
RLEOperator.subtractTestNonNegative, 0, 60);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void testMergeSpeed() throws PlanningException {
|
||||
|
||||
for (int j = 0; j < 100; j++) {
|
||||
TreeMap<Long, Resource> a = new TreeMap<>();
|
||||
TreeMap<Long, Resource> b = new TreeMap<>();
|
||||
Random rand = new Random();
|
||||
long startA = 0;
|
||||
long startB = 0;
|
||||
|
||||
for (int i = 0; i < 1000 + rand.nextInt(9000); i++) {
|
||||
startA += rand.nextInt(100);
|
||||
startB += rand.nextInt(100);
|
||||
a.put(startA,
|
||||
Resource.newInstance(rand.nextInt(10240), rand.nextInt(10)));
|
||||
b.put(startB,
|
||||
Resource.newInstance(rand.nextInt(10240), rand.nextInt(10)));
|
||||
}
|
||||
|
||||
RLESparseResourceAllocation rleA =
|
||||
new RLESparseResourceAllocation(a, new DefaultResourceCalculator());
|
||||
RLESparseResourceAllocation rleB =
|
||||
new RLESparseResourceAllocation(b, new DefaultResourceCalculator());
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
RLESparseResourceAllocation out =
|
||||
RLESparseResourceAllocation.merge(new DefaultResourceCalculator(),
|
||||
Resource.newInstance(100 * 128 * 1024, 100 * 32), rleA, rleB,
|
||||
RLEOperator.add, Long.MIN_VALUE, Long.MAX_VALUE);
|
||||
long end = System.currentTimeMillis();
|
||||
|
||||
System.out.println(" Took: " + (end - start) + "ms ");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlocks() {
|
||||
ResourceCalculator resCalc = new DefaultResourceCalculator();
|
||||
Resource minAlloc = Resource.newInstance(1, 1);
|
||||
|
||||
RLESparseResourceAllocation rleSparseVector =
|
||||
new RLESparseResourceAllocation(resCalc, minAlloc);
|
||||
new RLESparseResourceAllocation(resCalc);
|
||||
int[] alloc = { 10, 10, 10, 10, 10, 10 };
|
||||
int start = 100;
|
||||
Set<Entry<ReservationInterval, Resource>> inputs =
|
||||
|
@ -75,12 +319,63 @@ public class TestRLESparseResourceAllocation {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSteps() {
|
||||
public void testPartialRemoval() {
|
||||
ResourceCalculator resCalc = new DefaultResourceCalculator();
|
||||
Resource minAlloc = Resource.newInstance(1, 1);
|
||||
|
||||
RLESparseResourceAllocation rleSparseVector =
|
||||
new RLESparseResourceAllocation(resCalc, minAlloc);
|
||||
new RLESparseResourceAllocation(resCalc);
|
||||
|
||||
ReservationInterval riAdd = new ReservationInterval(10, 20);
|
||||
Resource rr = Resource.newInstance(1024 * 100, 100);
|
||||
|
||||
ReservationInterval riAdd2 = new ReservationInterval(20, 30);
|
||||
|
||||
Resource rr2 = Resource.newInstance(1024 * 200, 200);
|
||||
|
||||
ReservationInterval riRemove = new ReservationInterval(12, 25);
|
||||
// same if we use this
|
||||
// ReservationRequest rrRemove =
|
||||
// ReservationRequest.newInstance(Resource.newInstance(1024, 1), 100, 1,6);
|
||||
LOG.info(rleSparseVector.toString());
|
||||
|
||||
rleSparseVector.addInterval(riAdd, rr);
|
||||
rleSparseVector.addInterval(riAdd2, rr2);
|
||||
LOG.info(rleSparseVector.toString());
|
||||
|
||||
rleSparseVector.removeInterval(riRemove, rr);
|
||||
LOG.info(rleSparseVector.toString());
|
||||
|
||||
// Current bug prevents this to pass. The RLESparseResourceAllocation
|
||||
// does not handle removal of "partial"
|
||||
// allocations correctly.
|
||||
Assert.assertEquals(102400, rleSparseVector.getCapacityAtTime(10)
|
||||
.getMemory());
|
||||
Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(13).getMemory());
|
||||
Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(19).getMemory());
|
||||
Assert.assertEquals(102400, rleSparseVector.getCapacityAtTime(21)
|
||||
.getMemory());
|
||||
Assert.assertEquals(2 * 102400, rleSparseVector.getCapacityAtTime(26)
|
||||
.getMemory());
|
||||
|
||||
ReservationInterval riRemove2 = new ReservationInterval(9, 13);
|
||||
rleSparseVector.removeInterval(riRemove2, rr);
|
||||
LOG.info(rleSparseVector.toString());
|
||||
|
||||
Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(11).getMemory());
|
||||
Assert.assertEquals(-102400, rleSparseVector.getCapacityAtTime(9)
|
||||
.getMemory());
|
||||
Assert.assertEquals(0, rleSparseVector.getCapacityAtTime(13).getMemory());
|
||||
Assert.assertEquals(102400, rleSparseVector.getCapacityAtTime(20)
|
||||
.getMemory());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSteps() {
|
||||
ResourceCalculator resCalc = new DefaultResourceCalculator();
|
||||
|
||||
RLESparseResourceAllocation rleSparseVector =
|
||||
new RLESparseResourceAllocation(resCalc);
|
||||
int[] alloc = { 10, 10, 10, 10, 10, 10 };
|
||||
int start = 100;
|
||||
Set<Entry<ReservationInterval, Resource>> inputs =
|
||||
|
@ -102,7 +397,7 @@ public class TestRLESparseResourceAllocation {
|
|||
Assert.assertEquals(Resource.newInstance(0, 0),
|
||||
rleSparseVector.getCapacityAtTime(start + alloc.length + 2));
|
||||
for (Entry<ReservationInterval, Resource> ip : inputs) {
|
||||
rleSparseVector.removeInterval(ip.getKey(),ip.getValue());
|
||||
rleSparseVector.removeInterval(ip.getKey(), ip.getValue());
|
||||
}
|
||||
LOG.info(rleSparseVector.toString());
|
||||
for (int i = 0; i < alloc.length; i++) {
|
||||
|
@ -115,10 +410,9 @@ public class TestRLESparseResourceAllocation {
|
|||
@Test
|
||||
public void testSkyline() {
|
||||
ResourceCalculator resCalc = new DefaultResourceCalculator();
|
||||
Resource minAlloc = Resource.newInstance(1, 1);
|
||||
|
||||
RLESparseResourceAllocation rleSparseVector =
|
||||
new RLESparseResourceAllocation(resCalc, minAlloc);
|
||||
new RLESparseResourceAllocation(resCalc);
|
||||
int[] alloc = { 0, 5, 10, 10, 5, 0 };
|
||||
int start = 100;
|
||||
Set<Entry<ReservationInterval, Resource>> inputs =
|
||||
|
@ -151,11 +445,10 @@ public class TestRLESparseResourceAllocation {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testZeroAlloaction() {
|
||||
public void testZeroAllocation() {
|
||||
ResourceCalculator resCalc = new DefaultResourceCalculator();
|
||||
Resource minAlloc = Resource.newInstance(1, 1);
|
||||
RLESparseResourceAllocation rleSparseVector =
|
||||
new RLESparseResourceAllocation(resCalc, minAlloc);
|
||||
new RLESparseResourceAllocation(resCalc);
|
||||
rleSparseVector.addInterval(new ReservationInterval(0, Long.MAX_VALUE),
|
||||
Resource.newInstance(0, 0));
|
||||
LOG.info(rleSparseVector.toString());
|
||||
|
@ -167,9 +460,8 @@ public class TestRLESparseResourceAllocation {
|
|||
@Test
|
||||
public void testToIntervalMap() {
|
||||
ResourceCalculator resCalc = new DefaultResourceCalculator();
|
||||
Resource minAlloc = Resource.newInstance(1, 1);
|
||||
RLESparseResourceAllocation rleSparseVector =
|
||||
new RLESparseResourceAllocation(resCalc, minAlloc);
|
||||
new RLESparseResourceAllocation(resCalc);
|
||||
Map<ReservationInterval, Resource> mapAllocations;
|
||||
|
||||
// Check empty
|
||||
|
@ -186,8 +478,7 @@ public class TestRLESparseResourceAllocation {
|
|||
}
|
||||
mapAllocations = rleSparseVector.toIntervalMap();
|
||||
Assert.assertTrue(mapAllocations.size() == 5);
|
||||
for (Entry<ReservationInterval, Resource> entry : mapAllocations
|
||||
.entrySet()) {
|
||||
for (Entry<ReservationInterval, Resource> entry : mapAllocations.entrySet()) {
|
||||
ReservationInterval interval = entry.getKey();
|
||||
Resource resource = entry.getValue();
|
||||
if (interval.getStartTime() == 101L) {
|
||||
|
@ -211,8 +502,38 @@ public class TestRLESparseResourceAllocation {
|
|||
}
|
||||
}
|
||||
|
||||
private Map<ReservationInterval, Resource> generateAllocation(
|
||||
int startTime, int[] alloc, boolean isStep) {
|
||||
private void setupArrays(TreeMap<Long, Resource> a, TreeMap<Long, Resource> b) {
|
||||
a.put(10L, Resource.newInstance(5, 5));
|
||||
a.put(20L, Resource.newInstance(10, 10));
|
||||
a.put(30L, Resource.newInstance(15, 15));
|
||||
a.put(40L, Resource.newInstance(20, 20));
|
||||
a.put(50L, Resource.newInstance(0, 0));
|
||||
|
||||
b.put(11L, Resource.newInstance(5, 5));
|
||||
b.put(22L, Resource.newInstance(10, 10));
|
||||
b.put(33L, Resource.newInstance(15, 15));
|
||||
b.put(40L, Resource.newInstance(20, 20));
|
||||
b.put(42L, Resource.newInstance(20, 20));
|
||||
b.put(43L, Resource.newInstance(10, 10));
|
||||
}
|
||||
|
||||
private void validate(RLESparseResourceAllocation out, long[] time,
|
||||
int[] alloc) {
|
||||
int i = 0;
|
||||
for (Entry<Long, Resource> res : out.getCumulative().entrySet()) {
|
||||
assertEquals(time[i], ((long) res.getKey()));
|
||||
if (i > alloc.length - 1) {
|
||||
assertNull(res.getValue());
|
||||
} else {
|
||||
assertEquals(alloc[i], res.getValue().getVirtualCores());
|
||||
}
|
||||
i++;
|
||||
}
|
||||
assertEquals(time.length, i);
|
||||
}
|
||||
|
||||
private Map<ReservationInterval, Resource> generateAllocation(int startTime,
|
||||
int[] alloc, boolean isStep) {
|
||||
Map<ReservationInterval, Resource> req =
|
||||
new HashMap<ReservationInterval, Resource>();
|
||||
int numContainers = 0;
|
||||
|
|
Loading…
Reference in New Issue