diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 29775532d57..eb86c742eb7 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -456,6 +456,9 @@ Release 2.1.0-beta - 2013-07-02 YARN-883. Expose Fair Scheduler-specific queue metrics. (sandyr via tucu) + YARN-569. Add support for requesting and enforcing preemption requests via + a capacity monitor. (Carlo Curino, cdouglas) + OPTIMIZATIONS YARN-512. Log aggregation root directory check is more expensive than it diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Priority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Priority.java index 0d825bf46ef..1e3d2290538 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Priority.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Priority.java @@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.util.Records; @Stable public abstract class Priority implements Comparable { + public static final Priority UNDEFINED = newInstance(-1); + @Public @Stable public static Priority newInstance(int p) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 8b5dc083d80..44c35c3d58b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -132,6 +132,18 @@ public class YarnConfiguration extends Configuration { RM_PREFIX + "scheduler.client.thread-count"; public static final int DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT = 50; + /** + * Enable periodic monitor threads. + * @see #RM_SCHEDULER_MONITOR_POLICIES + */ + public static final String RM_SCHEDULER_ENABLE_MONITORS = + RM_PREFIX + "scheduler.monitor.enable"; + public static final boolean DEFAULT_RM_SCHEDULER_ENABLE_MONITORS = false; + + /** List of SchedulingEditPolicy classes affecting the scheduler. */ + public static final String RM_SCHEDULER_MONITOR_POLICIES = + RM_PREFIX + "scheduler.monitor.policies"; + /** The address of the RM web application.*/ public static final String RM_WEBAPP_ADDRESS = RM_PREFIX + "webapp.address"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 72845384857..b6753bc4adc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -291,6 +291,22 @@ 1000 + + Enable a set of periodic monitors (specified in + yarn.resourcemanager.scheduler.monitor.policies) that affect the + scheduler. + yarn.resourcemanager.scheduler.monitor.enable + false + + + + The list of SchedulingEditPolicy classes that interact with + the scheduler. A particular module may be incompatible with the + scheduler, other policies, or a configuration of either. 
+ yarn.resourcemanager.scheduler.monitor.policies + org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy + + The hostname of the NM. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index d5f0a67fef3..aa9be4b6b9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -411,6 +411,19 @@ public class ApplicationMasterService extends AbstractService implements allocateResponse.setResponseId(lastResponse.getResponseId() + 1); allocateResponse.setAvailableResources(allocation.getResourceLimit()); + allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); + + // add preemption to the allocateResponse message (if any) + allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation)); + + // Adding NMTokens for allocated containers. + if (!allocation.getContainers().isEmpty()) { + allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager() + .createAndGetNMTokens(app.getUser(), appAttemptId, + allocation.getContainers())); + } + + // before returning response, verify in sync AllocateResponse oldResponse = responseMap.put(appAttemptId, allocateResponse); if (oldResponse == null) { @@ -421,18 +434,7 @@ public class ApplicationMasterService extends AbstractService implements LOG.error(message); return resync; } - - allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); - - // add preemption to the allocateResponse message (if any) - allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation)); - - // Adding NMTokens for allocated containers. 
- if (!allocation.getContainers().isEmpty()) { - allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager() - .createAndGetNMTokens(app.getUser(), appAttemptId, - allocation.getContainers())); - } + return allocateResponse; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 291f24410b1..841f387e7d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -18,8 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager; - import java.io.IOException; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; @@ -61,9 +63,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; @@ -237,6 +243,9 @@ public class ResourceManager extends CompositeService implements Recoverable { throw new RuntimeException("Failed to initialize scheduler", ioe); } + // creating monitors that handle preemption + createPolicyMonitors(); + masterService = createApplicationMasterService(); addService(masterService) ; @@ -315,7 +324,8 @@ public class ResourceManager extends CompositeService implements Recoverable { } catch (ClassNotFoundException e) { throw 
new YarnRuntimeException("Could not instantiate Scheduler: " + schedulerClassName, e); - } } + } + } protected ApplicationMasterLauncher createAMLauncher() { return new ApplicationMasterLauncher(this.rmContext); @@ -476,6 +486,36 @@ public class ResourceManager extends CompositeService implements Recoverable { } } + @Private + public static final class + RMContainerPreemptEventDispatcher + implements EventHandler { + + private final PreemptableResourceScheduler scheduler; + + public RMContainerPreemptEventDispatcher( + PreemptableResourceScheduler scheduler) { + this.scheduler = scheduler; + } + + @Override + public void handle(ContainerPreemptEvent event) { + ApplicationAttemptId aid = event.getAppId(); + RMContainer container = event.getContainer(); + switch (event.getType()) { + case DROP_RESERVATION: + scheduler.dropContainerReservation(container); + break; + case PREEMPT_CONTAINER: + scheduler.preemptContainer(aid, container); + break; + case KILL_CONTAINER: + scheduler.killContainer(container); + break; + } + } + } + @Private public static final class ApplicationAttemptEventDispatcher implements EventHandler { @@ -676,7 +716,37 @@ public class ResourceManager extends CompositeService implements Recoverable { protected ApplicationMasterService createApplicationMasterService() { return new ApplicationMasterService(this.rmContext, scheduler); } - + + protected void createPolicyMonitors() { + if (scheduler instanceof PreemptableResourceScheduler + && conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) { + LOG.info("Loading policy monitors"); + List policies = conf.getInstances( + YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, + SchedulingEditPolicy.class); + if (policies.size() > 0) { + this.rmDispatcher.register(ContainerPreemptEventType.class, + new RMContainerPreemptEventDispatcher( + (PreemptableResourceScheduler) scheduler)); + for (SchedulingEditPolicy policy : policies) { + LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName()); + policy.init(conf, this.rmContext.getDispatcher().getEventHandler(), + (PreemptableResourceScheduler) scheduler); + // periodically check whether we need to take action to guarantee + // constraints + SchedulingMonitor mon = new SchedulingMonitor(policy); + addService(mon); + + } + } else { + LOG.warn("Policy monitors configured (" + + YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS + + ") but none specified (" + + YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES + ")"); + } + } + } protected AdminService createAdminService( ClientRMService clientRMService, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java new file mode 100644 index 00000000000..1ebc19fbbe0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.monitor; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; + +public interface SchedulingEditPolicy { + + public void init(Configuration config, + EventHandler dispatcher, + PreemptableResourceScheduler scheduler); + + /** + * This method is invoked at regular intervals. Internally the policy is + * allowed to track containers and affect the scheduler. The "actions" + * performed are passed back through an EventHandler. + */ + public void editSchedule(); + + public long getMonitoringInterval(); + + public String getPolicyName(); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java new file mode 100644 index 00000000000..2e93a9e51c2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.monitor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; + +import com.google.common.annotations.VisibleForTesting; + +public class SchedulingMonitor extends AbstractService { + + private final SchedulingEditPolicy scheduleEditPolicy; + private static final Log LOG = LogFactory.getLog(SchedulingMonitor.class); + + //thread which runs periodically to see the last time since a heartbeat is + //received. 
+ private Thread checkerThread; + private volatile boolean stopped; + private long monitorInterval; + + public SchedulingMonitor(SchedulingEditPolicy scheduleEditPolicy) { + super("SchedulingMonitor (" + scheduleEditPolicy.getPolicyName() + ")"); + this.scheduleEditPolicy = scheduleEditPolicy; + this.monitorInterval = scheduleEditPolicy.getMonitoringInterval(); + } + + public long getMonitorInterval() { + return monitorInterval; + } + + public void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + } + + @Override + public void serviceStart() throws Exception { + assert !stopped : "starting when already stopped"; + checkerThread = new Thread(new PreemptionChecker()); + checkerThread.setName(getName()); + checkerThread.start(); + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { + stopped = true; + if (checkerThread != null) { + checkerThread.interrupt(); + } + super.serviceStop(); + } + + @VisibleForTesting + public void invokePolicy(){ + scheduleEditPolicy.editSchedule(); + } + + private class PreemptionChecker implements Runnable { + @Override + public void run() { + while (!stopped && !Thread.currentThread().isInterrupted()) { + //invoke the preemption policy at a regular pace + //the policy will generate preemption or kill events + //managed by the dispatcher + invokePolicy(); + try { + Thread.sleep(monitorInterval); + } catch (InterruptedException e) { + LOG.info(getName() + " thread interrupted"); + break; + } + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java new file mode 100644 index 00000000000..7ea73d9471e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -0,0 +1,669 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This class implement a {@link SchedulingEditPolicy} that is designed to be + * paired with the {@code CapacityScheduler}. At every invocation of {@code + * editSchedule()} it computes the ideal amount of resources assigned to each + * queue (for each queue in the hierarchy), and determines whether preemption + * is needed. Overcapacity is distributed among queues in a weighted fair manner, + * where the weight is the amount of guaranteed capacity for the queue. + * Based on this ideal assignment it determines whether preemption is required + * and select a set of containers from each application that would be killed if + * the corresponding amount of resources is not freed up by the application. + * + * If not in {@code observeOnly} mode, it triggers preemption requests via a + * {@link ContainerPreemptEvent} that the {@code ResourceManager} will ensure + * to deliver to the application (or to execute). + * + * If the deficit of resources is persistent over a long enough period of time + * this policy will trigger forced termination of containers (again by generating + * {@link ContainerPreemptEvent}). + */ +public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolicy { + + private static final Log LOG = + LogFactory.getLog(ProportionalCapacityPreemptionPolicy.class); + + /** If true, run the policy but do not affect the cluster with preemption and + * kill events. 
*/ + public static final String OBSERVE_ONLY = + "yarn.resourcemanager.monitor.capacity.preemption.observe_only"; + /** Time in milliseconds between invocations of this policy */ + public static final String MONITORING_INTERVAL = + "yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval"; + /** Time in milliseconds between requesting a preemption from an application + * and killing the container. */ + public static final String WAIT_TIME_BEFORE_KILL = + "yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill"; + /** Maximum percentage of resources preempted in a single round. By + * controlling this value one can throttle the pace at which containers are + * reclaimed from the cluster. After computing the total desired preemption, + * the policy scales it back within this limit. */ + public static final String TOTAL_PREEMPTION_PER_ROUND = + "yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round"; + /** Maximum amount of resources above the target capacity ignored for + * preemption. This defines a deadzone around the target capacity that helps + * prevent thrashing and oscillations around the computed target balance. + * High values would slow the time to capacity and (absent natural + * completions) it might prevent convergence to guaranteed capacity. */ + public static final String MAX_IGNORED_OVER_CAPACITY = + "yarn.resourcemanager.monitor.capacity.preemption.max_ignored_over_capacity"; + /** + * Given a computed preemption target, account for containers naturally + * expiring and preempt only this percentage of the delta. This determines + * the rate of geometric convergence into the deadzone ({@link + * #MAX_IGNORED_OVER_CAPACITY}). For example, a termination factor of 0.5 + * will reclaim almost 95% of resources within 5 * {@link + * #WAIT_TIME_BEFORE_KILL}, even absent natural termination. 
*/ + public static final String NATURAL_TERMINATION_FACTOR = + "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor"; + + //the dispatcher to send preempt and kill events + public EventHandler dispatcher; + + private final Clock clock; + private double maxIgnoredOverCapacity; + private long maxWaitTime; + private CapacityScheduler scheduler; + private long monitoringInterval; + private final Map preempted = + new HashMap(); + private ResourceCalculator rc; + private float percentageClusterPreemptionAllowed; + private double naturalTerminationFactor; + private boolean observeOnly; + + public ProportionalCapacityPreemptionPolicy() { + clock = new SystemClock(); + } + + public ProportionalCapacityPreemptionPolicy(Configuration config, + EventHandler dispatcher, + CapacityScheduler scheduler) { + this(config, dispatcher, scheduler, new SystemClock()); + } + + public ProportionalCapacityPreemptionPolicy(Configuration config, + EventHandler dispatcher, + CapacityScheduler scheduler, Clock clock) { + init(config, dispatcher, scheduler); + this.clock = clock; + } + + public void init(Configuration config, + EventHandler disp, + PreemptableResourceScheduler sched) { + LOG.info("Preemption monitor:" + this.getClass().getCanonicalName()); + assert null == scheduler : "Unexpected duplicate call to init"; + if (!(sched instanceof CapacityScheduler)) { + throw new YarnRuntimeException("Class " + + sched.getClass().getCanonicalName() + " not instance of " + + CapacityScheduler.class.getCanonicalName()); + } + dispatcher = disp; + scheduler = (CapacityScheduler) sched; + maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1); + naturalTerminationFactor = + config.getDouble(NATURAL_TERMINATION_FACTOR, 0.2); + maxWaitTime = config.getLong(WAIT_TIME_BEFORE_KILL, 15000); + monitoringInterval = config.getLong(MONITORING_INTERVAL, 3000); + percentageClusterPreemptionAllowed = + config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1); + observeOnly = config.getBoolean(OBSERVE_ONLY, false); + rc = scheduler.getResourceCalculator(); + } + + @Override + public void editSchedule(){ + CSQueue root = scheduler.getRootQueue(); + Resource clusterResources = + Resources.clone(scheduler.getClusterResources()); + containerBasedPreemptOrKill(root, clusterResources); + } + + /** + * This method selects and tracks containers to be preempted. If a container + * is in the target list for more than maxWaitTime it is killed. 
+ * + * @param root the root of the CapacityScheduler queue hierarchy + * @param clusterResources the total amount of resources in the cluster + */ + private void containerBasedPreemptOrKill(CSQueue root, + Resource clusterResources) { + + // extract a summary of the queues from scheduler + TempQueue tRoot; + synchronized (scheduler) { + tRoot = cloneQueues(root, clusterResources); + } + + // compute the ideal distribution of resources among queues + // updates cloned queues state accordingly + tRoot.idealAssigned = tRoot.guaranteed; + Resource totalPreemptionAllowed = Resources.multiply(clusterResources, + percentageClusterPreemptionAllowed); + List queues = + recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed); + + // based on ideal allocation select containers to be preempted from each + // queue and each application + Map> toPreempt = + getContainersToPreempt(queues, clusterResources); + + logToCSV(queues); + + // if we are in observeOnly mode return before any action is taken + if (observeOnly) { + return; + } + + // preempt (or kill) the selected containers + for (Map.Entry> e + : toPreempt.entrySet()) { + for (RMContainer container : e.getValue()) { + // if we tried to preempt this for more than maxWaitTime + if (preempted.get(container) != null && + preempted.get(container) + maxWaitTime < clock.getTime()) { + // kill it + dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container, + ContainerPreemptEventType.KILL_CONTAINER)); + preempted.remove(container); + } else { + //otherwise just send preemption events + dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container, + ContainerPreemptEventType.PREEMPT_CONTAINER)); + if (preempted.get(container) == null) { + preempted.put(container, clock.getTime()); + } + } + } + } + + // Keep the preempted list clean + for (Iterator i = preempted.keySet().iterator(); i.hasNext();){ + RMContainer id = i.next(); + // garbage collect containers that are irrelevant for preemption + if (preempted.get(id) + 2 * maxWaitTime < clock.getTime()) { + i.remove(); + } + } + } + + /** + * This method recursively computes the ideal assignment of resources to each + * level of the hierarchy. This ensures that leafs that are over-capacity but + * with parents within capacity will not be preempted. Preemptions are allowed + * within each subtree according to local over/under capacity. + * + * @param root the root of the cloned queue hierachy + * @param totalPreemptionAllowed maximum amount of preemption allowed + * @return a list of leaf queues updated with preemption targets + */ + private List recursivelyComputeIdealAssignment( + TempQueue root, Resource totalPreemptionAllowed) { + List leafs = new ArrayList(); + if (root.getChildren() != null && + root.getChildren().size() > 0) { + // compute ideal distribution at this level + computeIdealResourceDistribution(rc, root.getChildren(), + totalPreemptionAllowed, root.idealAssigned); + // compute recursively for lower levels and build list of leafs + for(TempQueue t : root.getChildren()) { + leafs.addAll(recursivelyComputeIdealAssignment(t, totalPreemptionAllowed)); + } + } else { + // we are in a leaf nothing to do, just return yourself + return Collections.singletonList(root); + } + return leafs; + } + + /** + * This method computes (for a single level in the tree, passed as a {@code + * List}) the ideal assignment of resources. This is done + * recursively to allocate capacity fairly across all queues with pending + * demands. 
It terminates when no resources are left to assign, or when all + * demand is satisfied. + * + * @param rc resource calculator + * @param queues a list of cloned queues to be assigned capacity to (this is + * an out param) + * @param totalPreemptionAllowed total amount of preemption we allow + * @param tot_guarant the amount of capacity assigned to this pool of queues + */ + private void computeIdealResourceDistribution(ResourceCalculator rc, + List queues, Resource totalPreemptionAllowed, Resource tot_guarant) { + + // qAlloc tracks currently active queues (will decrease progressively as + // demand is met) + List qAlloc = new ArrayList(queues); + // unassigned tracks how much resources are still to assign, initialized + // with the total capacity for this set of queues + Resource unassigned = Resources.clone(tot_guarant); + + //assign all cluster resources until no more demand, or no resources are left + while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant, + unassigned, Resources.none())) { + Resource wQassigned = Resource.newInstance(0, 0); + + // we compute normalizedGuarantees capacity based on currently active + // queues + resetCapacity(rc, unassigned, qAlloc); + + // offer for each queue their capacity first and in following invocations + // their share of over-capacity + for (Iterator i = qAlloc.iterator(); i.hasNext();) { + TempQueue sub = i.next(); + Resource wQavail = + Resources.multiply(unassigned, sub.normalizedGuarantee); + Resource wQidle = sub.offer(wQavail, rc, tot_guarant); + Resource wQdone = Resources.subtract(wQavail, wQidle); + // if the queue returned a value > 0 it means it is fully satisfied + // and it is removed from the list of active queues qAlloc + if (!Resources.greaterThan(rc, tot_guarant, + wQdone, Resources.none())) { + i.remove(); + } + Resources.addTo(wQassigned, wQdone); + } + Resources.subtractFrom(unassigned, wQassigned); + } + + // based on ideal assignment computed above and current assignment we derive + // how much preemption is required overall + Resource totPreemptionNeeded = Resource.newInstance(0, 0); + for (TempQueue t:queues) { + if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) { + Resources.addTo(totPreemptionNeeded, + Resources.subtract(t.current, t.idealAssigned)); + } + } + + // if we need to preempt more than is allowed, compute a factor (0 queues) { + Resource activeCap = Resource.newInstance(0, 0); + for (TempQueue q : queues) { + Resources.addTo(activeCap, q.guaranteed); + } + for (TempQueue q : queues) { + q.normalizedGuarantee = Resources.divide(rc, clusterResource, + q.guaranteed, activeCap); + } + } + + /** + * Based a resource preemption target drop reservations of containers and + * if necessary select containers for preemption from applications in each + * over-capacity queue. It uses {@link #NATURAL_TERMINATION_FACTOR} to + * account for containers that will naturally complete. 
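As a worked example of the per-level assignment loop above, the sketch below re-states computeIdealResourceDistribution with plain doubles instead of Resource and ResourceCalculator, for two queues guaranteed 60 and 40 out of 100, where queue A wants 110 (90 used plus 20 pending) and queue B wants 20. B's unused guarantee is redistributed to A, so A ends up 10 over its ideal share and becomes the preemption target. Numbers and class name are illustrative only:

// Scalar re-statement of the assignment loop, illustrative only; the real
// code operates on Resource instances via a ResourceCalculator.
public class IdealAssignmentExample {
  public static void main(String[] args) {
    double[] guaranteed = {60, 40};       // queue A, queue B
    double[] wanted = {110, 20};          // current + pending per queue
    double[] ideal = {0, 0};
    boolean[] active = {true, true};
    double unassigned = 100;              // capacity of this set of queues
    while (unassigned > 1e-9 && (active[0] || active[1])) {
      double activeGuarantee = 0;
      for (int i = 0; i < 2; i++) {
        if (active[i]) activeGuarantee += guaranteed[i];
      }
      double assigned = 0;
      for (int i = 0; i < 2; i++) {
        if (!active[i]) continue;
        double offer = unassigned * guaranteed[i] / activeGuarantee;
        double accepted = Math.min(offer, wanted[i] - ideal[i]);
        if (accepted <= 1e-9) {           // queue fully satisfied, drop it
          active[i] = false;
          continue;
        }
        ideal[i] += accepted;
        assigned += accepted;
      }
      unassigned -= assigned;
    }
    // prints: idealAssigned A=80.0 B=20.0 (A is 10 over: 90 used vs 80 ideal)
    System.out.printf("idealAssigned A=%.1f B=%.1f%n", ideal[0], ideal[1]);
  }
}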
+ * + * @param queues set of leaf queues to preempt from + * @param clusterResource total amount of cluster resources + * @return a map of applciationID to set of containers to preempt + */ + private Map> getContainersToPreempt( + List queues, Resource clusterResource) { + + Map> list = + new HashMap>(); + + for (TempQueue qT : queues) { + // we act only if we are violating balance by more than + // maxIgnoredOverCapacity + if (Resources.greaterThan(rc, clusterResource, qT.current, + Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) { + // we introduce a dampening factor naturalTerminationFactor that + // accounts for natural termination of containers + Resource resToObtain = + Resources.multiply(qT.toBePreempted, naturalTerminationFactor); + + // lock the leafqueue while we scan applications and unreserve + synchronized(qT.leafQueue) { + NavigableSet ns = + (NavigableSet) qT.leafQueue.getApplications(); + Iterator desc = ns.descendingIterator(); + qT.actuallyPreempted = Resources.clone(resToObtain); + while (desc.hasNext()) { + FiCaSchedulerApp fc = desc.next(); + if (Resources.lessThanOrEqual(rc, clusterResource, + resToObtain, Resources.none())) { + break; + } + list.put(fc.getApplicationAttemptId(), + preemptFrom(fc, clusterResource, resToObtain)); + } + } + } + } + return list; + } + + /** + * Given a target preemption for a specific application, select containers + * to preempt (after unreserving all reservation for that app). + * + * @param app + * @param clusterResource + * @param rsrcPreempt + * @return + */ + private Set preemptFrom( + FiCaSchedulerApp app, Resource clusterResource, Resource rsrcPreempt) { + Set ret = new HashSet(); + ApplicationAttemptId appId = app.getApplicationAttemptId(); + + // first drop reserved containers towards rsrcPreempt + List reservations = + new ArrayList(app.getReservedContainers()); + for (RMContainer c : reservations) { + if (Resources.lessThanOrEqual(rc, clusterResource, + rsrcPreempt, Resources.none())) { + return ret; + } + if (!observeOnly) { + dispatcher.handle(new ContainerPreemptEvent(appId, c, + ContainerPreemptEventType.DROP_RESERVATION)); + } + Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource()); + } + + // if more resources are to be freed go through all live containers in + // reverse priority and reverse allocation order and mark them for + // preemption + List containers = + new ArrayList(app.getLiveContainers()); + + sortContainers(containers); + + for (RMContainer c : containers) { + if (Resources.lessThanOrEqual(rc, clusterResource, + rsrcPreempt, Resources.none())) { + return ret; + } + ret.add(c); + Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource()); + } + + return ret; + } + + /** + * Compare by reversed priority order first, and then reversed containerId + * order + * @param containers + */ + @VisibleForTesting + static void sortContainers(List containers){ + Collections.sort(containers, new Comparator() { + @Override + public int compare(RMContainer a, RMContainer b) { + Comparator c = new org.apache.hadoop.yarn.server + .resourcemanager.resource.Priority.Comparator(); + int priorityComp = c.compare(b.getContainer().getPriority(), + a.getContainer().getPriority()); + if (priorityComp != 0) { + return priorityComp; + } + return b.getContainerId().getId() - + a.getContainerId().getId(); + } + }); + } + + @Override + public long getMonitoringInterval() { + return monitoringInterval; + } + + @Override + public String getPolicyName() { + return 
"ProportionalCapacityPreemptionPolicy"; + } + + + /** + * This method walks a tree of CSQueue and clones the portion of the state + * relevant for preemption in TempQueue(s). It also maintains a pointer to + * the leaves. Finally it aggregates pending resources in each queue and rolls + * it up to higher levels. + * + * @param root the root of the CapacityScheduler queue hierarchy + * @param clusterResources the total amount of resources in the cluster + * @return the root of the cloned queue hierarchy + */ + private TempQueue cloneQueues(CSQueue root, Resource clusterResources) { + TempQueue ret; + synchronized (root) { + float absUsed = root.getAbsoluteUsedCapacity(); + Resource current = Resources.multiply(clusterResources, absUsed); + Resource guaranteed = + Resources.multiply(clusterResources, root.getAbsoluteCapacity()); + if (root instanceof LeafQueue) { + LeafQueue l = (LeafQueue) root; + Resource pending = l.getTotalResourcePending(); + ret = new TempQueue(root.getQueueName(), current, pending, guaranteed); + ret.setLeafQueue(l); + } else { + Resource pending = Resource.newInstance(0, 0); + ret = new TempQueue(root.getQueueName(), current, pending, guaranteed); + for (CSQueue c : root.getChildQueues()) { + ret.addChild(cloneQueues(c, clusterResources)); + } + } + } + return ret; + } + + // simple printout function that reports internal queue state (useful for + // plotting) + private void logToCSV(List unorderedqueues){ + List queues = new ArrayList(unorderedqueues); + Collections.sort(queues, new Comparator(){ + @Override + public int compare(TempQueue o1, TempQueue o2) { + return o1.queueName.compareTo(o2.queueName); + }}); + String queueState = " QUEUESTATE: " + clock.getTime(); + StringBuilder sb = new StringBuilder(); + sb.append(queueState); + for (TempQueue tq : queues) { + sb.append(", "); + tq.appendLogString(sb); + } + LOG.info(sb.toString()); + } + + /** + * Temporary data-structure tracking resource availability, pending resource + * need, current utilization. Used to clone {@link CSQueue}. + */ + static class TempQueue { + final String queueName; + final Resource current; + final Resource pending; + final Resource guaranteed; + Resource idealAssigned; + Resource toBePreempted; + Resource actuallyPreempted; + + double normalizedGuarantee; + + final ArrayList children; + LeafQueue leafQueue; + + TempQueue(String queueName, Resource current, Resource pending, + Resource guaranteed) { + this.queueName = queueName; + this.current = current; + this.pending = pending; + this.guaranteed = guaranteed; + this.idealAssigned = Resource.newInstance(0, 0); + this.actuallyPreempted = Resource.newInstance(0, 0); + this.toBePreempted = Resource.newInstance(0, 0); + this.normalizedGuarantee = Float.NaN; + this.children = new ArrayList(); + } + + public void setLeafQueue(LeafQueue l){ + assert children.size() == 0; + this.leafQueue = l; + } + + /** + * When adding a child we also aggregate its pending resource needs. 
+ * @param q the child queue to add to this queue + */ + public void addChild(TempQueue q) { + assert leafQueue == null; + children.add(q); + Resources.addTo(pending, q.pending); + } + + public void addChildren(ArrayList queues) { + assert leafQueue == null; + children.addAll(queues); + } + + + public ArrayList getChildren(){ + return children; + } + + // This function "accepts" all the resources it can (pending) and return + // the unused ones + Resource offer(Resource avail, ResourceCalculator rc, + Resource clusterResource) { + // remain = avail - min(avail, current + pending - assigned) + Resource accepted = Resources.min(rc, clusterResource, + avail, + Resources.subtract( + Resources.add(current, pending), + idealAssigned)); + Resource remain = Resources.subtract(avail, accepted); + Resources.addTo(idealAssigned, accepted); + return remain; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("CUR: ").append(current) + .append(" PEN: ").append(pending) + .append(" GAR: ").append(guaranteed) + .append(" NORM: ").append(normalizedGuarantee) + .append(" IDEAL_ASSIGNED: ").append(idealAssigned) + .append(" IDEAL_PREEMPT: ").append(toBePreempted) + .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted); + + return sb.toString(); + } + public void assignPreemption(float scalingFactor, + ResourceCalculator rc, Resource clusterResource) { + if (Resources.greaterThan(rc, clusterResource, current, idealAssigned)) { + toBePreempted = Resources.multiply( + Resources.subtract(current, idealAssigned), scalingFactor); + } else { + toBePreempted = Resource.newInstance(0, 0); + } + } + + void appendLogString(StringBuilder sb) { + sb.append(queueName).append(", ") + .append(current.getMemory()).append(", ") + .append(current.getVirtualCores()).append(", ") + .append(pending.getMemory()).append(", ") + .append(pending.getVirtualCores()).append(", ") + .append(guaranteed.getMemory()).append(", ") + .append(guaranteed.getVirtualCores()).append(", ") + .append(idealAssigned.getMemory()).append(", ") + .append(idealAssigned.getVirtualCores()).append(", ") + .append(toBePreempted.getMemory()).append(", ") + .append(toBePreempted.getVirtualCores() ).append(", ") + .append(actuallyPreempted.getMemory()).append(", ") + .append(actuallyPreempted.getVirtualCores()); + } + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index e89c8feed0c..1ff00be4ce6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -205,6 +206,14 @@ public class AppSchedulingInfo { return requests.get(priority); } + synchronized public List getAllResourceRequests() { + List ret = new ArrayList(); + for (Map r : requests.values()) { + ret.addAll(r.values()); + 
} + return ret; + } + synchronized public ResourceRequest getResourceRequest(Priority priority, String resourceName) { Map nodeRequests = requests.get(priority); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java new file mode 100644 index 00000000000..8eba48d0120 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java @@ -0,0 +1,57 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + +/** + * Simple event class used to communicate containers unreservations, preemption, killing + */ +public class ContainerPreemptEvent + extends AbstractEvent { + + private final ApplicationAttemptId aid; + private final RMContainer container; + + public ContainerPreemptEvent(ApplicationAttemptId aid, RMContainer container, + ContainerPreemptEventType type) { + super(type); + this.aid = aid; + this.container = container; + } + + public RMContainer getContainer(){ + return this.container; + } + + public ApplicationAttemptId getAppId() { + return aid; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(super.toString()); + sb.append(" ").append(getAppId()); + sb.append(" ").append(getContainer().getContainerId()); + return sb.toString(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java new file mode 100644 index 00000000000..a70a8367672 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +public enum ContainerPreemptEventType { + + DROP_RESERVATION, + PREEMPT_CONTAINER, + KILL_CONTAINER + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java new file mode 100644 index 00000000000..c89696d1530 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + +/** + * Interface for a scheduler that supports preemption/killing + * + */ +public interface PreemptableResourceScheduler extends ResourceScheduler { + + /** + * If the scheduler support container reservations, this method is used to + * ask the scheduler to drop the reservation for the given container. + * @param container Reference to reserved container allocation. 
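Together with ContainerPreemptEvent and ContainerPreemptEventType above, this interface and SchedulingEditPolicy form the whole seam a pluggable monitor works against. A minimal hypothetical SchedulingEditPolicy might look as follows; the class name and the interval key are invented, and the ContainerPreemptEvent type parameter on EventHandler is assumed from how the dispatcher is used elsewhere in the patch. Such a class would be listed under yarn.resourcemanager.scheduler.monitor.policies, alongside or instead of the proportional policy:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;

// Hypothetical no-op policy, only to show the contract a pluggable policy
// must satisfy; it observes but never asks for preemption.
public class NoOpSchedulingEditPolicy implements SchedulingEditPolicy {

  private EventHandler<ContainerPreemptEvent> dispatcher;
  private PreemptableResourceScheduler scheduler;
  private long interval;

  @Override
  public void init(Configuration config,
      EventHandler<ContainerPreemptEvent> dispatcher,
      PreemptableResourceScheduler scheduler) {
    this.dispatcher = dispatcher;  // used to emit ContainerPreemptEvents
    this.scheduler = scheduler;    // target of DROP/PREEMPT/KILL actions
    this.interval = config.getLong("noop.policy.interval-ms", 3000L); // invented key
  }

  @Override
  public void editSchedule() {
    // A real policy would inspect scheduler state here and, when needed, call
    // dispatcher.handle(new ContainerPreemptEvent(appAttemptId, container,
    //     ContainerPreemptEventType.PREEMPT_CONTAINER));
  }

  @Override
  public long getMonitoringInterval() {
    return interval;
  }

  @Override
  public String getPolicyName() {
    return "NoOpSchedulingEditPolicy";
  }
}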
+ */ + void dropContainerReservation(RMContainer container); + + /** + * Ask the scheduler to obtain back the container from a specific application + * by issuing a preemption request + * @param aid the application from which we want to get a container back + * @param container the container we want back + */ + void preemptContainer(ApplicationAttemptId aid, RMContainer container); + + /** + * Ask the scheduler to forcibly interrupt the container given as input + * @param container + */ + void killContainer(RMContainer container); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 0fb20713868..57ffc0c396f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -61,8 +61,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; @@ -83,8 +83,9 @@ import org.apache.hadoop.yarn.util.resource.Resources; @LimitedPrivate("yarn") @Evolving @SuppressWarnings("unchecked") -public class CapacityScheduler -implements ResourceScheduler, CapacitySchedulerContext, Configurable { +public class CapacityScheduler + implements PreemptableResourceScheduler, CapacitySchedulerContext, + Configurable { private static final Log LOG = LogFactory.getLog(CapacityScheduler.class); @@ -525,8 +526,8 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable { // Sanity check SchedulerUtils.normalizeRequests( - ask, calculator, getClusterResources(), minimumAllocation, - maximumAllocation); + ask, getResourceCalculator(), getClusterResources(), + getMinimumResourceCapability(), maximumAllocation); // Release containers for (ContainerId releasedContainerId : release) { @@ -578,9 +579,8 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable { " #ask=" + ask.size()); } - return new Allocation( - application.pullNewlyAllocatedContainers(), - application.getHeadroom()); + return application.getAllocation(getResourceCalculator(), + clusterResource, getMinimumResourceCapability()); } } @@ -812,7 +812,8 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable { Container container = rmContainer.getContainer(); // Get the application for the finished container 
- ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId(); + ApplicationAttemptId applicationAttemptId = + container.getId().getApplicationAttemptId(); FiCaSchedulerApp application = getApplication(applicationAttemptId); if (application == null) { LOG.info("Container " + container + " of" + @@ -869,5 +870,41 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable { FiCaSchedulerNode node = getNode(nodeId); return node == null ? null : new SchedulerNodeReport(node); } - + + @Override + public void dropContainerReservation(RMContainer container) { + if(LOG.isDebugEnabled()){ + LOG.debug("DROP_RESERVATION:" + container.toString()); + } + completedContainer(container, + SchedulerUtils.createAbnormalContainerStatus( + container.getContainerId(), + SchedulerUtils.UNRESERVED_CONTAINER), + RMContainerEventType.KILL); + } + + @Override + public void preemptContainer(ApplicationAttemptId aid, RMContainer cont) { + if(LOG.isDebugEnabled()){ + LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() + + " container: " + cont.toString()); + } + FiCaSchedulerApp app = applications.get(aid); + if (app != null) { + app.addPreemptContainer(cont.getContainerId()); + } + } + + @Override + public void killContainer(RMContainer cont) { + if(LOG.isDebugEnabled()){ + LOG.debug("KILL_CONTAINER: container" + cont.toString()); + } + completedContainer(cont, + SchedulerUtils.createAbnormalContainerStatus( + cont.getContainerId(),"Container being forcibly preempted:" + + cont.getContainerId()), + RMContainerEventType.KILL); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index dbfa7444183..3d9ac4f3ce3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1390,17 +1390,19 @@ public class LeafQueue implements CSQueue { node.reserveResource(application, priority, rmContainer); } - private void unreserve(FiCaSchedulerApp application, Priority priority, + private boolean unreserve(FiCaSchedulerApp application, Priority priority, FiCaSchedulerNode node, RMContainer rmContainer) { // Done with the reservation? 
- application.unreserve(node, priority); - node.unreserveResource(application); - - // Update reserved metrics - getMetrics().unreserveResource( - application.getUser(), rmContainer.getContainer().getResource()); - } + if (application.unreserve(node, priority)) { + node.unreserveResource(application); + // Update reserved metrics + getMetrics().unreserveResource( + application.getUser(), rmContainer.getContainer().getResource()); + return true; + } + return false; + } @Override public void completedContainer(Resource clusterResource, @@ -1411,37 +1413,40 @@ public class LeafQueue implements CSQueue { synchronized (this) { Container container = rmContainer.getContainer(); - + + boolean removed = false; // Inform the application & the node // Note: It's safe to assume that all state changes to RMContainer // happen under scheduler's lock... // So, this is, in effect, a transaction across application & node if (rmContainer.getState() == RMContainerState.RESERVED) { - unreserve(application, rmContainer.getReservedPriority(), + removed = unreserve(application, rmContainer.getReservedPriority(), node, rmContainer); } else { - application.containerCompleted(rmContainer, containerStatus, event); + removed = + application.containerCompleted(rmContainer, containerStatus, event); node.releaseContainer(container); } - // Book-keeping - releaseResource(clusterResource, - application, container.getResource()); - - LOG.info("completedContainer" + - " container=" + container + - " resource=" + container.getResource() + - " queue=" + this + - " usedCapacity=" + getUsedCapacity() + - " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + - " used=" + usedResources + - " cluster=" + clusterResource); + if (removed) { + releaseResource(clusterResource, + application, container.getResource()); + LOG.info("completedContainer" + + " container=" + container + + " resource=" + container.getResource() + + " queue=" + this + + " usedCapacity=" + getUsedCapacity() + + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + + " used=" + usedResources + + " cluster=" + clusterResource); + // Inform the parent queue + getParent().completedContainer(clusterResource, application, + node, rmContainer, null, event); + } } - // Inform the parent queue - getParent().completedContainer(clusterResource, application, - node, rmContainer, null, event); + } } @@ -1588,5 +1593,19 @@ public class LeafQueue implements CSQueue { getParent().recoverContainer(clusterResource, application, container); } - + + // need to access the list of apps from the preemption monitor + public Set getApplications() { + return Collections.unmodifiableSet(activeApplications); + } + + // return a single Resource capturing the overal amount of pending resources + public Resource getTotalResourcePending() { + Resource ret = BuilderUtils.newResource(0, 0); + for (FiCaSchedulerApp f : activeApplications) { + Resources.addTo(ret, f.getTotalPendingRequests()); + } + return ret; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 8e2020abc79..a261dbfd5a2 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -20,9 +20,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;

import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -38,7 +41,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
@@ -53,11 +55,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
@@ -85,17 +89,19 @@ public class FiCaSchedulerApp extends SchedulerApplication {
  private Resource resourceLimit = recordFactory
      .newRecordInstance(Resource.class);

-  private Map<ContainerId, RMContainer> liveContainers
-    = new HashMap<ContainerId, RMContainer>();
-  private List<RMContainer> newlyAllocatedContainers =
-    new ArrayList<RMContainer>();
+  private Map<ContainerId, RMContainer> liveContainers =
+    new HashMap<ContainerId, RMContainer>();
+  private List<RMContainer> newlyAllocatedContainers =
+    new ArrayList<RMContainer>();

  final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
      new HashMap<Priority, Map<NodeId, RMContainer>>();

  private boolean isStopped = false;
-
+
+  private final Set<ContainerId> containersToPreempt =
+    new HashSet<ContainerId>();
+
  /**
   * Count how many times the application has been given an opportunity
   * to schedule a task at each priority.
Each time the scheduler @@ -219,12 +225,17 @@ public class FiCaSchedulerApp extends SchedulerApplication { RMContainerEventType.LAUNCHED)); } - synchronized public void containerCompleted(RMContainer rmContainer, + synchronized public boolean containerCompleted(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { - + + // Remove from the list of containers + if (null == liveContainers.remove(rmContainer.getContainerId())) { + return false; + } + Container container = rmContainer.getContainer(); ContainerId containerId = container.getId(); - + // Inform the container rmContainer.handle( new RMContainerFinishedEvent( @@ -234,9 +245,8 @@ public class FiCaSchedulerApp extends SchedulerApplication { ); LOG.info("Completed container: " + rmContainer.getContainerId() + " in state: " + rmContainer.getState() + " event:" + event); - - // Remove from the list of containers - liveContainers.remove(rmContainer.getContainerId()); + + containersToPreempt.remove(rmContainer.getContainerId()); RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER, "SchedulerApp", @@ -246,6 +256,8 @@ public class FiCaSchedulerApp extends SchedulerApplication { Resource containerResource = rmContainer.getContainer().getResource(); queue.getMetrics().releaseResources(getUser(), 1, containerResource); Resources.subtractFrom(currentConsumption, containerResource); + + return true; } synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, @@ -345,7 +357,8 @@ public class FiCaSchedulerApp extends SchedulerApplication { } /** - * Return the number of times the application has been given an opportunity + * @param priority Target priority + * @return the number of times the application has been given an opportunity * to schedule a task at the given priority since the last time it * successfully did so. 
*/ @@ -419,33 +432,36 @@ public class FiCaSchedulerApp extends SchedulerApplication { return rmContainer; } - public synchronized void unreserve(FiCaSchedulerNode node, Priority priority) { - Map reservedContainers = - this.reservedContainers.get(priority); - RMContainer reservedContainer = reservedContainers.remove(node.getNodeID()); - if (reservedContainers.isEmpty()) { - this.reservedContainers.remove(priority); - } - - // reservedContainer should not be null here - if (reservedContainer == null) { - String errorMesssage = - "Application " + getApplicationId() + " is trying to unreserve " - + " on node " + node + ", currently has " - + reservedContainers.size() + " at priority " + priority - + "; currentReservation " + currentReservation; - LOG.warn(errorMesssage); - throw new YarnRuntimeException(errorMesssage); - } - // Reset the re-reservation count - resetReReservations(priority); + public synchronized boolean unreserve(FiCaSchedulerNode node, Priority priority) { + Map reservedContainers = + this.reservedContainers.get(priority); - Resource resource = reservedContainer.getContainer().getResource(); - Resources.subtractFrom(currentReservation, resource); + if (reservedContainers != null) { + RMContainer reservedContainer = reservedContainers.remove(node.getNodeID()); - LOG.info("Application " + getApplicationId() + " unreserved " + " on node " - + node + ", currently has " + reservedContainers.size() + " at priority " - + priority + "; currentReservation " + currentReservation); + // unreserve is now triggered in new scenarios (preemption) + // as a consequence reservedcontainer might be null, adding NP-checks + if (reservedContainer != null + && reservedContainer.getContainer() != null + && reservedContainer.getContainer().getResource() != null) { + + if (reservedContainers.isEmpty()) { + this.reservedContainers.remove(priority); + } + // Reset the re-reservation count + resetReReservations(priority); + + Resource resource = reservedContainer.getContainer().getResource(); + Resources.subtractFrom(currentReservation, resource); + + LOG.info("Application " + getApplicationId() + " unreserved " + + " on node " + node + ", currently has " + reservedContainers.size() + + " at priority " + priority + "; currentReservation " + + currentReservation); + return true; + } + } + return false; } /** @@ -509,4 +525,55 @@ public class FiCaSchedulerApp extends SchedulerApplication { public Queue getQueue() { return queue; } + + public Resource getTotalPendingRequests() { + Resource ret = Resource.newInstance(0, 0); + for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) { + // to avoid double counting we count only "ANY" resource requests + if (ResourceRequest.isAnyLocation(rr.getResourceName())){ + Resources.addTo(ret, + Resources.multiply(rr.getCapability(), rr.getNumContainers())); + } + } + return ret; + } + + public synchronized void addPreemptContainer(ContainerId cont){ + // ignore already completed containers + if (liveContainers.containsKey(cont)) { + containersToPreempt.add(cont); + } + } + + /** + * This method produces an Allocation that includes the current view + * of the resources that will be allocated to and preempted from this + * application. 
+ * + * @param rc + * @param clusterResource + * @param minimumAllocation + * @return an allocation + */ + public synchronized Allocation getAllocation(ResourceCalculator rc, + Resource clusterResource, Resource minimumAllocation) { + + Set currentContPreemption = Collections.unmodifiableSet( + new HashSet(containersToPreempt)); + containersToPreempt.clear(); + Resource tot = Resource.newInstance(0, 0); + for(ContainerId c : currentContPreemption){ + Resources.addTo(tot, + liveContainers.get(c).getContainer().getResource()); + } + int numCont = (int) Math.ceil( + Resources.divide(rc, clusterResource, tot, minimumAllocation)); + ResourceRequest rr = ResourceRequest.newInstance( + Priority.UNDEFINED, ResourceRequest.ANY, + minimumAllocation, numCont); + return new Allocation(pullNewlyAllocatedContainers(), getHeadroom(), + null, currentContPreemption, + Collections.singletonList(rr)); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index e61ad729c81..bb9ba92e0ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -142,8 +142,9 @@ public class FiCaSchedulerNode extends SchedulerNode { } /* remove the containers from the nodemanger */ - launchedContainers.remove(container.getId()); - updateResource(container); + if (null != launchedContainers.remove(container.getId())) { + updateResource(container); + } LOG.info("Released container " + container.getId() + " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + @@ -226,18 +227,25 @@ public class FiCaSchedulerNode extends SchedulerNode { public synchronized void unreserveResource( SchedulerApplication application) { - // Cannot unreserve for wrong application... - ApplicationAttemptId reservedApplication = - reservedContainer.getContainer().getId().getApplicationAttemptId(); - if (!reservedApplication.equals( - application.getApplicationAttemptId())) { - throw new IllegalStateException("Trying to unreserve " + - " for application " + application.getApplicationAttemptId() + - " when currently reserved " + - " for application " + reservedApplication.getApplicationId() + - " on node " + this); - } + // adding NP checks as this can now be called for preemption + if (reservedContainer != null + && reservedContainer.getContainer() != null + && reservedContainer.getContainer().getId() != null + && reservedContainer.getContainer().getId().getApplicationAttemptId() != null) { + + // Cannot unreserve for wrong application... 
+ ApplicationAttemptId reservedApplication = + reservedContainer.getContainer().getId().getApplicationAttemptId(); + if (!reservedApplication.equals( + application.getApplicationAttemptId())) { + throw new IllegalStateException("Trying to unreserve " + + " for application " + application.getApplicationAttemptId() + + " when currently reserved " + + " for application " + reservedApplication.getApplicationId() + + " on node " + this); + } + } reservedContainer = null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java new file mode 100644 index 00000000000..713962b2b3d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -0,0 +1,541 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.NavigableSet; +import java.util.Random; +import java.util.TreeSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatcher; + +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +public class TestProportionalCapacityPreemptionPolicy { + + static final long TS = 3141592653L; + + int appAlloc = 0; + Random rand = null; + Clock mClock = null; + Configuration conf = null; + CapacityScheduler mCS = null; + EventHandler mDisp = null; + ResourceCalculator rc = new DefaultResourceCalculator(); + final ApplicationAttemptId appA = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(TS, 0), 0); + final ApplicationAttemptId appB = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(TS, 1), 0); + final ApplicationAttemptId appC = 
ApplicationAttemptId.newInstance( + ApplicationId.newInstance(TS, 2), 0); + final ApplicationAttemptId appD = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(TS, 3), 0); + final ApplicationAttemptId appE = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(TS, 4), 0); + final ArgumentCaptor evtCaptor = + ArgumentCaptor.forClass(ContainerPreemptEvent.class); + + @Rule public TestName name = new TestName(); + + @Before + @SuppressWarnings("unchecked") + public void setup() { + conf = new Configuration(false); + conf.setLong(WAIT_TIME_BEFORE_KILL, 10000); + conf.setLong(MONITORING_INTERVAL, 3000); + // report "ideal" preempt + conf.setFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 1.0); + conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 1.0); + + mClock = mock(Clock.class); + mCS = mock(CapacityScheduler.class); + when(mCS.getResourceCalculator()).thenReturn(rc); + mDisp = mock(EventHandler.class); + rand = new Random(); + long seed = rand.nextLong(); + System.out.println(name.getMethodName() + " SEED: " + seed); + rand.setSeed(seed); + appAlloc = 0; + } + + @Test + public void testIgnore() { + int[][] qData = new int[][]{ + // / A B C + { 100, 40, 40, 20 }, // abs + { 100, 0, 60, 40 }, // used + { 0, 0, 0, 0 }, // pending + { 0, 0, 0, 0 }, // reserved + { 3, 1, 1, 1 }, // apps + { -1, 1, 1, 1 }, // req granularity + { 3, 0, 0, 0 }, // subqueues + }; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + // don't correct imbalances without demand + verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class)); + } + + @Test + public void testProportionalPreemption() { + int[][] qData = new int[][]{ + // / A B C D + { 100, 10, 40, 20, 30 }, // abs + { 100, 30, 60, 10, 0 }, // used + { 45, 20, 5, 20, 0 }, // pending + { 0, 0, 0, 0, 0 }, // reserved + { 3, 1, 1, 1, 0 }, // apps + { -1, 1, 1, 1, 1 }, // req granularity + { 4, 0, 0, 0, 0 }, // subqueues + }; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA))); + } + + @Test + public void testPreemptCycle() { + int[][] qData = new int[][]{ + // / A B C + { 100, 40, 40, 20 }, // abs + { 100, 0, 60, 40 }, // used + { 10, 10, 0, 0 }, // pending + { 0, 0, 0, 0 }, // reserved + { 3, 1, 1, 1 }, // apps + { -1, 1, 1, 1 }, // req granularity + { 3, 0, 0, 0 }, // subqueues + }; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + // ensure all pending rsrc from A get preempted from other queues + verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC))); + } + + @Test + public void testExpireKill() { + final long killTime = 10000L; + int[][] qData = new int[][]{ + // / A B C + { 100, 40, 40, 20 }, // abs + { 100, 0, 60, 40 }, // used + { 10, 10, 0, 0 }, // pending + { 0, 0, 0, 0 }, // reserved + { 3, 1, 1, 1 }, // apps + { -1, 1, 1, 1 }, // req granularity + { 3, 0, 0, 0 }, // subqueues + }; + conf.setLong(WAIT_TIME_BEFORE_KILL, killTime); + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + + // ensure all pending rsrc from A get preempted from other queues + when(mClock.getTime()).thenReturn(0L); + policy.editSchedule(); + verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC))); + + // requests reiterated + when(mClock.getTime()).thenReturn(killTime / 2); + policy.editSchedule(); + verify(mDisp, times(20)).handle(argThat(new IsPreemptionRequestFor(appC))); + + // kill req sent + 
when(mClock.getTime()).thenReturn(killTime + 1); + policy.editSchedule(); + verify(mDisp, times(30)).handle(evtCaptor.capture()); + List events = evtCaptor.getAllValues(); + for (ContainerPreemptEvent e : events.subList(20, 30)) { + assertEquals(appC, e.getAppId()); + assertEquals(KILL_CONTAINER, e.getType()); + } + } + + @Test + public void testDeadzone() { + int[][] qData = new int[][]{ + // / A B C + { 100, 40, 40, 20 }, // abs + { 100, 39, 43, 21 }, // used + { 10, 10, 0, 0 }, // pending + { 0, 0, 0, 0 }, // reserved + { 3, 1, 1, 1 }, // apps + { -1, 1, 1, 1 }, // req granularity + { 3, 0, 0, 0 }, // subqueues + }; + conf.setFloat(MAX_IGNORED_OVER_CAPACITY, (float) 0.1); + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + // ignore 10% overcapacity to avoid jitter + verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class)); + } + + @Test + public void testOverCapacityImbalance() { + int[][] qData = new int[][]{ + // / A B C + { 100, 40, 40, 20 }, // abs + { 100, 55, 45, 0 }, // used + { 20, 10, 10, 0 }, // pending + { 0, 0, 0, 0 }, // reserved + { 2, 1, 1, 0 }, // apps + { -1, 1, 1, 0 }, // req granularity + { 3, 0, 0, 0 }, // subqueues + }; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + // correct imbalance between over-capacity queues + verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA))); + } + + @Test + public void testNaturalTermination() { + int[][] qData = new int[][]{ + // / A B C + { 100, 40, 40, 20 }, // abs + { 100, 55, 45, 0 }, // used + { 20, 10, 10, 0 }, // pending + { 0, 0, 0, 0 }, // reserved + { 2, 1, 1, 0 }, // apps + { -1, 1, 1, 0 }, // req granularity + { 3, 0, 0, 0 }, // subqueues + }; + conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 0.1); + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + // ignore 10% imbalance between over-capacity queues + verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class)); + } + + @Test + public void testObserveOnly() { + int[][] qData = new int[][]{ + // / A B C + { 100, 40, 40, 20 }, // abs + { 100, 90, 10, 0 }, // used + { 80, 10, 20, 50 }, // pending + { 0, 0, 0, 0 }, // reserved + { 2, 1, 1, 0 }, // apps + { -1, 1, 1, 0 }, // req granularity + { 3, 0, 0, 0 }, // subqueues + }; + conf.setBoolean(OBSERVE_ONLY, true); + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + // verify even severe imbalance not affected + verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class)); + } + + @Test + public void testHierarchical() { + int[][] qData = new int[][] { + // / A B C D E F + { 200, 100, 50, 50, 100, 10, 90 }, // abs + { 200, 110, 60, 50, 90, 90, 0 }, // used + { 10, 0, 0, 0, 10, 0, 10 }, // pending + { 0, 0, 0, 0, 0, 0, 0 }, // reserved + { 4, 2, 1, 1, 2, 1, 1 }, // apps + { -1, -1, 1, 1, -1, 1, 1 }, // req granularity + { 2, 2, 0, 0, 2, 0, 0 }, // subqueues + }; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + // verify capacity taken from A1, not B1 despite B1 being far over + // its absolute guaranteed capacity + verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA))); + } + + @Test + public void testHierarchicalLarge() { + int[][] qData = new int[][] { + // / A B C D E F G H I + { 400, 200, 60,140, 100, 70, 30, 100, 10, 90 }, // abs + { 400, 210, 70,140, 100, 50, 50, 90, 90, 0 }, // used + { 10, 0, 0, 0, 0, 0, 0, 0, 0, 15 }, // pending + { 0, 0, 0, 0, 
0, 0, 0, 0, 0, 0 }, // reserved + { 6, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps + { -1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity + { 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues + }; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + // verify capacity taken from A1, not H1 despite H1 being far over + // its absolute guaranteed capacity + + // XXX note: compensating for rounding error in Resources.multiplyTo + // which is likely triggered since we use small numbers for readability + verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA))); + verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appE))); + } + + @Test + public void testContainerOrdering(){ + + List containers = new ArrayList(); + + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(TS, 10), 0); + + // create a set of containers + RMContainer rm1 = mockContainer(appAttId, 5, mock(Resource.class), 3); + RMContainer rm2 = mockContainer(appAttId, 3, mock(Resource.class), 3); + RMContainer rm3 = mockContainer(appAttId, 2, mock(Resource.class), 2); + RMContainer rm4 = mockContainer(appAttId, 1, mock(Resource.class), 2); + RMContainer rm5 = mockContainer(appAttId, 4, mock(Resource.class), 1); + + // insert them in non-sorted order + containers.add(rm3); + containers.add(rm2); + containers.add(rm1); + containers.add(rm5); + containers.add(rm4); + + // sort them + ProportionalCapacityPreemptionPolicy.sortContainers(containers); + + // verify the "priority"-first, "reverse container-id"-second + // ordering is enforced correctly + assert containers.get(0).equals(rm1); + assert containers.get(1).equals(rm2); + assert containers.get(2).equals(rm3); + assert containers.get(3).equals(rm4); + assert containers.get(4).equals(rm5); + + } + + static class IsPreemptionRequestFor + extends ArgumentMatcher { + private final ApplicationAttemptId appAttId; + private final ContainerPreemptEventType type; + IsPreemptionRequestFor(ApplicationAttemptId appAttId) { + this(appAttId, PREEMPT_CONTAINER); + } + IsPreemptionRequestFor(ApplicationAttemptId appAttId, + ContainerPreemptEventType type) { + this.appAttId = appAttId; + this.type = type; + } + @Override + public boolean matches(Object o) { + return appAttId.equals(((ContainerPreemptEvent)o).getAppId()) + && type.equals(((ContainerPreemptEvent)o).getType()); + } + @Override + public String toString() { + return appAttId.toString(); + } + } + + ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) { + ProportionalCapacityPreemptionPolicy policy = + new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock); + ParentQueue mRoot = buildMockRootQueue(rand, qData); + when(mCS.getRootQueue()).thenReturn(mRoot); + + Resource clusterResources = + Resource.newInstance(leafAbsCapacities(qData[0], qData[6]), 0); + when(mCS.getClusterResources()).thenReturn(clusterResources); + return policy; + } + + ParentQueue buildMockRootQueue(Random r, int[]... 
queueData) { + int[] abs = queueData[0]; + int[] used = queueData[1]; + int[] pending = queueData[2]; + int[] reserved = queueData[3]; + int[] apps = queueData[4]; + int[] gran = queueData[5]; + int[] queues = queueData[6]; + + return mockNested(abs, used, pending, reserved, apps, gran, queues); + } + + ParentQueue mockNested(int[] abs, int[] used, + int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) { + float tot = leafAbsCapacities(abs, queues); + Deque pqs = new LinkedList(); + ParentQueue root = mockParentQueue(null, queues[0], pqs); + when(root.getQueueName()).thenReturn("/"); + when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot); + when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot); + for (int i = 1; i < queues.length; ++i) { + final CSQueue q; + final ParentQueue p = pqs.removeLast(); + final String queueName = "queue" + ((char)('A' + i - 1)); + if (queues[i] > 0) { + q = mockParentQueue(p, queues[i], pqs); + } else { + q = mockLeafQueue(p, tot, i, abs, used, pending, reserved, apps, gran); + } + when(q.getParent()).thenReturn(p); + when(q.getQueueName()).thenReturn(queueName); + when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot); + when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot); + } + assert 0 == pqs.size(); + return root; + } + + ParentQueue mockParentQueue(ParentQueue p, int subqueues, + Deque pqs) { + ParentQueue pq = mock(ParentQueue.class); + List cqs = new ArrayList(); + when(pq.getChildQueues()).thenReturn(cqs); + for (int i = 0; i < subqueues; ++i) { + pqs.add(pq); + } + if (p != null) { + p.getChildQueues().add(pq); + } + return pq; + } + + LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, + int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) { + LeafQueue lq = mock(LeafQueue.class); + when(lq.getTotalResourcePending()).thenReturn( + Resource.newInstance(pending[i], 0)); + // consider moving where CapacityScheduler::comparator accessible + NavigableSet qApps = new TreeSet( + new Comparator() { + @Override + public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { + return a1.getApplicationAttemptId() + .compareTo(a2.getApplicationAttemptId()); + } + }); + // applications are added in global L->R order in queues + if (apps[i] != 0) { + int aUsed = used[i] / apps[i]; + int aPending = pending[i] / apps[i]; + int aReserve = reserved[i] / apps[i]; + for (int a = 0; a < apps[i]; ++a) { + qApps.add(mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i])); + ++appAlloc; + } + } + when(lq.getApplications()).thenReturn(qApps); + p.getChildQueues().add(lq); + return lq; + } + + FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved, + int gran) { + FiCaSchedulerApp app = mock(FiCaSchedulerApp.class); + + ApplicationId appId = ApplicationId.newInstance(TS, id); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0); + when(app.getApplicationId()).thenReturn(appId); + when(app.getApplicationAttemptId()).thenReturn(appAttId); + + int cAlloc = 0; + Resource unit = Resource.newInstance(gran, 0); + List cReserved = new ArrayList(); + for (int i = 0; i < reserved; i += gran) { + cReserved.add(mockContainer(appAttId, cAlloc, unit, 1)); + ++cAlloc; + } + when(app.getReservedContainers()).thenReturn(cReserved); + + List cLive = new ArrayList(); + for (int i = 0; i < used; i += gran) { + cLive.add(mockContainer(appAttId, cAlloc, unit, 1)); + ++cAlloc; + } + when(app.getLiveContainers()).thenReturn(cLive); + return app; + } + + RMContainer 
mockContainer(ApplicationAttemptId appAttId, int id, + Resource r, int priority) { + ContainerId cId = ContainerId.newInstance(appAttId, id); + Container c = mock(Container.class); + when(c.getResource()).thenReturn(r); + when(c.getPriority()).thenReturn(Priority.create(priority)); + RMContainer mC = mock(RMContainer.class); + when(mC.getContainerId()).thenReturn(cId); + when(mC.getContainer()).thenReturn(c); + return mC; + } + + static int leafAbsCapacities(int[] abs, int[] subqueues) { + int ret = 0; + for (int i = 0; i < abs.length; ++i) { + if (0 == subqueues[i]) { + ret += abs[i]; + } + } + return ret; + } + + void printString(CSQueue nq, String indent) { + if (nq instanceof ParentQueue) { + System.out.println(indent + nq.getQueueName() + + " cur:" + nq.getAbsoluteUsedCapacity() + + " guar:" + nq.getAbsoluteCapacity() + ); + for (CSQueue q : ((ParentQueue)nq).getChildQueues()) { + printString(q, indent + " "); + } + } else { + System.out.println(indent + nq.getQueueName() + + " pen:" + ((LeafQueue) nq).getTotalResourcePending() + + " cur:" + nq.getAbsoluteUsedCapacity() + + " guar:" + nq.getAbsoluteCapacity() + ); + for (FiCaSchedulerApp a : ((LeafQueue)nq).getApplications()) { + System.out.println(indent + " " + a.getApplicationId()); + } + } + } + +}
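
Note on reading the tests above: each test encodes queue state in a single int[][] matrix (qData). Row 0 holds the absolute guaranteed capacities, followed by used, pending, reserved, apps, request granularity, and subqueue count; column 0 describes the root ("/") and the remaining columns the queues A, B, C, ... left to right. The minimal sketch below is illustrative only (the class name and all numbers are invented, not part of this patch); it shows how such a matrix maps to queue state and how the cluster resource is derived from leaf-queue guarantees, mirroring leafAbsCapacities() above.

public class QDataExample {
  public static void main(String[] args) {
    // Hypothetical layout: a root queue with three leaves A, B, C (numbers invented).
    int[][] qData = new int[][] {
        //  /   A   B   C
        { 100, 40, 40, 20 },   // abs: absolute guaranteed capacity
        { 100, 60, 30, 10 },   // used: capacity currently consumed
        {  15, 15,  0,  0 },   // pending: outstanding resource requests
        {   0,  0,  0,  0 },   // reserved: reserved container resources
        {   3,  1,  1,  1 },   // apps: applications per queue
        {  -1,  1,  1,  1 },   // req granularity: container size (-1 for parent queues)
        {   3,  0,  0,  0 },   // subqueues: child count (0 marks a leaf)
    };
    // The cluster resource is the sum of leaf-queue guarantees, as in leafAbsCapacities().
    int cluster = 0;
    for (int i = 0; i < qData[0].length; ++i) {
      if (qData[6][i] == 0) {
        cluster += qData[0][i];
      }
    }
    System.out.println("cluster resource (memory units) = " + cluster);
  }
}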