YARN-4360. Improve GreedyReservationAgent to support "early" allocations, and performance improvements (curino via asuresh)

This commit is contained in:
Arun Suresh 2016-02-10 09:11:15 -08:00
parent a429f857b2
commit 5cf5c41a89
8 changed files with 565 additions and 114 deletions

View File

@ -799,6 +799,9 @@ Release 2.8.0 - UNRELEASED
YARN-4662. Document some newly added metrics. (Jian He via xgong)
YARN-4360. Improve GreedyReservationAgent to support "early" allocations,
and performance improvements (curino via asuresh)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -214,6 +214,15 @@ public class CapacityOverTimePolicy implements SharingPolicy {
RLESparseResourceAllocation used =
plan.getConsumptionForUserOverTime(user, start, end);
// add back in old reservation used resources if any
ReservationAllocation old = plan.getReservationById(oldId);
if (old != null) {
used =
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
Resources.clone(plan.getTotalCapacity()), used,
old.getResourcesOverTime(), RLEOperator.subtract, start, end);
}
instRLEQuota =
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
planTotalCapacity, instRLEQuota, used, RLEOperator.subtract, start,

View File

@ -58,13 +58,13 @@ public class AlignedPlannerWithGreedy implements ReservationAgent {
// LowCostAligned planning algorithm
ReservationAgent algAligned =
new IterativePlanner(new StageEarliestStartByDemand(),
new StageAllocatorLowCostAligned(smoothnessFactor));
new StageAllocatorLowCostAligned(smoothnessFactor), false);
listAlg.add(algAligned);
// Greedy planning algorithm
ReservationAgent algGreedy =
new IterativePlanner(new StageEarliestStartByJobArrival(),
new StageAllocatorGreedy());
new StageAllocatorGreedy(), false);
listAlg.add(algGreedy);
// Set planner:

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
@ -45,9 +46,44 @@ public class GreedyReservationAgent implements ReservationAgent {
.getLogger(GreedyReservationAgent.class);
// Greedy planner
private final ReservationAgent planner = new IterativePlanner(
new StageEarliestStartByJobArrival(), new StageAllocatorGreedy());
private final ReservationAgent planner;
public final static String GREEDY_FAVOR_EARLY_ALLOCATION =
"yarn.resourcemanager.reservation-system.favor-early-allocation";
public final static boolean DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION = true;
private final boolean allocateLeft;
public GreedyReservationAgent() {
this(new Configuration());
}
public GreedyReservationAgent(Configuration yarnConfiguration) {
allocateLeft =
yarnConfiguration.getBoolean(GREEDY_FAVOR_EARLY_ALLOCATION,
DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION);
if (allocateLeft) {
LOG.info("Initializing the GreedyReservationAgent to favor \"early\""
+ " (left) allocations (controlled by parameter: "
+ GREEDY_FAVOR_EARLY_ALLOCATION + ")");
} else {
LOG.info("Initializing the GreedyReservationAgent to favor \"late\""
+ " (right) allocations (controlled by parameter: "
+ GREEDY_FAVOR_EARLY_ALLOCATION + ")");
}
planner =
new IterativePlanner(new StageEarliestStartByJobArrival(),
new StageAllocatorGreedyRLE(allocateLeft), allocateLeft);
}
public boolean isAllocateLeft(){
return allocateLeft;
}
@Override
public boolean createReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException {

View File

@ -19,9 +19,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.HashMap;
import java.util.HashSet;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -69,11 +72,13 @@ public class IterativePlanner extends PlanningAlgorithm {
// Phase algorithms
private StageEarliestStart algStageEarliestStart = null;
private StageAllocator algStageAllocator = null;
private final boolean allocateLeft;
// Constructor
public IterativePlanner(StageEarliestStart algEarliestStartTime,
StageAllocator algStageAllocator) {
StageAllocator algStageAllocator, boolean allocateLeft) {
this.allocateLeft = allocateLeft;
setAlgStageEarliestStart(algEarliestStartTime);
setAlgStageAllocator(algStageAllocator);
@ -85,61 +90,49 @@ public class IterativePlanner extends PlanningAlgorithm {
String user) throws PlanningException {
// Initialize
initialize(plan, reservation);
// If the job has been previously reserved, logically remove its allocation
ReservationAllocation oldReservation =
plan.getReservationById(reservationId);
if (oldReservation != null) {
ignoreOldAllocation(oldReservation);
}
initialize(plan, reservationId, reservation);
// Create the allocations data structure
RLESparseResourceAllocation allocations =
new RLESparseResourceAllocation(plan.getResourceCalculator());
// Get a reverse iterator for the set of stages
ListIterator<ReservationRequest> li =
reservation
.getReservationRequests()
.getReservationResources()
.listIterator(
reservation.getReservationRequests().getReservationResources()
.size());
StageProvider stageProvider = new StageProvider(allocateLeft, reservation);
// Current stage
ReservationRequest currentReservationStage;
// Index, points on the current node
int index =
reservation.getReservationRequests().getReservationResources().size();
// Stage deadlines
long stageDeadline = stepRoundDown(reservation.getDeadline(), step);
long successorStartingTime = -1;
long predecessorEndTime = stepRoundDown(reservation.getArrival(), step);
long stageArrivalTime = -1;
// Iterate the stages in reverse order
while (li.hasPrevious()) {
while (stageProvider.hasNext()) {
// Get current stage
currentReservationStage = li.previous();
index -= 1;
currentReservationStage = stageProvider.next();
// Validate that the ReservationRequest respects basic constraints
validateInputStage(plan, currentReservationStage);
// Compute an adjusted earliestStart for this resource
// (we need this to provision some space for the ORDER contracts)
long stageArrivalTime = reservation.getArrival();
if (allocateLeft) {
stageArrivalTime = predecessorEndTime;
} else {
stageArrivalTime = reservation.getArrival();
if (jobType == ReservationRequestInterpreter.R_ORDER
|| jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
stageArrivalTime =
computeEarliestStartingTime(plan, reservation, index,
currentReservationStage, stageDeadline);
computeEarliestStartingTime(plan, reservation,
stageProvider.getCurrentIndex(), currentReservationStage,
stageDeadline);
}
stageArrivalTime = stepRoundUp(stageArrivalTime, step);
stageArrivalTime = Math.max(stageArrivalTime, reservation.getArrival());
}
// Compute the allocation of a single stage
Map<ReservationInterval, Resource> curAlloc =
computeStageAllocation(plan, currentReservationStage,
@ -155,7 +148,7 @@ public class IterativePlanner extends PlanningAlgorithm {
}
// Otherwise, the job cannot be allocated
return null;
throw new PlanningException("The request cannot be satisfied");
}
@ -177,33 +170,41 @@ public class IterativePlanner extends PlanningAlgorithm {
if (jobType == ReservationRequestInterpreter.R_ORDER
|| jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
// CHECK ORDER_NO_GAP
// Verify that there is no gap, in case the job is ORDER_NO_GAP
// note that the test is different left-to-right and right-to-left
if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP
&& successorStartingTime != -1
&& successorStartingTime > stageEndTime) {
return null;
&& ((allocateLeft && predecessorEndTime < stageStartTime) ||
(!allocateLeft && (stageEndTime < successorStartingTime))
)
|| (!isNonPreemptiveAllocation(curAlloc))) {
throw new PlanningException(
"The allocation found does not respect ORDER_NO_GAP");
}
if (allocateLeft) {
// Store the stageStartTime and set the new stageDeadline
predecessorEndTime = stageEndTime;
} else {
// Store the stageStartTime and set the new stageDeadline
successorStartingTime = stageStartTime;
stageDeadline = stageStartTime;
}
}
}
// If the allocation is empty, return an error
if (allocations.isEmpty()) {
return null;
throw new PlanningException("The request cannot be satisfied");
}
return allocations;
}
protected void initialize(Plan plan, ReservationDefinition reservation) {
protected void initialize(Plan plan, ReservationId reservationId,
ReservationDefinition reservation) throws PlanningException {
// Get plan step & capacity
capacity = plan.getTotalCapacity();
@ -214,13 +215,26 @@ public class IterativePlanner extends PlanningAlgorithm {
jobArrival = stepRoundUp(reservation.getArrival(), step);
jobDeadline = stepRoundDown(reservation.getDeadline(), step);
// Dirty read of plan load
planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline);
// Initialize the plan modifications
planModifications =
new RLESparseResourceAllocation(plan.getResourceCalculator());
// Dirty read of plan load
// planLoads are not used by other StageAllocators... and don't deal
// well with huge reservation ranges
if (this.algStageAllocator instanceof StageAllocatorLowCostAligned) {
planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline);
ReservationAllocation oldRes = plan.getReservationById(reservationId);
if (oldRes != null) {
planModifications =
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
plan.getTotalCapacity(), planModifications,
oldRes.getResourcesOverTime(), RLEOperator.subtract,
jobArrival, jobDeadline);
}
}
}
private Map<Long, Resource> getAllLoadsInInterval(Plan plan, long startTime,
@ -240,32 +254,6 @@ public class IterativePlanner extends PlanningAlgorithm {
}
private void ignoreOldAllocation(ReservationAllocation oldReservation) {
// If there is no old reservation, return
if (oldReservation == null) {
return;
}
// Subtract each allocation interval from the planModifications
for (Entry<ReservationInterval, Resource> entry : oldReservation
.getAllocationRequests().entrySet()) {
// Read the entry
ReservationInterval interval = entry.getKey();
Resource resource = entry.getValue();
// Find the actual request
Resource negativeResource = Resources.multiply(resource, -1);
// Insert it into planModifications as a 'negative' request, to
// represent available resources
planModifications.addInterval(interval, negativeResource);
}
}
private void validateInputStage(Plan plan, ReservationRequest rr)
throws ContractValidationException {
@ -291,13 +279,56 @@ public class IterativePlanner extends PlanningAlgorithm {
rr.getCapability(), plan.getMaximumAllocation())) {
throw new ContractValidationException(
"Individual capability requests should not exceed cluster's " +
"maxAlloc");
"Individual capability requests should not exceed cluster's "
+ "maxAlloc");
}
}
private boolean isNonPreemptiveAllocation(
Map<ReservationInterval, Resource> curAlloc) {
// Checks whether a stage allocation is non preemptive or not.
// Assumption: the intervals are non-intersecting (as returned by
// computeStageAllocation()).
// For a non-preemptive allocation, only two end points appear exactly once
Set<Long> endPoints = new HashSet<Long>(2 * curAlloc.size());
for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) {
ReservationInterval interval = entry.getKey();
Resource resource = entry.getValue();
// Ignore intervals with no allocation
if (Resources.equals(resource, Resource.newInstance(0, 0))) {
continue;
}
// Get endpoints
Long left = interval.getStartTime();
Long right = interval.getEndTime();
// Add left endpoint if we haven't seen it before, remove otherwise
if (!endPoints.contains(left)) {
endPoints.add(left);
} else {
endPoints.remove(left);
}
// Add right endpoint if we haven't seen it before, remove otherwise
if (!endPoints.contains(right)) {
endPoints.add(right);
} else {
endPoints.remove(right);
}
}
// Non-preemptive only if endPoints is of size 2
return (endPoints.size() == 2);
}
// Call algEarliestStartTime()
protected long computeEarliestStartingTime(Plan plan,
ReservationDefinition reservation, int index,
@ -335,4 +366,60 @@ public class IterativePlanner extends PlanningAlgorithm {
}
/**
* Helper class that provide a list of ReservationRequests and iterates
* forward or backward depending whether we are allocating left-to-right or
* right-to-left.
*/
public static class StageProvider {
private final boolean allocateLeft;
private ListIterator<ReservationRequest> li;
public StageProvider(boolean allocateLeft,
ReservationDefinition reservation) {
this.allocateLeft = allocateLeft;
int startingIndex;
if (allocateLeft) {
startingIndex = 0;
} else {
startingIndex =
reservation.getReservationRequests().getReservationResources()
.size();
}
// Get a reverse iterator for the set of stages
li =
reservation.getReservationRequests().getReservationResources()
.listIterator(startingIndex);
}
public boolean hasNext() {
if (allocateLeft) {
return li.hasNext();
} else {
return li.hasPrevious();
}
}
public ReservationRequest next() {
if (allocateLeft) {
return li.next();
} else {
return li.previous();
}
}
public int getCurrentIndex() {
if (allocateLeft) {
return li.nextIndex() - 1;
} else {
return li.previousIndex() + 1;
}
}
}
}

View File

@ -0,0 +1,245 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* Computes the stage allocation according to the greedy allocation rule. The
* greedy rule repeatedly allocates requested containers at the leftmost or
* rightmost possible interval. This implementation leverages the
* run-length-encoding of the time-series we operate on and proceed more quickly
* than the baseline.
*/
public class StageAllocatorGreedyRLE implements StageAllocator {
private final boolean allocateLeft;
public StageAllocatorGreedyRLE(boolean allocateLeft) {
this.allocateLeft = allocateLeft;
}
@Override
public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
Map<Long, Resource> planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr,
long stageEarliestStart, long stageDeadline, String user,
ReservationId oldId) throws PlanningException {
// abort early if the interval is not satisfiable
if (stageEarliestStart + rr.getDuration() > stageDeadline) {
return null;
}
Map<ReservationInterval, Resource> allocationRequests =
new HashMap<ReservationInterval, Resource>();
Resource totalCapacity = plan.getTotalCapacity();
// compute the gang as a resource and get the duration
Resource sizeOfGang =
Resources.multiply(rr.getCapability(), rr.getConcurrency());
long dur = rr.getDuration();
long step = plan.getStep();
// ceil the duration to the next multiple of the plan step
if (dur % step != 0) {
dur += (step - (dur % step));
}
// we know for sure that this division has no remainder (part of contract
// with user, validate before
int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
// get available resources from plan
RLESparseResourceAllocation netRLERes =
plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart,
stageDeadline);
// remove plan modifications
netRLERes =
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
totalCapacity, netRLERes, planModifications, RLEOperator.subtract,
stageEarliestStart, stageDeadline);
// loop trying to place until we are done, or we are considering
// an invalid range of times
while (gangsToPlace > 0 && stageEarliestStart + dur <= stageDeadline) {
// as we run along we remember how many gangs we can fit, and what
// was the most constraining moment in time (we will restart just
// after that to place the next batch)
int maxGang = gangsToPlace;
long minPoint = -1;
// focus our attention to a time-range under consideration
NavigableMap<Long, Resource> partialMap =
netRLERes.getRangeOverlapping(stageEarliestStart, stageDeadline)
.getCumulative();
// revert the map for right-to-left allocation
if (!allocateLeft) {
partialMap = partialMap.descendingMap();
}
Iterator<Entry<Long, Resource>> netIt = partialMap.entrySet().iterator();
long oldT = stageDeadline;
// internal loop, tries to allocate as many gang as possible starting
// at a given point in time, if it fails we move to the next time
// interval (with outside loop)
while (maxGang > 0 && netIt.hasNext()) {
long t;
Resource curAvailRes;
Entry<Long, Resource> e = netIt.next();
if (allocateLeft) {
t = Math.max(e.getKey(), stageEarliestStart);
curAvailRes = e.getValue();
} else {
t = oldT;
oldT = e.getKey();
//attention: higher means lower, because we reversed the map direction
curAvailRes = partialMap.higherEntry(t).getValue();
}
// check exit/skip conditions/
if (curAvailRes == null) {
//skip undefined regions (should not happen beside borders)
continue;
}
if (exitCondition(t, stageEarliestStart, stageDeadline, dur)) {
break;
}
// compute maximum number of gangs we could fit
int curMaxGang =
(int) Math.floor(Resources.divide(plan.getResourceCalculator(),
totalCapacity, curAvailRes, sizeOfGang));
curMaxGang = Math.min(gangsToPlace, curMaxGang);
// compare with previous max, and set it. also remember *where* we found
// the minimum (useful for next attempts)
if (curMaxGang <= maxGang) {
maxGang = curMaxGang;
minPoint = t;
}
}
// update data structures that retain the progress made so far
gangsToPlace =
trackProgress(planModifications, rr, stageEarliestStart,
stageDeadline, allocationRequests, dur, gangsToPlace, maxGang);
// reset the next range of time-intervals to deal with
if (allocateLeft) {
// set earliest start to the min of the constraining "range" or my the
// end of this allocation
stageEarliestStart =
Math.min(partialMap.higherKey(minPoint), stageEarliestStart + dur);
} else {
// same as above moving right-to-left
stageDeadline =
Math.max(partialMap.higherKey(minPoint), stageDeadline - dur);
}
}
// if no gangs are left to place we succeed and return the allocation
if (gangsToPlace == 0) {
return allocationRequests;
} else {
// If we are here is because we did not manage to satisfy this request.
// So we need to remove unwanted side-effect from tempAssigned (needed
// for ANY).
for (Map.Entry<ReservationInterval, Resource> tempAllocation :
allocationRequests.entrySet()) {
planModifications.removeInterval(tempAllocation.getKey(),
tempAllocation.getValue());
}
// and return null to signal failure in this allocation
return null;
}
}
private int trackProgress(RLESparseResourceAllocation planModifications,
ReservationRequest rr, long stageEarliestStart, long stageDeadline,
Map<ReservationInterval, Resource> allocationRequests, long dur,
int gangsToPlace, int maxGang) {
// if we were able to place any gang, record this, and decrement
// gangsToPlace
if (maxGang > 0) {
gangsToPlace -= maxGang;
ReservationInterval reservationInt =
computeReservationInterval(stageEarliestStart, stageDeadline, dur);
Resource reservationRes =
Resources.multiply(rr.getCapability(), rr.getConcurrency() * maxGang);
// remember occupied space (plan is read-only till we find a plausible
// allocation for the entire request). This is needed since we might be
// placing other ReservationRequest within the same
// ReservationDefinition,
// and we must avoid double-counting the available resources
planModifications.addInterval(reservationInt, reservationRes);
allocationRequests.put(reservationInt, reservationRes);
}
return gangsToPlace;
}
private ReservationInterval computeReservationInterval(
long stageEarliestStart, long stageDeadline, long dur) {
ReservationInterval reservationInt;
if (allocateLeft) {
reservationInt =
new ReservationInterval(stageEarliestStart, stageEarliestStart + dur);
} else {
reservationInt =
new ReservationInterval(stageDeadline - dur, stageDeadline);
}
return reservationInt;
}
private boolean exitCondition(long t, long stageEarliestStart,
long stageDeadline, long dur) {
if (allocateLeft) {
return t >= stageEarliestStart + dur;
} else {
return t < stageDeadline - dur;
}
}
}

View File

@ -19,9 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anySetOf;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
import java.io.FileWriter;
import java.io.IOException;
@ -75,12 +73,14 @@ public class ReservationSystemTestUtil {
public static ReservationSchedulerConfiguration createConf(
String reservationQ, long timeWindow, float instConstraint,
float avgConstraint) {
ReservationSchedulerConfiguration conf =
mock(ReservationSchedulerConfiguration.class);
ReservationSchedulerConfiguration realConf = new CapacitySchedulerConfiguration();
ReservationSchedulerConfiguration conf = spy(realConf);
when(conf.getReservationWindow(reservationQ)).thenReturn(timeWindow);
when(conf.getInstantaneousMaxCapacity(reservationQ))
.thenReturn(instConstraint);
when(conf.getAverageCapacity(reservationQ)).thenReturn(avgConstraint);
return conf;
}
@ -177,10 +177,15 @@ public class ReservationSystemTestUtil {
public static ReservationDefinition createSimpleReservationDefinition(
long arrival, long deadline, long duration) {
return createSimpleReservationDefinition(arrival, deadline, duration, 1);
}
public static ReservationDefinition createSimpleReservationDefinition(
long arrival, long deadline, long duration, int parallelism) {
// create a request with a single atomic ask
ReservationRequest r =
ReservationRequest.newInstance(Resource.newInstance(1024, 1), 1, 1,
duration);
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
parallelism, parallelism, duration);
ReservationDefinition rDef = new ReservationDefinitionPBImpl();
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setReservationResources(Collections.singletonList(r));

View File

@ -26,6 +26,8 @@ import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -55,8 +57,12 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.mortbay.log.Log;
@RunWith(Parameterized.class)
public class TestGreedyReservationAgent {
ReservationAgent agent;
@ -66,6 +72,17 @@ public class TestGreedyReservationAgent {
Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
Random rand = new Random();
long step;
boolean allocateLeft;
public TestGreedyReservationAgent(Boolean b){
this.allocateLeft = b;
}
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{true}, {false}});
}
@Before
public void setup() throws Exception {
@ -90,7 +107,11 @@ public class TestGreedyReservationAgent {
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
policy.init(reservationQ, conf);
agent = new GreedyReservationAgent();
// setting conf to
conf.setBoolean(GreedyReservationAgent.GREEDY_FAVOR_EARLY_ALLOCATION,
allocateLeft);
agent = new GreedyReservationAgent(conf);
QueueMetrics queueMetrics = mock(QueueMetrics.class);
RMContext context = ReservationSystemTestUtil.createMockRMContext();
@ -130,13 +151,21 @@ public class TestGreedyReservationAgent {
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
if(allocateLeft){
for (long i = 5 * step; i < 15 * step; i++) {
assertTrue(
"Agent-based allocation unexpected",
Resources.equals(cs.getResourcesAtTime(i),
Resource.newInstance(2048 * 10, 2 * 10)));
}
} else {
for (long i = 10 * step; i < 20 * step; i++) {
assertTrue(
"Agent-based allocation unexpected",
Resources.equals(cs.getResourcesAtTime(i),
Resource.newInstance(2048 * 10, 2 * 10)));
}
}
}
@SuppressWarnings("javadoc")
@ -212,13 +241,27 @@ public class TestGreedyReservationAgent {
System.out.println(plan.toString());
System.out.println(plan.toCumulativeString());
if (allocateLeft) {
for (long i = 5 * step; i < 15 * step; i++) {
assertTrue(
"Agent-based allocation unexpected",
Resources.equals(cs.getResourcesAtTime(i),
Resource.newInstance(2048 * 20, 2 * 20)));
}
for (long i = 15 * step; i < 25 * step; i++) {
// RR2 is pushed out by the presence of RR
assertTrue(
"Agent-based allocation unexpected",
Resources.equals(cs2.getResourcesAtTime(i),
Resource.newInstance(2048 * 20, 2 * 20)));
}
} else {
for (long i = 90 * step; i < 100 * step; i++) {
assertTrue(
"Agent-based allocation unexpected",
Resources.equals(cs.getResourcesAtTime(i),
Resource.newInstance(2048 * 20, 2 * 20)));
}
// RR2 is pushed out by the presence of RR
for (long i = 80 * step; i < 90 * step; i++) {
assertTrue(
"Agent-based allocation unexpected",
@ -226,6 +269,7 @@ public class TestGreedyReservationAgent {
Resource.newInstance(2048 * 20, 2 * 20)));
}
}
}
@Test
public void testOrder() throws PlanningException {
@ -274,10 +318,18 @@ public class TestGreedyReservationAgent {
ReservationAllocation cs = plan.getReservationById(reservationID);
if (allocateLeft) {
assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
assertTrue(cs.toString(), check(cs, 32 * step, 42 * step, 20, 1024, 1));
assertTrue(cs.toString(), check(cs, 42 * step, 62 * step, 10, 1024, 1));
} else {
assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1));
assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1));
assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1));
assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1));
}
System.out.println("--------AFTER ORDER ALLOCATION (queue: "
+ reservationID + ")----------");
System.out.println(plan.toString());
@ -466,7 +518,12 @@ public class TestGreedyReservationAgent {
ReservationAllocation cs = plan.getReservationById(reservationID);
if (allocateLeft) {
assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 5, 1024, 1));
} else {
assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1));
}
System.out.println("--------AFTER ANY ALLOCATION (queue: " + reservationID
+ ")----------");
System.out.println(plan.toString());
@ -551,8 +608,13 @@ public class TestGreedyReservationAgent {
ReservationAllocation cs = plan.getReservationById(reservationID);
if (allocateLeft) {
assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 25, 1024, 1));
assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1));
} else {
assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 20, 1024, 1));
assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 25, 1024, 1));
}
System.out.println("--------AFTER ALL ALLOCATION (queue: " + reservationID
+ ")----------");
@ -695,14 +757,18 @@ public class TestGreedyReservationAgent {
public static void main(String[] arg) {
boolean left = false;
// run a stress test with by default 1000 random jobs
int numJobs = 1000;
if (arg.length > 0) {
numJobs = Integer.parseInt(arg[0]);
}
if (arg.length > 1) {
left = Boolean.parseBoolean(arg[1]);
}
try {
TestGreedyReservationAgent test = new TestGreedyReservationAgent();
TestGreedyReservationAgent test = new TestGreedyReservationAgent(left);
test.setup();
test.testStress(numJobs);
} catch (Exception e) {