YARN-4719. Add a helper library to maintain node state and allows common queries. (kasha)

This commit is contained in:
Karthik Kambatla 2016-03-14 14:19:05 -07:00
parent 5644137ada
commit 20d389ce61
11 changed files with 478 additions and 319 deletions

View File

@ -27,11 +27,7 @@
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -92,22 +88,10 @@ public abstract class AbstractYarnScheduler
private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
// Nodes in the cluster, indexed by NodeId
protected Map<NodeId, N> nodes = new ConcurrentHashMap<NodeId, N>();
// Whole capacity of the cluster
protected Resource clusterResource = Resource.newInstance(0, 0);
protected final ClusterNodeTracker<N> nodeTracker =
new ClusterNodeTracker<>();
protected Resource minimumAllocation;
protected Resource maximumAllocation;
private Resource configuredMaximumAllocation;
private int maxNodeMemory = -1;
private int maxNodeVCores = -1;
private final ReadLock maxAllocReadLock;
private final WriteLock maxAllocWriteLock;
private boolean useConfiguredMaximumAllocationOnly = true;
private long configuredMaximumAllocationWaitTime;
protected RMContext rmContext;
@ -132,9 +116,6 @@ public abstract class AbstractYarnScheduler
*/
public AbstractYarnScheduler(String name) {
super(name);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.maxAllocReadLock = lock.readLock();
this.maxAllocWriteLock = lock.writeLock();
}
@Override
@ -142,14 +123,21 @@ public void serviceInit(Configuration conf) throws Exception {
nmExpireInterval =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
configuredMaximumAllocationWaitTime =
long configuredMaximumAllocationWaitTime =
conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
nodeTracker.setConfiguredMaxAllocationWaitTime(
configuredMaximumAllocationWaitTime);
maxClusterLevelAppPriority = getMaxPriorityFromConf(conf);
createReleaseCache();
super.serviceInit(conf);
}
@VisibleForTesting
public ClusterNodeTracker getNodeTracker() {
return nodeTracker;
}
public List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) {
ApplicationId appId = currentAttempt.getApplicationId();
@ -184,20 +172,21 @@ public List<Container> getTransferredContainers(
* Add blacklisted NodeIds to the list that is passed.
*
* @param app application attempt.
* @param blacklistNodeIdList the list to store blacklisted NodeIds.
*/
public void addBlacklistedNodeIdsToList(SchedulerApplicationAttempt app,
List<NodeId> blacklistNodeIdList) {
for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
if (SchedulerAppUtils.isBlacklisted(app, nodeEntry.getValue(), LOG)) {
blacklistNodeIdList.add(nodeEntry.getKey());
public List<N> getBlacklistedNodes(final SchedulerApplicationAttempt app) {
NodeFilter nodeFilter = new NodeFilter() {
@Override
public boolean accept(SchedulerNode node) {
return SchedulerAppUtils.isBlacklisted(app, node, LOG);
}
}
};
return nodeTracker.getNodes(nodeFilter);
}
@Override
public Resource getClusterResource() {
return clusterResource;
return nodeTracker.getClusterCapacity();
}
@Override
@ -207,22 +196,7 @@ public Resource getMinimumResourceCapability() {
@Override
public Resource getMaximumResourceCapability() {
Resource maxResource;
maxAllocReadLock.lock();
try {
if (useConfiguredMaximumAllocationOnly) {
if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
> configuredMaximumAllocationWaitTime) {
useConfiguredMaximumAllocationOnly = false;
}
maxResource = Resources.clone(configuredMaximumAllocation);
} else {
maxResource = Resources.clone(maximumAllocation);
}
} finally {
maxAllocReadLock.unlock();
}
return maxResource;
return nodeTracker.getMaxAllowedAllocation();
}
@Override
@ -231,15 +205,7 @@ public Resource getMaximumResourceCapability(String queueName) {
}
protected void initMaximumResourceCapability(Resource maximumAllocation) {
maxAllocWriteLock.lock();
try {
if (this.configuredMaximumAllocation == null) {
this.configuredMaximumAllocation = Resources.clone(maximumAllocation);
this.maximumAllocation = Resources.clone(maximumAllocation);
}
} finally {
maxAllocWriteLock.unlock();
}
nodeTracker.setConfiguredMaxAllocation(maximumAllocation);
}
protected synchronized void containerLaunchedOnNode(
@ -332,8 +298,7 @@ public RMContainer getRMContainer(ContainerId containerId) {
@Override
public SchedulerNodeReport getNodeReport(NodeId nodeId) {
N node = nodes.get(nodeId);
return node == null ? null : new SchedulerNodeReport(node);
return nodeTracker.getNodeReport(nodeId);
}
@Override
@ -431,12 +396,13 @@ public synchronized void recoverContainersOnNode(
container));
// recover scheduler node
SchedulerNode schedulerNode = nodes.get(nm.getNodeID());
SchedulerNode schedulerNode = nodeTracker.getNode(nm.getNodeID());
schedulerNode.recoverContainer(rmContainer);
// recover queue: update headroom etc.
Queue queue = schedulerAttempt.getQueue();
queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);
queue.recoverContainer(
getClusterResource(), schedulerAttempt, rmContainer);
// recover scheduler attempt
schedulerAttempt.recoverContainer(schedulerNode, rmContainer);
@ -621,7 +587,7 @@ protected abstract void decreaseContainer(
@Override
public SchedulerNode getSchedulerNode(NodeId nodeId) {
return nodes.get(nodeId);
return nodeTracker.getNode(nodeId);
}
@Override
@ -690,18 +656,12 @@ public synchronized void updateNodeResource(RMNode nm,
+ " from: " + oldResource + ", to: "
+ newResource);
nodes.remove(nm.getNodeID());
updateMaximumAllocation(node, false);
nodeTracker.removeNode(nm.getNodeID());
// update resource to node
node.setTotalResource(newResource);
nodes.put(nm.getNodeID(), (N)node);
updateMaximumAllocation(node, true);
// update resource to clusterResource
Resources.subtractFrom(clusterResource, oldResource);
Resources.addTo(clusterResource, newResource);
nodeTracker.addNode((N) node);
} else {
// Log resource change
LOG.warn("Update resource on node: " + node.getNodeName()
@ -721,80 +681,8 @@ public Set<String> getPlanQueues() throws YarnException {
+ " does not support reservations");
}
protected void updateMaximumAllocation(SchedulerNode node, boolean add) {
Resource totalResource = node.getTotalResource();
maxAllocWriteLock.lock();
try {
if (add) { // added node
int nodeMemory = totalResource.getMemory();
if (nodeMemory > maxNodeMemory) {
maxNodeMemory = nodeMemory;
maximumAllocation.setMemory(Math.min(
configuredMaximumAllocation.getMemory(), maxNodeMemory));
}
int nodeVCores = totalResource.getVirtualCores();
if (nodeVCores > maxNodeVCores) {
maxNodeVCores = nodeVCores;
maximumAllocation.setVirtualCores(Math.min(
configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
}
} else { // removed node
if (maxNodeMemory == totalResource.getMemory()) {
maxNodeMemory = -1;
}
if (maxNodeVCores == totalResource.getVirtualCores()) {
maxNodeVCores = -1;
}
// We only have to iterate through the nodes if the current max memory
// or vcores was equal to the removed node's
if (maxNodeMemory == -1 || maxNodeVCores == -1) {
for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
int nodeMemory =
nodeEntry.getValue().getTotalResource().getMemory();
if (nodeMemory > maxNodeMemory) {
maxNodeMemory = nodeMemory;
}
int nodeVCores =
nodeEntry.getValue().getTotalResource().getVirtualCores();
if (nodeVCores > maxNodeVCores) {
maxNodeVCores = nodeVCores;
}
}
if (maxNodeMemory == -1) { // no nodes
maximumAllocation.setMemory(configuredMaximumAllocation.getMemory());
} else {
maximumAllocation.setMemory(
Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory));
}
if (maxNodeVCores == -1) { // no nodes
maximumAllocation.setVirtualCores(configuredMaximumAllocation.getVirtualCores());
} else {
maximumAllocation.setVirtualCores(
Math.min(configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
}
}
}
} finally {
maxAllocWriteLock.unlock();
}
}
protected void refreshMaximumAllocation(Resource newMaxAlloc) {
maxAllocWriteLock.lock();
try {
configuredMaximumAllocation = Resources.clone(newMaxAlloc);
int maxMemory = newMaxAlloc.getMemory();
if (maxNodeMemory != -1) {
maxMemory = Math.min(maxMemory, maxNodeMemory);
}
int maxVcores = newMaxAlloc.getVirtualCores();
if (maxNodeVCores != -1) {
maxVcores = Math.min(maxVcores, maxNodeVCores);
}
maximumAllocation = Resources.createResource(maxMemory, maxVcores);
} finally {
maxAllocWriteLock.unlock();
}
nodeTracker.setConfiguredMaxAllocation(newMaxAlloc);
}
@Override

View File

@ -0,0 +1,300 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Helper library that:
* - tracks the state of all cluster {@link SchedulerNode}s
* - provides convenience methods to filter and sort nodes
*/
@InterfaceAudience.Private
public class ClusterNodeTracker<N extends SchedulerNode> {
private static final Log LOG = LogFactory.getLog(ClusterNodeTracker.class);
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
private Lock readLock = readWriteLock.readLock();
private Lock writeLock = readWriteLock.writeLock();
private HashMap<NodeId, N> nodes = new HashMap<>();
private Map<String, Integer> nodesPerRack = new HashMap<>();
private Resource clusterCapacity = Resources.clone(Resources.none());
private Resource staleClusterCapacity = null;
// Max allocation
private int maxNodeMemory = -1;
private int maxNodeVCores = -1;
private Resource configuredMaxAllocation;
private boolean forceConfiguredMaxAllocation = true;
private long configuredMaxAllocationWaitTime;
public void addNode(N node) {
writeLock.lock();
try {
nodes.put(node.getNodeID(), node);
// Update nodes per rack as well
String rackName = node.getRackName();
Integer numNodes = nodesPerRack.get(rackName);
if (numNodes == null) {
numNodes = 0;
}
nodesPerRack.put(rackName, ++numNodes);
// Update cluster capacity
Resources.addTo(clusterCapacity, node.getTotalResource());
// Update maximumAllocation
updateMaxResources(node, true);
} finally {
writeLock.unlock();
}
}
public boolean exists(NodeId nodeId) {
readLock.lock();
try {
return nodes.containsKey(nodeId);
} finally {
readLock.unlock();
}
}
public N getNode(NodeId nodeId) {
readLock.lock();
try {
return nodes.get(nodeId);
} finally {
readLock.unlock();
}
}
public SchedulerNodeReport getNodeReport(NodeId nodeId) {
readLock.lock();
try {
N n = nodes.get(nodeId);
return n == null ? null : new SchedulerNodeReport(n);
} finally {
readLock.unlock();
}
}
public int nodeCount() {
readLock.lock();
try {
return nodes.size();
} finally {
readLock.unlock();
}
}
public int nodeCount(String rackName) {
readLock.lock();
String rName = rackName == null ? "NULL" : rackName;
try {
Integer nodeCount = nodesPerRack.get(rName);
return nodeCount == null ? 0 : nodeCount;
} finally {
readLock.unlock();
}
}
public Resource getClusterCapacity() {
readLock.lock();
try {
if (staleClusterCapacity == null ||
!Resources.equals(staleClusterCapacity, clusterCapacity)) {
staleClusterCapacity = Resources.clone(clusterCapacity);
}
return staleClusterCapacity;
} finally {
readLock.unlock();
}
}
public N removeNode(NodeId nodeId) {
writeLock.lock();
try {
N node = nodes.remove(nodeId);
if (node == null) {
LOG.warn("Attempting to remove a non-existent node " + nodeId);
return null;
}
// Update nodes per rack as well
String rackName = node.getRackName();
Integer numNodes = nodesPerRack.get(rackName);
if (numNodes > 0) {
nodesPerRack.put(rackName, --numNodes);
} else {
LOG.error("Attempting to remove node from an empty rack " + rackName);
}
// Update cluster capacity
Resources.subtractFrom(clusterCapacity, node.getTotalResource());
// Update maximumAllocation
updateMaxResources(node, false);
return node;
} finally {
writeLock.unlock();
}
}
public void setConfiguredMaxAllocation(Resource resource) {
writeLock.lock();
try {
configuredMaxAllocation = Resources.clone(resource);
} finally {
writeLock.unlock();
}
}
public void setConfiguredMaxAllocationWaitTime(
long configuredMaxAllocationWaitTime) {
writeLock.lock();
try {
this.configuredMaxAllocationWaitTime =
configuredMaxAllocationWaitTime;
} finally {
writeLock.unlock();
}
}
public Resource getMaxAllowedAllocation() {
readLock.lock();
try {
if (forceConfiguredMaxAllocation &&
System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
> configuredMaxAllocationWaitTime) {
forceConfiguredMaxAllocation = false;
}
if (forceConfiguredMaxAllocation
|| maxNodeMemory == -1 || maxNodeVCores == -1) {
return configuredMaxAllocation;
}
return Resources.createResource(
Math.min(configuredMaxAllocation.getMemory(), maxNodeMemory),
Math.min(configuredMaxAllocation.getVirtualCores(), maxNodeVCores)
);
} finally {
readLock.unlock();
}
}
private void updateMaxResources(SchedulerNode node, boolean add) {
Resource totalResource = node.getTotalResource();
writeLock.lock();
try {
if (add) { // added node
int nodeMemory = totalResource.getMemory();
if (nodeMemory > maxNodeMemory) {
maxNodeMemory = nodeMemory;
}
int nodeVCores = totalResource.getVirtualCores();
if (nodeVCores > maxNodeVCores) {
maxNodeVCores = nodeVCores;
}
} else { // removed node
if (maxNodeMemory == totalResource.getMemory()) {
maxNodeMemory = -1;
}
if (maxNodeVCores == totalResource.getVirtualCores()) {
maxNodeVCores = -1;
}
// We only have to iterate through the nodes if the current max memory
// or vcores was equal to the removed node's
if (maxNodeMemory == -1 || maxNodeVCores == -1) {
// Treat it like an empty cluster and add nodes
for (N n : nodes.values()) {
updateMaxResources(n, true);
}
}
}
} finally {
writeLock.unlock();
}
}
public List<N> getAllNodes() {
return getNodes(null);
}
/**
* Convenience method to filter nodes based on a condition.
*/
public List<N> getNodes(NodeFilter nodeFilter) {
List<N> nodeList = new ArrayList<>();
readLock.lock();
try {
if (nodeFilter == null) {
nodeList.addAll(nodes.values());
} else {
for (N node : nodes.values()) {
if (nodeFilter.accept(node)) {
nodeList.add(node);
}
}
}
} finally {
readLock.unlock();
}
return nodeList;
}
/**
* Convenience method to sort nodes.
*
* Note that the sort is performed without holding a lock. We are sorting
* here instead of on the caller to allow for future optimizations (e.g.
* sort once every x milliseconds).
*/
public List<N> sortedNodeList(Comparator<N> comparator) {
List<N> sortedList = null;
readLock.lock();
try {
sortedList = new ArrayList(nodes.values());
} finally {
readLock.unlock();
}
Collections.sort(sortedList, comparator);
return sortedList;
}
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Convenience way to filter nodes based on a criteria. To be used in
* conjunction with {@link ClusterNodeTracker}
*/
@InterfaceAudience.Private
public interface NodeFilter {
/**
* Criteria to accept node in the filtered list.
*/
boolean accept(SchedulerNode node);
}

View File

@ -34,7 +34,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@ -219,8 +218,6 @@ public Configuration getConf() {
private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
private AtomicInteger numNodeManagers = new AtomicInteger(0);
private ResourceCalculator calculator;
private boolean usePortForNodeName;
@ -280,7 +277,7 @@ public PartitionedQueueComparator getPartitionedQueueComparator() {
@Override
public int getNumClusterNodes() {
return numNodeManagers.get();
return nodeTracker.nodeCount();
}
@Override
@ -387,7 +384,7 @@ long getAsyncScheduleInterval() {
static void schedule(CapacityScheduler cs) {
// First randomize the start point
int current = 0;
Collection<FiCaSchedulerNode> nodes = cs.getAllNodes().values();
Collection<FiCaSchedulerNode> nodes = cs.nodeTracker.getAllNodes();
int start = random.nextInt(nodes.size());
for (FiCaSchedulerNode node : nodes) {
if (current++ >= start) {
@ -524,10 +521,11 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf)
addNewQueues(queues, newQueues);
// Re-configure queues
root.reinitialize(newRoot, clusterResource);
root.reinitialize(newRoot, getClusterResource());
updatePlacementRules();
// Re-calculate headroom for active applications
Resource clusterResource = getClusterResource();
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
@ -995,7 +993,7 @@ ask, getResourceCalculator(), getClusterResource(),
allocation = application.getAllocation(getResourceCalculator(),
clusterResource, getMinimumResourceCapability());
getClusterResource(), getMinimumResourceCapability());
}
if (updateDemandForQueue != null && !application
@ -1036,7 +1034,8 @@ public List<QueueUserACLInfo> getQueueUserAclInfo() {
private synchronized void nodeUpdate(RMNode nm) {
if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
LOG.debug("nodeUpdate: " + nm +
" clusterResources: " + getClusterResource());
}
Resource releaseResources = Resource.newInstance(0, 0);
@ -1119,6 +1118,7 @@ private synchronized void nodeUpdate(RMNode nm) {
private synchronized void updateNodeAndQueueResource(RMNode nm,
ResourceOption resourceOption) {
updateNodeResource(nm, resourceOption);
Resource clusterResource = getClusterResource();
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
}
@ -1128,7 +1128,7 @@ private synchronized void updateNodeAndQueueResource(RMNode nm,
*/
private synchronized void updateLabelsOnNode(NodeId nodeId,
Set<String> newLabels) {
FiCaSchedulerNode node = nodes.get(nodeId);
FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
if (null == node) {
return;
}
@ -1230,12 +1230,12 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
assignment =
queue.assignContainers(
clusterResource,
getClusterResource(),
node,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager.getResourceByLabel(
RMNodeLabelsManager.NO_LABEL, clusterResource)),
RMNodeLabelsManager.NO_LABEL, getClusterResource())),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
if (assignment.isFulfilledReservation()) {
CSAssignment tmp =
@ -1261,14 +1261,14 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
}
assignment = root.assignContainers(
clusterResource,
getClusterResource(),
node,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager.getResourceByLabel(
RMNodeLabelsManager.NO_LABEL, clusterResource)),
RMNodeLabelsManager.NO_LABEL, getClusterResource())),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
if (Resources.greaterThan(calculator, clusterResource,
if (Resources.greaterThan(calculator, getClusterResource(),
assignment.getResource(), Resources.none())) {
updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
return;
@ -1294,12 +1294,12 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
// Try to use NON_EXCLUSIVE
assignment = root.assignContainers(
clusterResource,
getClusterResource(),
node,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager.getResourceByLabel(
RMNodeLabelsManager.NO_LABEL, clusterResource)),
RMNodeLabelsManager.NO_LABEL, getClusterResource())),
SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
}
@ -1451,24 +1451,22 @@ public void handle(SchedulerEvent event) {
private synchronized void addNode(RMNode nodeManager) {
FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
usePortForNodeName, nodeManager.getNodeLabels());
this.nodes.put(nodeManager.getNodeID(), schedulerNode);
Resources.addTo(clusterResource, schedulerNode.getTotalResource());
nodeTracker.addNode(schedulerNode);
// update this node to node label manager
if (labelManager != null) {
labelManager.activateNode(nodeManager.getNodeID(),
schedulerNode.getTotalResource());
}
Resource clusterResource = getClusterResource();
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
int numNodes = numNodeManagers.incrementAndGet();
updateMaximumAllocation(schedulerNode, true);
LOG.info("Added node " + nodeManager.getNodeAddress() +
" clusterResource: " + clusterResource);
if (scheduleAsynchronously && numNodes == 1) {
if (scheduleAsynchronously && getNumClusterNodes() == 1) {
asyncSchedulerThread.beginSchedule();
}
}
@ -1478,20 +1476,14 @@ private synchronized void removeNode(RMNode nodeInfo) {
if (labelManager != null) {
labelManager.deactivateNode(nodeInfo.getNodeID());
}
FiCaSchedulerNode node = nodes.get(nodeInfo.getNodeID());
NodeId nodeId = nodeInfo.getNodeID();
FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
if (node == null) {
LOG.error("Attempting to remove non-existent node " + nodeId);
return;
}
Resources.subtractFrom(clusterResource, node.getTotalResource());
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
int numNodes = numNodeManagers.decrementAndGet();
if (scheduleAsynchronously && numNodes == 0) {
asyncSchedulerThread.suspendSchedule();
}
// Remove running containers
List<RMContainer> runningContainers = node.getRunningContainers();
for (RMContainer container : runningContainers) {
@ -1512,11 +1504,18 @@ private synchronized void removeNode(RMNode nodeInfo) {
RMContainerEventType.KILL);
}
this.nodes.remove(nodeInfo.getNodeID());
updateMaximumAllocation(node, false);
nodeTracker.removeNode(nodeId);
Resource clusterResource = getClusterResource();
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
int numNodes = nodeTracker.nodeCount();
if (scheduleAsynchronously && numNodes == 0) {
asyncSchedulerThread.suspendSchedule();
}
LOG.info("Removed node " + nodeInfo.getNodeAddress() +
" clusterResource: " + clusterResource);
" clusterResource: " + getClusterResource());
}
private void rollbackContainerResource(
@ -1568,7 +1567,7 @@ protected void completedContainerInternal(
// Inform the queue
LeafQueue queue = (LeafQueue)application.getQueue();
queue.completedContainer(clusterResource, application, node,
queue.completedContainer(getClusterResource(), application, node,
rmContainer, containerStatus, event, null, true);
if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) {
@ -1594,7 +1593,7 @@ protected void decreaseContainer(SchedContainerChangeRequest decreaseRequest,
FiCaSchedulerApp app = (FiCaSchedulerApp)attempt;
LeafQueue queue = (LeafQueue) attempt.getQueue();
try {
queue.decreaseContainer(clusterResource, decreaseRequest, app);
queue.decreaseContainer(getClusterResource(), decreaseRequest, app);
// Notify RMNode that the container can be pulled by NodeManager in the
// next heartbeat
this.rmContext.getDispatcher().getEventHandler()
@ -1617,14 +1616,9 @@ public FiCaSchedulerApp getApplicationAttempt(
@Lock(Lock.NoLock.class)
public FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
return nodeTracker.getNode(nodeId);
}
@Lock(Lock.NoLock.class)
Map<NodeId, FiCaSchedulerNode> getAllNodes() {
return nodes;
}
@Override
@Lock(Lock.NoLock.class)
public void recover(RMState state) throws Exception {
@ -1869,9 +1863,9 @@ public synchronized String moveApplication(ApplicationId appId,
}
// Move all live containers
for (RMContainer rmContainer : app.getLiveContainers()) {
source.detachContainer(clusterResource, app, rmContainer);
source.detachContainer(getClusterResource(), app, rmContainer);
// attach the Container to another queue
dest.attachContainer(clusterResource, app, rmContainer);
dest.attachContainer(getClusterResource(), app, rmContainer);
}
// Detach the application..
source.finishApplicationAttempt(app, sourceQueueName);

View File

@ -86,7 +86,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// Key = RackName, Value = Set of Nodes reserved by app on rack
private Map<String, Set<String>> reservations = new HashMap<>();
private List<NodeId> blacklistNodeIds = new ArrayList<NodeId>();
private List<FSSchedulerNode> blacklistNodeIds = new ArrayList<>();
/**
* Delay scheduling: We often want to prioritize scheduling of node-local
* containers over rack-local or off-switch containers. To achieve this
@ -185,14 +185,11 @@ private void subtractResourcesOnBlacklistedNodes(
Resource availableResources) {
if (appSchedulingInfo.getAndResetBlacklistChanged()) {
blacklistNodeIds.clear();
scheduler.addBlacklistedNodeIdsToList(this, blacklistNodeIds);
blacklistNodeIds.addAll(scheduler.getBlacklistedNodes(this));
}
for (NodeId nodeId: blacklistNodeIds) {
SchedulerNode node = scheduler.getSchedulerNode(nodeId);
if (node != null) {
Resources.subtractFrom(availableResources,
node.getUnallocatedResource());
}
for (FSSchedulerNode node: blacklistNodeIds) {
Resources.subtractFrom(availableResources,
node.getUnallocatedResource());
}
if (availableResources.getMemory() < 0) {
availableResources.setMemory(0);

View File

@ -20,13 +20,11 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -186,14 +184,11 @@ public class FairScheduler extends
private float reservableNodesRatio; // percentage of available nodes
// an app can be reserved on
// Count of number of nodes per rack
private Map<String, Integer> nodesPerRack = new ConcurrentHashMap<>();
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
private Comparator<NodeId> nodeAvailableResourceComparator =
private Comparator<FSSchedulerNode> nodeAvailableResourceComparator =
new NodeAvailableResourceComparator(); // Node available resource comparator
protected double nodeLocalityThreshold; // Cluster threshold for node locality
protected double rackLocalityThreshold; // Cluster threshold for rack locality
@ -225,8 +220,8 @@ public FairScheduler() {
public boolean isAtLeastReservationThreshold(
ResourceCalculator resourceCalculator, Resource resource) {
return Resources.greaterThanOrEqual(
resourceCalculator, clusterResource, resource, reservationThreshold);
return Resources.greaterThanOrEqual(resourceCalculator,
getClusterResource(), resource, reservationThreshold);
}
private void validateConf(Configuration conf) {
@ -272,11 +267,7 @@ public FairSchedulerConfiguration getConf() {
}
public int getNumNodesInRack(String rackName) {
String rName = rackName == null ? "NULL" : rackName;
if (nodesPerRack.containsKey(rName)) {
return nodesPerRack.get(rName);
}
return 0;
return nodeTracker.nodeCount(rackName);
}
public QueueManager getQueueManager() {
@ -352,6 +343,7 @@ protected synchronized void update() {
// Recursively update demands for all queues
rootQueue.updateDemand();
Resource clusterResource = getClusterResource();
rootQueue.setFairShare(clusterResource);
// Recursively compute fair shares for all queues
// and update metrics
@ -526,6 +518,7 @@ protected Resource resourceDeficit(FSLeafQueue sched, long curTime) {
Resource resDueToMinShare = Resources.none();
Resource resDueToFairShare = Resources.none();
ResourceCalculator calc = sched.getPolicy().getResourceCalculator();
Resource clusterResource = getClusterResource();
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
Resource target = Resources.componentwiseMin(
sched.getMinShare(), sched.getDemand());
@ -577,7 +570,7 @@ public Resource getIncrementResourceCapability() {
}
private FSSchedulerNode getFSSchedulerNode(NodeId nodeId) {
return nodes.get(nodeId);
return nodeTracker.getNode(nodeId);
}
public double getNodeLocalityThreshold() {
@ -882,18 +875,11 @@ protected synchronized void completedContainerInternal(
private synchronized void addNode(List<NMContainerStatus> containerReports,
RMNode node) {
FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
nodes.put(node.getNodeID(), schedulerNode);
String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
if (nodesPerRack.containsKey(rackName)) {
nodesPerRack.put(rackName, nodesPerRack.get(rackName) + 1);
} else {
nodesPerRack.put(rackName, 1);
}
Resources.addTo(clusterResource, schedulerNode.getTotalResource());
updateMaximumAllocation(schedulerNode, true);
nodeTracker.addNode(schedulerNode);
triggerUpdate();
Resource clusterResource = getClusterResource();
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
queueMgr.getRootQueue().recomputeSteadyShares();
LOG.info("Added node " + node.getNodeAddress() +
@ -904,15 +890,12 @@ private synchronized void addNode(List<NMContainerStatus> containerReports,
}
private synchronized void removeNode(RMNode rmNode) {
FSSchedulerNode node = getFSSchedulerNode(rmNode.getNodeID());
// This can occur when an UNHEALTHY node reconnects
NodeId nodeId = rmNode.getNodeID();
FSSchedulerNode node = nodeTracker.getNode(nodeId);
if (node == null) {
LOG.error("Attempting to remove non-existent node " + nodeId);
return;
}
Resources.subtractFrom(clusterResource, node.getTotalResource());
updateRootQueueMetrics();
triggerUpdate();
// Remove running containers
List<RMContainer> runningContainers = node.getRunningContainers();
@ -934,18 +917,13 @@ private synchronized void removeNode(RMNode rmNode) {
RMContainerEventType.KILL);
}
nodes.remove(rmNode.getNodeID());
String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
if (nodesPerRack.containsKey(rackName)
&& (nodesPerRack.get(rackName) > 0)) {
nodesPerRack.put(rackName, nodesPerRack.get(rackName) - 1);
} else {
LOG.error("Node [" + rmNode.getNodeAddress() + "] being removed from" +
" unknown rack [" + rackName + "] !!");
}
nodeTracker.removeNode(nodeId);
Resource clusterResource = getClusterResource();
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
queueMgr.getRootQueue().recomputeSteadyShares();
updateMaximumAllocation(node, false);
updateRootQueueMetrics();
triggerUpdate();
LOG.info("Removed node " + rmNode.getNodeAddress() +
" cluster capacity: " + clusterResource);
}
@ -967,7 +945,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
// Sanity check
SchedulerUtils.normalizeRequests(ask, DOMINANT_RESOURCE_CALCULATOR,
clusterResource, minimumAllocation, getMaximumResourceCapability(),
getClusterResource(), minimumAllocation, getMaximumResourceCapability(),
incrAllocation);
// Record container allocation start time
@ -1034,7 +1012,8 @@ clusterResource, minimumAllocation, getMaximumResourceCapability(),
private synchronized void nodeUpdate(RMNode nm) {
long start = getClock().getTime();
if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
LOG.debug("nodeUpdate: " + nm +
" cluster capacity: " + getClusterResource());
}
eventLog.log("HEARTBEAT", nm.getHostName());
FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
@ -1091,20 +1070,13 @@ private synchronized void nodeUpdate(RMNode nm) {
void continuousSchedulingAttempt() throws InterruptedException {
long start = getClock().getTime();
List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
// Sort the nodes by space available on them, so that we offer
// containers on emptier nodes first, facilitating an even spread. This
// requires holding the scheduler lock, so that the space available on a
// node doesn't change during the sort.
synchronized (this) {
Collections.sort(nodeIdList, nodeAvailableResourceComparator);
}
List<FSSchedulerNode> nodeIdList =
nodeTracker.sortedNodeList(nodeAvailableResourceComparator);
// iterate all nodes
for (NodeId nodeId : nodeIdList) {
FSSchedulerNode node = getFSSchedulerNode(nodeId);
for (FSSchedulerNode node : nodeIdList) {
try {
if (node != null && Resources.fitsIn(minimumAllocation,
if (Resources.fitsIn(minimumAllocation,
node.getUnallocatedResource())) {
attemptScheduling(node);
}
@ -1126,19 +1098,14 @@ void continuousSchedulingAttempt() throws InterruptedException {
}
/** Sort nodes by available resource */
private class NodeAvailableResourceComparator implements Comparator<NodeId> {
private class NodeAvailableResourceComparator
implements Comparator<FSSchedulerNode> {
@Override
public int compare(NodeId n1, NodeId n2) {
if (!nodes.containsKey(n1)) {
return 1;
}
if (!nodes.containsKey(n2)) {
return -1;
}
return RESOURCE_CALCULATOR.compare(clusterResource,
nodes.get(n2).getUnallocatedResource(),
nodes.get(n1).getUnallocatedResource());
public int compare(FSSchedulerNode n1, FSSchedulerNode n2) {
return RESOURCE_CALCULATOR.compare(getClusterResource(),
n2.getUnallocatedResource(),
n1.getUnallocatedResource());
}
}
@ -1150,7 +1117,7 @@ synchronized void attemptScheduling(FSSchedulerNode node) {
}
final NodeId nodeID = node.getNodeID();
if (!nodes.containsKey(nodeID)) {
if (!nodeTracker.exists(nodeID)) {
// The node might have just been removed while this thread was waiting
// on the synchronized lock before it entered this synchronized method
LOG.info("Skipping scheduling as the node " + nodeID +
@ -1203,7 +1170,7 @@ public ResourceCalculator getResourceCalculator() {
private void updateRootQueueMetrics() {
rootMetrics.setAvailableResourcesToQueue(
Resources.subtract(
clusterResource, rootMetrics.getAllocatedResources()));
getClusterResource(), rootMetrics.getAllocatedResources()));
}
/**
@ -1214,6 +1181,7 @@ private void updateRootQueueMetrics() {
*/
private boolean shouldAttemptPreemption() {
if (preemptionEnabled) {
Resource clusterResource = getClusterResource();
return (preemptionUtilizationThreshold < Math.max(
(float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
(float) rootMetrics.getAllocatedVirtualCores() /
@ -1547,7 +1515,7 @@ public List<QueueUserACLInfo> getQueueUserAclInfo() {
@Override
public int getNumClusterNodes() {
return nodes.size();
return nodeTracker.nodeCount();
}
@Override
@ -1577,7 +1545,7 @@ public void onReload(AllocationConfiguration queueInfo) {
// if it does not already exist, so it can be displayed on the web UI.
synchronized (FairScheduler.this) {
allocConf = queueInfo;
allocConf.getDefaultSchedulingPolicy().initialize(clusterResource);
allocConf.getDefaultSchedulingPolicy().initialize(getClusterResource());
queueMgr.updateAllocationConfiguration(allocConf);
maxRunningEnforcer.updateRunnabilityOnReload();
}
@ -1721,7 +1689,7 @@ public synchronized void updateNodeResource(RMNode nm,
ResourceOption resourceOption) {
super.updateNodeResource(nm, resourceOption);
updateRootQueueMetrics();
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
queueMgr.getRootQueue().setSteadyFairShare(getClusterResource());
queueMgr.getRootQueue().recomputeSteadyShares();
}

View File

@ -142,6 +142,7 @@ public QueueInfo getQueueInfo(
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
queueInfo.setQueueName(DEFAULT_QUEUE.getQueueName());
queueInfo.setCapacity(1.0f);
Resource clusterResource = getClusterResource();
if (clusterResource.getMemory() == 0) {
queueInfo.setCurrentCapacity(0.0f);
} else {
@ -297,7 +298,7 @@ public synchronized Configuration getConf() {
@Override
public int getNumClusterNodes() {
return nodes.size();
return nodeTracker.nodeCount();
}
@Override
@ -327,7 +328,8 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
// Sanity check
SchedulerUtils.normalizeRequests(ask, resourceCalculator,
clusterResource, minimumAllocation, getMaximumResourceCapability());
getClusterResource(), minimumAllocation,
getMaximumResourceCapability());
// Release containers
releaseContainers(release, application);
@ -377,7 +379,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
}
private FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
return nodeTracker.getNode(nodeId);
}
@VisibleForTesting
@ -526,7 +528,7 @@ private void assignContainers(FiCaSchedulerNode node) {
application.showRequests();
// Done
if (Resources.lessThan(resourceCalculator, clusterResource,
if (Resources.lessThan(resourceCalculator, getClusterResource(),
node.getUnallocatedResource(), minimumAllocation)) {
break;
}
@ -764,7 +766,7 @@ private synchronized void nodeUpdate(RMNode rmNode) {
return;
}
if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(),
node.getUnallocatedResource(), minimumAllocation)) {
LOG.debug("Node heartbeat " + rmNode.getNodeID() +
" available resource = " + node.getUnallocatedResource());
@ -783,13 +785,13 @@ private void increaseUsedResources(RMContainer rmContainer) {
}
private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) {
schedulerAttempt.setHeadroom(Resources.subtract(clusterResource,
schedulerAttempt.setHeadroom(Resources.subtract(getClusterResource(),
usedResource));
}
private void updateAvailableResourcesMetrics() {
metrics.setAvailableResourcesToQueue(Resources.subtract(clusterResource,
usedResource));
metrics.setAvailableResourcesToQueue(
Resources.subtract(getClusterResource(), usedResource));
}
@Override
@ -925,7 +927,7 @@ protected synchronized void completedContainerInternal(
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
private synchronized void removeNode(RMNode nodeInfo) {
FiCaSchedulerNode node = getNode(nodeInfo.getNodeID());
FiCaSchedulerNode node = nodeTracker.getNode(nodeInfo.getNodeID());
if (node == null) {
return;
}
@ -937,13 +939,7 @@ private synchronized void removeNode(RMNode nodeInfo) {
SchedulerUtils.LOST_CONTAINER),
RMContainerEventType.KILL);
}
//Remove the node
this.nodes.remove(nodeInfo.getNodeID());
updateMaximumAllocation(node, false);
// Update cluster metrics
Resources.subtractFrom(clusterResource, node.getTotalResource());
nodeTracker.removeNode(nodeInfo.getNodeID());
}
@Override
@ -965,9 +961,7 @@ public ResourceCalculator getResourceCalculator() {
private synchronized void addNode(RMNode nodeManager) {
FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
usePortForNodeName);
this.nodes.put(nodeManager.getNodeID(), schedulerNode);
Resources.addTo(clusterResource, schedulerNode.getTotalResource());
updateMaximumAllocation(schedulerNode, true);
nodeTracker.addNode(schedulerNode);
}
@Override

View File

@ -300,22 +300,16 @@ public void testUpdateMaxAllocationUsesTotal() throws IOException {
verifyMaximumResourceCapability(configuredMaximumResource, scheduler);
scheduler.nodes = new HashMap<NodeId, SchedulerNode>();
scheduler.nodes.put(mockNode1.getNodeID(), mockNode1);
scheduler.updateMaximumAllocation(mockNode1, true);
scheduler.nodeTracker.addNode(mockNode1);
verifyMaximumResourceCapability(fullResource1, scheduler);
scheduler.nodes.put(mockNode2.getNodeID(), mockNode2);
scheduler.updateMaximumAllocation(mockNode2, true);
scheduler.nodeTracker.addNode(mockNode2);
verifyMaximumResourceCapability(fullResource2, scheduler);
scheduler.nodes.remove(mockNode2.getNodeID());
scheduler.updateMaximumAllocation(mockNode2, false);
scheduler.nodeTracker.removeNode(mockNode2.getNodeID());
verifyMaximumResourceCapability(fullResource1, scheduler);
scheduler.nodes.remove(mockNode1.getNodeID());
scheduler.updateMaximumAllocation(mockNode1, false);
scheduler.nodeTracker.removeNode(mockNode1.getNodeID());
verifyMaximumResourceCapability(configuredMaximumResource, scheduler);
} finally {
rm.stop();

View File

@ -183,6 +183,7 @@ static LeafQueue stubLeafQueue(LeafQueue queue) {
}
@Test
@SuppressWarnings("unchecked")
public void testReservation() throws Exception {
// Test that we now unreserve and use a node that has space
@ -231,9 +232,9 @@ public void testReservation() throws Exception {
when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
cs.getAllNodes().put(node_0.getNodeID(), node_0);
cs.getAllNodes().put(node_1.getNodeID(), node_1);
cs.getAllNodes().put(node_2.getNodeID(), node_2);
cs.getNodeTracker().addNode(node_0);
cs.getNodeTracker().addNode(node_1);
cs.getNodeTracker().addNode(node_2);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
@ -346,6 +347,7 @@ public void testReservation() throws Exception {
// Test that hitting a reservation limit and needing to unreserve
// does not affect assigning containers for other users
@Test
@SuppressWarnings("unchecked")
public void testReservationLimitOtherUsers() throws Exception {
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
setup(csConf, true);
@ -395,9 +397,9 @@ public void testReservationLimitOtherUsers() throws Exception {
when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
cs.getAllNodes().put(node_0.getNodeID(), node_0);
cs.getAllNodes().put(node_1.getNodeID(), node_1);
cs.getAllNodes().put(node_2.getNodeID(), node_2);
cs.getNodeTracker().addNode(node_0);
cs.getNodeTracker().addNode(node_1);
cs.getNodeTracker().addNode(node_2);
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
@ -641,6 +643,7 @@ public void testReservationNoContinueLook() throws Exception {
}
@Test
@SuppressWarnings("unchecked")
public void testAssignContainersNeedToUnreserve() throws Exception {
// Test that we now unreserve and use a node that has space
Logger rootLogger = LogManager.getRootLogger();
@ -684,8 +687,8 @@ public void testAssignContainersNeedToUnreserve() throws Exception {
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
8 * GB);
cs.getAllNodes().put(node_0.getNodeID(), node_0);
cs.getAllNodes().put(node_1.getNodeID(), node_1);
cs.getNodeTracker().addNode(node_0);
cs.getNodeTracker().addNode(node_1);
when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);

View File

@ -75,12 +75,10 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -108,7 +106,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -2751,8 +2748,8 @@ public void testReservationsStrictLocality() throws IOException {
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent2);
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1",
"user1", 0);

View File

@ -306,12 +306,7 @@ public void testUpdateResourceOnNode() throws Exception {
nmTokenSecretManager.rollMasterKey();
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
FifoScheduler scheduler = new FifoScheduler(){
@SuppressWarnings("unused")
public Map<NodeId, FiCaSchedulerNode> getNodes(){
return nodes;
}
};
FifoScheduler scheduler = new FifoScheduler();
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
null, containerTokenSecretManager, nmTokenSecretManager, null, scheduler);
rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
@ -331,11 +326,7 @@ public Map<NodeId, FiCaSchedulerNode> getNodes(){
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0);
scheduler.handle(nodeEvent1);
Method method = scheduler.getClass().getDeclaredMethod("getNodes");
@SuppressWarnings("unchecked")
Map<NodeId, FiCaSchedulerNode> schedulerNodes =
(Map<NodeId, FiCaSchedulerNode>) method.invoke(scheduler);
assertEquals(schedulerNodes.values().size(), 1);
assertEquals(scheduler.getNumClusterNodes(), 1);
Resource newResource = Resources.createResource(1024, 4);
@ -345,9 +336,9 @@ public Map<NodeId, FiCaSchedulerNode> getNodes(){
scheduler.handle(node0ResourceUpdate);
// SchedulerNode's total resource and available resource are changed.
assertEquals(schedulerNodes.get(node0.getNodeID()).getTotalResource()
.getMemory(), 1024);
assertEquals(schedulerNodes.get(node0.getNodeID()).
assertEquals(1024, scheduler.getNodeTracker().getNode(node0.getNodeID())
.getTotalResource().getMemory());
assertEquals(1024, scheduler.getNodeTracker().getNode(node0.getNodeID()).
getUnallocatedResource().getMemory(), 1024);
QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false);
Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity(), 0.0f);