YARN-7064. Use cgroup to get container resource utilization. (Miklos Szegedi via Haibo Chen)

This commit is contained in:
Haibo Chen 2018-01-26 16:27:31 -08:00
parent 6eef3d7f1a
commit 649ef7ac33
17 changed files with 1183 additions and 75 deletions

View File

@ -1357,22 +1357,20 @@ public class YarnConfiguration extends Configuration {
public static final String NM_MEMORY_RESOURCE_PREFIX = NM_PREFIX
+ "resource.memory.";
@Private
public static final String NM_MEMORY_RESOURCE_ENABLED =
NM_MEMORY_RESOURCE_PREFIX + "enabled";
@Private
public static final boolean DEFAULT_NM_MEMORY_RESOURCE_ENABLED = false;
@Private
public static final String NM_MEMORY_RESOURCE_ENFORCED =
NM_MEMORY_RESOURCE_PREFIX + "enforced";
public static final boolean DEFAULT_NM_MEMORY_RESOURCE_ENFORCED = true;
public static final String NM_MEMORY_RESOURCE_CGROUPS_SWAPPINESS =
NM_MEMORY_RESOURCE_PREFIX + "cgroups.swappiness";
@Private
public static final int DEFAULT_NM_MEMORY_RESOURCE_CGROUPS_SWAPPINESS = 0;
@Private
public static final String NM_MEMORY_RESOURCE_CGROUPS_SOFT_LIMIT_PERCENTAGE =
NM_MEMORY_RESOURCE_PREFIX + "cgroups.soft-limit-percentage";
@Private
public static final float
DEFAULT_NM_MEMORY_RESOURCE_CGROUPS_SOFT_LIMIT_PERCENTAGE =
90.0f;

View File

@ -158,8 +158,6 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
.add(YarnConfiguration.NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT);
configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_DISK_RESOURCE_ENABLED);
configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_MEMORY_RESOURCE_PREFIX);
configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED);
configurationPrefixToSkipCompare.add(

View File

@ -468,6 +468,9 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
@Override
public float getCpuUsagePercent() {
BigInteger processTotalJiffies = getTotalProcessJiffies();
if (LOG.isDebugEnabled()) {
LOG.debug("Process " + pid + " jiffies:" + processTotalJiffies);
}
cpuTimeTracker.updateElapsedJiffies(processTotalJiffies,
clock.getTime());
return cpuTimeTracker.getCpuTrackerUsagePercent();

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* Interface class to obtain process resource usage
@ -50,6 +51,13 @@ public abstract class ResourceCalculatorProcessTree extends Configured {
public ResourceCalculatorProcessTree(String root) {
}
/**
* Initialize the object.
* @throws YarnException Throws an exception on error.
*/
public void initialize() throws YarnException {
}
/**
* Update the process-tree with latest state.
*
@ -168,6 +176,7 @@ public abstract class ResourceCalculatorProcessTree extends Configured {
Constructor <? extends ResourceCalculatorProcessTree> c = clazz.getConstructor(String.class);
ResourceCalculatorProcessTree rctree = c.newInstance(pid);
rctree.setConf(conf);
rctree.initialize();
return rctree;
} catch(Exception e) {
throw new RuntimeException(e);

View File

@ -1308,6 +1308,37 @@
<value>-1</value>
</property>
<property>
<description>Whether YARN CGroups memory tracking is enabled.</description>
<name>yarn.nodemanager.resource.memory.enabled</name>
<value>false</value>
</property>
<property>
<description>Whether YARN CGroups strict memory enforcement is enabled.
</description>
<name>yarn.nodemanager.resource.memory.enforced</name>
<value>true</value>
</property>
<property>
<description>If memory limit is enforced, this the percentage of soft limit
compared to the memory assigned to the container. If there is memory
pressure container memory usage will be pushed back to its soft limit
by swapping out memory.
</description>
<name>yarn.nodemanager.resource.memory.cgroups.soft-limit-percentage</name>
<value>90.0</value>
</property>
<property>
<description>Container swappiness is the likelihood a page will be swapped
out compared to be kept in memory. Value is between 0-100.
</description>
<name>yarn.nodemanager.resource.memory.cgroups.swappiness</name>
<value>0</value>
</property>
<property>
<description>Whether physical memory limits will be enforced for
containers.</description>
@ -1622,7 +1653,8 @@
or be allowed to consume spare resources if they need them. For example, turning the
flag on will restrict apps to use only their share of CPU, even if the node has spare
CPU cycles. The default value is false i.e. use available resources. Please note that
turning this flag on may reduce job throughput on the cluster.</description>
turning this flag on may reduce job throughput on the cluster. This setting does
not apply to other subsystems like memory.</description>
<name>yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage</name>
<value>false</value>
</property>

View File

@ -54,7 +54,7 @@ public interface CGroupsHandler {
this.name = name;
}
String getName() {
public String getName() {
return name;
}
@ -112,6 +112,13 @@ public interface CGroupsHandler {
void deleteCGroup(CGroupController controller, String cGroupId) throws
ResourceHandlerException;
/**
* Gets the absolute path to the specified cgroup controller.
* @param controller - controller type for the cgroup
* @return the root of the controller.
*/
String getControllerPath(CGroupController controller);
/**
* Gets the relative path for the cgroup, independent of a controller, for a
* given cgroup id.

View File

@ -125,7 +125,8 @@ class CGroupsHandlerImpl implements CGroupsHandler {
initializeControllerPaths();
}
private String getControllerPath(CGroupController controller) {
@Override
public String getControllerPath(CGroupController controller) {
try {
rwLock.readLock().lock();
return controllerPaths.get(controller);

View File

@ -52,6 +52,7 @@ public class CGroupsMemoryResourceHandlerImpl implements MemoryResourceHandler {
private static final int OPPORTUNISTIC_SOFT_LIMIT = 0;
private CGroupsHandler cGroupsHandler;
private boolean enforce = true;
private int swappiness = 0;
// multiplier to set the soft limit - value should be between 0 and 1
private float softLimit = 0.0f;
@ -79,6 +80,9 @@ public class CGroupsMemoryResourceHandlerImpl implements MemoryResourceHandler {
throw new ResourceHandlerException(msg);
}
this.cGroupsHandler.initializeCGroupController(MEMORY);
enforce = conf.getBoolean(
YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED,
YarnConfiguration.DEFAULT_NM_MEMORY_RESOURCE_ENFORCED);
swappiness = conf
.getInt(YarnConfiguration.NM_MEMORY_RESOURCE_CGROUPS_SWAPPINESS,
YarnConfiguration.DEFAULT_NM_MEMORY_RESOURCE_CGROUPS_SWAPPINESS);
@ -124,6 +128,7 @@ public class CGroupsMemoryResourceHandlerImpl implements MemoryResourceHandler {
(long) (container.getResource().getMemorySize() * this.softLimit);
long containerHardLimit = container.getResource().getMemorySize();
cGroupsHandler.createCGroup(MEMORY, cgroupId);
if (enforce) {
try {
cGroupsHandler.updateCGroupParam(MEMORY, cgroupId,
CGroupsHandler.CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES,
@ -150,6 +155,7 @@ public class CGroupsMemoryResourceHandlerImpl implements MemoryResourceHandler {
LOG.warn("Could not update cgroup for container", re);
throw re;
}
}
List<PrivilegedOperation> ret = new ArrayList<>();
ret.add(new PrivilegedOperation(
PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,

View File

@ -0,0 +1,357 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.CpuTimeTracker;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.SysInfoLinux;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.hadoop.yarn.util.SystemClock;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* A cgroups file-system based Resource calculator without the process tree
* features.
*
* CGroups has its limitations. It can only be enabled, if both CPU and memory
* cgroups are enabled with yarn.nodemanager.resource.cpu.enabled and
* yarn.nodemanager.resource.memory.enabled respectively. This means that
* memory limits are enforced by default. You can turn this off and keep
* memory reporting only with yarn.nodemanager.resource.memory.enforced.
*
* Another limitation is virtual memory measurement. CGroups does not have the
* ability to measure virtual memory usage. This includes memory reserved but
* not used. CGroups measures used memory as sa sum of
* physical memory and swap usage. This will be returned in the virtual
* memory counters.
* If the real virtual memory is required please use the legacy procfs based
* resource calculator or CombinedResourceCalculator.
*/
public class CGroupsResourceCalculator extends ResourceCalculatorProcessTree {
enum Result {
Continue,
Exit
}
protected static final Log LOG = LogFactory
.getLog(CGroupsResourceCalculator.class);
private static final String PROCFS = "/proc";
static final String CGROUP = "cgroup";
static final String CPU_STAT = "cpuacct.stat";
static final String MEM_STAT = "memory.usage_in_bytes";
static final String MEMSW_STAT = "memory.memsw.usage_in_bytes";
private static final String USER = "user ";
private static final String SYSTEM = "system ";
private static final Pattern CGROUP_FILE_FORMAT = Pattern.compile(
"^(\\d+):([^:]+):/(.*)$");
private final String procfsDir;
private CGroupsHandler cGroupsHandler;
private String pid;
private File cpuStat;
private File memStat;
private File memswStat;
private BigInteger processTotalJiffies;
private long processPhysicalMemory;
private long processVirtualMemory;
private final long jiffyLengthMs;
private final CpuTimeTracker cpuTimeTracker;
private Clock clock;
/**
* Create resource calculator for all Yarn containers.
*/
public CGroupsResourceCalculator()
throws YarnException {
this(null, PROCFS, ResourceHandlerModule.getCGroupsHandler(),
SystemClock.getInstance(), SysInfoLinux.JIFFY_LENGTH_IN_MILLIS);
}
/**
* Create resource calculator for the container that has the specified pid.
* @param pid A pid from the cgroup or null for all containers
*/
public CGroupsResourceCalculator(String pid) {
this(pid, PROCFS, ResourceHandlerModule.getCGroupsHandler(),
SystemClock.getInstance(), SysInfoLinux.JIFFY_LENGTH_IN_MILLIS);
}
/**
* Create resource calculator for testing.
* @param pid A pid from the cgroup or null for all containers
* @param procfsDir Path to /proc or a mock /proc directory
* @param cGroupsHandler Initialized cgroups handler object
* @param clock A clock object
* @param jiffyLengthMs0 Jiffy length in milliseconds
*/
@VisibleForTesting
CGroupsResourceCalculator(String pid, String procfsDir,
CGroupsHandler cGroupsHandler,
Clock clock,
long jiffyLengthMs0) {
super(pid);
this.procfsDir = procfsDir;
this.cGroupsHandler = cGroupsHandler;
this.pid = pid != null && pid.equals("0") ? "1" : pid;
this.jiffyLengthMs = jiffyLengthMs0;
this.cpuTimeTracker =
new CpuTimeTracker(this.jiffyLengthMs);
this.clock = clock;
this.processTotalJiffies = BigInteger.ZERO;
this.processPhysicalMemory = UNAVAILABLE;
this.processVirtualMemory = UNAVAILABLE;
}
@Override
public void initialize() throws YarnException {
if (!CGroupsResourceCalculator.isAvailable()) {
throw new YarnException("CGroupsResourceCalculator is not available");
}
setCGroupFilePaths();
}
@Override
public float getCpuUsagePercent() {
if (LOG.isDebugEnabled()) {
LOG.debug("Process " + pid + " jiffies:" + processTotalJiffies);
}
return cpuTimeTracker.getCpuTrackerUsagePercent();
}
@Override
public long getCumulativeCpuTime() {
if (jiffyLengthMs < 0) {
return UNAVAILABLE;
}
return processTotalJiffies.longValue() * jiffyLengthMs;
}
@Override
public long getRssMemorySize(int olderThanAge) {
if (olderThanAge > 1) {
return UNAVAILABLE;
}
return processPhysicalMemory;
}
@Override
public long getVirtualMemorySize(int olderThanAge) {
if (olderThanAge > 1) {
return UNAVAILABLE;
}
return processVirtualMemory;
}
@Override
public void updateProcessTree() {
try {
this.processTotalJiffies = readTotalProcessJiffies();
cpuTimeTracker.updateElapsedJiffies(processTotalJiffies,
clock.getTime());
} catch (YarnException e) {
LOG.warn("Failed to parse " + pid, e);
}
processPhysicalMemory = getMemorySize(memStat);
if (memswStat.exists()) {
processVirtualMemory = getMemorySize(memswStat);
} else if(LOG.isDebugEnabled()) {
LOG.debug("Swap cgroups monitoring is not compiled into the kernel " +
memswStat.getAbsolutePath().toString());
}
}
@Override
public String getProcessTreeDump() {
// We do not have a process tree in cgroups return just the pid for tracking
return pid;
}
@Override
public boolean checkPidPgrpidForMatch() {
// We do not have a process tree in cgroups returning default ok
return true;
}
/**
* Checks if the CGroupsResourceCalculator is available on this system.
* This assumes that Linux container executor is already initialized.
*
* @return true if CGroupsResourceCalculator is available. False otherwise.
*/
public static boolean isAvailable() {
try {
if (!Shell.LINUX) {
LOG.info("CGroupsResourceCalculator currently is supported only on "
+ "Linux.");
return false;
}
if (ResourceHandlerModule.getCGroupsHandler() == null ||
ResourceHandlerModule.getCpuResourceHandler() == null ||
ResourceHandlerModule.getMemoryResourceHandler() == null) {
LOG.info("CGroupsResourceCalculator requires enabling CGroups" +
"cpu and memory");
return false;
}
} catch (SecurityException se) {
LOG.warn("Failed to get Operating System name. " + se);
return false;
}
return true;
}
private long getMemorySize(File cgroupUsageFile) {
long[] mem = new long[1];
try {
processFile(cgroupUsageFile, (String line) -> {
mem[0] = Long.parseLong(line);
return Result.Exit;
});
return mem[0];
} catch (YarnException e) {
LOG.warn("Failed to parse cgroups " + memswStat, e);
}
return UNAVAILABLE;
}
private BigInteger readTotalProcessJiffies() throws YarnException {
final BigInteger[] totalCPUTimeJiffies = new BigInteger[1];
totalCPUTimeJiffies[0] = BigInteger.ZERO;
processFile(cpuStat, (String line) -> {
if (line.startsWith(USER)) {
totalCPUTimeJiffies[0] = totalCPUTimeJiffies[0].add(
new BigInteger(line.substring(USER.length())));
}
if (line.startsWith(SYSTEM)) {
totalCPUTimeJiffies[0] = totalCPUTimeJiffies[0].add(
new BigInteger(line.substring(SYSTEM.length())));
}
return Result.Continue;
});
return totalCPUTimeJiffies[0];
}
private String getCGroupRelativePath(
CGroupsHandler.CGroupController controller)
throws YarnException {
if (pid == null) {
return cGroupsHandler.getRelativePathForCGroup("");
} else {
return getCGroupRelativePathForPid(controller);
}
}
private String getCGroupRelativePathForPid(
CGroupsHandler.CGroupController controller)
throws YarnException {
File pidCgroupFile = new File(new File(procfsDir, pid), CGROUP);
String[] result = new String[1];
processFile(pidCgroupFile, (String line)->{
Matcher m = CGROUP_FILE_FORMAT.matcher(line);
boolean mat = m.find();
if (mat) {
if (m.group(2).contains(controller.getName())) {
// Instead of returning the full path we compose it
// based on the last item as the container id
// This helps to avoid confusion within a privileged Docker container
// where the path is referred in /proc/<pid>/cgroup as
// /docker/<dcontainerid>/hadoop-yarn/<containerid>
// but it is /hadoop-yarn/<containerid> in the cgroups hierarchy
String cgroupPath = m.group(3);
if (cgroupPath != null) {
String cgroup =
new File(cgroupPath).toPath().getFileName().toString();
result[0] = cGroupsHandler.getRelativePathForCGroup(cgroup);
} else {
LOG.warn("Invalid cgroup path for " + pidCgroupFile);
}
return Result.Exit;
}
} else {
LOG.warn(
"Unexpected: cgroup file is not in the expected format"
+ " for process with pid " + pid);
}
return Result.Continue;
});
if (result[0] == null) {
throw new YarnException(controller.getName() + " CGroup for pid " + pid +
" not found " + pidCgroupFile);
}
return result[0];
}
private void processFile(File file, Function<String, Result> processLine)
throws YarnException {
// Read "procfsDir/<pid>/stat" file - typically /proc/<pid>/stat
try (InputStreamReader fReader = new InputStreamReader(
new FileInputStream(file), Charset.forName("UTF-8"))) {
try (BufferedReader in = new BufferedReader(fReader)) {
try {
String str;
while ((str = in.readLine()) != null) {
Result result = processLine.apply(str);
if (result == Result.Exit) {
return;
}
}
} catch (IOException io) {
throw new YarnException("Error reading the stream " + io, io);
}
}
} catch (IOException f) {
throw new YarnException("The process vanished in the interim " + pid, f);
}
}
void setCGroupFilePaths() throws YarnException {
if (cGroupsHandler == null) {
throw new YarnException("CGroups handler is not initialized");
}
File cpuDir = new File(
cGroupsHandler.getControllerPath(
CGroupsHandler.CGroupController.CPUACCT),
getCGroupRelativePath(CGroupsHandler.CGroupController.CPUACCT));
File memDir = new File(
cGroupsHandler.getControllerPath(
CGroupsHandler.CGroupController.MEMORY),
getCGroupRelativePath(CGroupsHandler.CGroupController.MEMORY));
cpuStat = new File(cpuDir, CPU_STAT);
memStat = new File(memDir, MEM_STAT);
memswStat = new File(memDir, MEMSW_STAT);
}
}

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
/**
* CombinedResourceCalculator is a resource calculator that uses cgroups but
* it is backward compatible with procfs in terms of virtual memory usage.
*/
public class CombinedResourceCalculator extends ResourceCalculatorProcessTree {
protected static final Log LOG = LogFactory
.getLog(CombinedResourceCalculator.class);
private ProcfsBasedProcessTree procfs;
private CGroupsResourceCalculator cgroup;
public CombinedResourceCalculator(String pid) {
super(pid);
procfs = new ProcfsBasedProcessTree(pid);
cgroup = new CGroupsResourceCalculator(pid);
}
@Override
public void initialize() throws YarnException {
procfs.initialize();
cgroup.initialize();
}
@Override
public void updateProcessTree() {
procfs.updateProcessTree();
cgroup.updateProcessTree();
}
@Override
public String getProcessTreeDump() {
return procfs.getProcessTreeDump();
}
@Override
public float getCpuUsagePercent() {
float cgroupUsage = cgroup.getCpuUsagePercent();
if (LOG.isDebugEnabled()) {
float procfsUsage = procfs.getCpuUsagePercent();
LOG.debug("CPU Comparison:" + procfsUsage + " " + cgroupUsage);
LOG.debug("Jiffy Comparison:" +
procfs.getCumulativeCpuTime() + " " +
cgroup.getCumulativeCpuTime());
}
return cgroupUsage;
}
@Override
public boolean checkPidPgrpidForMatch() {
return procfs.checkPidPgrpidForMatch();
}
@Override
public long getCumulativeCpuTime() {
if (LOG.isDebugEnabled()) {
LOG.debug("CPU Comparison:" +
procfs.getCumulativeCpuTime() + " " +
cgroup.getCumulativeCpuTime());
}
return cgroup.getCumulativeCpuTime();
}
@Override
public long getRssMemorySize(int olderThanAge) {
if (LOG.isDebugEnabled()) {
LOG.debug("MEM Comparison:" +
procfs.getRssMemorySize(olderThanAge) + " " +
cgroup.getRssMemorySize(olderThanAge));
}
return cgroup.getRssMemorySize(olderThanAge);
}
@Override
public long getVirtualMemorySize(int olderThanAge) {
if (LOG.isDebugEnabled()) {
LOG.debug("VMEM Comparison:" +
procfs.getVirtualMemorySize(olderThanAge) + " " +
cgroup.getVirtualMemorySize(olderThanAge));
}
return procfs.getVirtualMemorySize(olderThanAge);
}
}

View File

@ -25,7 +25,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
@ -101,7 +100,27 @@ public class ResourceHandlerModule {
return cGroupsHandler;
}
private static CGroupsCpuResourceHandlerImpl getCGroupsCpuResourceHandler(
public static NetworkPacketTaggingHandlerImpl
getNetworkResourceHandler() {
return networkPacketTaggingHandlerImpl;
}
public static DiskResourceHandler
getDiskResourceHandler() {
return cGroupsBlkioResourceHandler;
}
public static MemoryResourceHandler
getMemoryResourceHandler() {
return cGroupsMemoryResourceHandler;
}
public static CpuResourceHandler
getCpuResourceHandler() {
return cGroupsCpuResourceHandler;
}
private static CGroupsCpuResourceHandlerImpl initCGroupsCpuResourceHandler(
Configuration conf) throws ResourceHandlerException {
boolean cgroupsCpuEnabled =
conf.getBoolean(YarnConfiguration.NM_CPU_RESOURCE_ENABLED,
@ -150,7 +169,7 @@ public class ResourceHandlerModule {
}
}
public static ResourceHandler getNetworkResourceHandler(Configuration conf)
public static ResourceHandler initNetworkResourceHandler(Configuration conf)
throws ResourceHandlerException {
boolean useNetworkTagHandler = conf.getBoolean(
YarnConfiguration.NM_NETWORK_TAG_HANDLER_ENABLED,
@ -181,12 +200,12 @@ public class ResourceHandlerModule {
}
public static OutboundBandwidthResourceHandler
getOutboundBandwidthResourceHandler(Configuration conf)
initOutboundBandwidthResourceHandler(Configuration conf)
throws ResourceHandlerException {
return getTrafficControlBandwidthHandler(conf);
}
public static DiskResourceHandler getDiskResourceHandler(Configuration conf)
public static DiskResourceHandler initDiskResourceHandler(Configuration conf)
throws ResourceHandlerException {
if (conf.getBoolean(YarnConfiguration.NM_DISK_RESOURCE_ENABLED,
YarnConfiguration.DEFAULT_NM_DISK_RESOURCE_ENABLED)) {
@ -210,7 +229,7 @@ public class ResourceHandlerModule {
return cGroupsBlkioResourceHandler;
}
public static MemoryResourceHandler getMemoryResourceHandler(
public static MemoryResourceHandler initMemoryResourceHandler(
Configuration conf) throws ResourceHandlerException {
if (conf.getBoolean(YarnConfiguration.NM_MEMORY_RESOURCE_ENABLED,
YarnConfiguration.DEFAULT_NM_MEMORY_RESOURCE_ENABLED)) {
@ -246,10 +265,14 @@ public class ResourceHandlerModule {
throws ResourceHandlerException {
ArrayList<ResourceHandler> handlerList = new ArrayList<>();
addHandlerIfNotNull(handlerList, getNetworkResourceHandler(conf));
addHandlerIfNotNull(handlerList, getDiskResourceHandler(conf));
addHandlerIfNotNull(handlerList, getMemoryResourceHandler(conf));
addHandlerIfNotNull(handlerList, getCGroupsCpuResourceHandler(conf));
addHandlerIfNotNull(handlerList,
initNetworkResourceHandler(conf));
addHandlerIfNotNull(handlerList,
initDiskResourceHandler(conf));
addHandlerIfNotNull(handlerList,
initMemoryResourceHandler(conf));
addHandlerIfNotNull(handlerList,
initCGroupsCpuResourceHandler(conf));
addHandlersFromConfiguredResourcePlugins(handlerList, conf, nmContext);
resourceHandlerChain = new ResourceHandlerChain(handlerList);
}

View File

@ -215,15 +215,25 @@ public class ContainersMonitorImpl extends AbstractService implements
YarnConfiguration.DEFAULT_NM_CONTAINER_MONITOR_ENABLED);
}
/**
* Get the best process tree calculator.
* @param pId container process id
* @return process tree calculator
*/
private ResourceCalculatorProcessTree
getResourceCalculatorProcessTree(String pId) {
return ResourceCalculatorProcessTree.
getResourceCalculatorProcessTree(
pId, processTreeClass, conf);
}
private boolean isResourceCalculatorAvailable() {
if (resourceCalculatorPlugin == null) {
LOG.info("ResourceCalculatorPlugin is unavailable on this system. " + this
.getClass().getName() + " is disabled.");
return false;
}
if (ResourceCalculatorProcessTree
.getResourceCalculatorProcessTree("0", processTreeClass, conf)
== null) {
if (getResourceCalculatorProcessTree("0") == null) {
LOG.info("ResourceCalculatorProcessTree is unavailable on this system. "
+ this.getClass().getName() + " is disabled.");
return false;
@ -535,9 +545,7 @@ public class ContainersMonitorImpl extends AbstractService implements
LOG.debug("Tracking ProcessTree " + pId + " for the first time");
}
ResourceCalculatorProcessTree pt =
ResourceCalculatorProcessTree.
getResourceCalculatorProcessTree(
pId, processTreeClass, conf);
getResourceCalculatorProcessTree(pId);
ptInfo.setPid(pId);
ptInfo.setProcessTree(pt);
@ -599,11 +607,14 @@ public class ContainersMonitorImpl extends AbstractService implements
long pmemLimit = ptInfo.getPmemLimit();
if (AUDITLOG.isDebugEnabled()) {
AUDITLOG.debug(String.format(
"Memory usage of ProcessTree %s for container-id %s: ",
pId, containerId.toString()) +
"Resource usage of ProcessTree %s for container-id %s:" +
" %s CPU:%f CPU/core:%f",
pId, containerId.toString(),
formatUsageString(
currentVmemUsage, vmemLimit,
currentPmemUsage, pmemLimit));
currentPmemUsage, pmemLimit),
cpuUsagePercentPerCore,
cpuUsageTotalCoresPercentage));
}
// Add resource utilization for this container

View File

@ -148,6 +148,51 @@ public class TestCGroupsMemoryResourceHandlerImpl {
args.get(0));
}
@Test
public void testPreStartNonEnforced() throws Exception {
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
conf.setBoolean(YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED, false);
cGroupsMemoryResourceHandler.bootstrap(conf);
String id = "container_01_01";
String path = "test-path/" + id;
ContainerId mockContainerId = mock(ContainerId.class);
when(mockContainerId.toString()).thenReturn(id);
Container mockContainer = mock(Container.class);
when(mockContainer.getContainerId()).thenReturn(mockContainerId);
when(mockCGroupsHandler
.getPathForCGroupTasks(CGroupsHandler.CGroupController.MEMORY, id))
.thenReturn(path);
int memory = 1024;
when(mockContainer.getResource())
.thenReturn(Resource.newInstance(memory, 1));
List<PrivilegedOperation> ret =
cGroupsMemoryResourceHandler.preStart(mockContainer);
verify(mockCGroupsHandler, times(1))
.createCGroup(CGroupsHandler.CGroupController.MEMORY, id);
verify(mockCGroupsHandler, times(0))
.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, id,
CGroupsHandler.CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES,
String.valueOf(memory) + "M");
verify(mockCGroupsHandler, times(0))
.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, id,
CGroupsHandler.CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES,
String.valueOf((int) (memory * 0.9)) + "M");
verify(mockCGroupsHandler, times(0))
.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, id,
CGroupsHandler.CGROUP_PARAM_MEMORY_SWAPPINESS, String.valueOf(0));
Assert.assertNotNull(ret);
Assert.assertEquals(1, ret.size());
PrivilegedOperation op = ret.get(0);
Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
op.getOperationType());
List<String> args = op.getArguments();
Assert.assertEquals(1, args.size());
Assert.assertEquals(PrivilegedOperation.CGROUP_ARG_PREFIX + path,
args.get(0));
}
@Test
public void testReacquireContainer() throws Exception {
ContainerId containerIdMock = mock(ContainerId.class);

View File

@ -0,0 +1,274 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import static org.mockito.Mockito.*;
/**
* Unit test for CGroupsResourceCalculator.
*/
public class TestCGroupsResourceCalculator {
private ControlledClock clock = new ControlledClock();
private CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
private String basePath = "/tmp/" + this.getClass().getName();
public TestCGroupsResourceCalculator() {
when(cGroupsHandler.getRelativePathForCGroup("container_1"))
.thenReturn("/yarn/container_1");
when(cGroupsHandler.getRelativePathForCGroup("")).thenReturn("/yarn/");
}
@Test(expected = YarnException.class)
public void testPidNotFound() throws Exception {
CGroupsResourceCalculator calculator =
new CGroupsResourceCalculator(
"1234", ".", cGroupsHandler, clock, 10);
calculator.setCGroupFilePaths();
Assert.assertEquals("Expected exception", null, calculator);
}
@Test(expected = YarnException.class)
public void testNoMemoryCGgroupMount() throws Exception {
File procfs = new File(basePath + "/1234");
Assert.assertTrue("Setup error", procfs.mkdirs());
try {
FileUtils.writeStringToFile(
new File(procfs, CGroupsResourceCalculator.CGROUP),
"7:devices:/yarn/container_1\n" +
"6:cpuacct,cpu:/yarn/container_1\n" +
"5:pids:/yarn/container_1\n");
CGroupsResourceCalculator calculator =
new CGroupsResourceCalculator(
"1234", basePath,
cGroupsHandler, clock, 10);
calculator.setCGroupFilePaths();
Assert.assertEquals("Expected exception", null, calculator);
} finally {
FileUtils.deleteDirectory(new File(basePath));
}
}
@Test
public void testCGgroupNotFound() throws Exception {
File procfs = new File(basePath + "/1234");
Assert.assertTrue("Setup error", procfs.mkdirs());
try {
FileUtils.writeStringToFile(
new File(procfs, CGroupsResourceCalculator.CGROUP),
"7:devices:/yarn/container_1\n" +
"6:cpuacct,cpu:/yarn/container_1\n" +
"5:pids:/yarn/container_1\n" +
"4:memory:/yarn/container_1\n");
CGroupsResourceCalculator calculator =
new CGroupsResourceCalculator(
"1234", basePath,
cGroupsHandler, clock, 10);
calculator.setCGroupFilePaths();
calculator.updateProcessTree();
Assert.assertEquals("cgroups should be missing",
(long)ResourceCalculatorProcessTree.UNAVAILABLE,
calculator.getRssMemorySize(0));
} finally {
FileUtils.deleteDirectory(new File(basePath));
}
}
@Test
public void testCPUParsing() throws Exception {
File cgcpuacctDir =
new File(basePath + "/cgcpuacct");
File cgcpuacctContainerDir =
new File(cgcpuacctDir, "/yarn/container_1");
File procfs = new File(basePath + "/1234");
when(cGroupsHandler.getControllerPath(
CGroupsHandler.CGroupController.CPUACCT)).
thenReturn(cgcpuacctDir.getAbsolutePath());
Assert.assertTrue("Setup error", procfs.mkdirs());
Assert.assertTrue("Setup error", cgcpuacctContainerDir.mkdirs());
try {
FileUtils.writeStringToFile(
new File(procfs, CGroupsResourceCalculator.CGROUP),
"7:devices:/yarn/container_1\n" +
"6:cpuacct,cpu:/yarn/container_1\n" +
"5:pids:/yarn/container_1\n" +
"4:memory:/yarn/container_1\n");
FileUtils.writeStringToFile(
new File(cgcpuacctContainerDir, CGroupsResourceCalculator.CPU_STAT),
"Can you handle this?\n" +
"user 5415\n" +
"system 3632");
CGroupsResourceCalculator calculator =
new CGroupsResourceCalculator(
"1234", basePath,
cGroupsHandler, clock, 10);
calculator.setCGroupFilePaths();
calculator.updateProcessTree();
Assert.assertEquals("Incorrect CPU usage",
90470,
calculator.getCumulativeCpuTime());
} finally {
FileUtils.deleteDirectory(new File(basePath));
}
}
@Test
public void testMemoryParsing() throws Exception {
File cgcpuacctDir =
new File(basePath + "/cgcpuacct");
File cgcpuacctContainerDir =
new File(cgcpuacctDir, "/yarn/container_1");
File cgmemoryDir =
new File(basePath + "/memory");
File cgMemoryContainerDir =
new File(cgmemoryDir, "/yarn/container_1");
File procfs = new File(basePath + "/1234");
when(cGroupsHandler.getControllerPath(
CGroupsHandler.CGroupController.MEMORY)).
thenReturn(cgmemoryDir.getAbsolutePath());
Assert.assertTrue("Setup error", procfs.mkdirs());
Assert.assertTrue("Setup error", cgcpuacctContainerDir.mkdirs());
Assert.assertTrue("Setup error", cgMemoryContainerDir.mkdirs());
try {
FileUtils.writeStringToFile(
new File(procfs, CGroupsResourceCalculator.CGROUP),
"6:cpuacct,cpu:/yarn/container_1\n" +
"4:memory:/yarn/container_1\n");
FileUtils.writeStringToFile(
new File(cgMemoryContainerDir, CGroupsResourceCalculator.MEM_STAT),
"418496512\n");
CGroupsResourceCalculator calculator =
new CGroupsResourceCalculator(
"1234", basePath,
cGroupsHandler, clock, 10);
calculator.setCGroupFilePaths();
calculator.updateProcessTree();
// Test the case where memsw is not available (Ubuntu)
Assert.assertEquals("Incorrect memory usage",
418496512,
calculator.getRssMemorySize());
Assert.assertEquals("Incorrect swap usage",
(long)ResourceCalculatorProcessTree.UNAVAILABLE,
calculator.getVirtualMemorySize());
// Test the case where memsw is available
FileUtils.writeStringToFile(
new File(cgMemoryContainerDir, CGroupsResourceCalculator.MEMSW_STAT),
"418496513\n");
calculator.updateProcessTree();
Assert.assertEquals("Incorrect swap usage",
418496513,
calculator.getVirtualMemorySize());
} finally {
FileUtils.deleteDirectory(new File(basePath));
}
}
@Test
public void testCPUParsingRoot() throws Exception {
File cgcpuacctDir =
new File(basePath + "/cgcpuacct");
File cgcpuacctRootDir =
new File(cgcpuacctDir, "/yarn");
when(cGroupsHandler.getControllerPath(
CGroupsHandler.CGroupController.CPUACCT)).
thenReturn(cgcpuacctDir.getAbsolutePath());
Assert.assertTrue("Setup error", cgcpuacctRootDir.mkdirs());
try {
FileUtils.writeStringToFile(
new File(cgcpuacctRootDir, CGroupsResourceCalculator.CPU_STAT),
"user 5415\n" +
"system 3632");
CGroupsResourceCalculator calculator =
new CGroupsResourceCalculator(
null, basePath,
cGroupsHandler, clock, 10);
calculator.setCGroupFilePaths();
calculator.updateProcessTree();
Assert.assertEquals("Incorrect CPU usage",
90470,
calculator.getCumulativeCpuTime());
} finally {
FileUtils.deleteDirectory(new File(basePath));
}
}
@Test
public void testMemoryParsingRoot() throws Exception {
File cgcpuacctDir =
new File(basePath + "/cgcpuacct");
File cgcpuacctRootDir =
new File(cgcpuacctDir, "/yarn");
File cgmemoryDir =
new File(basePath + "/memory");
File cgMemoryRootDir =
new File(cgmemoryDir, "/yarn");
File procfs = new File(basePath + "/1234");
when(cGroupsHandler.getControllerPath(
CGroupsHandler.CGroupController.MEMORY)).
thenReturn(cgmemoryDir.getAbsolutePath());
Assert.assertTrue("Setup error", procfs.mkdirs());
Assert.assertTrue("Setup error", cgcpuacctRootDir.mkdirs());
Assert.assertTrue("Setup error", cgMemoryRootDir.mkdirs());
try {
FileUtils.writeStringToFile(
new File(cgMemoryRootDir, CGroupsResourceCalculator.MEM_STAT),
"418496512\n");
CGroupsResourceCalculator calculator =
new CGroupsResourceCalculator(
null, basePath,
cGroupsHandler, clock, 10);
calculator.setCGroupFilePaths();
calculator.updateProcessTree();
// Test the case where memsw is not available (Ubuntu)
Assert.assertEquals("Incorrect memory usage",
418496512,
calculator.getRssMemorySize());
Assert.assertEquals("Incorrect swap usage",
(long)ResourceCalculatorProcessTree.UNAVAILABLE,
calculator.getVirtualMemorySize());
// Test the case where memsw is available
FileUtils.writeStringToFile(
new File(cgMemoryRootDir, CGroupsResourceCalculator.MEMSW_STAT),
"418496513\n");
calculator.updateProcessTree();
Assert.assertEquals("Incorrect swap usage",
418496513,
calculator.getVirtualMemorySize());
} finally {
FileUtils.deleteDirectory(new File(basePath));
}
}
}

View File

@ -0,0 +1,227 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.commons.lang3.SystemUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.junit.*;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Random;
import static org.mockito.Mockito.mock;
/**
* Functional test for CGroupsResourceCalculator to compare two resource
* calculators. It is OS dependent.
* Ignored in automated tests due to flakiness by design.
*/
public class TestCompareResourceCalculators {
private Process target = null;
private String cgroup = null;
private String cgroupCPU = null;
private String cgroupMemory = null;
public static final long SHMEM_KB = 100 * 1024;
@Before
public void setup() throws IOException, YarnException {
Assume.assumeTrue(SystemUtils.IS_OS_LINUX);
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_HIERARCHY,
"TestCompareResourceCalculators");
conf.setBoolean(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT, false);
conf.setStrings(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH,
"/sys/fs/cgroup");
conf.setBoolean(YarnConfiguration.NM_CPU_RESOURCE_ENABLED, true);
ResourceHandlerChain module = null;
try {
module = ResourceHandlerModule.getConfiguredResourceHandlerChain(conf,
mock(Context.class));
} catch (ResourceHandlerException e) {
throw new YarnException("Cannot access cgroups", e);
}
Assume.assumeNotNull(module);
Assume.assumeNotNull(
ResourceHandlerModule.getCGroupsHandler()
.getControllerPath(CGroupsHandler.CGroupController.CPU));
Assume.assumeNotNull(
ResourceHandlerModule.getCGroupsHandler()
.getControllerPath(CGroupsHandler.CGroupController.MEMORY));
Random random = new Random(System.currentTimeMillis());
cgroup = Long.toString(random.nextLong());
cgroupCPU = ResourceHandlerModule.getCGroupsHandler()
.getPathForCGroup(CGroupsHandler.CGroupController.CPU, cgroup);
cgroupMemory = ResourceHandlerModule.getCGroupsHandler()
.getPathForCGroup(CGroupsHandler.CGroupController.MEMORY, cgroup);
}
@After
public void tearDown() throws YarnException {
stopTestProcess();
}
// Ignored in automated tests due to flakiness by design
@Ignore
@Test
public void testCompareResults()
throws YarnException, InterruptedException, IOException {
startTestProcess();
ProcfsBasedProcessTree legacyCalculator =
new ProcfsBasedProcessTree(Long.toString(getPid()));
CGroupsResourceCalculator cgroupsCalculator =
new CGroupsResourceCalculator(Long.toString(getPid()));
cgroupsCalculator.setCGroupFilePaths();
for (int i = 0; i < 5; ++i) {
Thread.sleep(3000);
compareMetrics(legacyCalculator, cgroupsCalculator);
}
stopTestProcess();
ensureCleanedUp(legacyCalculator, cgroupsCalculator);
}
private void ensureCleanedUp(
ResourceCalculatorProcessTree metric1,
ResourceCalculatorProcessTree metric2) {
metric1.updateProcessTree();
metric2.updateProcessTree();
long pmem1 = metric1.getRssMemorySize(0);
long pmem2 = metric2.getRssMemorySize(0);
System.out.println(pmem1 + " " + pmem2);
Assert.assertTrue("pmem should be invalid " + pmem1 + " " + pmem2,
pmem1 == ResourceCalculatorProcessTree.UNAVAILABLE &&
pmem2 == ResourceCalculatorProcessTree.UNAVAILABLE);
long vmem1 = metric1.getRssMemorySize(0);
long vmem2 = metric2.getRssMemorySize(0);
System.out.println(vmem1 + " " + vmem2);
Assert.assertTrue("vmem Error outside range " + vmem1 + " " + vmem2,
vmem1 == ResourceCalculatorProcessTree.UNAVAILABLE &&
vmem2 == ResourceCalculatorProcessTree.UNAVAILABLE);
float cpu1 = metric1.getCpuUsagePercent();
float cpu2 = metric2.getCpuUsagePercent();
// TODO ProcfsBasedProcessTree may report negative on process exit
Assert.assertTrue("CPU% Error outside range " + cpu1 + " " + cpu2,
cpu1 == 0 && cpu2 == 0);
}
private void compareMetrics(
ResourceCalculatorProcessTree metric1,
ResourceCalculatorProcessTree metric2) {
metric1.updateProcessTree();
metric2.updateProcessTree();
long pmem1 = metric1.getRssMemorySize(0);
long pmem2 = metric2.getRssMemorySize(0);
// TODO The calculation is different and cgroup
// can report a small amount after process stop
// This is not an issue since the cgroup is deleted
System.out.println(pmem1 + " " + (pmem2 - SHMEM_KB * 1024));
Assert.assertTrue("pmem Error outside range " + pmem1 + " " + pmem2,
Math.abs(pmem1 - (pmem2 - SHMEM_KB * 1024)) < 5000000);
long vmem1 = metric1.getRssMemorySize(0);
long vmem2 = metric2.getRssMemorySize(0);
System.out.println(vmem1 + " " + (vmem2 - SHMEM_KB * 1024));
// TODO The calculation is different and cgroup
// can report a small amount after process stop
// This is not an issue since the cgroup is deleted
Assert.assertTrue("vmem Error outside range " + vmem1 + " " + vmem2,
Math.abs(vmem1 - (vmem2 - SHMEM_KB * 1024)) < 5000000);
float cpu1 = metric1.getCpuUsagePercent();
float cpu2 = metric2.getCpuUsagePercent();
if (cpu1 > 0) {
// TODO ProcfsBasedProcessTree may report negative on process exit
Assert.assertTrue("CPU% Error outside range " + cpu1 + " " + cpu2,
Math.abs(cpu2 - cpu1) < 10);
}
}
private void startTestProcess() throws IOException {
ProcessBuilder builder = new ProcessBuilder();
String script =
"mkdir -p " + cgroupCPU + ";" +
"echo $$ >" + cgroupCPU + "/tasks;" +
"mkdir -p " + cgroupMemory + ";" +
"echo $$ >" + cgroupMemory + "/tasks;" +
"dd if=/dev/zero of=/dev/shm/" +
cgroup + " bs=1k count=" + SHMEM_KB + ";" +
"dd if=/dev/zero of=/dev/null bs=1k &" +
"echo $! >/tmp/\" + cgroup + \".pid;" +
//"echo while [ -f /tmp/" + cgroup + ".pid ]; do sleep 1; done;" +
"sleep 10000;" +
"echo kill $(jobs -p);";
builder.command("bash", "-c", script);
builder.redirectError(new File("/tmp/a.txt"));
builder.redirectOutput(new File("/tmp/b.txt"));
target = builder.start();
}
private void stopTestProcess() throws YarnException {
if (target != null) {
target.destroyForcibly();
target = null;
}
try {
ProcessBuilder builder = new ProcessBuilder();
String script =
"rm -f /dev/shm/" + cgroup + ";" +
"cat " + cgroupCPU + "/tasks | xargs kill;" +
"rm -f /tmp/" + cgroup + ".pid;" +
"sleep 4;" +
"rmdir " + cgroupCPU + ";" +
"rmdir " + cgroupMemory + ";";
builder.command("bash", "-c", script);
Process cleanup = builder.start();
cleanup.waitFor();
} catch (IOException|InterruptedException e) {
throw new YarnException("Could not clean up", e);
}
}
private long getPid() throws YarnException {
Class processClass = target.getClass();
if (processClass.getName().equals("java.lang.UNIXProcess")) {
try {
Field pidField = processClass.getDeclaredField("pid");
pidField.setAccessible(true);
long pid = pidField.getLong(target);
pidField.setAccessible(false);
return pid;
} catch (NoSuchFieldException|IllegalAccessException e) {
throw new YarnException("Reflection error", e);
}
} else {
throw new YarnException("Not Unix " + processClass.getName());
}
}
}

View File

@ -36,8 +36,8 @@ import static org.mockito.Mockito.mock;
public class TestResourceHandlerModule {
private static final Logger LOG =
LoggerFactory.getLogger(TestResourceHandlerModule.class);
Configuration emptyConf;
Configuration networkEnabledConf;
private Configuration emptyConf;
private Configuration networkEnabledConf;
@Before
public void setup() throws Exception {
@ -55,23 +55,28 @@ public class TestResourceHandlerModule {
//This resourceHandler should be non-null only if network as a resource
//is explicitly enabled
OutboundBandwidthResourceHandler resourceHandler = ResourceHandlerModule
.getOutboundBandwidthResourceHandler(emptyConf);
.initOutboundBandwidthResourceHandler(emptyConf);
Assert.assertNull(resourceHandler);
//When network as a resource is enabled this should be non-null
resourceHandler = ResourceHandlerModule
.getOutboundBandwidthResourceHandler(networkEnabledConf);
.initOutboundBandwidthResourceHandler(networkEnabledConf);
Assert.assertNotNull(resourceHandler);
//Ensure that outbound bandwidth resource handler is present in the chain
ResourceHandlerChain resourceHandlerChain = ResourceHandlerModule
.getConfiguredResourceHandlerChain(networkEnabledConf, mock(Context.class));
.getConfiguredResourceHandlerChain(networkEnabledConf,
mock(Context.class));
if (resourceHandlerChain != null) {
List<ResourceHandler> resourceHandlers = resourceHandlerChain
.getResourceHandlerList();
//Exactly one resource handler in chain
Assert.assertEquals(resourceHandlers.size(), 1);
//Same instance is expected to be in the chain.
Assert.assertTrue(resourceHandlers.get(0) == resourceHandler);
} else {
Assert.fail("Null returned");
}
} catch (ResourceHandlerException e) {
Assert.fail("Unexpected ResourceHandlerException: " + e);
}
@ -81,23 +86,27 @@ public class TestResourceHandlerModule {
public void testDiskResourceHandler() throws Exception {
DiskResourceHandler handler =
ResourceHandlerModule.getDiskResourceHandler(emptyConf);
ResourceHandlerModule.initDiskResourceHandler(emptyConf);
Assert.assertNull(handler);
Configuration diskConf = new YarnConfiguration();
diskConf.setBoolean(YarnConfiguration.NM_DISK_RESOURCE_ENABLED, true);
handler = ResourceHandlerModule.getDiskResourceHandler(diskConf);
handler = ResourceHandlerModule.initDiskResourceHandler(diskConf);
Assert.assertNotNull(handler);
ResourceHandlerChain resourceHandlerChain =
ResourceHandlerModule.getConfiguredResourceHandlerChain(diskConf,
mock(Context.class));
if (resourceHandlerChain != null) {
List<ResourceHandler> resourceHandlers =
resourceHandlerChain.getResourceHandlerList();
// Exactly one resource handler in chain
Assert.assertEquals(resourceHandlers.size(), 1);
// Same instance is expected to be in the chain.
Assert.assertTrue(resourceHandlers.get(0) == handler);
} else {
Assert.fail("Null returned");
}
}
}