YARN-569. Add support for requesting and enforcing preemption requests via

a capacity monitor. Contributed by Carlo Curino, Chris Douglas


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1502084 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Christopher Douglas 2013-07-11 01:20:37 +00:00
parent f0983d958d
commit 495c24331e
18 changed files with 1825 additions and 103 deletions

View File

@ -456,6 +456,9 @@ Release 2.1.0-beta - 2013-07-02
YARN-883. Expose Fair Scheduler-specific queue metrics. (sandyr via tucu) YARN-883. Expose Fair Scheduler-specific queue metrics. (sandyr via tucu)
YARN-569. Add support for requesting and enforcing preemption requests via
a capacity monitor. (Carlo Curino, cdouglas)
OPTIMIZATIONS OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it YARN-512. Log aggregation root directory check is more expensive than it

View File

@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.util.Records;
@Stable @Stable
public abstract class Priority implements Comparable<Priority> { public abstract class Priority implements Comparable<Priority> {
public static final Priority UNDEFINED = newInstance(-1);
@Public @Public
@Stable @Stable
public static Priority newInstance(int p) { public static Priority newInstance(int p) {

View File

@ -132,6 +132,18 @@ public class YarnConfiguration extends Configuration {
RM_PREFIX + "scheduler.client.thread-count"; RM_PREFIX + "scheduler.client.thread-count";
public static final int DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT = 50; public static final int DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT = 50;
/**
* Enable periodic monitor threads.
* @see #RM_SCHEDULER_MONITOR_POLICIES
*/
public static final String RM_SCHEDULER_ENABLE_MONITORS =
RM_PREFIX + "scheduler.monitor.enable";
public static final boolean DEFAULT_RM_SCHEDULER_ENABLE_MONITORS = false;
/** List of SchedulingEditPolicy classes affecting the scheduler. */
public static final String RM_SCHEDULER_MONITOR_POLICIES =
RM_PREFIX + "scheduler.monitor.policies";
/** The address of the RM web application.*/ /** The address of the RM web application.*/
public static final String RM_WEBAPP_ADDRESS = public static final String RM_WEBAPP_ADDRESS =
RM_PREFIX + "webapp.address"; RM_PREFIX + "webapp.address";

View File

@ -291,6 +291,22 @@
<value>1000</value> <value>1000</value>
</property> </property>
<property>
<description>Enable a set of periodic monitors (specified in
yarn.resourcemanager.scheduler.monitor.policies) that affect the
scheduler.</description>
<name>yarn.resourcemanager.scheduler.monitor.enable</name>
<value>false</value>
</property>
<property>
<description>The list of SchedulingEditPolicy classes that interact with
the scheduler. A particular module may be incompatible with the
scheduler, other policies, or a configuration of either.</description>
<name>yarn.resourcemanager.scheduler.monitor.policies</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy</value>
</property>
<!-- Node Manager Configs --> <!-- Node Manager Configs -->
<property> <property>
<description>The hostname of the NM.</description> <description>The hostname of the NM.</description>

View File

@ -411,6 +411,19 @@ public class ApplicationMasterService extends AbstractService implements
allocateResponse.setResponseId(lastResponse.getResponseId() + 1); allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
allocateResponse.setAvailableResources(allocation.getResourceLimit()); allocateResponse.setAvailableResources(allocation.getResourceLimit());
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
// add preemption to the allocateResponse message (if any)
allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));
// Adding NMTokens for allocated containers.
if (!allocation.getContainers().isEmpty()) {
allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager()
.createAndGetNMTokens(app.getUser(), appAttemptId,
allocation.getContainers()));
}
// before returning response, verify in sync
AllocateResponse oldResponse = AllocateResponse oldResponse =
responseMap.put(appAttemptId, allocateResponse); responseMap.put(appAttemptId, allocateResponse);
if (oldResponse == null) { if (oldResponse == null) {
@ -421,18 +434,7 @@ public class ApplicationMasterService extends AbstractService implements
LOG.error(message); LOG.error(message);
return resync; return resync;
} }
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
// add preemption to the allocateResponse message (if any)
allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));
// Adding NMTokens for allocated containers.
if (!allocation.getContainers().isEmpty()) {
allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager()
.createAndGetNMTokens(app.getUser(), appAttemptId,
allocation.getContainers()));
}
return allocateResponse; return allocateResponse;
} }
} }

View File

@ -18,8 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@ -61,9 +63,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
@ -237,6 +243,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
throw new RuntimeException("Failed to initialize scheduler", ioe); throw new RuntimeException("Failed to initialize scheduler", ioe);
} }
// creating monitors that handle preemption
createPolicyMonitors();
masterService = createApplicationMasterService(); masterService = createApplicationMasterService();
addService(masterService) ; addService(masterService) ;
@ -315,7 +324,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
} catch (ClassNotFoundException e) { } catch (ClassNotFoundException e) {
throw new YarnRuntimeException("Could not instantiate Scheduler: " throw new YarnRuntimeException("Could not instantiate Scheduler: "
+ schedulerClassName, e); + schedulerClassName, e);
} } }
}
protected ApplicationMasterLauncher createAMLauncher() { protected ApplicationMasterLauncher createAMLauncher() {
return new ApplicationMasterLauncher(this.rmContext); return new ApplicationMasterLauncher(this.rmContext);
@ -476,6 +486,36 @@ public class ResourceManager extends CompositeService implements Recoverable {
} }
} }
@Private
public static final class
RMContainerPreemptEventDispatcher
implements EventHandler<ContainerPreemptEvent> {
private final PreemptableResourceScheduler scheduler;
public RMContainerPreemptEventDispatcher(
PreemptableResourceScheduler scheduler) {
this.scheduler = scheduler;
}
@Override
public void handle(ContainerPreemptEvent event) {
ApplicationAttemptId aid = event.getAppId();
RMContainer container = event.getContainer();
switch (event.getType()) {
case DROP_RESERVATION:
scheduler.dropContainerReservation(container);
break;
case PREEMPT_CONTAINER:
scheduler.preemptContainer(aid, container);
break;
case KILL_CONTAINER:
scheduler.killContainer(container);
break;
}
}
}
@Private @Private
public static final class ApplicationAttemptEventDispatcher implements public static final class ApplicationAttemptEventDispatcher implements
EventHandler<RMAppAttemptEvent> { EventHandler<RMAppAttemptEvent> {
@ -676,7 +716,37 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected ApplicationMasterService createApplicationMasterService() { protected ApplicationMasterService createApplicationMasterService() {
return new ApplicationMasterService(this.rmContext, scheduler); return new ApplicationMasterService(this.rmContext, scheduler);
} }
protected void createPolicyMonitors() {
if (scheduler instanceof PreemptableResourceScheduler
&& conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) {
LOG.info("Loading policy monitors");
List<SchedulingEditPolicy> policies = conf.getInstances(
YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
SchedulingEditPolicy.class);
if (policies.size() > 0) {
this.rmDispatcher.register(ContainerPreemptEventType.class,
new RMContainerPreemptEventDispatcher(
(PreemptableResourceScheduler) scheduler));
for (SchedulingEditPolicy policy : policies) {
LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
policy.init(conf, this.rmContext.getDispatcher().getEventHandler(),
(PreemptableResourceScheduler) scheduler);
// periodically check whether we need to take action to guarantee
// constraints
SchedulingMonitor mon = new SchedulingMonitor(policy);
addService(mon);
}
} else {
LOG.warn("Policy monitors configured (" +
YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS +
") but none specified (" +
YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES + ")");
}
}
}
protected AdminService createAdminService( protected AdminService createAdminService(
ClientRMService clientRMService, ClientRMService clientRMService,

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
public interface SchedulingEditPolicy {
public void init(Configuration config,
EventHandler<ContainerPreemptEvent> dispatcher,
PreemptableResourceScheduler scheduler);
/**
* This method is invoked at regular intervals. Internally the policy is
* allowed to track containers and affect the scheduler. The "actions"
* performed are passed back through an EventHandler.
*/
public void editSchedule();
public long getMonitoringInterval();
public String getPolicyName();
}

View File

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import com.google.common.annotations.VisibleForTesting;
public class SchedulingMonitor extends AbstractService {
private final SchedulingEditPolicy scheduleEditPolicy;
private static final Log LOG = LogFactory.getLog(SchedulingMonitor.class);
//thread which runs periodically to see the last time since a heartbeat is
//received.
private Thread checkerThread;
private volatile boolean stopped;
private long monitorInterval;
public SchedulingMonitor(SchedulingEditPolicy scheduleEditPolicy) {
super("SchedulingMonitor (" + scheduleEditPolicy.getPolicyName() + ")");
this.scheduleEditPolicy = scheduleEditPolicy;
this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
}
public long getMonitorInterval() {
return monitorInterval;
}
public void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
}
@Override
public void serviceStart() throws Exception {
assert !stopped : "starting when already stopped";
checkerThread = new Thread(new PreemptionChecker());
checkerThread.setName(getName());
checkerThread.start();
super.serviceStart();
}
@Override
public void serviceStop() throws Exception {
stopped = true;
if (checkerThread != null) {
checkerThread.interrupt();
}
super.serviceStop();
}
@VisibleForTesting
public void invokePolicy(){
scheduleEditPolicy.editSchedule();
}
private class PreemptionChecker implements Runnable {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
//invoke the preemption policy at a regular pace
//the policy will generate preemption or kill events
//managed by the dispatcher
invokePolicy();
try {
Thread.sleep(monitorInterval);
} catch (InterruptedException e) {
LOG.info(getName() + " thread interrupted");
break;
}
}
}
}
}

View File

@ -0,0 +1,669 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
/**
* This class implement a {@link SchedulingEditPolicy} that is designed to be
* paired with the {@code CapacityScheduler}. At every invocation of {@code
* editSchedule()} it computes the ideal amount of resources assigned to each
* queue (for each queue in the hierarchy), and determines whether preemption
* is needed. Overcapacity is distributed among queues in a weighted fair manner,
* where the weight is the amount of guaranteed capacity for the queue.
* Based on this ideal assignment it determines whether preemption is required
* and select a set of containers from each application that would be killed if
* the corresponding amount of resources is not freed up by the application.
*
* If not in {@code observeOnly} mode, it triggers preemption requests via a
* {@link ContainerPreemptEvent} that the {@code ResourceManager} will ensure
* to deliver to the application (or to execute).
*
* If the deficit of resources is persistent over a long enough period of time
* this policy will trigger forced termination of containers (again by generating
* {@link ContainerPreemptEvent}).
*/
public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolicy {
private static final Log LOG =
LogFactory.getLog(ProportionalCapacityPreemptionPolicy.class);
/** If true, run the policy but do not affect the cluster with preemption and
* kill events. */
public static final String OBSERVE_ONLY =
"yarn.resourcemanager.monitor.capacity.preemption.observe_only";
/** Time in milliseconds between invocations of this policy */
public static final String MONITORING_INTERVAL =
"yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval";
/** Time in milliseconds between requesting a preemption from an application
* and killing the container. */
public static final String WAIT_TIME_BEFORE_KILL =
"yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill";
/** Maximum percentage of resources preempted in a single round. By
* controlling this value one can throttle the pace at which containers are
* reclaimed from the cluster. After computing the total desired preemption,
* the policy scales it back within this limit. */
public static final String TOTAL_PREEMPTION_PER_ROUND =
"yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round";
/** Maximum amount of resources above the target capacity ignored for
* preemption. This defines a deadzone around the target capacity that helps
* prevent thrashing and oscillations around the computed target balance.
* High values would slow the time to capacity and (absent natural
* completions) it might prevent convergence to guaranteed capacity. */
public static final String MAX_IGNORED_OVER_CAPACITY =
"yarn.resourcemanager.monitor.capacity.preemption.max_ignored_over_capacity";
/**
* Given a computed preemption target, account for containers naturally
* expiring and preempt only this percentage of the delta. This determines
* the rate of geometric convergence into the deadzone ({@link
* #MAX_IGNORED_OVER_CAPACITY}). For example, a termination factor of 0.5
* will reclaim almost 95% of resources within 5 * {@link
* #WAIT_TIME_BEFORE_KILL}, even absent natural termination. */
public static final String NATURAL_TERMINATION_FACTOR =
"yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
//the dispatcher to send preempt and kill events
public EventHandler<ContainerPreemptEvent> dispatcher;
private final Clock clock;
private double maxIgnoredOverCapacity;
private long maxWaitTime;
private CapacityScheduler scheduler;
private long monitoringInterval;
private final Map<RMContainer,Long> preempted =
new HashMap<RMContainer,Long>();
private ResourceCalculator rc;
private float percentageClusterPreemptionAllowed;
private double naturalTerminationFactor;
private boolean observeOnly;
public ProportionalCapacityPreemptionPolicy() {
clock = new SystemClock();
}
public ProportionalCapacityPreemptionPolicy(Configuration config,
EventHandler<ContainerPreemptEvent> dispatcher,
CapacityScheduler scheduler) {
this(config, dispatcher, scheduler, new SystemClock());
}
public ProportionalCapacityPreemptionPolicy(Configuration config,
EventHandler<ContainerPreemptEvent> dispatcher,
CapacityScheduler scheduler, Clock clock) {
init(config, dispatcher, scheduler);
this.clock = clock;
}
public void init(Configuration config,
EventHandler<ContainerPreemptEvent> disp,
PreemptableResourceScheduler sched) {
LOG.info("Preemption monitor:" + this.getClass().getCanonicalName());
assert null == scheduler : "Unexpected duplicate call to init";
if (!(sched instanceof CapacityScheduler)) {
throw new YarnRuntimeException("Class " +
sched.getClass().getCanonicalName() + " not instance of " +
CapacityScheduler.class.getCanonicalName());
}
dispatcher = disp;
scheduler = (CapacityScheduler) sched;
maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1);
naturalTerminationFactor =
config.getDouble(NATURAL_TERMINATION_FACTOR, 0.2);
maxWaitTime = config.getLong(WAIT_TIME_BEFORE_KILL, 15000);
monitoringInterval = config.getLong(MONITORING_INTERVAL, 3000);
percentageClusterPreemptionAllowed =
config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
observeOnly = config.getBoolean(OBSERVE_ONLY, false);
rc = scheduler.getResourceCalculator();
}
@Override
public void editSchedule(){
CSQueue root = scheduler.getRootQueue();
Resource clusterResources =
Resources.clone(scheduler.getClusterResources());
containerBasedPreemptOrKill(root, clusterResources);
}
/**
* This method selects and tracks containers to be preempted. If a container
* is in the target list for more than maxWaitTime it is killed.
*
* @param root the root of the CapacityScheduler queue hierarchy
* @param clusterResources the total amount of resources in the cluster
*/
private void containerBasedPreemptOrKill(CSQueue root,
Resource clusterResources) {
// extract a summary of the queues from scheduler
TempQueue tRoot;
synchronized (scheduler) {
tRoot = cloneQueues(root, clusterResources);
}
// compute the ideal distribution of resources among queues
// updates cloned queues state accordingly
tRoot.idealAssigned = tRoot.guaranteed;
Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
percentageClusterPreemptionAllowed);
List<TempQueue> queues =
recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
// based on ideal allocation select containers to be preempted from each
// queue and each application
Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
getContainersToPreempt(queues, clusterResources);
logToCSV(queues);
// if we are in observeOnly mode return before any action is taken
if (observeOnly) {
return;
}
// preempt (or kill) the selected containers
for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e
: toPreempt.entrySet()) {
for (RMContainer container : e.getValue()) {
// if we tried to preempt this for more than maxWaitTime
if (preempted.get(container) != null &&
preempted.get(container) + maxWaitTime < clock.getTime()) {
// kill it
dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
ContainerPreemptEventType.KILL_CONTAINER));
preempted.remove(container);
} else {
//otherwise just send preemption events
dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
ContainerPreemptEventType.PREEMPT_CONTAINER));
if (preempted.get(container) == null) {
preempted.put(container, clock.getTime());
}
}
}
}
// Keep the preempted list clean
for (Iterator<RMContainer> i = preempted.keySet().iterator(); i.hasNext();){
RMContainer id = i.next();
// garbage collect containers that are irrelevant for preemption
if (preempted.get(id) + 2 * maxWaitTime < clock.getTime()) {
i.remove();
}
}
}
/**
* This method recursively computes the ideal assignment of resources to each
* level of the hierarchy. This ensures that leafs that are over-capacity but
* with parents within capacity will not be preempted. Preemptions are allowed
* within each subtree according to local over/under capacity.
*
* @param root the root of the cloned queue hierachy
* @param totalPreemptionAllowed maximum amount of preemption allowed
* @return a list of leaf queues updated with preemption targets
*/
private List<TempQueue> recursivelyComputeIdealAssignment(
TempQueue root, Resource totalPreemptionAllowed) {
List<TempQueue> leafs = new ArrayList<TempQueue>();
if (root.getChildren() != null &&
root.getChildren().size() > 0) {
// compute ideal distribution at this level
computeIdealResourceDistribution(rc, root.getChildren(),
totalPreemptionAllowed, root.idealAssigned);
// compute recursively for lower levels and build list of leafs
for(TempQueue t : root.getChildren()) {
leafs.addAll(recursivelyComputeIdealAssignment(t, totalPreemptionAllowed));
}
} else {
// we are in a leaf nothing to do, just return yourself
return Collections.singletonList(root);
}
return leafs;
}
/**
* This method computes (for a single level in the tree, passed as a {@code
* List<TempQueue>}) the ideal assignment of resources. This is done
* recursively to allocate capacity fairly across all queues with pending
* demands. It terminates when no resources are left to assign, or when all
* demand is satisfied.
*
* @param rc resource calculator
* @param queues a list of cloned queues to be assigned capacity to (this is
* an out param)
* @param totalPreemptionAllowed total amount of preemption we allow
* @param tot_guarant the amount of capacity assigned to this pool of queues
*/
private void computeIdealResourceDistribution(ResourceCalculator rc,
List<TempQueue> queues, Resource totalPreemptionAllowed, Resource tot_guarant) {
// qAlloc tracks currently active queues (will decrease progressively as
// demand is met)
List<TempQueue> qAlloc = new ArrayList<TempQueue>(queues);
// unassigned tracks how much resources are still to assign, initialized
// with the total capacity for this set of queues
Resource unassigned = Resources.clone(tot_guarant);
//assign all cluster resources until no more demand, or no resources are left
while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
unassigned, Resources.none())) {
Resource wQassigned = Resource.newInstance(0, 0);
// we compute normalizedGuarantees capacity based on currently active
// queues
resetCapacity(rc, unassigned, qAlloc);
// offer for each queue their capacity first and in following invocations
// their share of over-capacity
for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
TempQueue sub = i.next();
Resource wQavail =
Resources.multiply(unassigned, sub.normalizedGuarantee);
Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
Resource wQdone = Resources.subtract(wQavail, wQidle);
// if the queue returned a value > 0 it means it is fully satisfied
// and it is removed from the list of active queues qAlloc
if (!Resources.greaterThan(rc, tot_guarant,
wQdone, Resources.none())) {
i.remove();
}
Resources.addTo(wQassigned, wQdone);
}
Resources.subtractFrom(unassigned, wQassigned);
}
// based on ideal assignment computed above and current assignment we derive
// how much preemption is required overall
Resource totPreemptionNeeded = Resource.newInstance(0, 0);
for (TempQueue t:queues) {
if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) {
Resources.addTo(totPreemptionNeeded,
Resources.subtract(t.current, t.idealAssigned));
}
}
// if we need to preempt more than is allowed, compute a factor (0<f<1)
// that is used to scale down how much we ask back from each queue
float scalingFactor = 1.0F;
if (Resources.greaterThan(rc, tot_guarant,
totPreemptionNeeded, totalPreemptionAllowed)) {
scalingFactor = Resources.divide(rc, tot_guarant,
totalPreemptionAllowed, totPreemptionNeeded);
}
// assign to each queue the amount of actual preemption based on local
// information of ideal preemption and scaling factor
for (TempQueue t : queues) {
t.assignPreemption(scalingFactor, rc, tot_guarant);
}
if (LOG.isDebugEnabled()) {
long time = clock.getTime();
for (TempQueue t : queues) {
LOG.debug(time + ": " + t);
}
}
}
/**
* Computes a normalizedGuaranteed capacity based on active queues
* @param rc resource calculator
* @param clusterResource the total amount of resources in the cluster
* @param queues the list of queues to consider
*/
private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
List<TempQueue> queues) {
Resource activeCap = Resource.newInstance(0, 0);
for (TempQueue q : queues) {
Resources.addTo(activeCap, q.guaranteed);
}
for (TempQueue q : queues) {
q.normalizedGuarantee = Resources.divide(rc, clusterResource,
q.guaranteed, activeCap);
}
}
/**
* Based a resource preemption target drop reservations of containers and
* if necessary select containers for preemption from applications in each
* over-capacity queue. It uses {@link #NATURAL_TERMINATION_FACTOR} to
* account for containers that will naturally complete.
*
* @param queues set of leaf queues to preempt from
* @param clusterResource total amount of cluster resources
* @return a map of applciationID to set of containers to preempt
*/
private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
List<TempQueue> queues, Resource clusterResource) {
Map<ApplicationAttemptId,Set<RMContainer>> list =
new HashMap<ApplicationAttemptId,Set<RMContainer>>();
for (TempQueue qT : queues) {
// we act only if we are violating balance by more than
// maxIgnoredOverCapacity
if (Resources.greaterThan(rc, clusterResource, qT.current,
Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
// we introduce a dampening factor naturalTerminationFactor that
// accounts for natural termination of containers
Resource resToObtain =
Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
// lock the leafqueue while we scan applications and unreserve
synchronized(qT.leafQueue) {
NavigableSet<FiCaSchedulerApp> ns =
(NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
qT.actuallyPreempted = Resources.clone(resToObtain);
while (desc.hasNext()) {
FiCaSchedulerApp fc = desc.next();
if (Resources.lessThanOrEqual(rc, clusterResource,
resToObtain, Resources.none())) {
break;
}
list.put(fc.getApplicationAttemptId(),
preemptFrom(fc, clusterResource, resToObtain));
}
}
}
}
return list;
}
/**
* Given a target preemption for a specific application, select containers
* to preempt (after unreserving all reservation for that app).
*
* @param app
* @param clusterResource
* @param rsrcPreempt
* @return
*/
private Set<RMContainer> preemptFrom(
FiCaSchedulerApp app, Resource clusterResource, Resource rsrcPreempt) {
Set<RMContainer> ret = new HashSet<RMContainer>();
ApplicationAttemptId appId = app.getApplicationAttemptId();
// first drop reserved containers towards rsrcPreempt
List<RMContainer> reservations =
new ArrayList<RMContainer>(app.getReservedContainers());
for (RMContainer c : reservations) {
if (Resources.lessThanOrEqual(rc, clusterResource,
rsrcPreempt, Resources.none())) {
return ret;
}
if (!observeOnly) {
dispatcher.handle(new ContainerPreemptEvent(appId, c,
ContainerPreemptEventType.DROP_RESERVATION));
}
Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
}
// if more resources are to be freed go through all live containers in
// reverse priority and reverse allocation order and mark them for
// preemption
List<RMContainer> containers =
new ArrayList<RMContainer>(app.getLiveContainers());
sortContainers(containers);
for (RMContainer c : containers) {
if (Resources.lessThanOrEqual(rc, clusterResource,
rsrcPreempt, Resources.none())) {
return ret;
}
ret.add(c);
Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
}
return ret;
}
/**
* Compare by reversed priority order first, and then reversed containerId
* order
* @param containers
*/
@VisibleForTesting
static void sortContainers(List<RMContainer> containers){
Collections.sort(containers, new Comparator<RMContainer>() {
@Override
public int compare(RMContainer a, RMContainer b) {
Comparator<Priority> c = new org.apache.hadoop.yarn.server
.resourcemanager.resource.Priority.Comparator();
int priorityComp = c.compare(b.getContainer().getPriority(),
a.getContainer().getPriority());
if (priorityComp != 0) {
return priorityComp;
}
return b.getContainerId().getId() -
a.getContainerId().getId();
}
});
}
@Override
public long getMonitoringInterval() {
return monitoringInterval;
}
@Override
public String getPolicyName() {
return "ProportionalCapacityPreemptionPolicy";
}
/**
* This method walks a tree of CSQueue and clones the portion of the state
* relevant for preemption in TempQueue(s). It also maintains a pointer to
* the leaves. Finally it aggregates pending resources in each queue and rolls
* it up to higher levels.
*
* @param root the root of the CapacityScheduler queue hierarchy
* @param clusterResources the total amount of resources in the cluster
* @return the root of the cloned queue hierarchy
*/
private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
TempQueue ret;
synchronized (root) {
float absUsed = root.getAbsoluteUsedCapacity();
Resource current = Resources.multiply(clusterResources, absUsed);
Resource guaranteed =
Resources.multiply(clusterResources, root.getAbsoluteCapacity());
if (root instanceof LeafQueue) {
LeafQueue l = (LeafQueue) root;
Resource pending = l.getTotalResourcePending();
ret = new TempQueue(root.getQueueName(), current, pending, guaranteed);
ret.setLeafQueue(l);
} else {
Resource pending = Resource.newInstance(0, 0);
ret = new TempQueue(root.getQueueName(), current, pending, guaranteed);
for (CSQueue c : root.getChildQueues()) {
ret.addChild(cloneQueues(c, clusterResources));
}
}
}
return ret;
}
// simple printout function that reports internal queue state (useful for
// plotting)
private void logToCSV(List<TempQueue> unorderedqueues){
List<TempQueue> queues = new ArrayList<TempQueue>(unorderedqueues);
Collections.sort(queues, new Comparator<TempQueue>(){
@Override
public int compare(TempQueue o1, TempQueue o2) {
return o1.queueName.compareTo(o2.queueName);
}});
String queueState = " QUEUESTATE: " + clock.getTime();
StringBuilder sb = new StringBuilder();
sb.append(queueState);
for (TempQueue tq : queues) {
sb.append(", ");
tq.appendLogString(sb);
}
LOG.info(sb.toString());
}
/**
* Temporary data-structure tracking resource availability, pending resource
* need, current utilization. Used to clone {@link CSQueue}.
*/
static class TempQueue {
final String queueName;
final Resource current;
final Resource pending;
final Resource guaranteed;
Resource idealAssigned;
Resource toBePreempted;
Resource actuallyPreempted;
double normalizedGuarantee;
final ArrayList<TempQueue> children;
LeafQueue leafQueue;
TempQueue(String queueName, Resource current, Resource pending,
Resource guaranteed) {
this.queueName = queueName;
this.current = current;
this.pending = pending;
this.guaranteed = guaranteed;
this.idealAssigned = Resource.newInstance(0, 0);
this.actuallyPreempted = Resource.newInstance(0, 0);
this.toBePreempted = Resource.newInstance(0, 0);
this.normalizedGuarantee = Float.NaN;
this.children = new ArrayList<TempQueue>();
}
public void setLeafQueue(LeafQueue l){
assert children.size() == 0;
this.leafQueue = l;
}
/**
* When adding a child we also aggregate its pending resource needs.
* @param q the child queue to add to this queue
*/
public void addChild(TempQueue q) {
assert leafQueue == null;
children.add(q);
Resources.addTo(pending, q.pending);
}
public void addChildren(ArrayList<TempQueue> queues) {
assert leafQueue == null;
children.addAll(queues);
}
public ArrayList<TempQueue> getChildren(){
return children;
}
// This function "accepts" all the resources it can (pending) and return
// the unused ones
Resource offer(Resource avail, ResourceCalculator rc,
Resource clusterResource) {
// remain = avail - min(avail, current + pending - assigned)
Resource accepted = Resources.min(rc, clusterResource,
avail,
Resources.subtract(
Resources.add(current, pending),
idealAssigned));
Resource remain = Resources.subtract(avail, accepted);
Resources.addTo(idealAssigned, accepted);
return remain;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("CUR: ").append(current)
.append(" PEN: ").append(pending)
.append(" GAR: ").append(guaranteed)
.append(" NORM: ").append(normalizedGuarantee)
.append(" IDEAL_ASSIGNED: ").append(idealAssigned)
.append(" IDEAL_PREEMPT: ").append(toBePreempted)
.append(" ACTUAL_PREEMPT: ").append(actuallyPreempted);
return sb.toString();
}
public void assignPreemption(float scalingFactor,
ResourceCalculator rc, Resource clusterResource) {
if (Resources.greaterThan(rc, clusterResource, current, idealAssigned)) {
toBePreempted = Resources.multiply(
Resources.subtract(current, idealAssigned), scalingFactor);
} else {
toBePreempted = Resource.newInstance(0, 0);
}
}
void appendLogString(StringBuilder sb) {
sb.append(queueName).append(", ")
.append(current.getMemory()).append(", ")
.append(current.getVirtualCores()).append(", ")
.append(pending.getMemory()).append(", ")
.append(pending.getVirtualCores()).append(", ")
.append(guaranteed.getMemory()).append(", ")
.append(guaranteed.getVirtualCores()).append(", ")
.append(idealAssigned.getMemory()).append(", ")
.append(idealAssigned.getVirtualCores()).append(", ")
.append(toBePreempted.getMemory()).append(", ")
.append(toBePreempted.getVirtualCores() ).append(", ")
.append(actuallyPreempted.getMemory()).append(", ")
.append(actuallyPreempted.getVirtualCores());
}
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -205,6 +206,14 @@ public class AppSchedulingInfo {
return requests.get(priority); return requests.get(priority);
} }
synchronized public List<ResourceRequest> getAllResourceRequests() {
List<ResourceRequest> ret = new ArrayList<ResourceRequest>();
for (Map<String, ResourceRequest> r : requests.values()) {
ret.addAll(r.values());
}
return ret;
}
synchronized public ResourceRequest getResourceRequest(Priority priority, synchronized public ResourceRequest getResourceRequest(Priority priority,
String resourceName) { String resourceName) {
Map<String, ResourceRequest> nodeRequests = requests.get(priority); Map<String, ResourceRequest> nodeRequests = requests.get(priority);

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
/**
* Simple event class used to communicate containers unreservations, preemption, killing
*/
public class ContainerPreemptEvent
extends AbstractEvent<ContainerPreemptEventType> {
private final ApplicationAttemptId aid;
private final RMContainer container;
public ContainerPreemptEvent(ApplicationAttemptId aid, RMContainer container,
ContainerPreemptEventType type) {
super(type);
this.aid = aid;
this.container = container;
}
public RMContainer getContainer(){
return this.container;
}
public ApplicationAttemptId getAppId() {
return aid;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(super.toString());
sb.append(" ").append(getAppId());
sb.append(" ").append(getContainer().getContainerId());
return sb.toString();
}
}

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
public enum ContainerPreemptEventType {
DROP_RESERVATION,
PREEMPT_CONTAINER,
KILL_CONTAINER
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
/**
* Interface for a scheduler that supports preemption/killing
*
*/
public interface PreemptableResourceScheduler extends ResourceScheduler {
/**
* If the scheduler support container reservations, this method is used to
* ask the scheduler to drop the reservation for the given container.
* @param container Reference to reserved container allocation.
*/
void dropContainerReservation(RMContainer container);
/**
* Ask the scheduler to obtain back the container from a specific application
* by issuing a preemption request
* @param aid the application from which we want to get a container back
* @param container the container we want back
*/
void preemptContainer(ApplicationAttemptId aid, RMContainer container);
/**
* Ask the scheduler to forcibly interrupt the container given as input
* @param container
*/
void killContainer(RMContainer container);
}

View File

@ -61,8 +61,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@ -83,8 +83,9 @@ import org.apache.hadoop.yarn.util.resource.Resources;
@LimitedPrivate("yarn") @LimitedPrivate("yarn")
@Evolving @Evolving
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public class CapacityScheduler public class CapacityScheduler
implements ResourceScheduler, CapacitySchedulerContext, Configurable { implements PreemptableResourceScheduler, CapacitySchedulerContext,
Configurable {
private static final Log LOG = LogFactory.getLog(CapacityScheduler.class); private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
@ -525,8 +526,8 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
// Sanity check // Sanity check
SchedulerUtils.normalizeRequests( SchedulerUtils.normalizeRequests(
ask, calculator, getClusterResources(), minimumAllocation, ask, getResourceCalculator(), getClusterResources(),
maximumAllocation); getMinimumResourceCapability(), maximumAllocation);
// Release containers // Release containers
for (ContainerId releasedContainerId : release) { for (ContainerId releasedContainerId : release) {
@ -578,9 +579,8 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
" #ask=" + ask.size()); " #ask=" + ask.size());
} }
return new Allocation( return application.getAllocation(getResourceCalculator(),
application.pullNewlyAllocatedContainers(), clusterResource, getMinimumResourceCapability());
application.getHeadroom());
} }
} }
@ -812,7 +812,8 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
Container container = rmContainer.getContainer(); Container container = rmContainer.getContainer();
// Get the application for the finished container // Get the application for the finished container
ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId(); ApplicationAttemptId applicationAttemptId =
container.getId().getApplicationAttemptId();
FiCaSchedulerApp application = getApplication(applicationAttemptId); FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) { if (application == null) {
LOG.info("Container " + container + " of" + LOG.info("Container " + container + " of" +
@ -869,5 +870,41 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
FiCaSchedulerNode node = getNode(nodeId); FiCaSchedulerNode node = getNode(nodeId);
return node == null ? null : new SchedulerNodeReport(node); return node == null ? null : new SchedulerNodeReport(node);
} }
@Override
public void dropContainerReservation(RMContainer container) {
if(LOG.isDebugEnabled()){
LOG.debug("DROP_RESERVATION:" + container.toString());
}
completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(),
SchedulerUtils.UNRESERVED_CONTAINER),
RMContainerEventType.KILL);
}
@Override
public void preemptContainer(ApplicationAttemptId aid, RMContainer cont) {
if(LOG.isDebugEnabled()){
LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() +
" container: " + cont.toString());
}
FiCaSchedulerApp app = applications.get(aid);
if (app != null) {
app.addPreemptContainer(cont.getContainerId());
}
}
@Override
public void killContainer(RMContainer cont) {
if(LOG.isDebugEnabled()){
LOG.debug("KILL_CONTAINER: container" + cont.toString());
}
completedContainer(cont,
SchedulerUtils.createAbnormalContainerStatus(
cont.getContainerId(),"Container being forcibly preempted:"
+ cont.getContainerId()),
RMContainerEventType.KILL);
}
} }

View File

@ -1390,17 +1390,19 @@ public class LeafQueue implements CSQueue {
node.reserveResource(application, priority, rmContainer); node.reserveResource(application, priority, rmContainer);
} }
private void unreserve(FiCaSchedulerApp application, Priority priority, private boolean unreserve(FiCaSchedulerApp application, Priority priority,
FiCaSchedulerNode node, RMContainer rmContainer) { FiCaSchedulerNode node, RMContainer rmContainer) {
// Done with the reservation? // Done with the reservation?
application.unreserve(node, priority); if (application.unreserve(node, priority)) {
node.unreserveResource(application); node.unreserveResource(application);
// Update reserved metrics
getMetrics().unreserveResource(
application.getUser(), rmContainer.getContainer().getResource());
}
// Update reserved metrics
getMetrics().unreserveResource(
application.getUser(), rmContainer.getContainer().getResource());
return true;
}
return false;
}
@Override @Override
public void completedContainer(Resource clusterResource, public void completedContainer(Resource clusterResource,
@ -1411,37 +1413,40 @@ public class LeafQueue implements CSQueue {
synchronized (this) { synchronized (this) {
Container container = rmContainer.getContainer(); Container container = rmContainer.getContainer();
boolean removed = false;
// Inform the application & the node // Inform the application & the node
// Note: It's safe to assume that all state changes to RMContainer // Note: It's safe to assume that all state changes to RMContainer
// happen under scheduler's lock... // happen under scheduler's lock...
// So, this is, in effect, a transaction across application & node // So, this is, in effect, a transaction across application & node
if (rmContainer.getState() == RMContainerState.RESERVED) { if (rmContainer.getState() == RMContainerState.RESERVED) {
unreserve(application, rmContainer.getReservedPriority(), removed = unreserve(application, rmContainer.getReservedPriority(),
node, rmContainer); node, rmContainer);
} else { } else {
application.containerCompleted(rmContainer, containerStatus, event); removed =
application.containerCompleted(rmContainer, containerStatus, event);
node.releaseContainer(container); node.releaseContainer(container);
} }
// Book-keeping // Book-keeping
releaseResource(clusterResource, if (removed) {
application, container.getResource()); releaseResource(clusterResource,
application, container.getResource());
LOG.info("completedContainer" + LOG.info("completedContainer" +
" container=" + container + " container=" + container +
" resource=" + container.getResource() + " resource=" + container.getResource() +
" queue=" + this + " queue=" + this +
" usedCapacity=" + getUsedCapacity() + " usedCapacity=" + getUsedCapacity() +
" absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
" used=" + usedResources + " used=" + usedResources +
" cluster=" + clusterResource); " cluster=" + clusterResource);
// Inform the parent queue
getParent().completedContainer(clusterResource, application,
node, rmContainer, null, event);
}
} }
// Inform the parent queue
getParent().completedContainer(clusterResource, application,
node, rmContainer, null, event);
} }
} }
@ -1588,5 +1593,19 @@ public class LeafQueue implements CSQueue {
getParent().recoverContainer(clusterResource, application, container); getParent().recoverContainer(clusterResource, application, container);
} }
// need to access the list of apps from the preemption monitor
public Set<FiCaSchedulerApp> getApplications() {
return Collections.unmodifiableSet(activeApplications);
}
// return a single Resource capturing the overal amount of pending resources
public Resource getTotalResourcePending() {
Resource ret = BuilderUtils.newResource(0, 0);
for (FiCaSchedulerApp f : activeApplications) {
Resources.addTo(ret, f.getTotalPendingRequests());
}
return ret;
}
} }

View File

@ -20,9 +20,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -38,7 +41,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
@ -53,11 +55,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import com.google.common.collect.HashMultiset; import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset; import com.google.common.collect.Multiset;
@ -85,17 +89,19 @@ public class FiCaSchedulerApp extends SchedulerApplication {
private Resource resourceLimit = recordFactory private Resource resourceLimit = recordFactory
.newRecordInstance(Resource.class); .newRecordInstance(Resource.class);
private Map<ContainerId, RMContainer> liveContainers private Map<ContainerId, RMContainer> liveContainers =
= new HashMap<ContainerId, RMContainer>(); new HashMap<ContainerId, RMContainer>();
private List<RMContainer> newlyAllocatedContainers = private List<RMContainer> newlyAllocatedContainers =
new ArrayList<RMContainer>(); new ArrayList<RMContainer>();
final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
new HashMap<Priority, Map<NodeId, RMContainer>>(); new HashMap<Priority, Map<NodeId, RMContainer>>();
private boolean isStopped = false; private boolean isStopped = false;
private final Set<ContainerId> containersToPreempt =
new HashSet<ContainerId>();
/** /**
* Count how many times the application has been given an opportunity * Count how many times the application has been given an opportunity
* to schedule a task at each priority. Each time the scheduler * to schedule a task at each priority. Each time the scheduler
@ -219,12 +225,17 @@ public class FiCaSchedulerApp extends SchedulerApplication {
RMContainerEventType.LAUNCHED)); RMContainerEventType.LAUNCHED));
} }
synchronized public void containerCompleted(RMContainer rmContainer, synchronized public boolean containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) { ContainerStatus containerStatus, RMContainerEventType event) {
// Remove from the list of containers
if (null == liveContainers.remove(rmContainer.getContainerId())) {
return false;
}
Container container = rmContainer.getContainer(); Container container = rmContainer.getContainer();
ContainerId containerId = container.getId(); ContainerId containerId = container.getId();
// Inform the container // Inform the container
rmContainer.handle( rmContainer.handle(
new RMContainerFinishedEvent( new RMContainerFinishedEvent(
@ -234,9 +245,8 @@ public class FiCaSchedulerApp extends SchedulerApplication {
); );
LOG.info("Completed container: " + rmContainer.getContainerId() + LOG.info("Completed container: " + rmContainer.getContainerId() +
" in state: " + rmContainer.getState() + " event:" + event); " in state: " + rmContainer.getState() + " event:" + event);
// Remove from the list of containers containersToPreempt.remove(rmContainer.getContainerId());
liveContainers.remove(rmContainer.getContainerId());
RMAuditLogger.logSuccess(getUser(), RMAuditLogger.logSuccess(getUser(),
AuditConstants.RELEASE_CONTAINER, "SchedulerApp", AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
@ -246,6 +256,8 @@ public class FiCaSchedulerApp extends SchedulerApplication {
Resource containerResource = rmContainer.getContainer().getResource(); Resource containerResource = rmContainer.getContainer().getResource();
queue.getMetrics().releaseResources(getUser(), 1, containerResource); queue.getMetrics().releaseResources(getUser(), 1, containerResource);
Resources.subtractFrom(currentConsumption, containerResource); Resources.subtractFrom(currentConsumption, containerResource);
return true;
} }
synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
@ -345,7 +357,8 @@ public class FiCaSchedulerApp extends SchedulerApplication {
} }
/** /**
* Return the number of times the application has been given an opportunity * @param priority Target priority
* @return the number of times the application has been given an opportunity
* to schedule a task at the given priority since the last time it * to schedule a task at the given priority since the last time it
* successfully did so. * successfully did so.
*/ */
@ -419,33 +432,36 @@ public class FiCaSchedulerApp extends SchedulerApplication {
return rmContainer; return rmContainer;
} }
public synchronized void unreserve(FiCaSchedulerNode node, Priority priority) { public synchronized boolean unreserve(FiCaSchedulerNode node, Priority priority) {
Map<NodeId, RMContainer> reservedContainers = Map<NodeId, RMContainer> reservedContainers =
this.reservedContainers.get(priority); this.reservedContainers.get(priority);
RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
if (reservedContainers.isEmpty()) {
this.reservedContainers.remove(priority);
}
// reservedContainer should not be null here
if (reservedContainer == null) {
String errorMesssage =
"Application " + getApplicationId() + " is trying to unreserve "
+ " on node " + node + ", currently has "
+ reservedContainers.size() + " at priority " + priority
+ "; currentReservation " + currentReservation;
LOG.warn(errorMesssage);
throw new YarnRuntimeException(errorMesssage);
}
// Reset the re-reservation count
resetReReservations(priority);
Resource resource = reservedContainer.getContainer().getResource(); if (reservedContainers != null) {
Resources.subtractFrom(currentReservation, resource); RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
LOG.info("Application " + getApplicationId() + " unreserved " + " on node " // unreserve is now triggered in new scenarios (preemption)
+ node + ", currently has " + reservedContainers.size() + " at priority " // as a consequence reservedcontainer might be null, adding NP-checks
+ priority + "; currentReservation " + currentReservation); if (reservedContainer != null
&& reservedContainer.getContainer() != null
&& reservedContainer.getContainer().getResource() != null) {
if (reservedContainers.isEmpty()) {
this.reservedContainers.remove(priority);
}
// Reset the re-reservation count
resetReReservations(priority);
Resource resource = reservedContainer.getContainer().getResource();
Resources.subtractFrom(currentReservation, resource);
LOG.info("Application " + getApplicationId() + " unreserved "
+ " on node " + node + ", currently has " + reservedContainers.size()
+ " at priority " + priority + "; currentReservation "
+ currentReservation);
return true;
}
}
return false;
} }
/** /**
@ -509,4 +525,55 @@ public class FiCaSchedulerApp extends SchedulerApplication {
public Queue getQueue() { public Queue getQueue() {
return queue; return queue;
} }
public Resource getTotalPendingRequests() {
Resource ret = Resource.newInstance(0, 0);
for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) {
// to avoid double counting we count only "ANY" resource requests
if (ResourceRequest.isAnyLocation(rr.getResourceName())){
Resources.addTo(ret,
Resources.multiply(rr.getCapability(), rr.getNumContainers()));
}
}
return ret;
}
public synchronized void addPreemptContainer(ContainerId cont){
// ignore already completed containers
if (liveContainers.containsKey(cont)) {
containersToPreempt.add(cont);
}
}
/**
* This method produces an Allocation that includes the current view
* of the resources that will be allocated to and preempted from this
* application.
*
* @param rc
* @param clusterResource
* @param minimumAllocation
* @return an allocation
*/
public synchronized Allocation getAllocation(ResourceCalculator rc,
Resource clusterResource, Resource minimumAllocation) {
Set<ContainerId> currentContPreemption = Collections.unmodifiableSet(
new HashSet<ContainerId>(containersToPreempt));
containersToPreempt.clear();
Resource tot = Resource.newInstance(0, 0);
for(ContainerId c : currentContPreemption){
Resources.addTo(tot,
liveContainers.get(c).getContainer().getResource());
}
int numCont = (int) Math.ceil(
Resources.divide(rc, clusterResource, tot, minimumAllocation));
ResourceRequest rr = ResourceRequest.newInstance(
Priority.UNDEFINED, ResourceRequest.ANY,
minimumAllocation, numCont);
return new Allocation(pullNewlyAllocatedContainers(), getHeadroom(),
null, currentContPreemption,
Collections.singletonList(rr));
}
} }

View File

@ -142,8 +142,9 @@ public class FiCaSchedulerNode extends SchedulerNode {
} }
/* remove the containers from the nodemanger */ /* remove the containers from the nodemanger */
launchedContainers.remove(container.getId()); if (null != launchedContainers.remove(container.getId())) {
updateResource(container); updateResource(container);
}
LOG.info("Released container " + container.getId() + LOG.info("Released container " + container.getId() +
" of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
@ -226,18 +227,25 @@ public class FiCaSchedulerNode extends SchedulerNode {
public synchronized void unreserveResource( public synchronized void unreserveResource(
SchedulerApplication application) { SchedulerApplication application) {
// Cannot unreserve for wrong application...
ApplicationAttemptId reservedApplication =
reservedContainer.getContainer().getId().getApplicationAttemptId();
if (!reservedApplication.equals(
application.getApplicationAttemptId())) {
throw new IllegalStateException("Trying to unreserve " +
" for application " + application.getApplicationAttemptId() +
" when currently reserved " +
" for application " + reservedApplication.getApplicationId() +
" on node " + this);
}
// adding NP checks as this can now be called for preemption
if (reservedContainer != null
&& reservedContainer.getContainer() != null
&& reservedContainer.getContainer().getId() != null
&& reservedContainer.getContainer().getId().getApplicationAttemptId() != null) {
// Cannot unreserve for wrong application...
ApplicationAttemptId reservedApplication =
reservedContainer.getContainer().getId().getApplicationAttemptId();
if (!reservedApplication.equals(
application.getApplicationAttemptId())) {
throw new IllegalStateException("Trying to unreserve " +
" for application " + application.getApplicationAttemptId() +
" when currently reserved " +
" for application " + reservedApplication.getApplicationId() +
" on node " + this);
}
}
reservedContainer = null; reservedContainer = null;
} }

View File

@ -0,0 +1,541 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.NavigableSet;
import java.util.Random;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
public class TestProportionalCapacityPreemptionPolicy {
static final long TS = 3141592653L;
int appAlloc = 0;
Random rand = null;
Clock mClock = null;
Configuration conf = null;
CapacityScheduler mCS = null;
EventHandler<ContainerPreemptEvent> mDisp = null;
ResourceCalculator rc = new DefaultResourceCalculator();
final ApplicationAttemptId appA = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 0), 0);
final ApplicationAttemptId appB = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 1), 0);
final ApplicationAttemptId appC = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 2), 0);
final ApplicationAttemptId appD = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 3), 0);
final ApplicationAttemptId appE = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 4), 0);
final ArgumentCaptor<ContainerPreemptEvent> evtCaptor =
ArgumentCaptor.forClass(ContainerPreemptEvent.class);
@Rule public TestName name = new TestName();
@Before
@SuppressWarnings("unchecked")
public void setup() {
conf = new Configuration(false);
conf.setLong(WAIT_TIME_BEFORE_KILL, 10000);
conf.setLong(MONITORING_INTERVAL, 3000);
// report "ideal" preempt
conf.setFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 1.0);
conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 1.0);
mClock = mock(Clock.class);
mCS = mock(CapacityScheduler.class);
when(mCS.getResourceCalculator()).thenReturn(rc);
mDisp = mock(EventHandler.class);
rand = new Random();
long seed = rand.nextLong();
System.out.println(name.getMethodName() + " SEED: " + seed);
rand.setSeed(seed);
appAlloc = 0;
}
@Test
public void testIgnore() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 0, 60, 40 }, // used
{ 0, 0, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 3, 1, 1, 1 }, // apps
{ -1, 1, 1, 1 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// don't correct imbalances without demand
verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
}
@Test
public void testProportionalPreemption() {
int[][] qData = new int[][]{
// / A B C D
{ 100, 10, 40, 20, 30 }, // abs
{ 100, 30, 60, 10, 0 }, // used
{ 45, 20, 5, 20, 0 }, // pending
{ 0, 0, 0, 0, 0 }, // reserved
{ 3, 1, 1, 1, 0 }, // apps
{ -1, 1, 1, 1, 1 }, // req granularity
{ 4, 0, 0, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testPreemptCycle() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 0, 60, 40 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 3, 1, 1, 1 }, // apps
{ -1, 1, 1, 1 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// ensure all pending rsrc from A get preempted from other queues
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
}
@Test
public void testExpireKill() {
final long killTime = 10000L;
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 0, 60, 40 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 3, 1, 1, 1 }, // apps
{ -1, 1, 1, 1 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
conf.setLong(WAIT_TIME_BEFORE_KILL, killTime);
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
// ensure all pending rsrc from A get preempted from other queues
when(mClock.getTime()).thenReturn(0L);
policy.editSchedule();
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
// requests reiterated
when(mClock.getTime()).thenReturn(killTime / 2);
policy.editSchedule();
verify(mDisp, times(20)).handle(argThat(new IsPreemptionRequestFor(appC)));
// kill req sent
when(mClock.getTime()).thenReturn(killTime + 1);
policy.editSchedule();
verify(mDisp, times(30)).handle(evtCaptor.capture());
List<ContainerPreemptEvent> events = evtCaptor.getAllValues();
for (ContainerPreemptEvent e : events.subList(20, 30)) {
assertEquals(appC, e.getAppId());
assertEquals(KILL_CONTAINER, e.getType());
}
}
@Test
public void testDeadzone() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 39, 43, 21 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 3, 1, 1, 1 }, // apps
{ -1, 1, 1, 1 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
conf.setFloat(MAX_IGNORED_OVER_CAPACITY, (float) 0.1);
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// ignore 10% overcapacity to avoid jitter
verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
}
@Test
public void testOverCapacityImbalance() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 55, 45, 0 }, // used
{ 20, 10, 10, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 2, 1, 1, 0 }, // apps
{ -1, 1, 1, 0 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// correct imbalance between over-capacity queues
verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testNaturalTermination() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 55, 45, 0 }, // used
{ 20, 10, 10, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 2, 1, 1, 0 }, // apps
{ -1, 1, 1, 0 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 0.1);
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// ignore 10% imbalance between over-capacity queues
verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
}
@Test
public void testObserveOnly() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 90, 10, 0 }, // used
{ 80, 10, 20, 50 }, // pending
{ 0, 0, 0, 0 }, // reserved
{ 2, 1, 1, 0 }, // apps
{ -1, 1, 1, 0 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
conf.setBoolean(OBSERVE_ONLY, true);
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// verify even severe imbalance not affected
verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
}
@Test
public void testHierarchical() {
int[][] qData = new int[][] {
// / A B C D E F
{ 200, 100, 50, 50, 100, 10, 90 }, // abs
{ 200, 110, 60, 50, 90, 90, 0 }, // used
{ 10, 0, 0, 0, 10, 0, 10 }, // pending
{ 0, 0, 0, 0, 0, 0, 0 }, // reserved
{ 4, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, 1, -1, 1, 1 }, // req granularity
{ 2, 2, 0, 0, 2, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// verify capacity taken from A1, not B1 despite B1 being far over
// its absolute guaranteed capacity
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testHierarchicalLarge() {
int[][] qData = new int[][] {
// / A B C D E F G H I
{ 400, 200, 60,140, 100, 70, 30, 100, 10, 90 }, // abs
{ 400, 210, 70,140, 100, 50, 50, 90, 90, 0 }, // used
{ 10, 0, 0, 0, 0, 0, 0, 0, 0, 15 }, // pending
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
{ 6, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity
{ 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// verify capacity taken from A1, not H1 despite H1 being far over
// its absolute guaranteed capacity
// XXX note: compensating for rounding error in Resources.multiplyTo
// which is likely triggered since we use small numbers for readability
verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appE)));
}
@Test
public void testContainerOrdering(){
List<RMContainer> containers = new ArrayList<RMContainer>();
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 10), 0);
// create a set of containers
RMContainer rm1 = mockContainer(appAttId, 5, mock(Resource.class), 3);
RMContainer rm2 = mockContainer(appAttId, 3, mock(Resource.class), 3);
RMContainer rm3 = mockContainer(appAttId, 2, mock(Resource.class), 2);
RMContainer rm4 = mockContainer(appAttId, 1, mock(Resource.class), 2);
RMContainer rm5 = mockContainer(appAttId, 4, mock(Resource.class), 1);
// insert them in non-sorted order
containers.add(rm3);
containers.add(rm2);
containers.add(rm1);
containers.add(rm5);
containers.add(rm4);
// sort them
ProportionalCapacityPreemptionPolicy.sortContainers(containers);
// verify the "priority"-first, "reverse container-id"-second
// ordering is enforced correctly
assert containers.get(0).equals(rm1);
assert containers.get(1).equals(rm2);
assert containers.get(2).equals(rm3);
assert containers.get(3).equals(rm4);
assert containers.get(4).equals(rm5);
}
static class IsPreemptionRequestFor
extends ArgumentMatcher<ContainerPreemptEvent> {
private final ApplicationAttemptId appAttId;
private final ContainerPreemptEventType type;
IsPreemptionRequestFor(ApplicationAttemptId appAttId) {
this(appAttId, PREEMPT_CONTAINER);
}
IsPreemptionRequestFor(ApplicationAttemptId appAttId,
ContainerPreemptEventType type) {
this.appAttId = appAttId;
this.type = type;
}
@Override
public boolean matches(Object o) {
return appAttId.equals(((ContainerPreemptEvent)o).getAppId())
&& type.equals(((ContainerPreemptEvent)o).getType());
}
@Override
public String toString() {
return appAttId.toString();
}
}
ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
ProportionalCapacityPreemptionPolicy policy =
new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock);
ParentQueue mRoot = buildMockRootQueue(rand, qData);
when(mCS.getRootQueue()).thenReturn(mRoot);
Resource clusterResources =
Resource.newInstance(leafAbsCapacities(qData[0], qData[6]), 0);
when(mCS.getClusterResources()).thenReturn(clusterResources);
return policy;
}
ParentQueue buildMockRootQueue(Random r, int[]... queueData) {
int[] abs = queueData[0];
int[] used = queueData[1];
int[] pending = queueData[2];
int[] reserved = queueData[3];
int[] apps = queueData[4];
int[] gran = queueData[5];
int[] queues = queueData[6];
return mockNested(abs, used, pending, reserved, apps, gran, queues);
}
ParentQueue mockNested(int[] abs, int[] used,
int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) {
float tot = leafAbsCapacities(abs, queues);
Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
ParentQueue root = mockParentQueue(null, queues[0], pqs);
when(root.getQueueName()).thenReturn("/");
when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
for (int i = 1; i < queues.length; ++i) {
final CSQueue q;
final ParentQueue p = pqs.removeLast();
final String queueName = "queue" + ((char)('A' + i - 1));
if (queues[i] > 0) {
q = mockParentQueue(p, queues[i], pqs);
} else {
q = mockLeafQueue(p, tot, i, abs, used, pending, reserved, apps, gran);
}
when(q.getParent()).thenReturn(p);
when(q.getQueueName()).thenReturn(queueName);
when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
}
assert 0 == pqs.size();
return root;
}
ParentQueue mockParentQueue(ParentQueue p, int subqueues,
Deque<ParentQueue> pqs) {
ParentQueue pq = mock(ParentQueue.class);
List<CSQueue> cqs = new ArrayList<CSQueue>();
when(pq.getChildQueues()).thenReturn(cqs);
for (int i = 0; i < subqueues; ++i) {
pqs.add(pq);
}
if (p != null) {
p.getChildQueues().add(pq);
}
return pq;
}
LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
LeafQueue lq = mock(LeafQueue.class);
when(lq.getTotalResourcePending()).thenReturn(
Resource.newInstance(pending[i], 0));
// consider moving where CapacityScheduler::comparator accessible
NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
new Comparator<FiCaSchedulerApp>() {
@Override
public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
return a1.getApplicationAttemptId()
.compareTo(a2.getApplicationAttemptId());
}
});
// applications are added in global L->R order in queues
if (apps[i] != 0) {
int aUsed = used[i] / apps[i];
int aPending = pending[i] / apps[i];
int aReserve = reserved[i] / apps[i];
for (int a = 0; a < apps[i]; ++a) {
qApps.add(mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]));
++appAlloc;
}
}
when(lq.getApplications()).thenReturn(qApps);
p.getChildQueues().add(lq);
return lq;
}
FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved,
int gran) {
FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
ApplicationId appId = ApplicationId.newInstance(TS, id);
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0);
when(app.getApplicationId()).thenReturn(appId);
when(app.getApplicationAttemptId()).thenReturn(appAttId);
int cAlloc = 0;
Resource unit = Resource.newInstance(gran, 0);
List<RMContainer> cReserved = new ArrayList<RMContainer>();
for (int i = 0; i < reserved; i += gran) {
cReserved.add(mockContainer(appAttId, cAlloc, unit, 1));
++cAlloc;
}
when(app.getReservedContainers()).thenReturn(cReserved);
List<RMContainer> cLive = new ArrayList<RMContainer>();
for (int i = 0; i < used; i += gran) {
cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
++cAlloc;
}
when(app.getLiveContainers()).thenReturn(cLive);
return app;
}
RMContainer mockContainer(ApplicationAttemptId appAttId, int id,
Resource r, int priority) {
ContainerId cId = ContainerId.newInstance(appAttId, id);
Container c = mock(Container.class);
when(c.getResource()).thenReturn(r);
when(c.getPriority()).thenReturn(Priority.create(priority));
RMContainer mC = mock(RMContainer.class);
when(mC.getContainerId()).thenReturn(cId);
when(mC.getContainer()).thenReturn(c);
return mC;
}
static int leafAbsCapacities(int[] abs, int[] subqueues) {
int ret = 0;
for (int i = 0; i < abs.length; ++i) {
if (0 == subqueues[i]) {
ret += abs[i];
}
}
return ret;
}
void printString(CSQueue nq, String indent) {
if (nq instanceof ParentQueue) {
System.out.println(indent + nq.getQueueName()
+ " cur:" + nq.getAbsoluteUsedCapacity()
+ " guar:" + nq.getAbsoluteCapacity()
);
for (CSQueue q : ((ParentQueue)nq).getChildQueues()) {
printString(q, indent + " ");
}
} else {
System.out.println(indent + nq.getQueueName()
+ " pen:" + ((LeafQueue) nq).getTotalResourcePending()
+ " cur:" + nq.getAbsoluteUsedCapacity()
+ " guar:" + nq.getAbsoluteCapacity()
);
for (FiCaSchedulerApp a : ((LeafQueue)nq).getApplications()) {
System.out.println(indent + " " + a.getApplicationId());
}
}
}
}