From d0c3ca05dea4201e70e22ebe41bc004073dbfe30 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Tue, 12 Aug 2014 23:04:06 +0000 Subject: [PATCH] YARN-2399. FairScheduler: Merge AppSchedulable and FSSchedulerApp into FSAppAttempt. (kasha) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1617623 13f79535-47bb-0310-9956-ffa450edef68 --- .../sls/scheduler/FairSchedulerMetrics.java | 8 +- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/fair/AppSchedulable.java | 463 ----------- .../scheduler/fair/FSAppAttempt.java | 768 ++++++++++++++++++ .../scheduler/fair/FSLeafQueue.java | 66 +- .../scheduler/fair/FSQueue.java | 24 +- .../scheduler/fair/FSSchedulerApp.java | 360 -------- .../scheduler/fair/FSSchedulerNode.java | 6 +- .../scheduler/fair/FairScheduler.java | 68 +- .../scheduler/fair/FifoAppComparator.java | 8 +- .../fair/MaxRunningAppsEnforcer.java | 52 +- .../scheduler/fair/NewAppWeightBooster.java | 2 +- .../scheduler/fair/Schedulable.java | 70 +- .../scheduler/fair/WeightAdjuster.java | 2 +- .../webapp/dao/FairSchedulerInfo.java | 3 +- .../dao/FairSchedulerLeafQueueInfo.java | 9 +- .../scheduler/fair/FakeSchedulable.java | 18 +- ...chedulerApp.java => TestFSAppAttempt.java} | 49 +- .../scheduler/fair/TestFSLeafQueue.java | 2 +- .../scheduler/fair/TestFairScheduler.java | 109 ++- .../fair/TestMaxRunningAppsEnforcer.java | 42 +- 21 files changed, 1034 insertions(+), 1098 deletions(-) delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/{TestFSSchedulerApp.java => TestFSAppAttempt.java} (82%) diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java index f427dcd557a..d100e1de583 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java @@ -22,10 +22,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair - .AppSchedulable; + .FSAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair - .FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import com.codahale.metrics.Gauge; import org.apache.hadoop.yarn.sls.SLSRunner; @@ -66,8 +65,7 @@ public class FairSchedulerMetrics extends SchedulerMetrics { public void trackApp(ApplicationAttemptId appAttemptId, String oldAppId) { super.trackApp(appAttemptId, oldAppId); FairScheduler fair = (FairScheduler) scheduler; - final AppSchedulable app = fair.getSchedulerApp(appAttemptId) - .getAppSchedulable(); + final FSAppAttempt app = fair.getSchedulerApp(appAttemptId); metrics.register("variable.app." + oldAppId + ".demand.memory", new Gauge() { @Override diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c1b4118ab51..fe330d6e079 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -104,6 +104,9 @@ Release 2.6.0 - UNRELEASED YARN-2317. Updated the document about how to write YARN applications. (Li Lu via zjshen) + YARN-2399. FairScheduler: Merge AppSchedulable and FSSchedulerApp into + FSAppAttempt. (kasha) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java deleted file mode 100644 index 0c36c55892b..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ /dev/null @@ -1,463 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.Collection; -import java.util.Comparator; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; - -@Private -@Unstable -public class AppSchedulable extends Schedulable { - private static final DefaultResourceCalculator RESOURCE_CALCULATOR - = new DefaultResourceCalculator(); - - private FairScheduler scheduler; - private FSSchedulerApp app; - private Resource demand = Resources.createResource(0); - private long startTime; - private static final Log LOG = LogFactory.getLog(AppSchedulable.class); - private FSLeafQueue queue; - private RMContainerTokenSecretManager containerTokenSecretManager; - private Priority priority; - private ResourceWeights resourceWeights; - - private RMContainerComparator comparator = new RMContainerComparator(); - - public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSLeafQueue queue) { - this.scheduler = scheduler; - this.app = app; - this.startTime = scheduler.getClock().getTime(); - this.queue = queue; - this.containerTokenSecretManager = scheduler. - getContainerTokenSecretManager(); - this.priority = Priority.newInstance(1); - this.resourceWeights = new ResourceWeights(); - } - - @Override - public String getName() { - return app.getApplicationId().toString(); - } - - public FSSchedulerApp getApp() { - return app; - } - - public ResourceWeights getResourceWeights() { - return resourceWeights; - } - - @Override - public void updateDemand() { - demand = Resources.createResource(0); - // Demand is current consumption plus outstanding requests - Resources.addTo(demand, app.getCurrentConsumption()); - - // Add up outstanding resource requests - synchronized (app) { - for (Priority p : app.getPriorities()) { - for (ResourceRequest r : app.getResourceRequests(p).values()) { - Resource total = Resources.multiply(r.getCapability(), r.getNumContainers()); - Resources.addTo(demand, total); - } - } - } - } - - @Override - public Resource getDemand() { - return demand; - } - - @Override - public long getStartTime() { - return startTime; - } - - @Override - public Resource getResourceUsage() { - // Here the getPreemptedResources() always return zero, except in - // a preemption round - return Resources.subtract(app.getCurrentConsumption(), - app.getPreemptedResources()); - } - - - @Override - public Resource getMinShare() { - return Resources.none(); - } - - @Override - public Resource getMaxShare() { - return Resources.unbounded(); - } - - /** - * Get metrics reference from containing queue. - */ - public QueueMetrics getMetrics() { - return queue.getMetrics(); - } - - @Override - public ResourceWeights getWeights() { - return scheduler.getAppWeight(this); - } - - @Override - public Priority getPriority() { - // Right now per-app priorities are not passed to scheduler, - // so everyone has the same priority. - return priority; - } - - /** - * Create and return a container object reflecting an allocation for the - * given appliction on the given node with the given capability and - * priority. - */ - public Container createContainer( - FSSchedulerApp application, FSSchedulerNode node, - Resource capability, Priority priority) { - - NodeId nodeId = node.getRMNode().getNodeID(); - ContainerId containerId = BuilderUtils.newContainerId(application - .getApplicationAttemptId(), application.getNewContainerId()); - - // Create the container - Container container = - BuilderUtils.newContainer(containerId, nodeId, node.getRMNode() - .getHttpAddress(), capability, priority, null); - - return container; - } - - /** - * Reserve a spot for {@code container} on this {@code node}. If - * the container is {@code alreadyReserved} on the node, simply - * update relevant bookeeping. This dispatches ro relevant handlers - * in the {@link FSSchedulerNode} and {@link SchedulerApp} classes. - */ - private void reserve(Priority priority, FSSchedulerNode node, - Container container, boolean alreadyReserved) { - LOG.info("Making reservation: node=" + node.getNodeName() + - " app_id=" + app.getApplicationId()); - if (!alreadyReserved) { - getMetrics().reserveResource(app.getUser(), container.getResource()); - RMContainer rmContainer = app.reserve(node, priority, null, - container); - node.reserveResource(app, priority, rmContainer); - } - - else { - RMContainer rmContainer = node.getReservedContainer(); - app.reserve(node, priority, rmContainer, container); - node.reserveResource(app, priority, rmContainer); - } - } - - /** - * Remove the reservation on {@code node} at the given - * {@link Priority}. This dispatches to the SchedulerApp and SchedulerNode - * handlers for an unreservation. - */ - public void unreserve(Priority priority, FSSchedulerNode node) { - RMContainer rmContainer = node.getReservedContainer(); - app.unreserve(node, priority); - node.unreserveResource(app); - getMetrics().unreserveResource( - app.getUser(), rmContainer.getContainer().getResource()); - } - - /** - * Assign a container to this node to facilitate {@code request}. If node does - * not have enough memory, create a reservation. This is called once we are - * sure the particular request should be facilitated by this node. - * - * @param node - * The node to try placing the container on. - * @param priority - * The requested priority for the container. - * @param request - * The ResourceRequest we're trying to satisfy. - * @param type - * The locality of the assignment. - * @param reserved - * Whether there's already a container reserved for this app on the node. - * @return - * If an assignment was made, returns the resources allocated to the - * container. If a reservation was made, returns - * FairScheduler.CONTAINER_RESERVED. If no assignment or reservation was - * made, returns an empty resource. - */ - private Resource assignContainer(FSSchedulerNode node, - ResourceRequest request, NodeType type, - boolean reserved) { - - // How much does this request need? - Resource capability = request.getCapability(); - - // How much does the node have? - Resource available = node.getAvailableResource(); - - Container container = null; - if (reserved) { - container = node.getReservedContainer().getContainer(); - } else { - container = createContainer(app, node, capability, request.getPriority()); - } - - // Can we allocate a container on this node? - if (Resources.fitsIn(capability, available)) { - // Inform the application of the new container for this request - RMContainer allocatedContainer = - app.allocate(type, node, request.getPriority(), request, container); - if (allocatedContainer == null) { - // Did the application need this resource? - if (reserved) { - unreserve(request.getPriority(), node); - } - return Resources.none(); - } - - // If we had previously made a reservation, delete it - if (reserved) { - unreserve(request.getPriority(), node); - } - - // Inform the node - node.allocateContainer(allocatedContainer); - - // If this container is used to run AM, update the leaf queue's AM usage - if (app.getLiveContainers().size() == 1 && - !app.getUnmanagedAM()) { - queue.addAMResourceUsage(container.getResource()); - app.setAmRunning(true); - } - - return container.getResource(); - } else { - // The desired container won't fit here, so reserve - reserve(request.getPriority(), node, container, reserved); - - return FairScheduler.CONTAINER_RESERVED; - } - } - - private Resource assignContainer(FSSchedulerNode node, boolean reserved) { - if (LOG.isDebugEnabled()) { - LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved); - } - - Collection prioritiesToTry = (reserved) ? - Arrays.asList(node.getReservedContainer().getReservedPriority()) : - app.getPriorities(); - - // For each priority, see if we can schedule a node local, rack local - // or off-switch request. Rack of off-switch requests may be delayed - // (not scheduled) in order to promote better locality. - synchronized (app) { - for (Priority priority : prioritiesToTry) { - if (app.getTotalRequiredResources(priority) <= 0 || - !hasContainerForNode(priority, node)) { - continue; - } - - app.addSchedulingOpportunity(priority); - - // Check the AM resource usage for the leaf queue - if (app.getLiveContainers().size() == 0 - && !app.getUnmanagedAM()) { - if (!queue.canRunAppAM(app.getAMResource())) { - return Resources.none(); - } - } - - ResourceRequest rackLocalRequest = app.getResourceRequest(priority, - node.getRackName()); - ResourceRequest localRequest = app.getResourceRequest(priority, - node.getNodeName()); - - if (localRequest != null && !localRequest.getRelaxLocality()) { - LOG.warn("Relax locality off is not supported on local request: " - + localRequest); - } - - NodeType allowedLocality; - if (scheduler.isContinuousSchedulingEnabled()) { - allowedLocality = app.getAllowedLocalityLevelByTime(priority, - scheduler.getNodeLocalityDelayMs(), - scheduler.getRackLocalityDelayMs(), - scheduler.getClock().getTime()); - } else { - allowedLocality = app.getAllowedLocalityLevel(priority, - scheduler.getNumClusterNodes(), - scheduler.getNodeLocalityThreshold(), - scheduler.getRackLocalityThreshold()); - } - - if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0 - && localRequest != null && localRequest.getNumContainers() != 0) { - return assignContainer(node, localRequest, - NodeType.NODE_LOCAL, reserved); - } - - if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) { - continue; - } - - if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0 - && (allowedLocality.equals(NodeType.RACK_LOCAL) || - allowedLocality.equals(NodeType.OFF_SWITCH))) { - return assignContainer(node, rackLocalRequest, - NodeType.RACK_LOCAL, reserved); - } - - ResourceRequest offSwitchRequest = app.getResourceRequest(priority, - ResourceRequest.ANY); - if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) { - continue; - } - - if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0 - && allowedLocality.equals(NodeType.OFF_SWITCH)) { - return assignContainer(node, offSwitchRequest, - NodeType.OFF_SWITCH, reserved); - } - } - } - return Resources.none(); - } - - /** - * Called when this application already has an existing reservation on the - * given node. Sees whether we can turn the reservation into an allocation. - * Also checks whether the application needs the reservation anymore, and - * releases it if not. - * - * @param node - * Node that the application has an existing reservation on - */ - public Resource assignReservedContainer(FSSchedulerNode node) { - RMContainer rmContainer = node.getReservedContainer(); - Priority priority = rmContainer.getReservedPriority(); - - // Make sure the application still needs requests at this priority - if (app.getTotalRequiredResources(priority) == 0) { - unreserve(priority, node); - return Resources.none(); - } - - // Fail early if the reserved container won't fit. - // Note that we have an assumption here that there's only one container size - // per priority. - if (!Resources.fitsIn(node.getReservedContainer().getReservedResource(), - node.getAvailableResource())) { - return Resources.none(); - } - - return assignContainer(node, true); - } - - @Override - public Resource assignContainer(FSSchedulerNode node) { - return assignContainer(node, false); - } - - /** - * Preempt a running container according to the priority - */ - @Override - public RMContainer preemptContainer() { - if (LOG.isDebugEnabled()) { - LOG.debug("App " + getName() + " is going to preempt a running " + - "container"); - } - - RMContainer toBePreempted = null; - for (RMContainer container : app.getLiveContainers()) { - if (! app.getPreemptionContainers().contains(container) && - (toBePreempted == null || - comparator.compare(toBePreempted, container) > 0)) { - toBePreempted = container; - } - } - return toBePreempted; - } - - /** - * Whether this app has containers requests that could be satisfied on the - * given node, if the node had full space. - */ - public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) { - ResourceRequest anyRequest = app.getResourceRequest(prio, ResourceRequest.ANY); - ResourceRequest rackRequest = app.getResourceRequest(prio, node.getRackName()); - ResourceRequest nodeRequest = app.getResourceRequest(prio, node.getNodeName()); - - return - // There must be outstanding requests at the given priority: - anyRequest != null && anyRequest.getNumContainers() > 0 && - // If locality relaxation is turned off at *-level, there must be a - // non-zero request for the node's rack: - (anyRequest.getRelaxLocality() || - (rackRequest != null && rackRequest.getNumContainers() > 0)) && - // If locality relaxation is turned off at rack-level, there must be a - // non-zero request at the node: - (rackRequest == null || rackRequest.getRelaxLocality() || - (nodeRequest != null && nodeRequest.getNumContainers() > 0)) && - // The requested container must be able to fit on the node: - Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null, - anyRequest.getCapability(), node.getRMNode().getTotalCapability()); - } - - static class RMContainerComparator implements Comparator, - Serializable { - @Override - public int compare(RMContainer c1, RMContainer c2) { - int ret = c1.getContainer().getPriority().compareTo( - c2.getContainer().getPriority()); - if (ret == 0) { - return c2.getContainerId().compareTo(c1.getContainerId()); - } - return ret; - } - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java new file mode 100644 index 00000000000..eb6f6413893 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -0,0 +1,768 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * Represents an application attempt from the viewpoint of the Fair Scheduler. + */ +@Private +@Unstable +public class FSAppAttempt extends SchedulerApplicationAttempt + implements Schedulable { + + private static final Log LOG = LogFactory.getLog(FSAppAttempt.class); + private static final DefaultResourceCalculator RESOURCE_CALCULATOR + = new DefaultResourceCalculator(); + + private long startTime; + private Priority priority; + private ResourceWeights resourceWeights; + private Resource demand = Resources.createResource(0); + private FairScheduler scheduler; + private Resource fairShare = Resources.createResource(0, 0); + private Resource preemptedResources = Resources.createResource(0); + private RMContainerComparator comparator = new RMContainerComparator(); + private final Map preemptionMap = new HashMap(); + + /** + * Delay scheduling: We often want to prioritize scheduling of node-local + * containers over rack-local or off-switch containers. To acheive this + * we first only allow node-local assigments for a given prioirty level, + * then relax the locality threshold once we've had a long enough period + * without succesfully scheduling. We measure both the number of "missed" + * scheduling opportunities since the last container was scheduled + * at the current allowed level and the time since the last container + * was scheduled. Currently we use only the former. + */ + private final Map allowedLocalityLevel = + new HashMap(); + + public FSAppAttempt(FairScheduler scheduler, + ApplicationAttemptId applicationAttemptId, String user, FSLeafQueue queue, + ActiveUsersManager activeUsersManager, RMContext rmContext) { + super(applicationAttemptId, user, queue, activeUsersManager, rmContext); + + this.scheduler = scheduler; + this.startTime = scheduler.getClock().getTime(); + this.priority = Priority.newInstance(1); + this.resourceWeights = new ResourceWeights(); + } + + public ResourceWeights getResourceWeights() { + return resourceWeights; + } + + /** + * Get metrics reference from containing queue. + */ + public QueueMetrics getMetrics() { + return queue.getMetrics(); + } + + synchronized public void containerCompleted(RMContainer rmContainer, + ContainerStatus containerStatus, RMContainerEventType event) { + + Container container = rmContainer.getContainer(); + ContainerId containerId = container.getId(); + + // Remove from the list of newly allocated containers if found + newlyAllocatedContainers.remove(rmContainer); + + // Inform the container + rmContainer.handle( + new RMContainerFinishedEvent( + containerId, + containerStatus, + event) + ); + LOG.info("Completed container: " + rmContainer.getContainerId() + + " in state: " + rmContainer.getState() + " event:" + event); + + // Remove from the list of containers + liveContainers.remove(rmContainer.getContainerId()); + + RMAuditLogger.logSuccess(getUser(), + AuditConstants.RELEASE_CONTAINER, "SchedulerApp", + getApplicationId(), containerId); + + // Update usage metrics + Resource containerResource = rmContainer.getContainer().getResource(); + queue.getMetrics().releaseResources(getUser(), 1, containerResource); + Resources.subtractFrom(currentConsumption, containerResource); + + // remove from preemption map if it is completed + preemptionMap.remove(rmContainer); + } + + private synchronized void unreserveInternal( + Priority priority, FSSchedulerNode node) { + Map reservedContainers = + this.reservedContainers.get(priority); + RMContainer reservedContainer = reservedContainers.remove(node.getNodeID()); + if (reservedContainers.isEmpty()) { + this.reservedContainers.remove(priority); + } + + // Reset the re-reservation count + resetReReservations(priority); + + Resource resource = reservedContainer.getContainer().getResource(); + Resources.subtractFrom(currentReservation, resource); + + LOG.info("Application " + getApplicationId() + " unreserved " + " on node " + + node + ", currently has " + reservedContainers.size() + " at priority " + + priority + "; currentReservation " + currentReservation); + } + + public synchronized float getLocalityWaitFactor( + Priority priority, int clusterNodes) { + // Estimate: Required unique resources (i.e. hosts + racks) + int requiredResources = + Math.max(this.getResourceRequests(priority).size() - 1, 0); + + // waitFactor can't be more than '1' + // i.e. no point skipping more than clustersize opportunities + return Math.min(((float)requiredResources / clusterNodes), 1.0f); + } + + /** + * Return the level at which we are allowed to schedule containers, given the + * current size of the cluster and thresholds indicating how many nodes to + * fail at (as a fraction of cluster size) before relaxing scheduling + * constraints. + */ + public synchronized NodeType getAllowedLocalityLevel(Priority priority, + int numNodes, double nodeLocalityThreshold, double rackLocalityThreshold) { + // upper limit on threshold + if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; } + if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; } + + // If delay scheduling is not being used, can schedule anywhere + if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) { + return NodeType.OFF_SWITCH; + } + + // Default level is NODE_LOCAL + if (!allowedLocalityLevel.containsKey(priority)) { + allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL); + return NodeType.NODE_LOCAL; + } + + NodeType allowed = allowedLocalityLevel.get(priority); + + // If level is already most liberal, we're done + if (allowed.equals(NodeType.OFF_SWITCH)) return NodeType.OFF_SWITCH; + + double threshold = allowed.equals(NodeType.NODE_LOCAL) ? nodeLocalityThreshold : + rackLocalityThreshold; + + // Relax locality constraints once we've surpassed threshold. + if (getSchedulingOpportunities(priority) > (numNodes * threshold)) { + if (allowed.equals(NodeType.NODE_LOCAL)) { + allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL); + resetSchedulingOpportunities(priority); + } + else if (allowed.equals(NodeType.RACK_LOCAL)) { + allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH); + resetSchedulingOpportunities(priority); + } + } + return allowedLocalityLevel.get(priority); + } + + /** + * Return the level at which we are allowed to schedule containers. + * Given the thresholds indicating how much time passed before relaxing + * scheduling constraints. + */ + public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority, + long nodeLocalityDelayMs, long rackLocalityDelayMs, + long currentTimeMs) { + + // if not being used, can schedule anywhere + if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) { + return NodeType.OFF_SWITCH; + } + + // default level is NODE_LOCAL + if (! allowedLocalityLevel.containsKey(priority)) { + allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL); + return NodeType.NODE_LOCAL; + } + + NodeType allowed = allowedLocalityLevel.get(priority); + + // if level is already most liberal, we're done + if (allowed.equals(NodeType.OFF_SWITCH)) { + return NodeType.OFF_SWITCH; + } + + // check waiting time + long waitTime = currentTimeMs; + if (lastScheduledContainer.containsKey(priority)) { + waitTime -= lastScheduledContainer.get(priority); + } else { + waitTime -= getStartTime(); + } + + long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ? + nodeLocalityDelayMs : rackLocalityDelayMs; + + if (waitTime > thresholdTime) { + if (allowed.equals(NodeType.NODE_LOCAL)) { + allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL); + resetSchedulingOpportunities(priority, currentTimeMs); + } else if (allowed.equals(NodeType.RACK_LOCAL)) { + allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH); + resetSchedulingOpportunities(priority, currentTimeMs); + } + } + return allowedLocalityLevel.get(priority); + } + + synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node, + Priority priority, ResourceRequest request, + Container container) { + // Update allowed locality level + NodeType allowed = allowedLocalityLevel.get(priority); + if (allowed != null) { + if (allowed.equals(NodeType.OFF_SWITCH) && + (type.equals(NodeType.NODE_LOCAL) || + type.equals(NodeType.RACK_LOCAL))) { + this.resetAllowedLocalityLevel(priority, type); + } + else if (allowed.equals(NodeType.RACK_LOCAL) && + type.equals(NodeType.NODE_LOCAL)) { + this.resetAllowedLocalityLevel(priority, type); + } + } + + // Required sanity check - AM can call 'allocate' to update resource + // request without locking the scheduler, hence we need to check + if (getTotalRequiredResources(priority) <= 0) { + return null; + } + + // Create RMContainer + RMContainer rmContainer = new RMContainerImpl(container, + getApplicationAttemptId(), node.getNodeID(), + appSchedulingInfo.getUser(), rmContext); + + // Add it to allContainers list. + newlyAllocatedContainers.add(rmContainer); + liveContainers.put(container.getId(), rmContainer); + + // Update consumption and track allocations + List resourceRequestList = appSchedulingInfo.allocate( + type, node, priority, request, container); + Resources.addTo(currentConsumption, container.getResource()); + + // Update resource requests related to "request" and store in RMContainer + ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList); + + // Inform the container + rmContainer.handle( + new RMContainerEvent(container.getId(), RMContainerEventType.START)); + + if (LOG.isDebugEnabled()) { + LOG.debug("allocate: applicationAttemptId=" + + container.getId().getApplicationAttemptId() + + " container=" + container.getId() + " host=" + + container.getNodeId().getHost() + " type=" + type); + } + RMAuditLogger.logSuccess(getUser(), + AuditConstants.ALLOC_CONTAINER, "SchedulerApp", + getApplicationId(), container.getId()); + + return rmContainer; + } + + /** + * Should be called when the scheduler assigns a container at a higher + * degree of locality than the current threshold. Reset the allowed locality + * level to a higher degree of locality. + */ + public synchronized void resetAllowedLocalityLevel(Priority priority, + NodeType level) { + NodeType old = allowedLocalityLevel.get(priority); + LOG.info("Raising locality level from " + old + " to " + level + " at " + + " priority " + priority); + allowedLocalityLevel.put(priority, level); + } + + // related methods + public void addPreemption(RMContainer container, long time) { + assert preemptionMap.get(container) == null; + preemptionMap.put(container, time); + Resources.addTo(preemptedResources, container.getAllocatedResource()); + } + + public Long getContainerPreemptionTime(RMContainer container) { + return preemptionMap.get(container); + } + + public Set getPreemptionContainers() { + return preemptionMap.keySet(); + } + + @Override + public FSLeafQueue getQueue() { + return (FSLeafQueue)super.getQueue(); + } + + public Resource getPreemptedResources() { + return preemptedResources; + } + + public void resetPreemptedResources() { + preemptedResources = Resources.createResource(0); + for (RMContainer container : getPreemptionContainers()) { + Resources.addTo(preemptedResources, container.getAllocatedResource()); + } + } + + public void clearPreemptedResources() { + preemptedResources.setMemory(0); + preemptedResources.setVirtualCores(0); + } + + /** + * Create and return a container object reflecting an allocation for the + * given appliction on the given node with the given capability and + * priority. + */ + public Container createContainer( + FSSchedulerNode node, Resource capability, Priority priority) { + + NodeId nodeId = node.getRMNode().getNodeID(); + ContainerId containerId = BuilderUtils.newContainerId( + getApplicationAttemptId(), getNewContainerId()); + + // Create the container + Container container = + BuilderUtils.newContainer(containerId, nodeId, node.getRMNode() + .getHttpAddress(), capability, priority, null); + + return container; + } + + /** + * Reserve a spot for {@code container} on this {@code node}. If + * the container is {@code alreadyReserved} on the node, simply + * update relevant bookeeping. This dispatches ro relevant handlers + * in {@link FSSchedulerNode}.. + */ + private void reserve(Priority priority, FSSchedulerNode node, + Container container, boolean alreadyReserved) { + LOG.info("Making reservation: node=" + node.getNodeName() + + " app_id=" + getApplicationId()); + + if (!alreadyReserved) { + getMetrics().reserveResource(getUser(), container.getResource()); + RMContainer rmContainer = + super.reserve(node, priority, null, container); + node.reserveResource(this, priority, rmContainer); + } else { + RMContainer rmContainer = node.getReservedContainer(); + super.reserve(node, priority, rmContainer, container); + node.reserveResource(this, priority, rmContainer); + } + } + + /** + * Remove the reservation on {@code node} at the given {@link Priority}. + * This dispatches SchedulerNode handlers as well. + */ + public void unreserve(Priority priority, FSSchedulerNode node) { + RMContainer rmContainer = node.getReservedContainer(); + unreserveInternal(priority, node); + node.unreserveResource(this); + getMetrics().unreserveResource( + getUser(), rmContainer.getContainer().getResource()); + } + + /** + * Assign a container to this node to facilitate {@code request}. If node does + * not have enough memory, create a reservation. This is called once we are + * sure the particular request should be facilitated by this node. + * + * @param node + * The node to try placing the container on. + * @param request + * The ResourceRequest we're trying to satisfy. + * @param type + * The locality of the assignment. + * @param reserved + * Whether there's already a container reserved for this app on the node. + * @return + * If an assignment was made, returns the resources allocated to the + * container. If a reservation was made, returns + * FairScheduler.CONTAINER_RESERVED. If no assignment or reservation was + * made, returns an empty resource. + */ + private Resource assignContainer( + FSSchedulerNode node, ResourceRequest request, NodeType type, + boolean reserved) { + + // How much does this request need? + Resource capability = request.getCapability(); + + // How much does the node have? + Resource available = node.getAvailableResource(); + + Container container = null; + if (reserved) { + container = node.getReservedContainer().getContainer(); + } else { + container = createContainer(node, capability, request.getPriority()); + } + + // Can we allocate a container on this node? + if (Resources.fitsIn(capability, available)) { + // Inform the application of the new container for this request + RMContainer allocatedContainer = + allocate(type, node, request.getPriority(), request, container); + if (allocatedContainer == null) { + // Did the application need this resource? + if (reserved) { + unreserve(request.getPriority(), node); + } + return Resources.none(); + } + + // If we had previously made a reservation, delete it + if (reserved) { + unreserve(request.getPriority(), node); + } + + // Inform the node + node.allocateContainer(allocatedContainer); + + // If this container is used to run AM, update the leaf queue's AM usage + if (getLiveContainers().size() == 1 && !getUnmanagedAM()) { + getQueue().addAMResourceUsage(container.getResource()); + setAmRunning(true); + } + + return container.getResource(); + } else { + // The desired container won't fit here, so reserve + reserve(request.getPriority(), node, container, reserved); + + return FairScheduler.CONTAINER_RESERVED; + } + } + + private Resource assignContainer(FSSchedulerNode node, boolean reserved) { + if (LOG.isDebugEnabled()) { + LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved); + } + + Collection prioritiesToTry = (reserved) ? + Arrays.asList(node.getReservedContainer().getReservedPriority()) : + getPriorities(); + + // For each priority, see if we can schedule a node local, rack local + // or off-switch request. Rack of off-switch requests may be delayed + // (not scheduled) in order to promote better locality. + synchronized (this) { + for (Priority priority : prioritiesToTry) { + if (getTotalRequiredResources(priority) <= 0 || + !hasContainerForNode(priority, node)) { + continue; + } + + addSchedulingOpportunity(priority); + + // Check the AM resource usage for the leaf queue + if (getLiveContainers().size() == 0 && !getUnmanagedAM()) { + if (!getQueue().canRunAppAM(getAMResource())) { + return Resources.none(); + } + } + + ResourceRequest rackLocalRequest = getResourceRequest(priority, + node.getRackName()); + ResourceRequest localRequest = getResourceRequest(priority, + node.getNodeName()); + + if (localRequest != null && !localRequest.getRelaxLocality()) { + LOG.warn("Relax locality off is not supported on local request: " + + localRequest); + } + + NodeType allowedLocality; + if (scheduler.isContinuousSchedulingEnabled()) { + allowedLocality = getAllowedLocalityLevelByTime(priority, + scheduler.getNodeLocalityDelayMs(), + scheduler.getRackLocalityDelayMs(), + scheduler.getClock().getTime()); + } else { + allowedLocality = getAllowedLocalityLevel(priority, + scheduler.getNumClusterNodes(), + scheduler.getNodeLocalityThreshold(), + scheduler.getRackLocalityThreshold()); + } + + if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0 + && localRequest != null && localRequest.getNumContainers() != 0) { + return assignContainer(node, localRequest, + NodeType.NODE_LOCAL, reserved); + } + + if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) { + continue; + } + + if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0 + && (allowedLocality.equals(NodeType.RACK_LOCAL) || + allowedLocality.equals(NodeType.OFF_SWITCH))) { + return assignContainer(node, rackLocalRequest, + NodeType.RACK_LOCAL, reserved); + } + + ResourceRequest offSwitchRequest = + getResourceRequest(priority, ResourceRequest.ANY); + if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) { + continue; + } + + if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0 + && allowedLocality.equals(NodeType.OFF_SWITCH)) { + return assignContainer(node, offSwitchRequest, + NodeType.OFF_SWITCH, reserved); + } + } + } + return Resources.none(); + } + + /** + * Called when this application already has an existing reservation on the + * given node. Sees whether we can turn the reservation into an allocation. + * Also checks whether the application needs the reservation anymore, and + * releases it if not. + * + * @param node + * Node that the application has an existing reservation on + */ + public Resource assignReservedContainer(FSSchedulerNode node) { + RMContainer rmContainer = node.getReservedContainer(); + Priority priority = rmContainer.getReservedPriority(); + + // Make sure the application still needs requests at this priority + if (getTotalRequiredResources(priority) == 0) { + unreserve(priority, node); + return Resources.none(); + } + + // Fail early if the reserved container won't fit. + // Note that we have an assumption here that there's only one container size + // per priority. + if (!Resources.fitsIn(node.getReservedContainer().getReservedResource(), + node.getAvailableResource())) { + return Resources.none(); + } + + return assignContainer(node, true); + } + + + /** + * Whether this app has containers requests that could be satisfied on the + * given node, if the node had full space. + */ + public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) { + ResourceRequest anyRequest = getResourceRequest(prio, ResourceRequest.ANY); + ResourceRequest rackRequest = getResourceRequest(prio, node.getRackName()); + ResourceRequest nodeRequest = getResourceRequest(prio, node.getNodeName()); + + return + // There must be outstanding requests at the given priority: + anyRequest != null && anyRequest.getNumContainers() > 0 && + // If locality relaxation is turned off at *-level, there must be a + // non-zero request for the node's rack: + (anyRequest.getRelaxLocality() || + (rackRequest != null && rackRequest.getNumContainers() > 0)) && + // If locality relaxation is turned off at rack-level, there must be a + // non-zero request at the node: + (rackRequest == null || rackRequest.getRelaxLocality() || + (nodeRequest != null && nodeRequest.getNumContainers() > 0)) && + // The requested container must be able to fit on the node: + Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null, + anyRequest.getCapability(), node.getRMNode().getTotalCapability()); + } + + + static class RMContainerComparator implements Comparator, + Serializable { + @Override + public int compare(RMContainer c1, RMContainer c2) { + int ret = c1.getContainer().getPriority().compareTo( + c2.getContainer().getPriority()); + if (ret == 0) { + return c2.getContainerId().compareTo(c1.getContainerId()); + } + return ret; + } + } + + /* Schedulable methods implementation */ + + @Override + public String getName() { + return getApplicationId().toString(); + } + + @Override + public Resource getDemand() { + return demand; + } + + @Override + public long getStartTime() { + return startTime; + } + + @Override + public Resource getMinShare() { + return Resources.none(); + } + + @Override + public Resource getMaxShare() { + return Resources.unbounded(); + } + + @Override + public Resource getResourceUsage() { + // Here the getPreemptedResources() always return zero, except in + // a preemption round + return Resources.subtract(getCurrentConsumption(), getPreemptedResources()); + } + + @Override + public ResourceWeights getWeights() { + return scheduler.getAppWeight(this); + } + + @Override + public Priority getPriority() { + // Right now per-app priorities are not passed to scheduler, + // so everyone has the same priority. + return priority; + } + + @Override + public Resource getFairShare() { + return this.fairShare; + } + + @Override + public void setFairShare(Resource fairShare) { + this.fairShare = fairShare; + } + + @Override + public boolean isActive() { + return true; + } + + + @Override + public void updateDemand() { + demand = Resources.createResource(0); + // Demand is current consumption plus outstanding requests + Resources.addTo(demand, getCurrentConsumption()); + + // Add up outstanding resource requests + synchronized (this) { + for (Priority p : getPriorities()) { + for (ResourceRequest r : getResourceRequests(p).values()) { + Resource total = Resources.multiply(r.getCapability(), r.getNumContainers()); + Resources.addTo(demand, total); + } + } + } + } + + @Override + public Resource assignContainer(FSSchedulerNode node) { + return assignContainer(node, false); + } + + /** + * Preempt a running container according to the priority + */ + @Override + public RMContainer preemptContainer() { + if (LOG.isDebugEnabled()) { + LOG.debug("App " + getName() + " is going to preempt a running " + + "container"); + } + + RMContainer toBePreempted = null; + for (RMContainer container : getLiveContainers()) { + if (!getPreemptionContainers().contains(container) && + (toBePreempted == null || + comparator.compare(toBePreempted, container) > 0)) { + toBePreempted = container; + } + } + return toBePreempted; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 3b3f6ce2296..49e8ef06122 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -44,11 +44,11 @@ import org.apache.hadoop.yarn.util.resource.Resources; public class FSLeafQueue extends FSQueue { private static final Log LOG = LogFactory.getLog( FSLeafQueue.class.getName()); - - private final List runnableAppScheds = // apps that are runnable - new ArrayList(); - private final List nonRunnableAppScheds = - new ArrayList(); + + private final List runnableApps = // apps that are runnable + new ArrayList(); + private final List nonRunnableApps = + new ArrayList(); private Resource demand = Resources.createResource(0); @@ -70,33 +70,31 @@ public class FSLeafQueue extends FSQueue { amResourceUsage = Resource.newInstance(0, 0); } - public void addApp(FSSchedulerApp app, boolean runnable) { - AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this); - app.setAppSchedulable(appSchedulable); + public void addApp(FSAppAttempt app, boolean runnable) { if (runnable) { - runnableAppScheds.add(appSchedulable); + runnableApps.add(app); } else { - nonRunnableAppScheds.add(appSchedulable); + nonRunnableApps.add(app); } } // for testing - void addAppSchedulable(AppSchedulable appSched) { - runnableAppScheds.add(appSched); + void addAppSchedulable(FSAppAttempt appSched) { + runnableApps.add(appSched); } /** * Removes the given app from this queue. * @return whether or not the app was runnable */ - public boolean removeApp(FSSchedulerApp app) { - if (runnableAppScheds.remove(app.getAppSchedulable())) { + public boolean removeApp(FSAppAttempt app) { + if (runnableApps.remove(app)) { // Update AM resource usage if (app.isAmRunning() && app.getAMResource() != null) { Resources.subtractFrom(amResourceUsage, app.getAMResource()); } return true; - } else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) { + } else if (nonRunnableApps.remove(app)) { return false; } else { throw new IllegalStateException("Given app to remove " + app + @@ -104,22 +102,22 @@ public class FSLeafQueue extends FSQueue { } } - public Collection getRunnableAppSchedulables() { - return runnableAppScheds; + public Collection getRunnableAppSchedulables() { + return runnableApps; } - public List getNonRunnableAppSchedulables() { - return nonRunnableAppScheds; + public List getNonRunnableAppSchedulables() { + return nonRunnableApps; } @Override public void collectSchedulerApplications( Collection apps) { - for (AppSchedulable appSched : runnableAppScheds) { - apps.add(appSched.getApp().getApplicationAttemptId()); + for (FSAppAttempt appSched : runnableApps) { + apps.add(appSched.getApplicationAttemptId()); } - for (AppSchedulable appSched : nonRunnableAppScheds) { - apps.add(appSched.getApp().getApplicationAttemptId()); + for (FSAppAttempt appSched : nonRunnableApps) { + apps.add(appSched.getApplicationAttemptId()); } } @@ -145,10 +143,10 @@ public class FSLeafQueue extends FSQueue { @Override public Resource getResourceUsage() { Resource usage = Resources.createResource(0); - for (AppSchedulable app : runnableAppScheds) { + for (FSAppAttempt app : runnableApps) { Resources.addTo(usage, app.getResourceUsage()); } - for (AppSchedulable app : nonRunnableAppScheds) { + for (FSAppAttempt app : nonRunnableApps) { Resources.addTo(usage, app.getResourceUsage()); } return usage; @@ -165,13 +163,13 @@ public class FSLeafQueue extends FSQueue { Resource maxRes = scheduler.getAllocationConfiguration() .getMaxResources(getName()); demand = Resources.createResource(0); - for (AppSchedulable sched : runnableAppScheds) { + for (FSAppAttempt sched : runnableApps) { if (Resources.equals(demand, maxRes)) { break; } updateDemandForApp(sched, maxRes); } - for (AppSchedulable sched : nonRunnableAppScheds) { + for (FSAppAttempt sched : nonRunnableApps) { if (Resources.equals(demand, maxRes)) { break; } @@ -183,7 +181,7 @@ public class FSLeafQueue extends FSQueue { } } - private void updateDemandForApp(AppSchedulable sched, Resource maxRes) { + private void updateDemandForApp(FSAppAttempt sched, Resource maxRes) { sched.updateDemand(); Resource toAdd = sched.getDemand(); if (LOG.isDebugEnabled()) { @@ -207,9 +205,9 @@ public class FSLeafQueue extends FSQueue { } Comparator comparator = policy.getComparator(); - Collections.sort(runnableAppScheds, comparator); - for (AppSchedulable sched : runnableAppScheds) { - if (SchedulerAppUtils.isBlacklisted(sched.getApp(), node, LOG)) { + Collections.sort(runnableApps, comparator); + for (FSAppAttempt sched : runnableApps) { + if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) { continue; } @@ -237,8 +235,8 @@ public class FSLeafQueue extends FSQueue { // Choose the app that is most over fair share Comparator comparator = policy.getComparator(); - AppSchedulable candidateSched = null; - for (AppSchedulable sched : runnableAppScheds) { + FSAppAttempt candidateSched = null; + for (FSAppAttempt sched : runnableApps) { if (candidateSched == null || comparator.compare(sched, candidateSched) > 0) { candidateSched = sched; @@ -291,7 +289,7 @@ public class FSLeafQueue extends FSQueue { @Override public int getNumRunnableApps() { - return runnableAppScheds.size(); + return runnableApps.size(); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 1e94046100a..c071c73321d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -39,7 +39,8 @@ import org.apache.hadoop.yarn.util.resource.Resources; @Private @Unstable -public abstract class FSQueue extends Schedulable implements Queue { +public abstract class FSQueue implements Queue, Schedulable { + private Resource fairShare = Resources.createResource(0, 0); private final String name; protected final FairScheduler scheduler; private final FSQueueMetrics metrics; @@ -139,10 +140,15 @@ public abstract class FSQueue extends Schedulable implements Queue { public FSQueueMetrics getMetrics() { return metrics; } - + + /** Get the fair share assigned to this Schedulable. */ + public Resource getFairShare() { + return fairShare; + } + @Override public void setFairShare(Resource fairShare) { - super.setFairShare(fairShare); + this.fairShare = fairShare; metrics.setFairShare(fairShare); } @@ -187,4 +193,16 @@ public abstract class FSQueue extends Schedulable implements Queue { } return true; } + + @Override + public boolean isActive() { + return getNumRunnableApps() > 0; + } + + /** Convenient toString implementation for debugging. */ + @Override + public String toString() { + return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]", + getName(), getDemand(), getResourceUsage(), fairShare, getWeights()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java deleted file mode 100644 index 20cf3952d2d..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ /dev/null @@ -1,360 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.util.resource.Resources; - -/** - * Represents an application attempt from the viewpoint of the Fair Scheduler. - */ -@Private -@Unstable -public class FSSchedulerApp extends SchedulerApplicationAttempt { - - private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class); - - private AppSchedulable appSchedulable; - - final Map preemptionMap = new HashMap(); - - private Resource preemptedResources = Resources.createResource(0); - - public FSSchedulerApp(ApplicationAttemptId applicationAttemptId, - String user, FSLeafQueue queue, ActiveUsersManager activeUsersManager, - RMContext rmContext) { - super(applicationAttemptId, user, queue, activeUsersManager, rmContext); - } - - public void setAppSchedulable(AppSchedulable appSchedulable) { - this.appSchedulable = appSchedulable; - } - - public AppSchedulable getAppSchedulable() { - return appSchedulable; - } - - synchronized public void containerCompleted(RMContainer rmContainer, - ContainerStatus containerStatus, RMContainerEventType event) { - - Container container = rmContainer.getContainer(); - ContainerId containerId = container.getId(); - - // Remove from the list of newly allocated containers if found - newlyAllocatedContainers.remove(rmContainer); - - // Inform the container - rmContainer.handle( - new RMContainerFinishedEvent( - containerId, - containerStatus, - event) - ); - LOG.info("Completed container: " + rmContainer.getContainerId() + - " in state: " + rmContainer.getState() + " event:" + event); - - // Remove from the list of containers - liveContainers.remove(rmContainer.getContainerId()); - - RMAuditLogger.logSuccess(getUser(), - AuditConstants.RELEASE_CONTAINER, "SchedulerApp", - getApplicationId(), containerId); - - // Update usage metrics - Resource containerResource = rmContainer.getContainer().getResource(); - queue.getMetrics().releaseResources(getUser(), 1, containerResource); - Resources.subtractFrom(currentConsumption, containerResource); - - // remove from preemption map if it is completed - preemptionMap.remove(rmContainer); - } - - public synchronized void unreserve(FSSchedulerNode node, Priority priority) { - Map reservedContainers = - this.reservedContainers.get(priority); - RMContainer reservedContainer = reservedContainers.remove(node.getNodeID()); - if (reservedContainers.isEmpty()) { - this.reservedContainers.remove(priority); - } - - // Reset the re-reservation count - resetReReservations(priority); - - Resource resource = reservedContainer.getContainer().getResource(); - Resources.subtractFrom(currentReservation, resource); - - LOG.info("Application " + getApplicationId() + " unreserved " + " on node " - + node + ", currently has " + reservedContainers.size() + " at priority " - + priority + "; currentReservation " + currentReservation); - } - - public synchronized float getLocalityWaitFactor( - Priority priority, int clusterNodes) { - // Estimate: Required unique resources (i.e. hosts + racks) - int requiredResources = - Math.max(this.getResourceRequests(priority).size() - 1, 0); - - // waitFactor can't be more than '1' - // i.e. no point skipping more than clustersize opportunities - return Math.min(((float)requiredResources / clusterNodes), 1.0f); - } - - /** - * Delay scheduling: We often want to prioritize scheduling of node-local - * containers over rack-local or off-switch containers. To acheive this - * we first only allow node-local assigments for a given prioirty level, - * then relax the locality threshold once we've had a long enough period - * without succesfully scheduling. We measure both the number of "missed" - * scheduling opportunities since the last container was scheduled - * at the current allowed level and the time since the last container - * was scheduled. Currently we use only the former. - */ - - // Current locality threshold - final Map allowedLocalityLevel = new HashMap< - Priority, NodeType>(); - - /** - * Return the level at which we are allowed to schedule containers, given the - * current size of the cluster and thresholds indicating how many nodes to - * fail at (as a fraction of cluster size) before relaxing scheduling - * constraints. - */ - public synchronized NodeType getAllowedLocalityLevel(Priority priority, - int numNodes, double nodeLocalityThreshold, double rackLocalityThreshold) { - // upper limit on threshold - if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; } - if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; } - - // If delay scheduling is not being used, can schedule anywhere - if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) { - return NodeType.OFF_SWITCH; - } - - // Default level is NODE_LOCAL - if (!allowedLocalityLevel.containsKey(priority)) { - allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL); - return NodeType.NODE_LOCAL; - } - - NodeType allowed = allowedLocalityLevel.get(priority); - - // If level is already most liberal, we're done - if (allowed.equals(NodeType.OFF_SWITCH)) return NodeType.OFF_SWITCH; - - double threshold = allowed.equals(NodeType.NODE_LOCAL) ? nodeLocalityThreshold : - rackLocalityThreshold; - - // Relax locality constraints once we've surpassed threshold. - if (getSchedulingOpportunities(priority) > (numNodes * threshold)) { - if (allowed.equals(NodeType.NODE_LOCAL)) { - allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL); - resetSchedulingOpportunities(priority); - } - else if (allowed.equals(NodeType.RACK_LOCAL)) { - allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH); - resetSchedulingOpportunities(priority); - } - } - return allowedLocalityLevel.get(priority); - } - - /** - * Return the level at which we are allowed to schedule containers. - * Given the thresholds indicating how much time passed before relaxing - * scheduling constraints. - */ - public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority, - long nodeLocalityDelayMs, long rackLocalityDelayMs, - long currentTimeMs) { - - // if not being used, can schedule anywhere - if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) { - return NodeType.OFF_SWITCH; - } - - // default level is NODE_LOCAL - if (! allowedLocalityLevel.containsKey(priority)) { - allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL); - return NodeType.NODE_LOCAL; - } - - NodeType allowed = allowedLocalityLevel.get(priority); - - // if level is already most liberal, we're done - if (allowed.equals(NodeType.OFF_SWITCH)) { - return NodeType.OFF_SWITCH; - } - - // check waiting time - long waitTime = currentTimeMs; - if (lastScheduledContainer.containsKey(priority)) { - waitTime -= lastScheduledContainer.get(priority); - } else { - waitTime -= appSchedulable.getStartTime(); - } - - long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ? - nodeLocalityDelayMs : rackLocalityDelayMs; - - if (waitTime > thresholdTime) { - if (allowed.equals(NodeType.NODE_LOCAL)) { - allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL); - resetSchedulingOpportunities(priority, currentTimeMs); - } else if (allowed.equals(NodeType.RACK_LOCAL)) { - allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH); - resetSchedulingOpportunities(priority, currentTimeMs); - } - } - return allowedLocalityLevel.get(priority); - } - - synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node, - Priority priority, ResourceRequest request, - Container container) { - // Update allowed locality level - NodeType allowed = allowedLocalityLevel.get(priority); - if (allowed != null) { - if (allowed.equals(NodeType.OFF_SWITCH) && - (type.equals(NodeType.NODE_LOCAL) || - type.equals(NodeType.RACK_LOCAL))) { - this.resetAllowedLocalityLevel(priority, type); - } - else if (allowed.equals(NodeType.RACK_LOCAL) && - type.equals(NodeType.NODE_LOCAL)) { - this.resetAllowedLocalityLevel(priority, type); - } - } - - // Required sanity check - AM can call 'allocate' to update resource - // request without locking the scheduler, hence we need to check - if (getTotalRequiredResources(priority) <= 0) { - return null; - } - - // Create RMContainer - RMContainer rmContainer = new RMContainerImpl(container, - getApplicationAttemptId(), node.getNodeID(), - appSchedulingInfo.getUser(), rmContext); - - // Add it to allContainers list. - newlyAllocatedContainers.add(rmContainer); - liveContainers.put(container.getId(), rmContainer); - - // Update consumption and track allocations - List resourceRequestList = appSchedulingInfo.allocate( - type, node, priority, request, container); - Resources.addTo(currentConsumption, container.getResource()); - - // Update resource requests related to "request" and store in RMContainer - ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList); - - // Inform the container - rmContainer.handle( - new RMContainerEvent(container.getId(), RMContainerEventType.START)); - - if (LOG.isDebugEnabled()) { - LOG.debug("allocate: applicationAttemptId=" - + container.getId().getApplicationAttemptId() - + " container=" + container.getId() + " host=" - + container.getNodeId().getHost() + " type=" + type); - } - RMAuditLogger.logSuccess(getUser(), - AuditConstants.ALLOC_CONTAINER, "SchedulerApp", - getApplicationId(), container.getId()); - - return rmContainer; - } - - /** - * Should be called when the scheduler assigns a container at a higher - * degree of locality than the current threshold. Reset the allowed locality - * level to a higher degree of locality. - */ - public synchronized void resetAllowedLocalityLevel(Priority priority, - NodeType level) { - NodeType old = allowedLocalityLevel.get(priority); - LOG.info("Raising locality level from " + old + " to " + level + " at " + - " priority " + priority); - allowedLocalityLevel.put(priority, level); - } - - // related methods - public void addPreemption(RMContainer container, long time) { - assert preemptionMap.get(container) == null; - preemptionMap.put(container, time); - Resources.addTo(preemptedResources, container.getAllocatedResource()); - } - - public Long getContainerPreemptionTime(RMContainer container) { - return preemptionMap.get(container); - } - - public Set getPreemptionContainers() { - return preemptionMap.keySet(); - } - - @Override - public FSLeafQueue getQueue() { - return (FSLeafQueue)super.getQueue(); - } - - public Resource getPreemptedResources() { - return preemptedResources; - } - - public void resetPreemptedResources() { - preemptedResources = Resources.createResource(0); - for (RMContainer container : getPreemptionContainers()) { - Resources.addTo(preemptedResources, container.getAllocatedResource()); - } - } - - public void clearPreemptedResources() { - preemptedResources.setMemory(0); - preemptedResources.setVirtualCores(0); - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index 69f2ab3d844..be08dff397a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -35,7 +35,7 @@ public class FSSchedulerNode extends SchedulerNode { private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class); - private AppSchedulable reservedAppSchedulable; + private FSAppAttempt reservedAppSchedulable; public FSSchedulerNode(RMNode node, boolean usePortForNodeName) { super(node, usePortForNodeName); @@ -76,7 +76,7 @@ public class FSSchedulerNode extends SchedulerNode { " on node " + this + " for application " + application); } setReservedContainer(container); - this.reservedAppSchedulable = ((FSSchedulerApp) application).getAppSchedulable(); + this.reservedAppSchedulable = (FSAppAttempt) application; } @Override @@ -98,7 +98,7 @@ public class FSSchedulerNode extends SchedulerNode { this.reservedAppSchedulable = null; } - public synchronized AppSchedulable getReservedAppSchedulable() { + public synchronized FSAppAttempt getReservedAppSchedulable() { return reservedAppSchedulable; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 8765ba04dc7..4748d73e038 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -117,7 +117,7 @@ import com.google.common.base.Preconditions; @Unstable @SuppressWarnings("unchecked") public class FairScheduler extends - AbstractYarnScheduler { + AbstractYarnScheduler { private FairSchedulerConfiguration conf; private Resource incrAllocation; @@ -432,8 +432,8 @@ public class FairScheduler extends try { // Reset preemptedResource for each app for (FSLeafQueue queue : getQueueManager().getLeafQueues()) { - for (AppSchedulable app : queue.getRunnableAppSchedulables()) { - app.getApp().resetPreemptedResources(); + for (FSAppAttempt app : queue.getRunnableAppSchedulables()) { + app.resetPreemptedResources(); } } @@ -453,8 +453,8 @@ public class FairScheduler extends } finally { // Clear preemptedResources for each app for (FSLeafQueue queue : getQueueManager().getLeafQueues()) { - for (AppSchedulable app : queue.getRunnableAppSchedulables()) { - app.getApp().clearPreemptedResources(); + for (FSAppAttempt app : queue.getRunnableAppSchedulables()) { + app.clearPreemptedResources(); } } } @@ -465,7 +465,7 @@ public class FairScheduler extends protected void warnOrKillContainer(RMContainer container) { ApplicationAttemptId appAttemptId = container.getApplicationAttemptId(); - FSSchedulerApp app = getSchedulerApp(appAttemptId); + FSAppAttempt app = getSchedulerApp(appAttemptId); FSLeafQueue queue = app.getQueue(); LOG.info("Preempting container (prio=" + container.getContainer().getPriority() + "res=" + container.getContainer().getResource() + @@ -490,7 +490,7 @@ public class FairScheduler extends (getClock().getTime() - time) + "ms)"); } } else { - // track the request in the FSSchedulerApp itself + // track the request in the FSAppAttempt itself app.addPreemption(container, getClock().getTime()); } } @@ -541,7 +541,7 @@ public class FairScheduler extends } // synchronized for sizeBasedWeight - public synchronized ResourceWeights getAppWeight(AppSchedulable app) { + public synchronized ResourceWeights getAppWeight(FSAppAttempt app) { double weight = 1.0; if (sizeBasedWeight) { // Set weight based on current memory demand @@ -636,8 +636,8 @@ public class FairScheduler extends return; } - SchedulerApplication application = - new SchedulerApplication(queue, user); + SchedulerApplication application = + new SchedulerApplication(queue, user); applications.put(applicationId, application); queue.getMetrics().submitApp(user); @@ -661,13 +661,13 @@ public class FairScheduler extends ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt, boolean isAttemptRecovering) { - SchedulerApplication application = + SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); String user = application.getUser(); FSLeafQueue queue = (FSLeafQueue) application.getQueue(); - FSSchedulerApp attempt = - new FSSchedulerApp(applicationAttemptId, user, + FSAppAttempt attempt = + new FSAppAttempt(this, applicationAttemptId, user, queue, new ActiveUsersManager(getRootQueueMetrics()), rmContext); if (transferStateFromPreviousAttempt) { @@ -742,7 +742,7 @@ public class FairScheduler extends private synchronized void removeApplication(ApplicationId applicationId, RMAppState finalState) { - SchedulerApplication application = + SchedulerApplication application = applications.get(applicationId); if (application == null){ LOG.warn("Couldn't find application " + applicationId); @@ -757,9 +757,9 @@ public class FairScheduler extends RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) { LOG.info("Application " + applicationAttemptId + " is done." + " finalState=" + rmAppAttemptFinalState); - SchedulerApplication application = + SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); - FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId); + FSAppAttempt attempt = getSchedulerApp(applicationAttemptId); if (attempt == null || application == null) { LOG.info("Unknown application " + applicationAttemptId + " has completed!"); @@ -820,7 +820,7 @@ public class FairScheduler extends Container container = rmContainer.getContainer(); // Get the application for the finished container - FSSchedulerApp application = + FSAppAttempt application = getCurrentAttemptForContainer(container.getId()); ApplicationId appId = container.getId().getApplicationAttemptId().getApplicationId(); @@ -835,8 +835,7 @@ public class FairScheduler extends FSSchedulerNode node = getFSSchedulerNode(container.getNodeId()); if (rmContainer.getState() == RMContainerState.RESERVED) { - application.unreserve(node, rmContainer.getReservedPriority()); - node.unreserveResource(application); + application.unreserve(rmContainer.getReservedPriority(), node); } else { application.containerCompleted(rmContainer, containerStatus, event); node.releaseContainer(container); @@ -896,7 +895,7 @@ public class FairScheduler extends List ask, List release, List blacklistAdditions, List blacklistRemovals) { // Make sure this application exists - FSSchedulerApp application = getSchedulerApp(appAttemptId); + FSAppAttempt application = getSchedulerApp(appAttemptId); if (application == null) { LOG.info("Calling allocate on removed " + "or non existant application " + appAttemptId); @@ -1066,13 +1065,13 @@ public class FairScheduler extends // 1. Check for reserved applications // 2. Schedule if there are no reservations - AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable(); + FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable(); if (reservedAppSchedulable != null) { Priority reservedPriority = node.getReservedContainer().getReservedPriority(); if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) { // Don't hold the reservation if app can no longer use it LOG.info("Releasing reservation that cannot be satisfied for application " - + reservedAppSchedulable.getApp().getApplicationAttemptId() + + reservedAppSchedulable.getApplicationAttemptId() + " on node " + node); reservedAppSchedulable.unreserve(reservedPriority, node); reservedAppSchedulable = null; @@ -1080,7 +1079,7 @@ public class FairScheduler extends // Reservation exists; try to fulfill the reservation if (LOG.isDebugEnabled()) { LOG.debug("Trying to fulfill reservation for application " - + reservedAppSchedulable.getApp().getApplicationAttemptId() + + reservedAppSchedulable.getApplicationAttemptId() + " on node: " + node); } @@ -1105,8 +1104,8 @@ public class FairScheduler extends updateRootQueueMetrics(); } - public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) { - return (FSSchedulerApp) super.getApplicationAttempt(appAttemptId); + public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) { + return super.getApplicationAttempt(appAttemptId); } /** @@ -1268,8 +1267,8 @@ public class FairScheduler extends fsOpDurations = FSOpDurations.getInstance(true); // This stores per-application scheduling information - this.applications = - new ConcurrentHashMap>(); + this.applications = new ConcurrentHashMap< + ApplicationId, SchedulerApplication>(); this.eventLog = new FairSchedulerEventLog(); eventLog.init(this.conf); @@ -1369,7 +1368,7 @@ public class FairScheduler extends @Override public List getQueueUserAclInfo() { - UserGroupInformation user = null; + UserGroupInformation user; try { user = UserGroupInformation.getCurrentUser(); } catch (IOException ioe) { @@ -1431,11 +1430,11 @@ public class FairScheduler extends @Override public synchronized String moveApplication(ApplicationId appId, String queueName) throws YarnException { - SchedulerApplication app = applications.get(appId); + SchedulerApplication app = applications.get(appId); if (app == null) { throw new YarnException("App to be moved " + appId + " not found."); } - FSSchedulerApp attempt = (FSSchedulerApp) app.getCurrentAppAttempt(); + FSAppAttempt attempt = (FSAppAttempt) app.getCurrentAppAttempt(); // To serialize with FairScheduler#allocate, synchronize on app attempt synchronized (attempt) { FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue(); @@ -1448,8 +1447,7 @@ public class FairScheduler extends return oldQueue.getQueueName(); } - if (oldQueue.getRunnableAppSchedulables().contains( - attempt.getAppSchedulable())) { + if (oldQueue.getRunnableAppSchedulables().contains(attempt)) { verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue); } @@ -1458,7 +1456,7 @@ public class FairScheduler extends } } - private void verifyMoveDoesNotViolateConstraints(FSSchedulerApp app, + private void verifyMoveDoesNotViolateConstraints(FSAppAttempt app, FSLeafQueue oldQueue, FSLeafQueue targetQueue) throws YarnException { String queueName = targetQueue.getQueueName(); ApplicationAttemptId appAttId = app.getApplicationAttemptId(); @@ -1495,8 +1493,8 @@ public class FairScheduler extends * Helper for moveApplication, which has appropriate synchronization, so all * operations will be atomic. */ - private void executeMove(SchedulerApplication app, - FSSchedulerApp attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue) { + private void executeMove(SchedulerApplication app, + FSAppAttempt attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue) { boolean wasRunnable = oldQueue.removeApp(attempt); // if app was not runnable before, it may be runnable now boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java index c87f4ca6b8c..1b75740a155 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java @@ -25,15 +25,15 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; /** - * Order {@link AppSchedulable} objects by priority and then by submit time, as + * Order {@link FSAppAttempt} objects by priority and then by submit time, as * in the default scheduler in Hadoop. */ @Private @Unstable -public class FifoAppComparator implements Comparator, Serializable { +public class FifoAppComparator implements Comparator, Serializable { private static final long serialVersionUID = 3428835083489547918L; - public int compare(AppSchedulable a1, AppSchedulable a2) { + public int compare(FSAppAttempt a1, FSAppAttempt a2) { int res = a1.getPriority().compareTo(a2.getPriority()); if (res == 0) { if (a1.getStartTime() < a2.getStartTime()) { @@ -44,7 +44,7 @@ public class FifoAppComparator implements Comparator, Serializab } if (res == 0) { // If there is a tie, break it by app ID to get a deterministic order - res = a1.getApp().getApplicationId().compareTo(a2.getApp().getApplicationId()); + res = a1.getApplicationId().compareTo(a2.getApplicationId()); } return res; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java index 359519a2f29..feeda1e90c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java @@ -43,7 +43,7 @@ public class MaxRunningAppsEnforcer { // Tracks the number of running applications by user. private final Map usersNumRunnableApps; @VisibleForTesting - final ListMultimap usersNonRunnableApps; + final ListMultimap usersNonRunnableApps; public MaxRunningAppsEnforcer(FairScheduler scheduler) { this.scheduler = scheduler; @@ -80,7 +80,7 @@ public class MaxRunningAppsEnforcer { * Tracks the given new runnable app for purposes of maintaining max running * app limits. */ - public void trackRunnableApp(FSSchedulerApp app) { + public void trackRunnableApp(FSAppAttempt app) { String user = app.getUser(); FSLeafQueue queue = app.getQueue(); // Increment running counts for all parent queues @@ -99,9 +99,9 @@ public class MaxRunningAppsEnforcer { * Tracks the given new non runnable app so that it can be made runnable when * it would not violate max running app limits. */ - public void trackNonRunnableApp(FSSchedulerApp app) { + public void trackNonRunnableApp(FSAppAttempt app) { String user = app.getUser(); - usersNonRunnableApps.put(user, app.getAppSchedulable()); + usersNonRunnableApps.put(user, app); } /** @@ -111,7 +111,7 @@ public class MaxRunningAppsEnforcer { * Runs in O(n log(n)) where n is the number of queues that are under the * highest queue that went from having no slack to having slack. */ - public void updateRunnabilityOnAppRemoval(FSSchedulerApp app, FSLeafQueue queue) { + public void updateRunnabilityOnAppRemoval(FSAppAttempt app, FSLeafQueue queue) { AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); // childqueueX might have no pending apps itself, but if a queue higher up @@ -133,8 +133,8 @@ public class MaxRunningAppsEnforcer { parent = parent.getParent(); } - List> appsNowMaybeRunnable = - new ArrayList>(); + List> appsNowMaybeRunnable = + new ArrayList>(); // Compile lists of apps which may now be runnable // We gather lists instead of building a set of all non-runnable apps so @@ -150,26 +150,26 @@ public class MaxRunningAppsEnforcer { userNumRunning = 0; } if (userNumRunning == allocConf.getUserMaxApps(user) - 1) { - List userWaitingApps = usersNonRunnableApps.get(user); + List userWaitingApps = usersNonRunnableApps.get(user); if (userWaitingApps != null) { appsNowMaybeRunnable.add(userWaitingApps); } } // Scan through and check whether this means that any apps are now runnable - Iterator iter = new MultiListStartTimeIterator( + Iterator iter = new MultiListStartTimeIterator( appsNowMaybeRunnable); - FSSchedulerApp prev = null; - List noLongerPendingApps = new ArrayList(); + FSAppAttempt prev = null; + List noLongerPendingApps = new ArrayList(); while (iter.hasNext()) { - FSSchedulerApp next = iter.next(); + FSAppAttempt next = iter.next(); if (next == prev) { continue; } if (canAppBeRunnable(next.getQueue(), next.getUser())) { trackRunnableApp(next); - AppSchedulable appSched = next.getAppSchedulable(); + FSAppAttempt appSched = next; next.getQueue().getRunnableAppSchedulables().add(appSched); noLongerPendingApps.add(appSched); @@ -186,14 +186,14 @@ public class MaxRunningAppsEnforcer { // We remove the apps from their pending lists afterwards so that we don't // pull them out from under the iterator. If they are not in these lists // in the first place, there is a bug. - for (AppSchedulable appSched : noLongerPendingApps) { - if (!appSched.getApp().getQueue().getNonRunnableAppSchedulables() + for (FSAppAttempt appSched : noLongerPendingApps) { + if (!appSched.getQueue().getNonRunnableAppSchedulables() .remove(appSched)) { LOG.error("Can't make app runnable that does not already exist in queue" + " as non-runnable: " + appSched + ". This should never happen."); } - if (!usersNonRunnableApps.remove(appSched.getApp().getUser(), appSched)) { + if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) { LOG.error("Waiting app " + appSched + " expected to be in " + "usersNonRunnableApps, but was not. This should never happen."); } @@ -204,7 +204,7 @@ public class MaxRunningAppsEnforcer { * Updates the relevant tracking variables after a runnable app with the given * queue and user has been removed. */ - public void untrackRunnableApp(FSSchedulerApp app) { + public void untrackRunnableApp(FSAppAttempt app) { // Update usersRunnableApps String user = app.getUser(); int newUserNumRunning = usersNumRunnableApps.get(user) - 1; @@ -226,8 +226,8 @@ public class MaxRunningAppsEnforcer { /** * Stops tracking the given non-runnable app */ - public void untrackNonRunnableApp(FSSchedulerApp app) { - usersNonRunnableApps.remove(app.getUser(), app.getAppSchedulable()); + public void untrackNonRunnableApp(FSAppAttempt app) { + usersNonRunnableApps.remove(app.getUser(), app); } /** @@ -235,7 +235,7 @@ public class MaxRunningAppsEnforcer { * of non-runnable applications. */ private void gatherPossiblyRunnableAppLists(FSQueue queue, - List> appLists) { + List> appLists) { if (queue.getNumRunnableApps() < scheduler.getAllocationConfiguration() .getQueueMaxApps(queue.getName())) { if (queue instanceof FSLeafQueue) { @@ -259,14 +259,14 @@ public class MaxRunningAppsEnforcer { * of O(num lists) time. */ static class MultiListStartTimeIterator implements - Iterator { + Iterator { - private List[] appLists; + private List[] appLists; private int[] curPositionsInAppLists; private PriorityQueue appListsByCurStartTime; @SuppressWarnings("unchecked") - public MultiListStartTimeIterator(List> appListList) { + public MultiListStartTimeIterator(List> appListList) { appLists = appListList.toArray(new List[appListList.size()]); curPositionsInAppLists = new int[appLists.length]; appListsByCurStartTime = new PriorityQueue(); @@ -284,10 +284,10 @@ public class MaxRunningAppsEnforcer { } @Override - public FSSchedulerApp next() { + public FSAppAttempt next() { IndexAndTime indexAndTime = appListsByCurStartTime.remove(); int nextListIndex = indexAndTime.index; - AppSchedulable next = appLists[nextListIndex] + FSAppAttempt next = appLists[nextListIndex] .get(curPositionsInAppLists[nextListIndex]); curPositionsInAppLists[nextListIndex]++; @@ -299,7 +299,7 @@ public class MaxRunningAppsEnforcer { } appListsByCurStartTime.add(indexAndTime); - return next.getApp(); + return next; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java index e77eed79568..fb32e565808 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java @@ -48,7 +48,7 @@ public class NewAppWeightBooster extends Configured implements WeightAdjuster { super.setConf(conf); } - public double adjustWeight(AppSchedulable app, double curWeight) { + public double adjustWeight(FSAppAttempt app, double curWeight) { long start = app.getStartTime(); long now = System.currentTimeMillis(); if (now - start < duration) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java index 5134be4dce9..122b986defc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java @@ -27,20 +27,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.resource.Resources; /** - * A Schedulable represents an entity that can launch tasks, such as a job - * or a queue. It provides a common interface so that algorithms such as fair - * sharing can be applied both within a queue and across queues. There are - * currently two types of Schedulables: JobSchedulables, which represent a - * single job, and QueueSchedulables, which allocate among jobs in their queue. - * - * Separate sets of Schedulables are used for maps and reduces. Each queue has - * both a mapSchedulable and a reduceSchedulable, and so does each job. + * A Schedulable represents an entity that can be scheduled such as an + * application or a queue. It provides a common interface so that algorithms + * such as fair sharing can be applied both within a queue and across queues. * * A Schedulable is responsible for three roles: - * 1) It can launch tasks through assignTask(). - * 2) It provides information about the job/queue to the scheduler, including: + * 1) Assign resources through {@link #assignContainer}. + * 2) It provides information about the app/queue to the scheduler, including: * - Demand (maximum number of tasks required) - * - Number of currently running tasks * - Minimum share (for queues) * - Job/queue weight (for fair sharing) * - Start time and priority (for FIFO) @@ -57,81 +51,61 @@ import org.apache.hadoop.yarn.util.resource.Resources; */ @Private @Unstable -public abstract class Schedulable { - /** Fair share assigned to this Schedulable */ - private Resource fairShare = Resources.createResource(0); - +public interface Schedulable { /** * Name of job/queue, used for debugging as well as for breaking ties in * scheduling order deterministically. */ - public abstract String getName(); + public String getName(); /** * Maximum number of resources required by this Schedulable. This is defined as * number of currently utilized resources + number of unlaunched resources (that * are either not yet launched or need to be speculated). */ - public abstract Resource getDemand(); + public Resource getDemand(); /** Get the aggregate amount of resources consumed by the schedulable. */ - public abstract Resource getResourceUsage(); + public Resource getResourceUsage(); /** Minimum Resource share assigned to the schedulable. */ - public abstract Resource getMinShare(); + public Resource getMinShare(); /** Maximum Resource share assigned to the schedulable. */ - public abstract Resource getMaxShare(); + public Resource getMaxShare(); /** Job/queue weight in fair sharing. */ - public abstract ResourceWeights getWeights(); + public ResourceWeights getWeights(); /** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/ - public abstract long getStartTime(); + public long getStartTime(); /** Job priority for jobs in FIFO queues; meaningless for QueueSchedulables. */ - public abstract Priority getPriority(); + public Priority getPriority(); /** Refresh the Schedulable's demand and those of its children if any. */ - public abstract void updateDemand(); + public void updateDemand(); /** * Assign a container on this node if possible, and return the amount of * resources assigned. */ - public abstract Resource assignContainer(FSSchedulerNode node); + public Resource assignContainer(FSSchedulerNode node); /** * Preempt a container from this Schedulable if possible. */ - public abstract RMContainer preemptContainer(); - - /** Assign a fair share to this Schedulable. */ - public void setFairShare(Resource fairShare) { - this.fairShare = fairShare; - } + public RMContainer preemptContainer(); /** Get the fair share assigned to this Schedulable. */ - public Resource getFairShare() { - return fairShare; - } + public Resource getFairShare(); + + /** Assign a fair share to this Schedulable. */ + public void setFairShare(Resource fairShare); /** * Returns true if queue has atleast one app running. Always returns true for * AppSchedulables. */ - public boolean isActive() { - if (this instanceof FSQueue) { - FSQueue queue = (FSQueue) this; - return queue.getNumRunnableApps() > 0; - } - return true; - } - - /** Convenient toString implementation for debugging. */ - @Override - public String toString() { - return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]", - getName(), getDemand(), getResourceUsage(), fairShare, getWeights()); - } + public boolean isActive(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java index 1a9467fc003..67364ed6e91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java @@ -32,5 +32,5 @@ import org.apache.hadoop.conf.Configurable; @Private @Unstable public interface WeightAdjuster { - public double adjustWeight(AppSchedulable app, double curWeight); + public double adjustWeight(FSAppAttempt app, double curWeight); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java index f136f940107..23f8c01c38a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java @@ -46,8 +46,7 @@ public class FairSchedulerInfo extends SchedulerInfo { } public int getAppFairShare(ApplicationAttemptId appAttemptId) { - return scheduler.getSchedulerApp(appAttemptId). - getAppSchedulable().getFairShare().getMemory(); + return scheduler.getSchedulerApp(appAttemptId).getFairShare().getMemory(); } public FairSchedulerQueueInfo getRootQueueInfo() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java index 5cdfb2abdf7..d389b9f0765 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java @@ -24,7 +24,8 @@ import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AppSchedulable; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair + .FSAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; @@ -39,9 +40,9 @@ public class FairSchedulerLeafQueueInfo extends FairSchedulerQueueInfo { public FairSchedulerLeafQueueInfo(FSLeafQueue queue, FairScheduler scheduler) { super(queue, scheduler); - Collection apps = queue.getRunnableAppSchedulables(); - for (AppSchedulable app : apps) { - if (app.getApp().isPending()) { + Collection apps = queue.getRunnableAppSchedulables(); + for (FSAppAttempt app : apps) { + if (app.isPending()) { numPendingApps++; } else { numActiveApps++; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java index dcfc2d3aa2f..5bd52ab7a07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java @@ -28,10 +28,11 @@ import org.apache.hadoop.yarn.util.resource.Resources; /** * Dummy implementation of Schedulable for unit testing. */ -public class FakeSchedulable extends Schedulable { +public class FakeSchedulable implements Schedulable { private Resource usage; private Resource minShare; private Resource maxShare; + private Resource fairShare; private ResourceWeights weights; private Priority priority; private long startTime; @@ -89,6 +90,21 @@ public class FakeSchedulable extends Schedulable { return null; } + @Override + public Resource getFairShare() { + return this.fairShare; + } + + @Override + public void setFairShare(Resource fairShare) { + this.fairShare = fairShare; + } + + @Override + public boolean isActive() { + return true; + } + @Override public Resource getDemand() { return null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java similarity index 82% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java index 2d5a6d4bc8c..0ab1f70147b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java @@ -18,18 +18,20 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import static org.junit.Assert.assertEquals; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.util.Clock; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -public class TestFSSchedulerApp { +public class TestFSAppAttempt extends FairSchedulerTestBase { private class MockClock implements Clock { private long time = 0; @@ -44,11 +46,12 @@ public class TestFSSchedulerApp { } - private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) { - ApplicationId appIdImpl = ApplicationId.newInstance(0, appId); - ApplicationAttemptId attId = - ApplicationAttemptId.newInstance(appIdImpl, attemptId); - return attId; + @Before + public void setup() { + Configuration conf = createConfiguration(); + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); } @Test @@ -60,11 +63,10 @@ public class TestFSSchedulerApp { double rackLocalityThreshold = .6; ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); - RMContext rmContext = Mockito.mock(RMContext.class); - Mockito.when(rmContext.getEpoch()).thenReturn(0); - FSSchedulerApp schedulerApp = - new FSSchedulerApp(applicationAttemptId, "user1", queue , null, - rmContext); + RMContext rmContext = resourceManager.getRMContext(); + FSAppAttempt schedulerApp = + new FSAppAttempt(scheduler, applicationAttemptId, "user1", queue , + null, rmContext); // Default level should be node-local assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel( @@ -113,25 +115,21 @@ public class TestFSSchedulerApp { @Test public void testDelaySchedulingForContinuousScheduling() throws InterruptedException { - FSLeafQueue queue = Mockito.mock(FSLeafQueue.class); + FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue("queue", true); Priority prio = Mockito.mock(Priority.class); Mockito.when(prio.getPriority()).thenReturn(1); MockClock clock = new MockClock(); + scheduler.setClock(clock); long nodeLocalityDelayMs = 5 * 1000L; // 5 seconds long rackLocalityDelayMs = 6 * 1000L; // 6 seconds - RMContext rmContext = Mockito.mock(RMContext.class); - Mockito.when(rmContext.getEpoch()).thenReturn(0); + RMContext rmContext = resourceManager.getRMContext(); ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); - FSSchedulerApp schedulerApp = - new FSSchedulerApp(applicationAttemptId, "user1", queue, + FSAppAttempt schedulerApp = + new FSAppAttempt(scheduler, applicationAttemptId, "user1", queue, null, rmContext); - AppSchedulable appSchedulable = Mockito.mock(AppSchedulable.class); - long startTime = clock.getTime(); - Mockito.when(appSchedulable.getStartTime()).thenReturn(startTime); - schedulerApp.setAppSchedulable(appSchedulable); // Default level should be node-local assertEquals(NodeType.NODE_LOCAL, @@ -179,12 +177,11 @@ public class TestFSSchedulerApp { Priority prio = Mockito.mock(Priority.class); Mockito.when(prio.getPriority()).thenReturn(1); - RMContext rmContext = Mockito.mock(RMContext.class); - Mockito.when(rmContext.getEpoch()).thenReturn(0); + RMContext rmContext = resourceManager.getRMContext(); ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); - FSSchedulerApp schedulerApp = - new FSSchedulerApp(applicationAttemptId, "user1", queue , null, - rmContext); + FSAppAttempt schedulerApp = + new FSAppAttempt(scheduler, applicationAttemptId, "user1", queue , + null, rmContext); assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel( prio, 10, -1.0, -1.0)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 4c07ee885f6..7323b6ab050 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -62,7 +62,7 @@ public class TestFSLeafQueue { @Test public void testUpdateDemand() { - AppSchedulable app = mock(AppSchedulable.class); + FSAppAttempt app = mock(FSAppAttempt.class); Mockito.when(app.getDemand()).thenReturn(maxResource); schedulable.addAppSchedulable(app); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 030ca7dd7b3..fb158730082 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -58,7 +58,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -82,13 +81,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -1539,7 +1536,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); assertEquals(1, app.getLiveContainers().size()); ContainerId containerId = scheduler.getSchedulerApp(attId) @@ -1613,9 +1610,9 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "norealuserhasthisname2", 1); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); assertNotNull("The application was not allowed", app1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); assertNull("The application was allowed", app2); } @@ -1688,8 +1685,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { "user1", 2); ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true); queue1.setPolicy(new FifoPolicy()); @@ -1731,7 +1728,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId attId = createSchedulingRequest(1024, "root.default", "user", 8); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); // set maxAssign to 2: only 2 containers should be allocated scheduler.maxAssign = 2; @@ -1766,7 +1763,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId attId = createSchedulingRequest(0, 1, "root.default", "user", 8); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); // set maxAssign to 2: only 2 containers should be allocated scheduler.maxAssign = 2; @@ -1830,10 +1827,10 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId attId4 = createSchedulingRequest(1024, fifoQueue, user, 4); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); - FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3); - FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app3 = scheduler.getSchedulerApp(attId3); + FSAppAttempt app4 = scheduler.getSchedulerApp(attId4); scheduler.getQueueManager().getLeafQueue(fifoQueue, true) .setPolicy(SchedulingPolicy.parse("fifo")); @@ -1952,7 +1949,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); assertEquals(0, app.getLiveContainers().size()); assertEquals(0, app.getReservedContainers().size()); @@ -2025,7 +2022,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSAppAttempt app = scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -2066,7 +2063,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSAppAttempt app = scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -2101,7 +2098,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1", "user1", 0); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true); ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true); @@ -2143,7 +2140,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default", "user1", 2); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); scheduler.update(); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); @@ -2165,10 +2162,10 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1", "user1", 2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterResource()); @@ -2208,13 +2205,13 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1", "user1", 2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2", "user1", 2); - FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3); + FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterResource()); @@ -2247,19 +2244,19 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3); + FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3); ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 2, "queue2", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app4 = scheduler.getSchedulerApp(appAttId4); + FSAppAttempt app4 = scheduler.getSchedulerApp(appAttId4); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterResource()); @@ -2341,7 +2338,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSAppAttempt app = scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -2353,14 +2350,14 @@ public class TestFairScheduler extends FairSchedulerTestBase { } private void verifyAppRunnable(ApplicationAttemptId attId, boolean runnable) { - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); FSLeafQueue queue = app.getQueue(); - Collection runnableApps = + Collection runnableApps = queue.getRunnableAppSchedulables(); - Collection nonRunnableApps = + Collection nonRunnableApps = queue.getNonRunnableAppSchedulables(); - assertEquals(runnable, runnableApps.contains(app.getAppSchedulable())); - assertEquals(!runnable, nonRunnableApps.contains(app.getAppSchedulable())); + assertEquals(runnable, runnableApps.contains(app)); + assertEquals(!runnable, nonRunnableApps.contains(app)); } private void verifyQueueNumRunnable(String queueName, int numRunnableInQueue, @@ -2465,7 +2462,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId attId1 = createAppAttemptId(1, 1); createApplicationWithAMResource(attId1, "queue1", "user1", amResource1); createSchedulingRequestExistingApplication(1024, 1, amPriority, attId1); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application1's AM requests 1024 MB memory", @@ -2479,7 +2476,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId attId2 = createAppAttemptId(2, 1); createApplicationWithAMResource(attId2, "queue1", "user1", amResource1); createSchedulingRequestExistingApplication(1024, 1, amPriority, attId2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application2's AM requests 1024 MB memory", @@ -2493,7 +2490,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId attId3 = createAppAttemptId(3, 1); createApplicationWithAMResource(attId3, "queue1", "user1", amResource1); createSchedulingRequestExistingApplication(1024, 1, amPriority, attId3); - FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3); + FSAppAttempt app3 = scheduler.getSchedulerApp(attId3); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application3's AM requests 1024 MB memory", @@ -2529,7 +2526,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId attId4 = createAppAttemptId(4, 1); createApplicationWithAMResource(attId4, "queue1", "user1", amResource2); createSchedulingRequestExistingApplication(2048, 2, amPriority, attId4); - FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4); + FSAppAttempt app4 = scheduler.getSchedulerApp(attId4); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application4's AM requests 2048 MB memory", @@ -2543,7 +2540,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId attId5 = createAppAttemptId(5, 1); createApplicationWithAMResource(attId5, "queue1", "user1", amResource2); createSchedulingRequestExistingApplication(2048, 2, amPriority, attId5); - FSSchedulerApp app5 = scheduler.getSchedulerApp(attId5); + FSAppAttempt app5 = scheduler.getSchedulerApp(attId5); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application5's AM requests 2048 MB memory", @@ -2586,7 +2583,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId attId6 = createAppAttemptId(6, 1); createApplicationWithAMResource(attId6, "queue1", "user1", amResource3); createSchedulingRequestExistingApplication(1860, 2, amPriority, attId6); - FSSchedulerApp app6 = scheduler.getSchedulerApp(attId6); + FSAppAttempt app6 = scheduler.getSchedulerApp(attId6); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application6's AM should not be running", @@ -2677,7 +2674,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId attId1 = createAppAttemptId(1, 1); createApplicationWithAMResource(attId1, "queue1", "test1", amResource1); createSchedulingRequestExistingApplication(2048, 1, amPriority, attId1); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application1's AM requests 2048 MB memory", @@ -2691,7 +2688,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId attId2 = createAppAttemptId(2, 1); createApplicationWithAMResource(attId2, "queue2", "test1", amResource1); createSchedulingRequestExistingApplication(2048, 1, amPriority, attId2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application2's AM requests 2048 MB memory", @@ -2823,7 +2820,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // at least one pass Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500); - FSSchedulerApp app = fs.getSchedulerApp(appAttemptId); + FSAppAttempt app = fs.getSchedulerApp(appAttemptId); // Wait until app gets resources. while (app.getCurrentConsumption().equals(Resources.none())) { } @@ -3007,7 +3004,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers() .size()); - FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId); + FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); // ResourceRequest will be empty once NodeUpdate is completed Assert.assertNull(app.getResourceRequest(priority, host)); @@ -3063,7 +3060,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId appAttemptId = createSchedulingRequest(GB, "root.default", "user", 1); - FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId); + FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); // Verify the blacklist can be updated independent of requesting containers scheduler.allocate(appAttemptId, Collections.emptyList(), @@ -3171,12 +3168,10 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals(Resource.newInstance(3072, 3), oldQueue.getDemand()); scheduler.moveApplication(appId, "queue2"); - FSSchedulerApp app = scheduler.getSchedulerApp(appAttId); + FSAppAttempt app = scheduler.getSchedulerApp(appAttId); assertSame(targetQueue, app.getQueue()); - assertFalse(oldQueue.getRunnableAppSchedulables() - .contains(app.getAppSchedulable())); - assertTrue(targetQueue.getRunnableAppSchedulables() - .contains(app.getAppSchedulable())); + assertFalse(oldQueue.getRunnableAppSchedulables().contains(app)); + assertTrue(targetQueue.getRunnableAppSchedulables().contains(app)); assertEquals(Resource.newInstance(0, 0), oldQueue.getResourceUsage()); assertEquals(Resource.newInstance(1024, 1), targetQueue.getResourceUsage()); assertEquals(0, oldQueue.getNumRunnableApps()); @@ -3224,17 +3219,13 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId appAttId = createSchedulingRequest(1024, 1, "queue1", "user1", 3); - FSSchedulerApp app = scheduler.getSchedulerApp(appAttId); - assertTrue(oldQueue.getNonRunnableAppSchedulables() - .contains(app.getAppSchedulable())); + FSAppAttempt app = scheduler.getSchedulerApp(appAttId); + assertTrue(oldQueue.getNonRunnableAppSchedulables().contains(app)); scheduler.moveApplication(appAttId.getApplicationId(), "queue2"); - assertFalse(oldQueue.getNonRunnableAppSchedulables() - .contains(app.getAppSchedulable())); - assertFalse(targetQueue.getNonRunnableAppSchedulables() - .contains(app.getAppSchedulable())); - assertTrue(targetQueue.getRunnableAppSchedulables() - .contains(app.getAppSchedulable())); + assertFalse(oldQueue.getNonRunnableAppSchedulables().contains(app)); + assertFalse(targetQueue.getNonRunnableAppSchedulables().contains(app)); + assertTrue(targetQueue.getRunnableAppSchedulables().contains(app)); assertEquals(1, targetQueue.getNumRunnableApps()); assertEquals(1, queueMgr.getRootQueue().getNumRunnableApps()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java index cc738f5eb01..20ff2c9cd25 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java @@ -42,12 +42,13 @@ public class TestMaxRunningAppsEnforcer { private int appNum; private TestFairScheduler.MockClock clock; private RMContext rmContext; + private FairScheduler scheduler; @Before public void setup() throws Exception { Configuration conf = new Configuration(); clock = new TestFairScheduler.MockClock(); - FairScheduler scheduler = mock(FairScheduler.class); + scheduler = mock(FairScheduler.class); when(scheduler.getConf()).thenReturn( new FairSchedulerConfiguration(conf)); when(scheduler.getClock()).thenReturn(clock); @@ -65,11 +66,11 @@ public class TestMaxRunningAppsEnforcer { when(rmContext.getEpoch()).thenReturn(0); } - private FSSchedulerApp addApp(FSLeafQueue queue, String user) { + private FSAppAttempt addApp(FSLeafQueue queue, String user) { ApplicationId appId = ApplicationId.newInstance(0l, appNum++); ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0); boolean runnable = maxAppsEnforcer.canAppBeRunnable(queue, user); - FSSchedulerApp app = new FSSchedulerApp(attId, user, queue, null, + FSAppAttempt app = new FSAppAttempt(scheduler, attId, user, queue, null, rmContext); queue.addApp(app, runnable); if (runnable) { @@ -80,7 +81,7 @@ public class TestMaxRunningAppsEnforcer { return app; } - private void removeApp(FSSchedulerApp app) { + private void removeApp(FSAppAttempt app) { app.getQueue().removeApp(app); maxAppsEnforcer.untrackRunnableApp(app); maxAppsEnforcer.updateRunnabilityOnAppRemoval(app, app.getQueue()); @@ -93,7 +94,7 @@ public class TestMaxRunningAppsEnforcer { queueMaxApps.put("root", 2); queueMaxApps.put("root.queue1", 1); queueMaxApps.put("root.queue2", 1); - FSSchedulerApp app1 = addApp(leaf1, "user"); + FSAppAttempt app1 = addApp(leaf1, "user"); addApp(leaf2, "user"); addApp(leaf2, "user"); assertEquals(1, leaf1.getRunnableAppSchedulables().size()); @@ -110,7 +111,7 @@ public class TestMaxRunningAppsEnforcer { FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true); FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true); queueMaxApps.put("root.queue1", 2); - FSSchedulerApp app1 = addApp(leaf1, "user"); + FSAppAttempt app1 = addApp(leaf1, "user"); addApp(leaf2, "user"); addApp(leaf2, "user"); assertEquals(1, leaf1.getRunnableAppSchedulables().size()); @@ -128,7 +129,7 @@ public class TestMaxRunningAppsEnforcer { FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.leaf2", true); queueMaxApps.put("root.queue1.leaf1", 2); userMaxApps.put("user1", 1); - FSSchedulerApp app1 = addApp(leaf1, "user1"); + FSAppAttempt app1 = addApp(leaf1, "user1"); addApp(leaf1, "user2"); addApp(leaf1, "user3"); addApp(leaf2, "user1"); @@ -147,7 +148,7 @@ public class TestMaxRunningAppsEnforcer { FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true); FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true); queueMaxApps.put("root.queue1", 2); - FSSchedulerApp app1 = addApp(leaf1, "user"); + FSAppAttempt app1 = addApp(leaf1, "user"); addApp(leaf2, "user"); addApp(leaf2, "user"); clock.tick(20); @@ -167,7 +168,7 @@ public class TestMaxRunningAppsEnforcer { FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true); FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true); queueMaxApps.put("root.queue1", 2); - FSSchedulerApp app1 = addApp(leaf1, "user"); + FSAppAttempt app1 = addApp(leaf1, "user"); addApp(leaf2, "user"); addApp(leaf2, "user"); addApp(leaf2, "user"); @@ -182,21 +183,18 @@ public class TestMaxRunningAppsEnforcer { @Test public void testMultiListStartTimeIteratorEmptyAppLists() { - List> lists = new ArrayList>(); - lists.add(Arrays.asList(mockAppSched(1))); - lists.add(Arrays.asList(mockAppSched(2))); - Iterator iter = + List> lists = new ArrayList>(); + lists.add(Arrays.asList(mockAppAttempt(1))); + lists.add(Arrays.asList(mockAppAttempt(2))); + Iterator iter = new MaxRunningAppsEnforcer.MultiListStartTimeIterator(lists); - assertEquals(1, iter.next().getAppSchedulable().getStartTime()); - assertEquals(2, iter.next().getAppSchedulable().getStartTime()); + assertEquals(1, iter.next().getStartTime()); + assertEquals(2, iter.next().getStartTime()); } - private AppSchedulable mockAppSched(long startTime) { - AppSchedulable appSched = mock(AppSchedulable.class); - when(appSched.getStartTime()).thenReturn(startTime); - FSSchedulerApp schedApp = mock(FSSchedulerApp.class); - when(schedApp.getAppSchedulable()).thenReturn(appSched); - when(appSched.getApp()).thenReturn(schedApp); - return appSched; + private FSAppAttempt mockAppAttempt(long startTime) { + FSAppAttempt schedApp = mock(FSAppAttempt.class); + when(schedApp.getStartTime()).thenReturn(startTime); + return schedApp; } }