YARN-2998. Abstract out scheduler independent PlanFollower components. (Anubhav Dhoot via kasha)
(cherry picked from commit e7257acd8a
)
This commit is contained in:
parent
cde5bfe3ec
commit
798ab51289
|
@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoSchedule
|
||||||
import org.apache.hadoop.yarn.sls.SLSRunner;
|
import org.apache.hadoop.yarn.sls.SLSRunner;
|
||||||
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
|
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
|
||||||
import org.apache.hadoop.yarn.sls.web.SLSWebApp;
|
import org.apache.hadoop.yarn.sls.web.SLSWebApp;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
|
@ -867,6 +868,11 @@ public class ResourceSchedulerWrapper
|
||||||
return scheduler.getMaximumResourceCapability();
|
return scheduler.getMaximumResourceCapability();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResourceCalculator getResourceCalculator() {
|
||||||
|
return scheduler.getResourceCalculator();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getNumClusterNodes() {
|
public int getNumClusterNodes() {
|
||||||
return scheduler.getNumClusterNodes();
|
return scheduler.getNumClusterNodes();
|
||||||
|
|
|
@ -38,6 +38,7 @@ Release 2.7.0 - UNRELEASED
|
||||||
YARN-2738. [YARN-2574] Add FairReservationSystem for FairScheduler.
|
YARN-2738. [YARN-2574] Add FairReservationSystem for FairScheduler.
|
||||||
(Anubhav Dhoot via kasha)
|
(Anubhav Dhoot via kasha)
|
||||||
|
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
YARN-2950. Change message to mandate, not suggest JS requirement on UI.
|
YARN-2950. Change message to mandate, not suggest JS requirement on UI.
|
||||||
|
@ -125,6 +126,10 @@ Release 2.7.0 - UNRELEASED
|
||||||
|
|
||||||
YARN-2943. Added node-labels page on RM web UI. (Wangda Tan via jianhe)
|
YARN-2943. Added node-labels page on RM web UI. (Wangda Tan via jianhe)
|
||||||
|
|
||||||
|
YARN-2998. Abstract out scheduler independent PlanFollower components.
|
||||||
|
(Anubhav Dhoot via kasha)
|
||||||
|
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -0,0 +1,412 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
|
||||||
|
private static final Logger LOG = LoggerFactory
|
||||||
|
.getLogger(CapacitySchedulerPlanFollower.class);
|
||||||
|
|
||||||
|
protected Collection<Plan> plans = new ArrayList<Plan>();
|
||||||
|
protected YarnScheduler scheduler;
|
||||||
|
protected Clock clock;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(Clock clock, ResourceScheduler sched, Collection<Plan> plans) {
|
||||||
|
this.clock = clock;
|
||||||
|
this.scheduler = sched;
|
||||||
|
this.plans.addAll(plans);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void run() {
|
||||||
|
for (Plan plan : plans) {
|
||||||
|
synchronizePlan(plan);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void setPlans(Collection<Plan> plans) {
|
||||||
|
this.plans.clear();
|
||||||
|
this.plans.addAll(plans);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void synchronizePlan(Plan plan) {
|
||||||
|
String planQueueName = plan.getQueueName();
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Running plan follower edit policy for plan: " + planQueueName);
|
||||||
|
}
|
||||||
|
// align with plan step
|
||||||
|
long step = plan.getStep();
|
||||||
|
long now = clock.getTime();
|
||||||
|
if (now % step != 0) {
|
||||||
|
now += step - (now % step);
|
||||||
|
}
|
||||||
|
Queue planQueue = getPlanQueue(planQueueName);
|
||||||
|
if (planQueue == null) return;
|
||||||
|
|
||||||
|
// first we publish to the plan the current availability of resources
|
||||||
|
Resource clusterResources = scheduler.getClusterResource();
|
||||||
|
Resource planResources = getPlanResources(plan, planQueue,
|
||||||
|
clusterResources);
|
||||||
|
|
||||||
|
Set<ReservationAllocation> currentReservations =
|
||||||
|
plan.getReservationsAtTime(now);
|
||||||
|
Set<String> curReservationNames = new HashSet<String>();
|
||||||
|
Resource reservedResources = Resource.newInstance(0, 0);
|
||||||
|
int numRes = getReservedResources(now, currentReservations,
|
||||||
|
curReservationNames, reservedResources);
|
||||||
|
|
||||||
|
// create the default reservation queue if it doesnt exist
|
||||||
|
String defReservationId = getReservationIdFromQueueName(planQueueName) +
|
||||||
|
PlanQueue.DEFAULT_QUEUE_SUFFIX;
|
||||||
|
String defReservationQueue = getReservationQueueName(planQueueName,
|
||||||
|
defReservationId);
|
||||||
|
createDefaultReservationQueue(planQueueName, planQueue,
|
||||||
|
defReservationId);
|
||||||
|
curReservationNames.add(defReservationId);
|
||||||
|
|
||||||
|
// if the resources dedicated to this plan has shrunk invoke replanner
|
||||||
|
if (arePlanResourcesLessThanReservations(clusterResources, planResources,
|
||||||
|
reservedResources)) {
|
||||||
|
try {
|
||||||
|
plan.getReplanner().plan(plan, null);
|
||||||
|
} catch (PlanningException e) {
|
||||||
|
LOG.warn("Exception while trying to replan: {}", planQueueName, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// identify the reservations that have expired and new reservations that
|
||||||
|
// have to be activated
|
||||||
|
List<? extends Queue> resQueues = getChildReservationQueues(planQueue);
|
||||||
|
Set<String> expired = new HashSet<String>();
|
||||||
|
for (Queue resQueue : resQueues) {
|
||||||
|
String resQueueName = resQueue.getQueueName();
|
||||||
|
String reservationId = getReservationIdFromQueueName(resQueueName);
|
||||||
|
if (curReservationNames.contains(reservationId)) {
|
||||||
|
// it is already existing reservation, so needed not create new
|
||||||
|
// reservation queue
|
||||||
|
curReservationNames.remove(reservationId);
|
||||||
|
} else {
|
||||||
|
// the reservation has termination, mark for cleanup
|
||||||
|
expired.add(reservationId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// garbage collect expired reservations
|
||||||
|
cleanupExpiredQueues(planQueueName, plan.getMoveOnExpiry(), expired,
|
||||||
|
defReservationQueue);
|
||||||
|
|
||||||
|
// Add new reservations and update existing ones
|
||||||
|
float totalAssignedCapacity = 0f;
|
||||||
|
if (currentReservations != null) {
|
||||||
|
// first release all excess capacity in default queue
|
||||||
|
try {
|
||||||
|
setQueueEntitlement(planQueueName, defReservationQueue, 0f, 1.0f);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
LOG.warn(
|
||||||
|
"Exception while trying to release default queue capacity for plan: {}",
|
||||||
|
planQueueName, e);
|
||||||
|
}
|
||||||
|
// sort allocations from the one giving up the most resources, to the
|
||||||
|
// one asking for the most
|
||||||
|
// avoid order-of-operation errors that temporarily violate 100%
|
||||||
|
// capacity bound
|
||||||
|
List<ReservationAllocation> sortedAllocations =
|
||||||
|
sortByDelta(
|
||||||
|
new ArrayList<ReservationAllocation>(currentReservations), now,
|
||||||
|
plan);
|
||||||
|
for (ReservationAllocation res : sortedAllocations) {
|
||||||
|
String currResId = res.getReservationId().toString();
|
||||||
|
if (curReservationNames.contains(currResId)) {
|
||||||
|
addReservationQueue(planQueueName, planQueue, currResId);
|
||||||
|
}
|
||||||
|
Resource capToAssign = res.getResourcesAtTime(now);
|
||||||
|
float targetCapacity = 0f;
|
||||||
|
if (planResources.getMemory() > 0
|
||||||
|
&& planResources.getVirtualCores() > 0) {
|
||||||
|
targetCapacity =
|
||||||
|
calculateReservationToPlanRatio(clusterResources,
|
||||||
|
planResources,
|
||||||
|
capToAssign);
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(
|
||||||
|
"Assigning capacity of {} to queue {} with target capacity {}",
|
||||||
|
capToAssign, currResId, targetCapacity);
|
||||||
|
}
|
||||||
|
// set maxCapacity to 100% unless the job requires gang, in which
|
||||||
|
// case we stick to capacity (as running early/before is likely a
|
||||||
|
// waste of resources)
|
||||||
|
float maxCapacity = 1.0f;
|
||||||
|
if (res.containsGangs()) {
|
||||||
|
maxCapacity = targetCapacity;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
setQueueEntitlement(planQueueName, currResId, targetCapacity, maxCapacity);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
LOG.warn("Exception while trying to size reservation for plan: {}",
|
||||||
|
currResId, planQueueName, e);
|
||||||
|
}
|
||||||
|
totalAssignedCapacity += targetCapacity;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// compute the default queue capacity
|
||||||
|
float defQCap = 1.0f - totalAssignedCapacity;
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("PlanFollowerEditPolicyTask: total Plan Capacity: {} "
|
||||||
|
+ "currReservation: {} default-queue capacity: {}", planResources,
|
||||||
|
numRes, defQCap);
|
||||||
|
}
|
||||||
|
// set the default queue to eat-up all remaining capacity
|
||||||
|
try {
|
||||||
|
setQueueEntitlement(planQueueName, defReservationQueue, defQCap, 1.0f);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
LOG.warn(
|
||||||
|
"Exception while trying to reclaim default queue capacity for plan: {}",
|
||||||
|
planQueueName, e);
|
||||||
|
}
|
||||||
|
// garbage collect finished reservations from plan
|
||||||
|
try {
|
||||||
|
plan.archiveCompletedReservations(now);
|
||||||
|
} catch (PlanningException e) {
|
||||||
|
LOG.error("Exception in archiving completed reservations: ", e);
|
||||||
|
}
|
||||||
|
LOG.info("Finished iteration of plan follower edit policy for plan: "
|
||||||
|
+ planQueueName);
|
||||||
|
|
||||||
|
// Extension: update plan with app states,
|
||||||
|
// useful to support smart replanning
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getReservationIdFromQueueName(String resQueueName) {
|
||||||
|
return resQueueName;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setQueueEntitlement(String planQueueName, String currResId,
|
||||||
|
float targetCapacity,
|
||||||
|
float maxCapacity) throws YarnException {
|
||||||
|
String reservationQueueName = getReservationQueueName(planQueueName,
|
||||||
|
currResId);
|
||||||
|
scheduler.setEntitlement(reservationQueueName, new QueueEntitlement(
|
||||||
|
targetCapacity, maxCapacity));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Schedulers have different ways of naming queues. See YARN-2773
|
||||||
|
protected String getReservationQueueName(String planQueueName,
|
||||||
|
String reservationId) {
|
||||||
|
return reservationId;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* First sets entitlement of queues to zero to prevent new app submission.
|
||||||
|
* Then move all apps in the set of queues to the parent plan queue's default
|
||||||
|
* reservation queue if move is enabled. Finally cleanups the queue by killing
|
||||||
|
* any apps (if move is disabled or move failed) and removing the queue
|
||||||
|
*/
|
||||||
|
protected void cleanupExpiredQueues(String planQueueName,
|
||||||
|
boolean shouldMove, Set<String> toRemove, String defReservationQueue) {
|
||||||
|
for (String expiredReservationId : toRemove) {
|
||||||
|
try {
|
||||||
|
// reduce entitlement to 0
|
||||||
|
String expiredReservation = getReservationQueueName(planQueueName,
|
||||||
|
expiredReservationId);
|
||||||
|
setQueueEntitlement(planQueueName, expiredReservation, 0.0f, 0.0f);
|
||||||
|
if (shouldMove) {
|
||||||
|
moveAppsInQueueSync(expiredReservation, defReservationQueue);
|
||||||
|
}
|
||||||
|
if (scheduler.getAppsInQueue(expiredReservation).size() > 0) {
|
||||||
|
scheduler.killAllAppsInQueue(expiredReservation);
|
||||||
|
LOG.info("Killing applications in queue: {}", expiredReservation);
|
||||||
|
} else {
|
||||||
|
scheduler.removeQueue(expiredReservation);
|
||||||
|
LOG.info("Queue: " + expiredReservation + " removed");
|
||||||
|
}
|
||||||
|
} catch (YarnException e) {
|
||||||
|
LOG.warn("Exception while trying to expire reservation: {}",
|
||||||
|
expiredReservationId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Move all apps in the set of queues to the parent plan queue's default
|
||||||
|
* reservation queue in a synchronous fashion
|
||||||
|
*/
|
||||||
|
private void moveAppsInQueueSync(String expiredReservation,
|
||||||
|
String defReservationQueue) {
|
||||||
|
List<ApplicationAttemptId> activeApps =
|
||||||
|
scheduler.getAppsInQueue(expiredReservation);
|
||||||
|
if (activeApps.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (ApplicationAttemptId app : activeApps) {
|
||||||
|
// fallback to parent's default queue
|
||||||
|
try {
|
||||||
|
scheduler.moveApplication(app.getApplicationId(), defReservationQueue);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
LOG.warn(
|
||||||
|
"Encountered unexpected error during migration of application: {}" +
|
||||||
|
" from reservation: {}",
|
||||||
|
app, expiredReservation, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected int getReservedResources(long now, Set<ReservationAllocation>
|
||||||
|
currentReservations, Set<String> curReservationNames,
|
||||||
|
Resource reservedResources) {
|
||||||
|
int numRes = 0;
|
||||||
|
if (currentReservations != null) {
|
||||||
|
numRes = currentReservations.size();
|
||||||
|
for (ReservationAllocation reservation : currentReservations) {
|
||||||
|
curReservationNames.add(reservation.getReservationId().toString());
|
||||||
|
Resources.addTo(reservedResources, reservation.getResourcesAtTime(now));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return numRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sort in the order from the least new amount of resources asked (likely
|
||||||
|
* negative) to the highest. This prevents "order-of-operation" errors related
|
||||||
|
* to exceeding 100% capacity temporarily.
|
||||||
|
*/
|
||||||
|
protected List<ReservationAllocation> sortByDelta(
|
||||||
|
List<ReservationAllocation> currentReservations, long now, Plan plan) {
|
||||||
|
Collections.sort(currentReservations, new ReservationAllocationComparator(
|
||||||
|
now, this, plan));
|
||||||
|
return currentReservations;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get queue associated with reservable queue named
|
||||||
|
* @param planQueueName Name of the reservable queue
|
||||||
|
* @return queue associated with the reservable queue
|
||||||
|
*/
|
||||||
|
protected abstract Queue getPlanQueue(String planQueueName);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculates ratio of reservationResources to planResources
|
||||||
|
*/
|
||||||
|
protected abstract float calculateReservationToPlanRatio(
|
||||||
|
Resource clusterResources, Resource planResources,
|
||||||
|
Resource reservationResources);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if plan resources are less than expected reservation resources
|
||||||
|
*/
|
||||||
|
protected abstract boolean arePlanResourcesLessThanReservations(
|
||||||
|
Resource clusterResources, Resource planResources,
|
||||||
|
Resource reservedResources);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a list of reservation queues for this planQueue
|
||||||
|
*/
|
||||||
|
protected abstract List<? extends Queue> getChildReservationQueues(
|
||||||
|
Queue planQueue);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a new reservation queue for reservation currResId for this planQueue
|
||||||
|
*/
|
||||||
|
protected abstract void addReservationQueue(
|
||||||
|
String planQueueName, Queue queue, String currResId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates the default reservation queue for use when no reservation is
|
||||||
|
* used for applications submitted to this planQueue
|
||||||
|
*/
|
||||||
|
protected abstract void createDefaultReservationQueue(
|
||||||
|
String planQueueName, Queue queue, String defReservationQueue);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get plan resources for this planQueue
|
||||||
|
*/
|
||||||
|
protected abstract Resource getPlanResources(
|
||||||
|
Plan plan, Queue queue, Resource clusterResources);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get reservation queue resources if it exists otherwise return null
|
||||||
|
*/
|
||||||
|
protected abstract Resource getReservationQueueResourceIfExists(Plan plan,
|
||||||
|
ReservationId reservationId);
|
||||||
|
|
||||||
|
private static class ReservationAllocationComparator implements
|
||||||
|
Comparator<ReservationAllocation> {
|
||||||
|
AbstractSchedulerPlanFollower planFollower;
|
||||||
|
long now;
|
||||||
|
Plan plan;
|
||||||
|
|
||||||
|
ReservationAllocationComparator(long now,
|
||||||
|
AbstractSchedulerPlanFollower planFollower, Plan plan) {
|
||||||
|
this.now = now;
|
||||||
|
this.planFollower = planFollower;
|
||||||
|
this.plan = plan;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Resource getUnallocatedReservedResources(
|
||||||
|
ReservationAllocation reservation) {
|
||||||
|
Resource resResource;
|
||||||
|
Resource reservationResource = planFollower
|
||||||
|
.getReservationQueueResourceIfExists
|
||||||
|
(plan, reservation.getReservationId());
|
||||||
|
if (reservationResource != null) {
|
||||||
|
resResource =
|
||||||
|
Resources.subtract(
|
||||||
|
reservation.getResourcesAtTime(now),
|
||||||
|
reservationResource);
|
||||||
|
} else {
|
||||||
|
resResource = reservation.getResourcesAtTime(now);
|
||||||
|
}
|
||||||
|
return resResource;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compare(ReservationAllocation lhs, ReservationAllocation rhs) {
|
||||||
|
// compute delta between current and previous reservation, and compare
|
||||||
|
// based on that
|
||||||
|
Resource lhsRes = getUnallocatedReservedResources(lhs);
|
||||||
|
Resource rhsRes = getUnallocatedReservedResources(rhs);
|
||||||
|
return lhsRes.compareTo(rhsRes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -19,26 +19,19 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ReservationQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ReservationQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -58,319 +51,119 @@ import org.slf4j.LoggerFactory;
|
||||||
* differences among existing queues). This makes it resilient to frequency of
|
* differences among existing queues). This makes it resilient to frequency of
|
||||||
* synchronization, and RM restart issues (no "catch up" is necessary).
|
* synchronization, and RM restart issues (no "catch up" is necessary).
|
||||||
*/
|
*/
|
||||||
public class CapacitySchedulerPlanFollower implements PlanFollower {
|
public class CapacitySchedulerPlanFollower extends AbstractSchedulerPlanFollower {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory
|
private static final Logger LOG = LoggerFactory
|
||||||
.getLogger(CapacitySchedulerPlanFollower.class);
|
.getLogger(CapacitySchedulerPlanFollower.class);
|
||||||
|
|
||||||
private Collection<Plan> plans = new ArrayList<Plan>();
|
private CapacityScheduler cs;
|
||||||
|
|
||||||
private Clock clock;
|
|
||||||
private CapacityScheduler scheduler;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(Clock clock, ResourceScheduler sched, Collection<Plan> plans) {
|
public void init(Clock clock, ResourceScheduler sched, Collection<Plan> plans) {
|
||||||
|
super.init(clock, sched, plans);
|
||||||
LOG.info("Initializing Plan Follower Policy:"
|
LOG.info("Initializing Plan Follower Policy:"
|
||||||
+ this.getClass().getCanonicalName());
|
+ this.getClass().getCanonicalName());
|
||||||
if (!(sched instanceof CapacityScheduler)) {
|
if (!(sched instanceof CapacityScheduler)) {
|
||||||
throw new YarnRuntimeException(
|
throw new YarnRuntimeException(
|
||||||
"CapacitySchedulerPlanFollower can only work with CapacityScheduler");
|
"CapacitySchedulerPlanFollower can only work with CapacityScheduler");
|
||||||
}
|
}
|
||||||
this.clock = clock;
|
this.cs = (CapacityScheduler) sched;
|
||||||
this.scheduler = (CapacityScheduler) sched;
|
|
||||||
this.plans.addAll(plans);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void run() {
|
protected Queue getPlanQueue(String planQueueName) {
|
||||||
for (Plan plan : plans) {
|
CSQueue queue = cs.getQueue(planQueueName);
|
||||||
synchronizePlan(plan);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void synchronizePlan(Plan plan) {
|
|
||||||
String planQueueName = plan.getQueueName();
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Running plan follower edit policy for plan: " + planQueueName);
|
|
||||||
}
|
|
||||||
// align with plan step
|
|
||||||
long step = plan.getStep();
|
|
||||||
long now = clock.getTime();
|
|
||||||
if (now % step != 0) {
|
|
||||||
now += step - (now % step);
|
|
||||||
}
|
|
||||||
CSQueue queue = scheduler.getQueue(planQueueName);
|
|
||||||
if (!(queue instanceof PlanQueue)) {
|
if (!(queue instanceof PlanQueue)) {
|
||||||
LOG.error("The Plan is not an PlanQueue!");
|
LOG.error("The Plan is not an PlanQueue!");
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected float calculateReservationToPlanRatio(
|
||||||
|
Resource clusterResources, Resource planResources,
|
||||||
|
Resource reservationResources) {
|
||||||
|
return Resources.divide(cs.getResourceCalculator(),
|
||||||
|
clusterResources, reservationResources, planResources);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean arePlanResourcesLessThanReservations(
|
||||||
|
Resource clusterResources, Resource planResources,
|
||||||
|
Resource reservedResources) {
|
||||||
|
return Resources.greaterThan(cs.getResourceCalculator(),
|
||||||
|
clusterResources, reservedResources, planResources);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<? extends Queue> getChildReservationQueues(Queue queue) {
|
||||||
|
PlanQueue planQueue = (PlanQueue)queue;
|
||||||
|
List<CSQueue> childQueues = planQueue.getChildQueues();
|
||||||
|
return childQueues;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void addReservationQueue(
|
||||||
|
String planQueueName, Queue queue, String currResId) {
|
||||||
|
PlanQueue planQueue = (PlanQueue)queue;
|
||||||
|
try {
|
||||||
|
ReservationQueue resQueue =
|
||||||
|
new ReservationQueue(cs, currResId, planQueue);
|
||||||
|
cs.addQueue(resQueue);
|
||||||
|
} catch (SchedulerDynamicEditException e) {
|
||||||
|
LOG.warn(
|
||||||
|
"Exception while trying to activate reservation: {} for plan: {}",
|
||||||
|
currResId, planQueueName, e);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn(
|
||||||
|
"Exception while trying to activate reservation: {} for plan: {}",
|
||||||
|
currResId, planQueueName, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void createDefaultReservationQueue(
|
||||||
|
String planQueueName, Queue queue, String defReservationId) {
|
||||||
|
PlanQueue planQueue = (PlanQueue)queue;
|
||||||
|
if (cs.getQueue(defReservationId) == null) {
|
||||||
|
try {
|
||||||
|
ReservationQueue defQueue =
|
||||||
|
new ReservationQueue(cs, defReservationId, planQueue);
|
||||||
|
cs.addQueue(defQueue);
|
||||||
|
} catch (SchedulerDynamicEditException e) {
|
||||||
|
LOG.warn(
|
||||||
|
"Exception while trying to create default reservation queue for plan: {}",
|
||||||
|
planQueueName, e);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn(
|
||||||
|
"Exception while trying to create default reservation queue for " +
|
||||||
|
"plan: {}",
|
||||||
|
planQueueName, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Resource getPlanResources(
|
||||||
|
Plan plan, Queue queue, Resource clusterResources) {
|
||||||
PlanQueue planQueue = (PlanQueue)queue;
|
PlanQueue planQueue = (PlanQueue)queue;
|
||||||
// first we publish to the plan the current availability of resources
|
|
||||||
Resource clusterResources = scheduler.getClusterResource();
|
|
||||||
float planAbsCap = planQueue.getAbsoluteCapacity();
|
float planAbsCap = planQueue.getAbsoluteCapacity();
|
||||||
Resource planResources = Resources.multiply(clusterResources, planAbsCap);
|
Resource planResources = Resources.multiply(clusterResources, planAbsCap);
|
||||||
plan.setTotalCapacity(planResources);
|
plan.setTotalCapacity(planResources);
|
||||||
|
return planResources;
|
||||||
Set<ReservationAllocation> currentReservations =
|
|
||||||
plan.getReservationsAtTime(now);
|
|
||||||
Set<String> curReservationNames = new HashSet<String>();
|
|
||||||
Resource reservedResources = Resource.newInstance(0, 0);
|
|
||||||
int numRes = 0;
|
|
||||||
if (currentReservations != null) {
|
|
||||||
numRes = currentReservations.size();
|
|
||||||
for (ReservationAllocation reservation : currentReservations) {
|
|
||||||
curReservationNames.add(reservation.getReservationId().toString());
|
|
||||||
Resources.addTo(reservedResources, reservation.getResourcesAtTime(now));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// create the default reservation queue if it doesnt exist
|
|
||||||
String defReservationQueue = planQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX;
|
|
||||||
if (scheduler.getQueue(defReservationQueue) == null) {
|
|
||||||
try {
|
|
||||||
ReservationQueue defQueue =
|
|
||||||
new ReservationQueue(scheduler, defReservationQueue, planQueue);
|
|
||||||
scheduler.addQueue(defQueue);
|
|
||||||
} catch (SchedulerDynamicEditException e) {
|
|
||||||
LOG.warn(
|
|
||||||
"Exception while trying to create default reservation queue for plan: {}",
|
|
||||||
planQueueName, e);
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn(
|
|
||||||
"Exception while trying to create default reservation queue for plan: {}",
|
|
||||||
planQueueName, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
curReservationNames.add(defReservationQueue);
|
|
||||||
// if the resources dedicated to this plan has shrunk invoke replanner
|
|
||||||
if (Resources.greaterThan(scheduler.getResourceCalculator(),
|
|
||||||
clusterResources, reservedResources, planResources)) {
|
|
||||||
try {
|
|
||||||
plan.getReplanner().plan(plan, null);
|
|
||||||
} catch (PlanningException e) {
|
|
||||||
LOG.warn("Exception while trying to replan: {}", planQueueName, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// identify the reservations that have expired and new reservations that
|
|
||||||
// have to be activated
|
|
||||||
List<CSQueue> resQueues = planQueue.getChildQueues();
|
|
||||||
Set<String> expired = new HashSet<String>();
|
|
||||||
for (CSQueue resQueue : resQueues) {
|
|
||||||
String resQueueName = resQueue.getQueueName();
|
|
||||||
if (curReservationNames.contains(resQueueName)) {
|
|
||||||
// it is already existing reservation, so needed not create new
|
|
||||||
// reservation queue
|
|
||||||
curReservationNames.remove(resQueueName);
|
|
||||||
} else {
|
|
||||||
// the reservation has termination, mark for cleanup
|
|
||||||
expired.add(resQueueName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// garbage collect expired reservations
|
|
||||||
cleanupExpiredQueues(plan.getMoveOnExpiry(), expired, defReservationQueue);
|
|
||||||
|
|
||||||
// Add new reservations and update existing ones
|
|
||||||
float totalAssignedCapacity = 0f;
|
|
||||||
if (currentReservations != null) {
|
|
||||||
// first release all excess capacity in default queue
|
|
||||||
try {
|
|
||||||
scheduler.setEntitlement(defReservationQueue, new QueueEntitlement(0f,
|
|
||||||
1.0f));
|
|
||||||
} catch (YarnException e) {
|
|
||||||
LOG.warn(
|
|
||||||
"Exception while trying to release default queue capacity for plan: {}",
|
|
||||||
planQueueName, e);
|
|
||||||
}
|
|
||||||
// sort allocations from the one giving up the most resources, to the
|
|
||||||
// one asking for the most
|
|
||||||
// avoid order-of-operation errors that temporarily violate 100%
|
|
||||||
// capacity bound
|
|
||||||
List<ReservationAllocation> sortedAllocations =
|
|
||||||
sortByDelta(
|
|
||||||
new ArrayList<ReservationAllocation>(currentReservations), now);
|
|
||||||
for (ReservationAllocation res : sortedAllocations) {
|
|
||||||
String currResId = res.getReservationId().toString();
|
|
||||||
if (curReservationNames.contains(currResId)) {
|
|
||||||
try {
|
|
||||||
ReservationQueue resQueue =
|
|
||||||
new ReservationQueue(scheduler, currResId, planQueue);
|
|
||||||
scheduler.addQueue(resQueue);
|
|
||||||
} catch (SchedulerDynamicEditException e) {
|
|
||||||
LOG.warn(
|
|
||||||
"Exception while trying to activate reservation: {} for plan: {}",
|
|
||||||
currResId, planQueueName, e);
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn(
|
|
||||||
"Exception while trying to activate reservation: {} for plan: {}",
|
|
||||||
currResId, planQueueName, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Resource capToAssign = res.getResourcesAtTime(now);
|
|
||||||
float targetCapacity = 0f;
|
|
||||||
if (planResources.getMemory() > 0
|
|
||||||
&& planResources.getVirtualCores() > 0) {
|
|
||||||
targetCapacity =
|
|
||||||
Resources.divide(scheduler.getResourceCalculator(),
|
|
||||||
clusterResources, capToAssign, planResources);
|
|
||||||
}
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug(
|
|
||||||
"Assigning capacity of {} to queue {} with target capacity {}",
|
|
||||||
capToAssign, currResId, targetCapacity);
|
|
||||||
}
|
|
||||||
// set maxCapacity to 100% unless the job requires gang, in which
|
|
||||||
// case we stick to capacity (as running early/before is likely a
|
|
||||||
// waste of resources)
|
|
||||||
float maxCapacity = 1.0f;
|
|
||||||
if (res.containsGangs()) {
|
|
||||||
maxCapacity = targetCapacity;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
scheduler.setEntitlement(currResId, new QueueEntitlement(
|
|
||||||
targetCapacity, maxCapacity));
|
|
||||||
} catch (YarnException e) {
|
|
||||||
LOG.warn("Exception while trying to size reservation for plan: {}",
|
|
||||||
currResId, planQueueName, e);
|
|
||||||
}
|
|
||||||
totalAssignedCapacity += targetCapacity;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// compute the default queue capacity
|
|
||||||
float defQCap = 1.0f - totalAssignedCapacity;
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("PlanFollowerEditPolicyTask: total Plan Capacity: {} "
|
|
||||||
+ "currReservation: {} default-queue capacity: {}", planResources,
|
|
||||||
numRes, defQCap);
|
|
||||||
}
|
|
||||||
// set the default queue to eat-up all remaining capacity
|
|
||||||
try {
|
|
||||||
scheduler.setEntitlement(defReservationQueue, new QueueEntitlement(
|
|
||||||
defQCap, 1.0f));
|
|
||||||
} catch (YarnException e) {
|
|
||||||
LOG.warn(
|
|
||||||
"Exception while trying to reclaim default queue capacity for plan: {}",
|
|
||||||
planQueueName, e);
|
|
||||||
}
|
|
||||||
// garbage collect finished reservations from plan
|
|
||||||
try {
|
|
||||||
plan.archiveCompletedReservations(now);
|
|
||||||
} catch (PlanningException e) {
|
|
||||||
LOG.error("Exception in archiving completed reservations: ", e);
|
|
||||||
}
|
|
||||||
LOG.info("Finished iteration of plan follower edit policy for plan: "
|
|
||||||
+ planQueueName);
|
|
||||||
|
|
||||||
// Extension: update plan with app states,
|
|
||||||
// useful to support smart replanning
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Move all apps in the set of queues to the parent plan queue's default
|
|
||||||
* reservation queue in a synchronous fashion
|
|
||||||
*/
|
|
||||||
private void moveAppsInQueueSync(String expiredReservation,
|
|
||||||
String defReservationQueue) {
|
|
||||||
List<ApplicationAttemptId> activeApps =
|
|
||||||
scheduler.getAppsInQueue(expiredReservation);
|
|
||||||
if (activeApps.isEmpty()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
for (ApplicationAttemptId app : activeApps) {
|
|
||||||
// fallback to parent's default queue
|
|
||||||
try {
|
|
||||||
scheduler.moveApplication(app.getApplicationId(), defReservationQueue);
|
|
||||||
} catch (YarnException e) {
|
|
||||||
LOG.warn(
|
|
||||||
"Encountered unexpected error during migration of application: {} from reservation: {}",
|
|
||||||
app, expiredReservation, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* First sets entitlement of queues to zero to prevent new app submission.
|
|
||||||
* Then move all apps in the set of queues to the parent plan queue's default
|
|
||||||
* reservation queue if move is enabled. Finally cleanups the queue by killing
|
|
||||||
* any apps (if move is disabled or move failed) and removing the queue
|
|
||||||
*/
|
|
||||||
private void cleanupExpiredQueues(boolean shouldMove, Set<String> toRemove,
|
|
||||||
String defReservationQueue) {
|
|
||||||
for (String expiredReservation : toRemove) {
|
|
||||||
try {
|
|
||||||
// reduce entitlement to 0
|
|
||||||
scheduler.setEntitlement(expiredReservation, new QueueEntitlement(0.0f,
|
|
||||||
0.0f));
|
|
||||||
if (shouldMove) {
|
|
||||||
moveAppsInQueueSync(expiredReservation, defReservationQueue);
|
|
||||||
}
|
|
||||||
if (scheduler.getAppsInQueue(expiredReservation).size() > 0) {
|
|
||||||
scheduler.killAllAppsInQueue(expiredReservation);
|
|
||||||
LOG.info("Killing applications in queue: {}", expiredReservation);
|
|
||||||
} else {
|
|
||||||
scheduler.removeQueue(expiredReservation);
|
|
||||||
LOG.info("Queue: " + expiredReservation + " removed");
|
|
||||||
}
|
|
||||||
} catch (YarnException e) {
|
|
||||||
LOG.warn("Exception while trying to expire reservation: {}",
|
|
||||||
expiredReservation, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void setPlans(Collection<Plan> plans) {
|
protected Resource getReservationQueueResourceIfExists(Plan plan,
|
||||||
this.plans.clear();
|
ReservationId reservationId) {
|
||||||
this.plans.addAll(plans);
|
CSQueue resQueue = cs.getQueue(reservationId.toString());
|
||||||
}
|
Resource reservationResource = null;
|
||||||
|
|
||||||
/**
|
|
||||||
* Sort in the order from the least new amount of resources asked (likely
|
|
||||||
* negative) to the highest. This prevents "order-of-operation" errors related
|
|
||||||
* to exceeding 100% capacity temporarily.
|
|
||||||
*/
|
|
||||||
private List<ReservationAllocation> sortByDelta(
|
|
||||||
List<ReservationAllocation> currentReservations, long now) {
|
|
||||||
Collections.sort(currentReservations, new ReservationAllocationComparator(
|
|
||||||
scheduler, now));
|
|
||||||
return currentReservations;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class ReservationAllocationComparator implements
|
|
||||||
Comparator<ReservationAllocation> {
|
|
||||||
CapacityScheduler scheduler;
|
|
||||||
long now;
|
|
||||||
|
|
||||||
ReservationAllocationComparator(CapacityScheduler scheduler, long now) {
|
|
||||||
this.scheduler = scheduler;
|
|
||||||
this.now = now;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Resource getUnallocatedReservedResources(
|
|
||||||
ReservationAllocation reservation) {
|
|
||||||
Resource resResource;
|
|
||||||
CSQueue resQueue =
|
|
||||||
scheduler.getQueue(reservation.getReservationId().toString());
|
|
||||||
if (resQueue != null) {
|
if (resQueue != null) {
|
||||||
resResource =
|
reservationResource = Resources.multiply(cs.getClusterResource(),
|
||||||
Resources.subtract(
|
resQueue.getAbsoluteCapacity());
|
||||||
reservation.getResourcesAtTime(now),
|
|
||||||
Resources.multiply(scheduler.getClusterResource(),
|
|
||||||
resQueue.getAbsoluteCapacity()));
|
|
||||||
} else {
|
|
||||||
resResource = reservation.getResourcesAtTime(now);
|
|
||||||
}
|
}
|
||||||
return resResource;
|
return reservationResource;
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int compare(ReservationAllocation lhs, ReservationAllocation rhs) {
|
|
||||||
// compute delta between current and previous reservation, and compare
|
|
||||||
// based on that
|
|
||||||
Resource lhsRes = getUnallocatedReservedResources(lhs);
|
|
||||||
Resource rhsRes = getUnallocatedReservedResources(rhs);
|
|
||||||
return lhsRes.compareTo(rhsRes);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This interface is used by the components to talk to the
|
* This interface is used by the components to talk to the
|
||||||
|
@ -98,6 +99,10 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
|
||||||
@Stable
|
@Stable
|
||||||
public Resource getMaximumResourceCapability();
|
public Resource getMaximumResourceCapability();
|
||||||
|
|
||||||
|
@LimitedPrivate("yarn")
|
||||||
|
@Evolving
|
||||||
|
ResourceCalculator getResourceCalculator();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the number of nodes available in the cluster.
|
* Get the number of nodes available in the cluster.
|
||||||
* @return the number of available nodes.
|
* @return the number of available nodes.
|
||||||
|
|
|
@ -544,9 +544,9 @@ public class FSLeafQueue extends FSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isStarved(Resource share) {
|
private boolean isStarved(Resource share) {
|
||||||
Resource desiredShare = Resources.min(FairScheduler.getResourceCalculator(),
|
Resource desiredShare = Resources.min(scheduler.getResourceCalculator(),
|
||||||
scheduler.getClusterResource(), share, getDemand());
|
scheduler.getClusterResource(), share, getDemand());
|
||||||
return Resources.lessThan(FairScheduler.getResourceCalculator(),
|
return Resources.lessThan(scheduler.getResourceCalculator(),
|
||||||
scheduler.getClusterResource(), getResourceUsage(), desiredShare);
|
scheduler.getClusterResource(), getResourceUsage(), desiredShare);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1094,7 +1094,8 @@ public class FairScheduler extends
|
||||||
return super.getApplicationAttempt(appAttemptId);
|
return super.getApplicationAttempt(appAttemptId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ResourceCalculator getResourceCalculator() {
|
@Override
|
||||||
|
public ResourceCalculator getResourceCalculator() {
|
||||||
return RESOURCE_CALCULATOR;
|
return RESOURCE_CALCULATOR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -919,6 +919,11 @@ public class FifoScheduler extends
|
||||||
return DEFAULT_QUEUE.getQueueUserAclInfo(null);
|
return DEFAULT_QUEUE.getQueueUserAclInfo(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResourceCalculator getResourceCalculator() {
|
||||||
|
return resourceCalculator;
|
||||||
|
}
|
||||||
|
|
||||||
private synchronized void addNode(RMNode nodeManager) {
|
private synchronized void addNode(RMNode nodeManager) {
|
||||||
FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
|
FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
|
||||||
usePortForNodeName);
|
usePortForNodeName);
|
||||||
|
|
|
@ -104,7 +104,7 @@ public class ReservationSystemTestUtil {
|
||||||
.assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
|
.assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setupFSAllocationFile(String allocationFile)
|
public static void setupFSAllocationFile(String allocationFile)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
PrintWriter out = new PrintWriter(new FileWriter(allocationFile));
|
PrintWriter out = new PrintWriter(new FileWriter(allocationFile));
|
||||||
out.println("<?xml version=\"1.0\"?>");
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
@ -130,7 +130,7 @@ public class ReservationSystemTestUtil {
|
||||||
out.close();
|
out.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
static void updateFSAllocationFile(String allocationFile)
|
public static void updateFSAllocationFile(String allocationFile)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
PrintWriter out = new PrintWriter(new FileWriter(allocationFile));
|
PrintWriter out = new PrintWriter(new FileWriter(allocationFile));
|
||||||
out.println("<?xml version=\"1.0\"?>");
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
|
|
@ -33,25 +33,20 @@ import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -62,21 +57,12 @@ import org.junit.rules.TestName;
|
||||||
import org.mockito.Matchers;
|
import org.mockito.Matchers;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
public class TestCapacitySchedulerPlanFollower {
|
public class TestCapacitySchedulerPlanFollower extends TestSchedulerPlanFollowerBase {
|
||||||
|
|
||||||
final static int GB = 1024;
|
|
||||||
|
|
||||||
private Clock mClock = null;
|
|
||||||
private CapacityScheduler scheduler = null;
|
|
||||||
private RMContext rmContext;
|
private RMContext rmContext;
|
||||||
private RMContext spyRMContext;
|
private RMContext spyRMContext;
|
||||||
private CapacitySchedulerContext csContext;
|
private CapacitySchedulerContext csContext;
|
||||||
private ReservationAgent mAgent;
|
private CapacityScheduler cs;
|
||||||
private Plan plan;
|
|
||||||
private Resource minAlloc = Resource.newInstance(GB, 1);
|
|
||||||
private Resource maxAlloc = Resource.newInstance(GB * 8, 8);
|
|
||||||
private ResourceCalculator res = new DefaultResourceCalculator();
|
|
||||||
private CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
|
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public TestName name = new TestName();
|
public TestName name = new TestName();
|
||||||
|
@ -84,7 +70,9 @@ public class TestCapacitySchedulerPlanFollower {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
CapacityScheduler spyCs = new CapacityScheduler();
|
CapacityScheduler spyCs = new CapacityScheduler();
|
||||||
scheduler = spy(spyCs);
|
cs = spy(spyCs);
|
||||||
|
scheduler = cs;
|
||||||
|
|
||||||
rmContext = TestUtils.getMockRMContext();
|
rmContext = TestUtils.getMockRMContext();
|
||||||
spyRMContext = spy(rmContext);
|
spyRMContext = spy(rmContext);
|
||||||
|
|
||||||
|
@ -100,7 +88,7 @@ public class TestCapacitySchedulerPlanFollower {
|
||||||
new CapacitySchedulerConfiguration();
|
new CapacitySchedulerConfiguration();
|
||||||
ReservationSystemTestUtil.setupQueueConfiguration(csConf);
|
ReservationSystemTestUtil.setupQueueConfiguration(csConf);
|
||||||
|
|
||||||
scheduler.setConf(csConf);
|
cs.setConf(csConf);
|
||||||
|
|
||||||
csContext = mock(CapacitySchedulerContext.class);
|
csContext = mock(CapacitySchedulerContext.class);
|
||||||
when(csContext.getConfiguration()).thenReturn(csConf);
|
when(csContext.getConfiguration()).thenReturn(csConf);
|
||||||
|
@ -119,9 +107,9 @@ public class TestCapacitySchedulerPlanFollower {
|
||||||
when(csContext.getContainerTokenSecretManager()).thenReturn(
|
when(csContext.getContainerTokenSecretManager()).thenReturn(
|
||||||
containerTokenSecretManager);
|
containerTokenSecretManager);
|
||||||
|
|
||||||
scheduler.setRMContext(spyRMContext);
|
cs.setRMContext(spyRMContext);
|
||||||
scheduler.init(csConf);
|
cs.init(csConf);
|
||||||
scheduler.start();
|
cs.start();
|
||||||
|
|
||||||
setupPlanFollower();
|
setupPlanFollower();
|
||||||
}
|
}
|
||||||
|
@ -132,7 +120,7 @@ public class TestCapacitySchedulerPlanFollower {
|
||||||
mAgent = mock(ReservationAgent.class);
|
mAgent = mock(ReservationAgent.class);
|
||||||
|
|
||||||
String reservationQ = testUtil.getFullReservationQueueName();
|
String reservationQ = testUtil.getFullReservationQueueName();
|
||||||
CapacitySchedulerConfiguration csConf = scheduler.getConfiguration();
|
CapacitySchedulerConfiguration csConf = cs.getConfiguration();
|
||||||
csConf.setReservationWindow(reservationQ, 20L);
|
csConf.setReservationWindow(reservationQ, 20L);
|
||||||
csConf.setMaximumCapacity(reservationQ, 40);
|
csConf.setMaximumCapacity(reservationQ, 40);
|
||||||
csConf.setAverageCapacity(reservationQ, 20);
|
csConf.setAverageCapacity(reservationQ, 20);
|
||||||
|
@ -153,155 +141,51 @@ public class TestCapacitySchedulerPlanFollower {
|
||||||
testPlanFollower(false);
|
testPlanFollower(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testPlanFollower(boolean isMove) throws PlanningException,
|
@Override
|
||||||
InterruptedException, AccessControlException {
|
protected void verifyCapacity(Queue defQ) {
|
||||||
// Initialize plan based on move flag
|
CSQueue csQueue = (CSQueue)defQ;
|
||||||
plan =
|
assertTrue(csQueue.getCapacity() > 0.9);
|
||||||
new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
|
}
|
||||||
scheduler.getClusterResource(), 1L, res,
|
|
||||||
scheduler.getMinimumResourceCapability(), maxAlloc, "dedicated",
|
|
||||||
null, isMove);
|
|
||||||
|
|
||||||
// add a few reservations to the plan
|
@Override
|
||||||
long ts = System.currentTimeMillis();
|
protected Queue getDefaultQueue() {
|
||||||
ReservationId r1 = ReservationId.newInstance(ts, 1);
|
return cs.getQueue("dedicated" + PlanQueue.DEFAULT_QUEUE_SUFFIX);
|
||||||
int[] f1 = { 10, 10, 10, 10, 10 };
|
}
|
||||||
assertTrue(plan.toString(),
|
|
||||||
plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
|
|
||||||
"dedicated", 0, 0 + f1.length, ReservationSystemTestUtil
|
|
||||||
.generateAllocation(0L, 1L, f1), res, minAlloc)));
|
|
||||||
|
|
||||||
ReservationId r2 = ReservationId.newInstance(ts, 2);
|
@Override
|
||||||
assertTrue(plan.toString(),
|
protected int getNumberOfApplications(Queue queue) {
|
||||||
plan.addReservation(new InMemoryReservationAllocation(r2, null, "u3",
|
CSQueue csQueue = (CSQueue)queue;
|
||||||
"dedicated", 3, 3 + f1.length, ReservationSystemTestUtil
|
int numberOfApplications = csQueue.getNumApplications();
|
||||||
.generateAllocation(3L, 1L, f1), res, minAlloc)));
|
return numberOfApplications;
|
||||||
|
}
|
||||||
ReservationId r3 = ReservationId.newInstance(ts, 3);
|
|
||||||
int[] f2 = { 0, 10, 20, 10, 0 };
|
|
||||||
assertTrue(plan.toString(),
|
|
||||||
plan.addReservation(new InMemoryReservationAllocation(r3, null, "u4",
|
|
||||||
"dedicated", 10, 10 + f2.length, ReservationSystemTestUtil
|
|
||||||
.generateAllocation(10L, 1L, f2), res, minAlloc)));
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected CapacitySchedulerPlanFollower createPlanFollower() {
|
||||||
CapacitySchedulerPlanFollower planFollower =
|
CapacitySchedulerPlanFollower planFollower =
|
||||||
new CapacitySchedulerPlanFollower();
|
new CapacitySchedulerPlanFollower();
|
||||||
planFollower.init(mClock, scheduler, Collections.singletonList(plan));
|
planFollower.init(mClock, scheduler, Collections.singletonList(plan));
|
||||||
|
return planFollower;
|
||||||
when(mClock.getTime()).thenReturn(0L);
|
|
||||||
planFollower.run();
|
|
||||||
|
|
||||||
CSQueue defQ =
|
|
||||||
scheduler.getQueue("dedicated" + PlanQueue.DEFAULT_QUEUE_SUFFIX);
|
|
||||||
CSQueue q = scheduler.getQueue(r1.toString());
|
|
||||||
assertNotNull(q);
|
|
||||||
// submit an app to r1
|
|
||||||
String user_0 = "test-user";
|
|
||||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
|
||||||
ApplicationAttemptId appAttemptId_0 =
|
|
||||||
ApplicationAttemptId.newInstance(appId, 0);
|
|
||||||
AppAddedSchedulerEvent addAppEvent =
|
|
||||||
new AppAddedSchedulerEvent(appId, q.getQueueName(), user_0);
|
|
||||||
scheduler.handle(addAppEvent);
|
|
||||||
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
|
|
||||||
new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);
|
|
||||||
scheduler.handle(appAttemptAddedEvent);
|
|
||||||
|
|
||||||
// initial default reservation queue should have no apps
|
|
||||||
Assert.assertEquals(0, defQ.getNumApplications());
|
|
||||||
|
|
||||||
Assert.assertEquals(0.1, q.getCapacity(), 0.01);
|
|
||||||
Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
|
|
||||||
Assert.assertEquals(1, q.getNumApplications());
|
|
||||||
|
|
||||||
CSQueue q2 = scheduler.getQueue(r2.toString());
|
|
||||||
assertNull(q2);
|
|
||||||
CSQueue q3 = scheduler.getQueue(r3.toString());
|
|
||||||
assertNull(q3);
|
|
||||||
|
|
||||||
when(mClock.getTime()).thenReturn(3L);
|
|
||||||
planFollower.run();
|
|
||||||
|
|
||||||
Assert.assertEquals(0, defQ.getNumApplications());
|
|
||||||
q = scheduler.getQueue(r1.toString());
|
|
||||||
assertNotNull(q);
|
|
||||||
Assert.assertEquals(0.1, q.getCapacity(), 0.01);
|
|
||||||
Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
|
|
||||||
Assert.assertEquals(1, q.getNumApplications());
|
|
||||||
q2 = scheduler.getQueue(r2.toString());
|
|
||||||
assertNotNull(q2);
|
|
||||||
Assert.assertEquals(0.1, q.getCapacity(), 0.01);
|
|
||||||
Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
|
|
||||||
q3 = scheduler.getQueue(r3.toString());
|
|
||||||
assertNull(q3);
|
|
||||||
|
|
||||||
when(mClock.getTime()).thenReturn(10L);
|
|
||||||
planFollower.run();
|
|
||||||
|
|
||||||
q = scheduler.getQueue(r1.toString());
|
|
||||||
if (isMove) {
|
|
||||||
// app should have been moved to default reservation queue
|
|
||||||
Assert.assertEquals(1, defQ.getNumApplications());
|
|
||||||
assertNull(q);
|
|
||||||
} else {
|
|
||||||
// app should be killed
|
|
||||||
Assert.assertEquals(0, defQ.getNumApplications());
|
|
||||||
assertNotNull(q);
|
|
||||||
AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
|
|
||||||
new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
|
|
||||||
RMAppAttemptState.KILLED, false);
|
|
||||||
scheduler.handle(appAttemptRemovedEvent);
|
|
||||||
}
|
}
|
||||||
q2 = scheduler.getQueue(r2.toString());
|
|
||||||
assertNull(q2);
|
|
||||||
q3 = scheduler.getQueue(r3.toString());
|
|
||||||
assertNotNull(q3);
|
|
||||||
Assert.assertEquals(0, q3.getCapacity(), 0.01);
|
|
||||||
Assert.assertEquals(1.0, q3.getMaximumCapacity(), 1.0);
|
|
||||||
|
|
||||||
when(mClock.getTime()).thenReturn(11L);
|
@Override
|
||||||
planFollower.run();
|
protected void assertReservationQueueExists(ReservationId r) {
|
||||||
|
CSQueue q = cs.getQueue(r.toString());
|
||||||
if (isMove) {
|
assertNotNull(q);
|
||||||
// app should have been moved to default reservation queue
|
|
||||||
Assert.assertEquals(1, defQ.getNumApplications());
|
|
||||||
} else {
|
|
||||||
// app should be killed
|
|
||||||
Assert.assertEquals(0, defQ.getNumApplications());
|
|
||||||
}
|
}
|
||||||
q = scheduler.getQueue(r1.toString());
|
|
||||||
assertNull(q);
|
@Override
|
||||||
q2 = scheduler.getQueue(r2.toString());
|
protected void assertReservationQueueExists(ReservationId r2,
|
||||||
|
double expectedCapacity, double expectedMaxCapacity) {
|
||||||
|
CSQueue q = cs.getQueue(r2.toString());
|
||||||
|
assertNotNull(q);
|
||||||
|
Assert.assertEquals(expectedCapacity, q.getCapacity(), 0.01);
|
||||||
|
Assert.assertEquals(expectedMaxCapacity, q.getMaximumCapacity(), 1.0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void assertReservationQueueDoesNotExist(ReservationId r2) {
|
||||||
|
CSQueue q2 = cs.getQueue(r2.toString());
|
||||||
assertNull(q2);
|
assertNull(q2);
|
||||||
q3 = scheduler.getQueue(r3.toString());
|
|
||||||
assertNotNull(q3);
|
|
||||||
Assert.assertEquals(0.1, q3.getCapacity(), 0.01);
|
|
||||||
Assert.assertEquals(0.1, q3.getMaximumCapacity(), 1.0);
|
|
||||||
|
|
||||||
when(mClock.getTime()).thenReturn(12L);
|
|
||||||
planFollower.run();
|
|
||||||
|
|
||||||
q = scheduler.getQueue(r1.toString());
|
|
||||||
assertNull(q);
|
|
||||||
q2 = scheduler.getQueue(r2.toString());
|
|
||||||
assertNull(q2);
|
|
||||||
q3 = scheduler.getQueue(r3.toString());
|
|
||||||
assertNotNull(q3);
|
|
||||||
Assert.assertEquals(0.2, q3.getCapacity(), 0.01);
|
|
||||||
Assert.assertEquals(0.2, q3.getMaximumCapacity(), 1.0);
|
|
||||||
|
|
||||||
when(mClock.getTime()).thenReturn(16L);
|
|
||||||
planFollower.run();
|
|
||||||
|
|
||||||
q = scheduler.getQueue(r1.toString());
|
|
||||||
assertNull(q);
|
|
||||||
q2 = scheduler.getQueue(r2.toString());
|
|
||||||
assertNull(q2);
|
|
||||||
q3 = scheduler.getQueue(r3.toString());
|
|
||||||
assertNull(q3);
|
|
||||||
|
|
||||||
assertTrue(defQ.getCapacity() > 0.9);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ApplicationACLsManager mockAppACLsManager() {
|
public static ApplicationACLsManager mockAppACLsManager() {
|
||||||
|
@ -312,8 +196,11 @@ public class TestCapacitySchedulerPlanFollower {
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
if (scheduler != null) {
|
if (scheduler != null) {
|
||||||
scheduler.stop();
|
cs.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected Queue getReservationQueue(String reservationId) {
|
||||||
|
return cs.getQueue(reservationId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,191 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||||
|
|
||||||
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||||
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
|
import org.junit.Assert;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
public abstract class TestSchedulerPlanFollowerBase {
|
||||||
|
final static int GB = 1024;
|
||||||
|
protected Clock mClock = null;
|
||||||
|
protected ResourceScheduler scheduler = null;
|
||||||
|
protected ReservationAgent mAgent;
|
||||||
|
protected Resource minAlloc = Resource.newInstance(GB, 1);
|
||||||
|
protected Resource maxAlloc = Resource.newInstance(GB * 8, 8);
|
||||||
|
protected CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
|
||||||
|
protected Plan plan;
|
||||||
|
private ResourceCalculator res = new DefaultResourceCalculator();
|
||||||
|
|
||||||
|
protected void testPlanFollower(boolean isMove) throws PlanningException,
|
||||||
|
InterruptedException, AccessControlException {
|
||||||
|
// Initialize plan based on move flag
|
||||||
|
plan =
|
||||||
|
new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
|
||||||
|
scheduler.getClusterResource(), 1L, res,
|
||||||
|
scheduler.getMinimumResourceCapability(), maxAlloc, "dedicated",
|
||||||
|
null, isMove);
|
||||||
|
|
||||||
|
// add a few reservations to the plan
|
||||||
|
long ts = System.currentTimeMillis();
|
||||||
|
ReservationId r1 = ReservationId.newInstance(ts, 1);
|
||||||
|
int[] f1 = { 10, 10, 10, 10, 10 };
|
||||||
|
assertTrue(plan.toString(),
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
|
||||||
|
"dedicated", 0, 0 + f1.length, ReservationSystemTestUtil
|
||||||
|
.generateAllocation(0L, 1L, f1), res, minAlloc)));
|
||||||
|
|
||||||
|
ReservationId r2 = ReservationId.newInstance(ts, 2);
|
||||||
|
assertTrue(plan.toString(),
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(r2, null, "u3",
|
||||||
|
"dedicated", 3, 3 + f1.length, ReservationSystemTestUtil
|
||||||
|
.generateAllocation(3L, 1L, f1), res, minAlloc)));
|
||||||
|
|
||||||
|
ReservationId r3 = ReservationId.newInstance(ts, 3);
|
||||||
|
int[] f2 = { 0, 10, 20, 10, 0 };
|
||||||
|
assertTrue(plan.toString(),
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(r3, null, "u4",
|
||||||
|
"dedicated", 10, 10 + f2.length, ReservationSystemTestUtil
|
||||||
|
.generateAllocation(10L, 1L, f2), res, minAlloc)));
|
||||||
|
|
||||||
|
AbstractSchedulerPlanFollower planFollower = createPlanFollower();
|
||||||
|
|
||||||
|
when(mClock.getTime()).thenReturn(0L);
|
||||||
|
planFollower.run();
|
||||||
|
|
||||||
|
Queue q = getReservationQueue(r1.toString());
|
||||||
|
assertReservationQueueExists(r1);
|
||||||
|
// submit an app to r1
|
||||||
|
String user_0 = "test-user";
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||||
|
ApplicationAttemptId appAttemptId_0 =
|
||||||
|
ApplicationAttemptId.newInstance(appId, 0);
|
||||||
|
AppAddedSchedulerEvent addAppEvent =
|
||||||
|
new AppAddedSchedulerEvent(appId, q.getQueueName(), user_0);
|
||||||
|
scheduler.handle(addAppEvent);
|
||||||
|
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
|
||||||
|
new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);
|
||||||
|
scheduler.handle(appAttemptAddedEvent);
|
||||||
|
|
||||||
|
// initial default reservation queue should have no apps
|
||||||
|
|
||||||
|
Queue defQ = getDefaultQueue();
|
||||||
|
Assert.assertEquals(0, getNumberOfApplications(defQ));
|
||||||
|
|
||||||
|
assertReservationQueueExists(r1, 0.1, 0.1);
|
||||||
|
Assert.assertEquals(1, getNumberOfApplications(q));
|
||||||
|
|
||||||
|
assertReservationQueueDoesNotExist(r2);
|
||||||
|
assertReservationQueueDoesNotExist(r3);
|
||||||
|
|
||||||
|
when(mClock.getTime()).thenReturn(3L);
|
||||||
|
planFollower.run();
|
||||||
|
|
||||||
|
Assert.assertEquals(0, getNumberOfApplications(defQ));
|
||||||
|
assertReservationQueueExists(r1, 0.1, 0.1);
|
||||||
|
Assert.assertEquals(1, getNumberOfApplications(q));
|
||||||
|
assertReservationQueueExists(r2, 0.1, 0.1);
|
||||||
|
assertReservationQueueDoesNotExist(r3);
|
||||||
|
|
||||||
|
when(mClock.getTime()).thenReturn(10L);
|
||||||
|
planFollower.run();
|
||||||
|
|
||||||
|
q = getReservationQueue(r1.toString());
|
||||||
|
if (isMove) {
|
||||||
|
// app should have been moved to default reservation queue
|
||||||
|
Assert.assertEquals(1, getNumberOfApplications(defQ));
|
||||||
|
assertNull(q);
|
||||||
|
} else {
|
||||||
|
// app should be killed
|
||||||
|
Assert.assertEquals(0, getNumberOfApplications(defQ));
|
||||||
|
assertNotNull(q);
|
||||||
|
AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
|
||||||
|
new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
|
||||||
|
RMAppAttemptState.KILLED, false);
|
||||||
|
scheduler.handle(appAttemptRemovedEvent);
|
||||||
|
}
|
||||||
|
assertReservationQueueDoesNotExist(r2);
|
||||||
|
assertReservationQueueExists(r3, 0, 1.0);
|
||||||
|
|
||||||
|
when(mClock.getTime()).thenReturn(11L);
|
||||||
|
planFollower.run();
|
||||||
|
|
||||||
|
if (isMove) {
|
||||||
|
// app should have been moved to default reservation queue
|
||||||
|
Assert.assertEquals(1, getNumberOfApplications(defQ));
|
||||||
|
} else {
|
||||||
|
// app should be killed
|
||||||
|
Assert.assertEquals(0, getNumberOfApplications(defQ));
|
||||||
|
}
|
||||||
|
assertReservationQueueDoesNotExist(r1);
|
||||||
|
assertReservationQueueDoesNotExist(r2);
|
||||||
|
assertReservationQueueExists(r3, 0.1, 0.1);
|
||||||
|
|
||||||
|
when(mClock.getTime()).thenReturn(12L);
|
||||||
|
planFollower.run();
|
||||||
|
|
||||||
|
assertReservationQueueDoesNotExist(r1);
|
||||||
|
assertReservationQueueDoesNotExist(r2);
|
||||||
|
assertReservationQueueExists(r3, 0.2, 0.2);
|
||||||
|
|
||||||
|
when(mClock.getTime()).thenReturn(16L);
|
||||||
|
planFollower.run();
|
||||||
|
|
||||||
|
assertReservationQueueDoesNotExist(r1);
|
||||||
|
assertReservationQueueDoesNotExist(r2);
|
||||||
|
assertReservationQueueDoesNotExist(r3);
|
||||||
|
|
||||||
|
verifyCapacity(defQ);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract Queue getReservationQueue(String reservationId);
|
||||||
|
|
||||||
|
protected abstract void verifyCapacity(Queue defQ);
|
||||||
|
|
||||||
|
protected abstract Queue getDefaultQueue();
|
||||||
|
|
||||||
|
protected abstract int getNumberOfApplications(Queue queue);
|
||||||
|
|
||||||
|
protected abstract AbstractSchedulerPlanFollower createPlanFollower();
|
||||||
|
|
||||||
|
protected abstract void assertReservationQueueExists(ReservationId r);
|
||||||
|
|
||||||
|
protected abstract void assertReservationQueueExists(ReservationId r2,
|
||||||
|
double expectedCapacity, double expectedMaxCapacity);
|
||||||
|
|
||||||
|
protected abstract void assertReservationQueueDoesNotExist(ReservationId r2);
|
||||||
|
}
|
Loading…
Reference in New Issue