YARN-5329. Placement Agent enhancements required to support recurring reservations in ReservationSystem. (Carlo Curino via Subru).
(cherry picked from commit e6e614e380
)
This commit is contained in:
parent
5c2f07eed7
commit
829a8e26b9
|
@ -723,6 +723,12 @@ public class InMemoryPlan implements Plan {
|
|||
+ periodicRle.getTimePeriod() + ")");
|
||||
}
|
||||
|
||||
if (period < (end - start)) {
|
||||
throw new PlanningException(
|
||||
"Invalid input: (end - start) = (" + end + " - " + start + ") = "
|
||||
+ (end - start) + " > period = " + period);
|
||||
}
|
||||
|
||||
// find the minimum resources available among all the instances that fit
|
||||
// in the LCM
|
||||
long numInstInLCM = periodicRle.getTimePeriod() / period;
|
||||
|
|
|
@ -221,7 +221,8 @@ public class PeriodicRLESparseResourceAllocation
|
|||
NavigableMap<Long, Resource> cumulativeMap = this.getCumulative();
|
||||
Long previous = cumulativeMap.floorKey(relativeStart);
|
||||
previous = (previous != null) ? previous : 0;
|
||||
for (long i = 0; i <= (end - start) / timePeriod; i++) {
|
||||
//make sure to go one past end, to catch end times extending past period
|
||||
for (long i = 0; i <= 1 + (end - start) / timePeriod; i++) {
|
||||
for (Map.Entry<Long, Resource> e : cumulativeMap.entrySet()) {
|
||||
long curKey = e.getKey() + (i * timePeriod);
|
||||
if (curKey >= previous && (start + curKey - relativeStart) <= end) {
|
||||
|
|
|
@ -423,7 +423,8 @@ public class RLESparseResourceAllocation {
|
|||
Resource outRes) {
|
||||
|
||||
if (out.isEmpty() || (out.lastEntry() != null && outRes == null)
|
||||
|| !Resources.equals(out.lastEntry().getValue(), outRes)) {
|
||||
|| (out.lastEntry().getValue() != null
|
||||
&& !Resources.equals(out.lastEntry().getValue(), outRes))) {
|
||||
out.put(time, outRes);
|
||||
}
|
||||
|
||||
|
@ -460,7 +461,8 @@ public class RLESparseResourceAllocation {
|
|||
if (!Resources.fitsIn(b, a)) {
|
||||
throw new PlanningException(
|
||||
"RLESparseResourceAllocation: merge failed as the "
|
||||
+ "resulting RLESparseResourceAllocation would be negative");
|
||||
+ "resulting RLESparseResourceAllocation would "
|
||||
+ "be negative, when testing: (" + eB + ") > (" + eA + ")");
|
||||
} else {
|
||||
return Resources.subtract(a, b);
|
||||
}
|
||||
|
|
|
@ -98,6 +98,12 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
// Current stage
|
||||
ReservationRequest currentReservationStage;
|
||||
|
||||
// initialize periodicity
|
||||
long period = 0;
|
||||
if(reservation.getRecurrenceExpression() != null){
|
||||
period = Long.parseLong(reservation.getRecurrenceExpression());
|
||||
}
|
||||
|
||||
// Iterate the stages in reverse order
|
||||
while (stageProvider.hasNext()) {
|
||||
|
||||
|
@ -117,7 +123,7 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
// Compute stage allocation
|
||||
Map<ReservationInterval, Resource> curAlloc =
|
||||
computeStageAllocation(plan, currentReservationStage, stageArrival,
|
||||
stageDeadline, user, reservationId);
|
||||
stageDeadline, period, user, reservationId);
|
||||
|
||||
// If we did not find an allocation, return NULL
|
||||
// (unless it's an ANY job, then we simply continue).
|
||||
|
@ -216,11 +222,10 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
planLoads = plan.getCumulativeLoadOverTime(jobArrival, jobDeadline);
|
||||
ReservationAllocation oldRes = plan.getReservationById(reservationId);
|
||||
if (oldRes != null) {
|
||||
planLoads =
|
||||
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
|
||||
plan.getTotalCapacity(), planLoads,
|
||||
oldRes.getResourcesOverTime(), RLEOperator.subtract, jobArrival,
|
||||
jobDeadline);
|
||||
planLoads = RLESparseResourceAllocation.merge(
|
||||
plan.getResourceCalculator(), plan.getTotalCapacity(), planLoads,
|
||||
oldRes.getResourcesOverTime(jobArrival, jobDeadline),
|
||||
RLEOperator.subtract, jobArrival, jobDeadline);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -309,13 +314,13 @@ public class IterativePlanner extends PlanningAlgorithm {
|
|||
}
|
||||
|
||||
// Call algStageAllocator
|
||||
protected Map<ReservationInterval, Resource> computeStageAllocation(
|
||||
Plan plan, ReservationRequest rr, long stageArrivalTime,
|
||||
long stageDeadline, String user, ReservationId oldId)
|
||||
throws PlanningException {
|
||||
protected Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
|
||||
ReservationRequest rr, long stageArrivalTime, long stageDeadline,
|
||||
long period, String user, ReservationId oldId) throws PlanningException {
|
||||
|
||||
return algStageAllocator.computeStageAllocation(plan, planLoads,
|
||||
planModifications, rr, stageArrivalTime, stageDeadline, user, oldId);
|
||||
planModifications, rr, stageArrivalTime, stageDeadline, period, user,
|
||||
oldId);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
|||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.PeriodicRLESparseResourceAllocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
|
||||
|
@ -63,21 +64,33 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
|
|||
|
||||
// Compute the job allocation
|
||||
RLESparseResourceAllocation allocation =
|
||||
computeJobAllocation(plan, reservationId, adjustedContract, user);
|
||||
computeJobAllocation(plan, reservationId, adjustedContract, user);
|
||||
|
||||
long period = Long.parseLong(contract.getRecurrenceExpression());
|
||||
|
||||
// Make allocation periodic if request is periodic
|
||||
if (contract.getRecurrenceExpression() != null) {
|
||||
if (period > 0) {
|
||||
allocation =
|
||||
new PeriodicRLESparseResourceAllocation(allocation, period);
|
||||
}
|
||||
}
|
||||
|
||||
// If no job allocation was found, fail
|
||||
if (allocation == null) {
|
||||
throw new PlanningException(
|
||||
"The planning algorithm could not find a valid allocation"
|
||||
+ " for your request");
|
||||
"The planning algorithm could not find a valid allocation"
|
||||
+ " for your request");
|
||||
}
|
||||
|
||||
// Translate the allocation to a map (with zero paddings)
|
||||
long step = plan.getStep();
|
||||
|
||||
long jobArrival = stepRoundUp(adjustedContract.getArrival(), step);
|
||||
long jobDeadline = stepRoundUp(adjustedContract.getDeadline(), step);
|
||||
|
||||
Map<ReservationInterval, Resource> mapAllocations =
|
||||
allocationsToPaddedMap(allocation, jobArrival, jobDeadline);
|
||||
allocationsToPaddedMap(allocation, jobArrival, jobDeadline, period);
|
||||
|
||||
// Create the reservation
|
||||
ReservationAllocation capReservation =
|
||||
|
@ -85,8 +98,7 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
|
|||
adjustedContract, // Contract
|
||||
user, // User name
|
||||
plan.getQueueName(), // Queue name
|
||||
findEarliestTime(mapAllocations), // Earliest start time
|
||||
findLatestTime(mapAllocations), // Latest end time
|
||||
adjustedContract.getArrival(), adjustedContract.getDeadline(),
|
||||
mapAllocations, // Allocations
|
||||
plan.getResourceCalculator(), // Resource calculator
|
||||
plan.getMinimumAllocation()); // Minimum allocation
|
||||
|
@ -100,33 +112,46 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
|
|||
|
||||
}
|
||||
|
||||
private Map<ReservationInterval, Resource>
|
||||
allocationsToPaddedMap(RLESparseResourceAllocation allocation,
|
||||
long jobArrival, long jobDeadline) {
|
||||
|
||||
// Allocate
|
||||
Map<ReservationInterval, Resource> mapAllocations =
|
||||
allocation.toIntervalMap();
|
||||
private Map<ReservationInterval, Resource> allocationsToPaddedMap(
|
||||
RLESparseResourceAllocation allocation, long jobArrival, long jobDeadline,
|
||||
long period) {
|
||||
|
||||
// Zero allocation
|
||||
Resource zeroResource = Resource.newInstance(0, 0);
|
||||
|
||||
// Pad at the beginning
|
||||
long earliestStart = findEarliestTime(mapAllocations);
|
||||
if (jobArrival < earliestStart) {
|
||||
mapAllocations.put(new ReservationInterval(jobArrival, earliestStart),
|
||||
zeroResource);
|
||||
if (period > 0) {
|
||||
if ((jobDeadline - jobArrival) >= period) {
|
||||
allocation.addInterval(new ReservationInterval(0L, period),
|
||||
zeroResource);
|
||||
}
|
||||
jobArrival = jobArrival % period;
|
||||
jobDeadline = jobDeadline % period;
|
||||
|
||||
if (jobArrival <= jobDeadline) {
|
||||
allocation.addInterval(new ReservationInterval(0, jobArrival),
|
||||
zeroResource);
|
||||
allocation.addInterval(new ReservationInterval(jobDeadline, period),
|
||||
zeroResource);
|
||||
} else {
|
||||
allocation.addInterval(new ReservationInterval(jobDeadline, jobArrival),
|
||||
zeroResource);
|
||||
}
|
||||
} else {
|
||||
// Pad at the beginning
|
||||
long earliestStart = findEarliestTime(allocation.toIntervalMap());
|
||||
if (jobArrival < earliestStart) {
|
||||
allocation.addInterval(
|
||||
new ReservationInterval(jobArrival, earliestStart), zeroResource);
|
||||
}
|
||||
|
||||
// Pad at the beginning
|
||||
long latestEnd = findLatestTime(allocation.toIntervalMap());
|
||||
if (latestEnd < jobDeadline) {
|
||||
allocation.addInterval(new ReservationInterval(latestEnd, jobDeadline),
|
||||
zeroResource);
|
||||
}
|
||||
}
|
||||
|
||||
// Pad at the beginning
|
||||
long latestEnd = findLatestTime(mapAllocations);
|
||||
if (latestEnd < jobDeadline) {
|
||||
mapAllocations.put(new ReservationInterval(latestEnd, jobDeadline),
|
||||
zeroResource);
|
||||
}
|
||||
|
||||
return mapAllocations;
|
||||
|
||||
return allocation.toIntervalMap();
|
||||
}
|
||||
|
||||
public abstract RLESparseResourceAllocation computeJobAllocation(Plan plan,
|
||||
|
|
|
@ -45,6 +45,7 @@ public interface StageAllocator {
|
|||
* the stage by the two phase planning algorithm
|
||||
* @param stageDeadline the deadline of the stage set by the two phase
|
||||
* planning algorithm
|
||||
* @param period the periodicity with which this stage appears
|
||||
* @param user name of the user
|
||||
* @param oldId identifier of the old reservation
|
||||
*
|
||||
|
@ -55,7 +56,7 @@ public interface StageAllocator {
|
|||
Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
|
||||
RLESparseResourceAllocation planLoads,
|
||||
RLESparseResourceAllocation planModifications, ReservationRequest rr,
|
||||
long stageArrival, long stageDeadline, String user,
|
||||
long stageArrival, long stageDeadline, long period, String user,
|
||||
ReservationId oldId) throws PlanningException;
|
||||
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ public class StageAllocatorGreedy implements StageAllocator {
|
|||
public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
|
||||
RLESparseResourceAllocation planLoads,
|
||||
RLESparseResourceAllocation planModifications, ReservationRequest rr,
|
||||
long stageEarliestStart, long stageDeadline, String user,
|
||||
long stageEarliestStart, long stageDeadline, long period, String user,
|
||||
ReservationId oldId) throws PlanningException {
|
||||
|
||||
Resource totalCapacity = plan.getTotalCapacity();
|
||||
|
@ -69,7 +69,7 @@ public class StageAllocatorGreedy implements StageAllocator {
|
|||
|
||||
RLESparseResourceAllocation netAvailable =
|
||||
plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
|
||||
stageDeadline, 0);
|
||||
stageDeadline, period);
|
||||
|
||||
netAvailable =
|
||||
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
|
||||
|
|
|
@ -54,7 +54,7 @@ public class StageAllocatorGreedyRLE implements StageAllocator {
|
|||
public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
|
||||
RLESparseResourceAllocation planLoads,
|
||||
RLESparseResourceAllocation planModifications, ReservationRequest rr,
|
||||
long stageEarliestStart, long stageDeadline, String user,
|
||||
long stageEarliestStart, long stageDeadline, long period, String user,
|
||||
ReservationId oldId) throws PlanningException {
|
||||
|
||||
// abort early if the interval is not satisfiable
|
||||
|
@ -83,8 +83,9 @@ public class StageAllocatorGreedyRLE implements StageAllocator {
|
|||
int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
|
||||
|
||||
// get available resources from plan
|
||||
RLESparseResourceAllocation netRLERes = plan.getAvailableResourceOverTime(
|
||||
user, oldId, stageEarliestStart, stageDeadline, 0);
|
||||
RLESparseResourceAllocation netRLERes =
|
||||
plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
|
||||
stageDeadline, period);
|
||||
|
||||
// remove plan modifications
|
||||
netRLERes =
|
||||
|
|
|
@ -70,15 +70,15 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
|
|||
public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
|
||||
RLESparseResourceAllocation planLoads,
|
||||
RLESparseResourceAllocation planModifications, ReservationRequest rr,
|
||||
long stageArrival, long stageDeadline, String user, ReservationId oldId)
|
||||
throws PlanningException {
|
||||
long stageArrival, long stageDeadline, long period, String user,
|
||||
ReservationId oldId) throws PlanningException {
|
||||
|
||||
// Initialize
|
||||
ResourceCalculator resCalc = plan.getResourceCalculator();
|
||||
Resource capacity = plan.getTotalCapacity();
|
||||
|
||||
RLESparseResourceAllocation netRLERes = plan.getAvailableResourceOverTime(
|
||||
user, oldId, stageArrival, stageDeadline, 0);
|
||||
user, oldId, stageArrival, stageDeadline, period);
|
||||
|
||||
long step = plan.getStep();
|
||||
|
||||
|
|
|
@ -180,8 +180,12 @@ public abstract class BaseSharingPolicyTest {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
rle.addInterval(new ReservationInterval(rStart, rEnd), alloc);
|
||||
if(rStart > rEnd){
|
||||
rle.addInterval(new ReservationInterval(rStart, period), alloc);
|
||||
rle.addInterval(new ReservationInterval(0, rEnd), alloc);
|
||||
} else {
|
||||
rle.addInterval(new ReservationInterval(rStart, rEnd), alloc);
|
||||
}
|
||||
return rle;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.Arrays;
|
|||
import java.util.Collection;
|
||||
|
||||
import net.jcip.annotations.NotThreadSafe;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
|
||||
import org.junit.Test;
|
||||
|
@ -39,63 +40,67 @@ public class TestCapacityOverTimePolicy extends BaseSharingPolicyTest {
|
|||
final static long ONEDAY = 86400 * 1000;
|
||||
final static long ONEHOUR = 3600 * 1000;
|
||||
final static long ONEMINUTE = 60 * 1000;
|
||||
final static String TWODAYPERIOD = "7200000";
|
||||
final static String TWOHOURPERIOD = "7200000";
|
||||
final static String ONEDAYPERIOD = "86400000";
|
||||
|
||||
@Parameterized.Parameters(name = "Duration {0}, height {1}," +
|
||||
" submission {2}, periodic {3})")
|
||||
" numSubmission {2}, periodic {3})")
|
||||
public static Collection<Object[]> data() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
|
||||
// easy fit
|
||||
{ONEHOUR, 0.25, 1, null, null },
|
||||
{ONEHOUR, 0.25, 1, TWODAYPERIOD, null },
|
||||
{ONEHOUR, 0.25, 1, TWOHOURPERIOD, null },
|
||||
{ONEHOUR, 0.25, 1, ONEDAYPERIOD, null },
|
||||
|
||||
// instantaneous high, but fit integral and inst limits
|
||||
{ONEMINUTE, 0.74, 1, null, null },
|
||||
{ONEMINUTE, 0.74, 1, TWODAYPERIOD, null },
|
||||
{ONEMINUTE, 0.74, 1, TWOHOURPERIOD, null },
|
||||
{ONEMINUTE, 0.74, 1, ONEDAYPERIOD, null },
|
||||
|
||||
// barely fit
|
||||
{ONEHOUR, 0.76, 1, null, PlanningQuotaException.class },
|
||||
{ONEHOUR, 0.76, 1, TWODAYPERIOD, PlanningQuotaException.class },
|
||||
{ONEHOUR, 0.76, 1, TWOHOURPERIOD, PlanningQuotaException.class },
|
||||
{ONEHOUR, 0.76, 1, ONEDAYPERIOD, PlanningQuotaException.class },
|
||||
|
||||
// overcommit with single reservation
|
||||
{ONEHOUR, 1.1, 1, null, PlanningQuotaException.class },
|
||||
{ONEHOUR, 1.1, 1, TWODAYPERIOD, PlanningQuotaException.class },
|
||||
{ONEHOUR, 1.1, 1, TWOHOURPERIOD, PlanningQuotaException.class },
|
||||
{ONEHOUR, 1.1, 1, ONEDAYPERIOD, PlanningQuotaException.class },
|
||||
|
||||
// barely fit with multiple reservations (instantaneously, lowering to
|
||||
// 1min to fit integral)
|
||||
{ONEMINUTE, 0.25, 3, null, null },
|
||||
{ONEMINUTE, 0.25, 3, TWODAYPERIOD, null },
|
||||
{ONEMINUTE, 0.25, 3, TWOHOURPERIOD, null },
|
||||
{ONEMINUTE, 0.25, 3, ONEDAYPERIOD, null },
|
||||
|
||||
// overcommit with multiple reservations (instantaneously)
|
||||
{ONEMINUTE, 0.25, 4, null, PlanningQuotaException.class },
|
||||
{ONEMINUTE, 0.25, 4, TWODAYPERIOD, PlanningQuotaException.class },
|
||||
{ONEMINUTE, 0.25, 4, TWOHOURPERIOD, PlanningQuotaException.class },
|
||||
{ONEMINUTE, 0.25, 4, ONEDAYPERIOD, PlanningQuotaException.class },
|
||||
|
||||
// (non-periodic) reservation longer than window
|
||||
{25 * ONEHOUR, 0.25, 1, null, PlanningQuotaException.class },
|
||||
{25 * ONEHOUR, 0.25, 1, TWODAYPERIOD, PlanningQuotaException.class },
|
||||
// NOTE: we generally don't accept periodicity < duration but the test
|
||||
// generator will "wrap" this correctly
|
||||
{25 * ONEHOUR, 0.25, 1, TWOHOURPERIOD, PlanningQuotaException.class },
|
||||
{25 * ONEHOUR, 0.25, 1, ONEDAYPERIOD, PlanningQuotaException.class },
|
||||
|
||||
// (non-periodic) reservation longer than window
|
||||
{25 * ONEHOUR, 0.05, 5, null, PlanningQuotaException.class },
|
||||
{25 * ONEHOUR, 0.05, 5, TWODAYPERIOD, PlanningQuotaException.class },
|
||||
// NOTE: we generally don't accept periodicity < duration but the test
|
||||
// generator will "wrap" this correctly
|
||||
{25 * ONEHOUR, 0.05, 5, TWOHOURPERIOD, PlanningQuotaException.class },
|
||||
{25 * ONEHOUR, 0.05, 5, ONEDAYPERIOD, PlanningQuotaException.class },
|
||||
|
||||
// overcommit integral
|
||||
{ONEDAY, 0.26, 1, null, PlanningQuotaException.class },
|
||||
{2 * ONEHOUR, 0.26, 1, TWODAYPERIOD, PlanningQuotaException.class },
|
||||
{2 * ONEHOUR, 0.26, 1, TWOHOURPERIOD, PlanningQuotaException.class },
|
||||
{2 * ONEDAY, 0.26, 1, ONEDAYPERIOD, PlanningQuotaException.class },
|
||||
|
||||
// overcommit integral
|
||||
{ONEDAY / 2, 0.51, 1, null, PlanningQuotaException.class },
|
||||
{2 * ONEHOUR / 2, 0.51, 1, TWODAYPERIOD,
|
||||
{2 * ONEHOUR / 2, 0.51, 1, TWOHOURPERIOD,
|
||||
PlanningQuotaException.class },
|
||||
{2 * ONEDAY / 2, 0.51, 1, ONEDAYPERIOD, PlanningQuotaException.class }
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*******************************************************************************/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -119,6 +120,18 @@ public class TestInMemoryPlan {
|
|||
checkAllocation(plan, alloc, start, 0);
|
||||
}
|
||||
|
||||
@Test(expected = PlanningException.class)
|
||||
public void testOutOfRange() throws PlanningException {
|
||||
maxPeriodicity = 100;
|
||||
Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
|
||||
resCalc, minAlloc, maxAlloc, planName, replanner, true, maxPeriodicity,
|
||||
context, new UTCClock());
|
||||
|
||||
// we expect the plan to complaint as the range 330-150 > 50
|
||||
RLESparseResourceAllocation availableBefore =
|
||||
plan.getAvailableResourceOverTime(user, null, 150, 330, 50);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddPeriodicReservation() throws PlanningException {
|
||||
|
||||
|
@ -146,8 +159,14 @@ public class TestInMemoryPlan {
|
|||
checkAllocation(plan, alloc, start, period);
|
||||
|
||||
RLESparseResourceAllocation available =
|
||||
plan.getAvailableResourceOverTime(user, reservationID, 150, 330, 50);
|
||||
System.out.println(available);
|
||||
plan.getAvailableResourceOverTime(user, null, 130, 170, 50);
|
||||
|
||||
// the reservation has period 20 starting at 10, and the interaction with
|
||||
// the period 50 request means that every 10 we expect a "90GB" point
|
||||
assertEquals(92160, available.getCapacityAtTime(130).getMemorySize());
|
||||
assertEquals(92160, available.getCapacityAtTime(140).getMemorySize());
|
||||
assertEquals(92160, available.getCapacityAtTime(150).getMemorySize());
|
||||
|
||||
}
|
||||
|
||||
private void checkAllocation(Plan plan, int[] alloc, int start,
|
||||
|
@ -162,18 +181,18 @@ public class TestInMemoryPlan {
|
|||
for (int i = 0; i < alloc.length; i++) {
|
||||
// only one instance for non-periodic reservation
|
||||
if (periodicity <= 0) {
|
||||
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
|
||||
assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
|
||||
plan.getTotalCommittedResources(start + i));
|
||||
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
|
||||
assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
|
||||
userCons.getCapacityAtTime(start + i));
|
||||
} else {
|
||||
// periodic reservations should repeat
|
||||
long y = 0;
|
||||
Resource res = Resource.newInstance(1024 * (alloc[i]), (alloc[i]));
|
||||
while (y <= end * 2) {
|
||||
Assert.assertEquals("At time: " + start + i + y, res,
|
||||
assertEquals("At time: " + start + i + y, res,
|
||||
plan.getTotalCommittedResources(start + i + y));
|
||||
Assert.assertEquals(" At time: " + (start + i + y), res,
|
||||
assertEquals(" At time: " + (start + i + y), res,
|
||||
userCons.getCapacityAtTime(start + i + y));
|
||||
y = y + periodicity;
|
||||
}
|
||||
|
@ -253,9 +272,9 @@ public class TestInMemoryPlan {
|
|||
RLESparseResourceAllocation userCons =
|
||||
plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
|
||||
for (int i = 0; i < alloc.length; i++) {
|
||||
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
|
||||
assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
|
||||
plan.getTotalCommittedResources(start + i));
|
||||
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
|
||||
assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
|
||||
userCons.getCapacityAtTime(start + i));
|
||||
}
|
||||
|
||||
|
@ -275,9 +294,9 @@ public class TestInMemoryPlan {
|
|||
start + updatedAlloc.length);
|
||||
|
||||
for (int i = 0; i < updatedAlloc.length; i++) {
|
||||
Assert.assertEquals(Resource.newInstance(1024 * (updatedAlloc[i] + i),
|
||||
assertEquals(Resource.newInstance(1024 * (updatedAlloc[i] + i),
|
||||
updatedAlloc[i] + i), plan.getTotalCommittedResources(start + i));
|
||||
Assert.assertEquals(Resource.newInstance(1024 * (updatedAlloc[i] + i),
|
||||
assertEquals(Resource.newInstance(1024 * (updatedAlloc[i] + i),
|
||||
updatedAlloc[i] + i), userCons.getCapacityAtTime(start + i));
|
||||
}
|
||||
}
|
||||
|
@ -371,10 +390,10 @@ public class TestInMemoryPlan {
|
|||
plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
|
||||
|
||||
for (int i = 0; i < alloc.length; i++) {
|
||||
Assert.assertEquals(
|
||||
assertEquals(
|
||||
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
|
||||
plan.getTotalCommittedResources(start + i));
|
||||
Assert.assertEquals(
|
||||
assertEquals(
|
||||
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
|
||||
userCons.getCapacityAtTime(start + i));
|
||||
}
|
||||
|
@ -389,9 +408,9 @@ public class TestInMemoryPlan {
|
|||
userCons =
|
||||
plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
|
||||
for (int i = 0; i < alloc.length; i++) {
|
||||
Assert.assertEquals(Resource.newInstance(0, 0),
|
||||
assertEquals(Resource.newInstance(0, 0),
|
||||
plan.getTotalCommittedResources(start + i));
|
||||
Assert.assertEquals(Resource.newInstance(0, 0),
|
||||
assertEquals(Resource.newInstance(0, 0),
|
||||
userCons.getCapacityAtTime(start + i));
|
||||
}
|
||||
}
|
||||
|
@ -492,11 +511,11 @@ public class TestInMemoryPlan {
|
|||
plan.getConsumptionForUserOverTime(user, start, start + alloc2.length);
|
||||
|
||||
for (int i = 0; i < alloc2.length; i++) {
|
||||
Assert.assertEquals(
|
||||
assertEquals(
|
||||
Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i),
|
||||
alloc1[i] + alloc2[i] + i),
|
||||
plan.getTotalCommittedResources(start + i));
|
||||
Assert.assertEquals(
|
||||
assertEquals(
|
||||
Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i),
|
||||
alloc1[i] + alloc2[i] + i),
|
||||
userCons.getCapacityAtTime(start + i));
|
||||
|
@ -530,9 +549,9 @@ public class TestInMemoryPlan {
|
|||
|
||||
Assert.assertNull(plan.getReservationById(reservationID1));
|
||||
for (int i = 0; i < alloc1.length; i++) {
|
||||
Assert.assertEquals(Resource.newInstance(0, 0),
|
||||
assertEquals(Resource.newInstance(0, 0),
|
||||
plan.getTotalCommittedResources(start + i));
|
||||
Assert.assertEquals(Resource.newInstance(0, 0),
|
||||
assertEquals(Resource.newInstance(0, 0),
|
||||
userCons.getCapacityAtTime(start + i));
|
||||
}
|
||||
}
|
||||
|
@ -721,16 +740,16 @@ public class TestInMemoryPlan {
|
|||
private void doAssertions(Plan plan, ReservationAllocation rAllocation) {
|
||||
ReservationId reservationID = rAllocation.getReservationId();
|
||||
Assert.assertNotNull(plan.getReservationById(reservationID));
|
||||
Assert.assertEquals(rAllocation, plan.getReservationById(reservationID));
|
||||
assertEquals(rAllocation, plan.getReservationById(reservationID));
|
||||
Assert.assertTrue(((InMemoryPlan) plan).getAllReservations().size() == 1);
|
||||
if (rAllocation.getPeriodicity() <= 0) {
|
||||
Assert.assertEquals(rAllocation.getEndTime(), plan.getLastEndTime());
|
||||
assertEquals(rAllocation.getEndTime(), plan.getLastEndTime());
|
||||
}
|
||||
Assert.assertEquals(totalCapacity, plan.getTotalCapacity());
|
||||
Assert.assertEquals(minAlloc, plan.getMinimumAllocation());
|
||||
Assert.assertEquals(maxAlloc, plan.getMaximumAllocation());
|
||||
Assert.assertEquals(resCalc, plan.getResourceCalculator());
|
||||
Assert.assertEquals(planName, plan.getQueueName());
|
||||
assertEquals(totalCapacity, plan.getTotalCapacity());
|
||||
assertEquals(minAlloc, plan.getMinimumAllocation());
|
||||
assertEquals(maxAlloc, plan.getMaximumAllocation());
|
||||
assertEquals(resCalc, plan.getResourceCalculator());
|
||||
assertEquals(planName, plan.getQueueName());
|
||||
Assert.assertTrue(plan.getMoveOnExpiry());
|
||||
}
|
||||
|
||||
|
|
|
@ -26,11 +26,14 @@ import static org.mockito.Mockito.mock;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
import net.jcip.annotations.NotThreadSafe;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||
|
@ -54,11 +57,27 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
|||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
/**
|
||||
* This class tests the {@code AlignedPlannerWithGreedy} agent.
|
||||
*/
|
||||
@RunWith(value = Parameterized.class)
|
||||
@NotThreadSafe
|
||||
@SuppressWarnings("VisibilityModifier")
|
||||
public class TestAlignedPlanner {
|
||||
|
||||
@Parameterized.Parameter(value = 0)
|
||||
public String recurrenceExpression;
|
||||
|
||||
final static String NONPERIODIC = "0";
|
||||
final static String THREEHOURPERIOD = "10800000";
|
||||
final static String ONEDAYPERIOD = "86400000";
|
||||
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(TestAlignedPlanner.class);
|
||||
|
||||
|
@ -72,6 +91,16 @@ public class TestAlignedPlanner {
|
|||
private Resource clusterCapacity;
|
||||
private long step;
|
||||
|
||||
|
||||
@Parameterized.Parameters(name = "Testing: periodicity {0})")
|
||||
public static Collection<Object[]> data() {
|
||||
return Arrays.asList(new Object[][]{
|
||||
{NONPERIODIC},
|
||||
{THREEHOURPERIOD},
|
||||
{ONEDAYPERIOD}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleReservationAccept() throws PlanningException {
|
||||
|
||||
|
@ -107,6 +136,9 @@ public class TestAlignedPlanner {
|
|||
assertTrue(alloc1.toString(),
|
||||
check(alloc1, 10 * step, 20 * step, 10, 2048, 2));
|
||||
|
||||
System.out.println("--------AFTER AGENT----------");
|
||||
System.out.println(plan.toString());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1139,9 +1171,11 @@ public class TestAlignedPlanner {
|
|||
long deadline, ReservationRequest[] reservationRequests,
|
||||
ReservationRequestInterpreter rType, String username) {
|
||||
|
||||
return ReservationDefinition.newInstance(arrival, deadline,
|
||||
ReservationRequests.newInstance(Arrays.asList(reservationRequests),
|
||||
rType), username);
|
||||
|
||||
return ReservationDefinition.newInstance(arrival,
|
||||
deadline, ReservationRequests
|
||||
.newInstance(Arrays.asList(reservationRequests), rType),
|
||||
username, recurrenceExpression, Priority.UNDEFINED);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -59,13 +59,20 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
@SuppressWarnings("VisibilityModifier")
|
||||
public class TestGreedyReservationAgent {
|
||||
|
||||
|
||||
@Parameterized.Parameter(value = 0)
|
||||
public boolean allocateLeft;
|
||||
|
||||
@Parameterized.Parameter(value = 1)
|
||||
public String recurrenceExpression;
|
||||
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(TestGreedyReservationAgent.class);
|
||||
|
||||
|
@ -76,16 +83,18 @@ public class TestGreedyReservationAgent {
|
|||
Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
|
||||
Random rand = new Random();
|
||||
long step;
|
||||
boolean allocateLeft;
|
||||
|
||||
public TestGreedyReservationAgent(Boolean b){
|
||||
this.allocateLeft = b;
|
||||
}
|
||||
|
||||
@Parameters
|
||||
@Parameterized.Parameters(name = "Testing: allocateLeft {0}," +
|
||||
" recurrenceExpression {1})")
|
||||
public static Collection<Object[]> data() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
{true}, {false}});
|
||||
{true, "0"},
|
||||
{false, "0"},
|
||||
{true, "7200000"},
|
||||
{false, "7200000"},
|
||||
{true, "86400000"},
|
||||
{false, "86400000"}
|
||||
});
|
||||
}
|
||||
|
||||
@Before
|
||||
|
@ -134,6 +143,7 @@ public class TestGreedyReservationAgent {
|
|||
ReservationDefinition rr = new ReservationDefinitionPBImpl();
|
||||
rr.setArrival(5 * step);
|
||||
rr.setDeadline(20 * step);
|
||||
rr.setRecurrenceExpression(recurrenceExpression);
|
||||
ReservationRequest r = ReservationRequest.newInstance(
|
||||
Resource.newInstance(2048, 2), 10, 5, 10 * step);
|
||||
ReservationRequests reqs = new ReservationRequestsPBImpl();
|
||||
|
@ -193,6 +203,7 @@ public class TestGreedyReservationAgent {
|
|||
ReservationDefinition rr = new ReservationDefinitionPBImpl();
|
||||
rr.setArrival(5 * step);
|
||||
rr.setDeadline(100 * step);
|
||||
rr.setRecurrenceExpression(recurrenceExpression);
|
||||
ReservationRequest r =
|
||||
ReservationRequest.newInstance(Resource.newInstance(2048, 2), 20, 20,
|
||||
10 * step);
|
||||
|
@ -283,8 +294,9 @@ public class TestGreedyReservationAgent {
|
|||
int[] f = { 100, 100 };
|
||||
|
||||
ReservationDefinition rDef =
|
||||
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||
30 * step, 30 * step + f.length * step, f.length * step);
|
||||
ReservationSystemTestUtil.createSimpleReservationDefinition(30 * step,
|
||||
30 * step + f.length * step, f.length * step, 1,
|
||||
recurrenceExpression);
|
||||
assertTrue(plan.toString(),
|
||||
plan.addReservation(new InMemoryReservationAllocation(
|
||||
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||
|
@ -296,6 +308,7 @@ public class TestGreedyReservationAgent {
|
|||
ReservationDefinition rr = new ReservationDefinitionPBImpl();
|
||||
rr.setArrival(0 * step);
|
||||
rr.setDeadline(70 * step);
|
||||
rr.setRecurrenceExpression(recurrenceExpression);
|
||||
ReservationRequests reqs = new ReservationRequestsPBImpl();
|
||||
reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER);
|
||||
ReservationRequest r = ReservationRequest.newInstance(
|
||||
|
@ -346,10 +359,11 @@ public class TestGreedyReservationAgent {
|
|||
prepareBasicPlan();
|
||||
// create a completely utilized segment at time 30
|
||||
int[] f = { 100, 100 };
|
||||
ReservationDefinition rDef =
|
||||
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||
30, 30 * step + f.length * step, f.length * step);
|
||||
assertTrue(plan.toString(),
|
||||
ReservationDefinition rDef = ReservationSystemTestUtil
|
||||
.createSimpleReservationDefinition(30, 30 * step + f.length * step,
|
||||
f.length * step, 1, recurrenceExpression);
|
||||
assertTrue(
|
||||
plan.toString(),
|
||||
plan.addReservation(new InMemoryReservationAllocation(
|
||||
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||
"dedicated", 30 * step, 30 * step + f.length * step,
|
||||
|
@ -406,6 +420,7 @@ public class TestGreedyReservationAgent {
|
|||
ReservationDefinition rr = new ReservationDefinitionPBImpl();
|
||||
rr.setArrival(0 * step);
|
||||
rr.setDeadline(60 * step);
|
||||
rr.setRecurrenceExpression(recurrenceExpression);
|
||||
ReservationRequests reqs = new ReservationRequestsPBImpl();
|
||||
reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
|
||||
ReservationRequest r = ReservationRequest.newInstance(
|
||||
|
@ -453,6 +468,7 @@ public class TestGreedyReservationAgent {
|
|||
ReservationDefinition rr = new ReservationDefinitionPBImpl();
|
||||
rr.setArrival(100 * step);
|
||||
rr.setDeadline(120 * step);
|
||||
rr.setRecurrenceExpression(recurrenceExpression);
|
||||
ReservationRequests reqs = new ReservationRequestsPBImpl();
|
||||
reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
|
||||
ReservationRequest r = ReservationRequest.newInstance(
|
||||
|
@ -494,6 +510,7 @@ public class TestGreedyReservationAgent {
|
|||
ReservationDefinition rr = new ReservationDefinitionPBImpl();
|
||||
rr.setArrival(100 * step);
|
||||
rr.setDeadline(120 * step);
|
||||
rr.setRecurrenceExpression(recurrenceExpression);
|
||||
ReservationRequests reqs = new ReservationRequestsPBImpl();
|
||||
reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
|
||||
ReservationRequest r = ReservationRequest.newInstance(
|
||||
|
@ -542,6 +559,7 @@ public class TestGreedyReservationAgent {
|
|||
ReservationDefinition rr = new ReservationDefinitionPBImpl();
|
||||
rr.setArrival(100L);
|
||||
rr.setDeadline(120L);
|
||||
rr.setRecurrenceExpression(recurrenceExpression);
|
||||
ReservationRequests reqs = new ReservationRequestsPBImpl();
|
||||
reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
|
||||
|
||||
|
@ -587,6 +605,7 @@ public class TestGreedyReservationAgent {
|
|||
ReservationDefinition rr = new ReservationDefinitionPBImpl();
|
||||
rr.setArrival(100 * step);
|
||||
rr.setDeadline(120 * step);
|
||||
rr.setRecurrenceExpression(recurrenceExpression);
|
||||
ReservationRequests reqs = new ReservationRequestsPBImpl();
|
||||
reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
|
||||
ReservationRequest r = ReservationRequest.newInstance(
|
||||
|
@ -635,6 +654,7 @@ public class TestGreedyReservationAgent {
|
|||
ReservationDefinition rr = new ReservationDefinitionPBImpl();
|
||||
rr.setArrival(100L);
|
||||
rr.setDeadline(120L);
|
||||
rr.setRecurrenceExpression(recurrenceExpression);
|
||||
ReservationRequests reqs = new ReservationRequestsPBImpl();
|
||||
reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
|
||||
ReservationRequest r = ReservationRequest.newInstance(
|
||||
|
@ -759,25 +779,4 @@ public class TestGreedyReservationAgent {
|
|||
+ " in " + (end - start) + "ms");
|
||||
}
|
||||
|
||||
public static void main(String[] arg) {
|
||||
|
||||
boolean left = false;
|
||||
// run a stress test with by default 1000 random jobs
|
||||
int numJobs = 1000;
|
||||
if (arg.length > 0) {
|
||||
numJobs = Integer.parseInt(arg[0]);
|
||||
}
|
||||
if (arg.length > 1) {
|
||||
left = Boolean.parseBoolean(arg[1]);
|
||||
}
|
||||
|
||||
try {
|
||||
TestGreedyReservationAgent test = new TestGreedyReservationAgent(left);
|
||||
test.setup();
|
||||
test.testStress(numJobs);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,213 @@
|
|||
/*****************************************************************************
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*****************************************************************************/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationRequests;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.Random;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
/**
|
||||
* General purpose ReservationAgent tester.
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
@SuppressWarnings("VisibilityModifier")
|
||||
public class TestReservationAgents {
|
||||
|
||||
@Parameterized.Parameter(value = 0)
|
||||
public Class agentClass;
|
||||
|
||||
@Parameterized.Parameter(value = 1)
|
||||
public boolean allocateLeft;
|
||||
|
||||
@Parameterized.Parameter(value = 2)
|
||||
public String recurrenceExpression;
|
||||
|
||||
@Parameterized.Parameter(value = 3)
|
||||
public int numOfNodes;
|
||||
|
||||
private long step;
|
||||
private Random rand = new Random(2);
|
||||
private ReservationAgent agent;
|
||||
private Plan plan;
|
||||
private ResourceCalculator resCalc = new DefaultResourceCalculator();
|
||||
private Resource minAlloc = Resource.newInstance(1024, 1);
|
||||
private Resource maxAlloc = Resource.newInstance(32 * 1023, 32);
|
||||
|
||||
private long timeHorizon = 2 * 24 * 3600 * 1000; // 2 days
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestReservationAgents.class);
|
||||
|
||||
@Parameterized.Parameters(name = "Testing: agent {0}, allocateLeft: {1}," +
|
||||
" recurrenceExpression: {2}, numNodes: {3})")
|
||||
public static Collection<Object[]> data() {
|
||||
return Arrays.asList(
|
||||
new Object[][] {{GreedyReservationAgent.class, true, "0", 100 },
|
||||
{GreedyReservationAgent.class, false, "0", 100 },
|
||||
{GreedyReservationAgent.class, true, "7200000", 100 },
|
||||
{GreedyReservationAgent.class, false, "7200000", 100 },
|
||||
{GreedyReservationAgent.class, true, "86400000", 100 },
|
||||
{GreedyReservationAgent.class, false, "86400000", 100 },
|
||||
{AlignedPlannerWithGreedy.class, true, "0", 100 },
|
||||
{AlignedPlannerWithGreedy.class, false, "0", 100 },
|
||||
{AlignedPlannerWithGreedy.class, true, "7200000", 100 },
|
||||
{AlignedPlannerWithGreedy.class, false, "7200000", 100 },
|
||||
{AlignedPlannerWithGreedy.class, true, "86400000", 100 },
|
||||
{AlignedPlannerWithGreedy.class, false, "86400000", 100 } });
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
|
||||
long seed = rand.nextLong();
|
||||
rand.setSeed(seed);
|
||||
LOG.info("Running with seed: " + seed);
|
||||
|
||||
// setting completely loose quotas
|
||||
long timeWindow = 1000000L;
|
||||
Resource clusterCapacity =
|
||||
Resource.newInstance(numOfNodes * 1024, numOfNodes);
|
||||
step = 1000L;
|
||||
String reservationQ =
|
||||
ReservationSystemTestUtil.getFullReservationQueueName();
|
||||
|
||||
float instConstraint = 100;
|
||||
float avgConstraint = 100;
|
||||
|
||||
ReservationSchedulerConfiguration conf = ReservationSystemTestUtil
|
||||
.createConf(reservationQ, timeWindow, instConstraint, avgConstraint);
|
||||
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
|
||||
policy.init(reservationQ, conf);
|
||||
|
||||
// setting conf to
|
||||
conf.setBoolean(GreedyReservationAgent.FAVOR_EARLY_ALLOCATION,
|
||||
allocateLeft);
|
||||
agent = (ReservationAgent) agentClass.newInstance();
|
||||
agent.init(conf);
|
||||
|
||||
QueueMetrics queueMetrics = mock(QueueMetrics.class);
|
||||
RMContext context = ReservationSystemTestUtil.createMockRMContext();
|
||||
|
||||
plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
|
||||
resCalc, minAlloc, maxAlloc, "dedicated", null, true, context);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
|
||||
long period = Long.parseLong(recurrenceExpression);
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
ReservationDefinition rr = createRandomRequest(i);
|
||||
if (rr != null) {
|
||||
ReservationId reservationID =
|
||||
ReservationSystemTestUtil.getNewReservationId();
|
||||
try {
|
||||
agent.createReservation(reservationID, "u1", plan, rr);
|
||||
} catch (PlanningException p) {
|
||||
// happens
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private ReservationDefinition createRandomRequest(int i)
|
||||
throws PlanningException {
|
||||
long arrival = (long) Math.floor(rand.nextDouble() * timeHorizon);
|
||||
long period = Long.parseLong(recurrenceExpression);
|
||||
|
||||
// min between period and rand around 30min
|
||||
long duration =
|
||||
(long) Math.round(Math.min(rand.nextDouble() * 3600 * 1000, period));
|
||||
|
||||
// min between period and rand around 5x duration
|
||||
long deadline = (long) Math
|
||||
.ceil(arrival + Math.min(duration * rand.nextDouble() * 10, period));
|
||||
|
||||
assert((deadline - arrival) <= period);
|
||||
|
||||
RLESparseResourceAllocation available = plan
|
||||
.getAvailableResourceOverTime("u1", null, arrival, deadline, period);
|
||||
NavigableMap<Long, Resource> availableMap = available.getCumulative();
|
||||
|
||||
// look at available space, and for each segment, use half of it with 50%
|
||||
// probability
|
||||
List<ReservationRequest> reservationRequests = new ArrayList<>();
|
||||
for (Map.Entry<Long, Resource> e : availableMap.entrySet()) {
|
||||
if (e.getValue() != null && rand.nextDouble() > 0.001) {
|
||||
int numContainers = (int) Math.ceil(Resources.divide(resCalc,
|
||||
plan.getTotalCapacity(), e.getValue(), minAlloc) / 2);
|
||||
long tempDur =
|
||||
Math.min(duration, availableMap.higherKey(e.getKey()) - e.getKey());
|
||||
reservationRequests.add(ReservationRequest.newInstance(minAlloc,
|
||||
numContainers, 1, tempDur));
|
||||
}
|
||||
}
|
||||
|
||||
if (reservationRequests.size() < 1) {
|
||||
return null;
|
||||
}
|
||||
|
||||
ReservationDefinition rr = new ReservationDefinitionPBImpl();
|
||||
rr.setArrival(arrival);
|
||||
rr.setDeadline(deadline);
|
||||
rr.setRecurrenceExpression(recurrenceExpression);
|
||||
ReservationRequests reqs = new ReservationRequestsPBImpl();
|
||||
reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER);
|
||||
reqs.setReservationResources(reservationRequests);
|
||||
rr.setReservationRequests(reqs);
|
||||
rr.setReservationName("res_" + i);
|
||||
|
||||
return rr;
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue