diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b803b89bb26..c3f20159c57 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -92,6 +92,9 @@ Release 2.8.0 - UNRELEASED YARN-2019. Retrospect on decision of making RM crashed if any exception throw in ZKRMStateStore. (Jian He via junping_du) + YARN-3656. LowCost: A Cost-Based Placement Agent for YARN Reservations. + (Jonathan Yaniv and Ishai Menache via curino) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java index 8a15ac6a07b..d2603c184d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java @@ -40,6 +40,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java deleted file mode 100644 index 214df1cecd3..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java +++ /dev/null @@ -1,390 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.reservation; - -import java.util.HashMap; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.yarn.api.records.ReservationDefinition; -import org.apache.hadoop.yarn.api.records.ReservationId; -import org.apache.hadoop.yarn.api.records.ReservationRequest; -import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; -import org.apache.hadoop.yarn.util.resource.Resources; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This Agent employs a simple greedy placement strategy, placing the various - * stages of a {@link ReservationRequest} from the deadline moving backward - * towards the arrival. This allows jobs with earlier deadline to be scheduled - * greedily as well. Combined with an opportunistic anticipation of work if the - * cluster is not fully utilized also seems to provide good latency for - * best-effort jobs (i.e., jobs running without a reservation). - * - * This agent does not account for locality and only consider container - * granularity for validation purposes (i.e., you can't exceed max-container - * size). - */ -public class GreedyReservationAgent implements ReservationAgent { - - private static final Logger LOG = LoggerFactory - .getLogger(GreedyReservationAgent.class); - - @Override - public boolean createReservation(ReservationId reservationId, String user, - Plan plan, ReservationDefinition contract) throws PlanningException { - return computeAllocation(reservationId, user, plan, contract, null); - } - - @Override - public boolean updateReservation(ReservationId reservationId, String user, - Plan plan, ReservationDefinition contract) throws PlanningException { - return computeAllocation(reservationId, user, plan, contract, - plan.getReservationById(reservationId)); - } - - @Override - public boolean deleteReservation(ReservationId reservationId, String user, - Plan plan) throws PlanningException { - return plan.deleteReservation(reservationId); - } - - private boolean computeAllocation(ReservationId reservationId, String user, - Plan plan, ReservationDefinition contract, - ReservationAllocation oldReservation) throws PlanningException, - ContractValidationException { - LOG.info("placing the following ReservationRequest: " + contract); - - Resource totalCapacity = plan.getTotalCapacity(); - - // Here we can addd logic to adjust the ResourceDefinition to account for - // system "imperfections" (e.g., scheduling delays for large containers). - - // Align with plan step conservatively (i.e., ceil arrival, and floor - // deadline) - long earliestStart = contract.getArrival(); - long step = plan.getStep(); - if (earliestStart % step != 0) { - earliestStart = earliestStart + (step - (earliestStart % step)); - } - long deadline = - contract.getDeadline() - contract.getDeadline() % plan.getStep(); - - // setup temporary variables to handle time-relations between stages and - // intermediate answers - long curDeadline = deadline; - long oldDeadline = -1; - - Map allocations = - new HashMap(); - RLESparseResourceAllocation tempAssigned = - new RLESparseResourceAllocation(plan.getResourceCalculator(), - plan.getMinimumAllocation()); - - List stages = contract.getReservationRequests() - .getReservationResources(); - ReservationRequestInterpreter type = contract.getReservationRequests() - .getInterpreter(); - - boolean hasGang = false; - - // Iterate the stages in backward from deadline - for (ListIterator li = - stages.listIterator(stages.size()); li.hasPrevious();) { - - ReservationRequest currentReservationStage = li.previous(); - - // validate the RR respect basic constraints - validateInput(plan, currentReservationStage, totalCapacity); - - hasGang |= currentReservationStage.getConcurrency() > 1; - - // run allocation for a single stage - Map curAlloc = - placeSingleStage(plan, tempAssigned, currentReservationStage, - earliestStart, curDeadline, oldReservation, totalCapacity); - - if (curAlloc == null) { - // if we did not find an allocation for the currentReservationStage - // return null, unless the ReservationDefinition we are placing is of - // type ANY - if (type != ReservationRequestInterpreter.R_ANY) { - throw new PlanningException("The GreedyAgent" - + " couldn't find a valid allocation for your request"); - } else { - continue; - } - } else { - - // if we did find an allocation add it to the set of allocations - allocations.putAll(curAlloc); - - // if this request is of type ANY we are done searching (greedy) - // and can return the current allocation (break-out of the search) - if (type == ReservationRequestInterpreter.R_ANY) { - break; - } - - // if the request is of ORDER or ORDER_NO_GAP we constraint the next - // round of allocation to precede the current allocation, by setting - // curDeadline - if (type == ReservationRequestInterpreter.R_ORDER - || type == ReservationRequestInterpreter.R_ORDER_NO_GAP) { - curDeadline = findEarliestTime(curAlloc.keySet()); - - // for ORDER_NO_GAP verify that the allocation found so far has no - // gap, return null otherwise (the greedy procedure failed to find a - // no-gap - // allocation) - if (type == ReservationRequestInterpreter.R_ORDER_NO_GAP - && oldDeadline > 0) { - if (oldDeadline - findLatestTime(curAlloc.keySet()) > plan - .getStep()) { - throw new PlanningException("The GreedyAgent" - + " couldn't find a valid allocation for your request"); - } - } - // keep the variable oldDeadline pointing to the last deadline we - // found - oldDeadline = curDeadline; - } - } - } - - // / If we got here is because we failed to find an allocation for the - // ReservationDefinition give-up and report failure to the user - if (allocations.isEmpty()) { - throw new PlanningException("The GreedyAgent" - + " couldn't find a valid allocation for your request"); - } - - // create reservation with above allocations if not null/empty - - Resource ZERO_RES = Resource.newInstance(0, 0); - - long firstStartTime = findEarliestTime(allocations.keySet()); - - // add zero-padding from arrival up to the first non-null allocation - // to guarantee that the reservation exists starting at arrival - if (firstStartTime > earliestStart) { - allocations.put(new ReservationInterval(earliestStart, - firstStartTime), ZERO_RES); - firstStartTime = earliestStart; - // consider to add trailing zeros at the end for simmetry - } - - // Actually add/update the reservation in the plan. - // This is subject to validation as other agents might be placing - // in parallel and there might be sharing policies the agent is not - // aware off. - ReservationAllocation capReservation = - new InMemoryReservationAllocation(reservationId, contract, user, - plan.getQueueName(), firstStartTime, - findLatestTime(allocations.keySet()), allocations, - plan.getResourceCalculator(), plan.getMinimumAllocation(), hasGang); - if (oldReservation != null) { - return plan.updateReservation(capReservation); - } else { - return plan.addReservation(capReservation); - } - } - - private void validateInput(Plan plan, ReservationRequest rr, - Resource totalCapacity) throws ContractValidationException { - - if (rr.getConcurrency() < 1) { - throw new ContractValidationException("Gang Size should be >= 1"); - } - - if (rr.getNumContainers() <= 0) { - throw new ContractValidationException("Num containers should be >= 0"); - } - - // check that gangSize and numContainers are compatible - if (rr.getNumContainers() % rr.getConcurrency() != 0) { - throw new ContractValidationException( - "Parallelism must be an exact multiple of gang size"); - } - - // check that the largest container request does not exceed - // the cluster-wide limit for container sizes - if (Resources.greaterThan(plan.getResourceCalculator(), totalCapacity, - rr.getCapability(), plan.getMaximumAllocation())) { - throw new ContractValidationException("Individual" - + " capability requests should not exceed cluster's maxAlloc"); - } - } - - /** - * This method actually perform the placement of an atomic stage of the - * reservation. The key idea is to traverse the plan backward for a - * "lease-duration" worth of time, and compute what is the maximum multiple of - * our concurrency (gang) parameter we can fit. We do this and move towards - * previous instant in time until the time-window is exhausted or we placed - * all the user request. - */ - private Map placeSingleStage( - Plan plan, RLESparseResourceAllocation tempAssigned, - ReservationRequest rr, long earliestStart, long curDeadline, - ReservationAllocation oldResAllocation, final Resource totalCapacity) { - - Map allocationRequests = - new HashMap(); - - // compute the gang as a resource and get the duration - Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency()); - long dur = rr.getDuration(); - long step = plan.getStep(); - - // ceil the duration to the next multiple of the plan step - if (dur % step != 0) { - dur += (step - (dur % step)); - } - - // we know for sure that this division has no remainder (part of contract - // with user, validate before - int gangsToPlace = rr.getNumContainers() / rr.getConcurrency(); - - int maxGang = 0; - - // loop trying to place until we are done, or we are considering - // an invalid range of times - while (gangsToPlace > 0 && curDeadline - dur >= earliestStart) { - - // as we run along we remember how many gangs we can fit, and what - // was the most constraining moment in time (we will restart just - // after that to place the next batch) - maxGang = gangsToPlace; - long minPoint = curDeadline; - int curMaxGang = maxGang; - - // start placing at deadline (excluded due to [,) interval semantics and - // move backward - for (long t = curDeadline - plan.getStep(); t >= curDeadline - dur - && maxGang > 0; t = t - plan.getStep()) { - - // As we run along we will logically remove the previous allocation for - // this reservation - // if one existed - Resource oldResCap = Resource.newInstance(0, 0); - if (oldResAllocation != null) { - oldResCap = oldResAllocation.getResourcesAtTime(t); - } - - // compute net available resources - Resource netAvailableRes = Resources.clone(totalCapacity); - Resources.addTo(netAvailableRes, oldResCap); - Resources.subtractFrom(netAvailableRes, - plan.getTotalCommittedResources(t)); - Resources.subtractFrom(netAvailableRes, - tempAssigned.getCapacityAtTime(t)); - - // compute maximum number of gangs we could fit - curMaxGang = - (int) Math.floor(Resources.divide(plan.getResourceCalculator(), - totalCapacity, netAvailableRes, gang)); - - // pick the minimum between available resources in this instant, and how - // many gangs we have to place - curMaxGang = Math.min(gangsToPlace, curMaxGang); - - // compare with previous max, and set it. also remember *where* we found - // the minimum (useful for next attempts) - if (curMaxGang <= maxGang) { - maxGang = curMaxGang; - minPoint = t; - } - } - - // if we were able to place any gang, record this, and decrement - // gangsToPlace - if (maxGang > 0) { - gangsToPlace -= maxGang; - - ReservationInterval reservationInt = - new ReservationInterval(curDeadline - dur, curDeadline); - ReservationRequest reservationRequest = - ReservationRequest.newInstance(rr.getCapability(), - rr.getConcurrency() * maxGang, rr.getConcurrency(), - rr.getDuration()); - // remember occupied space (plan is read-only till we find a plausible - // allocation for the entire request). This is needed since we might be - // placing other ReservationRequest within the same - // ReservationDefinition, - // and we must avoid double-counting the available resources - final Resource reservationRes = ReservationSystemUtil.toResource( - reservationRequest); - tempAssigned.addInterval(reservationInt, reservationRes); - allocationRequests.put(reservationInt, reservationRes); - - } - - // reset our new starting point (curDeadline) to the most constraining - // point so far, we will look "left" of that to find more places where - // to schedule gangs (for sure nothing on the "right" of this point can - // fit a full gang. - curDeadline = minPoint; - } - - // if no gangs are left to place we succeed and return the allocation - if (gangsToPlace == 0) { - return allocationRequests; - } else { - // If we are here is becasue we did not manage to satisfy this request. - // So we need to remove unwanted side-effect from tempAssigned (needed - // for ANY). - for (Map.Entry tempAllocation : - allocationRequests.entrySet()) { - tempAssigned.removeInterval(tempAllocation.getKey(), - tempAllocation.getValue()); - } - // and return null to signal failure in this allocation - return null; - } - } - - // finds the leftmost point of this set of ReservationInterval - private long findEarliestTime(Set resInt) { - long ret = Long.MAX_VALUE; - for (ReservationInterval s : resInt) { - if (s.getStartTime() < ret) { - ret = s.getStartTime(); - } - } - return ret; - } - - // finds the rightmost point of this set of ReservationIntervals - private long findLatestTime(Set resInt) { - long ret = Long.MIN_VALUE; - for (ReservationInterval s : resInt) { - if (s.getEndTime() > ret) { - ret = s.getEndTime(); - } - } - return ret; - } - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java index 50d66cf8bc7..abc9c989e59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java @@ -33,6 +33,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.UTCClock; @@ -41,7 +43,12 @@ import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class InMemoryPlan implements Plan { +/** + * This class represents an in memory representation of the state of our + * reservation system, and provides accelerated access to both individual + * reservations and aggregate utilization of resources over time. + */ +public class InMemoryPlan implements Plan { private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class); @@ -75,7 +82,7 @@ class InMemoryPlan implements Plan { private Resource totalCapacity; - InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, + public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, ReservationAgent agent, Resource totalCapacity, long step, ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc, String queueName, Planner replanner, boolean getMoveOnExpiry) { @@ -83,7 +90,7 @@ class InMemoryPlan implements Plan { maxAlloc, queueName, replanner, getMoveOnExpiry, new UTCClock()); } - InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, + public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, ReservationAgent agent, Resource totalCapacity, long step, ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc, String queueName, Planner replanner, boolean getMoveOnExpiry, Clock clock) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java index a4dd23bf73d..42a2243e557 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java @@ -29,9 +29,9 @@ import org.apache.hadoop.yarn.util.resource.Resources; /** * An in memory implementation of a reservation allocation using the * {@link RLESparseResourceAllocation} - * + * */ -class InMemoryReservationAllocation implements ReservationAllocation { +public class InMemoryReservationAllocation implements ReservationAllocation { private final String planName; private final ReservationId reservationID; @@ -45,7 +45,7 @@ class InMemoryReservationAllocation implements ReservationAllocation { private RLESparseResourceAllocation resourcesOverTime; - InMemoryReservationAllocation(ReservationId reservationID, + public InMemoryReservationAllocation(ReservationId reservationID, ReservationDefinition contract, String user, String planName, long startTime, long endTime, Map allocations, @@ -54,7 +54,7 @@ class InMemoryReservationAllocation implements ReservationAllocation { allocations, calculator, minAlloc, false); } - InMemoryReservationAllocation(ReservationId reservationID, + public InMemoryReservationAllocation(ReservationId reservationID, ReservationDefinition contract, String user, String planName, long startTime, long endTime, Map allocations, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java index e8e9e295e2c..f7ffbd0effd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; /** * A Plan represents the central data structure of a reservation system that diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java index 6d3506dd4b7..94e299e33a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java index b49e99ef6e9..be68906e81c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java @@ -1,26 +1,27 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import java.util.Set; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; /** * This interface provides a read-only view on the allocations made in this diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java index 2957cc6eaab..80f2ff7b1dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java @@ -38,7 +38,7 @@ import com.google.gson.stream.JsonWriter; /** * This is a run length encoded sparse data structure that maintains resource - * allocations over time + * allocations over time. */ public class RLESparseResourceAllocation { @@ -74,7 +74,7 @@ public class RLESparseResourceAllocation { /** * Add a resource for the specified interval - * + * * @param reservationInterval the interval for which the resource is to be * added * @param totCap the resource to be added @@ -138,7 +138,7 @@ public class RLESparseResourceAllocation { /** * Removes a resource for the specified interval - * + * * @param reservationInterval the interval for which the resource is to be * removed * @param totCap the resource to be removed @@ -189,7 +189,7 @@ public class RLESparseResourceAllocation { /** * Returns the capacity, i.e. total resources allocated at the specified point * of time - * + * * @param tick the time (UTC in ms) at which the capacity is requested * @return the resources allocated at the specified time */ @@ -208,7 +208,7 @@ public class RLESparseResourceAllocation { /** * Get the timestamp of the earliest resource allocation - * + * * @return the timestamp of the first resource allocation */ public long getEarliestStartTime() { @@ -226,7 +226,7 @@ public class RLESparseResourceAllocation { /** * Get the timestamp of the latest resource allocation - * + * * @return the timestamp of the last resource allocation */ public long getLatestEndTime() { @@ -244,7 +244,7 @@ public class RLESparseResourceAllocation { /** * Returns true if there are no non-zero entries - * + * * @return true if there are no allocations or false otherwise */ public boolean isEmpty() { @@ -287,7 +287,7 @@ public class RLESparseResourceAllocation { /** * Returns the JSON string representation of the current resources allocated * over time - * + * * @return the JSON string representation of the current resources allocated * over time */ @@ -312,4 +312,43 @@ public class RLESparseResourceAllocation { } } + /** + * Returns the representation of the current resources allocated over time as + * an interval map. + * + * @return the representation of the current resources allocated over time as + * an interval map. + */ + public Map toIntervalMap() { + + readLock.lock(); + try { + Map allocations = + new TreeMap(); + + // Empty + if (isEmpty()) { + return allocations; + } + + Map.Entry lastEntry = null; + for (Map.Entry entry : cumulativeCapacity.entrySet()) { + + if (lastEntry != null) { + ReservationInterval interval = + new ReservationInterval(lastEntry.getKey(), entry.getKey()); + Resource resource = lastEntry.getValue(); + + allocations.put(interval, resource); + } + + lastEntry = entry; + } + return allocations; + } finally { + readLock.unlock(); + } + + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java index 2af1ffdb7da..c430b1fea52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java @@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner; public abstract class ReservationSchedulerConfiguration extends Configuration { @@ -33,11 +35,11 @@ public abstract class ReservationSchedulerConfiguration extends Configuration { @InterfaceAudience.Private public static final String DEFAULT_RESERVATION_AGENT_NAME = - "org.apache.hadoop.yarn.server.resourcemanager.reservation.GreedyReservationAgent"; + "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy"; @InterfaceAudience.Private public static final String DEFAULT_RESERVATION_PLANNER_NAME = - "org.apache.hadoop.yarn.server.resourcemanager.reservation.SimpleCapacityReplanner"; + "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.SimpleCapacityReplanner"; @InterfaceAudience.Private public static final boolean DEFAULT_RESERVATION_MOVE_ON_EXPIRY = true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java index cb76dcf8e69..3309693843d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java @@ -24,12 +24,13 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ReservationId; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; /** * This interface is the one implemented by any system that wants to support diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java index 8affae43fe2..5562adcf95e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java @@ -25,7 +25,11 @@ import org.apache.hadoop.yarn.util.resource.Resources; import java.util.HashMap; import java.util.Map; -final class ReservationSystemUtil { +/** + * Simple helper class for static methods used to transform across + * common formats in tests + */ +public final class ReservationSystemUtil { private ReservationSystemUtil() { // not called diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java new file mode 100644 index 00000000000..a3899280a77 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A planning algorithm that first runs LowCostAligned, and if it fails runs + * Greedy. + */ +public class AlignedPlannerWithGreedy implements ReservationAgent { + + // Default smoothness factor + private static final int DEFAULT_SMOOTHNESS_FACTOR = 10; + + // Log + private static final Logger LOG = LoggerFactory + .getLogger(AlignedPlannerWithGreedy.class); + + // Smoothness factor + private final ReservationAgent planner; + + // Constructor + public AlignedPlannerWithGreedy() { + this(DEFAULT_SMOOTHNESS_FACTOR); + } + + // Constructor + public AlignedPlannerWithGreedy(int smoothnessFactor) { + + // List of algorithms + List listAlg = new LinkedList(); + + // LowCostAligned planning algorithm + ReservationAgent algAligned = + new IterativePlanner(new StageEarliestStartByDemand(), + new StageAllocatorLowCostAligned(smoothnessFactor)); + listAlg.add(algAligned); + + // Greedy planning algorithm + ReservationAgent algGreedy = + new IterativePlanner(new StageEarliestStartByJobArrival(), + new StageAllocatorGreedy()); + listAlg.add(algGreedy); + + // Set planner: + // 1. Attempt to execute algAligned + // 2. If failed, fall back to algGreedy + planner = new TryManyReservationAgents(listAlg); + + } + + @Override + public boolean createReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + LOG.info("placing the following ReservationRequest: " + contract); + + try { + boolean res = + planner.createReservation(reservationId, user, plan, contract); + + if (res) { + LOG.info("OUTCOME: SUCCESS, Reservation ID: " + + reservationId.toString() + ", Contract: " + contract.toString()); + } else { + LOG.info("OUTCOME: FAILURE, Reservation ID: " + + reservationId.toString() + ", Contract: " + contract.toString()); + } + return res; + } catch (PlanningException e) { + LOG.info("OUTCOME: FAILURE, Reservation ID: " + reservationId.toString() + + ", Contract: " + contract.toString()); + throw e; + } + + } + + @Override + public boolean updateReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + LOG.info("updating the following ReservationRequest: " + contract); + + return planner.updateReservation(reservationId, user, plan, contract); + + } + + @Override + public boolean deleteReservation(ReservationId reservationId, String user, + Plan plan) throws PlanningException { + + LOG.info("removing the following ReservationId: " + reservationId); + + return planner.deleteReservation(reservationId, user, plan); + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java new file mode 100644 index 00000000000..db82a66dee0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This Agent employs a simple greedy placement strategy, placing the various + * stages of a {@link ReservationDefinition} from the deadline moving backward + * towards the arrival. This allows jobs with earlier deadline to be scheduled + * greedily as well. Combined with an opportunistic anticipation of work if the + * cluster is not fully utilized also seems to provide good latency for + * best-effort jobs (i.e., jobs running without a reservation). + * + * This agent does not account for locality and only consider container + * granularity for validation purposes (i.e., you can't exceed max-container + * size). + */ + +public class GreedyReservationAgent implements ReservationAgent { + + // Log + private static final Logger LOG = LoggerFactory + .getLogger(GreedyReservationAgent.class); + + // Greedy planner + private final ReservationAgent planner = new IterativePlanner( + new StageEarliestStartByJobArrival(), new StageAllocatorGreedy()); + + @Override + public boolean createReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + LOG.info("placing the following ReservationRequest: " + contract); + + try { + boolean res = + planner.createReservation(reservationId, user, plan, contract); + + if (res) { + LOG.info("OUTCOME: SUCCESS, Reservation ID: " + + reservationId.toString() + ", Contract: " + contract.toString()); + } else { + LOG.info("OUTCOME: FAILURE, Reservation ID: " + + reservationId.toString() + ", Contract: " + contract.toString()); + } + return res; + } catch (PlanningException e) { + LOG.info("OUTCOME: FAILURE, Reservation ID: " + reservationId.toString() + + ", Contract: " + contract.toString()); + throw e; + } + + } + + @Override + public boolean updateReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + LOG.info("updating the following ReservationRequest: " + contract); + + return planner.updateReservation(reservationId, user, plan, contract); + + } + + @Override + public boolean deleteReservation(ReservationId reservationId, String user, + Plan plan) throws PlanningException { + + LOG.info("removing the following ReservationId: " + reservationId); + + return planner.deleteReservation(reservationId, user, plan); + + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java new file mode 100644 index 00000000000..342c2e7a504 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java @@ -0,0 +1,338 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import java.util.HashMap; +import java.util.ListIterator; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * A planning algorithm consisting of two main phases. The algorithm iterates + * over the job stages in descending order. For each stage, the algorithm: 1. + * Determines an interval [stageArrivalTime, stageDeadline) in which the stage + * is allocated. 2. Computes an allocation for the stage inside the interval. + * + * For ANY and ALL jobs, phase 1 sets the allocation window of each stage to be + * [jobArrival, jobDeadline]. For ORDER and ORDER_NO_GAP jobs, the deadline of + * each stage is set as succcessorStartTime - the starting time of its + * succeeding stage (or jobDeadline if it is the last stage). + * + * The phases are set using the two functions: 1. setAlgEarliestStartTime 2. + * setAlgComputeStageAllocation + */ +public class IterativePlanner extends PlanningAlgorithm { + + // Modifications performed by the algorithm that are not been reflected in the + // actual plan while a request is still pending. + private RLESparseResourceAllocation planModifications; + + // Data extracted from plan + private Map planLoads; + private Resource capacity; + private long step; + + // Job parameters + private ReservationRequestInterpreter jobType; + private long jobArrival; + private long jobDeadline; + + // Phase algorithms + private StageEarliestStart algStageEarliestStart = null; + private StageAllocator algStageAllocator = null; + + // Constructor + public IterativePlanner(StageEarliestStart algEarliestStartTime, + StageAllocator algStageAllocator) { + + setAlgStageEarliestStart(algEarliestStartTime); + setAlgStageAllocator(algStageAllocator); + + } + + @Override + public RLESparseResourceAllocation computeJobAllocation(Plan plan, + ReservationId reservationId, ReservationDefinition reservation) + throws ContractValidationException { + + // Initialize + initialize(plan, reservation); + + // If the job has been previously reserved, logically remove its allocation + ReservationAllocation oldReservation = + plan.getReservationById(reservationId); + if (oldReservation != null) { + ignoreOldAllocation(oldReservation); + } + + // Create the allocations data structure + RLESparseResourceAllocation allocations = + new RLESparseResourceAllocation(plan.getResourceCalculator(), + plan.getMinimumAllocation()); + + // Get a reverse iterator for the set of stages + ListIterator li = + reservation + .getReservationRequests() + .getReservationResources() + .listIterator( + reservation.getReservationRequests().getReservationResources() + .size()); + + // Current stage + ReservationRequest currentReservationStage; + + // Index, points on the current node + int index = + reservation.getReservationRequests().getReservationResources().size(); + + // Stage deadlines + long stageDeadline = stepRoundDown(reservation.getDeadline(), step); + long successorStartingTime = -1; + + // Iterate the stages in reverse order + while (li.hasPrevious()) { + + // Get current stage + currentReservationStage = li.previous(); + index -= 1; + + // Validate that the ReservationRequest respects basic constraints + validateInputStage(plan, currentReservationStage); + + // Compute an adjusted earliestStart for this resource + // (we need this to provision some space for the ORDER contracts) + long stageArrivalTime = reservation.getArrival(); + if (jobType == ReservationRequestInterpreter.R_ORDER + || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) { + stageArrivalTime = + computeEarliestStartingTime(plan, reservation, index, + currentReservationStage, stageDeadline); + } + stageArrivalTime = stepRoundUp(stageArrivalTime, step); + stageArrivalTime = Math.max(stageArrivalTime, reservation.getArrival()); + + // Compute the allocation of a single stage + Map curAlloc = + computeStageAllocation(plan, currentReservationStage, + stageArrivalTime, stageDeadline); + + // If we did not find an allocation, return NULL + // (unless it's an ANY job, then we simply continue). + if (curAlloc == null) { + + // If it's an ANY job, we can move to the next possible request + if (jobType == ReservationRequestInterpreter.R_ANY) { + continue; + } + + // Otherwise, the job cannot be allocated + return null; + + } + + // Get the start & end time of the current allocation + Long stageStartTime = findEarliestTime(curAlloc.keySet()); + Long stageEndTime = findLatestTime(curAlloc.keySet()); + + // If we did find an allocation for the stage, add it + for (Entry entry : curAlloc.entrySet()) { + allocations.addInterval(entry.getKey(), entry.getValue()); + } + + // If this is an ANY clause, we have finished + if (jobType == ReservationRequestInterpreter.R_ANY) { + break; + } + + // If ORDER job, set the stageDeadline of the next stage to be processed + if (jobType == ReservationRequestInterpreter.R_ORDER + || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) { + + // Verify that there is no gap, in case the job is ORDER_NO_GAP + if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP + && successorStartingTime != -1 + && successorStartingTime > stageEndTime) { + + return null; + + } + + // Store the stageStartTime and set the new stageDeadline + successorStartingTime = stageStartTime; + stageDeadline = stageStartTime; + + } + + } + + // If the allocation is empty, return an error + if (allocations.isEmpty()) { + return null; + } + + return allocations; + + } + + protected void initialize(Plan plan, ReservationDefinition reservation) { + + // Get plan step & capacity + capacity = plan.getTotalCapacity(); + step = plan.getStep(); + + // Get job parameters (type, arrival time & deadline) + jobType = reservation.getReservationRequests().getInterpreter(); + jobArrival = stepRoundUp(reservation.getArrival(), step); + jobDeadline = stepRoundDown(reservation.getDeadline(), step); + + // Dirty read of plan load + planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline); + + // Initialize the plan modifications + planModifications = + new RLESparseResourceAllocation(plan.getResourceCalculator(), + plan.getMinimumAllocation()); + + } + + private Map getAllLoadsInInterval(Plan plan, long startTime, + long endTime) { + + // Create map + Map loads = new HashMap(); + + // Calculate the load for every time slot between [start,end) + for (long t = startTime; t < endTime; t += step) { + Resource load = plan.getTotalCommittedResources(t); + loads.put(t, load); + } + + // Return map + return loads; + + } + + private void ignoreOldAllocation(ReservationAllocation oldReservation) { + + // If there is no old reservation, return + if (oldReservation == null) { + return; + } + + // Subtract each allocation interval from the planModifications + for (Entry entry : oldReservation + .getAllocationRequests().entrySet()) { + + // Read the entry + ReservationInterval interval = entry.getKey(); + Resource resource = entry.getValue(); + + // Find the actual request + Resource negativeResource = Resources.multiply(resource, -1); + + // Insert it into planModifications as a 'negative' request, to + // represent available resources + planModifications.addInterval(interval, negativeResource); + + } + + } + + private void validateInputStage(Plan plan, ReservationRequest rr) + throws ContractValidationException { + + // Validate concurrency + if (rr.getConcurrency() < 1) { + throw new ContractValidationException("Gang Size should be >= 1"); + } + + // Validate number of containers + if (rr.getNumContainers() <= 0) { + throw new ContractValidationException("Num containers should be > 0"); + } + + // Check that gangSize and numContainers are compatible + if (rr.getNumContainers() % rr.getConcurrency() != 0) { + throw new ContractValidationException( + "Parallelism must be an exact multiple of gang size"); + } + + // Check that the largest container request does not exceed the cluster-wide + // limit for container sizes + if (Resources.greaterThan(plan.getResourceCalculator(), capacity, + rr.getCapability(), plan.getMaximumAllocation())) { + + throw new ContractValidationException( + "Individual capability requests should not exceed cluster's " + + "maxAlloc"); + + } + + } + + // Call algEarliestStartTime() + protected long computeEarliestStartingTime(Plan plan, + ReservationDefinition reservation, int index, + ReservationRequest currentReservationStage, long stageDeadline) { + + return algStageEarliestStart.setEarliestStartTime(plan, reservation, index, + currentReservationStage, stageDeadline); + + } + + // Call algStageAllocator + protected Map computeStageAllocation( + Plan plan, ReservationRequest rr, long stageArrivalTime, + long stageDeadline) { + + return algStageAllocator.computeStageAllocation(plan, planLoads, + planModifications, rr, stageArrivalTime, stageDeadline); + + } + + // Set the algorithm: algStageEarliestStart + public IterativePlanner setAlgStageEarliestStart(StageEarliestStart alg) { + + this.algStageEarliestStart = alg; + return this; // To allow concatenation of setAlg() functions + + } + + // Set the algorithm: algStageAllocator + public IterativePlanner setAlgStageAllocator(StageAllocator alg) { + + this.algStageAllocator = alg; + return this; // To allow concatenation of setAlg() functions + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java similarity index 90% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java index 57f28ff034b..abac6ac2c14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java @@ -16,11 +16,13 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.resourcemanager.reservation; +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; import java.util.List; import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; public interface Planner { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java new file mode 100644 index 00000000000..9a0a0f079b5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; + +/** + * An abstract class that follows the general behavior of planning algorithms. + */ +public abstract class PlanningAlgorithm implements ReservationAgent { + + /** + * Performs the actual allocation for a ReservationDefinition within a Plan. + * + * @param reservationId the identifier of the reservation + * @param user the user who owns the reservation + * @param plan the Plan to which the reservation must be fitted + * @param contract encapsulates the resources required by the user for his + * session + * @param oldReservation the existing reservation (null if none) + * @return whether the allocateUser function was successful or not + * + * @throws PlanningException if the session cannot be fitted into the plan + * @throws ContractValidationException + */ + protected boolean allocateUser(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract, + ReservationAllocation oldReservation) throws PlanningException, + ContractValidationException { + + // Adjust the ResourceDefinition to account for system "imperfections" + // (e.g., scheduling delays for large containers). + ReservationDefinition adjustedContract = adjustContract(plan, contract); + + // Compute the job allocation + RLESparseResourceAllocation allocation = + computeJobAllocation(plan, reservationId, adjustedContract); + + // If no job allocation was found, fail + if (allocation == null) { + throw new PlanningException( + "The planning algorithm could not find a valid allocation" + + " for your request"); + } + + // Translate the allocation to a map (with zero paddings) + long step = plan.getStep(); + long jobArrival = stepRoundUp(adjustedContract.getArrival(), step); + long jobDeadline = stepRoundUp(adjustedContract.getDeadline(), step); + Map mapAllocations = + allocationsToPaddedMap(allocation, jobArrival, jobDeadline); + + // Create the reservation + ReservationAllocation capReservation = + new InMemoryReservationAllocation(reservationId, // ID + adjustedContract, // Contract + user, // User name + plan.getQueueName(), // Queue name + findEarliestTime(mapAllocations.keySet()), // Earliest start time + findLatestTime(mapAllocations.keySet()), // Latest end time + mapAllocations, // Allocations + plan.getResourceCalculator(), // Resource calculator + plan.getMinimumAllocation()); // Minimum allocation + + // Add (or update) the reservation allocation + if (oldReservation != null) { + return plan.updateReservation(capReservation); + } else { + return plan.addReservation(capReservation); + } + + } + + private Map + allocationsToPaddedMap(RLESparseResourceAllocation allocation, + long jobArrival, long jobDeadline) { + + // Allocate + Map mapAllocations = + allocation.toIntervalMap(); + + // Zero allocation + Resource zeroResource = Resource.newInstance(0, 0); + + // Pad at the beginning + long earliestStart = findEarliestTime(mapAllocations.keySet()); + if (jobArrival < earliestStart) { + mapAllocations.put(new ReservationInterval(jobArrival, earliestStart), + zeroResource); + } + + // Pad at the beginning + long latestEnd = findLatestTime(mapAllocations.keySet()); + if (latestEnd < jobDeadline) { + mapAllocations.put(new ReservationInterval(latestEnd, jobDeadline), + zeroResource); + } + + return mapAllocations; + + } + + public abstract RLESparseResourceAllocation computeJobAllocation(Plan plan, + ReservationId reservationId, ReservationDefinition reservation) + throws PlanningException, ContractValidationException; + + @Override + public boolean createReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + // Allocate + return allocateUser(reservationId, user, plan, contract, null); + + } + + @Override + public boolean updateReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + // Get the old allocation + ReservationAllocation oldAlloc = plan.getReservationById(reservationId); + + // Allocate (ignores the old allocation) + return allocateUser(reservationId, user, plan, contract, oldAlloc); + + } + + @Override + public boolean deleteReservation(ReservationId reservationId, String user, + Plan plan) throws PlanningException { + + // Delete the existing reservation + return plan.deleteReservation(reservationId); + + } + + protected static long findEarliestTime(Set sesInt) { + + long ret = Long.MAX_VALUE; + for (ReservationInterval s : sesInt) { + if (s.getStartTime() < ret) { + ret = s.getStartTime(); + } + } + return ret; + + } + + protected static long findLatestTime(Set sesInt) { + + long ret = Long.MIN_VALUE; + for (ReservationInterval s : sesInt) { + if (s.getEndTime() > ret) { + ret = s.getEndTime(); + } + } + return ret; + + } + + protected static long stepRoundDown(long t, long step) { + return (t / step) * step; + } + + protected static long stepRoundUp(long t, long step) { + return ((t + step - 1) / step) * step; + } + + private ReservationDefinition adjustContract(Plan plan, + ReservationDefinition originalContract) { + + // Place here adjustment. For example using QueueMetrics we can track + // large container delays per YARN-YARN-1990 + + return originalContract; + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java similarity index 97% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java index 69550369e66..bdea2f47d57 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java @@ -15,10 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ -package org.apache.hadoop.yarn.server.resourcemanager.reservation; +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java similarity index 90% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java index b5a6a9900ff..750778320b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.resourcemanager.reservation; +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; import java.util.Iterator; import java.util.List; @@ -27,6 +27,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.UTCClock; @@ -87,8 +90,9 @@ public class SimpleCapacityReplanner implements Planner { // loop on all moment in time from now to the end of the check Zone // or the end of the planned sessions whichever comes first - for (long t = now; (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone)); t += - plan.getStep()) { + for (long t = now; + (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone)); + t += plan.getStep()) { Resource excessCap = Resources.subtract(plan.getTotalCommittedResources(t), totCap); // if we are violating @@ -98,7 +102,8 @@ public class SimpleCapacityReplanner implements Planner { new TreeSet(plan.getReservationsAtTime(t)); for (Iterator resIter = curReservations.iterator(); resIter.hasNext() - && Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE);) { + && Resources.greaterThan(resCalc, totCap, excessCap, + ZERO_RESOURCE);) { ReservationAllocation reservation = resIter.next(); plan.deleteReservation(reservation.getReservationId()); excessCap = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java new file mode 100644 index 00000000000..9df6b749462 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; + +/** + * Interface for allocating a single stage in IterativePlanner. + */ +public interface StageAllocator { + + /** + * Computes the allocation of a stage inside a defined time interval. + * + * @param plan the Plan to which the reservation must be fitted + * @param planLoads a 'dirty' read of the plan loads at each time + * @param planModifications the allocations performed by the planning + * algorithm which are not yet reflected by plan + * @param rr the stage + * @param stageEarliestStart the arrival time (earliest starting time) set for + * the stage by the two phase planning algorithm + * @param stageDeadline the deadline of the stage set by the two phase + * planning algorithm + * + * @return The computed allocation (or null if the stage could not be + * allocated) + */ + Map computeStageAllocation(Plan plan, + Map planLoads, + RLESparseResourceAllocation planModifications, ReservationRequest rr, + long stageEarliestStart, long stageDeadline); + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java new file mode 100644 index 00000000000..773fbdfc5a1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * Computes the stage allocation according to the greedy allocation rule. The + * greedy rule repeatedly allocates requested containers at the rightmost + * (latest) free interval. + */ + +public class StageAllocatorGreedy implements StageAllocator { + + @Override + public Map computeStageAllocation(Plan plan, + Map planLoads, + RLESparseResourceAllocation planModifications, ReservationRequest rr, + long stageEarliestStart, long stageDeadline) { + + Resource totalCapacity = plan.getTotalCapacity(); + + Map allocationRequests = + new HashMap(); + + // compute the gang as a resource and get the duration + Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency()); + long dur = rr.getDuration(); + long step = plan.getStep(); + + // ceil the duration to the next multiple of the plan step + if (dur % step != 0) { + dur += (step - (dur % step)); + } + + // we know for sure that this division has no remainder (part of contract + // with user, validate before + int gangsToPlace = rr.getNumContainers() / rr.getConcurrency(); + + int maxGang = 0; + + // loop trying to place until we are done, or we are considering + // an invalid range of times + while (gangsToPlace > 0 && stageDeadline - dur >= stageEarliestStart) { + + // as we run along we remember how many gangs we can fit, and what + // was the most constraining moment in time (we will restart just + // after that to place the next batch) + maxGang = gangsToPlace; + long minPoint = stageDeadline; + int curMaxGang = maxGang; + + // start placing at deadline (excluded due to [,) interval semantics and + // move backward + for (long t = stageDeadline - plan.getStep(); t >= stageDeadline - dur + && maxGang > 0; t = t - plan.getStep()) { + + // compute net available resources + Resource netAvailableRes = Resources.clone(totalCapacity); + // Resources.addTo(netAvailableRes, oldResCap); + Resources.subtractFrom(netAvailableRes, + plan.getTotalCommittedResources(t)); + Resources.subtractFrom(netAvailableRes, + planModifications.getCapacityAtTime(t)); + + // compute maximum number of gangs we could fit + curMaxGang = + (int) Math.floor(Resources.divide(plan.getResourceCalculator(), + totalCapacity, netAvailableRes, gang)); + + // pick the minimum between available resources in this instant, and how + // many gangs we have to place + curMaxGang = Math.min(gangsToPlace, curMaxGang); + + // compare with previous max, and set it. also remember *where* we found + // the minimum (useful for next attempts) + if (curMaxGang <= maxGang) { + maxGang = curMaxGang; + minPoint = t; + } + } + + // if we were able to place any gang, record this, and decrement + // gangsToPlace + if (maxGang > 0) { + gangsToPlace -= maxGang; + + ReservationInterval reservationInt = + new ReservationInterval(stageDeadline - dur, stageDeadline); + Resource reservationRes = + Resources.multiply(rr.getCapability(), rr.getConcurrency() + * maxGang); + // remember occupied space (plan is read-only till we find a plausible + // allocation for the entire request). This is needed since we might be + // placing other ReservationRequest within the same + // ReservationDefinition, + // and we must avoid double-counting the available resources + planModifications.addInterval(reservationInt, reservationRes); + allocationRequests.put(reservationInt, reservationRes); + + } + + // reset our new starting point (curDeadline) to the most constraining + // point so far, we will look "left" of that to find more places where + // to schedule gangs (for sure nothing on the "right" of this point can + // fit a full gang. + stageDeadline = minPoint; + } + + // if no gangs are left to place we succeed and return the allocation + if (gangsToPlace == 0) { + return allocationRequests; + } else { + // If we are here is becasue we did not manage to satisfy this request. + // So we need to remove unwanted side-effect from tempAssigned (needed + // for ANY). + for (Map.Entry tempAllocation + : allocationRequests.entrySet()) { + planModifications.removeInterval(tempAllocation.getKey(), + tempAllocation.getValue()); + } + // and return null to signal failure in this allocation + return null; + } + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java new file mode 100644 index 00000000000..4b5763d9200 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java @@ -0,0 +1,360 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import java.util.Comparator; +import java.util.Map; +import java.util.TreeSet; + +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * A stage allocator that iteratively allocates containers in the + * {@link DurationInterval} with lowest overall cost. The algorithm only + * considers intervals of the form: [stageDeadline - (n+1)*duration, + * stageDeadline - n*duration) for an integer n. This guarantees that the + * allocations are aligned (as opposed to overlapping duration intervals). + * + * The smoothnessFactor parameter controls the number of containers that are + * simultaneously allocated in each iteration of the algorithm. + */ + +public class StageAllocatorLowCostAligned implements StageAllocator { + + // Smoothness factor + private int smoothnessFactor = 10; + + // Constructor + public StageAllocatorLowCostAligned() { + } + + // Constructor + public StageAllocatorLowCostAligned(int smoothnessFactor) { + this.smoothnessFactor = smoothnessFactor; + } + + // computeJobAllocation() + @Override + public Map computeStageAllocation( + Plan plan, Map planLoads, + RLESparseResourceAllocation planModifications, ReservationRequest rr, + long stageEarliestStart, long stageDeadline) { + + // Initialize + ResourceCalculator resCalc = plan.getResourceCalculator(); + Resource capacity = plan.getTotalCapacity(); + long step = plan.getStep(); + + // Create allocationRequestsearlies + RLESparseResourceAllocation allocationRequests = + new RLESparseResourceAllocation(plan.getResourceCalculator(), + plan.getMinimumAllocation()); + + // Initialize parameters + long duration = stepRoundUp(rr.getDuration(), step); + int windowSizeInDurations = + (int) ((stageDeadline - stageEarliestStart) / duration); + int totalGangs = rr.getNumContainers() / rr.getConcurrency(); + int numContainersPerGang = rr.getConcurrency(); + Resource gang = + Resources.multiply(rr.getCapability(), numContainersPerGang); + + // Set maxGangsPerUnit + int maxGangsPerUnit = + (int) Math.max( + Math.floor(((double) totalGangs) / windowSizeInDurations), 1); + maxGangsPerUnit = Math.max(maxGangsPerUnit / smoothnessFactor, 1); + + // If window size is too small, return null + if (windowSizeInDurations <= 0) { + return null; + } + + // Initialize tree sorted by costs + TreeSet durationIntervalsSortedByCost = + new TreeSet(new Comparator() { + @Override + public int compare(DurationInterval val1, DurationInterval val2) { + + int cmp = Double.compare(val1.getTotalCost(), val2.getTotalCost()); + if (cmp != 0) { + return cmp; + } + + return (-1) * Long.compare(val1.getEndTime(), val2.getEndTime()); + } + }); + + // Add durationIntervals that end at (endTime - n*duration) for some n. + for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart + + duration; intervalEnd -= duration) { + + long intervalStart = intervalEnd - duration; + + // Get duration interval [intervalStart,intervalEnd) + DurationInterval durationInterval = + getDurationInterval(intervalStart, intervalEnd, planLoads, + planModifications, capacity, resCalc, step); + + // If the interval can fit a gang, add it to the tree + if (durationInterval.canAllocate(gang, capacity, resCalc)) { + durationIntervalsSortedByCost.add(durationInterval); + } + } + + // Allocate + int remainingGangs = totalGangs; + while (remainingGangs > 0) { + + // If no durationInterval can fit a gang, break and return null + if (durationIntervalsSortedByCost.isEmpty()) { + break; + } + + // Get best duration interval + DurationInterval bestDurationInterval = + durationIntervalsSortedByCost.first(); + int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs); + + // Add it + remainingGangs -= numGangsToAllocate; + + ReservationInterval reservationInt = + new ReservationInterval(bestDurationInterval.getStartTime(), + bestDurationInterval.getEndTime()); + + Resource reservationRes = + Resources.multiply(rr.getCapability(), rr.getConcurrency() + * numGangsToAllocate); + + planModifications.addInterval(reservationInt, reservationRes); + allocationRequests.addInterval(reservationInt, reservationRes); + + // Remove from tree + durationIntervalsSortedByCost.remove(bestDurationInterval); + + // Get updated interval + DurationInterval updatedDurationInterval = + getDurationInterval(bestDurationInterval.getStartTime(), + bestDurationInterval.getStartTime() + duration, planLoads, + planModifications, capacity, resCalc, step); + + // Add to tree, if possible + if (updatedDurationInterval.canAllocate(gang, capacity, resCalc)) { + durationIntervalsSortedByCost.add(updatedDurationInterval); + } + + } + + // Get the final allocation + Map allocations = + allocationRequests.toIntervalMap(); + + // If no gangs are left to place we succeed and return the allocation + if (remainingGangs <= 0) { + return allocations; + } else { + + // If we are here is because we did not manage to satisfy this request. + // We remove unwanted side-effect from planModifications (needed for ANY). + for (Map.Entry tempAllocation + : allocations.entrySet()) { + + planModifications.removeInterval(tempAllocation.getKey(), + tempAllocation.getValue()); + + } + // Return null to signal failure in this allocation + return null; + + } + + } + + protected DurationInterval getDurationInterval(long startTime, long endTime, + Map planLoads, + RLESparseResourceAllocation planModifications, Resource capacity, + ResourceCalculator resCalc, long step) { + + // Initialize the dominant loads structure + Resource dominantResources = Resource.newInstance(0, 0); + + // Calculate totalCost and maxLoad + double totalCost = 0.0; + for (long t = startTime; t < endTime; t += step) { + + // Get the load + Resource load = getLoadAtTime(t, planLoads, planModifications); + + // Increase the total cost + totalCost += calcCostOfLoad(load, capacity, resCalc); + + // Update the dominant resources + dominantResources = Resources.componentwiseMax(dominantResources, load); + + } + + // Return the corresponding durationInterval + return new DurationInterval(startTime, endTime, totalCost, + dominantResources); + + } + + protected double calcCostOfInterval(long startTime, long endTime, + Map planLoads, + RLESparseResourceAllocation planModifications, Resource capacity, + ResourceCalculator resCalc, long step) { + + // Sum costs in the interval [startTime,endTime) + double totalCost = 0.0; + for (long t = startTime; t < endTime; t += step) { + totalCost += calcCostOfTimeSlot(t, planLoads, planModifications, capacity, + resCalc); + } + + // Return sum + return totalCost; + + } + + protected double calcCostOfTimeSlot(long t, Map planLoads, + RLESparseResourceAllocation planModifications, Resource capacity, + ResourceCalculator resCalc) { + + // Get the current load at time t + Resource load = getLoadAtTime(t, planLoads, planModifications); + + // Return cost + return calcCostOfLoad(load, capacity, resCalc); + + } + + protected Resource getLoadAtTime(long t, Map planLoads, + RLESparseResourceAllocation planModifications) { + + Resource planLoad = planLoads.get(t); + planLoad = (planLoad == null) ? Resource.newInstance(0, 0) : planLoad; + + return Resources.add(planLoad, planModifications.getCapacityAtTime(t)); + + } + + protected double calcCostOfLoad(Resource load, Resource capacity, + ResourceCalculator resCalc) { + + return resCalc.ratio(load, capacity); + + } + + protected static long stepRoundDown(long t, long step) { + return (t / step) * step; + } + + protected static long stepRoundUp(long t, long step) { + return ((t + step - 1) / step) * step; + } + + /** + * An inner class that represents an interval, typically of length duration. + * The class holds the total cost of the interval and the maximal load inside + * the interval in each dimension (both calculated externally). + */ + protected static class DurationInterval { + + private long startTime; + private long endTime; + private double cost; + private Resource maxLoad; + + // Constructor + public DurationInterval(long startTime, long endTime, double cost, + Resource maxLoad) { + this.startTime = startTime; + this.endTime = endTime; + this.cost = cost; + this.maxLoad = maxLoad; + } + + // canAllocate() - boolean function, returns whether requestedResources + // can be allocated during the durationInterval without + // violating capacity constraints + public boolean canAllocate(Resource requestedResources, Resource capacity, + ResourceCalculator resCalc) { + + Resource updatedMaxLoad = Resources.add(maxLoad, requestedResources); + return (resCalc.compare(capacity, updatedMaxLoad, capacity) <= 0); + + } + + // numCanFit() - returns the maximal number of requestedResources can be + // allocated during the durationInterval without violating + // capacity constraints + public int numCanFit(Resource requestedResources, Resource capacity, + ResourceCalculator resCalc) { + + // Represents the largest resource demand that can be satisfied throughout + // the entire DurationInterval (i.e., during [startTime,endTime)) + Resource availableResources = Resources.subtract(capacity, maxLoad); + + // Maximal number of requestedResources that fit inside the interval + return (int) Math.floor(Resources.divide(resCalc, capacity, + availableResources, requestedResources)); + + } + + public long getStartTime() { + return this.startTime; + } + + public void setStartTime(long value) { + this.startTime = value; + } + + public long getEndTime() { + return this.endTime; + } + + public void setEndTime(long value) { + this.endTime = value; + } + + public Resource getMaxLoad() { + return this.maxLoad; + } + + public void setMaxLoad(Resource value) { + this.maxLoad = value; + } + + public double getTotalCost() { + return this.cost; + } + + public void setTotalCost(double value) { + this.cost = value; + } + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java new file mode 100644 index 00000000000..547616a0d55 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; + +/** + * Interface for setting the earliest start time of a stage in IterativePlanner. + */ +public interface StageEarliestStart { + + /** + * Computes the earliest allowed starting time for a given stage. + * + * @param plan the Plan to which the reservation must be fitted + * @param reservation the job contract + * @param index the index of the stage in the job contract + * @param currentReservationStage the stage + * @param stageDeadline the deadline of the stage set by the two phase + * planning algorithm + * + * @return the earliest allowed starting time for the stage. + */ + long setEarliestStartTime(Plan plan, ReservationDefinition reservation, + int index, ReservationRequest currentReservationStage, + long stageDeadline); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java new file mode 100644 index 00000000000..5a46a4e43a8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import java.util.ListIterator; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; + +/** + * Sets the earliest start time of a stage proportional to the job weight. The + * interval [jobArrival, stageDeadline) is divided as follows. First, each stage + * is guaranteed at least its requested duration. Then, the stage receives a + * fraction of the remaining time. The fraction is calculated as the ratio + * between the weight (total requested resources) of the stage and the total + * weight of all proceeding stages. + */ + +public class StageEarliestStartByDemand implements StageEarliestStart { + + private long step; + + @Override + public long setEarliestStartTime(Plan plan, + ReservationDefinition reservation, int index, ReservationRequest current, + long stageDeadline) { + + step = plan.getStep(); + + // If this is the first stage, don't bother with the computation. + if (index < 1) { + return reservation.getArrival(); + } + + // Get iterator + ListIterator li = + reservation.getReservationRequests().getReservationResources() + .listIterator(index); + ReservationRequest rr; + + // Calculate the total weight & total duration + double totalWeight = calcWeight(current); + long totalDuration = getRoundedDuration(current, plan); + + while (li.hasPrevious()) { + rr = li.previous(); + totalWeight += calcWeight(rr); + totalDuration += getRoundedDuration(rr, plan); + } + + // Compute the weight of the current stage as compared to remaining ones + double ratio = calcWeight(current) / totalWeight; + + // Estimate an early start time, such that: + // 1. Every stage is guaranteed to receive at least its duration + // 2. The remainder of the window is divided between stages + // proportionally to its workload (total memory consumption) + long window = stageDeadline - reservation.getArrival(); + long windowRemainder = window - totalDuration; + long earlyStart = + (long) (stageDeadline - getRoundedDuration(current, plan) + - (windowRemainder * ratio)); + + // Realign if necessary (since we did some arithmetic) + earlyStart = stepRoundUp(earlyStart, step); + + // Return + return earlyStart; + + } + + // Weight = total memory consumption of stage + protected double calcWeight(ReservationRequest stage) { + return (stage.getDuration() * stage.getCapability().getMemory()) + * (stage.getNumContainers()); + } + + protected long getRoundedDuration(ReservationRequest stage, Plan plan) { + return stepRoundUp(stage.getDuration(), step); + } + + protected static long stepRoundDown(long t, long step) { + return (t / step) * step; + } + + protected static long stepRoundUp(long t, long step) { + return ((t + step - 1) / step) * step; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java new file mode 100644 index 00000000000..8347816808f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; + +/** + * Sets the earliest start time of a stage as the job arrival time. + */ +public class StageEarliestStartByJobArrival implements StageEarliestStart { + + @Override + public long setEarliestStartTime(Plan plan, + ReservationDefinition reservation, int index, ReservationRequest current, + long stageDeadline) { + + return reservation.getArrival(); + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java new file mode 100644 index 00000000000..1d37ce596be --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; + +/** + * A planning algorithm that invokes several other planning algorithms according + * to a given order. If one of the planners succeeds, the allocation it + * generates is returned. + */ +public class TryManyReservationAgents implements ReservationAgent { + + // Planning algorithms + private final List algs; + + // Constructor + public TryManyReservationAgents(List algs) { + this.algs = new LinkedList(algs); + } + + @Override + public boolean createReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + // Save the planning exception + PlanningException planningException = null; + + // Try all of the algorithms, in order + for (ReservationAgent alg : algs) { + + try { + if (alg.createReservation(reservationId, user, plan, contract)) { + return true; + } + } catch (PlanningException e) { + planningException = e; + } + + } + + // If all of the algorithms failed and one of the algorithms threw an + // exception, throw the last planning exception + if (planningException != null) { + throw planningException; + } + + // If all of the algorithms failed, return false + return false; + + } + + @Override + public boolean updateReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + // Save the planning exception + PlanningException planningException = null; + + // Try all of the algorithms, in order + for (ReservationAgent alg : algs) { + + try { + if (alg.updateReservation(reservationId, user, plan, contract)) { + return true; + } + } catch (PlanningException e) { + planningException = e; + } + + } + + // If all of the algorithms failed and one of the algorithms threw an + // exception, throw the last planning exception + if (planningException != null) { + throw planningException; + } + + // If all of the algorithms failed, return false + return false; + + } + + @Override + public boolean deleteReservation(ReservationId reservationId, String user, + Plan plan) throws PlanningException { + + return plan.deleteReservation(reservationId); + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index be1d69a7013..adb9dcf8786 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; @@ -89,7 +90,7 @@ public class ReservationSystemTestUtil { Assert.assertEquals(planQName, plan.getQueueName()); Assert.assertEquals(8192, plan.getTotalCapacity().getMemory()); Assert.assertTrue( - plan.getReservationAgent() instanceof GreedyReservationAgent); + plan.getReservationAgent() instanceof AlignedPlannerWithGreedy); Assert.assertTrue( plan.getSharingPolicy() instanceof CapacityOverTimePolicy); } @@ -102,7 +103,7 @@ public class ReservationSystemTestUtil { Assert.assertEquals(newQ, newPlan.getQueueName()); Assert.assertEquals(1024, newPlan.getTotalCapacity().getMemory()); Assert - .assertTrue(newPlan.getReservationAgent() instanceof GreedyReservationAgent); + .assertTrue(newPlan.getReservationAgent() instanceof AlignedPlannerWithGreedy); Assert .assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java index 19f876d4077..f608c3ba01f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; - +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java index b8663f660d4..15f9a89f1c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java index f294eaf0945..4b685b28b02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java @@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java index e9a4f50bac5..43316f7a589 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java index 722fb29def4..b6d24b66b53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java @@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java index 1e156183c9c..809892c06c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java index d0f4dc6a9d9..f0cc49ca671 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -164,6 +164,53 @@ public class TestRLESparseResourceAllocation { Assert.assertTrue(rleSparseVector.isEmpty()); } + @Test + public void testToIntervalMap() { + ResourceCalculator resCalc = new DefaultResourceCalculator(); + Resource minAlloc = Resource.newInstance(1, 1); + RLESparseResourceAllocation rleSparseVector = + new RLESparseResourceAllocation(resCalc, minAlloc); + Map mapAllocations; + + // Check empty + mapAllocations = rleSparseVector.toIntervalMap(); + Assert.assertTrue(mapAllocations.isEmpty()); + + // Check full + int[] alloc = { 0, 5, 10, 10, 5, 0, 5, 0 }; + int start = 100; + Set> inputs = + generateAllocation(start, alloc, false).entrySet(); + for (Entry ip : inputs) { + rleSparseVector.addInterval(ip.getKey(), ip.getValue()); + } + mapAllocations = rleSparseVector.toIntervalMap(); + Assert.assertTrue(mapAllocations.size() == 5); + for (Entry entry : mapAllocations + .entrySet()) { + ReservationInterval interval = entry.getKey(); + Resource resource = entry.getValue(); + if (interval.getStartTime() == 101L) { + Assert.assertTrue(interval.getEndTime() == 102L); + Assert.assertEquals(resource, Resource.newInstance(5 * 1024, 5)); + } else if (interval.getStartTime() == 102L) { + Assert.assertTrue(interval.getEndTime() == 104L); + Assert.assertEquals(resource, Resource.newInstance(10 * 1024, 10)); + } else if (interval.getStartTime() == 104L) { + Assert.assertTrue(interval.getEndTime() == 105L); + Assert.assertEquals(resource, Resource.newInstance(5 * 1024, 5)); + } else if (interval.getStartTime() == 105L) { + Assert.assertTrue(interval.getEndTime() == 106L); + Assert.assertEquals(resource, Resource.newInstance(0 * 1024, 0)); + } else if (interval.getStartTime() == 106L) { + Assert.assertTrue(interval.getEndTime() == 107L); + Assert.assertEquals(resource, Resource.newInstance(5 * 1024, 5)); + } else { + Assert.fail(); + } + } + } + private Map generateAllocation( int startTime, int[] alloc, boolean isStep) { Map req = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java index 50df8fe091f..f5625fb27be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java new file mode 100644 index 00000000000..9a1621a7aaa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java @@ -0,0 +1,820 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Before; +import org.junit.Test; +import org.mortbay.log.Log; + +public class TestAlignedPlanner { + + ReservationAgent agent; + InMemoryPlan plan; + Resource minAlloc = Resource.newInstance(1024, 1); + ResourceCalculator res = new DefaultResourceCalculator(); + Resource maxAlloc = Resource.newInstance(1024 * 8, 8); + Random rand = new Random(); + long step; + + @Test + public void testSingleReservationAccept() throws PlanningException { + + // Prepare basic plan + int numJobsInScenario = initializeScenario1(); + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 5 * step, // Job arrival time + 20 * step, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(2048, 2), // Capability + 10, // Num containers + 5, // Concurrency + 10 * step) }, // Duration + ReservationRequestInterpreter.R_ORDER, "u1"); + + // Add reservation + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr1); + + // CHECK: allocation was accepted + assertTrue("Agent-based allocation failed", reservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == numJobsInScenario + 1); + + // Get reservation + ReservationAllocation alloc1 = plan.getReservationById(reservationID); + + // Verify allocation + assertTrue(alloc1.toString(), + check(alloc1, 10 * step, 20 * step, 10, 2048, 2)); + + } + + @Test + public void testOrderNoGapImpossible() throws PlanningException { + + // Prepare basic plan + int numJobsInScenario = initializeScenario2(); + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 10L, // Job arrival time + 15 * step, // Job deadline + new ReservationRequest[] { + ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + step), // Duration + ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + step) }, // Duration + ReservationRequestInterpreter.R_ORDER_NO_GAP, "u1"); + + // Add reservation + try { + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr1); + fail(); + } catch (PlanningException e) { + // Expected failure + } + + // CHECK: allocation was not accepted + assertTrue("Agent-based allocation should have failed", plan + .getAllReservations().size() == numJobsInScenario); + + } + + @Test + public void testOrderNoGapImpossible2() throws PlanningException { + + // Prepare basic plan + int numJobsInScenario = initializeScenario2(); + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 10 * step, // Job arrival time + 13 * step, // Job deadline + new ReservationRequest[] { + ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + step), // Duration + ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 10, // Num containers + 10, // Concurrency + step) }, // Duration + ReservationRequestInterpreter.R_ORDER_NO_GAP, "u1"); + + // Add reservation + try { + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr1); + fail(); + } catch (PlanningException e) { + // Expected failure + } + + // CHECK: allocation was not accepted + assertTrue("Agent-based allocation should have failed", plan + .getAllReservations().size() == numJobsInScenario); + + } + + @Test + public void testOrderImpossible() throws PlanningException { + + // Prepare basic plan + int numJobsInScenario = initializeScenario2(); + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 10 * step, // Job arrival time + 15 * step, // Job deadline + new ReservationRequest[] { + ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + 2 * step), // Duration + ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + step) }, // Duration + ReservationRequestInterpreter.R_ORDER, "u1"); + + // Add reservation + try { + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr1); + fail(); + } catch (PlanningException e) { + // Expected failure + } + + // CHECK: allocation was not accepted + assertTrue("Agent-based allocation should have failed", plan + .getAllReservations().size() == numJobsInScenario); + + } + + @Test + public void testAnyImpossible() throws PlanningException { + + // Prepare basic plan + int numJobsInScenario = initializeScenario2(); + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 10 * step, // Job arrival time + 15 * step, // Job deadline + new ReservationRequest[] { + ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + 3 * step), // Duration + ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + 2 * step) }, // Duration + ReservationRequestInterpreter.R_ANY, "u1"); + + // Add reservation + try { + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr1); + fail(); + } catch (PlanningException e) { + // Expected failure + } + + // CHECK: allocation was not accepted + assertTrue("Agent-based allocation should have failed", plan + .getAllReservations().size() == numJobsInScenario); + + } + + @Test + public void testAnyAccept() throws PlanningException { + + // Prepare basic plan + int numJobsInScenario = initializeScenario2(); + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 10 * step, // Job arrival time + 15 * step, // Job deadline + new ReservationRequest[] { + ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + step), // Duration + ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + 2 * step) }, // Duration + ReservationRequestInterpreter.R_ANY, "u1"); + + // Add reservation + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr1); + + // CHECK: allocation was accepted + assertTrue("Agent-based allocation failed", reservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == numJobsInScenario + 1); + + // Get reservation + ReservationAllocation alloc1 = plan.getReservationById(reservationID); + + // Verify allocation + assertTrue(alloc1.toString(), + check(alloc1, 14 * step, 15 * step, 20, 1024, 1)); + + } + + @Test + public void testAllAccept() throws PlanningException { + + // Prepare basic plan + int numJobsInScenario = initializeScenario2(); + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 10 * step, // Job arrival time + 15 * step, // Job deadline + new ReservationRequest[] { + ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + step), // Duration + ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + step) }, // Duration + ReservationRequestInterpreter.R_ALL, "u1"); + + // Add reservation + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr1); + + // CHECK: allocation was accepted + assertTrue("Agent-based allocation failed", reservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == numJobsInScenario + 1); + + // Get reservation + ReservationAllocation alloc1 = plan.getReservationById(reservationID); + + // Verify allocation + assertTrue(alloc1.toString(), + check(alloc1, 10 * step, 11 * step, 20, 1024, 1)); + assertTrue(alloc1.toString(), + check(alloc1, 14 * step, 15 * step, 20, 1024, 1)); + + } + + @Test + public void testAllImpossible() throws PlanningException { + + // Prepare basic plan + int numJobsInScenario = initializeScenario2(); + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 10 * step, // Job arrival time + 15 * step, // Job deadline + new ReservationRequest[] { + ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + step), // Duration + ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + 2 * step) }, // Duration + ReservationRequestInterpreter.R_ALL, "u1"); + + // Add reservation + try { + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr1); + fail(); + } catch (PlanningException e) { + // Expected failure + } + + // CHECK: allocation was not accepted + assertTrue("Agent-based allocation should have failed", plan + .getAllReservations().size() == numJobsInScenario); + + } + + @Test + public void testUpdate() throws PlanningException { + + // Create flexible reservation + ReservationDefinition rrFlex = + createReservationDefinition( + 10 * step, // Job arrival time + 14 * step, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 100, // Num containers + 1, // Concurrency + 2 * step) }, // Duration + ReservationRequestInterpreter.R_ALL, "u1"); + + // Create blocking reservation + ReservationDefinition rrBlock = + createReservationDefinition( + 10 * step, // Job arrival time + 11 * step, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 100, // Num containers + 100, // Concurrency + step) }, // Duration + ReservationRequestInterpreter.R_ALL, "u1"); + + // Create reservation IDs + ReservationId flexReservationID = + ReservationSystemTestUtil.getNewReservationId(); + ReservationId blockReservationID = + ReservationSystemTestUtil.getNewReservationId(); + + // Add block, add flex, remove block, update flex + agent.createReservation(blockReservationID, "uBlock", plan, rrBlock); + agent.createReservation(flexReservationID, "uFlex", plan, rrFlex); + agent.deleteReservation(blockReservationID, "uBlock", plan); + agent.updateReservation(flexReservationID, "uFlex", plan, rrFlex); + + // CHECK: allocation was accepted + assertTrue("Agent-based allocation failed", flexReservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == 1); + + // Get reservation + ReservationAllocation alloc1 = plan.getReservationById(flexReservationID); + + // Verify allocation + assertTrue(alloc1.toString(), + check(alloc1, 10 * step, 14 * step, 50, 1024, 1)); + + } + + @Test + public void testImpossibleDuration() throws PlanningException { + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 10 * step, // Job arrival time + 15 * step, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 20, // Num containers + 20, // Concurrency + 10 * step) }, // Duration + ReservationRequestInterpreter.R_ALL, "u1"); + + // Add reservation + try { + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr1); + fail(); + } catch (PlanningException e) { + // Expected failure + } + + // CHECK: allocation was not accepted + assertTrue("Agent-based allocation should have failed", plan + .getAllReservations().size() == 0); + + } + + @Test + public void testLoadedDurationIntervals() throws PlanningException { + + int numJobsInScenario = initializeScenario3(); + + // Create reservation + ReservationDefinition rr1 = + createReservationDefinition( + 10 * step, // Job arrival time + 13 * step, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 80, // Num containers + 10, // Concurrency + step) }, // Duration + ReservationRequestInterpreter.R_ALL, "u1"); + + // Add reservation + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr1); + + // CHECK: allocation was accepted + assertTrue("Agent-based allocation failed", reservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == numJobsInScenario + 1); + + // Get reservation + ReservationAllocation alloc1 = plan.getReservationById(reservationID); + + // Verify allocation + assertTrue(alloc1.toString(), + check(alloc1, 10 * step, 11 * step, 20, 1024, 1)); + assertTrue(alloc1.toString(), + check(alloc1, 11 * step, 12 * step, 20, 1024, 1)); + assertTrue(alloc1.toString(), + check(alloc1, 12 * step, 13 * step, 40, 1024, 1)); + } + + @Test + public void testCostFunction() throws PlanningException { + + // Create large memory reservation + ReservationDefinition rr7Mem1Core = + createReservationDefinition( + 10 * step, // Job arrival time + 11 * step, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(7 * 1024, 1),// Capability + 1, // Num containers + 1, // Concurrency + step) }, // Duration + ReservationRequestInterpreter.R_ALL, "u1"); + + // Create reservation + ReservationDefinition rr6Mem6Cores = + createReservationDefinition( + 10 * step, // Job arrival time + 11 * step, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(6 * 1024, 6),// Capability + 1, // Num containers + 1, // Concurrency + step) }, // Duration + ReservationRequestInterpreter.R_ALL, "u2"); + + // Create reservation + ReservationDefinition rr = + createReservationDefinition( + 10 * step, // Job arrival time + 12 * step, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 1, // Num containers + 1, // Concurrency + step) }, // Duration + ReservationRequestInterpreter.R_ALL, "u3"); + + // Create reservation IDs + ReservationId reservationID1 = + ReservationSystemTestUtil.getNewReservationId(); + ReservationId reservationID2 = + ReservationSystemTestUtil.getNewReservationId(); + ReservationId reservationID3 = + ReservationSystemTestUtil.getNewReservationId(); + + // Add all + agent.createReservation(reservationID1, "u1", plan, rr7Mem1Core); + agent.createReservation(reservationID2, "u2", plan, rr6Mem6Cores); + agent.createReservation(reservationID3, "u3", plan, rr); + + // Get reservation + ReservationAllocation alloc3 = plan.getReservationById(reservationID3); + + assertTrue(alloc3.toString(), + check(alloc3, 10 * step, 11 * step, 0, 1024, 1)); + assertTrue(alloc3.toString(), + check(alloc3, 11 * step, 12 * step, 1, 1024, 1)); + + } + + @Test + public void testFromCluster() throws PlanningException { + + // int numJobsInScenario = initializeScenario3(); + + List list = new ArrayList(); + + // Create reservation + list.add(createReservationDefinition( + 1425716392178L, // Job arrival time + 1425722262791L, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 7, // Num containers + 1, // Concurrency + 587000) }, // Duration + ReservationRequestInterpreter.R_ALL, "u1")); + + list.add(createReservationDefinition( + 1425716406178L, // Job arrival time + 1425721255841L, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 6, // Num containers + 1, // Concurrency + 485000) }, // Duration + ReservationRequestInterpreter.R_ALL, "u2")); + + list.add(createReservationDefinition( + 1425716399178L, // Job arrival time + 1425723780138L, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 6, // Num containers + 1, // Concurrency + 738000) }, // Duration + ReservationRequestInterpreter.R_ALL, "u3")); + + list.add(createReservationDefinition( + 1425716437178L, // Job arrival time + 1425722968378L, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 7, // Num containers + 1, // Concurrency + 653000) }, // Duration + ReservationRequestInterpreter.R_ALL, "u4")); + + list.add(createReservationDefinition( + 1425716406178L, // Job arrival time + 1425721926090L, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 6, // Num containers + 1, // Concurrency + 552000) }, // Duration + ReservationRequestInterpreter.R_ALL, "u5")); + + list.add(createReservationDefinition( + 1425716379178L, // Job arrival time + 1425722238553L, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 6, // Num containers + 1, // Concurrency + 586000) }, // Duration + ReservationRequestInterpreter.R_ALL, "u6")); + + list.add(createReservationDefinition( + 1425716407178L, // Job arrival time + 1425722908317L, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 7, // Num containers + 1, // Concurrency + 650000) }, // Duration + ReservationRequestInterpreter.R_ALL, "u7")); + + list.add(createReservationDefinition( + 1425716452178L, // Job arrival time + 1425722841562L, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 6, // Num containers + 1, // Concurrency + 639000) }, // Duration + ReservationRequestInterpreter.R_ALL, "u8")); + + list.add(createReservationDefinition( + 1425716384178L, // Job arrival time + 1425721766129L, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 7, // Num containers + 1, // Concurrency + 538000) }, // Duration + ReservationRequestInterpreter.R_ALL, "u9")); + + list.add(createReservationDefinition( + 1425716437178L, // Job arrival time + 1425722507886L, // Job deadline + new ReservationRequest[] { ReservationRequest.newInstance( + Resource.newInstance(1024, 1), // Capability + 5, // Num containers + 1, // Concurrency + 607000) }, // Duration + ReservationRequestInterpreter.R_ALL, "u10")); + + // Add reservation + int i = 1; + for (ReservationDefinition rr : list) { + ReservationId reservationID = + ReservationSystemTestUtil.getNewReservationId(); + agent.createReservation(reservationID, "u" + Integer.toString(i), plan, + rr); + ++i; + } + + // CHECK: allocation was accepted + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == list.size()); + + } + + @Before + public void setup() throws Exception { + + // Initialize random seed + long seed = rand.nextLong(); + rand.setSeed(seed); + Log.info("Running with seed: " + seed); + + // Set cluster parameters + long timeWindow = 1000000L; + int capacityMem = 100 * 1024; + int capacityCores = 100; + step = 60000L; + + Resource clusterCapacity = Resource.newInstance(capacityMem, capacityCores); + + // Set configuration + ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); + String reservationQ = testUtil.getFullReservationQueueName(); + float instConstraint = 100; + float avgConstraint = 100; + + ReservationSchedulerConfiguration conf = + ReservationSystemTestUtil.createConf(reservationQ, timeWindow, + instConstraint, avgConstraint); + + CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); + policy.init(reservationQ, conf); + + QueueMetrics queueMetrics = mock(QueueMetrics.class); + + // Set planning agent + agent = new AlignedPlannerWithGreedy(); + + // Create Plan + plan = + new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step, + res, minAlloc, maxAlloc, "dedicated", null, true); + } + + private int initializeScenario1() throws PlanningException { + + // insert in the reservation a couple of controlled reservations, to create + // conditions for assignment that are non-empty + + addFixedAllocation(0L, step, new int[] { 10, 10, 20, 20, 20, 10, 10 }); + + System.out.println("--------BEFORE AGENT----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + return 1; + + } + + private int initializeScenario2() throws PlanningException { + + // insert in the reservation a couple of controlled reservations, to create + // conditions for assignment that are non-empty + + addFixedAllocation(11 * step, step, new int[] { 90, 90, 90 }); + + System.out.println("--------BEFORE AGENT----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + return 1; + + } + + private int initializeScenario3() throws PlanningException { + + // insert in the reservation a couple of controlled reservations, to create + // conditions for assignment that are non-empty + + addFixedAllocation(10 * step, step, new int[] { 70, 80, 60 }); + + System.out.println("--------BEFORE AGENT----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + return 1; + + } + + private void addFixedAllocation(long start, long step, int[] f) + throws PlanningException { + + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, + "user_fixed", "dedicated", start, start + f.length * step, + ReservationSystemTestUtil.generateAllocation(start, step, f), res, + minAlloc))); + + } + + private ReservationDefinition createReservationDefinition(long arrival, + long deadline, ReservationRequest[] reservationRequests, + ReservationRequestInterpreter rType, String username) { + + return ReservationDefinition.newInstance(arrival, deadline, + ReservationRequests.newInstance(Arrays.asList(reservationRequests), + rType), username); + + } + + private boolean check(ReservationAllocation alloc, long start, long end, + int containers, int mem, int cores) { + + Resource expectedResources = + Resource.newInstance(mem * containers, cores * containers); + + // Verify that all allocations in [start,end) equal containers * (mem,cores) + for (long i = start; i < end; i++) { + if (!Resources.equals(alloc.getResourcesAtTime(i), expectedResources)) { + return false; + } + } + return true; + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java similarity index 97% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java index de94dcd87e1..bd18a2f4543 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestGreedyReservationAgent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ -package org.apache.hadoop.yarn.server.resourcemanager.reservation; +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -37,6 +37,13 @@ import org.apache.hadoop.yarn.api.records.ReservationRequests; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java similarity index 90% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java index d4a97bacff8..aeb1e6a760c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ -package org.apache.hadoop.yarn.server.resourcemanager.reservation; +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -30,6 +30,14 @@ import java.util.TreeMap; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.NoOverCommitPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.SharingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.util.Clock;