diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java index e863d6877ce..5f1455ff023 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.api.records; import java.util.Arrays; +import java.util.Map; import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.classification.InterfaceAudience; @@ -84,6 +85,26 @@ public abstract class Resource implements Comparable<Resource> { return new LightWeightResource(memory, vCores); } + /** + * Create a new {@link Resource} instance with the given CPU and memory + * values and additional resource values as set in the {@code others} + * parameter. Note that the CPU and memory settings in the {@code others} + * parameter will be ignored. 
+ * + * @param memory the memory value + * @param vCores the CPU value + * @param others a map of other resource values indexed by resource name + * @return a {@link Resource} instance with the given resource values + */ + @Public + @Stable + public static Resource newInstance(long memory, int vCores, + Map<String, Long> others) { + ResourceInformation[] info = ResourceUtils.createResourceTypesArray(others); + + return new LightWeightResource(memory, vCores, info); + } + @InterfaceAudience.Private @InterfaceStability.Unstable public static Resource newInstance(Resource resource) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java index 540cd9e035e..04c2412daff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java @@ -641,7 +641,6 @@ public class ResourceUtils { return result; } - /** * Reinitialize all resource types from external source (in case of client, * server will send the updated list and local resourceutils cache will be @@ -662,4 +661,34 @@ public class ResourceUtils { ResourceUtils .initializeResourcesFromResourceInformationMap(resourceInformationMap); } + + /** + * Create an array of {@link ResourceInformation} objects corresponding to + * the passed in map of names to values. The array will be ordered according + * to the order returned by {@link #getResourceTypesArray()}. The value of + * each resource type in the returned array will either be the value given for + * that resource in the {@code res} parameter or, if none is given, 0. 
+ * + * @param res the map of resource type values + * @return an array of {@link ResourceInformation} instances + */ + public static ResourceInformation[] createResourceTypesArray(Map<String, Long> res) { + ResourceInformation[] info = new ResourceInformation[resourceTypes.size()]; + + for (Entry<String, Integer> entry : RESOURCE_NAME_TO_INDEX.entrySet()) { + int index = entry.getValue(); + Long value = res.get(entry.getKey()); + + if (value == null) { + value = 0L; + } + + info[index] = new ResourceInformation(); + ResourceInformation.copy(resourceTypesArray[index], info[index]); + info[index].setValue(value); + } + + return info; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java index 9e54ac6298a..66d88108932 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java @@ -24,11 +24,14 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import java.util.ArrayList; +import java.util.Arrays; import 
java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -60,11 +63,16 @@ public class ClusterNodeTracker<N extends SchedulerNode> { Resources.clone(Resources.none()); // Max allocation - private long maxNodeMemory = -1; - private int maxNodeVCores = -1; + private final long[] maxAllocation; private Resource configuredMaxAllocation; private boolean forceConfiguredMaxAllocation = true; private long configuredMaxAllocationWaitTime; + private boolean reportedMaxAllocation = false; + + public ClusterNodeTracker() { + maxAllocation = new long[ResourceUtils.getNumberOfKnownResourceTypes()]; + Arrays.fill(maxAllocation, -1); + } public void addNode(N node) { writeLock.lock(); @@ -208,17 +216,18 @@ public class ClusterNodeTracker<N extends SchedulerNode> { forceConfiguredMaxAllocation = false; } - if (forceConfiguredMaxAllocation - || maxNodeMemory == -1 || maxNodeVCores == -1) { + if (forceConfiguredMaxAllocation || !reportedMaxAllocation) { return configuredMaxAllocation; } Resource ret = Resources.clone(configuredMaxAllocation); - if (ret.getMemorySize() > maxNodeMemory) { - ret.setMemorySize(maxNodeMemory); - } - if (ret.getVirtualCores() > maxNodeVCores) { - ret.setVirtualCores(maxNodeVCores); + + for (int i = 0; i < maxAllocation.length; i++) { + ResourceInformation info = ret.getResourceInformation(i); + + if (info.getValue() > maxAllocation[i]) { + info.setValue(maxAllocation[i]); + } } return ret; @@ -229,31 +238,51 @@ public class ClusterNodeTracker<N extends SchedulerNode> { private void updateMaxResources(SchedulerNode node, boolean add) { Resource totalResource = node.getTotalResource(); + ResourceInformation[] totalResources; + + if (totalResource != null) { + totalResources = totalResource.getResources(); + } else { + LOG.warn(node.getNodeName() + " reported in with null resources, which " + + "indicates a problem in the source code. 
Please file an issue at " + + "https://issues.apache.org/jira/secure/CreateIssue!default.jspa"); + + return; + } + writeLock.lock(); + try { if (add) { // added node - long nodeMemory = totalResource.getMemorySize(); - if (nodeMemory > maxNodeMemory) { - maxNodeMemory = nodeMemory; - } - int nodeVCores = totalResource.getVirtualCores(); - if (nodeVCores > maxNodeVCores) { - maxNodeVCores = nodeVCores; + // If we add a node, we must have a max allocation for all resource + // types + reportedMaxAllocation = true; + + for (int i = 0; i < maxAllocation.length; i++) { + long value = totalResources[i].getValue(); + + if (value > maxAllocation[i]) { + maxAllocation[i] = value; + } } } else { // removed node - if (maxNodeMemory == totalResource.getMemorySize()) { - maxNodeMemory = -1; - } - if (maxNodeVCores == totalResource.getVirtualCores()) { - maxNodeVCores = -1; + boolean recalculate = false; + + for (int i = 0; i < maxAllocation.length; i++) { + if (totalResources[i].getValue() == maxAllocation[i]) { + // No need to set reportedMaxAllocation to false here because we + // will recalculate before we release the lock. 
+ maxAllocation[i] = -1; + recalculate = true; + } } + // We only have to iterate through the nodes if the current max memory // or vcores was equal to the removed node's - if (maxNodeMemory == -1 || maxNodeVCores == -1) { + if (recalculate) { // Treat it like an empty cluster and add nodes - for (N n : nodes.values()) { - updateMaxResources(n, true); - } + reportedMaxAllocation = false; + nodes.values().forEach(n -> updateMaxResources(n, true)); } } } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 611c7f24643..317c6489f32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -49,6 +49,10 @@ public class MockNodes { private static int NODE_ID = 0; private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + public static void resetHostIds() { + NODE_ID = 0; + } + public static List newNodes(int racks, int nodesPerRack, Resource perNode) { List list = Lists.newArrayList(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java index 7f527f1b432..c1703bc52e3 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java @@ -17,16 +17,21 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; + import org.junit.Before; import org.junit.Test; -import java.util.List; - import static org.junit.Assert.assertEquals; /** @@ -34,11 +39,15 @@ import static org.junit.Assert.assertEquals; * loss of generality. 
*/ public class TestClusterNodeTracker { - private ClusterNodeTracker<FSSchedulerNode> nodeTracker = - new ClusterNodeTracker<>(); + private ClusterNodeTracker<FSSchedulerNode> nodeTracker; @Before public void setup() { + nodeTracker = new ClusterNodeTracker<>(); + } + + private void addEight4x4Nodes() { + MockNodes.resetHostIds(); List<RMNode> rmNodes = MockNodes.newNodes(2, 4, Resource.newInstance(4096, 4)); for (RMNode rmNode : rmNodes) { @@ -48,6 +57,7 @@ public class TestClusterNodeTracker { @Test public void testGetNodeCount() { + addEight4x4Nodes(); assertEquals("Incorrect number of nodes in the cluster", 8, nodeTracker.nodeCount()); @@ -57,6 +67,7 @@ public class TestClusterNodeTracker { @Test public void testGetNodesForResourceName() throws Exception { + addEight4x4Nodes(); assertEquals("Incorrect number of nodes matching ANY", 8, nodeTracker.getNodesByResourceName(ResourceRequest.ANY).size()); @@ -66,4 +77,110 @@ public class TestClusterNodeTracker { assertEquals("Incorrect number of nodes matching node", 1, nodeTracker.getNodesByResourceName("host0").size()); } + + @Test + public void testMaxAllowedAllocation() { + // Add a third resource + Configuration conf = new Configuration(); + + conf.set(YarnConfiguration.RESOURCE_TYPES, "test1"); + + ResourceUtils.resetResourceTypes(conf); + setup(); + + Resource maximum = Resource.newInstance(10240, 10, + Collections.singletonMap("test1", 10L)); + + nodeTracker.setConfiguredMaxAllocation(maximum); + + Resource result = nodeTracker.getMaxAllowedAllocation(); + + assertEquals("With no nodes added, the ClusterNodeTracker did not return " + + "the configured max allocation", maximum, result); + + List<RMNode> smallNodes = + MockNodes.newNodes(1, 1, Resource.newInstance(1024, 2, + Collections.singletonMap("test1", 4L))); + FSSchedulerNode smallNode = new FSSchedulerNode(smallNodes.get(0), false); + List<RMNode> mediumNodes = + MockNodes.newNodes(1, 1, Resource.newInstance(4096, 2, + Collections.singletonMap("test1", 2L))); + FSSchedulerNode mediumNode = new 
FSSchedulerNode(mediumNodes.get(0), false); + List largeNodes = + MockNodes.newNodes(1, 1, Resource.newInstance(16384, 4, + Collections.singletonMap("test1", 1L))); + FSSchedulerNode largeNode = new FSSchedulerNode(largeNodes.get(0), false); + + nodeTracker.addNode(mediumNode); + + result = nodeTracker.getMaxAllowedAllocation(); + + assertEquals("With a single node added, the ClusterNodeTracker did not " + + "return that node's resources as the maximum allocation", + mediumNodes.get(0).getTotalCapability(), result); + + nodeTracker.addNode(smallNode); + + result = nodeTracker.getMaxAllowedAllocation(); + + assertEquals("With two nodes added, the ClusterNodeTracker did not " + + "return a the maximum allocation that was the max of their aggregate " + + "resources", + Resource.newInstance(4096, 2, Collections.singletonMap("test1", 4L)), + result); + + nodeTracker.removeNode(smallNode.getNodeID()); + + result = nodeTracker.getMaxAllowedAllocation(); + + assertEquals("After removing a node, the ClusterNodeTracker did not " + + "recalculate the adjusted maximum allocation correctly", + mediumNodes.get(0).getTotalCapability(), result); + + nodeTracker.addNode(largeNode); + + result = nodeTracker.getMaxAllowedAllocation(); + + assertEquals("With two nodes added, the ClusterNodeTracker did not " + + "return a the maximum allocation that was the max of their aggregate " + + "resources", + Resource.newInstance(10240, 4, Collections.singletonMap("test1", 2L)), + result); + + nodeTracker.removeNode(largeNode.getNodeID()); + + result = nodeTracker.getMaxAllowedAllocation(); + + assertEquals("After removing a node, the ClusterNodeTracker did not " + + "recalculate the adjusted maximum allocation correctly", + mediumNodes.get(0).getTotalCapability(), result); + + nodeTracker.removeNode(mediumNode.getNodeID()); + + result = nodeTracker.getMaxAllowedAllocation(); + + assertEquals("After removing all nodes, the ClusterNodeTracker did not " + + "return the configured maximum 
allocation", maximum, result); + + nodeTracker.addNode(smallNode); + nodeTracker.addNode(mediumNode); + nodeTracker.addNode(largeNode); + + result = nodeTracker.getMaxAllowedAllocation(); + + assertEquals("With three nodes added, the ClusterNodeTracker did not " + + "return a the maximum allocation that was the max of their aggregate " + + "resources", + Resource.newInstance(10240, 4, Collections.singletonMap("test1", 4L)), + result); + + nodeTracker.removeNode(smallNode.getNodeID()); + nodeTracker.removeNode(mediumNode.getNodeID()); + nodeTracker.removeNode(largeNode.getNodeID()); + + result = nodeTracker.getMaxAllowedAllocation(); + + assertEquals("After removing all nodes, the ClusterNodeTracker did not " + + "return the configured maximum allocation", maximum, result); + } } \ No newline at end of file