From 1ac967a6b77c262b23e10c6ca68538b7e4ed39b0 Mon Sep 17 00:00:00 2001 From: Giovanni Matteo Fumarola Date: Wed, 26 Jun 2019 14:01:31 -0700 Subject: [PATCH] YARN-6055. ContainersMonitorImpl need be adjusted when NM resource changed. Contributed by Inigo Goiri. --- .../nodemanager/NodeStatusUpdaterImpl.java | 5 + .../yarn/server/nodemanager/ResourceView.java | 8 ++ .../monitor/ContainersMonitor.java | 6 + .../monitor/ContainersMonitorImpl.java | 49 ++++++--- .../nodemanager/TestNodeStatusUpdater.java | 103 ++++++++++++++++++ 5 files changed, 156 insertions(+), 15 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 8022a07e17c..181094ea6c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -582,6 +582,11 @@ private ResourceUtilization getNodeUtilization() { private void updateNMResource(Resource resource) { metrics.addResource(Resources.subtract(resource, totalResource)); this.totalResource = resource; + + // Update the containers monitor + ContainersMonitor containersMonitor = + this.context.getContainerManager().getContainersMonitor(); + containersMonitor.setAllocatedResourcesForContainers(totalResource); } // Iterate through the NMContext and clone and get all the containers' diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java index 4fde7b926a2..02413c6ab9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java @@ -20,10 +20,18 @@ public interface ResourceView { + /** + * Get virtual memory allocated to the containers. + * @return Virtual memory in bytes. + */ long getVmemAllocatedForContainers(); boolean isVmemCheckEnabled(); + /** + * Get physical memory allocated to the containers. + * @return Physical memory in bytes. + */ long getPmemAllocatedForContainers(); boolean isPmemCheckEnabled(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java index daecc288796..002035bcdc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java @@ -64,4 +64,10 @@ static void decreaseResourceUtilization( * containersMonitor.getVmemRatio()); resourceUtil.subtractFrom((int)resource.getMemorySize(), vmem, vCores); } + + /** + * Set the allocated resources for containers. + * @param resource Resources allocated for the containers. + */ + void setAllocatedResourcesForContainers(Resource resource); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 6d3791e9550..87929101928 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -84,7 +84,9 @@ public class ContainersMonitorImpl extends AbstractService implements private static float vmemRatio; private Class processTreeClass; + /** Maximum virtual memory in bytes. */ private long maxVmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT; + /** Maximum physical memory in bytes. */ private long maxPmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT; private boolean pmemCheckEnabled; @@ -152,25 +154,23 @@ protected void serviceInit(Configuration myConf) throws Exception { long configuredPMemForContainers = NodeManagerHardwareUtils.getContainerMemoryMB( - this.resourceCalculatorPlugin, this.conf) * 1024 * 1024L; + this.resourceCalculatorPlugin, this.conf); - long configuredVCoresForContainers = - NodeManagerHardwareUtils.getVCores(this.resourceCalculatorPlugin, - this.conf); - - // Setting these irrespective of whether checks are enabled. Required in - // the UI. - // ///////// Physical memory configuration ////// - this.maxPmemAllottedForContainers = configuredPMemForContainers; - this.maxVCoresAllottedForContainers = configuredVCoresForContainers; + int configuredVCoresForContainers = + NodeManagerHardwareUtils.getVCores( + this.resourceCalculatorPlugin, this.conf); // ///////// Virtual memory configuration ////// vmemRatio = this.conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); Preconditions.checkArgument(vmemRatio > 0.99f, YarnConfiguration.NM_VMEM_PMEM_RATIO + " should be at least 1.0"); - this.maxVmemAllottedForContainers = - (long) (vmemRatio * configuredPMemForContainers); + + // Setting these irrespective of whether checks are enabled. + // Required in the UI. + Resource resourcesForContainers = Resource.newInstance( + configuredPMemForContainers, configuredVCoresForContainers); + setAllocatedResourcesForContainers(resourcesForContainers); pmemCheckEnabled = this.conf.getBoolean( YarnConfiguration.NM_PMEM_CHECK_ENABLED, @@ -908,6 +908,16 @@ public long getVCoresAllocatedForContainers() { return this.maxVCoresAllottedForContainers; } + @Override + public void setAllocatedResourcesForContainers(final Resource resource) { + LOG.info("Setting the resources allocated to containers to {}", resource); + this.maxVCoresAllottedForContainers = resource.getVirtualCores(); + this.maxPmemAllottedForContainers = convertMBytesToBytes( + resource.getMemorySize()); + this.maxVmemAllottedForContainers = + (long) (getVmemRatio() * maxPmemAllottedForContainers); + } + /** * Is the total virtual memory check enabled? * @@ -973,10 +983,10 @@ private void onChangeMonitoringContainerResource( } LOG.info("Changing resource-monitoring for {}", containerId); updateContainerMetrics(monitoringEvent); - long pmemLimit = - changeEvent.getResource().getMemorySize() * 1024L * 1024L; + Resource resource = changeEvent.getResource(); + long pmemLimit = convertMBytesToBytes(resource.getMemorySize()); long vmemLimit = (long) (pmemLimit * vmemRatio); - int cpuVcores = changeEvent.getResource().getVirtualCores(); + int cpuVcores = resource.getVirtualCores(); processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores); } } @@ -999,4 +1009,13 @@ private void onStartMonitoringContainer( startEvent.getVmemLimit(), startEvent.getPmemLimit(), startEvent.getCpuVcores())); } + + /** + * Convert MegaBytes to Bytes. + * @param mb MegaBytes (MB). + * @return Bytes representing the input MB. + */ + private static long convertMBytesToBytes(long mb) { + return mb * 1024L * 1024L; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 5ba0bef763c..1b21b936543 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager; import static org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils.newNodeHeartbeatResponse; +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -57,6 +59,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.service.ServiceOperations; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; @@ -80,6 +83,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl; import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl; +import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceTracker; @@ -96,11 +100,13 @@ import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -118,6 +124,10 @@ @SuppressWarnings("rawtypes") public class TestNodeStatusUpdater extends NodeManagerTestBase { + + /** Bytes in a GigaByte. */ + private static final long GB = 1024L * 1024L * 1024L; + volatile int heartBeatID = 0; volatile Throwable nmStartError = null; private final List registeredNodes = new ArrayList(); @@ -1774,6 +1784,99 @@ public void run() { Assert.assertTrue("Test failed with exception(s)" + exceptions, exceptions.isEmpty()); } + + /** + * Test if the {@link NodeManager} updates the resources in the + * {@link ContainersMonitor} when the {@link ResourceManager} triggers the + * change. + * @throws Exception If the test cannot run. + */ + @Test + public void testUpdateNMResources() throws Exception { + + // The resource set for the Node Manager from the Resource Tracker + final Resource resource = Resource.newInstance(8 * 1024, 1); + + LOG.info("Start the Resource Tracker to mock heartbeats"); + Server resourceTracker = getMockResourceTracker(resource); + resourceTracker.start(); + + LOG.info("Start the Node Manager"); + NodeManager nodeManager = new NodeManager(); + YarnConfiguration nmConf = new YarnConfiguration(); + nmConf.setSocketAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + resourceTracker.getListenerAddress()); + nmConf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "0.0.0.0:0"); + nodeManager.init(nmConf); + nodeManager.start(); + + LOG.info("Initially the Node Manager should have the default resources"); + ContainerManager containerManager = nodeManager.getContainerManager(); + ContainersMonitor containerMonitor = + containerManager.getContainersMonitor(); + assertEquals(8, containerMonitor.getVCoresAllocatedForContainers()); + assertEquals(8 * GB, containerMonitor.getPmemAllocatedForContainers()); + + LOG.info("The first heartbeat should trigger a resource change to {}", + resource); + GenericTestUtils.waitFor( + () -> containerMonitor.getVCoresAllocatedForContainers() == 1, + 100, 2 * 1000); + assertEquals(8 * GB, containerMonitor.getPmemAllocatedForContainers()); + + resource.setVirtualCores(5); + resource.setMemorySize(4 * 1024); + LOG.info("Change the resources to {}", resource); + GenericTestUtils.waitFor( + () -> containerMonitor.getVCoresAllocatedForContainers() == 5, + 100, 2 * 1000); + assertEquals(4 * GB, containerMonitor.getPmemAllocatedForContainers()); + + LOG.info("Cleanup"); + nodeManager.stop(); + nodeManager.close(); + resourceTracker.stop(); + } + + /** + * Create a mock Resource Tracker server that returns the resources we want + * in the heartbeat. + * @param resource Resource to reply in the heartbeat. + * @return RPC server for the Resource Tracker. + * @throws Exception If it cannot create the Resource Tracker. + */ + private static Server getMockResourceTracker(final Resource resource) + throws Exception { + + // Setup the mock Resource Tracker + final ResourceTracker rt = mock(ResourceTracker.class); + when(rt.registerNodeManager(any())).thenAnswer(invocation -> { + RegisterNodeManagerResponse response = recordFactory.newRecordInstance( + RegisterNodeManagerResponse.class); + response.setContainerTokenMasterKey(createMasterKey()); + response.setNMTokenMasterKey(createMasterKey()); + return response; + }); + when(rt.nodeHeartbeat(any())).thenAnswer(invocation -> { + NodeHeartbeatResponse response = recordFactory.newRecordInstance( + NodeHeartbeatResponse.class); + response.setResource(resource); + return response; + }); + when(rt.unRegisterNodeManager(any())).thenAnswer(invocaiton -> { + UnRegisterNodeManagerResponse response = recordFactory.newRecordInstance( + UnRegisterNodeManagerResponse.class); + return response; + }); + + // Get the RPC server + YarnConfiguration conf = new YarnConfiguration(); + YarnRPC rpc = YarnRPC.create(conf); + Server server = rpc.getServer(ResourceTracker.class, rt, + new InetSocketAddress("0.0.0.0", 0), conf, null, 1); + return server; + } + // Add new containers info into NM context each time node heart beats. private class MyNMContext extends NMContext {