Revert "YARN=5181. ClusterNodeTracker: add method to get list of nodes matching a specific resourceName. (kasha via asuresh)"
This reverts commit e905a42a2c
.
This commit is contained in:
parent
dc2f4b6ac8
commit
5f2d33a551
|
@ -18,13 +18,11 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
|
@ -52,8 +50,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
|
||||||
private Lock writeLock = readWriteLock.writeLock();
|
private Lock writeLock = readWriteLock.writeLock();
|
||||||
|
|
||||||
private HashMap<NodeId, N> nodes = new HashMap<>();
|
private HashMap<NodeId, N> nodes = new HashMap<>();
|
||||||
private Map<String, N> nodeNameToNodeMap = new HashMap<>();
|
private Map<String, Integer> nodesPerRack = new HashMap<>();
|
||||||
private Map<String, List<N>> nodesPerRack = new HashMap<>();
|
|
||||||
|
|
||||||
private Resource clusterCapacity = Resources.clone(Resources.none());
|
private Resource clusterCapacity = Resources.clone(Resources.none());
|
||||||
private Resource staleClusterCapacity = null;
|
private Resource staleClusterCapacity = null;
|
||||||
|
@ -69,16 +66,14 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
nodes.put(node.getNodeID(), node);
|
nodes.put(node.getNodeID(), node);
|
||||||
nodeNameToNodeMap.put(node.getNodeName(), node);
|
|
||||||
|
|
||||||
// Update nodes per rack as well
|
// Update nodes per rack as well
|
||||||
String rackName = node.getRackName();
|
String rackName = node.getRackName();
|
||||||
List<N> nodesList = nodesPerRack.get(rackName);
|
Integer numNodes = nodesPerRack.get(rackName);
|
||||||
if (nodesList == null) {
|
if (numNodes == null) {
|
||||||
nodesList = new ArrayList<>();
|
numNodes = 0;
|
||||||
nodesPerRack.put(rackName, nodesList);
|
|
||||||
}
|
}
|
||||||
nodesList.add(node);
|
nodesPerRack.put(rackName, ++numNodes);
|
||||||
|
|
||||||
// Update cluster capacity
|
// Update cluster capacity
|
||||||
Resources.addTo(clusterCapacity, node.getTotalResource());
|
Resources.addTo(clusterCapacity, node.getTotalResource());
|
||||||
|
@ -131,8 +126,8 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
String rName = rackName == null ? "NULL" : rackName;
|
String rName = rackName == null ? "NULL" : rackName;
|
||||||
try {
|
try {
|
||||||
List<N> nodesList = nodesPerRack.get(rName);
|
Integer nodeCount = nodesPerRack.get(rName);
|
||||||
return nodesList == null ? 0 : nodesList.size();
|
return nodeCount == null ? 0 : nodeCount;
|
||||||
} finally {
|
} finally {
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -159,18 +154,14 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
|
||||||
LOG.warn("Attempting to remove a non-existent node " + nodeId);
|
LOG.warn("Attempting to remove a non-existent node " + nodeId);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
nodeNameToNodeMap.remove(node.getNodeName());
|
|
||||||
|
|
||||||
// Update nodes per rack as well
|
// Update nodes per rack as well
|
||||||
String rackName = node.getRackName();
|
String rackName = node.getRackName();
|
||||||
List<N> nodesList = nodesPerRack.get(rackName);
|
Integer numNodes = nodesPerRack.get(rackName);
|
||||||
if (nodesList == null) {
|
if (numNodes > 0) {
|
||||||
LOG.error("Attempting to remove node from an empty rack " + rackName);
|
nodesPerRack.put(rackName, --numNodes);
|
||||||
} else {
|
} else {
|
||||||
nodesList.remove(node);
|
LOG.error("Attempting to remove node from an empty rack " + rackName);
|
||||||
if (nodesList.isEmpty()) {
|
|
||||||
nodesPerRack.remove(rackName);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update cluster capacity
|
// Update cluster capacity
|
||||||
|
@ -306,29 +297,4 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
|
||||||
Collections.sort(sortedList, comparator);
|
Collections.sort(sortedList, comparator);
|
||||||
return sortedList;
|
return sortedList;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Convenience method to return list of nodes corresponding to resourceName
|
|
||||||
* passed in the {@link ResourceRequest}.
|
|
||||||
*
|
|
||||||
* @param resourceName Host/rack name of the resource, or
|
|
||||||
* {@link ResourceRequest#ANY}
|
|
||||||
* @return list of nodes that match the resourceName
|
|
||||||
*/
|
|
||||||
public List<N> getNodesByResourceName(final String resourceName) {
|
|
||||||
Preconditions.checkArgument(
|
|
||||||
resourceName != null && !resourceName.isEmpty());
|
|
||||||
List<N> retNodes = new ArrayList<>();
|
|
||||||
if (ResourceRequest.ANY.equals(resourceName)) {
|
|
||||||
return getAllNodes();
|
|
||||||
} else if (nodeNameToNodeMap.containsKey(resourceName)) {
|
|
||||||
retNodes.add(nodeNameToNodeMap.get(resourceName));
|
|
||||||
} else if (nodesPerRack.containsKey(resourceName)) {
|
|
||||||
return nodesPerRack.get(resourceName);
|
|
||||||
} else {
|
|
||||||
LOG.info(
|
|
||||||
"Could not find a node matching given resourceName " + resourceName);
|
|
||||||
}
|
|
||||||
return retNodes;
|
|
||||||
}
|
|
||||||
}
|
}
|
|
@ -1,69 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test class to verify ClusterNodeTracker. Using FSSchedulerNode without
|
|
||||||
* loss of generality.
|
|
||||||
*/
|
|
||||||
public class TestClusterNodeTracker {
|
|
||||||
private ClusterNodeTracker<FSSchedulerNode> nodeTracker =
|
|
||||||
new ClusterNodeTracker<>();
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setup() {
|
|
||||||
List<RMNode> rmNodes =
|
|
||||||
MockNodes.newNodes(2, 4, Resource.newInstance(4096, 4));
|
|
||||||
for (RMNode rmNode : rmNodes) {
|
|
||||||
nodeTracker.addNode(new FSSchedulerNode(rmNode, false));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGetNodeCount() {
|
|
||||||
assertEquals("Incorrect number of nodes in the cluster",
|
|
||||||
8, nodeTracker.nodeCount());
|
|
||||||
|
|
||||||
assertEquals("Incorrect number of nodes in each rack",
|
|
||||||
4, nodeTracker.nodeCount("rack0"));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGetNodesForResourceName() throws Exception {
|
|
||||||
assertEquals("Incorrect number of nodes matching ANY",
|
|
||||||
8, nodeTracker.getNodesByResourceName(ResourceRequest.ANY).size());
|
|
||||||
|
|
||||||
assertEquals("Incorrect number of nodes matching rack",
|
|
||||||
4, nodeTracker.getNodesByResourceName("rack0").size());
|
|
||||||
|
|
||||||
assertEquals("Incorrect number of nodes matching node",
|
|
||||||
1, nodeTracker.getNodesByResourceName("host0").size());
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue