YARN-5764. NUMA awareness support for launching containers. Contributed by Devaraj K.

This commit is contained in:
Miklos Szegedi 2018-03-13 11:03:27 -07:00
parent 45cccadd2e
commit a82d4a2e3a
12 changed files with 1318 additions and 4 deletions

View File

@ -3585,6 +3585,22 @@ public class YarnConfiguration extends Configuration {
DEFAULT_TIMELINE_SERVICE_COLLECTOR_WEBAPP_HTTPS_ADDRESS =
DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS;
/**
* Settings for NUMA awareness.
*/
public static final String NM_NUMA_AWARENESS_ENABLED = NM_PREFIX
+ "numa-awareness.enabled";
public static final boolean DEFAULT_NM_NUMA_AWARENESS_ENABLED = false;
public static final String NM_NUMA_AWARENESS_READ_TOPOLOGY = NM_PREFIX
+ "numa-awareness.read-topology";
public static final boolean DEFAULT_NM_NUMA_AWARENESS_READ_TOPOLOGY = false;
public static final String NM_NUMA_AWARENESS_NODE_IDS = NM_PREFIX
+ "numa-awareness.node-ids";
public static final String NM_NUMA_AWARENESS_NUMACTL_CMD = NM_PREFIX
+ "numa-awareness.numactl.cmd";
public static final String DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD =
"/usr/bin/numactl";
public YarnConfiguration() {
super();
}
@ -3791,6 +3807,17 @@ public class YarnConfiguration extends Configuration {
YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
}
/**
* Returns whether the NUMA awareness is enabled.
*
* @param conf the configuration
* @return whether the NUMA awareness is enabled.
*/
public static boolean numaAwarenessEnabled(Configuration conf) {
return conf.getBoolean(NM_NUMA_AWARENESS_ENABLED,
DEFAULT_NM_NUMA_AWARENESS_ENABLED);
}
/* For debugging. mp configurations to system output as XML format. */
public static void main(String[] args) throws Exception {
new YarnConfiguration(new Configuration()).writeXml(System.out);

View File

@ -3711,4 +3711,55 @@
<value></value>
</property>
<property>
<description>
Whether to enable the NUMA awareness for containers in Node Manager.
</description>
<name>yarn.nodemanager.numa-awareness.enabled</name>
<value>false</value>
</property>
<property>
<description>
Whether to read the NUMA topology from the system or from the
configurations. If the value is true then NM reads the NUMA topology from
system using the command 'numactl --hardware'. If the value is false then NM
reads the topology from the configurations
'yarn.nodemanager.numa-awareness.node-ids'(for node id's),
'yarn.nodemanager.numa-awareness.&lt;NODE_ID&gt;.memory'(for each node memory),
'yarn.nodemanager.numa-awareness.&lt;NODE_ID&gt;.cpus'(for each node cpus).
</description>
<name>yarn.nodemanager.numa-awareness.read-topology</name>
<value>false</value>
</property>
<property>
<description>
NUMA node id's in the form of comma separated list. Memory and No of CPUs
will be read using the properties
'yarn.nodemanager.numa-awareness.&lt;NODE_ID&gt;.memory' and
'yarn.nodemanager.numa-awareness.&lt;NODE_ID&gt;.cpus' for each id specified
in this value. This property value will be read only when
'yarn.nodemanager.numa-awareness.read-topology=false'.
For example, if yarn.nodemanager.numa-awareness.node-ids=0,1
then need to specify memory and cpus for node id's '0' and '1' like below,
yarn.nodemanager.numa-awareness.0.memory=73717
yarn.nodemanager.numa-awareness.0.cpus=4
yarn.nodemanager.numa-awareness.1.memory=73727
yarn.nodemanager.numa-awareness.1.cpus=4
</description>
<name>yarn.nodemanager.numa-awareness.node-ids</name>
<value></value>
</property>
<property>
<description>
The numactl command path which controls NUMA policy for processes or
shared memory.
</description>
<name>yarn.nodemanager.numa-awareness.numactl.cmd</name>
<value>/usr/bin/numactl</value>
</property>
</configuration>

View File

@ -485,6 +485,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
container.getResource());
String resourcesOptions = resourcesHandler.getResourcesOption(containerId);
String tcCommandFile = null;
List<String> numaArgs = null;
try {
if (resourceHandlerChain != null) {
@ -506,6 +507,9 @@ public class LinuxContainerExecutor extends ContainerExecutor {
case TC_MODIFY_STATE:
tcCommandFile = op.getArguments().get(0);
break;
case ADD_NUMA_PARAMS:
numaArgs = op.getArguments();
break;
default:
LOG.warn("PrivilegedOperation type unsupported in launch: "
+ op.getOperationType());
@ -536,7 +540,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
if (pidFilePath != null) {
ContainerRuntimeContext runtimeContext = buildContainerRuntimeContext(
ctx, pidFilePath, resourcesOptions, tcCommandFile);
ctx, pidFilePath, resourcesOptions, tcCommandFile, numaArgs);
linuxContainerRuntime.launchContainer(runtimeContext);
} else {
@ -610,11 +614,12 @@ public class LinuxContainerExecutor extends ContainerExecutor {
}
private ContainerRuntimeContext buildContainerRuntimeContext(
ContainerStartContext ctx, Path pidFilePath,
String resourcesOptions, String tcCommandFile) {
ContainerStartContext ctx, Path pidFilePath, String resourcesOptions,
String tcCommandFile, List<String> numaArgs) {
List<String> prefixCommands = new ArrayList<>();
addSchedPriorityCommand(prefixCommands);
addNumaArgsToCommand(prefixCommands, numaArgs);
Container container = ctx.getContainer();
@ -662,6 +667,13 @@ public class LinuxContainerExecutor extends ContainerExecutor {
return linuxContainerRuntime.getIpAndHost(container);
}
private void addNumaArgsToCommand(List<String> prefixCommands,
List<String> numaArgs) {
if (numaArgs != null) {
prefixCommands.addAll(numaArgs);
}
}
@Override
public int reacquireContainer(ContainerReacquisitionContext ctx)
throws IOException, InterruptedException {

View File

@ -53,7 +53,8 @@ public class PrivilegedOperation {
RUN_DOCKER_CMD("--run-docker"),
GPU("--module-gpu"),
FPGA("--module-fpga"),
LIST_AS_USER(""); //no CLI switch supported yet.
LIST_AS_USER(""), // no CLI switch supported yet.
ADD_NUMA_PARAMS(""); // no CLI switch supported yet.
private final String option;

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceHandlerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler;
@ -253,6 +254,14 @@ public class ResourceHandlerModule {
return cGroupsMemoryResourceHandler;
}
private static ResourceHandler getNumaResourceHandler(Configuration conf,
Context nmContext) {
if (YarnConfiguration.numaAwarenessEnabled(conf)) {
return new NumaResourceHandlerImpl(conf, nmContext);
}
return null;
}
private static void addHandlerIfNotNull(List<ResourceHandler> handlerList,
ResourceHandler handler) {
if (handler != null) {
@ -273,6 +282,7 @@ public class ResourceHandlerModule {
initMemoryResourceHandler(conf));
addHandlerIfNotNull(handlerList,
initCGroupsCpuResourceHandler(conf));
addHandlerIfNotNull(handlerList, getNumaResourceHandler(conf, nmContext));
addHandlersFromConfiguredResourcePlugins(handlerList, conf, nmContext);
resourceHandlerChain = new ResourceHandlerChain(handlerList);
}

View File

@ -0,0 +1,204 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
/**
* NumaNodeResource class holds the NUMA node topology with the total and used
* resources.
*/
public class NumaNodeResource {
private String nodeId;
private long totalMemory;
private int totalCpus;
private long usedMemory;
private int usedCpus;
private static final Log LOG = LogFactory.getLog(NumaNodeResource.class);
private Map<ContainerId, Long> containerVsMemUsage =
new ConcurrentHashMap<>();
private Map<ContainerId, Integer> containerVsCpusUsage =
new ConcurrentHashMap<>();
public NumaNodeResource(String nodeId, long totalMemory, int totalCpus) {
this.nodeId = nodeId;
this.totalMemory = totalMemory;
this.totalCpus = totalCpus;
}
/**
* Checks whether the specified resources available or not.
*
* @param resource resource
* @return whether the specified resources available or not
*/
public boolean isResourcesAvailable(Resource resource) {
LOG.debug(
"Memory available:" + (totalMemory - usedMemory) + ", CPUs available:"
+ (totalCpus - usedCpus) + ", requested:" + resource);
if ((totalMemory - usedMemory) >= resource.getMemorySize()
&& (totalCpus - usedCpus) >= resource.getVirtualCores()) {
return true;
}
return false;
}
/**
* Assigns available memory and returns the remaining needed memory.
*
* @param memreq required memory
* @param containerId which container memory to assign
* @return remaining needed memory
*/
public long assignAvailableMemory(long memreq, ContainerId containerId) {
long memAvailable = totalMemory - usedMemory;
if (memAvailable >= memreq) {
containerVsMemUsage.put(containerId, memreq);
usedMemory += memreq;
return 0;
} else {
usedMemory += memAvailable;
containerVsMemUsage.put(containerId, memAvailable);
return memreq - memAvailable;
}
}
/**
* Assigns available cpu's and returns the remaining needed cpu's.
*
* @param cpusreq required cpu's
* @param containerId which container cpu's to assign
* @return remaining needed cpu's
*/
public int assignAvailableCpus(int cpusreq, ContainerId containerId) {
int cpusAvailable = totalCpus - usedCpus;
if (cpusAvailable >= cpusreq) {
containerVsCpusUsage.put(containerId, cpusreq);
usedCpus += cpusreq;
return 0;
} else {
usedCpus += cpusAvailable;
containerVsCpusUsage.put(containerId, cpusAvailable);
return cpusreq - cpusAvailable;
}
}
/**
* Assigns the requested resources for Container.
*
* @param resource resource to assign
* @param containerId to which container the resources to assign
*/
public void assignResources(Resource resource, ContainerId containerId) {
containerVsMemUsage.put(containerId, resource.getMemorySize());
containerVsCpusUsage.put(containerId, resource.getVirtualCores());
usedMemory += resource.getMemorySize();
usedCpus += resource.getVirtualCores();
}
/**
* Releases the assigned resources for Container.
*
* @param containerId to which container the assigned resources to release
*/
public void releaseResources(ContainerId containerId) {
if (containerVsMemUsage.containsKey(containerId)) {
usedMemory -= containerVsMemUsage.get(containerId);
containerVsMemUsage.remove(containerId);
}
if (containerVsCpusUsage.containsKey(containerId)) {
usedCpus -= containerVsCpusUsage.get(containerId);
containerVsCpusUsage.remove(containerId);
}
}
/**
* Recovers the memory resources for Container.
*
* @param containerId recover the memory resources for the Container
* @param memory memory to recover
*/
public void recoverMemory(ContainerId containerId, long memory) {
containerVsMemUsage.put(containerId, memory);
usedMemory += memory;
}
/**
* Recovers the cpu's resources for Container.
*
* @param containerId recover the cpu's resources for the Container
* @param cpus cpu's to recover
*/
public void recoverCpus(ContainerId containerId, int cpus) {
containerVsCpusUsage.put(containerId, cpus);
usedCpus += cpus;
}
@Override
public String toString() {
return "Node Id:" + nodeId + "\tMemory:" + totalMemory + "\tCPus:"
+ totalCpus;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
result = prime * result + (int) (totalMemory ^ (totalMemory >>> 32));
result = prime * result + totalCpus;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
NumaNodeResource other = (NumaNodeResource) obj;
if (nodeId == null) {
if (other.nodeId != null) {
return false;
}
} else if (!nodeId.equals(other.nodeId)) {
return false;
}
if (totalMemory != other.totalMemory) {
return false;
}
if (totalCpus != other.totalCpus) {
return false;
}
return true;
}
public String getNodeId() {
return nodeId;
}
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* NumaResourceAllocation contains Memory nodes and CPU nodes assigned to a
* container.
*/
public class NumaResourceAllocation implements Serializable {
private static final long serialVersionUID = 6339719798446595123L;
private Map<String, Long> nodeVsMemory;
private Map<String, Integer> nodeVsCpus;
public NumaResourceAllocation() {
nodeVsMemory = new HashMap<>();
nodeVsCpus = new HashMap<>();
}
public NumaResourceAllocation(String memNodeId, long memory, String cpuNodeId,
int cpus) {
this();
nodeVsMemory.put(memNodeId, memory);
nodeVsCpus.put(cpuNodeId, cpus);
}
public void addMemoryNode(String memNodeId, long memory) {
nodeVsMemory.put(memNodeId, memory);
}
public void addCpuNode(String cpuNodeId, int cpus) {
nodeVsCpus.put(cpuNodeId, cpus);
}
public Set<String> getMemNodes() {
return nodeVsMemory.keySet();
}
public Set<String> getCpuNodes() {
return nodeVsCpus.keySet();
}
public Map<String, Long> getNodeVsMemory() {
return nodeVsMemory;
}
public Map<String, Integer> getNodeVsCpus() {
return nodeVsCpus;
}
}

View File

@ -0,0 +1,342 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import com.google.common.annotations.VisibleForTesting;
/**
* NUMA Resources Allocator reads the NUMA topology and assigns NUMA nodes to
* the containers.
*/
public class NumaResourceAllocator {
private static final Log LOG = LogFactory.getLog(NumaResourceAllocator.class);
// Regex to find node ids, Ex: 'available: 2 nodes (0-1)'
private static final String NUMA_NODEIDS_REGEX =
"available:\\s*[0-9]+\\s*nodes\\s*\\(([0-9\\-,]*)\\)";
// Regex to find node memory, Ex: 'node 0 size: 73717 MB'
private static final String NUMA_NODE_MEMORY_REGEX =
"node\\s*<NUMA-NODE>\\s*size:\\s*([0-9]+)\\s*([KMG]B)";
// Regex to find node cpus, Ex: 'node 0 cpus: 0 2 4 6'
private static final String NUMA_NODE_CPUS_REGEX =
"node\\s*<NUMA-NODE>\\s*cpus:\\s*([0-9\\s]+)";
private static final String GB = "GB";
private static final String KB = "KB";
private static final String NUMA_NODE = "<NUMA-NODE>";
private static final String SPACE = "\\s";
private static final long DEFAULT_NUMA_NODE_MEMORY = 1024;
private static final int DEFAULT_NUMA_NODE_CPUS = 1;
private static final String NUMA_RESOURCE_TYPE = "numa";
private List<NumaNodeResource> numaNodesList = new ArrayList<>();
private Map<String, NumaNodeResource> numaNodeIdVsResource = new HashMap<>();
private int currentAssignNode;
private Context context;
public NumaResourceAllocator(Context context) {
this.context = context;
}
public void init(Configuration conf) throws YarnException {
if (conf.getBoolean(YarnConfiguration.NM_NUMA_AWARENESS_READ_TOPOLOGY,
YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_READ_TOPOLOGY)) {
LOG.info("Reading NUMA topology using 'numactl --hardware' command.");
String cmdOutput = executeNGetCmdOutput(conf);
String[] outputLines = cmdOutput.split("\\n");
Pattern pattern = Pattern.compile(NUMA_NODEIDS_REGEX);
String nodeIdsStr = null;
for (String line : outputLines) {
Matcher matcher = pattern.matcher(line);
if (matcher.find()) {
nodeIdsStr = matcher.group(1);
break;
}
}
if (nodeIdsStr == null) {
throw new YarnException("Failed to get numa nodes from"
+ " 'numactl --hardware' output and output is:\n" + cmdOutput);
}
String[] nodeIdCommaSplits = nodeIdsStr.split("[,\\s]");
for (String nodeIdOrRange : nodeIdCommaSplits) {
if (nodeIdOrRange.contains("-")) {
String[] beginNEnd = nodeIdOrRange.split("-");
int endNode = Integer.parseInt(beginNEnd[1]);
for (int nodeId = Integer
.parseInt(beginNEnd[0]); nodeId <= endNode; nodeId++) {
long memory = parseMemory(outputLines, String.valueOf(nodeId));
int cpus = parseCpus(outputLines, String.valueOf(nodeId));
addToCollection(String.valueOf(nodeId), memory, cpus);
}
} else {
long memory = parseMemory(outputLines, nodeIdOrRange);
int cpus = parseCpus(outputLines, nodeIdOrRange);
addToCollection(nodeIdOrRange, memory, cpus);
}
}
} else {
LOG.info("Reading NUMA topology using configurations.");
Collection<String> nodeIds = conf
.getStringCollection(YarnConfiguration.NM_NUMA_AWARENESS_NODE_IDS);
for (String nodeId : nodeIds) {
long mem = conf.getLong(
"yarn.nodemanager.numa-awareness." + nodeId + ".memory",
DEFAULT_NUMA_NODE_MEMORY);
int cpus = conf.getInt(
"yarn.nodemanager.numa-awareness." + nodeId + ".cpus",
DEFAULT_NUMA_NODE_CPUS);
addToCollection(nodeId, mem, cpus);
}
}
if (numaNodesList.isEmpty()) {
throw new YarnException("There are no available NUMA nodes"
+ " for making containers NUMA aware.");
}
LOG.info("Available numa nodes with capacities : " + numaNodesList.size());
}
@VisibleForTesting
String executeNGetCmdOutput(Configuration conf) throws YarnException {
String numaCtlCmd = conf.get(
YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD,
YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD);
String[] args = new String[] {numaCtlCmd, "--hardware"};
ShellCommandExecutor shExec = new ShellCommandExecutor(args);
try {
shExec.execute();
} catch (IOException e) {
throw new YarnException("Failed to read the numa configurations.", e);
}
return shExec.getOutput();
}
private int parseCpus(String[] outputLines, String nodeId) {
int cpus = 0;
Pattern patternNodeCPUs = Pattern
.compile(NUMA_NODE_CPUS_REGEX.replace(NUMA_NODE, nodeId));
for (String line : outputLines) {
Matcher matcherNodeCPUs = patternNodeCPUs.matcher(line);
if (matcherNodeCPUs.find()) {
String cpusStr = matcherNodeCPUs.group(1);
cpus = cpusStr.split(SPACE).length;
break;
}
}
return cpus;
}
private long parseMemory(String[] outputLines, String nodeId)
throws YarnException {
long memory = 0;
String units;
Pattern patternNodeMem = Pattern
.compile(NUMA_NODE_MEMORY_REGEX.replace(NUMA_NODE, nodeId));
for (String line : outputLines) {
Matcher matcherNodeMem = patternNodeMem.matcher(line);
if (matcherNodeMem.find()) {
try {
memory = Long.parseLong(matcherNodeMem.group(1));
units = matcherNodeMem.group(2);
if (GB.equals(units)) {
memory = memory * 1024;
} else if (KB.equals(units)) {
memory = memory / 1024;
}
} catch (Exception ex) {
throw new YarnException("Failed to get memory for node:" + nodeId,
ex);
}
break;
}
}
return memory;
}
private void addToCollection(String nodeId, long memory, int cpus) {
NumaNodeResource numaNode = new NumaNodeResource(nodeId, memory, cpus);
numaNodesList.add(numaNode);
numaNodeIdVsResource.put(nodeId, numaNode);
}
/**
* Allocates the available NUMA nodes for the requested containerId with
* resource in a round robin fashion.
*
* @param container the container to allocate NUMA resources
* @return the assigned NUMA Node info or null if resources not available.
* @throws ResourceHandlerException when failed to store NUMA resources
*/
public synchronized NumaResourceAllocation allocateNumaNodes(
Container container) throws ResourceHandlerException {
NumaResourceAllocation allocation = allocate(container.getContainerId(),
container.getResource());
if (allocation != null) {
try {
// Update state store.
context.getNMStateStore().storeAssignedResources(container,
NUMA_RESOURCE_TYPE, Arrays.asList(allocation));
} catch (IOException e) {
releaseNumaResource(container.getContainerId());
throw new ResourceHandlerException(e);
}
}
return allocation;
}
private NumaResourceAllocation allocate(ContainerId containerId,
Resource resource) {
for (int index = 0; index < numaNodesList.size(); index++) {
NumaNodeResource numaNode = numaNodesList
.get((currentAssignNode + index) % numaNodesList.size());
if (numaNode.isResourcesAvailable(resource)) {
numaNode.assignResources(resource, containerId);
LOG.info("Assigning NUMA node " + numaNode.getNodeId() + " for memory, "
+ numaNode.getNodeId() + " for cpus for the " + containerId);
currentAssignNode = (currentAssignNode + index + 1)
% numaNodesList.size();
return new NumaResourceAllocation(numaNode.getNodeId(),
resource.getMemorySize(), numaNode.getNodeId(),
resource.getVirtualCores());
}
}
// If there is no single node matched for the container resource
// Check the NUMA nodes for Memory resources
NumaResourceAllocation assignedNumaNodeInfo = new NumaResourceAllocation();
long memreq = resource.getMemorySize();
for (NumaNodeResource numaNode : numaNodesList) {
long memrem = numaNode.assignAvailableMemory(memreq, containerId);
assignedNumaNodeInfo.addMemoryNode(numaNode.getNodeId(), memreq - memrem);
memreq = memrem;
if (memreq == 0) {
break;
}
}
if (memreq != 0) {
LOG.info("There is no available memory:" + resource.getMemorySize()
+ " in numa nodes for " + containerId);
releaseNumaResource(containerId);
return null;
}
// Check the NUMA nodes for CPU resources
int cpusreq = resource.getVirtualCores();
for (int index = 0; index < numaNodesList.size(); index++) {
NumaNodeResource numaNode = numaNodesList
.get((currentAssignNode + index) % numaNodesList.size());
int cpusrem = numaNode.assignAvailableCpus(cpusreq, containerId);
assignedNumaNodeInfo.addCpuNode(numaNode.getNodeId(), cpusreq - cpusrem);
cpusreq = cpusrem;
if (cpusreq == 0) {
currentAssignNode = (currentAssignNode + index + 1)
% numaNodesList.size();
break;
}
}
if (cpusreq != 0) {
LOG.info("There are no available cpus:" + resource.getVirtualCores()
+ " in numa nodes for " + containerId);
releaseNumaResource(containerId);
return null;
}
LOG.info("Assigning multiple NUMA nodes ("
+ StringUtils.join(",", assignedNumaNodeInfo.getMemNodes())
+ ") for memory, ("
+ StringUtils.join(",", assignedNumaNodeInfo.getCpuNodes())
+ ") for cpus for " + containerId);
return assignedNumaNodeInfo;
}
/**
* Release assigned NUMA resources for the container.
*
* @param containerId the container ID
*/
public synchronized void releaseNumaResource(ContainerId containerId) {
LOG.info("Releasing the assigned NUMA resources for " + containerId);
for (NumaNodeResource numaNode : numaNodesList) {
numaNode.releaseResources(containerId);
}
}
/**
* Recovers assigned numa resources.
*
* @param containerId the container ID to recover resources
*/
public synchronized void recoverNumaResource(ContainerId containerId) {
Container container = context.getContainers().get(containerId);
ResourceMappings resourceMappings = container.getResourceMappings();
List<Serializable> assignedResources = resourceMappings
.getAssignedResources(NUMA_RESOURCE_TYPE);
if (assignedResources.size() == 1) {
NumaResourceAllocation numaResourceAllocation =
(NumaResourceAllocation) assignedResources.get(0);
for (Entry<String, Long> nodeAndMemory : numaResourceAllocation
.getNodeVsMemory().entrySet()) {
numaNodeIdVsResource.get(nodeAndMemory.getKey())
.recoverMemory(containerId, nodeAndMemory.getValue());
}
for (Entry<String, Integer> nodeAndCpus : numaResourceAllocation
.getNodeVsCpus().entrySet()) {
numaNodeIdVsResource.get(nodeAndCpus.getKey()).recoverCpus(containerId,
nodeAndCpus.getValue());
}
} else {
LOG.error("Unexpected number:" + assignedResources.size()
+ " of assigned numa resources for " + containerId
+ " while recovering.");
}
}
@VisibleForTesting
Collection<NumaNodeResource> getNumaNodesList() {
return numaNodesList;
}
}

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation.OperationType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
/**
* ResourceHandler implementation for allocating NUMA Resources to each
* container.
*/
public class NumaResourceHandlerImpl implements ResourceHandler {
private static final Log LOG = LogFactory
.getLog(NumaResourceHandlerImpl.class);
private NumaResourceAllocator numaResourceAllocator;
private String numaCtlCmd;
public NumaResourceHandlerImpl(Configuration conf, Context nmContext) {
LOG.info("NUMA resources allocation is enabled, initializing NUMA resources"
+ " allocator.");
numaResourceAllocator = new NumaResourceAllocator(nmContext);
numaCtlCmd = conf.get(YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD,
YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD);
}
@Override
public List<PrivilegedOperation> bootstrap(Configuration configuration)
throws ResourceHandlerException {
try {
numaResourceAllocator.init(configuration);
} catch (YarnException e) {
throw new ResourceHandlerException(e);
}
return null;
}
@Override
public List<PrivilegedOperation> preStart(Container container)
throws ResourceHandlerException {
List<PrivilegedOperation> ret = null;
NumaResourceAllocation numaAllocation = numaResourceAllocator
.allocateNumaNodes(container);
if (numaAllocation != null) {
ret = new ArrayList<>();
ArrayList<String> args = new ArrayList<>();
args.add(numaCtlCmd);
args.add(
"--interleave=" + String.join(",", numaAllocation.getMemNodes()));
args.add(
"--cpunodebind=" + String.join(",", numaAllocation.getCpuNodes()));
ret.add(new PrivilegedOperation(OperationType.ADD_NUMA_PARAMS, args));
}
return ret;
}
@Override
public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
throws ResourceHandlerException {
try {
numaResourceAllocator.recoverNumaResource(containerId);
} catch (Throwable e) {
throw new ResourceHandlerException(
"Failed to recover numa resource for " + containerId, e);
}
return null;
}
@Override
public List<PrivilegedOperation> postComplete(ContainerId containerId)
throws ResourceHandlerException {
numaResourceAllocator.releaseNumaResource(containerId);
return null;
}
@Override
public List<PrivilegedOperation> teardown() throws ResourceHandlerException {
return null;
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.
* resources.numa contains classes related to NM local scheduler allocators.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -0,0 +1,281 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings.AssignedResources;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
/**
* Test class for NumaResourceAllocator.
*/
public class TestNumaResourceAllocator {
private Configuration conf;
private NumaResourceAllocator numaResourceAllocator;
@Before
public void setUp() throws IOException, YarnException {
conf = new YarnConfiguration();
Context mockContext = mock(Context.class);
@SuppressWarnings("unchecked")
ConcurrentHashMap<ContainerId, Container> mockContainers = mock(
ConcurrentHashMap.class);
Container mockContainer = mock(Container.class);
when(mockContainer.getResourceMappings())
.thenReturn(new ResourceMappings());
when(mockContainers.get(Matchers.any())).thenReturn(mockContainer);
when(mockContext.getContainers()).thenReturn(mockContainers);
NMStateStoreService mock = mock(NMStateStoreService.class);
when(mockContext.getNMStateStore()).thenReturn(mock);
numaResourceAllocator = new NumaResourceAllocator(mockContext);
setNumaTopologyConfigs();
numaResourceAllocator.init(conf);
}
@Test
public void testReadNumaTopologyFromConfigurations() throws Exception {
Collection<NumaNodeResource> nodesList = numaResourceAllocator
.getNumaNodesList();
Collection<NumaNodeResource> expectedNodesList = getExpectedNumaNodesList();
Assert.assertEquals(expectedNodesList, nodesList);
}
@Test
public void testReadNumaTopologyFromCmdOutput() throws Exception {
conf.setBoolean(YarnConfiguration.NM_NUMA_AWARENESS_READ_TOPOLOGY, true);
String cmdOutput = "available: 2 nodes (0-1)\n\t"
+ "node 0 cpus: 0 2 4 6\n\t"
+ "node 0 size: 73717 MB\n\t"
+ "node 0 free: 17272 MB\n\t"
+ "node 1 cpus: 1 3 5 7\n\t"
+ "node 1 size: 73727 MB\n\t"
+ "node 1 free: 10699 MB\n\t"
+ "node distances:\n\t"
+ "node 0 1\n\t"
+ "0: 10 20\n\t"
+ "1: 20 10";
numaResourceAllocator = new NumaResourceAllocator(mock(Context.class)) {
@Override
String executeNGetCmdOutput(Configuration config)
throws YarnRuntimeException {
return cmdOutput;
}
};
numaResourceAllocator.init(conf);
Collection<NumaNodeResource> nodesList = numaResourceAllocator
.getNumaNodesList();
Collection<NumaNodeResource> expectedNodesList = getExpectedNumaNodesList();
Assert.assertEquals(expectedNodesList, nodesList);
}
@Test
public void testAllocateNumaNode() throws Exception {
NumaResourceAllocation nodeInfo = numaResourceAllocator
.allocateNumaNodes(getContainer(
ContainerId.fromString("container_1481156246874_0001_01_000001"),
Resource.newInstance(2048, 2)));
Assert.assertEquals("0", String.join(",", nodeInfo.getMemNodes()));
Assert.assertEquals("0", String.join(",", nodeInfo.getCpuNodes()));
}
@Test
public void testAllocateNumaNodeWithRoundRobinFashionAssignment()
throws Exception {
NumaResourceAllocation nodeInfo1 = numaResourceAllocator
.allocateNumaNodes(getContainer(
ContainerId.fromString("container_1481156246874_0001_01_000001"),
Resource.newInstance(2048, 2)));
Assert.assertEquals("0", String.join(",", nodeInfo1.getMemNodes()));
Assert.assertEquals("0", String.join(",", nodeInfo1.getCpuNodes()));
NumaResourceAllocation nodeInfo2 = numaResourceAllocator
.allocateNumaNodes(getContainer(
ContainerId.fromString("container_1481156246874_0001_01_000002"),
Resource.newInstance(2048, 2)));
Assert.assertEquals("1", String.join(",", nodeInfo2.getMemNodes()));
Assert.assertEquals("1", String.join(",", nodeInfo2.getCpuNodes()));
NumaResourceAllocation nodeInfo3 = numaResourceAllocator
.allocateNumaNodes(getContainer(
ContainerId.fromString("container_1481156246874_0001_01_000003"),
Resource.newInstance(2048, 2)));
Assert.assertEquals("0", String.join(",", nodeInfo3.getMemNodes()));
Assert.assertEquals("0", String.join(",", nodeInfo3.getCpuNodes()));
NumaResourceAllocation nodeInfo4 = numaResourceAllocator
.allocateNumaNodes(getContainer(
ContainerId.fromString("container_1481156246874_0001_01_000003"),
Resource.newInstance(2048, 2)));
Assert.assertEquals("1", String.join(",", nodeInfo4.getMemNodes()));
Assert.assertEquals("1", String.join(",", nodeInfo4.getCpuNodes()));
}
@Test
public void testAllocateNumaNodeWithMultipleNodesForMemory()
throws Exception {
NumaResourceAllocation nodeInfo = numaResourceAllocator
.allocateNumaNodes(getContainer(
ContainerId.fromString("container_1481156246874_0001_01_000001"),
Resource.newInstance(102400, 2)));
Assert.assertEquals("0,1", String.join(",", nodeInfo.getMemNodes()));
Assert.assertEquals("0", String.join(",", nodeInfo.getCpuNodes()));
}
@Test
public void testAllocateNumaNodeWithMultipleNodesForCpus() throws Exception {
NumaResourceAllocation nodeInfo = numaResourceAllocator
.allocateNumaNodes(getContainer(
ContainerId.fromString("container_1481156246874_0001_01_000001"),
Resource.newInstance(2048, 6)));
Assert.assertEquals("0", String.join(",", nodeInfo.getMemNodes()));
Assert.assertEquals("0,1", String.join(",", nodeInfo.getCpuNodes()));
}
@Test
public void testAllocateNumaNodeWhenNoNumaMemResourcesAvailable()
throws Exception {
NumaResourceAllocation nodeInfo = numaResourceAllocator
.allocateNumaNodes(getContainer(
ContainerId.fromString("container_1481156246874_0001_01_000001"),
Resource.newInstance(2048000, 6)));
Assert.assertNull("Should not assign numa nodes when there"
+ " are no sufficient memory resources available.", nodeInfo);
}
@Test
public void testAllocateNumaNodeWhenNoNumaCpuResourcesAvailable()
throws Exception {
NumaResourceAllocation nodeInfo = numaResourceAllocator
.allocateNumaNodes(getContainer(
ContainerId.fromString("container_1481156246874_0001_01_000001"),
Resource.newInstance(2048, 600)));
Assert.assertNull("Should not assign numa nodes when there"
+ " are no sufficient cpu resources available.", nodeInfo);
}
@Test
public void testReleaseNumaResourcess() throws Exception {
NumaResourceAllocation nodeInfo = numaResourceAllocator
.allocateNumaNodes(getContainer(
ContainerId.fromString("container_1481156246874_0001_01_000001"),
Resource.newInstance(2048, 8)));
Assert.assertEquals("0", String.join(",", nodeInfo.getMemNodes()));
Assert.assertEquals("0,1", String.join(",", nodeInfo.getCpuNodes()));
// Request the resource when all cpu nodes occupied
nodeInfo = numaResourceAllocator.allocateNumaNodes(getContainer(
ContainerId.fromString("container_1481156246874_0001_01_000002"),
Resource.newInstance(2048, 4)));
Assert.assertNull("Should not assign numa nodes when there"
+ " are no sufficient cpu resources available.", nodeInfo);
// Release the resources
numaResourceAllocator.releaseNumaResource(
ContainerId.fromString("container_1481156246874_0001_01_000001"));
// Request the resources
nodeInfo = numaResourceAllocator.allocateNumaNodes(getContainer(
ContainerId.fromString("container_1481156246874_0001_01_000003"),
Resource.newInstance(1024, 2)));
Assert.assertEquals("0", String.join(",", nodeInfo.getMemNodes()));
Assert.assertEquals("0", String.join(",", nodeInfo.getCpuNodes()));
}
@Test
public void testRecoverNumaResource() throws Exception {
@SuppressWarnings("unchecked")
ConcurrentHashMap<ContainerId, Container> mockContainers = mock(
ConcurrentHashMap.class);
Context mockContext = mock(Context.class);
Container mockContainer = mock(Container.class);
ResourceMappings value = new ResourceMappings();
AssignedResources assignedResources = new AssignedResources();
assignedResources.updateAssignedResources(
Arrays.asList(new NumaResourceAllocation("0", 70000, "0", 4)));
value.addAssignedResources("numa", assignedResources);
when(mockContainer.getResourceMappings()).thenReturn(value);
when(mockContainers.get(Matchers.any())).thenReturn(mockContainer);
when(mockContext.getContainers()).thenReturn(mockContainers);
NMStateStoreService mock = mock(NMStateStoreService.class);
when(mockContext.getNMStateStore()).thenReturn(mock);
numaResourceAllocator = new NumaResourceAllocator(mockContext);
numaResourceAllocator.init(conf);
// Recover the resources
numaResourceAllocator.recoverNumaResource(
ContainerId.fromString("container_1481156246874_0001_01_000001"));
// Request resources based on the availability
NumaResourceAllocation numaNode = numaResourceAllocator
.allocateNumaNodes(getContainer(
ContainerId.fromString("container_1481156246874_0001_01_000005"),
Resource.newInstance(2048, 1)));
assertEquals("1", String.join(",", numaNode.getMemNodes()));
assertEquals("1", String.join(",", numaNode.getCpuNodes()));
// Request resources more than the available
numaNode = numaResourceAllocator.allocateNumaNodes(getContainer(
ContainerId.fromString("container_1481156246874_0001_01_000006"),
Resource.newInstance(2048, 4)));
assertNull(numaNode);
}
private void setNumaTopologyConfigs() {
conf.set(YarnConfiguration.NM_NUMA_AWARENESS_NODE_IDS, "0,1");
conf.set("yarn.nodemanager.numa-awareness.0.memory", "73717");
conf.set("yarn.nodemanager.numa-awareness.0.cpus", "4");
conf.set("yarn.nodemanager.numa-awareness.1.memory", "73727");
conf.set("yarn.nodemanager.numa-awareness.1.cpus", "4");
}
private Collection<NumaNodeResource> getExpectedNumaNodesList() {
Collection<NumaNodeResource> expectedNodesList = new ArrayList<>(2);
expectedNodesList.add(new NumaNodeResource("0", 73717, 4));
expectedNodesList.add(new NumaNodeResource("1", 73727, 4));
return expectedNodesList;
}
private Container getContainer(ContainerId containerId, Resource resource) {
Container mockContainer = mock(Container.class);
when(mockContainer.getContainerId()).thenReturn(containerId);
when(mockContainer.getResource()).thenReturn(resource);
return mockContainer;
}
}

View File

@ -0,0 +1,181 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings.AssignedResources;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
/**
* Test class for NumaResourceHandlerImpl.
*
*/
public class TestNumaResourceHandlerImpl {
private YarnConfiguration conf;
private NumaResourceHandlerImpl numaResourceHandler;
private Container mockContainer;
@Before
public void setUp() throws IOException, ResourceHandlerException {
conf = new YarnConfiguration();
setNumaTopologyConfigs();
Context mockContext = createAndGetMockContext();
NMStateStoreService mock = mock(NMStateStoreService.class);
when(mockContext.getNMStateStore()).thenReturn(mock);
numaResourceHandler = new NumaResourceHandlerImpl(conf, mockContext);
numaResourceHandler.bootstrap(conf);
mockContainer = mock(Container.class);
}
@Test
public void testAllocateNumaMemoryResource() throws ResourceHandlerException {
// allocates node 0 for memory and cpu
testAllocateNumaResource("container_1481156246874_0001_01_000001",
Resource.newInstance(2048, 2), "0", "0");
// allocates node 1 for memory and cpu since allocator uses round
// robin assignment
testAllocateNumaResource("container_1481156246874_0001_01_000002",
Resource.newInstance(60000, 2), "1", "1");
// allocates node 0,1 for memory since there is no sufficient memory in any
// one node
testAllocateNumaResource("container_1481156246874_0001_01_000003",
Resource.newInstance(80000, 2), "0,1", "0");
// returns null since there are no sufficient resources available for the
// request
when(mockContainer.getContainerId()).thenReturn(
ContainerId.fromString("container_1481156246874_0001_01_000004"));
when(mockContainer.getResource())
.thenReturn(Resource.newInstance(80000, 2));
assertNull(numaResourceHandler.preStart(mockContainer));
// allocates node 1 for memory and cpu
testAllocateNumaResource("container_1481156246874_0001_01_000005",
Resource.newInstance(1024, 2), "1", "1");
}
@Test
public void testAllocateNumaCpusResource() throws ResourceHandlerException {
// allocates node 0 for memory and cpu
testAllocateNumaResource("container_1481156246874_0001_01_000001",
Resource.newInstance(2048, 2), "0", "0");
// allocates node 1 for memory and cpu since allocator uses round
// robin assignment
testAllocateNumaResource("container_1481156246874_0001_01_000002",
Resource.newInstance(2048, 2), "1", "1");
// allocates node 0,1 for cpus since there is are no sufficient cpus
// available in any one node
testAllocateNumaResource("container_1481156246874_0001_01_000003",
Resource.newInstance(2048, 3), "0", "0,1");
// returns null since there are no sufficient resources available for the
// request
when(mockContainer.getContainerId()).thenReturn(
ContainerId.fromString("container_1481156246874_0001_01_000004"));
when(mockContainer.getResource()).thenReturn(Resource.newInstance(2048, 2));
assertNull(numaResourceHandler.preStart(mockContainer));
// allocates node 1 for memory and cpu
testAllocateNumaResource("container_1481156246874_0001_01_000005",
Resource.newInstance(2048, 1), "1", "1");
}
@Test
public void testReacquireContainer() throws Exception {
@SuppressWarnings("unchecked")
ConcurrentHashMap<ContainerId, Container> mockContainers = mock(
ConcurrentHashMap.class);
Context mockContext = mock(Context.class);
NMStateStoreService mock = mock(NMStateStoreService.class);
when(mockContext.getNMStateStore()).thenReturn(mock);
ResourceMappings resourceMappings = new ResourceMappings();
AssignedResources assignedRscs = new AssignedResources();
NumaResourceAllocation numaResourceAllocation = new NumaResourceAllocation(
"0", 70000, "0", 4);
assignedRscs.updateAssignedResources(Arrays.asList(numaResourceAllocation));
resourceMappings.addAssignedResources("numa", assignedRscs);
when(mockContainer.getResourceMappings()).thenReturn(resourceMappings);
when(mockContainers.get(Matchers.any())).thenReturn(mockContainer);
when(mockContext.getContainers()).thenReturn(mockContainers);
numaResourceHandler = new NumaResourceHandlerImpl(conf, mockContext);
numaResourceHandler.bootstrap(conf);
// recovered numa resources should be added to the used resources and
// remaining will be available for further allocation.
numaResourceHandler.reacquireContainer(
ContainerId.fromString("container_1481156246874_0001_01_000001"));
testAllocateNumaResource("container_1481156246874_0001_01_000005",
Resource.newInstance(2048, 1), "1", "1");
when(mockContainer.getContainerId()).thenReturn(
ContainerId.fromString("container_1481156246874_0001_01_000005"));
when(mockContainer.getResource()).thenReturn(Resource.newInstance(2048, 4));
List<PrivilegedOperation> preStart = numaResourceHandler
.preStart(mockContainer);
assertNull(preStart);
}
private void setNumaTopologyConfigs() {
conf.set(YarnConfiguration.NM_NUMA_AWARENESS_NODE_IDS, "0,1");
conf.set("yarn.nodemanager.numa-awareness.0.memory", "73717");
conf.set("yarn.nodemanager.numa-awareness.0.cpus", "4");
conf.set("yarn.nodemanager.numa-awareness.1.memory", "73727");
conf.set("yarn.nodemanager.numa-awareness.1.cpus", "4");
}
private Context createAndGetMockContext() {
Context mockContext = mock(Context.class);
@SuppressWarnings("unchecked")
ConcurrentHashMap<ContainerId, Container> mockContainers = mock(
ConcurrentHashMap.class);
mockContainer = mock(Container.class);
when(mockContainer.getResourceMappings())
.thenReturn(new ResourceMappings());
when(mockContainers.get(Matchers.any())).thenReturn(mockContainer);
when(mockContext.getContainers()).thenReturn(mockContainers);
return mockContext;
}
private void testAllocateNumaResource(String containerId, Resource resource,
String memNodes, String cpuNodes) throws ResourceHandlerException {
when(mockContainer.getContainerId())
.thenReturn(ContainerId.fromString(containerId));
when(mockContainer.getResource()).thenReturn(resource);
List<PrivilegedOperation> preStart = numaResourceHandler
.preStart(mockContainer);
List<String> arguments = preStart.get(0).getArguments();
assertEquals(arguments, Arrays.asList("/usr/bin/numactl",
"--interleave=" + memNodes, "--cpunodebind=" + cpuNodes));
}
}