diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 4bd96112a85..51fd15f0d1c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -317,6 +317,9 @@ Release 2.4.0 - UNRELEASED YARN-1598. HA-related rmadmin commands don't work on a secure cluster (kasha) + YARN-1603. Remove two *.orig files which were unexpectedly committed. + (Zhijie Shen via junping_du) + Release 2.3.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java.orig b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java.orig deleted file mode 100644 index 9fc43299681..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java.orig +++ /dev/null @@ -1,1361 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.QueueACL; -import org.apache.hadoop.yarn.api.records.QueueInfo; -import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.SystemClock; -import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; -import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; - -import com.google.common.annotations.VisibleForTesting; - -/** - * A scheduler that schedules resources between a set of queues. The scheduler - * keeps track of the resources used by each queue, and attempts to maintain - * fairness by scheduling tasks at queues whose allocations are farthest below - * an ideal fair distribution. - * - * The fair scheduler supports hierarchical queues. All queues descend from a - * queue named "root". Available resources are distributed among the children - * of the root queue in the typical fair scheduling fashion. Then, the children - * distribute the resources assigned to them to their children in the same - * fashion. Applications may only be scheduled on leaf queues. Queues can be - * specified as children of other queues by placing them as sub-elements of their - * parents in the fair scheduler configuration file. - * - * A queue's name starts with the names of its parents, with periods as - * separators. So a queue named "queue1" under the root named, would be - * referred to as "root.queue1", and a queue named "queue2" under a queue - * named "parent1" would be referred to as "root.parent1.queue2". - */ -@LimitedPrivate("yarn") -@Unstable -@SuppressWarnings("unchecked") -public class FairScheduler implements ResourceScheduler { - private boolean initialized; - private FairSchedulerConfiguration conf; - private RMContext rmContext; - private Resource minimumAllocation; - private Resource maximumAllocation; - private Resource incrAllocation; - private QueueManager queueMgr; - private Clock clock; - private boolean usePortForNodeName; - - private static final Log LOG = LogFactory.getLog(FairScheduler.class); - - private static final ResourceCalculator RESOURCE_CALCULATOR = - new DefaultResourceCalculator(); - - // Value that container assignment methods return when a container is - // reserved - public static final Resource CONTAINER_RESERVED = Resources.createResource(-1); - - // How often fair shares are re-calculated (ms) - protected long UPDATE_INTERVAL = 500; - - private final static List EMPTY_CONTAINER_LIST = - new ArrayList(); - - private static final Allocation EMPTY_ALLOCATION = - new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0)); - - // Aggregate metrics - FSQueueMetrics rootMetrics; - - // Time when we last updated preemption vars - protected long lastPreemptionUpdateTime; - // Time we last ran preemptTasksIfNecessary - private long lastPreemptCheckTime; - - // This stores per-application scheduling information, - @VisibleForTesting - protected Map applications = - new ConcurrentHashMap(); - - // Nodes in the cluster, indexed by NodeId - private Map nodes = - new ConcurrentHashMap(); - - // Aggregate capacity of the cluster - private Resource clusterCapacity = - RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class); - - // How often tasks are preempted - protected long preemptionInterval; - - // ms to wait before force killing stuff (must be longer than a couple - // of heartbeats to give task-kill commands a chance to act). - protected long waitTimeBeforeKill; - - // Containers whose AMs have been warned that they will be preempted soon. - private List warnedContainers = new ArrayList(); - - protected boolean preemptionEnabled; - protected boolean sizeBasedWeight; // Give larger weights to larger jobs - protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster - protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not - protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling - private Comparator nodeAvailableResourceComparator = - new NodeAvailableResourceComparator(); // Node available resource comparator - protected double nodeLocalityThreshold; // Cluster threshold for node locality - protected double rackLocalityThreshold; // Cluster threshold for rack locality - protected long nodeLocalityDelayMs; // Delay for node locality - protected long rackLocalityDelayMs; // Delay for rack locality - private FairSchedulerEventLog eventLog; // Machine-readable event log - protected boolean assignMultiple; // Allocate multiple containers per - // heartbeat - protected int maxAssign; // Max containers to assign per heartbeat - - @VisibleForTesting - final MaxRunningAppsEnforcer maxRunningEnforcer; - - private AllocationFileLoaderService allocsLoader; - @VisibleForTesting - AllocationConfiguration allocConf; - - public FairScheduler() { - clock = new SystemClock(); - allocsLoader = new AllocationFileLoaderService(); - queueMgr = new QueueManager(this); - maxRunningEnforcer = new MaxRunningAppsEnforcer(this); - } - - private void validateConf(Configuration conf) { - // validate scheduler memory allocation setting - int minMem = conf.getInt( - YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); - int maxMem = conf.getInt( - YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); - - if (minMem < 0 || minMem > maxMem) { - throw new YarnRuntimeException("Invalid resource scheduler memory" - + " allocation configuration" - + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB - + "=" + minMem - + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB - + "=" + maxMem + ", min should equal greater than 0" - + ", max should be no smaller than min."); - } - - // validate scheduler vcores allocation setting - int minVcores = conf.getInt( - YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); - int maxVcores = conf.getInt( - YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); - - if (minVcores < 0 || minVcores > maxVcores) { - throw new YarnRuntimeException("Invalid resource scheduler vcores" - + " allocation configuration" - + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES - + "=" + minVcores - + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES - + "=" + maxVcores + ", min should equal greater than 0" - + ", max should be no smaller than min."); - } - } - - public FairSchedulerConfiguration getConf() { - return conf; - } - - public QueueManager getQueueManager() { - return queueMgr; - } - - @Override - public RMContainer getRMContainer(ContainerId containerId) { - FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId); - return (attempt == null) ? null : attempt.getRMContainer(containerId); - } - - private FSSchedulerApp getCurrentAttemptForContainer( - ContainerId containerId) { - SchedulerApplication app = - applications.get(containerId.getApplicationAttemptId() - .getApplicationId()); - if (app != null) { - return (FSSchedulerApp) app.getCurrentAppAttempt(); - } - return null; - } - - /** - * A runnable which calls {@link FairScheduler#update()} every - * UPDATE_INTERVAL milliseconds. - */ - private class UpdateThread implements Runnable { - public void run() { - while (true) { - try { - Thread.sleep(UPDATE_INTERVAL); - update(); - preemptTasksIfNecessary(); - } catch (Exception e) { - LOG.error("Exception in fair scheduler UpdateThread", e); - } - } - } - } - - /** - * Recompute the internal variables used by the scheduler - per-job weights, - * fair shares, deficits, minimum slot allocations, and amount of used and - * required resources per job. - */ - protected synchronized void update() { - updatePreemptionVariables(); // Determine if any queues merit preemption - - FSQueue rootQueue = queueMgr.getRootQueue(); - - // Recursively update demands for all queues - rootQueue.updateDemand(); - - rootQueue.setFairShare(clusterCapacity); - // Recursively compute fair shares for all queues - // and update metrics - rootQueue.recomputeShares(); - } - - /** - * Update the preemption fields for all QueueScheduables, i.e. the times since - * each queue last was at its guaranteed share and at > 1/2 of its fair share - * for each type of task. - */ - private void updatePreemptionVariables() { - long now = clock.getTime(); - lastPreemptionUpdateTime = now; - for (FSLeafQueue sched : queueMgr.getLeafQueues()) { - if (!isStarvedForMinShare(sched)) { - sched.setLastTimeAtMinShare(now); - } - if (!isStarvedForFairShare(sched)) { - sched.setLastTimeAtHalfFairShare(now); - } - } - } - - /** - * Is a queue below its min share for the given task type? - */ - boolean isStarvedForMinShare(FSLeafQueue sched) { - Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, - sched.getMinShare(), sched.getDemand()); - return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity, - sched.getResourceUsage(), desiredShare); - } - - /** - * Is a queue being starved for fair share for the given task type? This is - * defined as being below half its fair share. - */ - boolean isStarvedForFairShare(FSLeafQueue sched) { - Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, - Resources.multiply(sched.getFairShare(), .5), sched.getDemand()); - return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity, - sched.getResourceUsage(), desiredFairShare); - } - - /** - * Check for queues that need tasks preempted, either because they have been - * below their guaranteed share for minSharePreemptionTimeout or they have - * been below half their fair share for the fairSharePreemptionTimeout. If - * such queues exist, compute how many tasks of each type need to be preempted - * and then select the right ones using preemptTasks. - */ - protected synchronized void preemptTasksIfNecessary() { - if (!preemptionEnabled) { - return; - } - - long curTime = clock.getTime(); - if (curTime - lastPreemptCheckTime < preemptionInterval) { - return; - } - lastPreemptCheckTime = curTime; - - Resource resToPreempt = Resources.none(); - - for (FSLeafQueue sched : queueMgr.getLeafQueues()) { - resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime)); - } - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt, - Resources.none())) { - preemptResources(queueMgr.getLeafQueues(), resToPreempt); - } - } - - /** - * Preempt a quantity of resources from a list of QueueSchedulables. The - * policy for this is to pick apps from queues that are over their fair share, - * but make sure that no queue is placed below its fair share in the process. - * We further prioritize preemption by choosing containers with lowest - * priority to preempt. - */ - protected void preemptResources(Collection scheds, - Resource toPreempt) { - if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) { - return; - } - - Map apps = - new HashMap(); - Map queues = - new HashMap(); - - // Collect running containers from over-scheduled queues - List runningContainers = new ArrayList(); - for (FSLeafQueue sched : scheds) { - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, - sched.getResourceUsage(), sched.getFairShare())) { - for (AppSchedulable as : sched.getRunnableAppSchedulables()) { - for (RMContainer c : as.getApp().getLiveContainers()) { - runningContainers.add(c); - apps.put(c, as.getApp()); - queues.put(c, sched); - } - } - } - } - - // Sort containers into reverse order of priority - Collections.sort(runningContainers, new Comparator() { - public int compare(RMContainer c1, RMContainer c2) { - int ret = c1.getContainer().getPriority().compareTo( - c2.getContainer().getPriority()); - if (ret == 0) { - return c2.getContainerId().compareTo(c1.getContainerId()); - } - return ret; - } - }); - - // Scan down the list of containers we've already warned and kill them - // if we need to. Remove any containers from the list that we don't need - // or that are no longer running. - Iterator warnedIter = warnedContainers.iterator(); - Set preemptedThisRound = new HashSet(); - while (warnedIter.hasNext()) { - RMContainer container = warnedIter.next(); - if (container.getState() == RMContainerState.RUNNING && - Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, - toPreempt, Resources.none())) { - warnOrKillContainer(container, apps.get(container), queues.get(container)); - preemptedThisRound.add(container); - Resources.subtractFrom(toPreempt, container.getContainer().getResource()); - } else { - warnedIter.remove(); - } - } - - // Scan down the rest of the containers until we've preempted enough, making - // sure we don't preempt too many from any queue - Iterator runningIter = runningContainers.iterator(); - while (runningIter.hasNext() && - Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, - toPreempt, Resources.none())) { - RMContainer container = runningIter.next(); - FSLeafQueue sched = queues.get(container); - if (!preemptedThisRound.contains(container) && - Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, - sched.getResourceUsage(), sched.getFairShare())) { - warnOrKillContainer(container, apps.get(container), sched); - - warnedContainers.add(container); - Resources.subtractFrom(toPreempt, container.getContainer().getResource()); - } - } - } - - private void warnOrKillContainer(RMContainer container, FSSchedulerApp app, - FSLeafQueue queue) { - LOG.info("Preempting container (prio=" + container.getContainer().getPriority() + - "res=" + container.getContainer().getResource() + - ") from queue " + queue.getName()); - - Long time = app.getContainerPreemptionTime(container); - - if (time != null) { - // if we asked for preemption more than maxWaitTimeBeforeKill ms ago, - // proceed with kill - if (time + waitTimeBeforeKill < clock.getTime()) { - ContainerStatus status = - SchedulerUtils.createPreemptedContainerStatus( - container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); - - // TODO: Not sure if this ever actually adds this to the list of cleanup - // containers on the RMNode (see SchedulerNode.releaseContainer()). - completedContainer(container, status, RMContainerEventType.KILL); - LOG.info("Killing container" + container + - " (after waiting for premption for " + - (clock.getTime() - time) + "ms)"); - } - } else { - // track the request in the FSSchedulerApp itself - app.addPreemption(container, clock.getTime()); - } - } - - /** - * Return the resource amount that this queue is allowed to preempt, if any. - * If the queue has been below its min share for at least its preemption - * timeout, it should preempt the difference between its current share and - * this min share. If it has been below half its fair share for at least the - * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its - * full fair share. If both conditions hold, we preempt the max of the two - * amounts (this shouldn't happen unless someone sets the timeouts to be - * identical for some reason). - */ - protected Resource resToPreempt(FSLeafQueue sched, long curTime) { - String queue = sched.getName(); - long minShareTimeout = allocConf.getMinSharePreemptionTimeout(queue); - long fairShareTimeout = allocConf.getFairSharePreemptionTimeout(); - Resource resDueToMinShare = Resources.none(); - Resource resDueToFairShare = Resources.none(); - if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { - Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, - sched.getMinShare(), sched.getDemand()); - resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, - Resources.none(), Resources.subtract(target, sched.getResourceUsage())); - } - if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) { - Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, - sched.getFairShare(), sched.getDemand()); - resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, - Resources.none(), Resources.subtract(target, sched.getResourceUsage())); - } - Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, - resDueToMinShare, resDueToFairShare); - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, - resToPreempt, Resources.none())) { - String message = "Should preempt " + resToPreempt + " res for queue " - + sched.getName() + ": resDueToMinShare = " + resDueToMinShare - + ", resDueToFairShare = " + resDueToFairShare; - LOG.info(message); - } - return resToPreempt; - } - - public RMContainerTokenSecretManager getContainerTokenSecretManager() { - return rmContext.getContainerTokenSecretManager(); - } - - // synchronized for sizeBasedWeight - public synchronized ResourceWeights getAppWeight(AppSchedulable app) { - double weight = 1.0; - if (sizeBasedWeight) { - // Set weight based on current memory demand - weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2); - } - weight *= app.getPriority().getPriority(); - if (weightAdjuster != null) { - // Run weight through the user-supplied weightAdjuster - weight = weightAdjuster.adjustWeight(app, weight); - } - return new ResourceWeights((float)weight); - } - - @Override - public Resource getMinimumResourceCapability() { - return minimumAllocation; - } - - public Resource getIncrementResourceCapability() { - return incrAllocation; - } - - @Override - public Resource getMaximumResourceCapability() { - return maximumAllocation; - } - - public double getNodeLocalityThreshold() { - return nodeLocalityThreshold; - } - - public double getRackLocalityThreshold() { - return rackLocalityThreshold; - } - - public long getNodeLocalityDelayMs() { - return nodeLocalityDelayMs; - } - - public long getRackLocalityDelayMs() { - return rackLocalityDelayMs; - } - - public boolean isContinuousSchedulingEnabled() { - return continuousSchedulingEnabled; - } - - public synchronized int getContinuousSchedulingSleepMs() { - return continuousSchedulingSleepMs; - } - - public Resource getClusterCapacity() { - return clusterCapacity; - } - - public synchronized Clock getClock() { - return clock; - } - - protected synchronized void setClock(Clock clock) { - this.clock = clock; - } - - public FairSchedulerEventLog getEventLog() { - return eventLog; - } - - /** - * Add a new application to the scheduler, with a given id, queue name, and - * user. This will accept a new app even if the user or queue is above - * configured limits, but the app will not be marked as runnable. - */ - protected synchronized void addApplication(ApplicationId applicationId, - String queueName, String user) { - if (queueName == null || queueName.isEmpty()) { - String message = "Reject application " + applicationId + - " submitted by user " + user + " with an empty queue name."; - LOG.info(message); - rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(applicationId, message)); - return; - } - - RMApp rmApp = rmContext.getRMApps().get(applicationId); - FSLeafQueue queue = assignToQueue(rmApp, queueName, user); - if (queue == null) { - rmContext.getDispatcher().getEventHandler().handle( - new RMAppRejectedEvent(applicationId, - "Application rejected by queue placement policy")); - return; - } - - // Enforce ACLs - UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user); - - if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) - && !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) { - String msg = "User " + userUgi.getUserName() + - " cannot submit applications to queue " + queue.getName(); - LOG.info(msg); - rmContext.getDispatcher().getEventHandler() - .handle(new RMAppRejectedEvent(applicationId, msg)); - return; - } - - SchedulerApplication application = - new SchedulerApplication(queue, user); - applications.put(applicationId, application); - queue.getMetrics().submitApp(user); - - LOG.info("Accepted application " + applicationId + " from user: " + user - + ", in queue: " + queueName + ", currently num of applications: " - + applications.size()); - rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); - } - - /** - * Add a new application attempt to the scheduler. - */ - protected synchronized void addApplicationAttempt( - ApplicationAttemptId applicationAttemptId, - boolean transferStateFromPreviousAttempt) { - SchedulerApplication application = - applications.get(applicationAttemptId.getApplicationId()); - String user = application.getUser(); - FSLeafQueue queue = (FSLeafQueue) application.getQueue(); - - FSSchedulerApp attempt = - new FSSchedulerApp(applicationAttemptId, user, - queue, new ActiveUsersManager(getRootQueueMetrics()), - rmContext); - if (transferStateFromPreviousAttempt) { - attempt.transferStateFromPreviousAttempt(application - .getCurrentAppAttempt()); - } - application.setCurrentAppAttempt(attempt); - - boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user); - queue.addApp(attempt, runnable); - if (runnable) { - maxRunningEnforcer.trackRunnableApp(attempt); - } else { - maxRunningEnforcer.trackNonRunnableApp(attempt); - } - - queue.getMetrics().submitAppAttempt(user); - - LOG.info("Added Application Attempt " + applicationAttemptId - + " to scheduler from user: " + user); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); - } - - @VisibleForTesting - FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) { - FSLeafQueue queue = null; - try { - QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy(); - queueName = placementPolicy.assignAppToQueue(queueName, user); - if (queueName == null) { - return null; - } - queue = queueMgr.getLeafQueue(queueName, true); - } catch (IOException ex) { - LOG.error("Error assigning app to queue, rejecting", ex); - } - - if (rmApp != null) { - rmApp.setQueue(queue.getName()); - } else { - LOG.warn("Couldn't find RM app to set queue name on"); - } - - return queue; - } - - private synchronized void removeApplication(ApplicationId applicationId, - RMAppState finalState) { - SchedulerApplication application = applications.get(applicationId); - if (application == null){ - LOG.warn("Couldn't find application " + applicationId); - return; - } - application.stop(finalState); - applications.remove(applicationId); - } - - private synchronized void removeApplicationAttempt( - ApplicationAttemptId applicationAttemptId, - RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) { - LOG.info("Application " + applicationAttemptId + " is done." + - " finalState=" + rmAppAttemptFinalState); - SchedulerApplication application = - applications.get(applicationAttemptId.getApplicationId()); - FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId); - - if (attempt == null || application == null) { - LOG.info("Unknown application " + applicationAttemptId + " has completed!"); - return; - } - - // Release all the running containers - for (RMContainer rmContainer : attempt.getLiveContainers()) { - if (keepContainers - && rmContainer.getState().equals(RMContainerState.RUNNING)) { - // do not kill the running container in the case of work-preserving AM - // restart. - LOG.info("Skip killing " + rmContainer.getContainerId()); - continue; - } - completedContainer(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - rmContainer.getContainerId(), - SchedulerUtils.COMPLETED_APPLICATION), - RMContainerEventType.KILL); - } - - // Release all reserved containers - for (RMContainer rmContainer : attempt.getReservedContainers()) { - completedContainer(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - rmContainer.getContainerId(), - "Application Complete"), - RMContainerEventType.KILL); - } - // Clean up pending requests, metrics etc. - attempt.stop(rmAppAttemptFinalState); - - // Inform the queue - FSLeafQueue queue = queueMgr.getLeafQueue(attempt.getQueue() - .getQueueName(), false); - boolean wasRunnable = queue.removeApp(attempt); - - if (wasRunnable) { - maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt); - } else { - maxRunningEnforcer.untrackNonRunnableApp(attempt); - } - } - - /** - * Clean up a completed container. - */ - private synchronized void completedContainer(RMContainer rmContainer, - ContainerStatus containerStatus, RMContainerEventType event) { - if (rmContainer == null) { - LOG.info("Null container completed..."); - return; - } - - Container container = rmContainer.getContainer(); - - // Get the application for the finished container - FSSchedulerApp application = - getCurrentAttemptForContainer(container.getId()); - ApplicationId appId = - container.getId().getApplicationAttemptId().getApplicationId(); - if (application == null) { - LOG.info("Container " + container + " of" + - " unknown application attempt " + appId + - " completed with event " + event); - return; - } - - // Get the node on which the container was allocated - FSSchedulerNode node = nodes.get(container.getNodeId()); - - if (rmContainer.getState() == RMContainerState.RESERVED) { - application.unreserve(node, rmContainer.getReservedPriority()); - node.unreserveResource(application); - } else { - application.containerCompleted(rmContainer, containerStatus, event); - node.releaseContainer(container); - updateRootQueueMetrics(); - } - - LOG.info("Application attempt " + application.getApplicationAttemptId() - + " released container " + container.getId() + " on node: " + node - + " with event: " + event); - } - - private synchronized void addNode(RMNode node) { - nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName)); - Resources.addTo(clusterCapacity, node.getTotalCapability()); - updateRootQueueMetrics(); - - LOG.info("Added node " + node.getNodeAddress() + - " cluster capacity: " + clusterCapacity); - } - - private synchronized void removeNode(RMNode rmNode) { - FSSchedulerNode node = nodes.get(rmNode.getNodeID()); - // This can occur when an UNHEALTHY node reconnects - if (node == null) { - return; - } - Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability()); - updateRootQueueMetrics(); - - // Remove running containers - List runningContainers = node.getRunningContainers(); - for (RMContainer container : runningContainers) { - completedContainer(container, - SchedulerUtils.createAbnormalContainerStatus( - container.getContainerId(), - SchedulerUtils.LOST_CONTAINER), - RMContainerEventType.KILL); - } - - // Remove reservations, if any - RMContainer reservedContainer = node.getReservedContainer(); - if (reservedContainer != null) { - completedContainer(reservedContainer, - SchedulerUtils.createAbnormalContainerStatus( - reservedContainer.getContainerId(), - SchedulerUtils.LOST_CONTAINER), - RMContainerEventType.KILL); - } - - nodes.remove(rmNode.getNodeID()); - LOG.info("Removed node " + rmNode.getNodeAddress() + - " cluster capacity: " + clusterCapacity); - } - - @Override - public Allocation allocate(ApplicationAttemptId appAttemptId, - List ask, List release, List blacklistAdditions, List blacklistRemovals) { - - // Make sure this application exists - FSSchedulerApp application = getSchedulerApp(appAttemptId); - if (application == null) { - LOG.info("Calling allocate on removed " + - "or non existant application " + appAttemptId); - return EMPTY_ALLOCATION; - } - - // Sanity check - SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(), - clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation); - - // Release containers - for (ContainerId releasedContainerId : release) { - RMContainer rmContainer = getRMContainer(releasedContainerId); - if (rmContainer == null) { - RMAuditLogger.logFailure(application.getUser(), - AuditConstants.RELEASE_CONTAINER, - "Unauthorized access or invalid container", "FairScheduler", - "Trying to release container not owned by app or with invalid id", - application.getApplicationId(), releasedContainerId); - } - completedContainer(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - releasedContainerId, - SchedulerUtils.RELEASED_CONTAINER), - RMContainerEventType.RELEASED); - } - - synchronized (application) { - if (!ask.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("allocate: pre-update" + - " applicationAttemptId=" + appAttemptId + - " application=" + application.getApplicationId()); - } - application.showRequests(); - - // Update application requests - application.updateResourceRequests(ask); - - LOG.debug("allocate: post-update"); - application.showRequests(); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("allocate:" + - " applicationAttemptId=" + appAttemptId + - " #ask=" + ask.size()); - - LOG.debug("Preempting " + application.getPreemptionContainers().size() - + " container(s)"); - } - - Set preemptionContainerIds = new HashSet(); - for (RMContainer container : application.getPreemptionContainers()) { - preemptionContainerIds.add(container.getContainerId()); - } - - application.updateBlacklist(blacklistAdditions, blacklistRemovals); - - return new Allocation(application.pullNewlyAllocatedContainers(), - application.getHeadroom(), preemptionContainerIds); - } - } - - /** - * Process a container which has launched on a node, as reported by the node. - */ - private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) { - // Get the application for the finished container - FSSchedulerApp application = getCurrentAttemptForContainer(containerId); - if (application == null) { - LOG.info("Unknown application " - + containerId.getApplicationAttemptId().getApplicationId() - + " launched container " + containerId + " on node: " + node); - return; - } - - application.containerLaunchedOnNode(containerId, node.getNodeID()); - } - - /** - * Process a heartbeat update from a node. - */ - private synchronized void nodeUpdate(RMNode nm) { - if (LOG.isDebugEnabled()) { - LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity); - } - eventLog.log("HEARTBEAT", nm.getHostName()); - FSSchedulerNode node = nodes.get(nm.getNodeID()); - - // Update resource if any change - SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG); - - List containerInfoList = nm.pullContainerUpdates(); - List newlyLaunchedContainers = new ArrayList(); - List completedContainers = new ArrayList(); - for(UpdatedContainerInfo containerInfo : containerInfoList) { - newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); - completedContainers.addAll(containerInfo.getCompletedContainers()); - } - // Processing the newly launched containers - for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), node); - } - - // Process completed containers - for (ContainerStatus completedContainer : completedContainers) { - ContainerId containerId = completedContainer.getContainerId(); - LOG.debug("Container FINISHED: " + containerId); - completedContainer(getRMContainer(containerId), - completedContainer, RMContainerEventType.FINISHED); - } - - if (continuousSchedulingEnabled) { - if (!completedContainers.isEmpty()) { - attemptScheduling(node); - } - } else { - attemptScheduling(node); - } - } - - private void continuousScheduling() { - while (true) { - List nodeIdList = new ArrayList(nodes.keySet()); - Collections.sort(nodeIdList, nodeAvailableResourceComparator); - - // iterate all nodes - for (NodeId nodeId : nodeIdList) { - if (nodes.containsKey(nodeId)) { - FSSchedulerNode node = nodes.get(nodeId); - try { - if (Resources.fitsIn(minimumAllocation, - node.getAvailableResource())) { - attemptScheduling(node); - } - } catch (Throwable ex) { - LOG.warn("Error while attempting scheduling for node " + node + - ": " + ex.toString(), ex); - } - } - } - try { - Thread.sleep(getContinuousSchedulingSleepMs()); - } catch (InterruptedException e) { - LOG.warn("Error while doing sleep in continuous scheduling: " + - e.toString(), e); - } - } - } - - /** Sort nodes by available resource */ - private class NodeAvailableResourceComparator implements Comparator { - - @Override - public int compare(NodeId n1, NodeId n2) { - return RESOURCE_CALCULATOR.compare(clusterCapacity, - nodes.get(n2).getAvailableResource(), - nodes.get(n1).getAvailableResource()); - } - } - - private synchronized void attemptScheduling(FSSchedulerNode node) { - // Assign new containers... - // 1. Check for reserved applications - // 2. Schedule if there are no reservations - - AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable(); - if (reservedAppSchedulable != null) { - Priority reservedPriority = node.getReservedContainer().getReservedPriority(); - if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) { - // Don't hold the reservation if app can no longer use it - LOG.info("Releasing reservation that cannot be satisfied for application " - + reservedAppSchedulable.getApp().getApplicationAttemptId() - + " on node " + node); - reservedAppSchedulable.unreserve(reservedPriority, node); - reservedAppSchedulable = null; - } else { - // Reservation exists; try to fulfill the reservation - LOG.info("Trying to fulfill reservation for application " - + reservedAppSchedulable.getApp().getApplicationAttemptId() - + " on node: " + node); - - node.getReservedAppSchedulable().assignReservedContainer(node); - } - } - if (reservedAppSchedulable == null) { - // No reservation, schedule at queue which is farthest below fair share - int assignedContainers = 0; - while (node.getReservedContainer() == null) { - boolean assignedContainer = false; - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, - queueMgr.getRootQueue().assignContainer(node), - Resources.none())) { - assignedContainers++; - assignedContainer = true; - } - if (!assignedContainer) { break; } - if (!assignMultiple) { break; } - if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; } - } - } - updateRootQueueMetrics(); - } - - @Override - public SchedulerNodeReport getNodeReport(NodeId nodeId) { - FSSchedulerNode node = nodes.get(nodeId); - return node == null ? null : new SchedulerNodeReport(node); - } - - public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) { - SchedulerApplication app = - applications.get(appAttemptId.getApplicationId()); - if (app != null) { - return (FSSchedulerApp) app.getCurrentAppAttempt(); - } - return null; - } - - @Override - public SchedulerAppReport getSchedulerAppInfo( - ApplicationAttemptId appAttemptId) { - FSSchedulerApp attempt = getSchedulerApp(appAttemptId); - if (attempt == null) { - LOG.error("Request for appInfo of unknown attempt" + appAttemptId); - return null; - } - return new SchedulerAppReport(attempt); - } - - @Override - public ApplicationResourceUsageReport getAppResourceUsageReport( - ApplicationAttemptId appAttemptId) { - FSSchedulerApp attempt = getSchedulerApp(appAttemptId); - if (attempt == null) { - LOG.error("Request for appInfo of unknown attempt" + appAttemptId); - return null; - } - return attempt.getResourceUsageReport(); - } - - /** - * Subqueue metrics might be a little out of date because fair shares are - * recalculated at the update interval, but the root queue metrics needs to - * be updated synchronously with allocations and completions so that cluster - * metrics will be consistent. - */ - private void updateRootQueueMetrics() { - rootMetrics.setAvailableResourcesToQueue( - Resources.subtract( - clusterCapacity, rootMetrics.getAllocatedResources())); - } - - @Override - public QueueMetrics getRootQueueMetrics() { - return rootMetrics; - } - - @Override - public void handle(SchedulerEvent event) { - switch (event.getType()) { - case NODE_ADDED: - if (!(event instanceof NodeAddedSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; - addNode(nodeAddedEvent.getAddedRMNode()); - break; - case NODE_REMOVED: - if (!(event instanceof NodeRemovedSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event; - removeNode(nodeRemovedEvent.getRemovedRMNode()); - break; - case NODE_UPDATE: - if (!(event instanceof NodeUpdateSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; - nodeUpdate(nodeUpdatedEvent.getRMNode()); - break; - case APP_ADDED: - if (!(event instanceof AppAddedSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; - addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser()); - break; - case APP_REMOVED: - if (!(event instanceof AppRemovedSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event; - removeApplication(appRemovedEvent.getApplicationID(), - appRemovedEvent.getFinalState()); - break; - case APP_ATTEMPT_ADDED: - if (!(event instanceof AppAttemptAddedSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - AppAttemptAddedSchedulerEvent appAttemptAddedEvent = - (AppAttemptAddedSchedulerEvent) event; - addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - appAttemptAddedEvent.getTransferStateFromPreviousAttempt()); - break; - case APP_ATTEMPT_REMOVED: - if (!(event instanceof AppAttemptRemovedSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent = - (AppAttemptRemovedSchedulerEvent) event; - removeApplicationAttempt( - appAttemptRemovedEvent.getApplicationAttemptID(), - appAttemptRemovedEvent.getFinalAttemptState(), - appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts()); - break; - case CONTAINER_EXPIRED: - if (!(event instanceof ContainerExpiredSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - ContainerExpiredSchedulerEvent containerExpiredEvent = - (ContainerExpiredSchedulerEvent)event; - ContainerId containerId = containerExpiredEvent.getContainerId(); - completedContainer(getRMContainer(containerId), - SchedulerUtils.createAbnormalContainerStatus( - containerId, - SchedulerUtils.EXPIRED_CONTAINER), - RMContainerEventType.EXPIRE); - break; - default: - LOG.error("Unknown event arrived at FairScheduler: " + event.toString()); - } - } - - @Override - public void recover(RMState state) throws Exception { - // NOT IMPLEMENTED - } - - @Override - public synchronized void reinitialize(Configuration conf, RMContext rmContext) - throws IOException { - if (!initialized) { - this.conf = new FairSchedulerConfiguration(conf); - validateConf(this.conf); - minimumAllocation = this.conf.getMinimumAllocation(); - maximumAllocation = this.conf.getMaximumAllocation(); - incrAllocation = this.conf.getIncrementAllocation(); - continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); - continuousSchedulingSleepMs = - this.conf.getContinuousSchedulingSleepMs(); - nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); - rackLocalityThreshold = this.conf.getLocalityThresholdRack(); - nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); - rackLocalityDelayMs = this.conf.getLocalityDelayRackMs(); - preemptionEnabled = this.conf.getPreemptionEnabled(); - assignMultiple = this.conf.getAssignMultiple(); - maxAssign = this.conf.getMaxAssign(); - sizeBasedWeight = this.conf.getSizeBasedWeight(); - preemptionInterval = this.conf.getPreemptionInterval(); - waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); - usePortForNodeName = this.conf.getUsePortForNodeName(); - - rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); - this.rmContext = rmContext; - this.eventLog = new FairSchedulerEventLog(); - eventLog.init(this.conf); - - initialized = true; - - allocConf = new AllocationConfiguration(conf); - try { - queueMgr.initialize(conf); - } catch (Exception e) { - throw new IOException("Failed to start FairScheduler", e); - } - - Thread updateThread = new Thread(new UpdateThread()); - updateThread.setName("FairSchedulerUpdateThread"); - updateThread.setDaemon(true); - updateThread.start(); - - if (continuousSchedulingEnabled) { - // start continuous scheduling thread - Thread schedulingThread = new Thread( - new Runnable() { - @Override - public void run() { - continuousScheduling(); - } - } - ); - schedulingThread.setName("ContinuousScheduling"); - schedulingThread.setDaemon(true); - schedulingThread.start(); - } - - allocsLoader.init(conf); - allocsLoader.setReloadListener(new AllocationReloadListener()); - // If we fail to load allocations file on initialize, we want to fail - // immediately. After a successful load, exceptions on future reloads - // will just result in leaving things as they are. - try { - allocsLoader.reloadAllocations(); - } catch (Exception e) { - throw new IOException("Failed to initialize FairScheduler", e); - } - allocsLoader.start(); - } else { - try { - allocsLoader.reloadAllocations(); - } catch (Exception e) { - LOG.error("Failed to reload allocations file", e); - } - } - } - - @Override - public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues, - boolean recursive) throws IOException { - if (!queueMgr.exists(queueName)) { - throw new IOException("queue " + queueName + " does not exist"); - } - return queueMgr.getQueue(queueName).getQueueInfo(includeChildQueues, - recursive); - } - - @Override - public List getQueueUserAclInfo() { - UserGroupInformation user = null; - try { - user = UserGroupInformation.getCurrentUser(); - } catch (IOException ioe) { - return new ArrayList(); - } - - return queueMgr.getRootQueue().getQueueUserAclInfo(user); - } - - @Override - public int getNumClusterNodes() { - return nodes.size(); - } - - @Override - public synchronized boolean checkAccess(UserGroupInformation callerUGI, - QueueACL acl, String queueName) { - FSQueue queue = getQueueManager().getQueue(queueName); - if (queue == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("ACL not found for queue access-type " + acl - + " for queue " + queueName); - } - return false; - } - return queue.hasAccess(acl, callerUGI); - } - - public AllocationConfiguration getAllocationConfiguration() { - return allocConf; - } - - private class AllocationReloadListener implements - AllocationFileLoaderService.Listener { - - @Override - public void onReload(AllocationConfiguration queueInfo) { - // Commit the reload; also create any queue defined in the alloc file - // if it does not already exist, so it can be displayed on the web UI. - synchronized (FairScheduler.this) { - allocConf = queueInfo; - allocConf.getDefaultSchedulingPolicy().initialize(clusterCapacity); - queueMgr.updateAllocationConfiguration(allocConf); - } - } - } - - @Override - public List getAppsInQueue(String queueName) { - FSQueue queue = queueMgr.getQueue(queueName); - if (queue == null) { - return null; - } - List apps = new ArrayList(); - queue.collectSchedulerApplications(apps); - return apps; - } - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java.orig b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java.orig deleted file mode 100644 index 2dc0e8805e7..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java.orig +++ /dev/null @@ -1,615 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import junit.framework.Assert; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.net.NetworkTopology; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.QueueInfo; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceOption; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.InlineDispatcher; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.resourcemanager.Application; -import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; -import org.apache.hadoop.yarn.server.resourcemanager.MockRM; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; -import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.Task; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.util.resource.Resources; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class TestFifoScheduler { - private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class); - private final int GB = 1024; - - private ResourceManager resourceManager = null; - - private static final RecordFactory recordFactory = - RecordFactoryProvider.getRecordFactory(null); - - @Before - public void setUp() throws Exception { - resourceManager = new ResourceManager(); - Configuration conf = new Configuration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, - FifoScheduler.class, ResourceScheduler.class); - resourceManager.init(conf); - } - - @After - public void tearDown() throws Exception { - resourceManager.stop(); - } - - private org.apache.hadoop.yarn.server.resourcemanager.NodeManager - registerNode(String hostName, int containerManagerPort, int nmHttpPort, - String rackName, Resource capability) throws IOException, - YarnException { - return new org.apache.hadoop.yarn.server.resourcemanager.NodeManager( - hostName, containerManagerPort, nmHttpPort, rackName, capability, - resourceManager); - } - - private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) { - ApplicationId appIdImpl = ApplicationId.newInstance(0, appId); - ApplicationAttemptId attId = - ApplicationAttemptId.newInstance(appIdImpl, attemptId); - return attId; - } - - private ResourceRequest createResourceRequest(int memory, String host, - int priority, int numContainers) { - ResourceRequest request = recordFactory - .newRecordInstance(ResourceRequest.class); - request.setCapability(Resources.createResource(memory)); - request.setResourceName(host); - request.setNumContainers(numContainers); - Priority prio = recordFactory.newRecordInstance(Priority.class); - prio.setPriority(priority); - request.setPriority(prio); - return request; - } - - @Test(timeout=5000) - public void testFifoSchedulerCapacityWhenNoNMs() { - FifoScheduler scheduler = new FifoScheduler(); - QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false); - Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity()); - } - - @Test(timeout=5000) - public void testAppAttemptMetrics() throws Exception { - AsyncDispatcher dispatcher = new InlineDispatcher(); - RMContext rmContext = new RMContextImpl(dispatcher, null, - null, null, null, null, null, null, null); - - FifoScheduler schedular = new FifoScheduler(); - schedular.reinitialize(new Configuration(), rmContext); - QueueMetrics metrics = schedular.getRootQueueMetrics(); - int beforeAppsSubmitted = metrics.getAppsSubmitted(); - - ApplicationId appId = BuilderUtils.newApplicationId(200, 1); - ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( - appId, 1); - - SchedulerEvent appEvent = new AppAddedSchedulerEvent(appId, "queue", "user"); - schedular.handle(appEvent); - SchedulerEvent attemptEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - schedular.handle(attemptEvent); - - appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2); - SchedulerEvent attemptEvent2 = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - schedular.handle(attemptEvent2); - - int afterAppsSubmitted = metrics.getAppsSubmitted(); - Assert.assertEquals(1, afterAppsSubmitted - beforeAppsSubmitted); - } - - @Test(timeout=2000) - public void testNodeLocalAssignment() throws Exception { - AsyncDispatcher dispatcher = new InlineDispatcher(); - Configuration conf = new Configuration(); - RMContainerTokenSecretManager containerTokenSecretManager = - new RMContainerTokenSecretManager(conf); - containerTokenSecretManager.rollMasterKey(); - NMTokenSecretManagerInRM nmTokenSecretManager = - new NMTokenSecretManagerInRM(conf); - nmTokenSecretManager.rollMasterKey(); - RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, - null, containerTokenSecretManager, nmTokenSecretManager, null); - - FifoScheduler scheduler = new FifoScheduler(); - scheduler.reinitialize(new Configuration(), rmContext); - - RMNode node0 = MockNodes.newNodeInfo(1, - Resources.createResource(1024 * 64), 1, "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0); - scheduler.handle(nodeEvent1); - - int _appId = 1; - int _appAttemptId = 1; - ApplicationAttemptId appAttemptId = createAppAttemptId(_appId, - _appAttemptId); - AppAddedSchedulerEvent appEvent = - new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "queue1", - "user1"); - scheduler.handle(appEvent); - AppAttemptAddedSchedulerEvent attemptEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - scheduler.handle(attemptEvent); - - int memory = 64; - int nConts = 3; - int priority = 20; - - List ask = new ArrayList(); - ResourceRequest nodeLocal = createResourceRequest(memory, - node0.getHostName(), priority, nConts); - ResourceRequest rackLocal = createResourceRequest(memory, - node0.getRackName(), priority, nConts); - ResourceRequest any = createResourceRequest(memory, ResourceRequest.ANY, priority, - nConts); - ask.add(nodeLocal); - ask.add(rackLocal); - ask.add(any); - scheduler.allocate(appAttemptId, ask, new ArrayList(), null, null); - - NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0); - - // Before the node update event, there are 3 local requests outstanding - Assert.assertEquals(3, nodeLocal.getNumContainers()); - - scheduler.handle(node0Update); - - // After the node update event, check that there are no more local requests - // outstanding - Assert.assertEquals(0, nodeLocal.getNumContainers()); - //Also check that the containers were scheduled - SchedulerAppReport info = scheduler.getSchedulerAppInfo(appAttemptId); - Assert.assertEquals(3, info.getLiveContainers().size()); - } - - @Test(timeout=2000) - public void testUpdateResourceOnNode() throws Exception { - AsyncDispatcher dispatcher = new InlineDispatcher(); - Configuration conf = new Configuration(); - RMContainerTokenSecretManager containerTokenSecretManager = - new RMContainerTokenSecretManager(conf); - containerTokenSecretManager.rollMasterKey(); - NMTokenSecretManagerInRM nmTokenSecretManager = - new NMTokenSecretManagerInRM(conf); - nmTokenSecretManager.rollMasterKey(); - RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, - null, containerTokenSecretManager, nmTokenSecretManager, null); - - FifoScheduler scheduler = new FifoScheduler(){ - @SuppressWarnings("unused") - public Map getNodes(){ - return nodes; - } - }; - scheduler.reinitialize(new Configuration(), rmContext); - RMNode node0 = MockNodes.newNodeInfo(1, - Resources.createResource(2048, 4), 1, "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0); - scheduler.handle(nodeEvent1); - - Method method = scheduler.getClass().getDeclaredMethod("getNodes"); - @SuppressWarnings("unchecked") - Map schedulerNodes = - (Map) method.invoke(scheduler); - assertEquals(schedulerNodes.values().size(), 1); - - // set resource of RMNode to 1024 and verify it works. - node0.setResourceOption(ResourceOption.newInstance( - Resources.createResource(1024, 4), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)); - assertEquals(node0.getTotalCapability().getMemory(), 1024); - // verify that SchedulerNode's resource hasn't been changed. - assertEquals(schedulerNodes.get(node0.getNodeID()). - getAvailableResource().getMemory(), 2048); - // now, NM heartbeat comes. - NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0); - scheduler.handle(node0Update); - // SchedulerNode's available resource is changed. - assertEquals(schedulerNodes.get(node0.getNodeID()). - getAvailableResource().getMemory(), 1024); - QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false); - Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity()); - - int _appId = 1; - int _appAttemptId = 1; - ApplicationAttemptId appAttemptId = createAppAttemptId(_appId, - _appAttemptId); - AppAddedSchedulerEvent appEvent = - new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "queue1", - "user1"); - scheduler.handle(appEvent); - AppAttemptAddedSchedulerEvent attemptEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - scheduler.handle(attemptEvent); - - int memory = 1024; - int priority = 1; - - List ask = new ArrayList(); - ResourceRequest nodeLocal = createResourceRequest(memory, - node0.getHostName(), priority, 1); - ResourceRequest rackLocal = createResourceRequest(memory, - node0.getRackName(), priority, 1); - ResourceRequest any = createResourceRequest(memory, ResourceRequest.ANY, priority, - 1); - ask.add(nodeLocal); - ask.add(rackLocal); - ask.add(any); - scheduler.allocate(appAttemptId, ask, new ArrayList(), null, null); - - // Before the node update event, there are one local request - Assert.assertEquals(1, nodeLocal.getNumContainers()); - - // Now schedule. - scheduler.handle(node0Update); - - // After the node update event, check no local request - Assert.assertEquals(0, nodeLocal.getNumContainers()); - // Also check that one container was scheduled - SchedulerAppReport info = scheduler.getSchedulerAppInfo(appAttemptId); - Assert.assertEquals(1, info.getLiveContainers().size()); - // And check the default Queue now is full. - queueInfo = scheduler.getQueueInfo(null, false, false); - Assert.assertEquals(1.0f, queueInfo.getCurrentCapacity()); - } - -// @Test - public void testFifoScheduler() throws Exception { - - LOG.info("--- START: testFifoScheduler ---"); - - final int GB = 1024; - - // Register node1 - String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(4 * GB, 1)); - nm_0.heartbeat(); - - // Register node2 - String host_1 = "host_1"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = - registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * GB, 1)); - nm_1.heartbeat(); - - // ResourceRequest priorities - Priority priority_0 = - org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(0); - Priority priority_1 = - org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(1); - - // Submit an application - Application application_0 = new Application("user_0", resourceManager); - application_0.submit(); - - application_0.addNodeManager(host_0, 1234, nm_0); - application_0.addNodeManager(host_1, 1234, nm_1); - - Resource capability_0_0 = Resources.createResource(GB); - application_0.addResourceRequestSpec(priority_1, capability_0_0); - - Resource capability_0_1 = Resources.createResource(2 * GB); - application_0.addResourceRequestSpec(priority_0, capability_0_1); - - Task task_0_0 = new Task(application_0, priority_1, - new String[] {host_0, host_1}); - application_0.addTask(task_0_0); - - // Submit another application - Application application_1 = new Application("user_1", resourceManager); - application_1.submit(); - - application_1.addNodeManager(host_0, 1234, nm_0); - application_1.addNodeManager(host_1, 1234, nm_1); - - Resource capability_1_0 = Resources.createResource(3 * GB); - application_1.addResourceRequestSpec(priority_1, capability_1_0); - - Resource capability_1_1 = Resources.createResource(4 * GB); - application_1.addResourceRequestSpec(priority_0, capability_1_1); - - Task task_1_0 = new Task(application_1, priority_1, - new String[] {host_0, host_1}); - application_1.addTask(task_1_0); - - // Send resource requests to the scheduler - LOG.info("Send resource requests to the scheduler"); - application_0.schedule(); - application_1.schedule(); - - // Send a heartbeat to kick the tires on the Scheduler - LOG.info("Send a heartbeat to kick the tires on the Scheduler... " + - "nm0 -> task_0_0 and task_1_0 allocated, used=4G " + - "nm1 -> nothing allocated"); - nm_0.heartbeat(); // task_0_0 and task_1_0 allocated, used=4G - nm_1.heartbeat(); // nothing allocated - - // Get allocations from the scheduler - application_0.schedule(); // task_0_0 - checkApplicationResourceUsage(GB, application_0); - - application_1.schedule(); // task_1_0 - checkApplicationResourceUsage(3 * GB, application_1); - - nm_0.heartbeat(); - nm_1.heartbeat(); - - checkNodeResourceUsage(4*GB, nm_0); // task_0_0 (1G) and task_1_0 (3G) - checkNodeResourceUsage(0*GB, nm_1); // no tasks, 2G available - - LOG.info("Adding new tasks..."); - - Task task_1_1 = new Task(application_1, priority_1, - new String[] {ResourceRequest.ANY}); - application_1.addTask(task_1_1); - - Task task_1_2 = new Task(application_1, priority_1, - new String[] {ResourceRequest.ANY}); - application_1.addTask(task_1_2); - - Task task_1_3 = new Task(application_1, priority_0, - new String[] {ResourceRequest.ANY}); - application_1.addTask(task_1_3); - - application_1.schedule(); - - Task task_0_1 = new Task(application_0, priority_1, - new String[] {host_0, host_1}); - application_0.addTask(task_0_1); - - Task task_0_2 = new Task(application_0, priority_1, - new String[] {host_0, host_1}); - application_0.addTask(task_0_2); - - Task task_0_3 = new Task(application_0, priority_0, - new String[] {ResourceRequest.ANY}); - application_0.addTask(task_0_3); - - application_0.schedule(); - - // Send a heartbeat to kick the tires on the Scheduler - LOG.info("Sending hb from " + nm_0.getHostName()); - nm_0.heartbeat(); // nothing new, used=4G - - LOG.info("Sending hb from " + nm_1.getHostName()); - nm_1.heartbeat(); // task_0_3, used=2G - - // Get allocations from the scheduler - LOG.info("Trying to allocate..."); - application_0.schedule(); - checkApplicationResourceUsage(3 * GB, application_0); - application_1.schedule(); - checkApplicationResourceUsage(3 * GB, application_1); - nm_0.heartbeat(); - nm_1.heartbeat(); - checkNodeResourceUsage(4*GB, nm_0); - checkNodeResourceUsage(2*GB, nm_1); - - // Complete tasks - LOG.info("Finishing up task_0_0"); - application_0.finishTask(task_0_0); // Now task_0_1 - application_0.schedule(); - application_1.schedule(); - nm_0.heartbeat(); - nm_1.heartbeat(); - checkApplicationResourceUsage(3 * GB, application_0); - checkApplicationResourceUsage(3 * GB, application_1); - checkNodeResourceUsage(4*GB, nm_0); - checkNodeResourceUsage(2*GB, nm_1); - - LOG.info("Finishing up task_1_0"); - application_1.finishTask(task_1_0); // Now task_0_2 - application_0.schedule(); // final overcommit for app0 caused here - application_1.schedule(); - nm_0.heartbeat(); // final overcommit for app0 occurs here - nm_1.heartbeat(); - checkApplicationResourceUsage(4 * GB, application_0); - checkApplicationResourceUsage(0 * GB, application_1); - //checkNodeResourceUsage(1*GB, nm_0); // final over-commit -> rm.node->1G, test.node=2G - checkNodeResourceUsage(2*GB, nm_1); - - LOG.info("Finishing up task_0_3"); - application_0.finishTask(task_0_3); // No more - application_0.schedule(); - application_1.schedule(); - nm_0.heartbeat(); - nm_1.heartbeat(); - checkApplicationResourceUsage(2 * GB, application_0); - checkApplicationResourceUsage(0 * GB, application_1); - //checkNodeResourceUsage(2*GB, nm_0); // final over-commit, rm.node->1G, test.node->2G - checkNodeResourceUsage(0*GB, nm_1); - - LOG.info("Finishing up task_0_1"); - application_0.finishTask(task_0_1); - application_0.schedule(); - application_1.schedule(); - nm_0.heartbeat(); - nm_1.heartbeat(); - checkApplicationResourceUsage(1 * GB, application_0); - checkApplicationResourceUsage(0 * GB, application_1); - - LOG.info("Finishing up task_0_2"); - application_0.finishTask(task_0_2); // now task_1_3 can go! - application_0.schedule(); - application_1.schedule(); - nm_0.heartbeat(); - nm_1.heartbeat(); - checkApplicationResourceUsage(0 * GB, application_0); - checkApplicationResourceUsage(4 * GB, application_1); - - LOG.info("Finishing up task_1_3"); - application_1.finishTask(task_1_3); // now task_1_1 - application_0.schedule(); - application_1.schedule(); - nm_0.heartbeat(); - nm_1.heartbeat(); - checkApplicationResourceUsage(0 * GB, application_0); - checkApplicationResourceUsage(3 * GB, application_1); - - LOG.info("Finishing up task_1_1"); - application_1.finishTask(task_1_1); - application_0.schedule(); - application_1.schedule(); - nm_0.heartbeat(); - nm_1.heartbeat(); - checkApplicationResourceUsage(0 * GB, application_0); - checkApplicationResourceUsage(3 * GB, application_1); - - LOG.info("--- END: testFifoScheduler ---"); - } - - @SuppressWarnings("resource") - @Test - public void testBlackListNodes() throws Exception { - Configuration conf = new Configuration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, - ResourceScheduler.class); - MockRM rm = new MockRM(conf); - rm.start(); - FifoScheduler fs = (FifoScheduler) rm.getResourceScheduler(); - - String host = "127.0.0.1"; - RMNode node = - MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host); - fs.handle(new NodeAddedSchedulerEvent(node)); - - ApplicationId appId = BuilderUtils.newApplicationId(100, 1); - ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( - appId, 1); - SchedulerEvent appEvent = - new AppAddedSchedulerEvent(appId, "default", - "user"); - fs.handle(appEvent); - SchedulerEvent attemptEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - fs.handle(attemptEvent); - - // Verify the blacklist can be updated independent of requesting containers - fs.allocate(appAttemptId, Collections.emptyList(), - Collections.emptyList(), - Collections.singletonList(host), null); - Assert.assertTrue(fs.getApplicationAttempt(appAttemptId).isBlacklisted(host)); - fs.allocate(appAttemptId, Collections.emptyList(), - Collections.emptyList(), null, - Collections.singletonList(host)); - Assert.assertFalse(fs.getApplicationAttempt(appAttemptId).isBlacklisted(host)); - rm.stop(); - } - - @Test - public void testGetAppsInQueue() throws Exception { - Application application_0 = new Application("user_0", resourceManager); - application_0.submit(); - - Application application_1 = new Application("user_0", resourceManager); - application_1.submit(); - - ResourceScheduler scheduler = resourceManager.getResourceScheduler(); - - List appsInDefault = scheduler.getAppsInQueue("default"); - assertTrue(appsInDefault.contains(application_0.getApplicationAttemptId())); - assertTrue(appsInDefault.contains(application_1.getApplicationAttemptId())); - assertEquals(2, appsInDefault.size()); - - Assert.assertNull(scheduler.getAppsInQueue("someotherqueue")); - } - - @Test - public void testAddAndRemoveAppFromFiFoScheduler() throws Exception { - Configuration conf = new Configuration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, - ResourceScheduler.class); - MockRM rm = new MockRM(conf); - FifoScheduler fs = (FifoScheduler)rm.getResourceScheduler(); - TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(fs.applications, - fs, "queue"); - } - - private void checkApplicationResourceUsage(int expected, - Application application) { - Assert.assertEquals(expected, application.getUsedResources().getMemory()); - } - - private void checkNodeResourceUsage(int expected, - org.apache.hadoop.yarn.server.resourcemanager.NodeManager node) { - Assert.assertEquals(expected, node.getUsed().getMemory()); - node.checkResourceUsage(); - } - - public static void main(String[] arg) throws Exception { - TestFifoScheduler t = new TestFifoScheduler(); - t.setUp(); - t.testFifoScheduler(); - t.tearDown(); - } -}