YARN-7541. Node updates don't update the maximum cluster capability for resources other than CPU and memory

(cherry picked from commit 8498d287cd)
(cherry picked from commit 0e73efb1ad68398ce5f7cde51466cea8c2153659)
This commit is contained in:
Daniel Templeton 2017-11-29 10:36:19 -08:00
parent 60b35eccfd
commit 9de795f12a
5 changed files with 230 additions and 30 deletions

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.api.records;
import java.util.Arrays;
import java.util.Map;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.classification.InterfaceAudience;
@ -84,6 +85,26 @@ public abstract class Resource implements Comparable<Resource> {
return new LightWeightResource(memory, vCores);
}
/**
* Create a new {@link Resource} instance with the given CPU and memory
* values and additional resource values as set in the {@code others}
* parameter. Note that the CPU and memory settings in the {@code others}
* parameter will be ignored.
*
* @param memory the memory value
* @param vCores the CPU value
* @param others a map of other resource values indexed by resource name
* @return a {@link Resource} instance with the given resource values
*/
@Public
@Stable
public static Resource newInstance(long memory, int vCores,
Map<String, Long> others) {
ResourceInformation[] info = ResourceUtils.createResourceTypesArray(others);
return new LightWeightResource(memory, vCores, info);
}
@InterfaceAudience.Private
@InterfaceStability.Unstable
public static Resource newInstance(Resource resource) {

View File

@ -641,7 +641,6 @@ public class ResourceUtils {
return result;
}
/**
* Reinitialize all resource types from external source (in case of client,
* server will send the updated list and local resourceutils cache will be
@ -662,4 +661,34 @@ public class ResourceUtils {
ResourceUtils
.initializeResourcesFromResourceInformationMap(resourceInformationMap);
}
/**
* Create an array of {@link ResourceInformation} objects corresponding to
* the passed in map of names to values. The array will be ordered according
* to the order returned by {@link #getResourceTypesArray()}. The value of
* each resource type in the returned array will either be the value given for
* that resource in the {@code res} parameter or, if none is given, 0.
*
* @param res the map of resource type values
* @return an array of {@link ResourceInformation} instances
*/
public static ResourceInformation[] createResourceTypesArray(Map<String,
Long> res) {
ResourceInformation[] info = new ResourceInformation[resourceTypes.size()];
for (Entry<String, Integer> entry : RESOURCE_NAME_TO_INDEX.entrySet()) {
int index = entry.getValue();
Long value = res.get(entry.getKey());
if (value == null) {
value = 0L;
}
info[index] = new ResourceInformation();
ResourceInformation.copy(resourceTypesArray[index], info[index]);
info[index].setValue(value);
}
return info;
}
}

View File

@ -24,11 +24,14 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@ -60,11 +63,16 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
Resources.clone(Resources.none());
// Max allocation
private long maxNodeMemory = -1;
private int maxNodeVCores = -1;
private final long[] maxAllocation;
private Resource configuredMaxAllocation;
private boolean forceConfiguredMaxAllocation = true;
private long configuredMaxAllocationWaitTime;
private boolean reportedMaxAllocation = false;
public ClusterNodeTracker() {
maxAllocation = new long[ResourceUtils.getNumberOfKnownResourceTypes()];
Arrays.fill(maxAllocation, -1);
}
public void addNode(N node) {
writeLock.lock();
@ -208,17 +216,18 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
forceConfiguredMaxAllocation = false;
}
if (forceConfiguredMaxAllocation
|| maxNodeMemory == -1 || maxNodeVCores == -1) {
if (forceConfiguredMaxAllocation || !reportedMaxAllocation) {
return configuredMaxAllocation;
}
Resource ret = Resources.clone(configuredMaxAllocation);
if (ret.getMemorySize() > maxNodeMemory) {
ret.setMemorySize(maxNodeMemory);
for (int i = 0; i < maxAllocation.length; i++) {
ResourceInformation info = ret.getResourceInformation(i);
if (info.getValue() > maxAllocation[i]) {
info.setValue(maxAllocation[i]);
}
if (ret.getVirtualCores() > maxNodeVCores) {
ret.setVirtualCores(maxNodeVCores);
}
return ret;
@ -229,31 +238,51 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
private void updateMaxResources(SchedulerNode node, boolean add) {
Resource totalResource = node.getTotalResource();
ResourceInformation[] totalResources;
if (totalResource != null) {
totalResources = totalResource.getResources();
} else {
LOG.warn(node.getNodeName() + " reported in with null resources, which "
+ "indicates a problem in the source code. Please file an issue at "
+ "https://issues.apache.org/jira/secure/CreateIssue!default.jspa");
return;
}
writeLock.lock();
try {
if (add) { // added node
long nodeMemory = totalResource.getMemorySize();
if (nodeMemory > maxNodeMemory) {
maxNodeMemory = nodeMemory;
// If we add a node, we must have a max allocation for all resource
// types
reportedMaxAllocation = true;
for (int i = 0; i < maxAllocation.length; i++) {
long value = totalResources[i].getValue();
if (value > maxAllocation[i]) {
maxAllocation[i] = value;
}
int nodeVCores = totalResource.getVirtualCores();
if (nodeVCores > maxNodeVCores) {
maxNodeVCores = nodeVCores;
}
} else { // removed node
if (maxNodeMemory == totalResource.getMemorySize()) {
maxNodeMemory = -1;
boolean recalculate = false;
for (int i = 0; i < maxAllocation.length; i++) {
if (totalResources[i].getValue() == maxAllocation[i]) {
// No need to set reportedMaxAllocation to false here because we
// will recalculate before we release the lock.
maxAllocation[i] = -1;
recalculate = true;
}
if (maxNodeVCores == totalResource.getVirtualCores()) {
maxNodeVCores = -1;
}
// We only have to iterate through the nodes if the current max memory
// or vcores was equal to the removed node's
if (maxNodeMemory == -1 || maxNodeVCores == -1) {
if (recalculate) {
// Treat it like an empty cluster and add nodes
for (N n : nodes.values()) {
updateMaxResources(n, true);
}
reportedMaxAllocation = false;
nodes.values().forEach(n -> updateMaxResources(n, true));
}
}
} finally {

View File

@ -49,6 +49,10 @@ public class MockNodes {
private static int NODE_ID = 0;
private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
public static void resetHostIds() {
NODE_ID = 0;
}
public static List<RMNode> newNodes(int racks, int nodesPerRack,
Resource perNode) {
List<RMNode> list = Lists.newArrayList();

View File

@ -17,16 +17,21 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import static org.junit.Assert.assertEquals;
/**
@ -34,11 +39,15 @@ import static org.junit.Assert.assertEquals;
* loss of generality.
*/
public class TestClusterNodeTracker {
private ClusterNodeTracker<FSSchedulerNode> nodeTracker =
new ClusterNodeTracker<>();
private ClusterNodeTracker<FSSchedulerNode> nodeTracker;
@Before
public void setup() {
nodeTracker = new ClusterNodeTracker<>();
}
private void addEight4x4Nodes() {
MockNodes.resetHostIds();
List<RMNode> rmNodes =
MockNodes.newNodes(2, 4, Resource.newInstance(4096, 4));
for (RMNode rmNode : rmNodes) {
@ -48,6 +57,7 @@ public class TestClusterNodeTracker {
@Test
public void testGetNodeCount() {
addEight4x4Nodes();
assertEquals("Incorrect number of nodes in the cluster",
8, nodeTracker.nodeCount());
@ -57,6 +67,7 @@ public class TestClusterNodeTracker {
@Test
public void testGetNodesForResourceName() throws Exception {
addEight4x4Nodes();
assertEquals("Incorrect number of nodes matching ANY",
8, nodeTracker.getNodesByResourceName(ResourceRequest.ANY).size());
@ -66,4 +77,110 @@ public class TestClusterNodeTracker {
assertEquals("Incorrect number of nodes matching node",
1, nodeTracker.getNodesByResourceName("host0").size());
}
@Test
public void testMaxAllowedAllocation() {
// Add a third resource
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RESOURCE_TYPES, "test1");
ResourceUtils.resetResourceTypes(conf);
setup();
Resource maximum = Resource.newInstance(10240, 10,
Collections.singletonMap("test1", 10L));
nodeTracker.setConfiguredMaxAllocation(maximum);
Resource result = nodeTracker.getMaxAllowedAllocation();
assertEquals("With no nodes added, the ClusterNodeTracker did not return "
+ "the configured max allocation", maximum, result);
List<RMNode> smallNodes =
MockNodes.newNodes(1, 1, Resource.newInstance(1024, 2,
Collections.singletonMap("test1", 4L)));
FSSchedulerNode smallNode = new FSSchedulerNode(smallNodes.get(0), false);
List<RMNode> mediumNodes =
MockNodes.newNodes(1, 1, Resource.newInstance(4096, 2,
Collections.singletonMap("test1", 2L)));
FSSchedulerNode mediumNode = new FSSchedulerNode(mediumNodes.get(0), false);
List<RMNode> largeNodes =
MockNodes.newNodes(1, 1, Resource.newInstance(16384, 4,
Collections.singletonMap("test1", 1L)));
FSSchedulerNode largeNode = new FSSchedulerNode(largeNodes.get(0), false);
nodeTracker.addNode(mediumNode);
result = nodeTracker.getMaxAllowedAllocation();
assertEquals("With a single node added, the ClusterNodeTracker did not "
+ "return that node's resources as the maximum allocation",
mediumNodes.get(0).getTotalCapability(), result);
nodeTracker.addNode(smallNode);
result = nodeTracker.getMaxAllowedAllocation();
assertEquals("With two nodes added, the ClusterNodeTracker did not "
+ "return a the maximum allocation that was the max of their aggregate "
+ "resources",
Resource.newInstance(4096, 2, Collections.singletonMap("test1", 4L)),
result);
nodeTracker.removeNode(smallNode.getNodeID());
result = nodeTracker.getMaxAllowedAllocation();
assertEquals("After removing a node, the ClusterNodeTracker did not "
+ "recalculate the adjusted maximum allocation correctly",
mediumNodes.get(0).getTotalCapability(), result);
nodeTracker.addNode(largeNode);
result = nodeTracker.getMaxAllowedAllocation();
assertEquals("With two nodes added, the ClusterNodeTracker did not "
+ "return a the maximum allocation that was the max of their aggregate "
+ "resources",
Resource.newInstance(10240, 4, Collections.singletonMap("test1", 2L)),
result);
nodeTracker.removeNode(largeNode.getNodeID());
result = nodeTracker.getMaxAllowedAllocation();
assertEquals("After removing a node, the ClusterNodeTracker did not "
+ "recalculate the adjusted maximum allocation correctly",
mediumNodes.get(0).getTotalCapability(), result);
nodeTracker.removeNode(mediumNode.getNodeID());
result = nodeTracker.getMaxAllowedAllocation();
assertEquals("After removing all nodes, the ClusterNodeTracker did not "
+ "return the configured maximum allocation", maximum, result);
nodeTracker.addNode(smallNode);
nodeTracker.addNode(mediumNode);
nodeTracker.addNode(largeNode);
result = nodeTracker.getMaxAllowedAllocation();
assertEquals("With three nodes added, the ClusterNodeTracker did not "
+ "return a the maximum allocation that was the max of their aggregate "
+ "resources",
Resource.newInstance(10240, 4, Collections.singletonMap("test1", 4L)),
result);
nodeTracker.removeNode(smallNode.getNodeID());
nodeTracker.removeNode(mediumNode.getNodeID());
nodeTracker.removeNode(largeNode.getNodeID());
result = nodeTracker.getMaxAllowedAllocation();
assertEquals("After removing all nodes, the ClusterNodeTracker did not "
+ "return the configured maximum allocation", maximum, result);
}
}