YARN-5329. Placement Agent enhancements required to support recurring reservations in ReservationSystem. (Carlo Curino via Subru).

(cherry picked from commit e6e614e380)
This commit is contained in:
Subru Krishnan 2017-10-04 19:28:27 -07:00
parent 7836a6b59a
commit 7fd4a997f4
15 changed files with 443 additions and 128 deletions

View File

@ -723,6 +723,12 @@ public class InMemoryPlan implements Plan {
+ periodicRle.getTimePeriod() + ")");
}
if (period < (end - start)) {
throw new PlanningException(
"Invalid input: (end - start) = (" + end + " - " + start + ") = "
+ (end - start) + " > period = " + period);
}
// find the minimum resources available among all the instances that fit
// in the LCM
long numInstInLCM = periodicRle.getTimePeriod() / period;

View File

@ -221,7 +221,8 @@ public class PeriodicRLESparseResourceAllocation
NavigableMap<Long, Resource> cumulativeMap = this.getCumulative();
Long previous = cumulativeMap.floorKey(relativeStart);
previous = (previous != null) ? previous : 0;
for (long i = 0; i <= (end - start) / timePeriod; i++) {
//make sure to go one past end, to catch end times extending past period
for (long i = 0; i <= 1 + (end - start) / timePeriod; i++) {
for (Map.Entry<Long, Resource> e : cumulativeMap.entrySet()) {
long curKey = e.getKey() + (i * timePeriod);
if (curKey >= previous && (start + curKey - relativeStart) <= end) {

View File

@ -423,7 +423,8 @@ public class RLESparseResourceAllocation {
Resource outRes) {
if (out.isEmpty() || (out.lastEntry() != null && outRes == null)
|| !Resources.equals(out.lastEntry().getValue(), outRes)) {
|| (out.lastEntry().getValue() != null
&& !Resources.equals(out.lastEntry().getValue(), outRes))) {
out.put(time, outRes);
}
@ -460,7 +461,8 @@ public class RLESparseResourceAllocation {
if (!Resources.fitsIn(b, a)) {
throw new PlanningException(
"RLESparseResourceAllocation: merge failed as the "
+ "resulting RLESparseResourceAllocation would be negative");
+ "resulting RLESparseResourceAllocation would "
+ "be negative, when testing: (" + eB + ") > (" + eA + ")");
} else {
return Resources.subtract(a, b);
}

View File

@ -98,6 +98,12 @@ public class IterativePlanner extends PlanningAlgorithm {
// Current stage
ReservationRequest currentReservationStage;
// initialize periodicity
long period = 0;
if(reservation.getRecurrenceExpression() != null){
period = Long.parseLong(reservation.getRecurrenceExpression());
}
// Iterate the stages in reverse order
while (stageProvider.hasNext()) {
@ -117,7 +123,7 @@ public class IterativePlanner extends PlanningAlgorithm {
// Compute stage allocation
Map<ReservationInterval, Resource> curAlloc =
computeStageAllocation(plan, currentReservationStage, stageArrival,
stageDeadline, user, reservationId);
stageDeadline, period, user, reservationId);
// If we did not find an allocation, return NULL
// (unless it's an ANY job, then we simply continue).
@ -216,11 +222,10 @@ public class IterativePlanner extends PlanningAlgorithm {
planLoads = plan.getCumulativeLoadOverTime(jobArrival, jobDeadline);
ReservationAllocation oldRes = plan.getReservationById(reservationId);
if (oldRes != null) {
planLoads =
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
plan.getTotalCapacity(), planLoads,
oldRes.getResourcesOverTime(), RLEOperator.subtract, jobArrival,
jobDeadline);
planLoads = RLESparseResourceAllocation.merge(
plan.getResourceCalculator(), plan.getTotalCapacity(), planLoads,
oldRes.getResourcesOverTime(jobArrival, jobDeadline),
RLEOperator.subtract, jobArrival, jobDeadline);
}
}
@ -309,13 +314,13 @@ public class IterativePlanner extends PlanningAlgorithm {
}
// Call algStageAllocator
protected Map<ReservationInterval, Resource> computeStageAllocation(
Plan plan, ReservationRequest rr, long stageArrivalTime,
long stageDeadline, String user, ReservationId oldId)
throws PlanningException {
protected Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
ReservationRequest rr, long stageArrivalTime, long stageDeadline,
long period, String user, ReservationId oldId) throws PlanningException {
return algStageAllocator.computeStageAllocation(plan, planLoads,
planModifications, rr, stageArrivalTime, stageDeadline, user, oldId);
planModifications, rr, stageArrivalTime, stageDeadline, period, user,
oldId);
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.PeriodicRLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
@ -63,21 +64,33 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
// Compute the job allocation
RLESparseResourceAllocation allocation =
computeJobAllocation(plan, reservationId, adjustedContract, user);
computeJobAllocation(plan, reservationId, adjustedContract, user);
long period = Long.parseLong(contract.getRecurrenceExpression());
// Make allocation periodic if request is periodic
if (contract.getRecurrenceExpression() != null) {
if (period > 0) {
allocation =
new PeriodicRLESparseResourceAllocation(allocation, period);
}
}
// If no job allocation was found, fail
if (allocation == null) {
throw new PlanningException(
"The planning algorithm could not find a valid allocation"
+ " for your request");
"The planning algorithm could not find a valid allocation"
+ " for your request");
}
// Translate the allocation to a map (with zero paddings)
long step = plan.getStep();
long jobArrival = stepRoundUp(adjustedContract.getArrival(), step);
long jobDeadline = stepRoundUp(adjustedContract.getDeadline(), step);
Map<ReservationInterval, Resource> mapAllocations =
allocationsToPaddedMap(allocation, jobArrival, jobDeadline);
allocationsToPaddedMap(allocation, jobArrival, jobDeadline, period);
// Create the reservation
ReservationAllocation capReservation =
@ -85,8 +98,7 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
adjustedContract, // Contract
user, // User name
plan.getQueueName(), // Queue name
findEarliestTime(mapAllocations), // Earliest start time
findLatestTime(mapAllocations), // Latest end time
adjustedContract.getArrival(), adjustedContract.getDeadline(),
mapAllocations, // Allocations
plan.getResourceCalculator(), // Resource calculator
plan.getMinimumAllocation()); // Minimum allocation
@ -100,33 +112,46 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
}
private Map<ReservationInterval, Resource>
allocationsToPaddedMap(RLESparseResourceAllocation allocation,
long jobArrival, long jobDeadline) {
// Allocate
Map<ReservationInterval, Resource> mapAllocations =
allocation.toIntervalMap();
private Map<ReservationInterval, Resource> allocationsToPaddedMap(
RLESparseResourceAllocation allocation, long jobArrival, long jobDeadline,
long period) {
// Zero allocation
Resource zeroResource = Resource.newInstance(0, 0);
// Pad at the beginning
long earliestStart = findEarliestTime(mapAllocations);
if (jobArrival < earliestStart) {
mapAllocations.put(new ReservationInterval(jobArrival, earliestStart),
zeroResource);
if (period > 0) {
if ((jobDeadline - jobArrival) >= period) {
allocation.addInterval(new ReservationInterval(0L, period),
zeroResource);
}
jobArrival = jobArrival % period;
jobDeadline = jobDeadline % period;
if (jobArrival <= jobDeadline) {
allocation.addInterval(new ReservationInterval(0, jobArrival),
zeroResource);
allocation.addInterval(new ReservationInterval(jobDeadline, period),
zeroResource);
} else {
allocation.addInterval(new ReservationInterval(jobDeadline, jobArrival),
zeroResource);
}
} else {
// Pad at the beginning
long earliestStart = findEarliestTime(allocation.toIntervalMap());
if (jobArrival < earliestStart) {
allocation.addInterval(
new ReservationInterval(jobArrival, earliestStart), zeroResource);
}
// Pad at the beginning
long latestEnd = findLatestTime(allocation.toIntervalMap());
if (latestEnd < jobDeadline) {
allocation.addInterval(new ReservationInterval(latestEnd, jobDeadline),
zeroResource);
}
}
// Pad at the beginning
long latestEnd = findLatestTime(mapAllocations);
if (latestEnd < jobDeadline) {
mapAllocations.put(new ReservationInterval(latestEnd, jobDeadline),
zeroResource);
}
return mapAllocations;
return allocation.toIntervalMap();
}
public abstract RLESparseResourceAllocation computeJobAllocation(Plan plan,

View File

@ -45,6 +45,7 @@ public interface StageAllocator {
* the stage by the two phase planning algorithm
* @param stageDeadline the deadline of the stage set by the two phase
* planning algorithm
* @param period the periodicity with which this stage appears
* @param user name of the user
* @param oldId identifier of the old reservation
*
@ -55,7 +56,7 @@ public interface StageAllocator {
Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr,
long stageArrival, long stageDeadline, String user,
long stageArrival, long stageDeadline, long period, String user,
ReservationId oldId) throws PlanningException;
}

View File

@ -43,7 +43,7 @@ public class StageAllocatorGreedy implements StageAllocator {
public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr,
long stageEarliestStart, long stageDeadline, String user,
long stageEarliestStart, long stageDeadline, long period, String user,
ReservationId oldId) throws PlanningException {
Resource totalCapacity = plan.getTotalCapacity();
@ -69,7 +69,7 @@ public class StageAllocatorGreedy implements StageAllocator {
RLESparseResourceAllocation netAvailable =
plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
stageDeadline, 0);
stageDeadline, period);
netAvailable =
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),

View File

@ -54,7 +54,7 @@ public class StageAllocatorGreedyRLE implements StageAllocator {
public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr,
long stageEarliestStart, long stageDeadline, String user,
long stageEarliestStart, long stageDeadline, long period, String user,
ReservationId oldId) throws PlanningException {
// abort early if the interval is not satisfiable
@ -83,8 +83,9 @@ public class StageAllocatorGreedyRLE implements StageAllocator {
int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
// get available resources from plan
RLESparseResourceAllocation netRLERes = plan.getAvailableResourceOverTime(
user, oldId, stageEarliestStart, stageDeadline, 0);
RLESparseResourceAllocation netRLERes =
plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
stageDeadline, period);
// remove plan modifications
netRLERes =

View File

@ -70,15 +70,15 @@ public class StageAllocatorLowCostAligned implements StageAllocator {
public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
RLESparseResourceAllocation planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr,
long stageArrival, long stageDeadline, String user, ReservationId oldId)
throws PlanningException {
long stageArrival, long stageDeadline, long period, String user,
ReservationId oldId) throws PlanningException {
// Initialize
ResourceCalculator resCalc = plan.getResourceCalculator();
Resource capacity = plan.getTotalCapacity();
RLESparseResourceAllocation netRLERes = plan.getAvailableResourceOverTime(
user, oldId, stageArrival, stageDeadline, 0);
user, oldId, stageArrival, stageDeadline, period);
long step = plan.getStep();

View File

@ -180,8 +180,12 @@ public abstract class BaseSharingPolicyTest {
}
}
rle.addInterval(new ReservationInterval(rStart, rEnd), alloc);
if(rStart > rEnd){
rle.addInterval(new ReservationInterval(rStart, period), alloc);
rle.addInterval(new ReservationInterval(0, rEnd), alloc);
} else {
rle.addInterval(new ReservationInterval(rStart, rEnd), alloc);
}
return rle;
}

View File

@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.Collection;
import net.jcip.annotations.NotThreadSafe;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
import org.junit.Test;
@ -39,63 +40,67 @@ public class TestCapacityOverTimePolicy extends BaseSharingPolicyTest {
final static long ONEDAY = 86400 * 1000;
final static long ONEHOUR = 3600 * 1000;
final static long ONEMINUTE = 60 * 1000;
final static String TWODAYPERIOD = "7200000";
final static String TWOHOURPERIOD = "7200000";
final static String ONEDAYPERIOD = "86400000";
@Parameterized.Parameters(name = "Duration {0}, height {1}," +
" submission {2}, periodic {3})")
" numSubmission {2}, periodic {3})")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
// easy fit
{ONEHOUR, 0.25, 1, null, null },
{ONEHOUR, 0.25, 1, TWODAYPERIOD, null },
{ONEHOUR, 0.25, 1, TWOHOURPERIOD, null },
{ONEHOUR, 0.25, 1, ONEDAYPERIOD, null },
// instantaneous high, but fit integral and inst limits
{ONEMINUTE, 0.74, 1, null, null },
{ONEMINUTE, 0.74, 1, TWODAYPERIOD, null },
{ONEMINUTE, 0.74, 1, TWOHOURPERIOD, null },
{ONEMINUTE, 0.74, 1, ONEDAYPERIOD, null },
// barely fit
{ONEHOUR, 0.76, 1, null, PlanningQuotaException.class },
{ONEHOUR, 0.76, 1, TWODAYPERIOD, PlanningQuotaException.class },
{ONEHOUR, 0.76, 1, TWOHOURPERIOD, PlanningQuotaException.class },
{ONEHOUR, 0.76, 1, ONEDAYPERIOD, PlanningQuotaException.class },
// overcommit with single reservation
{ONEHOUR, 1.1, 1, null, PlanningQuotaException.class },
{ONEHOUR, 1.1, 1, TWODAYPERIOD, PlanningQuotaException.class },
{ONEHOUR, 1.1, 1, TWOHOURPERIOD, PlanningQuotaException.class },
{ONEHOUR, 1.1, 1, ONEDAYPERIOD, PlanningQuotaException.class },
// barely fit with multiple reservations (instantaneously, lowering to
// 1min to fit integral)
{ONEMINUTE, 0.25, 3, null, null },
{ONEMINUTE, 0.25, 3, TWODAYPERIOD, null },
{ONEMINUTE, 0.25, 3, TWOHOURPERIOD, null },
{ONEMINUTE, 0.25, 3, ONEDAYPERIOD, null },
// overcommit with multiple reservations (instantaneously)
{ONEMINUTE, 0.25, 4, null, PlanningQuotaException.class },
{ONEMINUTE, 0.25, 4, TWODAYPERIOD, PlanningQuotaException.class },
{ONEMINUTE, 0.25, 4, TWOHOURPERIOD, PlanningQuotaException.class },
{ONEMINUTE, 0.25, 4, ONEDAYPERIOD, PlanningQuotaException.class },
// (non-periodic) reservation longer than window
{25 * ONEHOUR, 0.25, 1, null, PlanningQuotaException.class },
{25 * ONEHOUR, 0.25, 1, TWODAYPERIOD, PlanningQuotaException.class },
// NOTE: we generally don't accept periodicity < duration but the test
// generator will "wrap" this correctly
{25 * ONEHOUR, 0.25, 1, TWOHOURPERIOD, PlanningQuotaException.class },
{25 * ONEHOUR, 0.25, 1, ONEDAYPERIOD, PlanningQuotaException.class },
// (non-periodic) reservation longer than window
{25 * ONEHOUR, 0.05, 5, null, PlanningQuotaException.class },
{25 * ONEHOUR, 0.05, 5, TWODAYPERIOD, PlanningQuotaException.class },
// NOTE: we generally don't accept periodicity < duration but the test
// generator will "wrap" this correctly
{25 * ONEHOUR, 0.05, 5, TWOHOURPERIOD, PlanningQuotaException.class },
{25 * ONEHOUR, 0.05, 5, ONEDAYPERIOD, PlanningQuotaException.class },
// overcommit integral
{ONEDAY, 0.26, 1, null, PlanningQuotaException.class },
{2 * ONEHOUR, 0.26, 1, TWODAYPERIOD, PlanningQuotaException.class },
{2 * ONEHOUR, 0.26, 1, TWOHOURPERIOD, PlanningQuotaException.class },
{2 * ONEDAY, 0.26, 1, ONEDAYPERIOD, PlanningQuotaException.class },
// overcommit integral
{ONEDAY / 2, 0.51, 1, null, PlanningQuotaException.class },
{2 * ONEHOUR / 2, 0.51, 1, TWODAYPERIOD,
{2 * ONEHOUR / 2, 0.51, 1, TWOHOURPERIOD,
PlanningQuotaException.class },
{2 * ONEDAY / 2, 0.51, 1, ONEDAYPERIOD, PlanningQuotaException.class }

View File

@ -17,6 +17,7 @@
*******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -119,6 +120,18 @@ public class TestInMemoryPlan {
checkAllocation(plan, alloc, start, 0);
}
@Test(expected = PlanningException.class)
public void testOutOfRange() throws PlanningException {
maxPeriodicity = 100;
Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true, maxPeriodicity,
context, new UTCClock());
// we expect the plan to complaint as the range 330-150 > 50
RLESparseResourceAllocation availableBefore =
plan.getAvailableResourceOverTime(user, null, 150, 330, 50);
}
@Test
public void testAddPeriodicReservation() throws PlanningException {
@ -146,8 +159,14 @@ public class TestInMemoryPlan {
checkAllocation(plan, alloc, start, period);
RLESparseResourceAllocation available =
plan.getAvailableResourceOverTime(user, reservationID, 150, 330, 50);
System.out.println(available);
plan.getAvailableResourceOverTime(user, null, 130, 170, 50);
// the reservation has period 20 starting at 10, and the interaction with
// the period 50 request means that every 10 we expect a "90GB" point
assertEquals(92160, available.getCapacityAtTime(130).getMemorySize());
assertEquals(92160, available.getCapacityAtTime(140).getMemorySize());
assertEquals(92160, available.getCapacityAtTime(150).getMemorySize());
}
private void checkAllocation(Plan plan, int[] alloc, int start,
@ -162,18 +181,18 @@ public class TestInMemoryPlan {
for (int i = 0; i < alloc.length; i++) {
// only one instance for non-periodic reservation
if (periodicity <= 0) {
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
userCons.getCapacityAtTime(start + i));
} else {
// periodic reservations should repeat
long y = 0;
Resource res = Resource.newInstance(1024 * (alloc[i]), (alloc[i]));
while (y <= end * 2) {
Assert.assertEquals("At time: " + start + i + y, res,
assertEquals("At time: " + start + i + y, res,
plan.getTotalCommittedResources(start + i + y));
Assert.assertEquals(" At time: " + (start + i + y), res,
assertEquals(" At time: " + (start + i + y), res,
userCons.getCapacityAtTime(start + i + y));
y = y + periodicity;
}
@ -253,9 +272,9 @@ public class TestInMemoryPlan {
RLESparseResourceAllocation userCons =
plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
assertEquals(Resource.newInstance(1024 * (alloc[i]), (alloc[i])),
userCons.getCapacityAtTime(start + i));
}
@ -275,9 +294,9 @@ public class TestInMemoryPlan {
start + updatedAlloc.length);
for (int i = 0; i < updatedAlloc.length; i++) {
Assert.assertEquals(Resource.newInstance(1024 * (updatedAlloc[i] + i),
assertEquals(Resource.newInstance(1024 * (updatedAlloc[i] + i),
updatedAlloc[i] + i), plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(1024 * (updatedAlloc[i] + i),
assertEquals(Resource.newInstance(1024 * (updatedAlloc[i] + i),
updatedAlloc[i] + i), userCons.getCapacityAtTime(start + i));
}
}
@ -371,10 +390,10 @@ public class TestInMemoryPlan {
plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(
assertEquals(
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
assertEquals(
Resource.newInstance(1024 * (alloc[i] + i), (alloc[i] + i)),
userCons.getCapacityAtTime(start + i));
}
@ -389,9 +408,9 @@ public class TestInMemoryPlan {
userCons =
plan.getConsumptionForUserOverTime(user, start, start + alloc.length);
for (int i = 0; i < alloc.length; i++) {
Assert.assertEquals(Resource.newInstance(0, 0),
assertEquals(Resource.newInstance(0, 0),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(0, 0),
assertEquals(Resource.newInstance(0, 0),
userCons.getCapacityAtTime(start + i));
}
}
@ -492,11 +511,11 @@ public class TestInMemoryPlan {
plan.getConsumptionForUserOverTime(user, start, start + alloc2.length);
for (int i = 0; i < alloc2.length; i++) {
Assert.assertEquals(
assertEquals(
Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i),
alloc1[i] + alloc2[i] + i),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(
assertEquals(
Resource.newInstance(1024 * (alloc1[i] + alloc2[i] + i),
alloc1[i] + alloc2[i] + i),
userCons.getCapacityAtTime(start + i));
@ -530,9 +549,9 @@ public class TestInMemoryPlan {
Assert.assertNull(plan.getReservationById(reservationID1));
for (int i = 0; i < alloc1.length; i++) {
Assert.assertEquals(Resource.newInstance(0, 0),
assertEquals(Resource.newInstance(0, 0),
plan.getTotalCommittedResources(start + i));
Assert.assertEquals(Resource.newInstance(0, 0),
assertEquals(Resource.newInstance(0, 0),
userCons.getCapacityAtTime(start + i));
}
}
@ -721,16 +740,16 @@ public class TestInMemoryPlan {
private void doAssertions(Plan plan, ReservationAllocation rAllocation) {
ReservationId reservationID = rAllocation.getReservationId();
Assert.assertNotNull(plan.getReservationById(reservationID));
Assert.assertEquals(rAllocation, plan.getReservationById(reservationID));
assertEquals(rAllocation, plan.getReservationById(reservationID));
Assert.assertTrue(((InMemoryPlan) plan).getAllReservations().size() == 1);
if (rAllocation.getPeriodicity() <= 0) {
Assert.assertEquals(rAllocation.getEndTime(), plan.getLastEndTime());
assertEquals(rAllocation.getEndTime(), plan.getLastEndTime());
}
Assert.assertEquals(totalCapacity, plan.getTotalCapacity());
Assert.assertEquals(minAlloc, plan.getMinimumAllocation());
Assert.assertEquals(maxAlloc, plan.getMaximumAllocation());
Assert.assertEquals(resCalc, plan.getResourceCalculator());
Assert.assertEquals(planName, plan.getQueueName());
assertEquals(totalCapacity, plan.getTotalCapacity());
assertEquals(minAlloc, plan.getMinimumAllocation());
assertEquals(maxAlloc, plan.getMaximumAllocation());
assertEquals(resCalc, plan.getResourceCalculator());
assertEquals(planName, plan.getQueueName());
Assert.assertTrue(plan.getMoveOnExpiry());
}

View File

@ -26,11 +26,14 @@ import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import net.jcip.annotations.NotThreadSafe;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
@ -54,11 +57,27 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class tests the {@code AlignedPlannerWithGreedy} agent.
*/
@RunWith(value = Parameterized.class)
@NotThreadSafe
@SuppressWarnings("VisibilityModifier")
public class TestAlignedPlanner {
@Parameterized.Parameter(value = 0)
public String recurrenceExpression;
final static String NONPERIODIC = "0";
final static String THREEHOURPERIOD = "10800000";
final static String ONEDAYPERIOD = "86400000";
private static final Logger LOG = LoggerFactory
.getLogger(TestAlignedPlanner.class);
@ -72,6 +91,16 @@ public class TestAlignedPlanner {
private Resource clusterCapacity;
private long step;
@Parameterized.Parameters(name = "Testing: periodicity {0})")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{
{NONPERIODIC},
{THREEHOURPERIOD},
{ONEDAYPERIOD}
});
}
@Test
public void testSingleReservationAccept() throws PlanningException {
@ -107,6 +136,9 @@ public class TestAlignedPlanner {
assertTrue(alloc1.toString(),
check(alloc1, 10 * step, 20 * step, 10, 2048, 2));
System.out.println("--------AFTER AGENT----------");
System.out.println(plan.toString());
}
@Test
@ -1139,9 +1171,11 @@ public class TestAlignedPlanner {
long deadline, ReservationRequest[] reservationRequests,
ReservationRequestInterpreter rType, String username) {
return ReservationDefinition.newInstance(arrival, deadline,
ReservationRequests.newInstance(Arrays.asList(reservationRequests),
rType), username);
return ReservationDefinition.newInstance(arrival,
deadline, ReservationRequests
.newInstance(Arrays.asList(reservationRequests), rType),
username, recurrenceExpression, Priority.UNDEFINED);
}

View File

@ -59,13 +59,20 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
@SuppressWarnings("VisibilityModifier")
public class TestGreedyReservationAgent {
@Parameterized.Parameter(value = 0)
public boolean allocateLeft;
@Parameterized.Parameter(value = 1)
public String recurrenceExpression;
private static final Logger LOG = LoggerFactory
.getLogger(TestGreedyReservationAgent.class);
@ -76,16 +83,18 @@ public class TestGreedyReservationAgent {
Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
Random rand = new Random();
long step;
boolean allocateLeft;
public TestGreedyReservationAgent(Boolean b){
this.allocateLeft = b;
}
@Parameters
@Parameterized.Parameters(name = "Testing: allocateLeft {0}," +
" recurrenceExpression {1})")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{true}, {false}});
{true, "0"},
{false, "0"},
{true, "7200000"},
{false, "7200000"},
{true, "86400000"},
{false, "86400000"}
});
}
@Before
@ -134,6 +143,7 @@ public class TestGreedyReservationAgent {
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(5 * step);
rr.setDeadline(20 * step);
rr.setRecurrenceExpression(recurrenceExpression);
ReservationRequest r = ReservationRequest.newInstance(
Resource.newInstance(2048, 2), 10, 5, 10 * step);
ReservationRequests reqs = new ReservationRequestsPBImpl();
@ -193,6 +203,7 @@ public class TestGreedyReservationAgent {
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(5 * step);
rr.setDeadline(100 * step);
rr.setRecurrenceExpression(recurrenceExpression);
ReservationRequest r =
ReservationRequest.newInstance(Resource.newInstance(2048, 2), 20, 20,
10 * step);
@ -283,8 +294,9 @@ public class TestGreedyReservationAgent {
int[] f = { 100, 100 };
ReservationDefinition rDef =
ReservationSystemTestUtil.createSimpleReservationDefinition(
30 * step, 30 * step + f.length * step, f.length * step);
ReservationSystemTestUtil.createSimpleReservationDefinition(30 * step,
30 * step + f.length * step, f.length * step, 1,
recurrenceExpression);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
@ -296,6 +308,7 @@ public class TestGreedyReservationAgent {
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(0 * step);
rr.setDeadline(70 * step);
rr.setRecurrenceExpression(recurrenceExpression);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER);
ReservationRequest r = ReservationRequest.newInstance(
@ -346,10 +359,11 @@ public class TestGreedyReservationAgent {
prepareBasicPlan();
// create a completely utilized segment at time 30
int[] f = { 100, 100 };
ReservationDefinition rDef =
ReservationSystemTestUtil.createSimpleReservationDefinition(
30, 30 * step + f.length * step, f.length * step);
assertTrue(plan.toString(),
ReservationDefinition rDef = ReservationSystemTestUtil
.createSimpleReservationDefinition(30, 30 * step + f.length * step,
f.length * step, 1, recurrenceExpression);
assertTrue(
plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", 30 * step, 30 * step + f.length * step,
@ -406,6 +420,7 @@ public class TestGreedyReservationAgent {
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(0 * step);
rr.setDeadline(60 * step);
rr.setRecurrenceExpression(recurrenceExpression);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP);
ReservationRequest r = ReservationRequest.newInstance(
@ -453,6 +468,7 @@ public class TestGreedyReservationAgent {
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(100 * step);
rr.setDeadline(120 * step);
rr.setRecurrenceExpression(recurrenceExpression);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
ReservationRequest r = ReservationRequest.newInstance(
@ -494,6 +510,7 @@ public class TestGreedyReservationAgent {
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(100 * step);
rr.setDeadline(120 * step);
rr.setRecurrenceExpression(recurrenceExpression);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
ReservationRequest r = ReservationRequest.newInstance(
@ -542,6 +559,7 @@ public class TestGreedyReservationAgent {
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(100L);
rr.setDeadline(120L);
rr.setRecurrenceExpression(recurrenceExpression);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ANY);
@ -587,6 +605,7 @@ public class TestGreedyReservationAgent {
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(100 * step);
rr.setDeadline(120 * step);
rr.setRecurrenceExpression(recurrenceExpression);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
ReservationRequest r = ReservationRequest.newInstance(
@ -635,6 +654,7 @@ public class TestGreedyReservationAgent {
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(100L);
rr.setDeadline(120L);
rr.setRecurrenceExpression(recurrenceExpression);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
ReservationRequest r = ReservationRequest.newInstance(
@ -759,25 +779,4 @@ public class TestGreedyReservationAgent {
+ " in " + (end - start) + "ms");
}
public static void main(String[] arg) {
boolean left = false;
// run a stress test with by default 1000 random jobs
int numJobs = 1000;
if (arg.length > 0) {
numJobs = Integer.parseInt(arg[0]);
}
if (arg.length > 1) {
left = Boolean.parseBoolean(arg[1]);
}
try {
TestGreedyReservationAgent test = new TestGreedyReservationAgent(left);
test.setup();
test.testStress(numJobs);
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,213 @@
/*****************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import static org.mockito.Mockito.mock;
/**
* General purpose ReservationAgent tester.
*/
@RunWith(Parameterized.class)
@SuppressWarnings("VisibilityModifier")
public class TestReservationAgents {
@Parameterized.Parameter(value = 0)
public Class agentClass;
@Parameterized.Parameter(value = 1)
public boolean allocateLeft;
@Parameterized.Parameter(value = 2)
public String recurrenceExpression;
@Parameterized.Parameter(value = 3)
public int numOfNodes;
private long step;
private Random rand = new Random(2);
private ReservationAgent agent;
private Plan plan;
private ResourceCalculator resCalc = new DefaultResourceCalculator();
private Resource minAlloc = Resource.newInstance(1024, 1);
private Resource maxAlloc = Resource.newInstance(32 * 1023, 32);
private long timeHorizon = 2 * 24 * 3600 * 1000; // 2 days
private static final Logger LOG =
LoggerFactory.getLogger(TestReservationAgents.class);
@Parameterized.Parameters(name = "Testing: agent {0}, allocateLeft: {1}," +
" recurrenceExpression: {2}, numNodes: {3})")
public static Collection<Object[]> data() {
return Arrays.asList(
new Object[][] {{GreedyReservationAgent.class, true, "0", 100 },
{GreedyReservationAgent.class, false, "0", 100 },
{GreedyReservationAgent.class, true, "7200000", 100 },
{GreedyReservationAgent.class, false, "7200000", 100 },
{GreedyReservationAgent.class, true, "86400000", 100 },
{GreedyReservationAgent.class, false, "86400000", 100 },
{AlignedPlannerWithGreedy.class, true, "0", 100 },
{AlignedPlannerWithGreedy.class, false, "0", 100 },
{AlignedPlannerWithGreedy.class, true, "7200000", 100 },
{AlignedPlannerWithGreedy.class, false, "7200000", 100 },
{AlignedPlannerWithGreedy.class, true, "86400000", 100 },
{AlignedPlannerWithGreedy.class, false, "86400000", 100 } });
}
@Before
public void setup() throws Exception {
long seed = rand.nextLong();
rand.setSeed(seed);
LOG.info("Running with seed: " + seed);
// setting completely loose quotas
long timeWindow = 1000000L;
Resource clusterCapacity =
Resource.newInstance(numOfNodes * 1024, numOfNodes);
step = 1000L;
String reservationQ =
ReservationSystemTestUtil.getFullReservationQueueName();
float instConstraint = 100;
float avgConstraint = 100;
ReservationSchedulerConfiguration conf = ReservationSystemTestUtil
.createConf(reservationQ, timeWindow, instConstraint, avgConstraint);
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
policy.init(reservationQ, conf);
// setting conf to
conf.setBoolean(GreedyReservationAgent.FAVOR_EARLY_ALLOCATION,
allocateLeft);
agent = (ReservationAgent) agentClass.newInstance();
agent.init(conf);
QueueMetrics queueMetrics = mock(QueueMetrics.class);
RMContext context = ReservationSystemTestUtil.createMockRMContext();
plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
resCalc, minAlloc, maxAlloc, "dedicated", null, true, context);
}
@Test
public void test() throws Exception {
long period = Long.parseLong(recurrenceExpression);
for (int i = 0; i < 1000; i++) {
ReservationDefinition rr = createRandomRequest(i);
if (rr != null) {
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
try {
agent.createReservation(reservationID, "u1", plan, rr);
} catch (PlanningException p) {
// happens
}
}
}
}
private ReservationDefinition createRandomRequest(int i)
throws PlanningException {
long arrival = (long) Math.floor(rand.nextDouble() * timeHorizon);
long period = Long.parseLong(recurrenceExpression);
// min between period and rand around 30min
long duration =
(long) Math.round(Math.min(rand.nextDouble() * 3600 * 1000, period));
// min between period and rand around 5x duration
long deadline = (long) Math
.ceil(arrival + Math.min(duration * rand.nextDouble() * 10, period));
assert((deadline - arrival) <= period);
RLESparseResourceAllocation available = plan
.getAvailableResourceOverTime("u1", null, arrival, deadline, period);
NavigableMap<Long, Resource> availableMap = available.getCumulative();
// look at available space, and for each segment, use half of it with 50%
// probability
List<ReservationRequest> reservationRequests = new ArrayList<>();
for (Map.Entry<Long, Resource> e : availableMap.entrySet()) {
if (e.getValue() != null && rand.nextDouble() > 0.001) {
int numContainers = (int) Math.ceil(Resources.divide(resCalc,
plan.getTotalCapacity(), e.getValue(), minAlloc) / 2);
long tempDur =
Math.min(duration, availableMap.higherKey(e.getKey()) - e.getKey());
reservationRequests.add(ReservationRequest.newInstance(minAlloc,
numContainers, 1, tempDur));
}
}
if (reservationRequests.size() < 1) {
return null;
}
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(arrival);
rr.setDeadline(deadline);
rr.setRecurrenceExpression(recurrenceExpression);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER);
reqs.setReservationResources(reservationRequests);
rr.setReservationRequests(reqs);
rr.setReservationName("res_" + i);
return rr;
}
}