From 5cf5c41a895f5ab8bf6270089f8cfdea50573a97 Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Wed, 10 Feb 2016 09:11:15 -0800 Subject: [PATCH] YARN-4360. Improve GreedyReservationAgent to support "early" allocations, and performance improvements (curino via asuresh) --- hadoop-yarn-project/CHANGES.txt | 3 + .../reservation/CapacityOverTimePolicy.java | 9 + .../planning/AlignedPlannerWithGreedy.java | 4 +- .../planning/GreedyReservationAgent.java | 40 ++- .../planning/IterativePlanner.java | 239 +++++++++++------ .../planning/StageAllocatorGreedyRLE.java | 245 ++++++++++++++++++ .../ReservationSystemTestUtil.java | 19 +- .../planning/TestGreedyReservationAgent.java | 120 +++++++-- 8 files changed, 565 insertions(+), 114 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 3a0e0b1f0cc..ff37e1a2a63 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -799,6 +799,9 @@ Release 2.8.0 - UNRELEASED YARN-4662. Document some newly added metrics. (Jian He via xgong) + YARN-4360. Improve GreedyReservationAgent to support "early" allocations, + and performance improvements (curino via asuresh) + OPTIMIZATIONS YARN-3339. TestDockerContainerExecutor should pull a single image and not diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java index 424b54316df..80f6c880962 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java @@ -214,6 +214,15 @@ public RLESparseResourceAllocation availableResources( RLESparseResourceAllocation used = plan.getConsumptionForUserOverTime(user, start, end); + // add back in old reservation used resources if any + ReservationAllocation old = plan.getReservationById(oldId); + if (old != null) { + used = + RLESparseResourceAllocation.merge(plan.getResourceCalculator(), + Resources.clone(plan.getTotalCapacity()), used, + old.getResourcesOverTime(), RLEOperator.subtract, start, end); + } + instRLEQuota = RLESparseResourceAllocation.merge(plan.getResourceCalculator(), planTotalCapacity, instRLEQuota, used, RLEOperator.subtract, start, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java index a3899280a77..b23cf1e933e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java @@ -58,13 +58,13 @@ public AlignedPlannerWithGreedy(int smoothnessFactor) { // LowCostAligned planning algorithm ReservationAgent algAligned = new IterativePlanner(new StageEarliestStartByDemand(), - new StageAllocatorLowCostAligned(smoothnessFactor)); + new StageAllocatorLowCostAligned(smoothnessFactor), false); listAlg.add(algAligned); // Greedy planning algorithm ReservationAgent algGreedy = new IterativePlanner(new StageEarliestStartByJobArrival(), - new StageAllocatorGreedy()); + new StageAllocatorGreedy(), false); listAlg.add(algGreedy); // Set planner: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java index db82a66dee0..915a834179f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; @@ -45,9 +46,44 @@ public class GreedyReservationAgent implements ReservationAgent { .getLogger(GreedyReservationAgent.class); // Greedy planner - private final ReservationAgent planner = new IterativePlanner( - new StageEarliestStartByJobArrival(), new StageAllocatorGreedy()); + private final ReservationAgent planner; + public final static String GREEDY_FAVOR_EARLY_ALLOCATION = + "yarn.resourcemanager.reservation-system.favor-early-allocation"; + + public final static boolean DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION = true; + + private final boolean allocateLeft; + + public GreedyReservationAgent() { + this(new Configuration()); + } + + public GreedyReservationAgent(Configuration yarnConfiguration) { + + allocateLeft = + yarnConfiguration.getBoolean(GREEDY_FAVOR_EARLY_ALLOCATION, + DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION); + + if (allocateLeft) { + LOG.info("Initializing the GreedyReservationAgent to favor \"early\"" + + " (left) allocations (controlled by parameter: " + + GREEDY_FAVOR_EARLY_ALLOCATION + ")"); + } else { + LOG.info("Initializing the GreedyReservationAgent to favor \"late\"" + + " (right) allocations (controlled by parameter: " + + GREEDY_FAVOR_EARLY_ALLOCATION + ")"); + } + + planner = + new IterativePlanner(new StageEarliestStartByJobArrival(), + new StageAllocatorGreedyRLE(allocateLeft), allocateLeft); + + } + + public boolean isAllocateLeft(){ + return allocateLeft; + } @Override public boolean createReservation(ReservationId reservationId, String user, Plan plan, ReservationDefinition contract) throws PlanningException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java index 77362d58290..24d237a7998 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java @@ -19,9 +19,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; import java.util.HashMap; +import java.util.HashSet; import java.util.ListIterator; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; @@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.util.resource.Resources; @@ -69,11 +72,13 @@ public class IterativePlanner extends PlanningAlgorithm { // Phase algorithms private StageEarliestStart algStageEarliestStart = null; private StageAllocator algStageAllocator = null; + private final boolean allocateLeft; // Constructor public IterativePlanner(StageEarliestStart algEarliestStartTime, - StageAllocator algStageAllocator) { + StageAllocator algStageAllocator, boolean allocateLeft) { + this.allocateLeft = allocateLeft; setAlgStageEarliestStart(algEarliestStartTime); setAlgStageAllocator(algStageAllocator); @@ -85,61 +90,49 @@ public RLESparseResourceAllocation computeJobAllocation(Plan plan, String user) throws PlanningException { // Initialize - initialize(plan, reservation); - - // If the job has been previously reserved, logically remove its allocation - ReservationAllocation oldReservation = - plan.getReservationById(reservationId); - if (oldReservation != null) { - ignoreOldAllocation(oldReservation); - } + initialize(plan, reservationId, reservation); // Create the allocations data structure RLESparseResourceAllocation allocations = new RLESparseResourceAllocation(plan.getResourceCalculator()); - // Get a reverse iterator for the set of stages - ListIterator li = - reservation - .getReservationRequests() - .getReservationResources() - .listIterator( - reservation.getReservationRequests().getReservationResources() - .size()); + StageProvider stageProvider = new StageProvider(allocateLeft, reservation); // Current stage ReservationRequest currentReservationStage; - // Index, points on the current node - int index = - reservation.getReservationRequests().getReservationResources().size(); - // Stage deadlines long stageDeadline = stepRoundDown(reservation.getDeadline(), step); long successorStartingTime = -1; + long predecessorEndTime = stepRoundDown(reservation.getArrival(), step); + long stageArrivalTime = -1; // Iterate the stages in reverse order - while (li.hasPrevious()) { + while (stageProvider.hasNext()) { // Get current stage - currentReservationStage = li.previous(); - index -= 1; + currentReservationStage = stageProvider.next(); // Validate that the ReservationRequest respects basic constraints validateInputStage(plan, currentReservationStage); // Compute an adjusted earliestStart for this resource // (we need this to provision some space for the ORDER contracts) - long stageArrivalTime = reservation.getArrival(); - if (jobType == ReservationRequestInterpreter.R_ORDER - || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) { - stageArrivalTime = - computeEarliestStartingTime(plan, reservation, index, - currentReservationStage, stageDeadline); - } - stageArrivalTime = stepRoundUp(stageArrivalTime, step); - stageArrivalTime = Math.max(stageArrivalTime, reservation.getArrival()); + if (allocateLeft) { + stageArrivalTime = predecessorEndTime; + } else { + stageArrivalTime = reservation.getArrival(); + if (jobType == ReservationRequestInterpreter.R_ORDER + || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) { + stageArrivalTime = + computeEarliestStartingTime(plan, reservation, + stageProvider.getCurrentIndex(), currentReservationStage, + stageDeadline); + } + stageArrivalTime = stepRoundUp(stageArrivalTime, step); + stageArrivalTime = Math.max(stageArrivalTime, reservation.getArrival()); + } // Compute the allocation of a single stage Map curAlloc = computeStageAllocation(plan, currentReservationStage, @@ -155,7 +148,7 @@ public RLESparseResourceAllocation computeJobAllocation(Plan plan, } // Otherwise, the job cannot be allocated - return null; + throw new PlanningException("The request cannot be satisfied"); } @@ -177,33 +170,41 @@ public RLESparseResourceAllocation computeJobAllocation(Plan plan, if (jobType == ReservationRequestInterpreter.R_ORDER || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) { + // CHECK ORDER_NO_GAP // Verify that there is no gap, in case the job is ORDER_NO_GAP + // note that the test is different left-to-right and right-to-left if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP && successorStartingTime != -1 - && successorStartingTime > stageEndTime) { - - return null; - + && ((allocateLeft && predecessorEndTime < stageStartTime) || + (!allocateLeft && (stageEndTime < successorStartingTime)) + ) + || (!isNonPreemptiveAllocation(curAlloc))) { + throw new PlanningException( + "The allocation found does not respect ORDER_NO_GAP"); } - // Store the stageStartTime and set the new stageDeadline - successorStartingTime = stageStartTime; - stageDeadline = stageStartTime; - + if (allocateLeft) { + // Store the stageStartTime and set the new stageDeadline + predecessorEndTime = stageEndTime; + } else { + // Store the stageStartTime and set the new stageDeadline + successorStartingTime = stageStartTime; + stageDeadline = stageStartTime; + } } - } // If the allocation is empty, return an error if (allocations.isEmpty()) { - return null; + throw new PlanningException("The request cannot be satisfied"); } return allocations; } - protected void initialize(Plan plan, ReservationDefinition reservation) { + protected void initialize(Plan plan, ReservationId reservationId, + ReservationDefinition reservation) throws PlanningException { // Get plan step & capacity capacity = plan.getTotalCapacity(); @@ -214,13 +215,26 @@ protected void initialize(Plan plan, ReservationDefinition reservation) { jobArrival = stepRoundUp(reservation.getArrival(), step); jobDeadline = stepRoundDown(reservation.getDeadline(), step); - // Dirty read of plan load - planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline); - // Initialize the plan modifications planModifications = new RLESparseResourceAllocation(plan.getResourceCalculator()); + // Dirty read of plan load + + // planLoads are not used by other StageAllocators... and don't deal + // well with huge reservation ranges + if (this.algStageAllocator instanceof StageAllocatorLowCostAligned) { + planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline); + ReservationAllocation oldRes = plan.getReservationById(reservationId); + if (oldRes != null) { + planModifications = + RLESparseResourceAllocation.merge(plan.getResourceCalculator(), + plan.getTotalCapacity(), planModifications, + oldRes.getResourcesOverTime(), RLEOperator.subtract, + jobArrival, jobDeadline); + } + } + } private Map getAllLoadsInInterval(Plan plan, long startTime, @@ -240,32 +254,6 @@ private Map getAllLoadsInInterval(Plan plan, long startTime, } - private void ignoreOldAllocation(ReservationAllocation oldReservation) { - - // If there is no old reservation, return - if (oldReservation == null) { - return; - } - - // Subtract each allocation interval from the planModifications - for (Entry entry : oldReservation - .getAllocationRequests().entrySet()) { - - // Read the entry - ReservationInterval interval = entry.getKey(); - Resource resource = entry.getValue(); - - // Find the actual request - Resource negativeResource = Resources.multiply(resource, -1); - - // Insert it into planModifications as a 'negative' request, to - // represent available resources - planModifications.addInterval(interval, negativeResource); - - } - - } - private void validateInputStage(Plan plan, ReservationRequest rr) throws ContractValidationException { @@ -291,13 +279,56 @@ private void validateInputStage(Plan plan, ReservationRequest rr) rr.getCapability(), plan.getMaximumAllocation())) { throw new ContractValidationException( - "Individual capability requests should not exceed cluster's " + - "maxAlloc"); + "Individual capability requests should not exceed cluster's " + + "maxAlloc"); } } + private boolean isNonPreemptiveAllocation( + Map curAlloc) { + + // Checks whether a stage allocation is non preemptive or not. + // Assumption: the intervals are non-intersecting (as returned by + // computeStageAllocation()). + // For a non-preemptive allocation, only two end points appear exactly once + + Set endPoints = new HashSet(2 * curAlloc.size()); + for (Entry entry : curAlloc.entrySet()) { + + ReservationInterval interval = entry.getKey(); + Resource resource = entry.getValue(); + + // Ignore intervals with no allocation + if (Resources.equals(resource, Resource.newInstance(0, 0))) { + continue; + } + + // Get endpoints + Long left = interval.getStartTime(); + Long right = interval.getEndTime(); + + // Add left endpoint if we haven't seen it before, remove otherwise + if (!endPoints.contains(left)) { + endPoints.add(left); + } else { + endPoints.remove(left); + } + + // Add right endpoint if we haven't seen it before, remove otherwise + if (!endPoints.contains(right)) { + endPoints.add(right); + } else { + endPoints.remove(right); + } + } + + // Non-preemptive only if endPoints is of size 2 + return (endPoints.size() == 2); + + } + // Call algEarliestStartTime() protected long computeEarliestStartingTime(Plan plan, ReservationDefinition reservation, int index, @@ -335,4 +366,60 @@ public IterativePlanner setAlgStageAllocator(StageAllocator alg) { } + /** + * Helper class that provide a list of ReservationRequests and iterates + * forward or backward depending whether we are allocating left-to-right or + * right-to-left. + */ + public static class StageProvider { + + private final boolean allocateLeft; + + private ListIterator li; + + public StageProvider(boolean allocateLeft, + ReservationDefinition reservation) { + + this.allocateLeft = allocateLeft; + int startingIndex; + if (allocateLeft) { + startingIndex = 0; + } else { + startingIndex = + reservation.getReservationRequests().getReservationResources() + .size(); + } + // Get a reverse iterator for the set of stages + li = + reservation.getReservationRequests().getReservationResources() + .listIterator(startingIndex); + + } + + public boolean hasNext() { + if (allocateLeft) { + return li.hasNext(); + } else { + return li.hasPrevious(); + } + } + + public ReservationRequest next() { + if (allocateLeft) { + return li.next(); + } else { + return li.previous(); + } + } + + public int getCurrentIndex() { + if (allocateLeft) { + return li.nextIndex() - 1; + } else { + return li.previousIndex() + 1; + } + } + + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java new file mode 100644 index 00000000000..c5a3192625c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; + +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * Computes the stage allocation according to the greedy allocation rule. The + * greedy rule repeatedly allocates requested containers at the leftmost or + * rightmost possible interval. This implementation leverages the + * run-length-encoding of the time-series we operate on and proceed more quickly + * than the baseline. + */ + +public class StageAllocatorGreedyRLE implements StageAllocator { + + private final boolean allocateLeft; + + public StageAllocatorGreedyRLE(boolean allocateLeft) { + this.allocateLeft = allocateLeft; + } + + @Override + public Map computeStageAllocation(Plan plan, + Map planLoads, + RLESparseResourceAllocation planModifications, ReservationRequest rr, + long stageEarliestStart, long stageDeadline, String user, + ReservationId oldId) throws PlanningException { + + // abort early if the interval is not satisfiable + if (stageEarliestStart + rr.getDuration() > stageDeadline) { + return null; + } + + Map allocationRequests = + new HashMap(); + + Resource totalCapacity = plan.getTotalCapacity(); + + // compute the gang as a resource and get the duration + Resource sizeOfGang = + Resources.multiply(rr.getCapability(), rr.getConcurrency()); + long dur = rr.getDuration(); + long step = plan.getStep(); + + // ceil the duration to the next multiple of the plan step + if (dur % step != 0) { + dur += (step - (dur % step)); + } + + // we know for sure that this division has no remainder (part of contract + // with user, validate before + int gangsToPlace = rr.getNumContainers() / rr.getConcurrency(); + + // get available resources from plan + RLESparseResourceAllocation netRLERes = + plan.getAvailableResourceOverTime(user, oldId, stageEarliestStart, + stageDeadline); + + // remove plan modifications + netRLERes = + RLESparseResourceAllocation.merge(plan.getResourceCalculator(), + totalCapacity, netRLERes, planModifications, RLEOperator.subtract, + stageEarliestStart, stageDeadline); + + // loop trying to place until we are done, or we are considering + // an invalid range of times + while (gangsToPlace > 0 && stageEarliestStart + dur <= stageDeadline) { + + // as we run along we remember how many gangs we can fit, and what + // was the most constraining moment in time (we will restart just + // after that to place the next batch) + int maxGang = gangsToPlace; + long minPoint = -1; + + // focus our attention to a time-range under consideration + NavigableMap partialMap = + netRLERes.getRangeOverlapping(stageEarliestStart, stageDeadline) + .getCumulative(); + + // revert the map for right-to-left allocation + if (!allocateLeft) { + partialMap = partialMap.descendingMap(); + } + + Iterator> netIt = partialMap.entrySet().iterator(); + + long oldT = stageDeadline; + + // internal loop, tries to allocate as many gang as possible starting + // at a given point in time, if it fails we move to the next time + // interval (with outside loop) + while (maxGang > 0 && netIt.hasNext()) { + + long t; + Resource curAvailRes; + + Entry e = netIt.next(); + if (allocateLeft) { + t = Math.max(e.getKey(), stageEarliestStart); + curAvailRes = e.getValue(); + } else { + t = oldT; + oldT = e.getKey(); + //attention: higher means lower, because we reversed the map direction + curAvailRes = partialMap.higherEntry(t).getValue(); + } + + // check exit/skip conditions/ + if (curAvailRes == null) { + //skip undefined regions (should not happen beside borders) + continue; + } + if (exitCondition(t, stageEarliestStart, stageDeadline, dur)) { + break; + } + + // compute maximum number of gangs we could fit + int curMaxGang = + (int) Math.floor(Resources.divide(plan.getResourceCalculator(), + totalCapacity, curAvailRes, sizeOfGang)); + curMaxGang = Math.min(gangsToPlace, curMaxGang); + + // compare with previous max, and set it. also remember *where* we found + // the minimum (useful for next attempts) + if (curMaxGang <= maxGang) { + maxGang = curMaxGang; + minPoint = t; + } + } + + // update data structures that retain the progress made so far + gangsToPlace = + trackProgress(planModifications, rr, stageEarliestStart, + stageDeadline, allocationRequests, dur, gangsToPlace, maxGang); + + // reset the next range of time-intervals to deal with + if (allocateLeft) { + // set earliest start to the min of the constraining "range" or my the + // end of this allocation + stageEarliestStart = + Math.min(partialMap.higherKey(minPoint), stageEarliestStart + dur); + } else { + // same as above moving right-to-left + stageDeadline = + Math.max(partialMap.higherKey(minPoint), stageDeadline - dur); + } + } + + // if no gangs are left to place we succeed and return the allocation + if (gangsToPlace == 0) { + return allocationRequests; + } else { + // If we are here is because we did not manage to satisfy this request. + // So we need to remove unwanted side-effect from tempAssigned (needed + // for ANY). + for (Map.Entry tempAllocation : + allocationRequests.entrySet()) { + planModifications.removeInterval(tempAllocation.getKey(), + tempAllocation.getValue()); + } + // and return null to signal failure in this allocation + return null; + } + + } + + private int trackProgress(RLESparseResourceAllocation planModifications, + ReservationRequest rr, long stageEarliestStart, long stageDeadline, + Map allocationRequests, long dur, + int gangsToPlace, int maxGang) { + // if we were able to place any gang, record this, and decrement + // gangsToPlace + if (maxGang > 0) { + gangsToPlace -= maxGang; + + ReservationInterval reservationInt = + computeReservationInterval(stageEarliestStart, stageDeadline, dur); + Resource reservationRes = + Resources.multiply(rr.getCapability(), rr.getConcurrency() * maxGang); + // remember occupied space (plan is read-only till we find a plausible + // allocation for the entire request). This is needed since we might be + // placing other ReservationRequest within the same + // ReservationDefinition, + // and we must avoid double-counting the available resources + planModifications.addInterval(reservationInt, reservationRes); + allocationRequests.put(reservationInt, reservationRes); + + } + return gangsToPlace; + } + + private ReservationInterval computeReservationInterval( + long stageEarliestStart, long stageDeadline, long dur) { + ReservationInterval reservationInt; + if (allocateLeft) { + reservationInt = + new ReservationInterval(stageEarliestStart, stageEarliestStart + dur); + } else { + reservationInt = + new ReservationInterval(stageDeadline - dur, stageDeadline); + } + return reservationInt; + } + + + private boolean exitCondition(long t, long stageEarliestStart, + long stageDeadline, long dur) { + if (allocateLeft) { + return t >= stageEarliestStart + dur; + } else { + return t < stageDeadline - dur; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index 0aedc6a4cfe..4aef7aee900 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -19,9 +19,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anySetOf; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import java.io.FileWriter; import java.io.IOException; @@ -75,12 +73,14 @@ public static ReservationId getNewReservationId() { public static ReservationSchedulerConfiguration createConf( String reservationQ, long timeWindow, float instConstraint, float avgConstraint) { - ReservationSchedulerConfiguration conf = - mock(ReservationSchedulerConfiguration.class); + + ReservationSchedulerConfiguration realConf = new CapacitySchedulerConfiguration(); + ReservationSchedulerConfiguration conf = spy(realConf); when(conf.getReservationWindow(reservationQ)).thenReturn(timeWindow); when(conf.getInstantaneousMaxCapacity(reservationQ)) .thenReturn(instConstraint); when(conf.getAverageCapacity(reservationQ)).thenReturn(avgConstraint); + return conf; } @@ -177,10 +177,15 @@ public static FairScheduler setupFairScheduler(RMContext rmContext, public static ReservationDefinition createSimpleReservationDefinition( long arrival, long deadline, long duration) { + return createSimpleReservationDefinition(arrival, deadline, duration, 1); + } + + public static ReservationDefinition createSimpleReservationDefinition( + long arrival, long deadline, long duration, int parallelism) { // create a request with a single atomic ask ReservationRequest r = - ReservationRequest.newInstance(Resource.newInstance(1024, 1), 1, 1, - duration); + ReservationRequest.newInstance(Resource.newInstance(1024, 1), + parallelism, parallelism, duration); ReservationDefinition rDef = new ReservationDefinitionPBImpl(); ReservationRequests reqs = new ReservationRequestsPBImpl(); reqs.setReservationResources(Collections.singletonList(r)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java index f81e7ec3ecb..b8a618bac6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java @@ -26,6 +26,8 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -55,8 +57,12 @@ import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.mortbay.log.Log; +@RunWith(Parameterized.class) public class TestGreedyReservationAgent { ReservationAgent agent; @@ -66,6 +72,17 @@ public class TestGreedyReservationAgent { Resource maxAlloc = Resource.newInstance(1024 * 8, 8); Random rand = new Random(); long step; + boolean allocateLeft; + + public TestGreedyReservationAgent(Boolean b){ + this.allocateLeft = b; + } + + @Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { + {true}, {false}}); + } @Before public void setup() throws Exception { @@ -90,7 +107,11 @@ public void setup() throws Exception { CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); policy.init(reservationQ, conf); - agent = new GreedyReservationAgent(); + // setting conf to + conf.setBoolean(GreedyReservationAgent.GREEDY_FAVOR_EARLY_ALLOCATION, + allocateLeft); + + agent = new GreedyReservationAgent(conf); QueueMetrics queueMetrics = mock(QueueMetrics.class); RMContext context = ReservationSystemTestUtil.createMockRMContext(); @@ -130,13 +151,21 @@ public void testSimple() throws PlanningException { System.out.println(plan.toString()); System.out.println(plan.toCumulativeString()); - for (long i = 10 * step; i < 20 * step; i++) { - assertTrue( - "Agent-based allocation unexpected", - Resources.equals(cs.getResourcesAtTime(i), - Resource.newInstance(2048 * 10, 2 * 10))); + if(allocateLeft){ + for (long i = 5 * step; i < 15 * step; i++) { + assertTrue( + "Agent-based allocation unexpected", + Resources.equals(cs.getResourcesAtTime(i), + Resource.newInstance(2048 * 10, 2 * 10))); + } + } else { + for (long i = 10 * step; i < 20 * step; i++) { + assertTrue( + "Agent-based allocation unexpected", + Resources.equals(cs.getResourcesAtTime(i), + Resource.newInstance(2048 * 10, 2 * 10))); + } } - } @SuppressWarnings("javadoc") @@ -212,18 +241,33 @@ public void testSharingPolicyFeedback() throws PlanningException { System.out.println(plan.toString()); System.out.println(plan.toCumulativeString()); - for (long i = 90 * step; i < 100 * step; i++) { - assertTrue( - "Agent-based allocation unexpected", - Resources.equals(cs.getResourcesAtTime(i), - Resource.newInstance(2048 * 20, 2 * 20))); - } - // RR2 is pushed out by the presence of RR - for (long i = 80 * step; i < 90 * step; i++) { - assertTrue( - "Agent-based allocation unexpected", - Resources.equals(cs2.getResourcesAtTime(i), - Resource.newInstance(2048 * 20, 2 * 20))); + if (allocateLeft) { + for (long i = 5 * step; i < 15 * step; i++) { + assertTrue( + "Agent-based allocation unexpected", + Resources.equals(cs.getResourcesAtTime(i), + Resource.newInstance(2048 * 20, 2 * 20))); + } + for (long i = 15 * step; i < 25 * step; i++) { + // RR2 is pushed out by the presence of RR + assertTrue( + "Agent-based allocation unexpected", + Resources.equals(cs2.getResourcesAtTime(i), + Resource.newInstance(2048 * 20, 2 * 20))); + } + } else { + for (long i = 90 * step; i < 100 * step; i++) { + assertTrue( + "Agent-based allocation unexpected", + Resources.equals(cs.getResourcesAtTime(i), + Resource.newInstance(2048 * 20, 2 * 20))); + } + for (long i = 80 * step; i < 90 * step; i++) { + assertTrue( + "Agent-based allocation unexpected", + Resources.equals(cs2.getResourcesAtTime(i), + Resource.newInstance(2048 * 20, 2 * 20))); + } } } @@ -274,10 +318,18 @@ public void testOrder() throws PlanningException { ReservationAllocation cs = plan.getReservationById(reservationID); - assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1)); - assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1)); - assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1)); - assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1)); + if (allocateLeft) { + assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1)); + assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1)); + assertTrue(cs.toString(), check(cs, 32 * step, 42 * step, 20, 1024, 1)); + assertTrue(cs.toString(), check(cs, 42 * step, 62 * step, 10, 1024, 1)); + + } else { + assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1)); + assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1)); + assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1)); + assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1)); + } System.out.println("--------AFTER ORDER ALLOCATION (queue: " + reservationID + ")----------"); System.out.println(plan.toString()); @@ -466,7 +518,12 @@ public void testAny() throws PlanningException { ReservationAllocation cs = plan.getReservationById(reservationID); - assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1)); + if (allocateLeft) { + assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 5, 1024, 1)); + } else { + assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1)); + } + System.out.println("--------AFTER ANY ALLOCATION (queue: " + reservationID + ")----------"); System.out.println(plan.toString()); @@ -551,8 +608,13 @@ public void testAll() throws PlanningException { ReservationAllocation cs = plan.getReservationById(reservationID); - assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 20, 1024, 1)); - assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 25, 1024, 1)); + if (allocateLeft) { + assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 25, 1024, 1)); + assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1)); + } else { + assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 20, 1024, 1)); + assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 25, 1024, 1)); + } System.out.println("--------AFTER ALL ALLOCATION (queue: " + reservationID + ")----------"); @@ -695,14 +757,18 @@ public void testStress(int numJobs) throws PlanningException, IOException { public static void main(String[] arg) { + boolean left = false; // run a stress test with by default 1000 random jobs int numJobs = 1000; if (arg.length > 0) { numJobs = Integer.parseInt(arg[0]); } + if (arg.length > 1) { + left = Boolean.parseBoolean(arg[1]); + } try { - TestGreedyReservationAgent test = new TestGreedyReservationAgent(); + TestGreedyReservationAgent test = new TestGreedyReservationAgent(left); test.setup(); test.testStress(numJobs); } catch (Exception e) {