diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 880649e45cb..1491ed9a175 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -157,6 +157,9 @@ Release 2.8.0 - UNRELEASED YARN-3867. ContainerImpl changes to support container resizing. (Meng Ding via jianhe) + YARN-1643. Make ContainersMonitor support changing monitoring size of an + allocated container. (Meng Ding and Wangda Tan) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index afb51ad28bb..b3839d2aa10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -18,13 +18,11 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -32,12 +30,14 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; @@ -56,16 +56,16 @@ public class ContainersMonitorImpl extends AbstractService implements private boolean containerMetricsEnabled; private long containerMetricsPeriodMs; - final List containersToBeRemoved; - final Map containersToBeAdded; - Map trackingContainers = - new HashMap(); + @VisibleForTesting + final Map trackingContainers = + new ConcurrentHashMap<>(); - final ContainerExecutor containerExecutor; + private final ContainerExecutor containerExecutor; private final Dispatcher eventDispatcher; private final Context context; private ResourceCalculatorPlugin resourceCalculatorPlugin; private Configuration conf; + private static float vmemRatio; private Class processTreeClass; private long maxVmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT; @@ -82,6 +82,8 @@ public class ContainersMonitorImpl extends AbstractService implements private ResourceUtilization containersUtilization; + private volatile boolean stopped = false; + public ContainersMonitorImpl(ContainerExecutor exec, AsyncDispatcher dispatcher, Context context) { super("containers-monitor"); @@ -90,8 +92,6 @@ public class ContainersMonitorImpl extends AbstractService implements this.eventDispatcher = dispatcher; this.context = context; - this.containersToBeAdded = new HashMap(); - this.containersToBeRemoved = new ArrayList(); this.monitoringThread = new MonitoringThread(); this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f); @@ -140,7 +140,7 @@ public class ContainersMonitorImpl extends AbstractService implements this.maxVCoresAllottedForContainers = configuredVCoresForContainers; // ///////// Virtual memory configuration ////// - float vmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, + vmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); Preconditions.checkArgument(vmemRatio > 0.99f, YarnConfiguration.NM_VMEM_PMEM_RATIO + " should be at least 1.0"); @@ -218,6 +218,7 @@ public class ContainersMonitorImpl extends AbstractService implements @Override protected void serviceStop() throws Exception { if (containersMonitorEnabled) { + stopped = true; this.monitoringThread.interrupt(); try { this.monitoringThread.join(); @@ -228,7 +229,8 @@ public class ContainersMonitorImpl extends AbstractService implements super.serviceStop(); } - private static class ProcessTreeInfo { + @VisibleForTesting + static class ProcessTreeInfo { private ContainerId containerId; private String pid; private ResourceCalculatorProcessTree pTree; @@ -267,26 +269,43 @@ public class ContainersMonitorImpl extends AbstractService implements this.pTree = pTree; } - public long getVmemLimit() { + /** + * @return Virtual memory limit for the process tree in bytes + */ + public synchronized long getVmemLimit() { return this.vmemLimit; } /** * @return Physical memory limit for the process tree in bytes */ - public long getPmemLimit() { + public synchronized long getPmemLimit() { return this.pmemLimit; } /** - * Return the number of cpu vcores assigned - * @return + * @return Number of cpu vcores assigned */ - public int getCpuVcores() { + public synchronized int getCpuVcores() { return this.cpuVcores; } - } + /** + * Set resource limit for enforcement + * @param pmemLimit + * Physical memory limit for the process tree in bytes + * @param vmemLimit + * Virtual memory limit for the process tree in bytes + * @param cpuVcores + * Number of cpu vcores assigned + */ + public synchronized void setResourceLimit( + long pmemLimit, long vmemLimit, int cpuVcores) { + this.pmemLimit = pmemLimit; + this.vmemLimit = vmemLimit; + this.cpuVcores = cpuVcores; + } + } /** * Check whether a container's process tree's current memory usage is over @@ -359,8 +378,7 @@ public class ContainersMonitorImpl extends AbstractService implements @Override public void run() { - while (true) { - + while (!stopped && !Thread.currentThread().isInterrupted()) { // Print the processTrees for debugging. if (LOG.isDebugEnabled()) { StringBuilder tmp = new StringBuilder("[ "); @@ -372,31 +390,6 @@ public class ContainersMonitorImpl extends AbstractService implements + tmp.substring(0, tmp.length()) + "]"); } - // Add new containers - synchronized (containersToBeAdded) { - for (Entry entry : containersToBeAdded - .entrySet()) { - ContainerId containerId = entry.getKey(); - ProcessTreeInfo processTreeInfo = entry.getValue(); - LOG.info("Starting resource-monitoring for " + containerId); - trackingContainers.put(containerId, processTreeInfo); - } - containersToBeAdded.clear(); - } - - // Remove finished containers - synchronized (containersToBeRemoved) { - for (ContainerId containerId : containersToBeRemoved) { - if (containerMetricsEnabled) { - ContainerMetrics.forContainer( - containerId, containerMetricsPeriodMs).finished(); - } - trackingContainers.remove(containerId); - LOG.info("Stopping resource-monitoring for " + containerId); - } - containersToBeRemoved.clear(); - } - // Temporary structure to calculate the total resource utilization of // the containers ResourceUtilization trackedContainersUtilization = @@ -408,10 +401,8 @@ public class ContainersMonitorImpl extends AbstractService implements long pmemByAllContainers = 0; long cpuUsagePercentPerCoreByAllContainers = 0; long cpuUsageTotalCoresByAllContainers = 0; - for (Iterator> it = - trackingContainers.entrySet().iterator(); it.hasNext();) { - - Map.Entry entry = it.next(); + for (Entry entry : trackingContainers + .entrySet()) { ContainerId containerId = entry.getKey(); ProcessTreeInfo ptInfo = entry.getValue(); try { @@ -435,11 +426,6 @@ public class ContainersMonitorImpl extends AbstractService implements if (containerMetricsEnabled) { ContainerMetrics usageMetrics = ContainerMetrics .forContainer(containerId, containerMetricsPeriodMs); - int cpuVcores = ptInfo.getCpuVcores(); - final int vmemLimit = (int) (ptInfo.getVmemLimit() >> 20); - final int pmemLimit = (int) (ptInfo.getPmemLimit() >> 20); - usageMetrics.recordResourceLimit( - vmemLimit, pmemLimit, cpuVcores); usageMetrics.recordProcessId(pId); } } @@ -548,7 +534,7 @@ public class ContainersMonitorImpl extends AbstractService implements eventDispatcher.getEventHandler().handle( new ContainerKillEvent(containerId, containerExitStatus, msg)); - it.remove(); + trackingContainers.remove(containerId); LOG.info("Removed ProcessTree with root " + pId); } } catch (Exception e) { @@ -605,6 +591,60 @@ public class ContainersMonitorImpl extends AbstractService implements } } + private void changeContainerResource( + ContainerId containerId, Resource resource) { + Container container = context.getContainers().get(containerId); + // Check container existence + if (container == null) { + LOG.warn("Container " + containerId.toString() + "does not exist"); + return; + } + container.setResource(resource); + } + + private void updateContainerMetrics(ContainersMonitorEvent monitoringEvent) { + if (!containerMetricsEnabled || monitoringEvent == null) { + return; + } + + ContainerId containerId = monitoringEvent.getContainerId(); + ContainerMetrics usageMetrics = ContainerMetrics + .forContainer(containerId, containerMetricsPeriodMs); + + int vmemLimitMBs; + int pmemLimitMBs; + int cpuVcores; + switch (monitoringEvent.getType()) { + case START_MONITORING_CONTAINER: + ContainerStartMonitoringEvent startEvent = + (ContainerStartMonitoringEvent) monitoringEvent; + usageMetrics.recordStateChangeDurations( + startEvent.getLaunchDuration(), + startEvent.getLocalizationDuration()); + cpuVcores = startEvent.getCpuVcores(); + vmemLimitMBs = (int) (startEvent.getVmemLimit() >> 20); + pmemLimitMBs = (int) (startEvent.getPmemLimit() >> 20); + usageMetrics.recordResourceLimit( + vmemLimitMBs, pmemLimitMBs, cpuVcores); + break; + case STOP_MONITORING_CONTAINER: + usageMetrics.finished(); + break; + case CHANGE_MONITORING_CONTAINER_RESOURCE: + ChangeMonitoringContainerResourceEvent changeEvent = + (ChangeMonitoringContainerResourceEvent) monitoringEvent; + Resource resource = changeEvent.getResource(); + pmemLimitMBs = resource.getMemory(); + vmemLimitMBs = (int) (pmemLimitMBs * vmemRatio); + cpuVcores = resource.getVirtualCores(); + usageMetrics.recordResourceLimit( + vmemLimitMBs, pmemLimitMBs, cpuVcores); + break; + default: + break; + } + } + @Override public long getVmemAllocatedForContainers() { return this.maxVmemAllottedForContainers; @@ -650,38 +690,53 @@ public class ContainersMonitorImpl extends AbstractService implements } @Override + @SuppressWarnings("unchecked") public void handle(ContainersMonitorEvent monitoringEvent) { - + ContainerId containerId = monitoringEvent.getContainerId(); if (!containersMonitorEnabled) { + if (monitoringEvent.getType() == ContainersMonitorEventType + .CHANGE_MONITORING_CONTAINER_RESOURCE) { + // Nothing to enforce. Update container resource immediately. + ChangeMonitoringContainerResourceEvent changeEvent = + (ChangeMonitoringContainerResourceEvent) monitoringEvent; + changeContainerResource(containerId, changeEvent.getResource()); + } return; } - ContainerId containerId = monitoringEvent.getContainerId(); switch (monitoringEvent.getType()) { case START_MONITORING_CONTAINER: ContainerStartMonitoringEvent startEvent = (ContainerStartMonitoringEvent) monitoringEvent; - - if (containerMetricsEnabled) { - ContainerMetrics usageMetrics = ContainerMetrics - .forContainer(containerId, containerMetricsPeriodMs); - usageMetrics.recordStateChangeDurations( - startEvent.getLaunchDuration(), - startEvent.getLocalizationDuration()); - } - - synchronized (this.containersToBeAdded) { - ProcessTreeInfo processTreeInfo = - new ProcessTreeInfo(containerId, null, null, - startEvent.getVmemLimit(), startEvent.getPmemLimit(), - startEvent.getCpuVcores()); - this.containersToBeAdded.put(containerId, processTreeInfo); - } + LOG.info("Starting resource-monitoring for " + containerId); + updateContainerMetrics(monitoringEvent); + trackingContainers.put(containerId, + new ProcessTreeInfo(containerId, null, null, + startEvent.getVmemLimit(), startEvent.getPmemLimit(), + startEvent.getCpuVcores())); break; case STOP_MONITORING_CONTAINER: - synchronized (this.containersToBeRemoved) { - this.containersToBeRemoved.add(containerId); + LOG.info("Stopping resource-monitoring for " + containerId); + updateContainerMetrics(monitoringEvent); + trackingContainers.remove(containerId); + break; + case CHANGE_MONITORING_CONTAINER_RESOURCE: + ChangeMonitoringContainerResourceEvent changeEvent = + (ChangeMonitoringContainerResourceEvent) monitoringEvent; + ProcessTreeInfo processTreeInfo = trackingContainers.get(containerId); + if (processTreeInfo == null) { + LOG.warn("Failed to track container " + + containerId.toString() + + ". It may have already completed."); + break; } + LOG.info("Changing resource-monitoring for " + containerId); + updateContainerMetrics(monitoringEvent); + long pmemLimit = changeEvent.getResource().getMemory() * 1024L * 1024L; + long vmemLimit = (long) (pmemLimit * vmemRatio); + int cpuVcores = changeEvent.getResource().getVirtualCores(); + processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores); + changeContainerResource(containerId, changeEvent.getResource()); break; default: // TODO: Wrong event. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java index 9a052783057..75bcdaef9ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java @@ -211,6 +211,17 @@ public class TestContainerManagerWithLCE extends TestContainerManager { super.testIncreaseContainerResourceWithInvalidResource(); } + @Override + public void testChangeContainerResource() throws Exception { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testChangeContainerResource"); + super.testChangeContainerResource(); + } + private boolean shouldRunTest() { return System .getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index e2f12ba9d5e..2ea9146b71b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -1046,6 +1046,102 @@ public class TestContainerManager extends BaseContainerManagerTest { } } + @Test + public void testChangeContainerResource() throws Exception { + containerManager.start(); + File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); + PrintWriter fileWriter = new PrintWriter(scriptFile); + // Construct the Container-id + ContainerId cId = createContainerId(0); + if (Shell.WINDOWS) { + fileWriter.println("@ping -n 100 127.0.0.1 >nul"); + } else { + fileWriter.write("\numask 0"); + fileWriter.write("\nexec sleep 100"); + } + fileWriter.close(); + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + URL resource_alpha = + ConverterUtils.getYarnUrlFromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + LocalResource rsrc_alpha = + recordFactory.newRecordInstance(LocalResource.class); + rsrc_alpha.setResource(resource_alpha); + rsrc_alpha.setSize(-1); + rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc_alpha.setType(LocalResourceType.FILE); + rsrc_alpha.setTimestamp(scriptFile.lastModified()); + String destinationFile = "dest_file"; + Map localResources = + new HashMap(); + localResources.put(destinationFile, rsrc_alpha); + containerLaunchContext.setLocalResources(localResources); + List commands = + Arrays.asList(Shell.getRunScriptCommand(scriptFile)); + containerLaunchContext.setCommands(commands); + StartContainerRequest scRequest = + StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(cId, DUMMY_RM_IDENTIFIER, + context.getNodeId(), user, + context.getContainerTokenSecretManager())); + List list = new ArrayList(); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + // Make sure the container reaches RUNNING state + BaseContainerManagerTest.waitForNMContainerState(containerManager, cId, + org.apache.hadoop.yarn.server.nodemanager. + containermanager.container.ContainerState.RUNNING); + // Construct container resource increase request, + List increaseTokens = new ArrayList(); + // Add increase request. + Resource targetResource = Resource.newInstance(4096, 2); + Token containerToken = createContainerToken(cId, DUMMY_RM_IDENTIFIER, + context.getNodeId(), user, targetResource, + context.getContainerTokenSecretManager(), null); + increaseTokens.add(containerToken); + IncreaseContainersResourceRequest increaseRequest = + IncreaseContainersResourceRequest.newInstance(increaseTokens); + IncreaseContainersResourceResponse increaseResponse = + containerManager.increaseContainersResource(increaseRequest); + Assert.assertEquals( + 1, increaseResponse.getSuccessfullyIncreasedContainers().size()); + Assert.assertTrue(increaseResponse.getFailedRequests().isEmpty()); + // Check status + List containerIds = new ArrayList<>(); + containerIds.add(cId); + GetContainerStatusesRequest gcsRequest = + GetContainerStatusesRequest.newInstance(containerIds); + ContainerStatus containerStatus = containerManager + .getContainerStatuses(gcsRequest).getContainerStatuses().get(0); + // Check status immediately as resource increase is blocking + assertEquals(targetResource, containerStatus.getCapability()); + // Simulate a decrease request + List containersToDecrease + = new ArrayList<>(); + targetResource = Resource.newInstance(2048, 2); + org.apache.hadoop.yarn.api.records.Container decreasedContainer = + org.apache.hadoop.yarn.api.records.Container + .newInstance(cId, null, null, targetResource, null, null); + containersToDecrease.add(decreasedContainer); + containerManager.handle( + new CMgrDecreaseContainersResourceEvent(containersToDecrease)); + // Check status with retry + containerStatus = containerManager + .getContainerStatuses(gcsRequest).getContainerStatuses().get(0); + int retry = 0; + while (!targetResource.equals(containerStatus.getCapability()) && + (retry++ < 5)) { + Thread.sleep(200); + containerStatus = containerManager.getContainerStatuses(gcsRequest) + .getContainerStatuses().get(0); + } + assertEquals(targetResource, containerStatus.getCapability()); + } + public static Token createContainerToken(ContainerId cId, long rmIdentifier, NodeId nodeId, String user, NMContainerTokenSecretManager containerTokenSecretManager) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java new file mode 100644 index 00000000000..4a18a8c93ef --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; + +import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; + +public class MockResourceCalculatorPlugin extends ResourceCalculatorPlugin { + + @Override + public long getVirtualMemorySize() { + return 0; + } + + @Override + public long getPhysicalMemorySize() { + return 0; + } + + @Override + public long getAvailableVirtualMemorySize() { + return 0; + } + + @Override + public long getAvailablePhysicalMemorySize() { + return 0; + } + + @Override + public int getNumProcessors() { + return 0; + } + + @Override + public int getNumCores() { + return 0; + } + + @Override + public long getCpuFrequency() { + return 0; + } + + @Override + public long getCumulativeCpuTime() { + return 0; + } + + @Override + public float getCpuUsage() { + return 0; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java new file mode 100644 index 00000000000..c5aaa77b6c9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; + +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; + +public class MockResourceCalculatorProcessTree extends ResourceCalculatorProcessTree { + + private long rssMemorySize = 0; + + public MockResourceCalculatorProcessTree(String root) { + super(root); + } + + @Override + public void updateProcessTree() { + } + + @Override + public String getProcessTreeDump() { + return ""; + } + + @Override + public long getCumulativeCpuTime() { + return 0; + } + + @Override + public boolean checkPidPgrpidForMatch() { + return true; + } + + public void setRssMemorySize(long rssMemorySize) { + this.rssMemorySize = rssMemorySize; + } + + public long getRssMemorySize() { + return this.rssMemorySize; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java new file mode 100644 index 00000000000..d7f89fc0b0e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; +import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; +import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +public class TestContainersMonitorResourceChange { + + private ContainersMonitorImpl containersMonitor; + private MockExecutor executor; + private Configuration conf; + private AsyncDispatcher dispatcher; + private Context context; + private MockContainerEventHandler containerEventHandler; + + private static class MockExecutor extends ContainerExecutor { + @Override + public void init() throws IOException { + } + @Override + public void startLocalizer(LocalizerStartContext ctx) + throws IOException, InterruptedException { + } + @Override + public int launchContainer(ContainerStartContext ctx) throws + IOException { + return 0; + } + @Override + public boolean signalContainer(ContainerSignalContext ctx) + throws IOException { + return true; + } + @Override + public void deleteAsUser(DeletionAsUserContext ctx) + throws IOException, InterruptedException { + } + @Override + public String getProcessId(ContainerId containerId) { + return String.valueOf(containerId.getContainerId()); + } + @Override + public boolean isContainerAlive(ContainerLivenessContext ctx) + throws IOException { + return true; + } + } + + private static class MockContainerEventHandler implements + EventHandler { + final private Set killedContainer + = new HashSet<>(); + @Override + public void handle(ContainerEvent event) { + if (event.getType() == ContainerEventType.KILL_CONTAINER) { + synchronized (killedContainer) { + killedContainer.add(event.getContainerID()); + } + } + } + public boolean isContainerKilled(ContainerId containerId) { + synchronized (killedContainer) { + return killedContainer.contains(containerId); + } + } + } + + @Before + public void setup() { + executor = new MockExecutor(); + dispatcher = new AsyncDispatcher(); + context = Mockito.mock(Context.class); + Mockito.doReturn(new ConcurrentSkipListMap()) + .when(context).getContainers(); + conf = new Configuration(); + conf.set( + YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR, + MockResourceCalculatorPlugin.class.getCanonicalName()); + conf.set( + YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, + MockResourceCalculatorProcessTree.class.getCanonicalName()); + dispatcher.init(conf); + dispatcher.start(); + containerEventHandler = new MockContainerEventHandler(); + dispatcher.register(ContainerEventType.class, containerEventHandler); + } + + @After + public void tearDown() throws Exception { + if (containersMonitor != null) { + containersMonitor.stop(); + } + if (dispatcher != null) { + dispatcher.stop(); + } + } + + @Test + public void testContainersResourceChange() throws Exception { + // set container monitor interval to be 20ms + conf.setLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS, 20L); + containersMonitor = createContainersMonitor(executor, dispatcher, context); + containersMonitor.init(conf); + containersMonitor.start(); + // create container 1 + containersMonitor.handle(new ContainerStartMonitoringEvent( + getContainerId(1), 2100L, 1000L, 1, 0, 0)); + // verify that this container is properly tracked + assertNotNull(getProcessTreeInfo(getContainerId(1))); + assertEquals(1000L, getProcessTreeInfo(getContainerId(1)) + .getPmemLimit()); + assertEquals(2100L, getProcessTreeInfo(getContainerId(1)) + .getVmemLimit()); + // sleep longer than the monitor interval to make sure resource + // enforcement has started + Thread.sleep(200); + // increase pmem usage, the container should be killed + MockResourceCalculatorProcessTree mockTree = + (MockResourceCalculatorProcessTree) getProcessTreeInfo( + getContainerId(1)).getProcessTree(); + mockTree.setRssMemorySize(2500L); + // verify that this container is killed + Thread.sleep(200); + assertTrue(containerEventHandler + .isContainerKilled(getContainerId(1))); + // create container 2 + containersMonitor.handle(new ContainerStartMonitoringEvent( + getContainerId(2), 2202009L, 1048576L, 1, 0, 0)); + // verify that this container is properly tracked + assertNotNull(getProcessTreeInfo(getContainerId(2))); + assertEquals(1048576L, getProcessTreeInfo(getContainerId(2)) + .getPmemLimit()); + assertEquals(2202009L, getProcessTreeInfo(getContainerId(2)) + .getVmemLimit()); + // trigger a change resource event, check limit after change + containersMonitor.handle(new ChangeMonitoringContainerResourceEvent( + getContainerId(2), Resource.newInstance(2, 1))); + assertEquals(2097152L, getProcessTreeInfo(getContainerId(2)) + .getPmemLimit()); + assertEquals(4404019L, getProcessTreeInfo(getContainerId(2)) + .getVmemLimit()); + // sleep longer than the monitor interval to make sure resource + // enforcement has started + Thread.sleep(200); + // increase pmem usage, the container should NOT be killed + mockTree = + (MockResourceCalculatorProcessTree) getProcessTreeInfo( + getContainerId(2)).getProcessTree(); + mockTree.setRssMemorySize(2000000L); + // verify that this container is not killed + Thread.sleep(200); + assertFalse(containerEventHandler + .isContainerKilled(getContainerId(2))); + containersMonitor.stop(); + } + + @Test + public void testContainersResourceChangeIsTriggeredImmediately() + throws Exception { + // set container monitor interval to be 20s + conf.setLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS, 20000L); + containersMonitor = createContainersMonitor(executor, dispatcher, context); + containersMonitor.init(conf); + containersMonitor.start(); + // sleep 1 second to make sure the container monitor thread is + // now waiting for the next monitor cycle + Thread.sleep(1000); + // create a container with id 3 + containersMonitor.handle(new ContainerStartMonitoringEvent( + getContainerId(3), 2202009L, 1048576L, 1, 0, 0)); + // Verify that this container has been tracked + assertNotNull(getProcessTreeInfo(getContainerId(3))); + // trigger a change resource event, check limit after change + containersMonitor.handle(new ChangeMonitoringContainerResourceEvent( + getContainerId(3), Resource.newInstance(2, 1))); + // verify that this container has been properly tracked with the + // correct size + assertEquals(2097152L, getProcessTreeInfo(getContainerId(3)) + .getPmemLimit()); + assertEquals(4404019L, getProcessTreeInfo(getContainerId(3)) + .getVmemLimit()); + containersMonitor.stop(); + } + + private ContainersMonitorImpl createContainersMonitor( + ContainerExecutor containerExecutor, AsyncDispatcher dispatcher, + Context context) { + return new ContainersMonitorImpl(containerExecutor, dispatcher, context); + } + + private ContainerId getContainerId(int id) { + return ContainerId.newContainerId(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(123456L, 1), 1), id); + } + + private ProcessTreeInfo getProcessTreeInfo(ContainerId id) { + return containersMonitor.trackingContainers.get(id); + } +}