YARN-6507. Add support in NodeManager to isolate FPGA devices with CGroups. (Zhankun Tang via wangda)

Change-Id: Ic9afd841805f1035423915a0b0add5f3ba96cf9d
This commit is contained in:
Wangda Tan 2017-12-01 10:50:49 -08:00
parent 5304698dc8
commit 7225ec0ceb
14 changed files with 2155 additions and 5 deletions

View File

@ -42,6 +42,7 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
public static final String MEMORY_URI = "memory-mb";
public static final String VCORES_URI = "vcores";
public static final String GPU_URI = "yarn.io/gpu";
public static final String FPGA_URI = "yarn.io/fpga";
public static final ResourceInformation MEMORY_MB =
ResourceInformation.newInstance(MEMORY_URI, "Mi");
@ -49,9 +50,11 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
ResourceInformation.newInstance(VCORES_URI);
public static final ResourceInformation GPUS =
ResourceInformation.newInstance(GPU_URI);
public static final ResourceInformation FPGAS =
ResourceInformation.newInstance(FPGA_URI);
public static final Map<String, ResourceInformation> MANDATORY_RESOURCES =
ImmutableMap.of(MEMORY_URI, MEMORY_MB, VCORES_URI, VCORES, GPU_URI, GPUS);
ImmutableMap.of(MEMORY_URI, MEMORY_MB, VCORES_URI, VCORES, GPU_URI, GPUS, FPGA_URI, FPGAS);
/**
* Get the name for the resource.

View File

@ -1514,13 +1514,36 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_NVIDIA_DOCKER_PLUGIN_V1_ENDPOINT =
"http://localhost:3476/v1.0/docker/cli";
/**
* Prefix for FPGA configurations. Work in progress: This configuration
* parameter may be changed/removed in the future.
*/
@Private
public static final String NM_FPGA_RESOURCE_PREFIX =
NM_RESOURCE_PLUGINS + ".fpga.";
@Private
public static final String NM_FPGA_ALLOWED_DEVICES =
NM_FPGA_RESOURCE_PREFIX + "allowed-fpga-devices";
@Private
public static final String NM_FPGA_PATH_TO_EXEC =
NM_FPGA_RESOURCE_PREFIX + "path-to-discovery-executables";
@Private
public static final String NM_FPGA_VENDOR_PLUGIN =
NM_FPGA_RESOURCE_PREFIX + "vendor-plugin.class";
@Private
public static final String DEFAULT_NM_FPGA_VENDOR_PLUGIN =
"org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.IntelFpgaOpenclPlugin";
/** NM Webapp address.**/
public static final String NM_WEBAPP_ADDRESS = NM_PREFIX + "webapp.address";
public static final int DEFAULT_NM_WEBAPP_PORT = 8042;
public static final String DEFAULT_NM_WEBAPP_ADDRESS = "0.0.0.0:" +
DEFAULT_NM_WEBAPP_PORT;
/** NM Webapp https address.**/
public static final String NM_WEBAPP_HTTPS_ADDRESS = NM_PREFIX
+ "webapp.https.address";

View File

@ -3512,7 +3512,8 @@
<property>
<description>
Enable additional discovery/isolation of resources on the NodeManager,
split by comma. By default, this is empty. Acceptable values: { "yarn-io/gpu" }.
split by comma. By default, this is empty.
Acceptable values: { "yarn-io/gpu", "yarn-io/fpga"}.
</description>
<name>yarn.nodemanager.resource-plugins</name>
<value></value>
@ -3559,6 +3560,43 @@
<value>http://localhost:3476/v1.0/docker/cli</value>
</property>
>>>>>>> theirs
<property>
<description>
Specify one vendor plugin to handle FPGA devices discovery/IP download/configure.
Only IntelFpgaOpenclPlugin is supported by default.
We only allow one NM configured with one vendor FPGA plugin now since the end user can put the same
vendor's cards in one host. And this also simplify our design.
</description>
<name>yarn.nodemanager.resource-plugins.fpga.vendor-plugin.class</name>
<value>org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.IntelFpgaOpenclPlugin</value>
</property>
<property>
<description>
When yarn.nodemanager.resource.fpga.allowed-fpga-devices=auto specified,
YARN NodeManager needs to run FPGA discovery binary (now only support
IntelFpgaOpenclPlugin) to get FPGA information.
When value is empty (default), YARN NodeManager will try to locate
discovery executable from vendor plugin's preference
</description>
<name>yarn.nodemanager.resource-plugins.fpga.path-to-discovery-executables</name>
<value></value>
</property>
<property>
<description>
Specify FPGA devices which can be managed by YARN NodeManager, split by comma
Number of FPGA devices will be reported to RM to make scheduling decisions.
Set to auto (default) let YARN automatically discover FPGA resource from
system.
Manually specify FPGA devices if admin only want subset of FPGA devices managed by YARN.
At present, since we can only configure one major number in c-e.cfg, FPGA device is
identified by their minor device number. A common approach to get minor
device number of FPGA is using "aocl diagnose" and check uevent with device name.
</description>
<name>yarn.nodemanager.resource-plugins.fpga.allowed-fpga-devices</name>
<value>0,1</value>
</property>
</configuration>

View File

@ -52,6 +52,7 @@ public class PrivilegedOperation {
ADD_PID_TO_CGROUP(""), //no CLI switch supported yet.
RUN_DOCKER_CMD("--run-docker"),
GPU("--module-gpu"),
FPGA("--module-fpga"),
LIST_AS_USER(""); //no CLI switch supported yet.
private final String option;

View File

@ -0,0 +1,413 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDiscoverer;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
/**
* This FPGA resource allocator tends to be used by different FPGA vendor's plugin
* A "type" parameter is taken into consideration when allocation
* */
public class FpgaResourceAllocator {
static final Log LOG = LogFactory.getLog(FpgaResourceAllocator.class);
private List<FpgaDevice> allowedFpgas = new LinkedList<>();
//key is resource type of FPGA, vendor plugin supported ID
private LinkedHashMap<String, List<FpgaDevice>> availableFpga = new LinkedHashMap<>();
//key is requetor, aka. container ID
private LinkedHashMap<String, List<FpgaDevice>> usedFpgaByRequestor = new LinkedHashMap<>();
private Context nmContext;
@VisibleForTesting
public HashMap<String, List<FpgaDevice>> getAvailableFpga() {
return availableFpga;
}
@VisibleForTesting
public List<FpgaDevice> getAllowedFpga() {
return allowedFpgas;
}
public FpgaResourceAllocator(Context ctx) {
this.nmContext = ctx;
}
@VisibleForTesting
public int getAvailableFpgaCount() {
int count = 0;
for (List<FpgaDevice> l : availableFpga.values()) {
count += l.size();
}
return count;
}
@VisibleForTesting
public HashMap<String, List<FpgaDevice>> getUsedFpga() {
return usedFpgaByRequestor;
}
@VisibleForTesting
public int getUsedFpgaCount() {
int count = 0;
for (List<FpgaDevice> l : usedFpgaByRequestor.values()) {
count += l.size();
}
return count;
}
public static class FpgaAllocation {
private List<FpgaDevice> allowed = Collections.emptyList();
private List<FpgaDevice> denied = Collections.emptyList();
FpgaAllocation(List<FpgaDevice> allowed, List<FpgaDevice> denied) {
if (allowed != null) {
this.allowed = ImmutableList.copyOf(allowed);
}
if (denied != null) {
this.denied = ImmutableList.copyOf(denied);
}
}
public List<FpgaDevice> getAllowed() {
return allowed;
}
public List<FpgaDevice> getDenied() {
return denied;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("\nFpgaAllocation\n\tAllowed:\n");
for (FpgaDevice device : allowed) {
sb.append("\t\t");
sb.append(device + "\n");
}
sb.append("\tDenied\n");
for (FpgaDevice device : denied) {
sb.append("\t\t");
sb.append(device + "\n");
}
return sb.toString();
}
}
public static class FpgaDevice implements Comparable<FpgaDevice>, Serializable {
private static final long serialVersionUID = 1L;
private String type;
private Integer major;
private Integer minor;
// IP file identifier. matrix multiplication for instance
private String IPID;
// the device name under /dev
private String devName;
// the alias device name. Intel use acl number acl0 to acl31
private String aliasDevName;
// lspci output's bus number: 02:00.00 (bus:slot.func)
private String busNum;
private String temperature;
private String cardPowerUsage;
public String getType() {
return type;
}
public Integer getMajor() {
return major;
}
public Integer getMinor() {
return minor;
}
public String getIPID() {
return IPID;
}
public void setIPID(String IPID) {
this.IPID = IPID;
}
public String getDevName() {
return devName;
}
public void setDevName(String devName) {
this.devName = devName;
}
public String getAliasDevName() {
return aliasDevName;
}
public void setAliasDevName(String aliasDevName) {
this.aliasDevName = aliasDevName;
}
public String getBusNum() {
return busNum;
}
public void setBusNum(String busNum) {
this.busNum = busNum;
}
public String getTemperature() {
return temperature;
}
public String getCardPowerUsage() {
return cardPowerUsage;
}
public FpgaDevice(String type, Integer major, Integer minor, String IPID) {
this.type = type;
this.major = major;
this.minor = minor;
this.IPID = IPID;
}
public FpgaDevice(String type, Integer major,
Integer minor, String IPID, String devName,
String aliasDevName, String busNum, String temperature, String cardPowerUsage) {
this.type = type;
this.major = major;
this.minor = minor;
this.IPID = IPID;
this.devName = devName;
this.aliasDevName = aliasDevName;
this.busNum = busNum;
this.temperature = temperature;
this.cardPowerUsage = cardPowerUsage;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (!(obj instanceof FpgaDevice)) {
return false;
}
FpgaDevice other = (FpgaDevice) obj;
if (other.getType().equals(this.type) &&
other.getMajor().equals(this.major) &&
other.getMinor().equals(this.minor)) {
return true;
}
return false;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((type == null) ? 0 : type.hashCode());
result = prime * result + ((major == null) ? 0 : major.hashCode());
result = prime * result + ((minor == null) ? 0 : minor.hashCode());
return result;
}
@Override
public int compareTo(FpgaDevice o) {
return 0;
}
@Override
public String toString() {
return "FPGA Device:(Type: " + this.type + ", Major: " +
this.major + ", Minor: " + this.minor + ", IPID: " + this.IPID + ")";
}
}
public synchronized void addFpga(String type, List<FpgaDevice> list) {
availableFpga.putIfAbsent(type, new LinkedList<>());
for (FpgaDevice device : list) {
if (!allowedFpgas.contains(device)) {
allowedFpgas.add(device);
availableFpga.get(type).add(device);
}
}
LOG.info("Add a list of FPGA Devices: " + list);
}
public synchronized void updateFpga(String requestor,
FpgaDevice device, String newIPID) {
List<FpgaDevice> usedFpgas = usedFpgaByRequestor.get(requestor);
int index = findMatchedFpga(usedFpgas, device);
if (-1 != index) {
usedFpgas.get(index).setIPID(newIPID);
} else {
LOG.warn("Failed to update FPGA due to unknown reason " +
"that no record for this allocated device:" + device);
}
LOG.info("Update IPID to " + newIPID +
" for this allocated device:" + device);
}
private synchronized int findMatchedFpga(List<FpgaDevice> devices, FpgaDevice item) {
int i = 0;
for (; i < devices.size(); i++) {
if (devices.get(i) == item) {
return i;
}
}
return -1;
}
/**
* Assign {@link FpgaAllocation} with preferred IPID, if no, with random FPGAs
* @param type vendor plugin supported FPGA device type
* @param count requested FPGA slot count
* @param container container id
* @param IPIDPreference allocate slot with this IPID first
* @return Instance consists two List of allowed and denied {@link FpgaDevice}
* @throws ResourceHandlerException When failed to allocate or write state store
* */
public synchronized FpgaAllocation assignFpga(String type, long count,
Container container, String IPIDPreference) throws ResourceHandlerException {
List<FpgaDevice> currentAvailableFpga = availableFpga.get(type);
String requestor = container.getContainerId().toString();
if (null == currentAvailableFpga) {
throw new ResourceHandlerException("No such type of FPGA resource available: " + type);
}
if (count < 0 || count > currentAvailableFpga.size()) {
throw new ResourceHandlerException("Invalid FPGA request count or not enough, requested:" +
count + ", available:" + getAvailableFpgaCount());
}
if (count > 0) {
// Allocate devices with matching IP first, then any device is ok
List<FpgaDevice> assignedFpgas = new LinkedList<>();
int matchIPCount = 0;
for (int i = 0; i < currentAvailableFpga.size(); i++) {
if ( null != currentAvailableFpga.get(i).getIPID() &&
currentAvailableFpga.get(i).getIPID().equalsIgnoreCase(IPIDPreference)) {
assignedFpgas.add(currentAvailableFpga.get(i));
currentAvailableFpga.remove(i);
matchIPCount++;
}
}
int remaining = (int) count - matchIPCount;
while (remaining > 0) {
assignedFpgas.add(currentAvailableFpga.remove(0));
remaining--;
}
// Record in state store if we allocated anything
if (!assignedFpgas.isEmpty()) {
try {
nmContext.getNMStateStore().storeAssignedResources(container,
FPGA_URI, new LinkedList<>(assignedFpgas));
} catch (IOException e) {
// failed, give the allocation back
currentAvailableFpga.addAll(assignedFpgas);
throw new ResourceHandlerException(e);
}
// update state store success, update internal used FPGAs
usedFpgaByRequestor.putIfAbsent(requestor, new LinkedList<>());
usedFpgaByRequestor.get(requestor).addAll(assignedFpgas);
}
return new FpgaAllocation(assignedFpgas, currentAvailableFpga);
}
return new FpgaAllocation(null, allowedFpgas);
}
public synchronized void recoverAssignedFpgas(ContainerId containerId) throws ResourceHandlerException {
Container c = nmContext.getContainers().get(containerId);
if (null == c) {
throw new ResourceHandlerException(
"This shouldn't happen, cannot find container with id="
+ containerId);
}
for (Serializable fpgaDevice :
c.getResourceMappings().getAssignedResources(FPGA_URI)) {
if (!(fpgaDevice instanceof FpgaDevice)) {
throw new ResourceHandlerException(
"Trying to recover allocated FPGA devices, however it"
+ " is not FpgaDevice type, this shouldn't happen");
}
// Make sure it is in allowed FPGA device.
if (!allowedFpgas.contains(fpgaDevice)) {
throw new ResourceHandlerException("Try to recover FpgaDevice = " + fpgaDevice
+ " however it is not in allowed device list:" + StringUtils
.join(";", allowedFpgas));
}
// Make sure it is not occupied by anybody else
Iterator<Map.Entry<String, List<FpgaDevice>>> iterator =
getUsedFpga().entrySet().iterator();
while (iterator.hasNext()) {
if (iterator.next().getValue().contains(fpgaDevice)) {
throw new ResourceHandlerException("Try to recover FpgaDevice = " + fpgaDevice
+ " however it is already assigned to others");
}
}
getUsedFpga().putIfAbsent(containerId.toString(), new LinkedList<>());
getUsedFpga().get(containerId.toString()).add((FpgaDevice) fpgaDevice);
// remove them from available list
getAvailableFpga().get(((FpgaDevice) fpgaDevice).getType()).remove(fpgaDevice);
}
}
public synchronized void cleanupAssignFpgas(String requestor) {
List<FpgaDevice> usedFpgas = usedFpgaByRequestor.get(requestor);
if (usedFpgas != null) {
for (FpgaDevice device : usedFpgas) {
// Add back to availableFpga
availableFpga.get(device.getType()).add(device);
}
usedFpgaByRequestor.remove(requestor);
}
}
}

View File

@ -0,0 +1,220 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.AbstractFpgaVendorPlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDiscoverer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
@InterfaceStability.Unstable
@InterfaceAudience.Private
public class FpgaResourceHandlerImpl implements ResourceHandler {
static final Log LOG = LogFactory.getLog(FpgaResourceHandlerImpl.class);
private final String REQUEST_FPGA_IP_ID_KEY = "REQUESTED_FPGA_IP_ID";
private AbstractFpgaVendorPlugin vendorPlugin;
private FpgaResourceAllocator allocator;
private CGroupsHandler cGroupsHandler;
public static final String EXCLUDED_FPGAS_CLI_OPTION = "--excluded_fpgas";
public static final String CONTAINER_ID_CLI_OPTION = "--container_id";
private PrivilegedOperationExecutor privilegedOperationExecutor;
@VisibleForTesting
public FpgaResourceHandlerImpl(Context nmContext,
CGroupsHandler cGroupsHandler,
PrivilegedOperationExecutor privilegedOperationExecutor,
AbstractFpgaVendorPlugin plugin) {
this.allocator = new FpgaResourceAllocator(nmContext);
this.vendorPlugin = plugin;
FpgaDiscoverer.getInstance().setResourceHanderPlugin(vendorPlugin);
this.cGroupsHandler = cGroupsHandler;
this.privilegedOperationExecutor = privilegedOperationExecutor;
}
@VisibleForTesting
public FpgaResourceAllocator getFpgaAllocator() {
return allocator;
}
public String getRequestedIPID(Container container) {
String r= container.getLaunchContext().getEnvironment().
get(REQUEST_FPGA_IP_ID_KEY);
return r == null ? "" : r;
}
@Override
public List<PrivilegedOperation> bootstrap(Configuration configuration) throws ResourceHandlerException {
// The plugin should be initilized by FpgaDiscoverer already
if (!vendorPlugin.initPlugin(configuration)) {
throw new ResourceHandlerException("FPGA plugin initialization failed", null);
}
LOG.info("FPGA Plugin bootstrap success.");
// Get avialable devices minor numbers from toolchain or static configuration
List<FpgaResourceAllocator.FpgaDevice> fpgaDeviceList = FpgaDiscoverer.getInstance().discover();
allocator.addFpga(vendorPlugin.getFpgaType(), fpgaDeviceList);
this.cGroupsHandler.initializeCGroupController(CGroupsHandler.CGroupController.DEVICES);
return null;
}
@Override
public List<PrivilegedOperation> preStart(Container container) throws ResourceHandlerException {
// 1. Get requested FPGA type and count, choose corresponding FPGA plugin(s)
// 2. Use allocator.assignFpga(type, count) to get FPGAAllocation
// 3. If required, download to ensure IP file exists and configure IP file for all devices
List<PrivilegedOperation> ret = new ArrayList<>();
String containerIdStr = container.getContainerId().toString();
Resource requestedResource = container.getResource();
// Create device cgroups for the container
cGroupsHandler.createCGroup(CGroupsHandler.CGroupController.DEVICES,
containerIdStr);
long deviceCount = requestedResource.getResourceValue(FPGA_URI);
LOG.info(containerIdStr + " requested " + deviceCount + " Intel FPGA(s)");
String ipFilePath = null;
try {
// allocate even request 0 FPGA because we need to deny all device numbers for this container
FpgaResourceAllocator.FpgaAllocation allocation = allocator.assignFpga(
vendorPlugin.getFpgaType(), deviceCount,
container, getRequestedIPID(container));
LOG.info("FpgaAllocation:" + allocation);
PrivilegedOperation privilegedOperation = new PrivilegedOperation(PrivilegedOperation.OperationType.FPGA,
Arrays.asList(CONTAINER_ID_CLI_OPTION, containerIdStr));
if (!allocation.getDenied().isEmpty()) {
List<Integer> denied = new ArrayList<>();
allocation.getDenied().forEach(device -> denied.add(device.getMinor()));
privilegedOperation.appendArgs(Arrays.asList(EXCLUDED_FPGAS_CLI_OPTION,
StringUtils.join(",", denied)));
}
privilegedOperationExecutor.executePrivilegedOperation(privilegedOperation, true);
if (deviceCount > 0) {
/**
* We only support flashing one IP for all devices now. If user don't set this
* environment variable, we assume that user's application can find the IP file by
* itself.
* Note that the IP downloading and reprogramming in advance in YARN is not necessary because
* the OpenCL application may find the IP file and reprogram device on the fly. But YARN do this
* for the containers will achieve the quickest reprogram path
*
* For instance, REQUESTED_FPGA_IP_ID = "matrix_mul" will make all devices
* programmed with matrix multiplication IP
*
* In the future, we may support "matrix_mul:1,gzip:2" format to support different IP
* for different devices
*
* */
ipFilePath = vendorPlugin.downloadIP(getRequestedIPID(container), container.getWorkDir(),
container.getResourceSet().getLocalizedResources());
if (ipFilePath.isEmpty()) {
LOG.warn("FPGA plugin failed to download IP but continue, please check the value of environment viable: " +
REQUEST_FPGA_IP_ID_KEY + " if you want yarn to help");
} else {
LOG.info("IP file path:" + ipFilePath);
List<FpgaResourceAllocator.FpgaDevice> allowed = allocation.getAllowed();
String majorMinorNumber;
for (int i = 0; i < allowed.size(); i++) {
majorMinorNumber = allowed.get(i).getMajor() + ":" + allowed.get(i).getMinor();
String currentIPID = allowed.get(i).getIPID();
if (null != currentIPID &&
currentIPID.equalsIgnoreCase(getRequestedIPID(container))) {
LOG.info("IP already in device \"" + allowed.get(i).getAliasDevName() + "," +
majorMinorNumber + "\", skip reprogramming");
continue;
}
if (vendorPlugin.configureIP(ipFilePath, majorMinorNumber)) {
// update the allocator that we update an IP of a device
allocator.updateFpga(containerIdStr, allowed.get(i),
getRequestedIPID(container));
//TODO: update the node constraint label
}
}
}
}
} catch (ResourceHandlerException re) {
allocator.cleanupAssignFpgas(containerIdStr);
cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES,
containerIdStr);
throw re;
} catch (PrivilegedOperationException e) {
allocator.cleanupAssignFpgas(containerIdStr);
cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES, containerIdStr);
LOG.warn("Could not update cgroup for container", e);
throw new ResourceHandlerException(e);
}
//isolation operation
ret.add(new PrivilegedOperation(
PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
PrivilegedOperation.CGROUP_ARG_PREFIX
+ cGroupsHandler.getPathForCGroupTasks(
CGroupsHandler.CGroupController.DEVICES, containerIdStr)));
return ret;
}
@Override
public List<PrivilegedOperation> reacquireContainer(ContainerId containerId) throws ResourceHandlerException {
allocator.recoverAssignedFpgas(containerId);
return null;
}
@Override
public List<PrivilegedOperation> postComplete(ContainerId containerId) throws ResourceHandlerException {
allocator.cleanupAssignFpgas(containerId.toString());
cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES,
containerId.toString());
return null;
}
@Override
public List<PrivilegedOperation> teardown() throws ResourceHandlerException {
return null;
}
}

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaResourcePlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuResourcePlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -33,6 +34,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
/**
@ -42,7 +44,7 @@ public class ResourcePluginManager {
private static final Logger LOG =
LoggerFactory.getLogger(ResourcePluginManager.class);
private static final Set<String> SUPPORTED_RESOURCE_PLUGINS = ImmutableSet.of(
GPU_URI);
GPU_URI, FPGA_URI);
private Map<String, ResourcePlugin> configuredPlugins = Collections.EMPTY_MAP;
@ -77,6 +79,10 @@ public class ResourcePluginManager {
plugin = new GpuResourcePlugin();
}
if (resourceName.equals(FPGA_URI)) {
plugin = new FpgaResourcePlugin();
}
if (plugin == null) {
throw new YarnException(
"This shouldn't happen, plugin=" + resourceName

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator;
import java.util.List;
import java.util.Map;
/**
* FPGA plugin interface for vendor to implement. Used by {@link FpgaDiscoverer} and
* {@link org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceHandlerImpl}
* to discover devices/download IP/configure IP
* */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface AbstractFpgaVendorPlugin extends Configurable{
/**
* Check vendor's toolchain and required environment
* */
boolean initPlugin(Configuration conf);
/**
* Diagnose the devices using vendor toolchain but no need to parse device information
* */
boolean diagnose(int timeout);
/**
* Discover the vendor's FPGA devices with execution time constraint
* @param timeout The vendor plugin should return result during this time
* @return The result will be added to FPGAResourceAllocator for later scheduling
* */
List<FpgaResourceAllocator.FpgaDevice> discover(int timeout);
/**
* Since all vendor plugins share a {@link org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator}
* which distinguish FPGA devices by type. Vendor plugin must report this.
* */
String getFpgaType();
/**
* The vendor plugin download required IP files to a required directory.
* It should check if the IP file has already been downloaded.
* @param id The identifier for IP file. Comes from application, ie. matrix_multi_v1
* @param dstDir The plugin should download IP file to this directory
* @param localizedResources The container localized resource can be searched for IP file. Key is
* localized file path and value is soft link names
* @return The absolute path string of IP file
* */
String downloadIP(String id, String dstDir, Map<Path, List<String>> localizedResources);
/**
* The vendor plugin configure an IP file to a device
* @param ipPath The absolute path of the IP file
* @param majorMinorNumber The device in format <major:minor>
* @return configure device ok or not
* */
boolean configureIP(String ipPath, String majorMinorNumber);
@Override
void setConf(Configuration conf);
@Override
Configuration getConf();
}

View File

@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.List;
public class FpgaDiscoverer {
public static final Logger LOG = LoggerFactory.getLogger(
FpgaDiscoverer.class);
private static FpgaDiscoverer instance;
private Configuration conf = null;
private AbstractFpgaVendorPlugin plugin = null;
private List<FpgaResourceAllocator.FpgaDevice> currentFpgaInfo = null;
// shell command timeout
private static final int MAX_EXEC_TIMEOUT_MS = 10 * 1000;
static {
instance = new FpgaDiscoverer();
}
public static FpgaDiscoverer getInstance() {
return instance;
}
@VisibleForTesting
public synchronized static FpgaDiscoverer setInstance(FpgaDiscoverer newInstance) {
instance = newInstance;
return instance;
}
@VisibleForTesting
public synchronized void setConf(Configuration conf) {
this.conf = conf;
}
public List<FpgaResourceAllocator.FpgaDevice> getCurrentFpgaInfo() {
return currentFpgaInfo;
}
public synchronized void setResourceHanderPlugin(AbstractFpgaVendorPlugin plugin) {
this.plugin = plugin;
}
public synchronized boolean diagnose() {
return this.plugin.diagnose(MAX_EXEC_TIMEOUT_MS);
}
public synchronized void initialize(Configuration conf) throws YarnException {
this.conf = conf;
this.plugin.initPlugin(conf);
// Try to diagnose FPGA
LOG.info("Trying to diagnose FPGA information ...");
if (!diagnose()) {
LOG.warn("Failed to pass FPGA devices diagnose");
}
}
/**
* get avialable devices minor numbers from toolchain or static configuration
* */
public synchronized List<FpgaResourceAllocator.FpgaDevice> discover() throws ResourceHandlerException {
List<FpgaResourceAllocator.FpgaDevice> list;
String allowed = this.conf.get(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES);
// whatever static or auto discover, we always needs
// the vendor plugin to discover. For instance, IntelFpgaOpenclPlugin need to
// setup a mapping of <major:minor> to <aliasDevName>
list = this.plugin.discover(MAX_EXEC_TIMEOUT_MS);
if (0 == list.size()) {
throw new ResourceHandlerException("No FPGA devices detected!");
}
currentFpgaInfo = list;
if (allowed.equalsIgnoreCase(
YarnConfiguration.AUTOMATICALLY_DISCOVER_GPU_DEVICES)) {
return list;
} else if (allowed.matches("(\\d,)*\\d")){
String[] minors = allowed.split(",");
Iterator<FpgaResourceAllocator.FpgaDevice> iterator = list.iterator();
// remove the non-configured minor numbers
FpgaResourceAllocator.FpgaDevice t;
while (iterator.hasNext()) {
boolean valid = false;
t = iterator.next();
for (String minorNumber : minors) {
if (t.getMinor().toString().equals(minorNumber)) {
valid = true;
break;
}
}
if (!valid) {
iterator.remove();
}
}
// if the count of user configured is still larger than actual
if (list.size() != minors.length) {
LOG.warn("We continue although there're mistakes in user's configuration " +
YarnConfiguration.NM_FPGA_ALLOWED_DEVICES +
"user configured:" + allowed + ", while the real:" + list.toString());
}
} else {
throw new ResourceHandlerException("Invalid value configured for " +
YarnConfiguration.NM_FPGA_ALLOWED_DEVICES + ":\"" + allowed + "\"");
}
return list;
}
}

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
public class FpgaNodeResourceUpdateHandler extends NodeResourceUpdaterPlugin {
private static final Logger LOG = LoggerFactory.getLogger(
FpgaNodeResourceUpdateHandler.class);
@Override
public void updateConfiguredResource(Resource res) throws YarnException {
LOG.info("Initializing configured FPGA resources for the NodeManager.");
List<FpgaResourceAllocator.FpgaDevice> list = FpgaDiscoverer.getInstance().getCurrentFpgaInfo();
List<Integer> minors = new LinkedList<>();
for (FpgaResourceAllocator.FpgaDevice device : list) {
minors.add(device.getMinor());
}
if (minors.isEmpty()) {
LOG.info("Didn't find any usable FPGAs on the NodeManager.");
return;
}
long count = minors.size();
Map<String, ResourceInformation> configuredResourceTypes =
ResourceUtils.getResourceTypes();
if (!configuredResourceTypes.containsKey(FPGA_URI)) {
throw new YarnException("Wrong configurations, found " + count +
" usable FPGAs, however " + FPGA_URI
+ " resource-type is not configured inside"
+ " resource-types.xml, please configure it to enable FPGA feature or"
+ " remove " + FPGA_URI + " from "
+ YarnConfiguration.NM_RESOURCE_PLUGINS);
}
res.setResourceValue(FPGA_URI, count);
}
}

View File

@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceHandlerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMResourceInfo;
public class FpgaResourcePlugin implements ResourcePlugin {
private static final Log LOG = LogFactory.getLog(FpgaResourcePlugin.class);
private ResourceHandler fpgaResourceHandler = null;
private AbstractFpgaVendorPlugin vendorPlugin = null;
private FpgaNodeResourceUpdateHandler fpgaNodeResourceUpdateHandler = null;
private AbstractFpgaVendorPlugin createFpgaVendorPlugin(Configuration conf) {
String vendorPluginClass = conf.get(YarnConfiguration.NM_FPGA_VENDOR_PLUGIN,
YarnConfiguration.DEFAULT_NM_FPGA_VENDOR_PLUGIN);
LOG.info("Using FPGA vendor plugin: " + vendorPluginClass);
try {
Class<?> schedulerClazz = Class.forName(vendorPluginClass);
if (AbstractFpgaVendorPlugin.class.isAssignableFrom(schedulerClazz)) {
return (AbstractFpgaVendorPlugin) ReflectionUtils.newInstance(schedulerClazz,
conf);
} else {
throw new YarnRuntimeException("Class: " + vendorPluginClass
+ " not instance of " + AbstractFpgaVendorPlugin.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException("Could not instantiate FPGA vendor plugin: "
+ vendorPluginClass, e);
}
}
@Override
public void initialize(Context context) throws YarnException {
// Get vendor plugin from configuration
this.vendorPlugin = createFpgaVendorPlugin(context.getConf());
FpgaDiscoverer.getInstance().setResourceHanderPlugin(vendorPlugin);
FpgaDiscoverer.getInstance().initialize(context.getConf());
fpgaNodeResourceUpdateHandler = new FpgaNodeResourceUpdateHandler();
}
@Override
public ResourceHandler createResourceHandler(
Context nmContext, CGroupsHandler cGroupsHandler,
PrivilegedOperationExecutor privilegedOperationExecutor) {
if (fpgaResourceHandler == null) {
fpgaResourceHandler = new FpgaResourceHandlerImpl(nmContext,
cGroupsHandler, privilegedOperationExecutor, vendorPlugin);
}
return fpgaResourceHandler;
}
@Override
public NodeResourceUpdaterPlugin getNodeResourceHandlerInstance() {
return fpgaNodeResourceUpdateHandler;
}
@Override
public void cleanup() throws YarnException {
}
@Override
public DockerCommandPlugin getDockerCommandPluginInstance() {
return null;
}
@Override
public NMResourceInfo getNMResourceInfo() throws YarnException {
return null;
}
}

View File

@ -0,0 +1,396 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Intel FPGA for OpenCL plugin.
* The key points are:
* 1. It uses Intel's toolchain "aocl" to discover devices/reprogram IP to the device
* before container launch to achieve a quickest reprogramming path
* 2. It avoids reprogramming by maintaining a mapping of device to FPGA IP ID
* 3. It assume IP file is distributed to container directory
*/
public class IntelFpgaOpenclPlugin implements AbstractFpgaVendorPlugin {
public static final Logger LOG = LoggerFactory.getLogger(
IntelFpgaOpenclPlugin.class);
private boolean initialized = false;
private Configuration conf;
private InnerShellExecutor shell;
protected static final String DEFAULT_BINARY_NAME = "aocl";
protected static final String ALTERAOCLSDKROOT_NAME = "ALTERAOCLSDKROOT";
private String pathToExecutable = null;
// a mapping of major:minor number to acl0-31
private Map<String, String> aliasMap;
public IntelFpgaOpenclPlugin() {
this.shell = new InnerShellExecutor();
}
public String getDefaultBinaryName() {
return DEFAULT_BINARY_NAME;
}
public String getDefaultPathToExecutable() {
return System.getenv(ALTERAOCLSDKROOT_NAME);
}
public static String getDefaultPathEnvName() {
return ALTERAOCLSDKROOT_NAME;
}
@VisibleForTesting
public String getPathToExecutable() {
return pathToExecutable;
}
public void setPathToExecutable(String pathToExecutable) {
this.pathToExecutable = pathToExecutable;
}
@VisibleForTesting
public void setShell(InnerShellExecutor shell) {
this.shell = shell;
}
public Map<String, String> getAliasMap() {
return aliasMap;
}
/**
* Check the Intel FPGA for OpenCL toolchain
* */
@Override
public boolean initPlugin(Configuration conf) {
this.aliasMap = new HashMap<>();
if (this.initialized) {
return true;
}
// Find the proper toolchain, mainly aocl
String pluginDefaultBinaryName = getDefaultBinaryName();
String pathToExecutable = conf.get(YarnConfiguration.NM_FPGA_PATH_TO_EXEC,
"");
if (pathToExecutable.isEmpty()) {
pathToExecutable = pluginDefaultBinaryName;
}
// Validate file existence
File binaryPath = new File(pathToExecutable);
if (!binaryPath.exists()) {
// When binary not exist, fail
LOG.warn("Failed to find FPGA discoverer executable configured in " +
YarnConfiguration.NM_FPGA_PATH_TO_EXEC +
", please check! Try default path");
pathToExecutable = pluginDefaultBinaryName;
// Try to find in plugin's preferred path
String pluginDefaultPreferredPath = getDefaultPathToExecutable();
if (null == pluginDefaultPreferredPath) {
LOG.warn("Failed to find FPGA discoverer executable from system environment " +
getDefaultPathEnvName()+
", please check your environment!");
} else {
binaryPath = new File(pluginDefaultPreferredPath + "/bin", pluginDefaultBinaryName);
if (binaryPath.exists()) {
pathToExecutable = pluginDefaultPreferredPath;
} else {
pathToExecutable = pluginDefaultBinaryName;
LOG.warn("Failed to find FPGA discoverer executable in " +
pluginDefaultPreferredPath + ", file doesn't exists! Use default binary" + pathToExecutable);
}
}
}
setPathToExecutable(pathToExecutable);
if (!diagnose(10*1000)) {
LOG.warn("Intel FPGA for OpenCL diagnose failed!");
this.initialized = false;
} else {
this.initialized = true;
}
return this.initialized;
}
@Override
public List<FpgaResourceAllocator.FpgaDevice> discover(int timeout) {
List<FpgaResourceAllocator.FpgaDevice> list = new LinkedList<>();
String output;
output = getDiagnoseInfo(timeout);
if (null == output) {
return list;
}
parseDiagnoseInfo(output, list);
return list;
}
public static class InnerShellExecutor {
// ls /dev/<devName>
// return a string in format <major:minor>
public String getMajorAndMinorNumber(String devName) {
String output = null;
Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor(
new String[]{"stat", "-c", "%t:%T", "/dev/" + devName});
try {
LOG.debug("Get FPGA major-minor numbers from /dev/" + devName);
shexec.execute();
String[] strs = shexec.getOutput().trim().split(":");
LOG.debug("stat output:" + shexec.getOutput());
output = Integer.parseInt(strs[0], 16) + ":" + Integer.parseInt(strs[1], 16);
} catch (IOException e) {
String msg =
"Failed to get major-minor number from reading /dev/" + devName;
LOG.warn(msg);
LOG.debug("Command output:" + shexec.getOutput() + ", exit code:" +
shexec.getExitCode());
}
return output;
}
public String runDiagnose(String binary, int timeout) {
String output = null;
Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor(
new String[]{binary, "diagnose"});
try {
shexec.execute();
} catch (IOException e) {
// aocl diagnose exit code is 1 even it success.
// we ignore it because we only wants the output
String msg =
"Failed to execute " + binary + " diagnose, exception message:" + e
.getMessage() +", output:" + output + ", continue ...";
LOG.warn(msg);
LOG.debug(shexec.getOutput());
}
return shexec.getOutput();
}
}
/**
* One real sample output of Intel FPGA SDK 17.0's "aocl diagnose" is as below:
* "
* aocl diagnose: Running diagnose from /home/fpga/intelFPGA_pro/17.0/hld/board/nalla_pcie/linux64/libexec
*
* ------------------------- acl0 -------------------------
* Vendor: Nallatech ltd
*
* Phys Dev Name Status Information
*
* aclnalla_pcie0Passed nalla_pcie (aclnalla_pcie0)
* PCIe dev_id = 2494, bus:slot.func = 02:00.00, Gen3 x8
* FPGA temperature = 54.4 degrees C.
* Total Card Power Usage = 31.7 Watts.
* Device Power Usage = 0.0 Watts.
*
* DIAGNOSTIC_PASSED
* ---------------------------------------------------------
* "
*
* While per Intel's guide, the output(should be outdated or prior SDK version's) is as below:
*
* "
* aocl diagnose: Running diagnostic from ALTERAOCLSDKROOT/board/<board_name>/
* <platform>/libexec
* Verified that the kernel mode driver is installed on the host machine.
* Using board package from vendor: <board_vendor_name>
* Querying information for all supported devices that are installed on the host
* machine ...
*
* device_name Status Information
*
* acl0 Passed <descriptive_board_name>
* PCIe dev_id = <device_ID>, bus:slot.func = 02:00.00,
* at Gen 2 with 8 lanes.
* FPGA temperature=43.0 degrees C.
* acl1 Passed <descriptive_board_name>
* PCIe dev_id = <device_ID>, bus:slot.func = 03:00.00,
* at Gen 2 with 8 lanes.
* FPGA temperature = 35.0 degrees C.
*
* Found 2 active device(s) installed on the host machine, to perform a full
* diagnostic on a specific device, please run aocl diagnose <device_name>
*
* DIAGNOSTIC_PASSED
* "
* But this method only support the first output
* */
public void parseDiagnoseInfo(String output, List<FpgaResourceAllocator.FpgaDevice> list) {
if (output.contains("DIAGNOSTIC_PASSED")) {
Matcher headerStartMatcher = Pattern.compile("acl[0-31]").matcher(output);
Matcher headerEndMatcher = Pattern.compile("(?i)DIAGNOSTIC_PASSED").matcher(output);
int sectionStartIndex;
int sectionEndIndex;
String aliasName;
while (headerStartMatcher.find()) {
sectionStartIndex = headerStartMatcher.end();
String section = null;
aliasName = headerStartMatcher.group();
while (headerEndMatcher.find(sectionStartIndex)) {
sectionEndIndex = headerEndMatcher.start();
section = output.substring(sectionStartIndex, sectionEndIndex);
break;
}
if (null == section) {
LOG.warn("Unsupported diagnose output");
return;
}
// devName, \(.*\)
// busNum, bus:slot.func\s=\s.*,
// FPGA temperature\s=\s.*
// Total\sCard\sPower\sUsage\s=\s.*
String[] fieldRegexes = new String[]{"\\(.*\\)\n", "(?i)bus:slot.func\\s=\\s.*,",
"(?i)FPGA temperature\\s=\\s.*", "(?i)Total\\sCard\\sPower\\sUsage\\s=\\s.*"};
String[] fields = new String[4];
String tempFieldValue;
for (int i = 0; i < fieldRegexes.length; i++) {
Matcher fieldMatcher = Pattern.compile(fieldRegexes[i]).matcher(section);
if (!fieldMatcher.find()) {
LOG.warn("Couldn't find " + fieldRegexes[i] + " pattern");
fields[i] = "";
continue;
}
tempFieldValue = fieldMatcher.group().trim();
if (i == 0) {
// special case for Device name
fields[i] = tempFieldValue.substring(1, tempFieldValue.length() - 1);
} else {
String ss = tempFieldValue.split("=")[1].trim();
fields[i] = ss.substring(0, ss.length() - 1);
}
}
String majorMinorNumber = this.shell.getMajorAndMinorNumber(fields[0]);
if (null != majorMinorNumber) {
String[] mmn = majorMinorNumber.split(":");
this.aliasMap.put(majorMinorNumber, aliasName);
list.add(new FpgaResourceAllocator.FpgaDevice(getFpgaType(),
Integer.parseInt(mmn[0]),
Integer.parseInt(mmn[1]), null,
fields[0], aliasName, fields[1], fields[2], fields[3]));
}
}// end while
}// end if
}
public String getDiagnoseInfo(int timeout) {
return this.shell.runDiagnose(this.pathToExecutable,timeout);
}
@Override
public boolean diagnose(int timeout) {
String output = getDiagnoseInfo(timeout);
if (null != output && output.contains("DIAGNOSTIC_PASSED")) {
return true;
}
return false;
}
/**
* this is actually the opencl platform type
* */
@Override
public String getFpgaType() {
return "IntelOpenCL";
}
@Override
public String downloadIP(String id, String dstDir, Map<Path, List<String>> localizedResources) {
// Assume .aocx IP file is distributed by DS to local dir
String r = "";
Path path;
LOG.info("Got environment: " + id + ", search IP file in localized resources");
if (null == id || id.isEmpty()) {
LOG.warn("IP_ID environment is empty, skip downloading");
return r;
}
if (localizedResources != null) {
for (Map.Entry<Path, List<String>> resourceEntry :
localizedResources.entrySet()) {
path = resourceEntry.getKey();
LOG.debug("Check:" + path.toUri().toString());
if (path.getName().toLowerCase().contains(id.toLowerCase()) && path.getName().endsWith(".aocx")) {
r = path.toUri().toString();
LOG.debug("Found: " + r);
break;
}
}
} else {
LOG.warn("Localized resource is null!");
}
return r;
}
/**
* Program one device.
* It's ok for the offline "aocl program" failed because the application will always invoke API to program
* The reason we do offline reprogramming is to make the application's program process faster
* @param ipPath the absolute path to the aocx IP file
* @param majorMinorNumber major:minor string
* @return True or False
* */
@Override
public boolean configureIP(String ipPath, String majorMinorNumber) {
// perform offline program the IP to get a quickest reprogramming sequence
// we need a mapping of "major:minor" to "acl0" to issue command "aocl program <acl0> <ipPath>"
Shell.ShellCommandExecutor shexec;
String aclName;
aclName = this.aliasMap.get(majorMinorNumber);
shexec = new Shell.ShellCommandExecutor(
new String[]{this.pathToExecutable, "program", aclName, ipPath});
try {
shexec.execute();
if (0 == shexec.getExitCode()) {
LOG.debug(shexec.getOutput());
LOG.info("Intel aocl program " + ipPath + " to " + aclName + " successfully");
} else {
return false;
}
} catch (IOException e) {
LOG.error("Intel aocl program " + ipPath + " to " + aclName + " failed!");
e.printStackTrace();
return false;
}
return true;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public Configuration getConf() {
return this.conf;
}
}

View File

@ -0,0 +1,458 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDiscoverer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.IntelFpgaOpenclPlugin;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static org.mockito.Mockito.*;
public class TestFpgaResourceHandler {
private Context mockContext;
private FpgaResourceHandlerImpl fpgaResourceHandler;
private Configuration configuration;
private CGroupsHandler mockCGroupsHandler;
private PrivilegedOperationExecutor mockPrivilegedExecutor;
private NMStateStoreService mockNMStateStore;
private ConcurrentHashMap<ContainerId, Container> runningContainersMap;
private IntelFpgaOpenclPlugin mockVendorPlugin;
private static final String vendorType = "IntelOpenCL";
@Before
public void setup() {
TestResourceUtils.addNewTypesToResources(ResourceInformation.FPGA_URI);
configuration = new YarnConfiguration();
mockCGroupsHandler = mock(CGroupsHandler.class);
mockPrivilegedExecutor = mock(PrivilegedOperationExecutor.class);
mockNMStateStore = mock(NMStateStoreService.class);
mockContext = mock(Context.class);
// Assumed devices parsed from output
List<FpgaResourceAllocator.FpgaDevice> list = new ArrayList<>();
list.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 0, null));
list.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 1, null));
list.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 2, null));
list.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 3, null));
list.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 4, null));
mockVendorPlugin = mockPlugin(vendorType, list);
FpgaDiscoverer.getInstance().setConf(configuration);
when(mockContext.getNMStateStore()).thenReturn(mockNMStateStore);
runningContainersMap = new ConcurrentHashMap<>();
when(mockContext.getContainers()).thenReturn(runningContainersMap);
fpgaResourceHandler = new FpgaResourceHandlerImpl(mockContext,
mockCGroupsHandler, mockPrivilegedExecutor, mockVendorPlugin);
}
@Test
public void testBootstrap() throws ResourceHandlerException {
// Case 1. auto
String allowed = "auto";
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, allowed);
fpgaResourceHandler.bootstrap(configuration);
verify(mockVendorPlugin, times(1)).initPlugin(configuration);
verify(mockCGroupsHandler, times(1)).initializeCGroupController(
CGroupsHandler.CGroupController.DEVICES);
Assert.assertEquals(5, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
Assert.assertEquals(5, fpgaResourceHandler.getFpgaAllocator().getAllowedFpga().size());
// Case 2. subset of devices
fpgaResourceHandler = new FpgaResourceHandlerImpl(mockContext,
mockCGroupsHandler, mockPrivilegedExecutor, mockVendorPlugin);
allowed = "0,1,2";
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, allowed);
fpgaResourceHandler.bootstrap(configuration);
Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAllowedFpga().size());
List<FpgaResourceAllocator.FpgaDevice> allowedDevices = fpgaResourceHandler.getFpgaAllocator().getAllowedFpga();
for (String s : allowed.split(",")) {
boolean check = false;
for (FpgaResourceAllocator.FpgaDevice device : allowedDevices) {
if (device.getMinor().toString().equals(s)) {
check = true;
}
}
Assert.assertTrue("Minor:" + s +"found", check);
}
Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
// Case 3. User configuration contains invalid minor device number
fpgaResourceHandler = new FpgaResourceHandlerImpl(mockContext,
mockCGroupsHandler, mockPrivilegedExecutor, mockVendorPlugin);
allowed = "0,1,7";
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, allowed);
fpgaResourceHandler.bootstrap(configuration);
Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getAllowedFpga().size());
}
@Test
public void testBootstrapWithInvalidUserConfiguration() throws ResourceHandlerException {
// User configuration contains invalid minor device number
String allowed = "0,1,7";
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, allowed);
fpgaResourceHandler.bootstrap(configuration);
Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getAllowedFpga().size());
Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
String[] invalidAllowedStrings = {"a,1,2,", "a,1,2", "0,1,2,#", "a", "1,"};
for (String s : invalidAllowedStrings) {
boolean invalidConfiguration = false;
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, s);
try {
fpgaResourceHandler.bootstrap(configuration);
} catch (ResourceHandlerException e) {
invalidConfiguration = true;
}
Assert.assertTrue(invalidConfiguration);
}
String[] allowedStrings = {"1,2", "1"};
for (String s : allowedStrings) {
boolean invalidConfiguration = false;
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, s);
try {
fpgaResourceHandler.bootstrap(configuration);
} catch (ResourceHandlerException e) {
invalidConfiguration = true;
}
Assert.assertFalse(invalidConfiguration);
}
}
@Test
public void testBootStrapWithEmptyUserConfiguration() throws ResourceHandlerException {
// User configuration contains invalid minor device number
String allowed = "";
boolean invalidConfiguration = false;
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, allowed);
try {
fpgaResourceHandler.bootstrap(configuration);
} catch (ResourceHandlerException e) {
invalidConfiguration = true;
}
Assert.assertTrue(invalidConfiguration);
}
@Test
public void testAllocationWithPreference() throws ResourceHandlerException, PrivilegedOperationException {
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, "0,1,2");
fpgaResourceHandler.bootstrap(configuration);
// Case 1. The id-0 container request 1 FPGA of IntelOpenCL type and GEMM IP
fpgaResourceHandler.preStart(mockContainer(0, 1, "GEMM"));
Assert.assertEquals(1, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
verifyDeniedDevices(getContainerId(0), Arrays.asList(1, 2));
List<FpgaResourceAllocator.FpgaDevice> list = fpgaResourceHandler.getFpgaAllocator()
.getUsedFpga().get(getContainerId(0).toString());
for (FpgaResourceAllocator.FpgaDevice device : list) {
Assert.assertEquals("IP should be updated to GEMM", "GEMM", device.getIPID());
}
// Case 2. The id-1 container request 3 FPGA of IntelOpenCL and GEMM IP. this should fail
boolean flag = false;
try {
fpgaResourceHandler.preStart(mockContainer(1, 3, "GZIP"));
} catch (ResourceHandlerException e) {
flag = true;
}
Assert.assertTrue(flag);
// Case 3. Release the id-0 container
fpgaResourceHandler.postComplete(getContainerId(0));
Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
// Now we have enough devices, re-allocate for the id-1 container
fpgaResourceHandler.preStart(mockContainer(1, 3, "GEMM"));
// Id-1 container should have 0 denied devices
verifyDeniedDevices(getContainerId(1), new ArrayList<>());
Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
// Release container id-1
fpgaResourceHandler.postComplete(getContainerId(1));
Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
// Case 4. Now all 3 devices should have IPID GEMM
// Try container id-2 and id-3
fpgaResourceHandler.preStart(mockContainer(2, 1, "GZIP"));
fpgaResourceHandler.postComplete(getContainerId(2));
fpgaResourceHandler.preStart(mockContainer(3, 2, "GEMM"));
// IPID should be GEMM for id-3 container
list = fpgaResourceHandler.getFpgaAllocator()
.getUsedFpga().get(getContainerId(3).toString());
for (FpgaResourceAllocator.FpgaDevice device : list) {
Assert.assertEquals("IPID should be GEMM", "GEMM", device.getIPID());
}
Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(1, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
fpgaResourceHandler.postComplete(getContainerId(3));
Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
// Case 5. id-4 request 0 FPGA device
fpgaResourceHandler.preStart(mockContainer(4, 0, ""));
// Deny all devices for id-4
verifyDeniedDevices(getContainerId(4), Arrays.asList(0, 1, 2));
Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
// Case 6. id-5 with invalid FPGA device
try {
fpgaResourceHandler.preStart(mockContainer(5, -2, ""));
} catch (ResourceHandlerException e) {
Assert.assertTrue(true);
}
}
@Test
public void testsAllocationWithExistingIPIDDevices() throws ResourceHandlerException, PrivilegedOperationException {
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, "0,1,2");
fpgaResourceHandler.bootstrap(configuration);
// The id-0 container request 3 FPGA of IntelOpenCL type and GEMM IP
fpgaResourceHandler.preStart(mockContainer(0, 3, "GEMM"));
Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
List<FpgaResourceAllocator.FpgaDevice> list = fpgaResourceHandler.getFpgaAllocator()
.getUsedFpga().get(getContainerId(0).toString());
fpgaResourceHandler.postComplete(getContainerId(0));
for (FpgaResourceAllocator.FpgaDevice device : list) {
Assert.assertEquals("IP should be updated to GEMM", "GEMM", device.getIPID());
}
// Case 1. id-1 container request preStart, with no plugin.configureIP called
fpgaResourceHandler.preStart(mockContainer(1, 1, "GEMM"));
fpgaResourceHandler.preStart(mockContainer(2, 1, "GEMM"));
// we should have 3 times due to id-1 skip 1 invocation
verify(mockVendorPlugin, times(3)).configureIP(anyString(),anyString());
fpgaResourceHandler.postComplete(getContainerId(1));
fpgaResourceHandler.postComplete(getContainerId(2));
// Case 2. id-2 container request preStart, with 1 plugin.configureIP called
fpgaResourceHandler.preStart(mockContainer(1, 1, "GZIP"));
// we should have 4 times invocation
verify(mockVendorPlugin, times(4)).configureIP(anyString(),anyString());
}
@Test
public void testAllocationWithZeroDevices() throws ResourceHandlerException, PrivilegedOperationException {
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, "0,1,2");
fpgaResourceHandler.bootstrap(configuration);
// The id-0 container request 0 FPGA
fpgaResourceHandler.preStart(mockContainer(0, 0, null));
verifyDeniedDevices(getContainerId(0), Arrays.asList(0, 1, 2));
verify(mockVendorPlugin, times(0)).downloadIP(anyString(), anyString(), anyMap());
verify(mockVendorPlugin, times(0)).configureIP(anyString(), anyString());
}
@Test
public void testStateStore() throws ResourceHandlerException, IOException {
// Case 1. store 3 devices
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, "0,1,2");
fpgaResourceHandler.bootstrap(configuration);
Container container0 = mockContainer(0, 3, "GEMM");
fpgaResourceHandler.preStart(container0);
List<FpgaResourceAllocator.FpgaDevice> assigned =
fpgaResourceHandler.getFpgaAllocator().getUsedFpga().get(getContainerId(0).toString());
verify(mockNMStateStore).storeAssignedResources(container0,
ResourceInformation.FPGA_URI,
new ArrayList<>(assigned));
fpgaResourceHandler.postComplete(getContainerId(0));
// Case 2. ask 0, no store api called
Container container1 = mockContainer(1, 0, "");
fpgaResourceHandler.preStart(container1);
verify(mockNMStateStore, never()).storeAssignedResources(
eq(container1), eq(ResourceInformation.FPGA_URI), anyList());
}
@Test
public void testReacquireContainer() throws ResourceHandlerException {
Container c0 = mockContainer(0, 2, "GEMM");
List<FpgaResourceAllocator.FpgaDevice> assigned = new ArrayList<>();
assigned.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 0, null));
assigned.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 1, null));
// Mock we've stored the c0 states
mockStateStoreForContainer(c0, assigned);
// NM start
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, "0,1,2");
fpgaResourceHandler.bootstrap(configuration);
Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
// Case 1. try recover state for id-0 container
fpgaResourceHandler.reacquireContainer(getContainerId(0));
// minor number matches
List<FpgaResourceAllocator.FpgaDevice> used = fpgaResourceHandler.getFpgaAllocator().
getUsedFpga().get(getContainerId(0).toString());
int count = 0;
for (FpgaResourceAllocator.FpgaDevice device : used) {
if (device.getMinor().equals(0)){
count++;
}
if (device.getMinor().equals(1)) {
count++;
}
}
Assert.assertEquals("Unexpected used minor number in allocator",2, count);
List<FpgaResourceAllocator.FpgaDevice> available = fpgaResourceHandler.getFpgaAllocator().
getAvailableFpga().get(vendorType);
count = 0;
for (FpgaResourceAllocator.FpgaDevice device : available) {
if (device.getMinor().equals(2)) {
count++;
}
}
Assert.assertEquals("Unexpected available minor number in allocator", 1, count);
// Case 2. Recover a not allowed device with minor number 5
Container c1 = mockContainer(1, 1, "GEMM");
assigned = new ArrayList<>();
assigned.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 5, null));
// Mock we've stored the c1 states
mockStateStoreForContainer(c1, assigned);
boolean flag = false;
try {
fpgaResourceHandler.reacquireContainer(getContainerId(1));
} catch (ResourceHandlerException e) {
flag = true;
}
Assert.assertTrue(flag);
Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(1, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
// Case 3. recover a already used device by other container
Container c2 = mockContainer(2, 1, "GEMM");
assigned = new ArrayList<>();
assigned.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 1, null));
// Mock we've stored the c2 states
mockStateStoreForContainer(c2, assigned);
flag = false;
try {
fpgaResourceHandler.reacquireContainer(getContainerId(2));
} catch (ResourceHandlerException e) {
flag = true;
}
Assert.assertTrue(flag);
Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(1, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
// Case 4. recover a normal container c3 with remaining minor device number 2
Container c3 = mockContainer(3, 1, "GEMM");
assigned = new ArrayList<>();
assigned.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 2, null));
// Mock we've stored the c2 states
mockStateStoreForContainer(c3, assigned);
fpgaResourceHandler.reacquireContainer(getContainerId(3));
Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
}
private void verifyDeniedDevices(ContainerId containerId,
List<Integer> deniedDevices)
throws ResourceHandlerException, PrivilegedOperationException {
verify(mockCGroupsHandler, atLeastOnce()).createCGroup(
CGroupsHandler.CGroupController.DEVICES, containerId.toString());
if (null != deniedDevices && !deniedDevices.isEmpty()) {
verify(mockPrivilegedExecutor, times(1)).executePrivilegedOperation(
new PrivilegedOperation(PrivilegedOperation.OperationType.FPGA, Arrays
.asList(FpgaResourceHandlerImpl.CONTAINER_ID_CLI_OPTION,
containerId.toString(),
FpgaResourceHandlerImpl.EXCLUDED_FPGAS_CLI_OPTION,
StringUtils.join(",", deniedDevices))), true);
} else if (deniedDevices.isEmpty()) {
verify(mockPrivilegedExecutor, times(1)).executePrivilegedOperation(
new PrivilegedOperation(PrivilegedOperation.OperationType.FPGA, Arrays
.asList(FpgaResourceHandlerImpl.CONTAINER_ID_CLI_OPTION,
containerId.toString())), true);
}
}
private static IntelFpgaOpenclPlugin mockPlugin(String type, List<FpgaResourceAllocator.FpgaDevice> list) {
IntelFpgaOpenclPlugin plugin = mock(IntelFpgaOpenclPlugin.class);
when(plugin.initPlugin(Mockito.anyObject())).thenReturn(true);
when(plugin.getFpgaType()).thenReturn(type);
when(plugin.downloadIP(Mockito.anyString(), Mockito.anyString(), Mockito.anyMap())).thenReturn("/tmp");
when(plugin.configureIP(Mockito.anyString(), Mockito.anyObject())).thenReturn(true);
when(plugin.discover(Mockito.anyInt())).thenReturn(list);
return plugin;
}
private static Container mockContainer(int id, int numFpga, String IPID) {
Container c = mock(Container.class);
Resource res = Resource.newInstance(1024, 1);
ResourceMappings resMapping = new ResourceMappings();
res.setResourceValue(ResourceInformation.FPGA_URI, numFpga);
when(c.getResource()).thenReturn(res);
when(c.getResourceMappings()).thenReturn(resMapping);
when(c.getContainerId()).thenReturn(getContainerId(id));
ContainerLaunchContext clc = mock(ContainerLaunchContext.class);
Map<String, String> envs = new HashMap<>();
if (numFpga > 0) {
envs.put("REQUESTED_FPGA_IP_ID", IPID);
}
when(c.getLaunchContext()).thenReturn(clc);
when(clc.getEnvironment()).thenReturn(envs);
when(c.getWorkDir()).thenReturn("/tmp");
ResourceSet resourceSet = new ResourceSet();
when(c.getResourceSet()).thenReturn(resourceSet);
return c;
}
private void mockStateStoreForContainer(Container container,
List<FpgaResourceAllocator.FpgaDevice> assigned) {
ResourceMappings rmap = new ResourceMappings();
ResourceMappings.AssignedResources ar =
new ResourceMappings.AssignedResources();
ar.updateAssignedResources(new ArrayList<>(assigned));
rmap.addAssignedResources(ResourceInformation.FPGA_URI, ar);
when(container.getResourceMappings()).thenReturn(rmap);
runningContainersMap.put(container.getContainerId(), container);
}
private static ContainerId getContainerId(int id) {
return ContainerId.newContainerId(ApplicationAttemptId
.newInstance(ApplicationId.newInstance(1234L, 1), 1), id);
}
}

View File

@ -0,0 +1,187 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestFpgaDiscoverer {
private String getTestParentFolder() {
File f = new File("target/temp/" + TestFpgaDiscoverer.class.getName());
return f.getAbsolutePath();
}
private void touchFile(File f) throws IOException {
new FileOutputStream(f).close();
}
@Before
public void before() throws IOException {
String folder = getTestParentFolder();
File f = new File(folder);
FileUtils.deleteDirectory(f);
f.mkdirs();
}
@Test
public void testLinuxFpgaResourceDiscoverPluginConfig() throws YarnException, IOException {
Configuration conf = new Configuration(false);
FpgaDiscoverer discoverer = FpgaDiscoverer.getInstance();
IntelFpgaOpenclPlugin openclPlugin = new IntelFpgaOpenclPlugin();
// because FPGA discoverer is a singleton, we use setPlugin to make
// FpgaDiscoverer.getInstance().diagnose() work in openclPlugin.initPlugin()
discoverer.setResourceHanderPlugin(openclPlugin);
openclPlugin.initPlugin(conf);
openclPlugin.setShell(mockPuginShell());
discoverer.initialize(conf);
// Case 1. No configuration set for binary
Assert.assertEquals("No configuration should return just a single binary name",
"aocl", openclPlugin.getPathToExecutable());
// Case 2. With correct configuration and file exists
File fakeBinary = new File(getTestParentFolder() + "/aocl");
conf.set(YarnConfiguration.NM_FPGA_PATH_TO_EXEC, getTestParentFolder() + "/aocl");
touchFile(fakeBinary);
discoverer.initialize(conf);
Assert.assertEquals("Correct configuration should return user setting",
getTestParentFolder() + "/aocl", openclPlugin.getPathToExecutable());
// Case 3. With correct configuration but file doesn't exists. Use default
fakeBinary.delete();
discoverer.initialize(conf);
Assert.assertEquals("Correct configuration but file doesn't exists should return just a single binary name",
"aocl", openclPlugin.getPathToExecutable());
}
@Test
public void testDiscoverPluginParser() throws YarnException {
String output = "------------------------- acl0 -------------------------\n" +
"Vendor: Nallatech ltd\n" +
"Phys Dev Name Status Information\n" +
"aclnalla_pcie0Passed nalla_pcie (aclnalla_pcie0)\n" +
" PCIe dev_id = 2494, bus:slot.func = 02:00.00, Gen3 x8\n" +
" FPGA temperature = 53.1 degrees C.\n" +
" Total Card Power Usage = 31.7 Watts.\n" +
" Device Power Usage = 0.0 Watts.\n" +
"DIAGNOSTIC_PASSED" +
"---------------------------------------------------------\n";
output = output +
"------------------------- acl1 -------------------------\n" +
"Vendor: Nallatech ltd\n" +
"Phys Dev Name Status Information\n" +
"aclnalla_pcie1Passed nalla_pcie (aclnalla_pcie1)\n" +
" PCIe dev_id = 2495, bus:slot.func = 03:00.00, Gen3 x8\n" +
" FPGA temperature = 43.1 degrees C.\n" +
" Total Card Power Usage = 11.7 Watts.\n" +
" Device Power Usage = 0.0 Watts.\n" +
"DIAGNOSTIC_PASSED" +
"---------------------------------------------------------\n";
output = output +
"------------------------- acl2 -------------------------\n" +
"Vendor: Intel(R) Corporation\n" +
"\n" +
"Phys Dev Name Status Information\n" +
"\n" +
"acla10_ref0 Passed Arria 10 Reference Platform (acla10_ref0)\n" +
" PCIe dev_id = 2494, bus:slot.func = 09:00.00, Gen2 x8\n" +
" FPGA temperature = 50.5781 degrees C.\n" +
"\n" +
"DIAGNOSTIC_PASSED\n" +
"---------------------------------------------------------\n";
Configuration conf = new Configuration(false);
IntelFpgaOpenclPlugin openclPlugin = new IntelFpgaOpenclPlugin();
FpgaDiscoverer.getInstance().setResourceHanderPlugin(openclPlugin);
openclPlugin.initPlugin(conf);
openclPlugin.setShell(mockPuginShell());
FpgaDiscoverer.getInstance().initialize(conf);
List<FpgaResourceAllocator.FpgaDevice> list = new LinkedList<>();
// Case 1. core parsing
openclPlugin.parseDiagnoseInfo(output, list);
Assert.assertEquals(3, list.size());
Assert.assertEquals("IntelOpenCL", list.get(0).getType());
Assert.assertEquals("247", list.get(0).getMajor().toString());
Assert.assertEquals("0", list.get(0).getMinor().toString());
Assert.assertEquals("acl0", list.get(0).getAliasDevName());
Assert.assertEquals("aclnalla_pcie0", list.get(0).getDevName());
Assert.assertEquals("02:00.00", list.get(0).getBusNum());
Assert.assertEquals("53.1 degrees C", list.get(0).getTemperature());
Assert.assertEquals("31.7 Watts", list.get(0).getCardPowerUsage());
Assert.assertEquals("IntelOpenCL", list.get(1).getType());
Assert.assertEquals("247", list.get(1).getMajor().toString());
Assert.assertEquals("1", list.get(1).getMinor().toString());
Assert.assertEquals("acl1", list.get(1).getAliasDevName());
Assert.assertEquals("aclnalla_pcie1", list.get(1).getDevName());
Assert.assertEquals("03:00.00", list.get(1).getBusNum());
Assert.assertEquals("43.1 degrees C", list.get(1).getTemperature());
Assert.assertEquals("11.7 Watts", list.get(1).getCardPowerUsage());
Assert.assertEquals("IntelOpenCL", list.get(2).getType());
Assert.assertEquals("246", list.get(2).getMajor().toString());
Assert.assertEquals("0", list.get(2).getMinor().toString());
Assert.assertEquals("acl2", list.get(2).getAliasDevName());
Assert.assertEquals("acla10_ref0", list.get(2).getDevName());
Assert.assertEquals("09:00.00", list.get(2).getBusNum());
Assert.assertEquals("50.5781 degrees C", list.get(2).getTemperature());
Assert.assertEquals("", list.get(2).getCardPowerUsage());
// Case 2. check alias map
Map<String, String> aliasMap = openclPlugin.getAliasMap();
Assert.assertEquals("acl0", aliasMap.get("247:0"));
Assert.assertEquals("acl1", aliasMap.get("247:1"));
Assert.assertEquals("acl2", aliasMap.get("246:0"));
}
private IntelFpgaOpenclPlugin.InnerShellExecutor mockPuginShell() {
IntelFpgaOpenclPlugin.InnerShellExecutor shell = mock(IntelFpgaOpenclPlugin.InnerShellExecutor.class);
when(shell.runDiagnose(anyString(),anyInt())).thenReturn("");
when(shell.getMajorAndMinorNumber("aclnalla_pcie0")).thenReturn("247:0");
when(shell.getMajorAndMinorNumber("aclnalla_pcie1")).thenReturn("247:1");
when(shell.getMajorAndMinorNumber("acla10_ref0")).thenReturn("246:0");
return shell;
}
}