From 20d389ce61eaacb5ddfb329015f50e96ad894f8d Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Mon, 14 Mar 2016 14:19:05 -0700 Subject: [PATCH] YARN-4719. Add a helper library to maintain node state and allows common queries. (kasha) --- .../scheduler/AbstractYarnScheduler.java | 170 ++-------- .../scheduler/ClusterNodeTracker.java | 300 ++++++++++++++++++ .../resourcemanager/scheduler/NodeFilter.java | 33 ++ .../scheduler/capacity/CapacityScheduler.java | 86 +++-- .../scheduler/fair/FSAppAttempt.java | 13 +- .../scheduler/fair/FairScheduler.java | 104 +++--- .../scheduler/fifo/FifoScheduler.java | 32 +- .../scheduler/TestAbstractYarnScheduler.java | 14 +- .../scheduler/capacity/TestReservations.java | 19 +- .../scheduler/fair/TestFairScheduler.java | 7 +- .../scheduler/fifo/TestFifoScheduler.java | 19 +- 11 files changed, 478 insertions(+), 319 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeFilter.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 7ca8671e79b..7d123012529 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -27,11 +27,7 @@ import java.util.Map; import java.util.Set; import java.util.Timer; import java.util.TimerTask; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -92,22 +88,10 @@ public abstract class AbstractYarnScheduler private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class); - // Nodes in the cluster, indexed by NodeId - protected Map nodes = new ConcurrentHashMap(); - - // Whole capacity of the cluster - protected Resource clusterResource = Resource.newInstance(0, 0); + protected final ClusterNodeTracker nodeTracker = + new ClusterNodeTracker<>(); protected Resource minimumAllocation; - protected Resource maximumAllocation; - private Resource configuredMaximumAllocation; - private int maxNodeMemory = -1; - private int maxNodeVCores = -1; - private final ReadLock maxAllocReadLock; - private final WriteLock maxAllocWriteLock; - - private boolean useConfiguredMaximumAllocationOnly = true; - private long configuredMaximumAllocationWaitTime; protected RMContext rmContext; @@ -132,9 +116,6 @@ public abstract class AbstractYarnScheduler */ public AbstractYarnScheduler(String name) { super(name); - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - this.maxAllocReadLock = lock.readLock(); - this.maxAllocWriteLock = lock.writeLock(); } @Override @@ -142,14 +123,21 @@ public abstract class AbstractYarnScheduler nmExpireInterval = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); - configuredMaximumAllocationWaitTime = + long configuredMaximumAllocationWaitTime = conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS); + nodeTracker.setConfiguredMaxAllocationWaitTime( + configuredMaximumAllocationWaitTime); maxClusterLevelAppPriority = getMaxPriorityFromConf(conf); createReleaseCache(); super.serviceInit(conf); } + @VisibleForTesting + public ClusterNodeTracker getNodeTracker() { + return nodeTracker; + } + public List getTransferredContainers( ApplicationAttemptId currentAttempt) { ApplicationId appId = currentAttempt.getApplicationId(); @@ -184,20 +172,21 @@ public abstract class AbstractYarnScheduler * Add blacklisted NodeIds to the list that is passed. * * @param app application attempt. - * @param blacklistNodeIdList the list to store blacklisted NodeIds. */ - public void addBlacklistedNodeIdsToList(SchedulerApplicationAttempt app, - List blacklistNodeIdList) { - for (Map.Entry nodeEntry : nodes.entrySet()) { - if (SchedulerAppUtils.isBlacklisted(app, nodeEntry.getValue(), LOG)) { - blacklistNodeIdList.add(nodeEntry.getKey()); + public List getBlacklistedNodes(final SchedulerApplicationAttempt app) { + + NodeFilter nodeFilter = new NodeFilter() { + @Override + public boolean accept(SchedulerNode node) { + return SchedulerAppUtils.isBlacklisted(app, node, LOG); } - } + }; + return nodeTracker.getNodes(nodeFilter); } @Override public Resource getClusterResource() { - return clusterResource; + return nodeTracker.getClusterCapacity(); } @Override @@ -207,22 +196,7 @@ public abstract class AbstractYarnScheduler @Override public Resource getMaximumResourceCapability() { - Resource maxResource; - maxAllocReadLock.lock(); - try { - if (useConfiguredMaximumAllocationOnly) { - if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp() - > configuredMaximumAllocationWaitTime) { - useConfiguredMaximumAllocationOnly = false; - } - maxResource = Resources.clone(configuredMaximumAllocation); - } else { - maxResource = Resources.clone(maximumAllocation); - } - } finally { - maxAllocReadLock.unlock(); - } - return maxResource; + return nodeTracker.getMaxAllowedAllocation(); } @Override @@ -231,15 +205,7 @@ public abstract class AbstractYarnScheduler } protected void initMaximumResourceCapability(Resource maximumAllocation) { - maxAllocWriteLock.lock(); - try { - if (this.configuredMaximumAllocation == null) { - this.configuredMaximumAllocation = Resources.clone(maximumAllocation); - this.maximumAllocation = Resources.clone(maximumAllocation); - } - } finally { - maxAllocWriteLock.unlock(); - } + nodeTracker.setConfiguredMaxAllocation(maximumAllocation); } protected synchronized void containerLaunchedOnNode( @@ -332,8 +298,7 @@ public abstract class AbstractYarnScheduler @Override public SchedulerNodeReport getNodeReport(NodeId nodeId) { - N node = nodes.get(nodeId); - return node == null ? null : new SchedulerNodeReport(node); + return nodeTracker.getNodeReport(nodeId); } @Override @@ -431,12 +396,13 @@ public abstract class AbstractYarnScheduler container)); // recover scheduler node - SchedulerNode schedulerNode = nodes.get(nm.getNodeID()); + SchedulerNode schedulerNode = nodeTracker.getNode(nm.getNodeID()); schedulerNode.recoverContainer(rmContainer); // recover queue: update headroom etc. Queue queue = schedulerAttempt.getQueue(); - queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer); + queue.recoverContainer( + getClusterResource(), schedulerAttempt, rmContainer); // recover scheduler attempt schedulerAttempt.recoverContainer(schedulerNode, rmContainer); @@ -621,7 +587,7 @@ public abstract class AbstractYarnScheduler @Override public SchedulerNode getSchedulerNode(NodeId nodeId) { - return nodes.get(nodeId); + return nodeTracker.getNode(nodeId); } @Override @@ -690,18 +656,12 @@ public abstract class AbstractYarnScheduler + " from: " + oldResource + ", to: " + newResource); - nodes.remove(nm.getNodeID()); - updateMaximumAllocation(node, false); + nodeTracker.removeNode(nm.getNodeID()); // update resource to node node.setTotalResource(newResource); - nodes.put(nm.getNodeID(), (N)node); - updateMaximumAllocation(node, true); - - // update resource to clusterResource - Resources.subtractFrom(clusterResource, oldResource); - Resources.addTo(clusterResource, newResource); + nodeTracker.addNode((N) node); } else { // Log resource change LOG.warn("Update resource on node: " + node.getNodeName() @@ -721,80 +681,8 @@ public abstract class AbstractYarnScheduler + " does not support reservations"); } - protected void updateMaximumAllocation(SchedulerNode node, boolean add) { - Resource totalResource = node.getTotalResource(); - maxAllocWriteLock.lock(); - try { - if (add) { // added node - int nodeMemory = totalResource.getMemory(); - if (nodeMemory > maxNodeMemory) { - maxNodeMemory = nodeMemory; - maximumAllocation.setMemory(Math.min( - configuredMaximumAllocation.getMemory(), maxNodeMemory)); - } - int nodeVCores = totalResource.getVirtualCores(); - if (nodeVCores > maxNodeVCores) { - maxNodeVCores = nodeVCores; - maximumAllocation.setVirtualCores(Math.min( - configuredMaximumAllocation.getVirtualCores(), maxNodeVCores)); - } - } else { // removed node - if (maxNodeMemory == totalResource.getMemory()) { - maxNodeMemory = -1; - } - if (maxNodeVCores == totalResource.getVirtualCores()) { - maxNodeVCores = -1; - } - // We only have to iterate through the nodes if the current max memory - // or vcores was equal to the removed node's - if (maxNodeMemory == -1 || maxNodeVCores == -1) { - for (Map.Entry nodeEntry : nodes.entrySet()) { - int nodeMemory = - nodeEntry.getValue().getTotalResource().getMemory(); - if (nodeMemory > maxNodeMemory) { - maxNodeMemory = nodeMemory; - } - int nodeVCores = - nodeEntry.getValue().getTotalResource().getVirtualCores(); - if (nodeVCores > maxNodeVCores) { - maxNodeVCores = nodeVCores; - } - } - if (maxNodeMemory == -1) { // no nodes - maximumAllocation.setMemory(configuredMaximumAllocation.getMemory()); - } else { - maximumAllocation.setMemory( - Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory)); - } - if (maxNodeVCores == -1) { // no nodes - maximumAllocation.setVirtualCores(configuredMaximumAllocation.getVirtualCores()); - } else { - maximumAllocation.setVirtualCores( - Math.min(configuredMaximumAllocation.getVirtualCores(), maxNodeVCores)); - } - } - } - } finally { - maxAllocWriteLock.unlock(); - } - } - protected void refreshMaximumAllocation(Resource newMaxAlloc) { - maxAllocWriteLock.lock(); - try { - configuredMaximumAllocation = Resources.clone(newMaxAlloc); - int maxMemory = newMaxAlloc.getMemory(); - if (maxNodeMemory != -1) { - maxMemory = Math.min(maxMemory, maxNodeMemory); - } - int maxVcores = newMaxAlloc.getVirtualCores(); - if (maxNodeVCores != -1) { - maxVcores = Math.min(maxVcores, maxNodeVCores); - } - maximumAllocation = Resources.createResource(maxMemory, maxVcores); - } finally { - maxAllocWriteLock.unlock(); - } + nodeTracker.setConfiguredMaxAllocation(newMaxAlloc); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java new file mode 100644 index 00000000000..34b4267c758 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java @@ -0,0 +1,300 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Helper library that: + * - tracks the state of all cluster {@link SchedulerNode}s + * - provides convenience methods to filter and sort nodes + */ +@InterfaceAudience.Private +public class ClusterNodeTracker { + private static final Log LOG = LogFactory.getLog(ClusterNodeTracker.class); + + private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true); + private Lock readLock = readWriteLock.readLock(); + private Lock writeLock = readWriteLock.writeLock(); + + private HashMap nodes = new HashMap<>(); + private Map nodesPerRack = new HashMap<>(); + + private Resource clusterCapacity = Resources.clone(Resources.none()); + private Resource staleClusterCapacity = null; + + // Max allocation + private int maxNodeMemory = -1; + private int maxNodeVCores = -1; + private Resource configuredMaxAllocation; + private boolean forceConfiguredMaxAllocation = true; + private long configuredMaxAllocationWaitTime; + + public void addNode(N node) { + writeLock.lock(); + try { + nodes.put(node.getNodeID(), node); + + // Update nodes per rack as well + String rackName = node.getRackName(); + Integer numNodes = nodesPerRack.get(rackName); + if (numNodes == null) { + numNodes = 0; + } + nodesPerRack.put(rackName, ++numNodes); + + // Update cluster capacity + Resources.addTo(clusterCapacity, node.getTotalResource()); + + // Update maximumAllocation + updateMaxResources(node, true); + } finally { + writeLock.unlock(); + } + } + + public boolean exists(NodeId nodeId) { + readLock.lock(); + try { + return nodes.containsKey(nodeId); + } finally { + readLock.unlock(); + } + } + + public N getNode(NodeId nodeId) { + readLock.lock(); + try { + return nodes.get(nodeId); + } finally { + readLock.unlock(); + } + } + + public SchedulerNodeReport getNodeReport(NodeId nodeId) { + readLock.lock(); + try { + N n = nodes.get(nodeId); + return n == null ? null : new SchedulerNodeReport(n); + } finally { + readLock.unlock(); + } + } + + public int nodeCount() { + readLock.lock(); + try { + return nodes.size(); + } finally { + readLock.unlock(); + } + } + + public int nodeCount(String rackName) { + readLock.lock(); + String rName = rackName == null ? "NULL" : rackName; + try { + Integer nodeCount = nodesPerRack.get(rName); + return nodeCount == null ? 0 : nodeCount; + } finally { + readLock.unlock(); + } + } + + public Resource getClusterCapacity() { + readLock.lock(); + try { + if (staleClusterCapacity == null || + !Resources.equals(staleClusterCapacity, clusterCapacity)) { + staleClusterCapacity = Resources.clone(clusterCapacity); + } + return staleClusterCapacity; + } finally { + readLock.unlock(); + } + } + + public N removeNode(NodeId nodeId) { + writeLock.lock(); + try { + N node = nodes.remove(nodeId); + if (node == null) { + LOG.warn("Attempting to remove a non-existent node " + nodeId); + return null; + } + + // Update nodes per rack as well + String rackName = node.getRackName(); + Integer numNodes = nodesPerRack.get(rackName); + if (numNodes > 0) { + nodesPerRack.put(rackName, --numNodes); + } else { + LOG.error("Attempting to remove node from an empty rack " + rackName); + } + + // Update cluster capacity + Resources.subtractFrom(clusterCapacity, node.getTotalResource()); + + // Update maximumAllocation + updateMaxResources(node, false); + + return node; + } finally { + writeLock.unlock(); + } + } + + public void setConfiguredMaxAllocation(Resource resource) { + writeLock.lock(); + try { + configuredMaxAllocation = Resources.clone(resource); + } finally { + writeLock.unlock(); + } + } + + public void setConfiguredMaxAllocationWaitTime( + long configuredMaxAllocationWaitTime) { + writeLock.lock(); + try { + this.configuredMaxAllocationWaitTime = + configuredMaxAllocationWaitTime; + } finally { + writeLock.unlock(); + } + } + + public Resource getMaxAllowedAllocation() { + readLock.lock(); + try { + if (forceConfiguredMaxAllocation && + System.currentTimeMillis() - ResourceManager.getClusterTimeStamp() + > configuredMaxAllocationWaitTime) { + forceConfiguredMaxAllocation = false; + } + + if (forceConfiguredMaxAllocation + || maxNodeMemory == -1 || maxNodeVCores == -1) { + return configuredMaxAllocation; + } + + return Resources.createResource( + Math.min(configuredMaxAllocation.getMemory(), maxNodeMemory), + Math.min(configuredMaxAllocation.getVirtualCores(), maxNodeVCores) + ); + } finally { + readLock.unlock(); + } + } + + private void updateMaxResources(SchedulerNode node, boolean add) { + Resource totalResource = node.getTotalResource(); + writeLock.lock(); + try { + if (add) { // added node + int nodeMemory = totalResource.getMemory(); + if (nodeMemory > maxNodeMemory) { + maxNodeMemory = nodeMemory; + } + int nodeVCores = totalResource.getVirtualCores(); + if (nodeVCores > maxNodeVCores) { + maxNodeVCores = nodeVCores; + } + } else { // removed node + if (maxNodeMemory == totalResource.getMemory()) { + maxNodeMemory = -1; + } + if (maxNodeVCores == totalResource.getVirtualCores()) { + maxNodeVCores = -1; + } + // We only have to iterate through the nodes if the current max memory + // or vcores was equal to the removed node's + if (maxNodeMemory == -1 || maxNodeVCores == -1) { + // Treat it like an empty cluster and add nodes + for (N n : nodes.values()) { + updateMaxResources(n, true); + } + } + } + } finally { + writeLock.unlock(); + } + } + + public List getAllNodes() { + return getNodes(null); + } + + /** + * Convenience method to filter nodes based on a condition. + */ + public List getNodes(NodeFilter nodeFilter) { + List nodeList = new ArrayList<>(); + readLock.lock(); + try { + if (nodeFilter == null) { + nodeList.addAll(nodes.values()); + } else { + for (N node : nodes.values()) { + if (nodeFilter.accept(node)) { + nodeList.add(node); + } + } + } + } finally { + readLock.unlock(); + } + return nodeList; + } + + /** + * Convenience method to sort nodes. + * + * Note that the sort is performed without holding a lock. We are sorting + * here instead of on the caller to allow for future optimizations (e.g. + * sort once every x milliseconds). + */ + public List sortedNodeList(Comparator comparator) { + List sortedList = null; + readLock.lock(); + try { + sortedList = new ArrayList(nodes.values()); + } finally { + readLock.unlock(); + } + Collections.sort(sortedList, comparator); + return sortedList; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeFilter.java new file mode 100644 index 00000000000..7b3e7a2ce75 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeFilter.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Convenience way to filter nodes based on a criteria. To be used in + * conjunction with {@link ClusterNodeTracker} + */ +@InterfaceAudience.Private +public interface NodeFilter { + + /** + * Criteria to accept node in the filtered list. + */ + boolean accept(SchedulerNode node); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6a1091d6d40..735306aeed3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -34,7 +34,6 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -219,8 +218,6 @@ public class CapacityScheduler extends private Map queues = new ConcurrentHashMap(); - private AtomicInteger numNodeManagers = new AtomicInteger(0); - private ResourceCalculator calculator; private boolean usePortForNodeName; @@ -280,7 +277,7 @@ public class CapacityScheduler extends @Override public int getNumClusterNodes() { - return numNodeManagers.get(); + return nodeTracker.nodeCount(); } @Override @@ -387,7 +384,7 @@ public class CapacityScheduler extends static void schedule(CapacityScheduler cs) { // First randomize the start point int current = 0; - Collection nodes = cs.getAllNodes().values(); + Collection nodes = cs.nodeTracker.getAllNodes(); int start = random.nextInt(nodes.size()); for (FiCaSchedulerNode node : nodes) { if (current++ >= start) { @@ -524,10 +521,11 @@ public class CapacityScheduler extends addNewQueues(queues, newQueues); // Re-configure queues - root.reinitialize(newRoot, clusterResource); + root.reinitialize(newRoot, getClusterResource()); updatePlacementRules(); // Re-calculate headroom for active applications + Resource clusterResource = getClusterResource(); root.updateClusterResource(clusterResource, new ResourceLimits( clusterResource)); @@ -995,7 +993,7 @@ public class CapacityScheduler extends allocation = application.getAllocation(getResourceCalculator(), - clusterResource, getMinimumResourceCapability()); + getClusterResource(), getMinimumResourceCapability()); } if (updateDemandForQueue != null && !application @@ -1036,7 +1034,8 @@ public class CapacityScheduler extends private synchronized void nodeUpdate(RMNode nm) { if (LOG.isDebugEnabled()) { - LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource); + LOG.debug("nodeUpdate: " + nm + + " clusterResources: " + getClusterResource()); } Resource releaseResources = Resource.newInstance(0, 0); @@ -1119,6 +1118,7 @@ public class CapacityScheduler extends private synchronized void updateNodeAndQueueResource(RMNode nm, ResourceOption resourceOption) { updateNodeResource(nm, resourceOption); + Resource clusterResource = getClusterResource(); root.updateClusterResource(clusterResource, new ResourceLimits( clusterResource)); } @@ -1128,7 +1128,7 @@ public class CapacityScheduler extends */ private synchronized void updateLabelsOnNode(NodeId nodeId, Set newLabels) { - FiCaSchedulerNode node = nodes.get(nodeId); + FiCaSchedulerNode node = nodeTracker.getNode(nodeId); if (null == node) { return; } @@ -1230,12 +1230,12 @@ public class CapacityScheduler extends LeafQueue queue = ((LeafQueue) reservedApplication.getQueue()); assignment = queue.assignContainers( - clusterResource, + getClusterResource(), node, // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well. new ResourceLimits(labelManager.getResourceByLabel( - RMNodeLabelsManager.NO_LABEL, clusterResource)), + RMNodeLabelsManager.NO_LABEL, getClusterResource())), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); if (assignment.isFulfilledReservation()) { CSAssignment tmp = @@ -1261,14 +1261,14 @@ public class CapacityScheduler extends } assignment = root.assignContainers( - clusterResource, + getClusterResource(), node, // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well. new ResourceLimits(labelManager.getResourceByLabel( - RMNodeLabelsManager.NO_LABEL, clusterResource)), + RMNodeLabelsManager.NO_LABEL, getClusterResource())), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - if (Resources.greaterThan(calculator, clusterResource, + if (Resources.greaterThan(calculator, getClusterResource(), assignment.getResource(), Resources.none())) { updateSchedulerHealth(lastNodeUpdateTime, node, assignment); return; @@ -1294,12 +1294,12 @@ public class CapacityScheduler extends // Try to use NON_EXCLUSIVE assignment = root.assignContainers( - clusterResource, + getClusterResource(), node, // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well. new ResourceLimits(labelManager.getResourceByLabel( - RMNodeLabelsManager.NO_LABEL, clusterResource)), + RMNodeLabelsManager.NO_LABEL, getClusterResource())), SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY); updateSchedulerHealth(lastNodeUpdateTime, node, assignment); } @@ -1451,24 +1451,22 @@ public class CapacityScheduler extends private synchronized void addNode(RMNode nodeManager) { FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, usePortForNodeName, nodeManager.getNodeLabels()); - this.nodes.put(nodeManager.getNodeID(), schedulerNode); - Resources.addTo(clusterResource, schedulerNode.getTotalResource()); + nodeTracker.addNode(schedulerNode); // update this node to node label manager if (labelManager != null) { labelManager.activateNode(nodeManager.getNodeID(), schedulerNode.getTotalResource()); } - + + Resource clusterResource = getClusterResource(); root.updateClusterResource(clusterResource, new ResourceLimits( clusterResource)); - int numNodes = numNodeManagers.incrementAndGet(); - updateMaximumAllocation(schedulerNode, true); - + LOG.info("Added node " + nodeManager.getNodeAddress() + " clusterResource: " + clusterResource); - if (scheduleAsynchronously && numNodes == 1) { + if (scheduleAsynchronously && getNumClusterNodes() == 1) { asyncSchedulerThread.beginSchedule(); } } @@ -1478,20 +1476,14 @@ public class CapacityScheduler extends if (labelManager != null) { labelManager.deactivateNode(nodeInfo.getNodeID()); } - - FiCaSchedulerNode node = nodes.get(nodeInfo.getNodeID()); + + NodeId nodeId = nodeInfo.getNodeID(); + FiCaSchedulerNode node = nodeTracker.getNode(nodeId); if (node == null) { + LOG.error("Attempting to remove non-existent node " + nodeId); return; } - Resources.subtractFrom(clusterResource, node.getTotalResource()); - root.updateClusterResource(clusterResource, new ResourceLimits( - clusterResource)); - int numNodes = numNodeManagers.decrementAndGet(); - if (scheduleAsynchronously && numNodes == 0) { - asyncSchedulerThread.suspendSchedule(); - } - // Remove running containers List runningContainers = node.getRunningContainers(); for (RMContainer container : runningContainers) { @@ -1512,11 +1504,18 @@ public class CapacityScheduler extends RMContainerEventType.KILL); } - this.nodes.remove(nodeInfo.getNodeID()); - updateMaximumAllocation(node, false); + nodeTracker.removeNode(nodeId); + Resource clusterResource = getClusterResource(); + root.updateClusterResource(clusterResource, new ResourceLimits( + clusterResource)); + int numNodes = nodeTracker.nodeCount(); + + if (scheduleAsynchronously && numNodes == 0) { + asyncSchedulerThread.suspendSchedule(); + } LOG.info("Removed node " + nodeInfo.getNodeAddress() + - " clusterResource: " + clusterResource); + " clusterResource: " + getClusterResource()); } private void rollbackContainerResource( @@ -1568,7 +1567,7 @@ public class CapacityScheduler extends // Inform the queue LeafQueue queue = (LeafQueue)application.getQueue(); - queue.completedContainer(clusterResource, application, node, + queue.completedContainer(getClusterResource(), application, node, rmContainer, containerStatus, event, null, true); if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) { @@ -1594,7 +1593,7 @@ public class CapacityScheduler extends FiCaSchedulerApp app = (FiCaSchedulerApp)attempt; LeafQueue queue = (LeafQueue) attempt.getQueue(); try { - queue.decreaseContainer(clusterResource, decreaseRequest, app); + queue.decreaseContainer(getClusterResource(), decreaseRequest, app); // Notify RMNode that the container can be pulled by NodeManager in the // next heartbeat this.rmContext.getDispatcher().getEventHandler() @@ -1617,14 +1616,9 @@ public class CapacityScheduler extends @Lock(Lock.NoLock.class) public FiCaSchedulerNode getNode(NodeId nodeId) { - return nodes.get(nodeId); + return nodeTracker.getNode(nodeId); } - @Lock(Lock.NoLock.class) - Map getAllNodes() { - return nodes; - } - @Override @Lock(Lock.NoLock.class) public void recover(RMState state) throws Exception { @@ -1869,9 +1863,9 @@ public class CapacityScheduler extends } // Move all live containers for (RMContainer rmContainer : app.getLiveContainers()) { - source.detachContainer(clusterResource, app, rmContainer); + source.detachContainer(getClusterResource(), app, rmContainer); // attach the Container to another queue - dest.attachContainer(clusterResource, app, rmContainer); + dest.attachContainer(getClusterResource(), app, rmContainer); } // Detach the application.. source.finishApplicationAttempt(app, sourceQueueName); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index f1cefad6c48..e426da621f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -86,7 +86,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // Key = RackName, Value = Set of Nodes reserved by app on rack private Map> reservations = new HashMap<>(); - private List blacklistNodeIds = new ArrayList(); + private List blacklistNodeIds = new ArrayList<>(); /** * Delay scheduling: We often want to prioritize scheduling of node-local * containers over rack-local or off-switch containers. To achieve this @@ -185,14 +185,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt Resource availableResources) { if (appSchedulingInfo.getAndResetBlacklistChanged()) { blacklistNodeIds.clear(); - scheduler.addBlacklistedNodeIdsToList(this, blacklistNodeIds); + blacklistNodeIds.addAll(scheduler.getBlacklistedNodes(this)); } - for (NodeId nodeId: blacklistNodeIds) { - SchedulerNode node = scheduler.getSchedulerNode(nodeId); - if (node != null) { - Resources.subtractFrom(availableResources, - node.getUnallocatedResource()); - } + for (FSSchedulerNode node: blacklistNodeIds) { + Resources.subtractFrom(availableResources, + node.getUnallocatedResource()); } if (availableResources.getMemory() < 0) { availableResources.setMemory(0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 917fc8a834c..ba90e2132dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -20,13 +20,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.EnumSet; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -186,14 +184,11 @@ public class FairScheduler extends private float reservableNodesRatio; // percentage of available nodes // an app can be reserved on - // Count of number of nodes per rack - private Map nodesPerRack = new ConcurrentHashMap<>(); - protected boolean sizeBasedWeight; // Give larger weights to larger jobs protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling - private Comparator nodeAvailableResourceComparator = + private Comparator nodeAvailableResourceComparator = new NodeAvailableResourceComparator(); // Node available resource comparator protected double nodeLocalityThreshold; // Cluster threshold for node locality protected double rackLocalityThreshold; // Cluster threshold for rack locality @@ -225,8 +220,8 @@ public class FairScheduler extends public boolean isAtLeastReservationThreshold( ResourceCalculator resourceCalculator, Resource resource) { - return Resources.greaterThanOrEqual( - resourceCalculator, clusterResource, resource, reservationThreshold); + return Resources.greaterThanOrEqual(resourceCalculator, + getClusterResource(), resource, reservationThreshold); } private void validateConf(Configuration conf) { @@ -272,11 +267,7 @@ public class FairScheduler extends } public int getNumNodesInRack(String rackName) { - String rName = rackName == null ? "NULL" : rackName; - if (nodesPerRack.containsKey(rName)) { - return nodesPerRack.get(rName); - } - return 0; + return nodeTracker.nodeCount(rackName); } public QueueManager getQueueManager() { @@ -352,6 +343,7 @@ public class FairScheduler extends // Recursively update demands for all queues rootQueue.updateDemand(); + Resource clusterResource = getClusterResource(); rootQueue.setFairShare(clusterResource); // Recursively compute fair shares for all queues // and update metrics @@ -526,6 +518,7 @@ public class FairScheduler extends Resource resDueToMinShare = Resources.none(); Resource resDueToFairShare = Resources.none(); ResourceCalculator calc = sched.getPolicy().getResourceCalculator(); + Resource clusterResource = getClusterResource(); if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { Resource target = Resources.componentwiseMin( sched.getMinShare(), sched.getDemand()); @@ -577,7 +570,7 @@ public class FairScheduler extends } private FSSchedulerNode getFSSchedulerNode(NodeId nodeId) { - return nodes.get(nodeId); + return nodeTracker.getNode(nodeId); } public double getNodeLocalityThreshold() { @@ -882,18 +875,11 @@ public class FairScheduler extends private synchronized void addNode(List containerReports, RMNode node) { FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName); - nodes.put(node.getNodeID(), schedulerNode); - String rackName = node.getRackName() == null ? "NULL" : node.getRackName(); - if (nodesPerRack.containsKey(rackName)) { - nodesPerRack.put(rackName, nodesPerRack.get(rackName) + 1); - } else { - nodesPerRack.put(rackName, 1); - } - Resources.addTo(clusterResource, schedulerNode.getTotalResource()); - updateMaximumAllocation(schedulerNode, true); + nodeTracker.addNode(schedulerNode); triggerUpdate(); + Resource clusterResource = getClusterResource(); queueMgr.getRootQueue().setSteadyFairShare(clusterResource); queueMgr.getRootQueue().recomputeSteadyShares(); LOG.info("Added node " + node.getNodeAddress() + @@ -904,15 +890,12 @@ public class FairScheduler extends } private synchronized void removeNode(RMNode rmNode) { - FSSchedulerNode node = getFSSchedulerNode(rmNode.getNodeID()); - // This can occur when an UNHEALTHY node reconnects + NodeId nodeId = rmNode.getNodeID(); + FSSchedulerNode node = nodeTracker.getNode(nodeId); if (node == null) { + LOG.error("Attempting to remove non-existent node " + nodeId); return; } - Resources.subtractFrom(clusterResource, node.getTotalResource()); - updateRootQueueMetrics(); - - triggerUpdate(); // Remove running containers List runningContainers = node.getRunningContainers(); @@ -934,18 +917,13 @@ public class FairScheduler extends RMContainerEventType.KILL); } - nodes.remove(rmNode.getNodeID()); - String rackName = node.getRackName() == null ? "NULL" : node.getRackName(); - if (nodesPerRack.containsKey(rackName) - && (nodesPerRack.get(rackName) > 0)) { - nodesPerRack.put(rackName, nodesPerRack.get(rackName) - 1); - } else { - LOG.error("Node [" + rmNode.getNodeAddress() + "] being removed from" + - " unknown rack [" + rackName + "] !!"); - } + nodeTracker.removeNode(nodeId); + Resource clusterResource = getClusterResource(); queueMgr.getRootQueue().setSteadyFairShare(clusterResource); queueMgr.getRootQueue().recomputeSteadyShares(); - updateMaximumAllocation(node, false); + updateRootQueueMetrics(); + triggerUpdate(); + LOG.info("Removed node " + rmNode.getNodeAddress() + " cluster capacity: " + clusterResource); } @@ -967,7 +945,7 @@ public class FairScheduler extends // Sanity check SchedulerUtils.normalizeRequests(ask, DOMINANT_RESOURCE_CALCULATOR, - clusterResource, minimumAllocation, getMaximumResourceCapability(), + getClusterResource(), minimumAllocation, getMaximumResourceCapability(), incrAllocation); // Record container allocation start time @@ -1034,7 +1012,8 @@ public class FairScheduler extends private synchronized void nodeUpdate(RMNode nm) { long start = getClock().getTime(); if (LOG.isDebugEnabled()) { - LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource); + LOG.debug("nodeUpdate: " + nm + + " cluster capacity: " + getClusterResource()); } eventLog.log("HEARTBEAT", nm.getHostName()); FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID()); @@ -1091,20 +1070,13 @@ public class FairScheduler extends void continuousSchedulingAttempt() throws InterruptedException { long start = getClock().getTime(); - List nodeIdList = new ArrayList(nodes.keySet()); - // Sort the nodes by space available on them, so that we offer - // containers on emptier nodes first, facilitating an even spread. This - // requires holding the scheduler lock, so that the space available on a - // node doesn't change during the sort. - synchronized (this) { - Collections.sort(nodeIdList, nodeAvailableResourceComparator); - } + List nodeIdList = + nodeTracker.sortedNodeList(nodeAvailableResourceComparator); // iterate all nodes - for (NodeId nodeId : nodeIdList) { - FSSchedulerNode node = getFSSchedulerNode(nodeId); + for (FSSchedulerNode node : nodeIdList) { try { - if (node != null && Resources.fitsIn(minimumAllocation, + if (Resources.fitsIn(minimumAllocation, node.getUnallocatedResource())) { attemptScheduling(node); } @@ -1126,19 +1098,14 @@ public class FairScheduler extends } /** Sort nodes by available resource */ - private class NodeAvailableResourceComparator implements Comparator { + private class NodeAvailableResourceComparator + implements Comparator { @Override - public int compare(NodeId n1, NodeId n2) { - if (!nodes.containsKey(n1)) { - return 1; - } - if (!nodes.containsKey(n2)) { - return -1; - } - return RESOURCE_CALCULATOR.compare(clusterResource, - nodes.get(n2).getUnallocatedResource(), - nodes.get(n1).getUnallocatedResource()); + public int compare(FSSchedulerNode n1, FSSchedulerNode n2) { + return RESOURCE_CALCULATOR.compare(getClusterResource(), + n2.getUnallocatedResource(), + n1.getUnallocatedResource()); } } @@ -1150,7 +1117,7 @@ public class FairScheduler extends } final NodeId nodeID = node.getNodeID(); - if (!nodes.containsKey(nodeID)) { + if (!nodeTracker.exists(nodeID)) { // The node might have just been removed while this thread was waiting // on the synchronized lock before it entered this synchronized method LOG.info("Skipping scheduling as the node " + nodeID + @@ -1203,7 +1170,7 @@ public class FairScheduler extends private void updateRootQueueMetrics() { rootMetrics.setAvailableResourcesToQueue( Resources.subtract( - clusterResource, rootMetrics.getAllocatedResources())); + getClusterResource(), rootMetrics.getAllocatedResources())); } /** @@ -1214,6 +1181,7 @@ public class FairScheduler extends */ private boolean shouldAttemptPreemption() { if (preemptionEnabled) { + Resource clusterResource = getClusterResource(); return (preemptionUtilizationThreshold < Math.max( (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(), (float) rootMetrics.getAllocatedVirtualCores() / @@ -1547,7 +1515,7 @@ public class FairScheduler extends @Override public int getNumClusterNodes() { - return nodes.size(); + return nodeTracker.nodeCount(); } @Override @@ -1577,7 +1545,7 @@ public class FairScheduler extends // if it does not already exist, so it can be displayed on the web UI. synchronized (FairScheduler.this) { allocConf = queueInfo; - allocConf.getDefaultSchedulingPolicy().initialize(clusterResource); + allocConf.getDefaultSchedulingPolicy().initialize(getClusterResource()); queueMgr.updateAllocationConfiguration(allocConf); maxRunningEnforcer.updateRunnabilityOnReload(); } @@ -1721,7 +1689,7 @@ public class FairScheduler extends ResourceOption resourceOption) { super.updateNodeResource(nm, resourceOption); updateRootQueueMetrics(); - queueMgr.getRootQueue().setSteadyFairShare(clusterResource); + queueMgr.getRootQueue().setSteadyFairShare(getClusterResource()); queueMgr.getRootQueue().recomputeSteadyShares(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 147c3f38f61..cf125011f95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -142,6 +142,7 @@ public class FifoScheduler extends QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class); queueInfo.setQueueName(DEFAULT_QUEUE.getQueueName()); queueInfo.setCapacity(1.0f); + Resource clusterResource = getClusterResource(); if (clusterResource.getMemory() == 0) { queueInfo.setCurrentCapacity(0.0f); } else { @@ -297,7 +298,7 @@ public class FifoScheduler extends @Override public int getNumClusterNodes() { - return nodes.size(); + return nodeTracker.nodeCount(); } @Override @@ -327,7 +328,8 @@ public class FifoScheduler extends // Sanity check SchedulerUtils.normalizeRequests(ask, resourceCalculator, - clusterResource, minimumAllocation, getMaximumResourceCapability()); + getClusterResource(), minimumAllocation, + getMaximumResourceCapability()); // Release containers releaseContainers(release, application); @@ -377,7 +379,7 @@ public class FifoScheduler extends } private FiCaSchedulerNode getNode(NodeId nodeId) { - return nodes.get(nodeId); + return nodeTracker.getNode(nodeId); } @VisibleForTesting @@ -526,7 +528,7 @@ public class FifoScheduler extends application.showRequests(); // Done - if (Resources.lessThan(resourceCalculator, clusterResource, + if (Resources.lessThan(resourceCalculator, getClusterResource(), node.getUnallocatedResource(), minimumAllocation)) { break; } @@ -764,7 +766,7 @@ public class FifoScheduler extends return; } - if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource, + if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(), node.getUnallocatedResource(), minimumAllocation)) { LOG.debug("Node heartbeat " + rmNode.getNodeID() + " available resource = " + node.getUnallocatedResource()); @@ -783,13 +785,13 @@ public class FifoScheduler extends } private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) { - schedulerAttempt.setHeadroom(Resources.subtract(clusterResource, + schedulerAttempt.setHeadroom(Resources.subtract(getClusterResource(), usedResource)); } private void updateAvailableResourcesMetrics() { - metrics.setAvailableResourcesToQueue(Resources.subtract(clusterResource, - usedResource)); + metrics.setAvailableResourcesToQueue( + Resources.subtract(getClusterResource(), usedResource)); } @Override @@ -925,7 +927,7 @@ public class FifoScheduler extends private Resource usedResource = recordFactory.newRecordInstance(Resource.class); private synchronized void removeNode(RMNode nodeInfo) { - FiCaSchedulerNode node = getNode(nodeInfo.getNodeID()); + FiCaSchedulerNode node = nodeTracker.getNode(nodeInfo.getNodeID()); if (node == null) { return; } @@ -937,13 +939,7 @@ public class FifoScheduler extends SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); } - - //Remove the node - this.nodes.remove(nodeInfo.getNodeID()); - updateMaximumAllocation(node, false); - - // Update cluster metrics - Resources.subtractFrom(clusterResource, node.getTotalResource()); + nodeTracker.removeNode(nodeInfo.getNodeID()); } @Override @@ -965,9 +961,7 @@ public class FifoScheduler extends private synchronized void addNode(RMNode nodeManager) { FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, usePortForNodeName); - this.nodes.put(nodeManager.getNodeID(), schedulerNode); - Resources.addTo(clusterResource, schedulerNode.getTotalResource()); - updateMaximumAllocation(schedulerNode, true); + nodeTracker.addNode(schedulerNode); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index e7ba58d9d38..81c8fe6d738 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -300,22 +300,16 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase { verifyMaximumResourceCapability(configuredMaximumResource, scheduler); - scheduler.nodes = new HashMap(); - - scheduler.nodes.put(mockNode1.getNodeID(), mockNode1); - scheduler.updateMaximumAllocation(mockNode1, true); + scheduler.nodeTracker.addNode(mockNode1); verifyMaximumResourceCapability(fullResource1, scheduler); - scheduler.nodes.put(mockNode2.getNodeID(), mockNode2); - scheduler.updateMaximumAllocation(mockNode2, true); + scheduler.nodeTracker.addNode(mockNode2); verifyMaximumResourceCapability(fullResource2, scheduler); - scheduler.nodes.remove(mockNode2.getNodeID()); - scheduler.updateMaximumAllocation(mockNode2, false); + scheduler.nodeTracker.removeNode(mockNode2.getNodeID()); verifyMaximumResourceCapability(fullResource1, scheduler); - scheduler.nodes.remove(mockNode1.getNodeID()); - scheduler.updateMaximumAllocation(mockNode1, false); + scheduler.nodeTracker.removeNode(mockNode1.getNodeID()); verifyMaximumResourceCapability(configuredMaximumResource, scheduler); } finally { rm.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 9047138256c..2ef5e390318 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -183,6 +183,7 @@ public class TestReservations { } @Test + @SuppressWarnings("unchecked") public void testReservation() throws Exception { // Test that we now unreserve and use a node that has space @@ -231,9 +232,9 @@ public class TestReservations { when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2); - cs.getAllNodes().put(node_0.getNodeID(), node_0); - cs.getAllNodes().put(node_1.getNodeID(), node_1); - cs.getAllNodes().put(node_2.getNodeID(), node_2); + cs.getNodeTracker().addNode(node_0); + cs.getNodeTracker().addNode(node_1); + cs.getNodeTracker().addNode(node_2); final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (8 * GB)); @@ -346,6 +347,7 @@ public class TestReservations { // Test that hitting a reservation limit and needing to unreserve // does not affect assigning containers for other users @Test + @SuppressWarnings("unchecked") public void testReservationLimitOtherUsers() throws Exception { CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); setup(csConf, true); @@ -395,9 +397,9 @@ public class TestReservations { when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2); - cs.getAllNodes().put(node_0.getNodeID(), node_0); - cs.getAllNodes().put(node_1.getNodeID(), node_1); - cs.getAllNodes().put(node_2.getNodeID(), node_2); + cs.getNodeTracker().addNode(node_0); + cs.getNodeTracker().addNode(node_1); + cs.getNodeTracker().addNode(node_2); final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (8 * GB)); @@ -641,6 +643,7 @@ public class TestReservations { } @Test + @SuppressWarnings("unchecked") public void testAssignContainersNeedToUnreserve() throws Exception { // Test that we now unreserve and use a node that has space Logger rootLogger = LogManager.getRootLogger(); @@ -684,8 +687,8 @@ public class TestReservations { FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8 * GB); - cs.getAllNodes().put(node_0.getNodeID(), node_0); - cs.getAllNodes().put(node_1.getNodeID(), node_1); + cs.getNodeTracker().addNode(node_0); + cs.getNodeTracker().addNode(node_1); when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 8d7c22e77cd..1add1937690 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -75,12 +75,10 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.resourcemanager.Application; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -108,7 +106,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.resource.Resources; @@ -2751,8 +2748,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2"); - NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent2); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1", "user1", 0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 9bfc283778c..44877fbeddd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -306,12 +306,7 @@ public class TestFifoScheduler { nmTokenSecretManager.rollMasterKey(); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); - FifoScheduler scheduler = new FifoScheduler(){ - @SuppressWarnings("unused") - public Map getNodes(){ - return nodes; - } - }; + FifoScheduler scheduler = new FifoScheduler(); RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, null, containerTokenSecretManager, nmTokenSecretManager, null, scheduler); rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class)); @@ -331,11 +326,7 @@ public class TestFifoScheduler { NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0); scheduler.handle(nodeEvent1); - Method method = scheduler.getClass().getDeclaredMethod("getNodes"); - @SuppressWarnings("unchecked") - Map schedulerNodes = - (Map) method.invoke(scheduler); - assertEquals(schedulerNodes.values().size(), 1); + assertEquals(scheduler.getNumClusterNodes(), 1); Resource newResource = Resources.createResource(1024, 4); @@ -345,9 +336,9 @@ public class TestFifoScheduler { scheduler.handle(node0ResourceUpdate); // SchedulerNode's total resource and available resource are changed. - assertEquals(schedulerNodes.get(node0.getNodeID()).getTotalResource() - .getMemory(), 1024); - assertEquals(schedulerNodes.get(node0.getNodeID()). + assertEquals(1024, scheduler.getNodeTracker().getNode(node0.getNodeID()) + .getTotalResource().getMemory()); + assertEquals(1024, scheduler.getNodeTracker().getNode(node0.getNodeID()). getUnallocatedResource().getMemory(), 1024); QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false); Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity(), 0.0f);