YARN-2619. Added NodeManager support for disk io isolation through cgroups. Contributed by Varun Vasudev and Wei Yan.
(cherry picked from commit 1b3b9e5c31
)
This commit is contained in:
parent
ec7b392965
commit
90f6e8c90f
|
@ -57,6 +57,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-2498. Respect labels in preemption policy of capacity scheduler for
|
||||
inter-queue preemption. (Wangda Tan via jianhe)
|
||||
|
||||
YARN-2619. Added NodeManager support for disk io isolation through cgroups.
|
||||
(Varun Vasudev and Wei Yan via vinodkv)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-1880. Cleanup TestApplicationClientProtocolOnHA
|
||||
|
|
|
@ -823,38 +823,68 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT =
|
||||
100;
|
||||
|
||||
/**
|
||||
* Prefix for disk configurations. Work in progress: This configuration
|
||||
* parameter may be changed/removed in the future.
|
||||
*/
|
||||
@Private
|
||||
public static final String NM_DISK_RESOURCE_PREFIX = NM_PREFIX
|
||||
+ "resource.disk.";
|
||||
/**
|
||||
* This setting controls if resource handling for disk operations is enabled.
|
||||
* Work in progress: This configuration parameter may be changed/removed in
|
||||
* the future
|
||||
*/
|
||||
@Private
|
||||
public static final String NM_DISK_RESOURCE_ENABLED = NM_DISK_RESOURCE_PREFIX
|
||||
+ "enabled";
|
||||
/** Disk as a resource is disabled by default. **/
|
||||
@Private
|
||||
public static final boolean DEFAULT_NM_DISK_RESOURCE_ENABLED = false;
|
||||
|
||||
public static final String NM_NETWORK_RESOURCE_PREFIX = NM_PREFIX + "resource.network.";
|
||||
public static final String NM_NETWORK_RESOURCE_PREFIX = NM_PREFIX
|
||||
+ "resource.network.";
|
||||
|
||||
/** This setting controls if resource handling for network bandwidth is enabled **/
|
||||
/* Work in progress: This configuration parameter may be changed/removed in the future */
|
||||
/**
|
||||
* This setting controls if resource handling for network bandwidth is
|
||||
* enabled. Work in progress: This configuration parameter may be
|
||||
* changed/removed in the future
|
||||
*/
|
||||
@Private
|
||||
public static final String NM_NETWORK_RESOURCE_ENABLED =
|
||||
NM_NETWORK_RESOURCE_PREFIX + "enabled";
|
||||
/** Network as a resource is disabled by default **/
|
||||
/** Network as a resource is disabled by default. **/
|
||||
@Private
|
||||
public static final boolean DEFAULT_NM_NETWORK_RESOURCE_ENABLED = false;
|
||||
|
||||
/** Specifies the interface to be used for applying network throttling rules **/
|
||||
/* Work in progress: This configuration parameter may be changed/removed in the future */
|
||||
/**
|
||||
* Specifies the interface to be used for applying network throttling rules.
|
||||
* Work in progress: This configuration parameter may be changed/removed in
|
||||
* the future
|
||||
*/
|
||||
@Private
|
||||
public static final String NM_NETWORK_RESOURCE_INTERFACE =
|
||||
NM_NETWORK_RESOURCE_PREFIX + "interface";
|
||||
@Private
|
||||
public static final String DEFAULT_NM_NETWORK_RESOURCE_INTERFACE = "eth0";
|
||||
|
||||
/** Specifies the total available outbound bandwidth on the node **/
|
||||
/* Work in progress: This configuration parameter may be changed/removed in the future */
|
||||
/**
|
||||
* Specifies the total available outbound bandwidth on the node. Work in
|
||||
* progress: This configuration parameter may be changed/removed in the future
|
||||
*/
|
||||
@Private
|
||||
public static final String NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT =
|
||||
NM_NETWORK_RESOURCE_PREFIX + "outbound-bandwidth-mbit";
|
||||
@Private
|
||||
public static final int DEFAULT_NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT = 1000;
|
||||
public static final int DEFAULT_NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT =
|
||||
1000;
|
||||
|
||||
/** Specifies the total outbound bandwidth available to YARN containers. defaults to
|
||||
* NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT if not specified.
|
||||
/**
|
||||
* Specifies the total outbound bandwidth available to YARN containers.
|
||||
* defaults to NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT if not specified.
|
||||
* Work in progress: This configuration parameter may be changed/removed in
|
||||
* the future
|
||||
*/
|
||||
/* Work in progress: This configuration parameter may be changed/removed in the future */
|
||||
@Private
|
||||
public static final String NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT =
|
||||
NM_NETWORK_RESOURCE_PREFIX + "outbound-bandwidth-yarn-mbit";
|
||||
|
|
|
@ -0,0 +1,170 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Handler class to handle the blkio controller. Currently it splits resources
|
||||
* evenly across all containers. Once we have scheduling sorted out, we can
|
||||
* modify the function to represent the disk resources allocated.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class CGroupsBlkioResourceHandlerImpl implements DiskResourceHandler {
|
||||
|
||||
static final Log LOG = LogFactory
|
||||
.getLog(CGroupsBlkioResourceHandlerImpl.class);
|
||||
|
||||
private CGroupsHandler cGroupsHandler;
|
||||
// Arbitrarily choose a weight - all that matters is that all containers
|
||||
// get the same weight assigned to them. Once we have scheduling support
|
||||
// this number will be determined dynamically for each container.
|
||||
@VisibleForTesting
|
||||
static final String DEFAULT_WEIGHT = "500";
|
||||
private static final String PARTITIONS_FILE = "/proc/partitions";
|
||||
|
||||
CGroupsBlkioResourceHandlerImpl(CGroupsHandler cGroupsHandler) {
|
||||
this.cGroupsHandler = cGroupsHandler;
|
||||
// check for linux so that we don't print messages for tests running on
|
||||
// other platforms
|
||||
if(Shell.LINUX) {
|
||||
checkDiskScheduler();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void checkDiskScheduler() {
|
||||
String data;
|
||||
|
||||
// read /proc/partitions and check to make sure that sd* and hd*
|
||||
// are using the CFQ scheduler. If they aren't print a warning
|
||||
try {
|
||||
byte[] contents = Files.readAllBytes(Paths.get(PARTITIONS_FILE));
|
||||
data = new String(contents, "UTF-8").trim();
|
||||
} catch (IOException e) {
|
||||
String msg = "Couldn't read " + PARTITIONS_FILE +
|
||||
"; can't determine disk scheduler type";
|
||||
LOG.warn(msg, e);
|
||||
return;
|
||||
}
|
||||
String[] lines = data.split(System.lineSeparator());
|
||||
if (lines.length > 0) {
|
||||
for (String line : lines) {
|
||||
String[] columns = line.split("\\s+");
|
||||
if (columns.length > 4) {
|
||||
String partition = columns[4];
|
||||
// check some known partitions to make sure the disk scheduler
|
||||
// is cfq - not meant to be comprehensive, more a sanity check
|
||||
if (partition.startsWith("sd") || partition.startsWith("hd")
|
||||
|| partition.startsWith("vd") || partition.startsWith("xvd")) {
|
||||
String schedulerPath =
|
||||
"/sys/block/" + partition + "/queue/scheduler";
|
||||
File schedulerFile = new File(schedulerPath);
|
||||
if (schedulerFile.exists()) {
|
||||
try {
|
||||
byte[] contents = Files.readAllBytes(Paths.get(schedulerPath));
|
||||
String schedulerString = new String(contents, "UTF-8").trim();
|
||||
if (!schedulerString.contains("[cfq]")) {
|
||||
LOG.warn("Device " + partition + " does not use the CFQ"
|
||||
+ " scheduler; disk isolation using "
|
||||
+ "CGroups will not work on this partition.");
|
||||
}
|
||||
} catch (IOException ie) {
|
||||
LOG.warn(
|
||||
"Unable to determine disk scheduler type for partition "
|
||||
+ partition, ie);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PrivilegedOperation> bootstrap(Configuration configuration)
|
||||
throws ResourceHandlerException {
|
||||
// if bootstrap is called on this class, disk is already enabled
|
||||
// so no need to check again
|
||||
this.cGroupsHandler
|
||||
.mountCGroupController(CGroupsHandler.CGroupController.BLKIO);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PrivilegedOperation> preStart(Container container)
|
||||
throws ResourceHandlerException {
|
||||
|
||||
String cgroupId = container.getContainerId().toString();
|
||||
cGroupsHandler
|
||||
.createCGroup(CGroupsHandler.CGroupController.BLKIO, cgroupId);
|
||||
try {
|
||||
cGroupsHandler.updateCGroupParam(CGroupsHandler.CGroupController.BLKIO,
|
||||
cgroupId, CGroupsHandler.CGROUP_PARAM_BLKIO_WEIGHT, DEFAULT_WEIGHT);
|
||||
} catch (ResourceHandlerException re) {
|
||||
cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.BLKIO,
|
||||
cgroupId);
|
||||
LOG.warn("Could not update cgroup for container", re);
|
||||
throw re;
|
||||
}
|
||||
List<PrivilegedOperation> ret = new ArrayList<>();
|
||||
ret.add(new PrivilegedOperation(
|
||||
PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
|
||||
PrivilegedOperation.CGROUP_ARG_PREFIX
|
||||
+ cGroupsHandler.getPathForCGroupTasks(
|
||||
CGroupsHandler.CGroupController.BLKIO, cgroupId)));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
|
||||
throws ResourceHandlerException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PrivilegedOperation> postComplete(ContainerId containerId)
|
||||
throws ResourceHandlerException {
|
||||
cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.BLKIO,
|
||||
containerId.toString());
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PrivilegedOperation> teardown() throws ResourceHandlerException {
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -33,7 +33,8 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
public interface CGroupsHandler {
|
||||
public enum CGroupController {
|
||||
CPU("cpu"),
|
||||
NET_CLS("net_cls");
|
||||
NET_CLS("net_cls"),
|
||||
BLKIO("blkio");
|
||||
|
||||
private final String name;
|
||||
|
||||
|
@ -48,6 +49,7 @@ public interface CGroupsHandler {
|
|||
|
||||
public static final String CGROUP_FILE_TASKS = "tasks";
|
||||
public static final String CGROUP_PARAM_CLASSID = "classid";
|
||||
public static final String CGROUP_PARAM_BLKIO_WEIGHT = "weight";
|
||||
|
||||
/**
|
||||
* Mounts a cgroup controller
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -38,6 +39,7 @@ import java.io.*;
|
|||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -63,7 +65,7 @@ class CGroupsHandlerImpl implements CGroupsHandler {
|
|||
private final String cGroupMountPath;
|
||||
private final long deleteCGroupTimeout;
|
||||
private final long deleteCGroupDelay;
|
||||
private final Map<CGroupController, String> controllerPaths;
|
||||
private Map<CGroupController, String> controllerPaths;
|
||||
private final ReadWriteLock rwLock;
|
||||
private final PrivilegedOperationExecutor privilegedOperationExecutor;
|
||||
private final Clock clock;
|
||||
|
@ -106,55 +108,61 @@ class CGroupsHandlerImpl implements CGroupsHandler {
|
|||
|
||||
private void initializeControllerPaths() throws ResourceHandlerException {
|
||||
if (enableCGroupMount) {
|
||||
//nothing to do here - we support 'deferred' mounting of specific
|
||||
//controllers - we'll populate the path for a given controller when an
|
||||
//explicit mountCGroupController request is issued.
|
||||
// nothing to do here - we support 'deferred' mounting of specific
|
||||
// controllers - we'll populate the path for a given controller when an
|
||||
// explicit mountCGroupController request is issued.
|
||||
LOG.info("CGroup controller mounting enabled.");
|
||||
} else {
|
||||
//cluster admins are expected to have mounted controllers in specific
|
||||
//locations - we'll attempt to figure out mount points
|
||||
initializeControllerPathsFromMtab();
|
||||
}
|
||||
}
|
||||
// cluster admins are expected to have mounted controllers in specific
|
||||
// locations - we'll attempt to figure out mount points
|
||||
|
||||
private void initializeControllerPathsFromMtab()
|
||||
throws ResourceHandlerException {
|
||||
Map<CGroupController, String> cPaths =
|
||||
initializeControllerPathsFromMtab(MTAB_FILE, this.cGroupPrefix);
|
||||
// we want to do a bulk update without the paths changing concurrently
|
||||
try {
|
||||
Map<String, List<String>> parsedMtab = parseMtab();
|
||||
|
||||
//we want to do a bulk update without the paths changing concurrently
|
||||
rwLock.writeLock().lock();
|
||||
controllerPaths = cPaths;
|
||||
} finally {
|
||||
rwLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static Map<CGroupController, String> initializeControllerPathsFromMtab(
|
||||
String mtab, String cGroupPrefix) throws ResourceHandlerException {
|
||||
try {
|
||||
Map<String, List<String>> parsedMtab = parseMtab(mtab);
|
||||
Map<CGroupController, String> ret = new HashMap<>();
|
||||
|
||||
for (CGroupController controller : CGroupController.values()) {
|
||||
String name = controller.getName();
|
||||
String controllerPath = findControllerInMtab(name, parsedMtab);
|
||||
|
||||
if (controllerPath != null) {
|
||||
File f = new File(controllerPath + "/" + this.cGroupPrefix);
|
||||
File f = new File(controllerPath + "/" + cGroupPrefix);
|
||||
|
||||
if (FileUtil.canWrite(f)) {
|
||||
controllerPaths.put(controller, controllerPath);
|
||||
ret.put(controller, controllerPath);
|
||||
} else {
|
||||
String error =
|
||||
new StringBuffer("Mount point Based on mtab file: ")
|
||||
.append(MTAB_FILE).append(
|
||||
". Controller mount point not writable for: ")
|
||||
.append(mtab)
|
||||
.append(". Controller mount point not writable for: ")
|
||||
.append(name).toString();
|
||||
|
||||
LOG.error(error);
|
||||
throw new ResourceHandlerException(error);
|
||||
}
|
||||
} else {
|
||||
|
||||
LOG.warn("Controller not mounted but automount disabled: " + name);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to initialize controller paths! Exception: " + e);
|
||||
throw new ResourceHandlerException(
|
||||
"Failed to initialize controller paths!");
|
||||
} finally {
|
||||
rwLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -173,12 +181,13 @@ class CGroupsHandlerImpl implements CGroupsHandler {
|
|||
* for mounts with type "cgroup". Cgroup controllers will
|
||||
* appear in the list of options for a path.
|
||||
*/
|
||||
private Map<String, List<String>> parseMtab() throws IOException {
|
||||
private static Map<String, List<String>> parseMtab(String mtab)
|
||||
throws IOException {
|
||||
Map<String, List<String>> ret = new HashMap<String, List<String>>();
|
||||
BufferedReader in = null;
|
||||
|
||||
try {
|
||||
FileInputStream fis = new FileInputStream(new File(getMtabFileName()));
|
||||
FileInputStream fis = new FileInputStream(new File(mtab));
|
||||
in = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
|
||||
|
||||
for (String str = in.readLine(); str != null;
|
||||
|
@ -197,7 +206,7 @@ class CGroupsHandlerImpl implements CGroupsHandler {
|
|||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new IOException("Error while reading " + getMtabFileName(), e);
|
||||
throw new IOException("Error while reading " + mtab, e);
|
||||
} finally {
|
||||
IOUtils.cleanup(LOG, in);
|
||||
}
|
||||
|
@ -205,7 +214,7 @@ class CGroupsHandlerImpl implements CGroupsHandler {
|
|||
return ret;
|
||||
}
|
||||
|
||||
private String findControllerInMtab(String controller,
|
||||
private static String findControllerInMtab(String controller,
|
||||
Map<String, List<String>> entries) {
|
||||
for (Map.Entry<String, List<String>> e : entries.entrySet()) {
|
||||
if (e.getValue().contains(controller))
|
||||
|
@ -215,10 +224,6 @@ class CGroupsHandlerImpl implements CGroupsHandler {
|
|||
return null;
|
||||
}
|
||||
|
||||
String getMtabFileName() {
|
||||
return MTAB_FILE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void mountCGroupController(CGroupController controller)
|
||||
throws ResourceHandlerException {
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Resource handler for disk resources.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public interface DiskResourceHandler extends ResourceHandler {
|
||||
}
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -31,25 +32,27 @@ import java.util.List;
|
|||
|
||||
/**
|
||||
* Provides mechanisms to get various resource handlers - cpu, memory, network,
|
||||
* disk etc., - based on configuration
|
||||
* disk etc., - based on configuration.
|
||||
*/
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class ResourceHandlerModule {
|
||||
private volatile static ResourceHandlerChain resourceHandlerChain;
|
||||
private static volatile ResourceHandlerChain resourceHandlerChain;
|
||||
|
||||
/**
|
||||
* This specific implementation might provide resource management as well
|
||||
* as resource metrics functionality. We need to ensure that the same
|
||||
* instance is used for both.
|
||||
*/
|
||||
private volatile static TrafficControlBandwidthHandlerImpl
|
||||
private static volatile TrafficControlBandwidthHandlerImpl
|
||||
trafficControlBandwidthHandler;
|
||||
private volatile static CGroupsHandler cGroupsHandler;
|
||||
private static volatile CGroupsHandler cGroupsHandler;
|
||||
private static volatile CGroupsBlkioResourceHandlerImpl
|
||||
cGroupsBlkioResourceHandler;
|
||||
|
||||
/**
|
||||
* Returns an initialized, thread-safe CGroupsHandler instance
|
||||
* Returns an initialized, thread-safe CGroupsHandler instance.
|
||||
*/
|
||||
public static CGroupsHandler getCGroupsHandler(Configuration conf)
|
||||
throws ResourceHandlerException {
|
||||
|
@ -94,6 +97,28 @@ public class ResourceHandlerModule {
|
|||
return getTrafficControlBandwidthHandler(conf);
|
||||
}
|
||||
|
||||
public static DiskResourceHandler getDiskResourceHandler(Configuration conf)
|
||||
throws ResourceHandlerException {
|
||||
if (conf.getBoolean(YarnConfiguration.NM_DISK_RESOURCE_ENABLED,
|
||||
YarnConfiguration.DEFAULT_NM_DISK_RESOURCE_ENABLED)) {
|
||||
return getCgroupsBlkioResourceHandler(conf);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private static CGroupsBlkioResourceHandlerImpl getCgroupsBlkioResourceHandler(
|
||||
Configuration conf) throws ResourceHandlerException {
|
||||
if (cGroupsBlkioResourceHandler == null) {
|
||||
synchronized (DiskResourceHandler.class) {
|
||||
if (cGroupsBlkioResourceHandler == null) {
|
||||
cGroupsBlkioResourceHandler =
|
||||
new CGroupsBlkioResourceHandlerImpl(getCGroupsHandler(conf));
|
||||
}
|
||||
}
|
||||
}
|
||||
return cGroupsBlkioResourceHandler;
|
||||
}
|
||||
|
||||
private static void addHandlerIfNotNull(List<ResourceHandler> handlerList,
|
||||
ResourceHandler handler) {
|
||||
if (handler != null) {
|
||||
|
@ -106,11 +131,12 @@ public class ResourceHandlerModule {
|
|||
ArrayList<ResourceHandler> handlerList = new ArrayList<>();
|
||||
|
||||
addHandlerIfNotNull(handlerList, getOutboundBandwidthResourceHandler(conf));
|
||||
addHandlerIfNotNull(handlerList, getDiskResourceHandler(conf));
|
||||
resourceHandlerChain = new ResourceHandlerChain(handlerList);
|
||||
}
|
||||
|
||||
public static ResourceHandlerChain getConfiguredResourceHandlerChain
|
||||
(Configuration conf) throws ResourceHandlerException {
|
||||
public static ResourceHandlerChain getConfiguredResourceHandlerChain(
|
||||
Configuration conf) throws ResourceHandlerException {
|
||||
if (resourceHandlerChain == null) {
|
||||
synchronized (ResourceHandlerModule.class) {
|
||||
if (resourceHandlerChain == null) {
|
||||
|
@ -125,4 +151,9 @@ public class ResourceHandlerModule {
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static void nullifyResourceHandlerChain() throws ResourceHandlerException {
|
||||
resourceHandlerChain = null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.io.PrintWriter;
|
|||
import java.io.Writer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -503,4 +504,9 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
|
|||
String getMtabFileName() {
|
||||
return MTAB_FILE;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Map<String, String> getControllerPaths() {
|
||||
return Collections.unmodifiableMap(controllerPaths);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,116 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
||||
/**
|
||||
* Tests for the cgroups disk handler implementation.
|
||||
*/
|
||||
public class TestCGroupsBlkioResourceHandlerImpl {
|
||||
|
||||
private CGroupsHandler mockCGroupsHandler;
|
||||
private CGroupsBlkioResourceHandlerImpl cGroupsBlkioResourceHandlerImpl;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
mockCGroupsHandler = mock(CGroupsHandler.class);
|
||||
cGroupsBlkioResourceHandlerImpl =
|
||||
new CGroupsBlkioResourceHandlerImpl(mockCGroupsHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBootstrap() throws Exception {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
List<PrivilegedOperation> ret =
|
||||
cGroupsBlkioResourceHandlerImpl.bootstrap(conf);
|
||||
verify(mockCGroupsHandler, times(1)).mountCGroupController(
|
||||
CGroupsHandler.CGroupController.BLKIO);
|
||||
Assert.assertNull(ret);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreStart() throws Exception {
|
||||
String id = "container_01_01";
|
||||
String path = "test-path/" + id;
|
||||
ContainerId mockContainerId = mock(ContainerId.class);
|
||||
when(mockContainerId.toString()).thenReturn(id);
|
||||
Container mockContainer = mock(Container.class);
|
||||
when(mockContainer.getContainerId()).thenReturn(mockContainerId);
|
||||
when(
|
||||
mockCGroupsHandler.getPathForCGroupTasks(
|
||||
CGroupsHandler.CGroupController.BLKIO, id)).thenReturn(path);
|
||||
|
||||
List<PrivilegedOperation> ret =
|
||||
cGroupsBlkioResourceHandlerImpl.preStart(mockContainer);
|
||||
verify(mockCGroupsHandler, times(1)).createCGroup(
|
||||
CGroupsHandler.CGroupController.BLKIO, id);
|
||||
verify(mockCGroupsHandler, times(1)).updateCGroupParam(
|
||||
CGroupsHandler.CGroupController.BLKIO, id,
|
||||
CGroupsHandler.CGROUP_PARAM_BLKIO_WEIGHT,
|
||||
CGroupsBlkioResourceHandlerImpl.DEFAULT_WEIGHT);
|
||||
Assert.assertNotNull(ret);
|
||||
Assert.assertEquals(1, ret.size());
|
||||
PrivilegedOperation op = ret.get(0);
|
||||
Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
|
||||
op.getOperationType());
|
||||
List<String> args = op.getArguments();
|
||||
Assert.assertEquals(1, args.size());
|
||||
Assert.assertEquals(PrivilegedOperation.CGROUP_ARG_PREFIX + path,
|
||||
args.get(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReacquireContainer() throws Exception {
|
||||
ContainerId containerIdMock = mock(ContainerId.class);
|
||||
Assert.assertNull(cGroupsBlkioResourceHandlerImpl
|
||||
.reacquireContainer(containerIdMock));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPostComplete() throws Exception {
|
||||
String id = "container_01_01";
|
||||
ContainerId mockContainerId = mock(ContainerId.class);
|
||||
when(mockContainerId.toString()).thenReturn(id);
|
||||
Assert.assertNull(cGroupsBlkioResourceHandlerImpl
|
||||
.postComplete(mockContainerId));
|
||||
verify(mockCGroupsHandler, times(1)).deleteCGroup(
|
||||
CGroupsHandler.CGroupController.BLKIO, id);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTeardown() throws Exception {
|
||||
Assert.assertNull(cGroupsBlkioResourceHandlerImpl.teardown());
|
||||
}
|
||||
|
||||
}
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -35,18 +36,21 @@ import org.junit.Test;
|
|||
import org.mockito.ArgumentCaptor;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Matchers.isNull;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||
|
||||
/**
|
||||
* Tests for the CGroups handler implementation.
|
||||
*/
|
||||
public class TestCGroupsHandlerImpl {
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(TestCGroupsHandlerImpl.class);
|
||||
|
@ -84,8 +88,8 @@ public class TestCGroupsHandlerImpl {
|
|||
try {
|
||||
cGroupsHandler = new CGroupsHandlerImpl(conf,
|
||||
privilegedOperationExecutorMock);
|
||||
PrivilegedOperation expectedOp = new PrivilegedOperation
|
||||
(PrivilegedOperation.OperationType.MOUNT_CGROUPS, (String) null);
|
||||
PrivilegedOperation expectedOp = new PrivilegedOperation(
|
||||
PrivilegedOperation.OperationType.MOUNT_CGROUPS, (String) null);
|
||||
//This is expected to be of the form :
|
||||
//net_cls=<mount_path>/net_cls
|
||||
StringBuffer controllerKV = new StringBuffer(controller.getName())
|
||||
|
@ -94,8 +98,8 @@ public class TestCGroupsHandlerImpl {
|
|||
|
||||
cGroupsHandler.mountCGroupController(controller);
|
||||
try {
|
||||
ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass
|
||||
(PrivilegedOperation.class);
|
||||
ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass(
|
||||
PrivilegedOperation.class);
|
||||
verify(privilegedOperationExecutorMock)
|
||||
.executePrivilegedOperation(opCaptor.capture(), eq(false));
|
||||
|
||||
|
@ -200,17 +204,15 @@ public class TestCGroupsHandlerImpl {
|
|||
|
||||
Assert.assertTrue(paramFile.exists());
|
||||
try {
|
||||
Assert.assertEquals(paramValue, new String(Files.readAllBytes
|
||||
(paramFile
|
||||
.toPath())));
|
||||
Assert.assertEquals(paramValue, new String(Files.readAllBytes(
|
||||
paramFile.toPath())));
|
||||
} catch (IOException e) {
|
||||
LOG.error("Caught exception: " + e);
|
||||
Assert.assertTrue("Unexpected IOException trying to read cgroup param!",
|
||||
false);
|
||||
Assert.fail("Unexpected IOException trying to read cgroup param!");
|
||||
}
|
||||
|
||||
Assert.assertEquals(paramValue, cGroupsHandler.getCGroupParam
|
||||
(controller, testCGroup, param));
|
||||
Assert.assertEquals(paramValue,
|
||||
cGroupsHandler.getCGroupParam(controller, testCGroup, param));
|
||||
|
||||
//We can't really do a delete test here. Linux cgroups
|
||||
//implementation provides additional semantics - the cgroup cannot be
|
||||
|
@ -222,12 +224,79 @@ public class TestCGroupsHandlerImpl {
|
|||
//delete is not possible with a regular non-empty directory.
|
||||
} catch (ResourceHandlerException e) {
|
||||
LOG.error("Caught exception: " + e);
|
||||
Assert.assertTrue(
|
||||
"Unexpected ResourceHandlerException during cgroup operations!",
|
||||
false);
|
||||
Assert
|
||||
.fail("Unexpected ResourceHandlerException during cgroup operations!");
|
||||
}
|
||||
}
|
||||
|
||||
public static File createMockCgroupMount(File parentDir, String type)
|
||||
throws IOException {
|
||||
return createMockCgroupMount(parentDir, type, "hadoop-yarn");
|
||||
}
|
||||
|
||||
public static File createMockCgroupMount(File parentDir, String type,
|
||||
String hierarchy) throws IOException {
|
||||
File cgroupMountDir =
|
||||
new File(parentDir.getAbsolutePath(), type + "/" + hierarchy);
|
||||
FileUtils.deleteQuietly(cgroupMountDir);
|
||||
if (!cgroupMountDir.mkdirs()) {
|
||||
String message =
|
||||
"Could not create dir " + cgroupMountDir.getAbsolutePath();
|
||||
throw new IOException(message);
|
||||
}
|
||||
return cgroupMountDir;
|
||||
}
|
||||
|
||||
public static File createMockMTab(File parentDir) throws IOException {
|
||||
String cpuMtabContent =
|
||||
"none " + parentDir.getAbsolutePath()
|
||||
+ "/cpu cgroup rw,relatime,cpu 0 0\n";
|
||||
String blkioMtabContent =
|
||||
"none " + parentDir.getAbsolutePath()
|
||||
+ "/blkio cgroup rw,relatime,blkio 0 0\n";
|
||||
|
||||
File mockMtab = new File(parentDir, UUID.randomUUID().toString());
|
||||
if (!mockMtab.exists()) {
|
||||
if (!mockMtab.createNewFile()) {
|
||||
String message = "Could not create file " + mockMtab.getAbsolutePath();
|
||||
throw new IOException(message);
|
||||
}
|
||||
}
|
||||
FileWriter mtabWriter = new FileWriter(mockMtab.getAbsoluteFile());
|
||||
mtabWriter.write(cpuMtabContent);
|
||||
mtabWriter.write(blkioMtabContent);
|
||||
mtabWriter.close();
|
||||
mockMtab.deleteOnExit();
|
||||
return mockMtab;
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testMtabParsing() throws Exception {
|
||||
File parentDir = new File(tmpPath);
|
||||
// create mock cgroup
|
||||
File cpuCgroupMountDir = createMockCgroupMount(parentDir, "cpu",
|
||||
hierarchy);
|
||||
Assert.assertTrue(cpuCgroupMountDir.exists());
|
||||
File blkioCgroupMountDir = createMockCgroupMount(parentDir,
|
||||
"blkio", hierarchy);
|
||||
Assert.assertTrue(blkioCgroupMountDir.exists());
|
||||
File mockMtabFile = createMockMTab(parentDir);
|
||||
Map<CGroupsHandler.CGroupController, String> controllerPaths =
|
||||
CGroupsHandlerImpl.initializeControllerPathsFromMtab(
|
||||
mockMtabFile.getAbsolutePath(), hierarchy);
|
||||
Assert.assertEquals(2, controllerPaths.size());
|
||||
Assert.assertTrue(controllerPaths
|
||||
.containsKey(CGroupsHandler.CGroupController.CPU));
|
||||
Assert.assertTrue(controllerPaths
|
||||
.containsKey(CGroupsHandler.CGroupController.BLKIO));
|
||||
String cpuDir = controllerPaths.get(CGroupsHandler.CGroupController.CPU);
|
||||
String blkioDir =
|
||||
controllerPaths.get(CGroupsHandler.CGroupController.BLKIO);
|
||||
Assert.assertEquals(parentDir.getAbsolutePath() + "/cpu", cpuDir);
|
||||
Assert.assertEquals(parentDir.getAbsolutePath() + "/blkio", blkioDir);
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() {
|
||||
FileUtil.fullyDelete(new File(tmpPath));
|
||||
|
|
|
@ -37,7 +37,7 @@ public class TestResourceHandlerModule {
|
|||
Configuration networkEnabledConf;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
public void setup() throws Exception {
|
||||
emptyConf = new YarnConfiguration();
|
||||
networkEnabledConf = new YarnConfiguration();
|
||||
|
||||
|
@ -46,6 +46,7 @@ public class TestResourceHandlerModule {
|
|||
//We need to bypass mtab parsing for figuring out cgroups mount locations
|
||||
networkEnabledConf.setBoolean(YarnConfiguration
|
||||
.NM_LINUX_CONTAINER_CGROUPS_MOUNT, true);
|
||||
ResourceHandlerModule.nullifyResourceHandlerChain();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -75,4 +76,27 @@ public class TestResourceHandlerModule {
|
|||
Assert.fail("Unexpected ResourceHandlerException: " + e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDiskResourceHandler() throws Exception {
|
||||
|
||||
DiskResourceHandler handler =
|
||||
ResourceHandlerModule.getDiskResourceHandler(emptyConf);
|
||||
Assert.assertNull(handler);
|
||||
|
||||
Configuration diskConf = new YarnConfiguration();
|
||||
diskConf.setBoolean(YarnConfiguration.NM_DISK_RESOURCE_ENABLED, true);
|
||||
|
||||
handler = ResourceHandlerModule.getDiskResourceHandler(diskConf);
|
||||
Assert.assertNotNull(handler);
|
||||
|
||||
ResourceHandlerChain resourceHandlerChain =
|
||||
ResourceHandlerModule.getConfiguredResourceHandlerChain(diskConf);
|
||||
List<ResourceHandler> resourceHandlers =
|
||||
resourceHandlerChain.getResourceHandlerList();
|
||||
// Exactly one resource handler in chain
|
||||
Assert.assertEquals(resourceHandlers.size(), 1);
|
||||
// Same instance is expected to be in the chain.
|
||||
Assert.assertTrue(resourceHandlers.get(0) == handler);
|
||||
}
|
||||
}
|
|
@ -21,6 +21,7 @@ import org.apache.commons.io.FileUtils;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.TestCGroupsHandlerImpl;
|
||||
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
|
||||
import org.junit.Assert;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
@ -33,7 +34,6 @@ import org.mockito.Mockito;
|
|||
import java.io.*;
|
||||
import java.util.List;
|
||||
import java.util.Scanner;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class TestCgroupsLCEResourcesHandler {
|
||||
|
@ -142,7 +142,7 @@ public class TestCgroupsLCEResourcesHandler {
|
|||
|
||||
@Override
|
||||
int[] getOverallLimits(float x) {
|
||||
if (generateLimitsMode == true) {
|
||||
if (generateLimitsMode) {
|
||||
return super.getOverallLimits(x);
|
||||
}
|
||||
return limits;
|
||||
|
@ -172,10 +172,11 @@ public class TestCgroupsLCEResourcesHandler {
|
|||
handler.initConfig();
|
||||
|
||||
// create mock cgroup
|
||||
File cgroupMountDir = createMockCgroupMount(cgroupDir);
|
||||
File cpuCgroupMountDir = TestCGroupsHandlerImpl.createMockCgroupMount(
|
||||
cgroupDir, "cpu");
|
||||
|
||||
// create mock mtab
|
||||
File mockMtab = createMockMTab(cgroupDir);
|
||||
File mockMtab = TestCGroupsHandlerImpl.createMockMTab(cgroupDir);
|
||||
|
||||
// setup our handler and call init()
|
||||
handler.setMtabFile(mockMtab.getAbsolutePath());
|
||||
|
@ -184,8 +185,8 @@ public class TestCgroupsLCEResourcesHandler {
|
|||
// in this case, we're using all cpu so the files
|
||||
// shouldn't exist(because init won't create them
|
||||
handler.init(mockLCE, plugin);
|
||||
File periodFile = new File(cgroupMountDir, "cpu.cfs_period_us");
|
||||
File quotaFile = new File(cgroupMountDir, "cpu.cfs_quota_us");
|
||||
File periodFile = new File(cpuCgroupMountDir, "cpu.cfs_period_us");
|
||||
File quotaFile = new File(cpuCgroupMountDir, "cpu.cfs_quota_us");
|
||||
Assert.assertFalse(periodFile.exists());
|
||||
Assert.assertFalse(quotaFile.exists());
|
||||
|
||||
|
@ -235,7 +236,7 @@ public class TestCgroupsLCEResourcesHandler {
|
|||
Assert.assertEquals(expectedQuota, ret[0]);
|
||||
Assert.assertEquals(-1, ret[1]);
|
||||
|
||||
int[] params = { 0, -1 };
|
||||
int[] params = {0, -1};
|
||||
for (int cores : params) {
|
||||
try {
|
||||
handler.getOverallLimits(cores);
|
||||
|
@ -251,34 +252,6 @@ public class TestCgroupsLCEResourcesHandler {
|
|||
Assert.assertEquals(-1, ret[1]);
|
||||
}
|
||||
|
||||
private File createMockCgroupMount(File cgroupDir) throws IOException {
|
||||
File cgroupMountDir = new File(cgroupDir.getAbsolutePath(), "hadoop-yarn");
|
||||
FileUtils.deleteQuietly(cgroupDir);
|
||||
if (!cgroupMountDir.mkdirs()) {
|
||||
String message =
|
||||
"Could not create dir " + cgroupMountDir.getAbsolutePath();
|
||||
throw new IOException(message);
|
||||
}
|
||||
return cgroupMountDir;
|
||||
}
|
||||
|
||||
private File createMockMTab(File cgroupDir) throws IOException {
|
||||
String mtabContent =
|
||||
"none " + cgroupDir.getAbsolutePath() + " cgroup rw,relatime,cpu 0 0";
|
||||
File mockMtab = new File("target", UUID.randomUUID().toString());
|
||||
if (!mockMtab.exists()) {
|
||||
if (!mockMtab.createNewFile()) {
|
||||
String message = "Could not create file " + mockMtab.getAbsolutePath();
|
||||
throw new IOException(message);
|
||||
}
|
||||
}
|
||||
FileWriter mtabWriter = new FileWriter(mockMtab.getAbsoluteFile());
|
||||
mtabWriter.write(mtabContent);
|
||||
mtabWriter.close();
|
||||
mockMtab.deleteOnExit();
|
||||
return mockMtab;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContainerLimits() throws IOException {
|
||||
LinuxContainerExecutor mockLCE = new MockLinuxContainerExecutor();
|
||||
|
@ -286,6 +259,7 @@ public class TestCgroupsLCEResourcesHandler {
|
|||
new CustomCgroupsLCEResourceHandler();
|
||||
handler.generateLimitsMode = true;
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.NM_DISK_RESOURCE_ENABLED, true);
|
||||
final int numProcessors = 4;
|
||||
ResourceCalculatorPlugin plugin =
|
||||
Mockito.mock(ResourceCalculatorPlugin.class);
|
||||
|
@ -294,71 +268,77 @@ public class TestCgroupsLCEResourcesHandler {
|
|||
handler.initConfig();
|
||||
|
||||
// create mock cgroup
|
||||
File cgroupMountDir = createMockCgroupMount(cgroupDir);
|
||||
File cpuCgroupMountDir = TestCGroupsHandlerImpl.createMockCgroupMount(
|
||||
cgroupDir, "cpu");
|
||||
|
||||
// create mock mtab
|
||||
File mockMtab = createMockMTab(cgroupDir);
|
||||
File mockMtab = TestCGroupsHandlerImpl.createMockMTab(cgroupDir);
|
||||
|
||||
// setup our handler and call init()
|
||||
handler.setMtabFile(mockMtab.getAbsolutePath());
|
||||
handler.init(mockLCE, plugin);
|
||||
|
||||
// check values
|
||||
// default case - files shouldn't exist, strict mode off by default
|
||||
// check the controller paths map isn't empty
|
||||
ContainerId id = ContainerId.fromString("container_1_1_1_1");
|
||||
handler.preExecute(id, Resource.newInstance(1024, 1));
|
||||
File containerDir = new File(cgroupMountDir, id.toString());
|
||||
Assert.assertTrue(containerDir.exists());
|
||||
Assert.assertTrue(containerDir.isDirectory());
|
||||
File periodFile = new File(containerDir, "cpu.cfs_period_us");
|
||||
File quotaFile = new File(containerDir, "cpu.cfs_quota_us");
|
||||
Assert.assertNotNull(handler.getControllerPaths());
|
||||
// check values
|
||||
// default case - files shouldn't exist, strict mode off by default
|
||||
File containerCpuDir = new File(cpuCgroupMountDir, id.toString());
|
||||
Assert.assertTrue(containerCpuDir.exists());
|
||||
Assert.assertTrue(containerCpuDir.isDirectory());
|
||||
File periodFile = new File(containerCpuDir, "cpu.cfs_period_us");
|
||||
File quotaFile = new File(containerCpuDir, "cpu.cfs_quota_us");
|
||||
Assert.assertFalse(periodFile.exists());
|
||||
Assert.assertFalse(quotaFile.exists());
|
||||
|
||||
// no files created because we're using all cpu
|
||||
FileUtils.deleteQuietly(containerDir);
|
||||
FileUtils.deleteQuietly(containerCpuDir);
|
||||
conf.setBoolean(
|
||||
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true);
|
||||
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
|
||||
true);
|
||||
handler.initConfig();
|
||||
handler.preExecute(id,
|
||||
Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES));
|
||||
Assert.assertTrue(containerDir.exists());
|
||||
Assert.assertTrue(containerDir.isDirectory());
|
||||
periodFile = new File(containerDir, "cpu.cfs_period_us");
|
||||
quotaFile = new File(containerDir, "cpu.cfs_quota_us");
|
||||
Assert.assertTrue(containerCpuDir.exists());
|
||||
Assert.assertTrue(containerCpuDir.isDirectory());
|
||||
periodFile = new File(containerCpuDir, "cpu.cfs_period_us");
|
||||
quotaFile = new File(containerCpuDir, "cpu.cfs_quota_us");
|
||||
Assert.assertFalse(periodFile.exists());
|
||||
Assert.assertFalse(quotaFile.exists());
|
||||
|
||||
// 50% of CPU
|
||||
FileUtils.deleteQuietly(containerDir);
|
||||
FileUtils.deleteQuietly(containerCpuDir);
|
||||
conf.setBoolean(
|
||||
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true);
|
||||
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
|
||||
true);
|
||||
handler.initConfig();
|
||||
handler.preExecute(id,
|
||||
Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2));
|
||||
Assert.assertTrue(containerDir.exists());
|
||||
Assert.assertTrue(containerDir.isDirectory());
|
||||
periodFile = new File(containerDir, "cpu.cfs_period_us");
|
||||
quotaFile = new File(containerDir, "cpu.cfs_quota_us");
|
||||
Assert.assertTrue(containerCpuDir.exists());
|
||||
Assert.assertTrue(containerCpuDir.isDirectory());
|
||||
periodFile = new File(containerCpuDir, "cpu.cfs_period_us");
|
||||
quotaFile = new File(containerCpuDir, "cpu.cfs_quota_us");
|
||||
Assert.assertTrue(periodFile.exists());
|
||||
Assert.assertTrue(quotaFile.exists());
|
||||
Assert.assertEquals(500 * 1000, readIntFromFile(periodFile));
|
||||
Assert.assertEquals(1000 * 1000, readIntFromFile(quotaFile));
|
||||
|
||||
// CGroups set to 50% of CPU, container set to 50% of YARN CPU
|
||||
FileUtils.deleteQuietly(containerDir);
|
||||
FileUtils.deleteQuietly(containerCpuDir);
|
||||
conf.setBoolean(
|
||||
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true);
|
||||
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
|
||||
true);
|
||||
conf
|
||||
.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 50);
|
||||
handler.initConfig();
|
||||
handler.init(mockLCE, plugin);
|
||||
handler.preExecute(id,
|
||||
Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2));
|
||||
Assert.assertTrue(containerDir.exists());
|
||||
Assert.assertTrue(containerDir.isDirectory());
|
||||
periodFile = new File(containerDir, "cpu.cfs_period_us");
|
||||
quotaFile = new File(containerDir, "cpu.cfs_quota_us");
|
||||
Assert.assertTrue(containerCpuDir.exists());
|
||||
Assert.assertTrue(containerCpuDir.isDirectory());
|
||||
periodFile = new File(containerCpuDir, "cpu.cfs_period_us");
|
||||
quotaFile = new File(containerCpuDir, "cpu.cfs_quota_us");
|
||||
Assert.assertTrue(periodFile.exists());
|
||||
Assert.assertTrue(quotaFile.exists());
|
||||
Assert.assertEquals(1000 * 1000, readIntFromFile(periodFile));
|
||||
|
|
Loading…
Reference in New Issue