YARN-3542. Refactored existing CPU cgroups support to use the newer and integrated ResourceHandler mechanism, and also deprecated the old LCEResourceHandler inteface hierarchy. Contributed by Varun Vasudev.

(cherry picked from commit 2085e60a96)
This commit is contained in:
Vinod Kumar Vavilapalli (I am also known as @tshooter.) 2016-01-25 16:19:03 -08:00 committed by Vinod Kumar Vavilapalli
parent 97f6cfa94b
commit 47b20d0c74
13 changed files with 661 additions and 58 deletions

View File

@ -49,6 +49,10 @@ Release 2.9.0 - UNRELEASED
YARN-4496. Improve HA ResourceManager Failover detection on the client. YARN-4496. Improve HA ResourceManager Failover detection on the client.
(Jian He via xgong) (Jian He via xgong)
YARN-3542. Refactored existing CPU cgroups support to use the newer and
integrated ResourceHandler mechanism, and also deprecated the old
LCEResourceHandler inteface hierarchy. (Varun Vasudev via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -938,6 +938,18 @@ public class YarnConfiguration extends Configuration {
DEFAULT_NM_MEMORY_RESOURCE_CGROUPS_SOFT_LIMIT_PERCENTAGE = DEFAULT_NM_MEMORY_RESOURCE_CGROUPS_SOFT_LIMIT_PERCENTAGE =
90.0f; 90.0f;
@Private
public static final String NM_CPU_RESOURCE_PREFIX = NM_PREFIX
+ "resource.cpu.";
/** Enable cpu isolation. */
@Private
public static final String NM_CPU_RESOURCE_ENABLED =
NM_CPU_RESOURCE_PREFIX + "enabled";
@Private
public static final boolean DEFAULT_NM_CPU_RESOURCE_ENABLED = false;
/** /**
* Prefix for disk configurations. Work in progress: This configuration * Prefix for disk configurations. Work in progress: This configuration
* parameter may be changed/removed in the future. * parameter may be changed/removed in the future.

View File

@ -111,6 +111,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
.add(YarnConfiguration.NM_DISK_RESOURCE_ENABLED); .add(YarnConfiguration.NM_DISK_RESOURCE_ENABLED);
configurationPrefixToSkipCompare configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_MEMORY_RESOURCE_PREFIX); .add(YarnConfiguration.NM_MEMORY_RESOURCE_PREFIX);
configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED);
// Set by container-executor.cfg // Set by container-executor.cfg
configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR); configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler;
import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler; import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler;
import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler; import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
@ -95,10 +96,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
public void setConf(Configuration conf) { public void setConf(Configuration conf) {
super.setConf(conf); super.setConf(conf);
resourcesHandler = ReflectionUtils.newInstance( resourcesHandler = getResourcesHandler(conf);
conf.getClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
DefaultLCEResourcesHandler.class, LCEResourcesHandler.class), conf);
resourcesHandler.setConf(conf);
if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY)
!= null) { != null) {
@ -122,6 +120,23 @@ public class LinuxContainerExecutor extends ContainerExecutor {
} }
} }
private LCEResourcesHandler getResourcesHandler(Configuration conf) {
LCEResourcesHandler handler = ReflectionUtils.newInstance(
conf.getClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
DefaultLCEResourcesHandler.class, LCEResourcesHandler.class), conf);
// Stop using CgroupsLCEResourcesHandler
// use the resource handler chain instead
// ResourceHandlerModule will create the cgroup cpu module if
// CgroupsLCEResourcesHandler is set
if (handler instanceof CgroupsLCEResourcesHandler) {
handler =
ReflectionUtils.newInstance(DefaultLCEResourcesHandler.class, conf);
}
handler.setConf(conf);
return handler;
}
void verifyUsernamePattern(String user) { void verifyUsernamePattern(String user) {
if (!UserGroupInformation.isSecurityEnabled() && if (!UserGroupInformation.isSecurityEnabled() &&
!nonsecureLocalUserPattern.matcher(user).matches()) { !nonsecureLocalUserPattern.matcher(user).matches()) {
@ -184,7 +199,12 @@ public class LinuxContainerExecutor extends ContainerExecutor {
try { try {
resourceHandlerChain = ResourceHandlerModule resourceHandlerChain = ResourceHandlerModule
.getConfiguredResourceHandlerChain(conf); .getConfiguredResourceHandlerChain(conf);
if (LOG.isDebugEnabled()) {
LOG.debug("Resource handler chain enabled = " + (resourceHandlerChain
== null));
}
if (resourceHandlerChain != null) { if (resourceHandlerChain != null) {
LOG.debug("Bootstrapping resource handler chain");
resourceHandlerChain.bootstrap(conf); resourceHandlerChain.bootstrap(conf);
} }
} catch (ResourceHandlerException e) { } catch (ResourceHandlerException e) {

View File

@ -0,0 +1,235 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* An implementation for using CGroups to restrict CPU usage on Linux. The
* implementation supports 3 different controls - restrict usage of all YARN
* containers, restrict relative usage of individual YARN containers and
* restrict usage of individual YARN containers. Admins can set the overall CPU
* to be used by all YARN containers - this is implemented by setting
* cpu.cfs_period_us and cpu.cfs_quota_us to the ratio desired. If strict
* resource usage mode is not enabled, cpu.shares is set for individual
* containers - this prevents containers from exceeding the overall limit for
* YARN containers but individual containers can use as much of the CPU as
* available(under the YARN limit). If strict resource usage is enabled, then
* container can only use the percentage of CPU allocated to them and this is
* again implemented using cpu.cfs_period_us and cpu.cfs_quota_us.
*
*/
@InterfaceStability.Unstable
@InterfaceAudience.Private
public class CGroupsCpuResourceHandlerImpl implements CpuResourceHandler {
static final Log LOG = LogFactory.getLog(CGroupsCpuResourceHandlerImpl.class);
private CGroupsHandler cGroupsHandler;
private boolean strictResourceUsageMode = false;
private float yarnProcessors;
private int nodeVCores;
private static final CGroupsHandler.CGroupController CPU =
CGroupsHandler.CGroupController.CPU;
@VisibleForTesting
static final int MAX_QUOTA_US = 1000 * 1000;
@VisibleForTesting
static final int MIN_PERIOD_US = 1000;
@VisibleForTesting
static final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel
CGroupsCpuResourceHandlerImpl(CGroupsHandler cGroupsHandler) {
this.cGroupsHandler = cGroupsHandler;
}
@Override
public List<PrivilegedOperation> bootstrap(Configuration conf)
throws ResourceHandlerException {
return bootstrap(
ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf), conf);
}
@VisibleForTesting
List<PrivilegedOperation> bootstrap(
ResourceCalculatorPlugin plugin, Configuration conf)
throws ResourceHandlerException {
this.strictResourceUsageMode = conf.getBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE);
this.cGroupsHandler.mountCGroupController(CPU);
nodeVCores = NodeManagerHardwareUtils.getVCores(plugin, conf);
// cap overall usage to the number of cores allocated to YARN
yarnProcessors = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
int systemProcessors = NodeManagerHardwareUtils.getNodeCPUs(plugin, conf);
boolean existingCpuLimits;
try {
existingCpuLimits =
cpuLimitsExist(cGroupsHandler.getPathForCGroup(CPU, ""));
} catch (IOException ie) {
throw new ResourceHandlerException(ie);
}
if (systemProcessors != (int) yarnProcessors) {
LOG.info("YARN containers restricted to " + yarnProcessors + " cores");
int[] limits = getOverallLimits(yarnProcessors);
cGroupsHandler
.updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_PERIOD_US,
String.valueOf(limits[0]));
cGroupsHandler
.updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_QUOTA_US,
String.valueOf(limits[1]));
} else if (existingCpuLimits) {
LOG.info("Removing CPU constraints for YARN containers.");
cGroupsHandler
.updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_QUOTA_US,
String.valueOf(-1));
}
return null;
}
@InterfaceAudience.Private
public static boolean cpuLimitsExist(String path)
throws IOException {
File quotaFile = new File(path,
CPU.getName() + "." + CGroupsHandler.CGROUP_CPU_QUOTA_US);
if (quotaFile.exists()) {
String contents = FileUtils.readFileToString(quotaFile, "UTF-8");
int quotaUS = Integer.parseInt(contents.trim());
if (quotaUS != -1) {
return true;
}
}
return false;
}
@VisibleForTesting
@InterfaceAudience.Private
public static int[] getOverallLimits(float yarnProcessors) {
int[] ret = new int[2];
if (yarnProcessors < 0.01f) {
throw new IllegalArgumentException("Number of processors can't be <= 0.");
}
int quotaUS = MAX_QUOTA_US;
int periodUS = (int) (MAX_QUOTA_US / yarnProcessors);
if (yarnProcessors < 1.0f) {
periodUS = MAX_QUOTA_US;
quotaUS = (int) (periodUS * yarnProcessors);
if (quotaUS < MIN_PERIOD_US) {
LOG.warn("The quota calculated for the cgroup was too low."
+ " The minimum value is " + MIN_PERIOD_US
+ ", calculated value is " + quotaUS
+ ". Setting quota to minimum value.");
quotaUS = MIN_PERIOD_US;
}
}
// cfs_period_us can't be less than 1000 microseconds
// if the value of periodUS is less than 1000, we can't really use cgroups
// to limit cpu
if (periodUS < MIN_PERIOD_US) {
LOG.warn("The period calculated for the cgroup was too low."
+ " The minimum value is " + MIN_PERIOD_US
+ ", calculated value is " + periodUS
+ ". Using all available CPU.");
periodUS = MAX_QUOTA_US;
quotaUS = -1;
}
ret[0] = periodUS;
ret[1] = quotaUS;
return ret;
}
@Override
public List<PrivilegedOperation> preStart(Container container)
throws ResourceHandlerException {
String cgroupId = container.getContainerId().toString();
Resource containerResource = container.getResource();
cGroupsHandler.createCGroup(CPU, cgroupId);
try {
int containerVCores = containerResource.getVirtualCores();
int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores;
cGroupsHandler
.updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_SHARES,
String.valueOf(cpuShares));
if (strictResourceUsageMode) {
if (nodeVCores != containerVCores) {
float containerCPU =
(containerVCores * yarnProcessors) / (float) nodeVCores;
int[] limits = getOverallLimits(containerCPU);
cGroupsHandler.updateCGroupParam(CPU, cgroupId,
CGroupsHandler.CGROUP_CPU_PERIOD_US, String.valueOf(limits[0]));
cGroupsHandler.updateCGroupParam(CPU, cgroupId,
CGroupsHandler.CGROUP_CPU_QUOTA_US, String.valueOf(limits[1]));
}
}
} catch (ResourceHandlerException re) {
cGroupsHandler.deleteCGroup(CPU, cgroupId);
LOG.warn("Could not update cgroup for container", re);
throw re;
}
List<PrivilegedOperation> ret = new ArrayList<>();
ret.add(new PrivilegedOperation(
PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupsHandler
.getPathForCGroupTasks(CPU, cgroupId)));
return ret;
}
@Override
public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
throws ResourceHandlerException {
return null;
}
@Override
public List<PrivilegedOperation> postComplete(ContainerId containerId)
throws ResourceHandlerException {
cGroupsHandler.deleteCGroup(CPU, containerId.toString());
return null;
}
@Override public List<PrivilegedOperation> teardown()
throws ResourceHandlerException {
return null;
}
}

View File

@ -58,6 +58,10 @@ public interface CGroupsHandler {
String CGROUP_PARAM_MEMORY_SWAPPINESS = "swappiness"; String CGROUP_PARAM_MEMORY_SWAPPINESS = "swappiness";
String CGROUP_CPU_PERIOD_US = "cfs_period_us";
String CGROUP_CPU_QUOTA_US = "cfs_quota_us";
String CGROUP_CPU_SHARES = "shares";
/** /**
* Mounts a cgroup controller * Mounts a cgroup controller
* @param controller - the controller being mounted * @param controller - the controller being mounted

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Resource handler for cpu resources.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface CpuResourceHandler extends ResourceHandler {
}

View File

@ -21,11 +21,15 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler;
import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -38,6 +42,7 @@ import java.util.List;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class ResourceHandlerModule { public class ResourceHandlerModule {
static final Log LOG = LogFactory.getLog(ResourceHandlerModule.class);
private static volatile ResourceHandlerChain resourceHandlerChain; private static volatile ResourceHandlerChain resourceHandlerChain;
/** /**
@ -52,6 +57,8 @@ public class ResourceHandlerModule {
cGroupsBlkioResourceHandler; cGroupsBlkioResourceHandler;
private static volatile CGroupsMemoryResourceHandlerImpl private static volatile CGroupsMemoryResourceHandlerImpl
cGroupsMemoryResourceHandler; cGroupsMemoryResourceHandler;
private static volatile CGroupsCpuResourceHandlerImpl
cGroupsCpuResourceHandler;
/** /**
* Returns an initialized, thread-safe CGroupsHandler instance. * Returns an initialized, thread-safe CGroupsHandler instance.
@ -70,6 +77,30 @@ public class ResourceHandlerModule {
return cGroupsHandler; return cGroupsHandler;
} }
private static CGroupsCpuResourceHandlerImpl getcGroupsCpuResourceHandler(
Configuration conf) throws ResourceHandlerException {
boolean cgroupsCpuEnabled =
conf.getBoolean(YarnConfiguration.NM_CPU_RESOURCE_ENABLED,
YarnConfiguration.DEFAULT_NM_CPU_RESOURCE_ENABLED);
boolean cgroupsLCEResourcesHandlerEnabled =
conf.getClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
DefaultLCEResourcesHandler.class)
.equals(CgroupsLCEResourcesHandler.class);
if (cgroupsCpuEnabled || cgroupsLCEResourcesHandlerEnabled) {
if (cGroupsCpuResourceHandler == null) {
synchronized (CpuResourceHandler.class) {
if (cGroupsCpuResourceHandler == null) {
LOG.debug("Creating new cgroups cpu handler");
cGroupsCpuResourceHandler =
new CGroupsCpuResourceHandlerImpl(getCGroupsHandler(conf));
return cGroupsCpuResourceHandler;
}
}
}
}
return null;
}
private static TrafficControlBandwidthHandlerImpl private static TrafficControlBandwidthHandlerImpl
getTrafficControlBandwidthHandler(Configuration conf) getTrafficControlBandwidthHandler(Configuration conf)
throws ResourceHandlerException { throws ResourceHandlerException {
@ -78,6 +109,7 @@ public class ResourceHandlerModule {
if (trafficControlBandwidthHandler == null) { if (trafficControlBandwidthHandler == null) {
synchronized (OutboundBandwidthResourceHandler.class) { synchronized (OutboundBandwidthResourceHandler.class) {
if (trafficControlBandwidthHandler == null) { if (trafficControlBandwidthHandler == null) {
LOG.debug("Creating new traffic control bandwidth handler");
trafficControlBandwidthHandler = new trafficControlBandwidthHandler = new
TrafficControlBandwidthHandlerImpl(PrivilegedOperationExecutor TrafficControlBandwidthHandlerImpl(PrivilegedOperationExecutor
.getInstance(conf), getCGroupsHandler(conf), .getInstance(conf), getCGroupsHandler(conf),
@ -113,6 +145,7 @@ public class ResourceHandlerModule {
if (cGroupsBlkioResourceHandler == null) { if (cGroupsBlkioResourceHandler == null) {
synchronized (DiskResourceHandler.class) { synchronized (DiskResourceHandler.class) {
if (cGroupsBlkioResourceHandler == null) { if (cGroupsBlkioResourceHandler == null) {
LOG.debug("Creating new cgroups blkio handler");
cGroupsBlkioResourceHandler = cGroupsBlkioResourceHandler =
new CGroupsBlkioResourceHandlerImpl(getCGroupsHandler(conf)); new CGroupsBlkioResourceHandlerImpl(getCGroupsHandler(conf));
} }
@ -158,6 +191,7 @@ public class ResourceHandlerModule {
addHandlerIfNotNull(handlerList, getOutboundBandwidthResourceHandler(conf)); addHandlerIfNotNull(handlerList, getOutboundBandwidthResourceHandler(conf));
addHandlerIfNotNull(handlerList, getDiskResourceHandler(conf)); addHandlerIfNotNull(handlerList, getDiskResourceHandler(conf));
addHandlerIfNotNull(handlerList, getMemoryResourceHandler(conf)); addHandlerIfNotNull(handlerList, getMemoryResourceHandler(conf));
addHandlerIfNotNull(handlerList, getcGroupsCpuResourceHandler(conf));
resourceHandlerChain = new ResourceHandlerChain(handlerList); resourceHandlerChain = new ResourceHandlerChain(handlerList);
} }

View File

@ -39,7 +39,6 @@ import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -50,10 +49,20 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsCpuResourceHandlerImpl;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
/**
* Resource handler that lets you setup cgroups
* to to handle cpu isolation. Please look at the ResourceHandlerModule
* and CGroupsCpuResourceHandlerImpl classes which let you isolate multiple
* resources using cgroups.
* Deprecated - please look at ResourceHandlerModule and
* CGroupsCpuResourceHandlerImpl
*/
@Deprecated
public class CgroupsLCEResourcesHandler implements LCEResourcesHandler { public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
final static Log LOG = LogFactory final static Log LOG = LogFactory
@ -73,8 +82,6 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
private final String CPU_PERIOD_US = "cfs_period_us"; private final String CPU_PERIOD_US = "cfs_period_us";
private final String CPU_QUOTA_US = "cfs_quota_us"; private final String CPU_QUOTA_US = "cfs_quota_us";
private final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel private final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel
private final int MAX_QUOTA_US = 1000 * 1000;
private final int MIN_PERIOD_US = 1000;
private final Map<String, String> controllerPaths; // Controller -> path private final Map<String, String> controllerPaths; // Controller -> path
private long deleteCgroupTimeout; private long deleteCgroupTimeout;
@ -163,65 +170,18 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
int[] limits = getOverallLimits(yarnProcessors); int[] limits = getOverallLimits(yarnProcessors);
updateCgroup(CONTROLLER_CPU, "", CPU_PERIOD_US, String.valueOf(limits[0])); updateCgroup(CONTROLLER_CPU, "", CPU_PERIOD_US, String.valueOf(limits[0]));
updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(limits[1])); updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(limits[1]));
} else if (cpuLimitsExist()) { } else if (CGroupsCpuResourceHandlerImpl.cpuLimitsExist(
pathForCgroup(CONTROLLER_CPU, ""))) {
LOG.info("Removing CPU constraints for YARN containers."); LOG.info("Removing CPU constraints for YARN containers.");
updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(-1)); updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(-1));
} }
} }
boolean cpuLimitsExist() throws IOException {
String path = pathForCgroup(CONTROLLER_CPU, "");
File quotaFile = new File(path, CONTROLLER_CPU + "." + CPU_QUOTA_US);
if (quotaFile.exists()) {
String contents = FileUtils.readFileToString(quotaFile, "UTF-8");
int quotaUS = Integer.parseInt(contents.trim());
if (quotaUS != -1) {
return true;
}
}
return false;
}
@VisibleForTesting
int[] getOverallLimits(float yarnProcessors) { int[] getOverallLimits(float yarnProcessors) {
return CGroupsCpuResourceHandlerImpl.getOverallLimits(yarnProcessors);
int[] ret = new int[2];
if (yarnProcessors < 0.01f) {
throw new IllegalArgumentException("Number of processors can't be <= 0.");
}
int quotaUS = MAX_QUOTA_US;
int periodUS = (int) (MAX_QUOTA_US / yarnProcessors);
if (yarnProcessors < 1.0f) {
periodUS = MAX_QUOTA_US;
quotaUS = (int) (periodUS * yarnProcessors);
if (quotaUS < MIN_PERIOD_US) {
LOG
.warn("The quota calculated for the cgroup was too low. The minimum value is "
+ MIN_PERIOD_US + ", calculated value is " + quotaUS
+ ". Setting quota to minimum value.");
quotaUS = MIN_PERIOD_US;
}
}
// cfs_period_us can't be less than 1000 microseconds
// if the value of periodUS is less than 1000, we can't really use cgroups
// to limit cpu
if (periodUS < MIN_PERIOD_US) {
LOG
.warn("The period calculated for the cgroup was too low. The minimum value is "
+ MIN_PERIOD_US + ", calculated value is " + periodUS
+ ". Using all available CPU.");
periodUS = MAX_QUOTA_US;
quotaUS = -1;
}
ret[0] = periodUS;
ret[1] = quotaUS;
return ret;
} }
boolean isCpuWeightEnabled() { boolean isCpuWeightEnabled() {
return this.cpuWeightEnabled; return this.cpuWeightEnabled;
} }

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
@Deprecated
public class DefaultLCEResourcesHandler implements LCEResourcesHandler { public class DefaultLCEResourcesHandler implements LCEResourcesHandler {
final static Log LOG = LogFactory final static Log LOG = LogFactory

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
@Deprecated
public interface LCEResourcesHandler extends Configurable { public interface LCEResourcesHandler extends Configurable {
void init(LinuxContainerExecutor lce) throws IOException; void init(LinuxContainerExecutor lce) throws IOException;

View File

@ -0,0 +1,297 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.util.List;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.times;
public class TestCGroupsCpuResourceHandlerImpl {
private CGroupsHandler mockCGroupsHandler;
private CGroupsCpuResourceHandlerImpl cGroupsCpuResourceHandler;
private ResourceCalculatorPlugin plugin;
final int numProcessors = 4;
@Before
public void setup() {
mockCGroupsHandler = mock(CGroupsHandler.class);
cGroupsCpuResourceHandler =
new CGroupsCpuResourceHandlerImpl(mockCGroupsHandler);
plugin = mock(ResourceCalculatorPlugin.class);
Mockito.doReturn(numProcessors).when(plugin).getNumProcessors();
Mockito.doReturn(numProcessors).when(plugin).getNumCores();
}
@Test
public void testBootstrap() throws Exception {
Configuration conf = new YarnConfiguration();
List<PrivilegedOperation> ret =
cGroupsCpuResourceHandler.bootstrap(plugin, conf);
verify(mockCGroupsHandler, times(1))
.mountCGroupController(CGroupsHandler.CGroupController.CPU);
verify(mockCGroupsHandler, times(0))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
CGroupsHandler.CGROUP_CPU_PERIOD_US, "");
verify(mockCGroupsHandler, times(0))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
CGroupsHandler.CGROUP_CPU_QUOTA_US, "");
Assert.assertNull(ret);
}
@Test
public void testBootstrapLimits() throws Exception {
Configuration conf = new YarnConfiguration();
int cpuPerc = 80;
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
cpuPerc);
int period = (CGroupsCpuResourceHandlerImpl.MAX_QUOTA_US * 100) / (cpuPerc
* numProcessors);
List<PrivilegedOperation> ret =
cGroupsCpuResourceHandler.bootstrap(plugin, conf);
verify(mockCGroupsHandler, times(1))
.mountCGroupController(CGroupsHandler.CGroupController.CPU);
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
CGroupsHandler.CGROUP_CPU_PERIOD_US, String.valueOf(period));
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
CGroupsHandler.CGROUP_CPU_QUOTA_US,
String.valueOf(CGroupsCpuResourceHandlerImpl.MAX_QUOTA_US));
Assert.assertNull(ret);
}
@Test
public void testBootstrapExistingLimits() throws Exception {
File existingLimit = new File(CGroupsHandler.CGroupController.CPU.getName()
+ "." + CGroupsHandler.CGROUP_CPU_QUOTA_US);
try {
FileUtils.write(existingLimit, "10000"); // value doesn't matter
when(mockCGroupsHandler
.getPathForCGroup(CGroupsHandler.CGroupController.CPU, ""))
.thenReturn(".");
Configuration conf = new YarnConfiguration();
List<PrivilegedOperation> ret =
cGroupsCpuResourceHandler.bootstrap(plugin, conf);
verify(mockCGroupsHandler, times(1))
.mountCGroupController(CGroupsHandler.CGroupController.CPU);
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
CGroupsHandler.CGROUP_CPU_QUOTA_US, "-1");
Assert.assertNull(ret);
} finally {
FileUtils.deleteQuietly(existingLimit);
}
}
@Test
public void testPreStart() throws Exception {
String id = "container_01_01";
String path = "test-path/" + id;
ContainerId mockContainerId = mock(ContainerId.class);
when(mockContainerId.toString()).thenReturn(id);
Container mockContainer = mock(Container.class);
when(mockContainer.getContainerId()).thenReturn(mockContainerId);
when(mockCGroupsHandler
.getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id))
.thenReturn(path);
when(mockContainer.getResource()).thenReturn(Resource.newInstance(1024, 2));
List<PrivilegedOperation> ret =
cGroupsCpuResourceHandler.preStart(mockContainer);
verify(mockCGroupsHandler, times(1))
.createCGroup(CGroupsHandler.CGroupController.CPU, id);
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
CGroupsHandler.CGROUP_CPU_SHARES, String
.valueOf(CGroupsCpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT * 2));
// don't set quota or period
verify(mockCGroupsHandler, never())
.updateCGroupParam(eq(CGroupsHandler.CGroupController.CPU), eq(id),
eq(CGroupsHandler.CGROUP_CPU_PERIOD_US), anyString());
verify(mockCGroupsHandler, never())
.updateCGroupParam(eq(CGroupsHandler.CGroupController.CPU), eq(id),
eq(CGroupsHandler.CGROUP_CPU_QUOTA_US), anyString());
Assert.assertNotNull(ret);
Assert.assertEquals(1, ret.size());
PrivilegedOperation op = ret.get(0);
Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
op.getOperationType());
List<String> args = op.getArguments();
Assert.assertEquals(1, args.size());
Assert.assertEquals(PrivilegedOperation.CGROUP_ARG_PREFIX + path,
args.get(0));
}
@Test
public void testPreStartStrictUsage() throws Exception {
String id = "container_01_01";
String path = "test-path/" + id;
ContainerId mockContainerId = mock(ContainerId.class);
when(mockContainerId.toString()).thenReturn(id);
Container mockContainer = mock(Container.class);
when(mockContainer.getContainerId()).thenReturn(mockContainerId);
when(mockCGroupsHandler
.getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id))
.thenReturn(path);
when(mockContainer.getResource()).thenReturn(Resource.newInstance(1024, 1));
Configuration conf = new YarnConfiguration();
conf.setBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
true);
cGroupsCpuResourceHandler.bootstrap(plugin, conf);
int defaultVCores = 8;
float share = (float) numProcessors / (float) defaultVCores;
List<PrivilegedOperation> ret =
cGroupsCpuResourceHandler.preStart(mockContainer);
verify(mockCGroupsHandler, times(1))
.createCGroup(CGroupsHandler.CGroupController.CPU, id);
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
CGroupsHandler.CGROUP_CPU_SHARES,
String.valueOf(CGroupsCpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT));
// set quota and period
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
CGroupsHandler.CGROUP_CPU_PERIOD_US,
String.valueOf(CGroupsCpuResourceHandlerImpl.MAX_QUOTA_US));
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
CGroupsHandler.CGROUP_CPU_QUOTA_US, String.valueOf(
(int) (CGroupsCpuResourceHandlerImpl.MAX_QUOTA_US * share)));
Assert.assertNotNull(ret);
Assert.assertEquals(1, ret.size());
PrivilegedOperation op = ret.get(0);
Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
op.getOperationType());
List<String> args = op.getArguments();
Assert.assertEquals(1, args.size());
Assert.assertEquals(PrivilegedOperation.CGROUP_ARG_PREFIX + path,
args.get(0));
}
@Test
public void testPreStartRestrictedContainers() throws Exception {
String id = "container_01_01";
String path = "test-path/" + id;
int defaultVCores = 8;
Configuration conf = new YarnConfiguration();
conf.setBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
true);
int cpuPerc = 75;
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
cpuPerc);
cGroupsCpuResourceHandler.bootstrap(plugin, conf);
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
CGroupsHandler.CGROUP_CPU_PERIOD_US, String.valueOf("333333"));
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
CGroupsHandler.CGROUP_CPU_QUOTA_US,
String.valueOf(CGroupsCpuResourceHandlerImpl.MAX_QUOTA_US));
float yarnCores = (cpuPerc * numProcessors) / 100;
int[] containerVCores = { 2, 4 };
for (int cVcores : containerVCores) {
ContainerId mockContainerId = mock(ContainerId.class);
when(mockContainerId.toString()).thenReturn(id);
Container mockContainer = mock(Container.class);
when(mockContainer.getContainerId()).thenReturn(mockContainerId);
when(mockCGroupsHandler
.getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id))
.thenReturn(path);
when(mockContainer.getResource())
.thenReturn(Resource.newInstance(1024, cVcores));
when(mockCGroupsHandler
.getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id))
.thenReturn(path);
float share = (cVcores * yarnCores) / defaultVCores;
int quotaUS;
int periodUS;
if (share > 1.0f) {
quotaUS = CGroupsCpuResourceHandlerImpl.MAX_QUOTA_US;
periodUS =
(int) ((float) CGroupsCpuResourceHandlerImpl.MAX_QUOTA_US / share);
} else {
quotaUS = (int) (CGroupsCpuResourceHandlerImpl.MAX_QUOTA_US * share);
periodUS = CGroupsCpuResourceHandlerImpl.MAX_QUOTA_US;
}
cGroupsCpuResourceHandler.preStart(mockContainer);
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
CGroupsHandler.CGROUP_CPU_SHARES, String.valueOf(
CGroupsCpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT * cVcores));
// set quota and period
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
CGroupsHandler.CGROUP_CPU_PERIOD_US, String.valueOf(periodUS));
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
CGroupsHandler.CGROUP_CPU_QUOTA_US, String.valueOf(quotaUS));
}
}
@Test
public void testReacquireContainer() throws Exception {
ContainerId containerIdMock = mock(ContainerId.class);
Assert.assertNull(
cGroupsCpuResourceHandler.reacquireContainer(containerIdMock));
}
@Test
public void testPostComplete() throws Exception {
String id = "container_01_01";
ContainerId mockContainerId = mock(ContainerId.class);
when(mockContainerId.toString()).thenReturn(id);
Assert.assertNull(cGroupsCpuResourceHandler.postComplete(mockContainerId));
verify(mockCGroupsHandler, times(1))
.deleteCGroup(CGroupsHandler.CGroupController.CPU, id);
}
@Test
public void testTeardown() throws Exception {
Assert.assertNull(cGroupsCpuResourceHandler.teardown());
}
@Test
public void testStrictResourceUsage() throws Exception {
Assert.assertNull(cGroupsCpuResourceHandler.teardown());
}
}

View File

@ -37,6 +37,7 @@ import java.util.List;
import java.util.Scanner; import java.util.Scanner;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@Deprecated
public class TestCgroupsLCEResourcesHandler { public class TestCgroupsLCEResourcesHandler {
static File cgroupDir = null; static File cgroupDir = null;