YARN-9266. General improvements in IntelFpgaOpenclPlugin. Contributed by Peter Bacsko.

This commit is contained in:
Sunil G 2019-03-13 02:45:17 +05:30
parent 24793d2d97
commit 8e1539eca8
8 changed files with 619 additions and 421 deletions

View File

@ -25,7 +25,6 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -37,13 +36,13 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.AbstractFpgaVendorPlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDiscoverer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
@ -89,21 +88,25 @@ public String getRequestedIPID(Container container) {
}
@Override
public List<PrivilegedOperation> bootstrap(Configuration configuration) throws ResourceHandlerException {
public List<PrivilegedOperation> bootstrap(Configuration configuration)
throws ResourceHandlerException {
// The plugin should already have been initialized by FpgaDiscoverer
if (!vendorPlugin.initPlugin(configuration)) {
throw new ResourceHandlerException("FPGA plugin initialization failed", null);
throw new ResourceHandlerException("FPGA plugin initialization failed");
}
LOG.info("FPGA Plugin bootstrap success.");
// Get available devices' minor numbers from toolchain or static configuration
List<FpgaResourceAllocator.FpgaDevice> fpgaDeviceList = FpgaDiscoverer.getInstance().discover();
List<FpgaResourceAllocator.FpgaDevice> fpgaDeviceList =
FpgaDiscoverer.getInstance().discover();
allocator.addFpga(vendorPlugin.getFpgaType(), fpgaDeviceList);
this.cGroupsHandler.initializeCGroupController(CGroupsHandler.CGroupController.DEVICES);
this.cGroupsHandler.initializeCGroupController(
CGroupsHandler.CGroupController.DEVICES);
return null;
}
@Override
public List<PrivilegedOperation> preStart(Container container) throws ResourceHandlerException {
public List<PrivilegedOperation> preStart(Container container)
throws ResourceHandlerException {
// 1. Get requested FPGA type and count, choose corresponding FPGA plugin(s)
// 2. Use allocator.assignFpga(type, count) to get FPGAAllocation
// 3. If required, download to ensure IP file exists and configure IP file for all devices
@ -126,7 +129,8 @@ public List<PrivilegedOperation> preStart(Container container) throws ResourceHa
container, getRequestedIPID(container));
LOG.info("FpgaAllocation:" + allocation);
PrivilegedOperation privilegedOperation = new PrivilegedOperation(PrivilegedOperation.OperationType.FPGA,
PrivilegedOperation privilegedOperation =
new PrivilegedOperation(PrivilegedOperation.OperationType.FPGA,
Arrays.asList(CONTAINER_ID_CLI_OPTION, containerIdStr));
if (!allocation.getDenied().isEmpty()) {
List<Integer> denied = new ArrayList<>();
@ -134,7 +138,8 @@ public List<PrivilegedOperation> preStart(Container container) throws ResourceHa
privilegedOperation.appendArgs(Arrays.asList(EXCLUDED_FPGAS_CLI_OPTION,
StringUtils.join(",", denied)));
}
privilegedOperationExecutor.executePrivilegedOperation(privilegedOperation, true);
privilegedOperationExecutor.executePrivilegedOperation(
privilegedOperation, true);
if (deviceCount > 0) {
/**
@ -152,25 +157,30 @@ public List<PrivilegedOperation> preStart(Container container) throws ResourceHa
* for different devices
*
* */
ipFilePath = vendorPlugin.downloadIP(getRequestedIPID(container), container.getWorkDir(),
ipFilePath = vendorPlugin.retrieveIPfilePath(
getRequestedIPID(container),
container.getWorkDir(),
container.getResourceSet().getLocalizedResources());
if (ipFilePath.isEmpty()) {
LOG.warn("FPGA plugin failed to download IP but continue, please check the value of environment viable: " +
REQUEST_FPGA_IP_ID_KEY + " if you want yarn to help");
if (ipFilePath == null) {
LOG.warn("FPGA plugin failed to downloaded IP, please check the" +
" value of environment viable: " + REQUEST_FPGA_IP_ID_KEY +
" if you want YARN to program the device");
} else {
LOG.info("IP file path:" + ipFilePath);
List<FpgaResourceAllocator.FpgaDevice> allowed = allocation.getAllowed();
String majorMinorNumber;
for (int i = 0; i < allowed.size(); i++) {
majorMinorNumber = allowed.get(i).getMajor() + ":" + allowed.get(i).getMinor();
String currentIPID = allowed.get(i).getIPID();
FpgaDevice device = allowed.get(i);
majorMinorNumber = device.getMajor() + ":" + device.getMinor();
String currentIPID = device.getIPID();
if (null != currentIPID &&
currentIPID.equalsIgnoreCase(getRequestedIPID(container))) {
LOG.info("IP already in device \"" + allowed.get(i).getAliasDevName() + "," +
majorMinorNumber + "\", skip reprogramming");
LOG.info("IP already in device \"" +
allowed.get(i).getAliasDevName() +
"," + majorMinorNumber + "\", skip reprogramming");
continue;
}
if (vendorPlugin.configureIP(ipFilePath, majorMinorNumber)) {
if (vendorPlugin.configureIP(ipFilePath, device)) {
// update the allocator that we update an IP of a device
allocator.updateFpga(containerIdStr, allowed.get(i),
getRequestedIPID(container));
@ -186,7 +196,8 @@ public List<PrivilegedOperation> preStart(Container container) throws ResourceHa
throw re;
} catch (PrivilegedOperationException e) {
allocator.cleanupAssignFpgas(containerIdStr);
cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES, containerIdStr);
cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES,
containerIdStr);
LOG.warn("Could not update cgroup for container", e);
throw new ResourceHandlerException(e);
}
@ -200,7 +211,8 @@ public List<PrivilegedOperation> preStart(Container container) throws ResourceHa
}
@Override
public List<PrivilegedOperation> reacquireContainer(ContainerId containerId) throws ResourceHandlerException {
public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
throws ResourceHandlerException {
allocator.recoverAssignedFpgas(containerId);
return null;
}
@ -212,7 +224,8 @@ public List<PrivilegedOperation> updateContainer(Container container)
}
@Override
public List<PrivilegedOperation> postComplete(ContainerId containerId) throws ResourceHandlerException {
public List<PrivilegedOperation> postComplete(ContainerId containerId)
throws ResourceHandlerException {
allocator.cleanupAssignFpgas(containerId.toString());
cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES,
containerId.toString());

View File

@ -21,10 +21,10 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
import java.util.List;
import java.util.Map;
@ -38,7 +38,7 @@
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface AbstractFpgaVendorPlugin extends Configurable{
public interface AbstractFpgaVendorPlugin {
/**
* Check vendor's toolchain and required environment
@ -72,19 +72,14 @@ public interface AbstractFpgaVendorPlugin extends Configurable{
* localized file path and value is soft link names
* @return The absolute path string of IP file
* */
String downloadIP(String id, String dstDir, Map<Path, List<String>> localizedResources);
String retrieveIPfilePath(String id, String dstDir,
Map<Path, List<String>> localizedResources);
/**
* The vendor plugin configure an IP file to a device
* @param ipPath The absolute path of the IP file
* @param majorMinorNumber The device in format &lt;major:minor&gt;
* @param device The FPGA device object
* @return configure device ok or not
* */
boolean configureIP(String ipPath, String majorMinorNumber);
@Override
void setConf(Configuration conf);
@Override
Configuration getConf();
boolean configureIP(String ipPath, FpgaDevice device);
}

View File

@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.IntelFpgaOpenclPlugin.InnerShellExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Parses the textual output of Intel's "aocl diagnose" command into
 * {@link FpgaDevice} objects.
 *
 * This is a stateless utility class; it cannot be instantiated.
 */
final class AoclDiagnosticOutputParser {
  private static final Logger LOG = LoggerFactory.getLogger(
      AoclDiagnosticOutputParser.class);

  /** Marker printed by aocl when the diagnostic of a device succeeded. */
  private static final String DIAGNOSTIC_PASSED = "DIAGNOSTIC_PASSED";

  /**
   * Device alias which opens a section, e.g. "acl0" or "acl17".
   * Note that "acl[0-31]" would be a character class that only matches
   * the single digits 0, 1, 2 and 3, hence the explicit digit sequence.
   */
  private static final Pattern HEADER_START = Pattern.compile("acl\\d+");

  /** End of a device section. */
  private static final Pattern HEADER_END =
      Pattern.compile("(?i)" + DIAGNOSTIC_PASSED);

  /** Index of the device name inside the extracted field array. */
  private static final int DEVICE_NAME_FIELD = 0;

  /**
   * Patterns of the fields extracted from a section, in this order:
   * device name, bus number, temperature, card power usage.
   */
  private static final Pattern[] FIELD_PATTERNS = {
      Pattern.compile("\\(.*\\)\n"),
      Pattern.compile("(?i)bus:slot.func\\s=\\s.*,"),
      Pattern.compile("(?i)FPGA temperature\\s=\\s.*"),
      Pattern.compile("(?i)Total\\sCard\\sPower\\sUsage\\s=\\s.*")
  };

  private AoclDiagnosticOutputParser() {
    // no instances
  }

  /**
   * One real sample output of Intel FPGA SDK 17.0's "aocl diagnose" is as below:
   * "
   * aocl diagnose: Running diagnose from /home/fpga/intelFPGA_pro/17.0/hld/board/nalla_pcie/linux64/libexec
   *
   * ------------------------- acl0 -------------------------
   * Vendor: Nallatech ltd
   *
   * Phys Dev Name Status Information
   *
   * aclnalla_pcie0Passed nalla_pcie (aclnalla_pcie0)
   * PCIe dev_id = 2494, bus:slot.func = 02:00.00, Gen3 x8
   * FPGA temperature = 54.4 degrees C.
   * Total Card Power Usage = 31.7 Watts.
   * Device Power Usage = 0.0 Watts.
   *
   * DIAGNOSTIC_PASSED
   * ---------------------------------------------------------
   * "
   *
   * Intel's guide shows a different output (probably outdated or from a
   * prior SDK version):
   *
   * "
   * aocl diagnose: Running diagnostic from ALTERAOCLSDKROOT/board/&lt;board_name&gt;/
   * &lt;platform&gt;/libexec
   * Verified that the kernel mode driver is installed on the host machine.
   * Using board package from vendor: &lt;board_vendor_name&gt;
   * Querying information for all supported devices that are installed on the host
   * machine ...
   *
   * device_name Status Information
   *
   * acl0 Passed &lt;descriptive_board_name&gt;
   * PCIe dev_id = &lt;device_ID&gt;, bus:slot.func = 02:00.00,
   * at Gen 2 with 8 lanes.
   * FPGA temperature=43.0 degrees C.
   * acl1 Passed &lt;descriptive_board_name&gt;
   * PCIe dev_id = &lt;device_ID&gt;, bus:slot.func = 03:00.00,
   * at Gen 2 with 8 lanes.
   * FPGA temperature = 35.0 degrees C.
   *
   * Found 2 active device(s) installed on the host machine, to perform a full
   * diagnostic on a specific device, please run aocl diagnose &lt;device_name&gt;
   *
   * DIAGNOSTIC_PASSED
   * "
   * This method only supports the first output format.
   *
   * @param output the complete output of "aocl diagnose"
   * @param shellExecutor used to look up the major:minor number of a device
   *                      from its device name
   * @param fpgaType the FPGA type stored in every created device
   * @return the devices whose major/minor number could be determined; an
   *         empty list if the diagnostic did not pass or if a device section
   *         is not terminated by DIAGNOSTIC_PASSED
   * */
  public static List<FpgaDevice> parseDiagnosticOutput(
      String output, InnerShellExecutor shellExecutor, String fpgaType) {
    if (!output.contains(DIAGNOSTIC_PASSED)) {
      LOG.warn("The diagnostic has failed");
      LOG.warn("Output of aocl is: " + output);
      return Collections.emptyList();
    }

    List<FpgaDevice> devices = new ArrayList<>();
    Matcher headerStartMatcher = HEADER_START.matcher(output);
    Matcher headerEndMatcher = HEADER_END.matcher(output);

    while (headerStartMatcher.find()) {
      String aliasName = headerStartMatcher.group();
      int sectionStartIndex = headerStartMatcher.end();

      // A section spans from the device alias to the next DIAGNOSTIC_PASSED
      if (!headerEndMatcher.find(sectionStartIndex)) {
        LOG.warn("Unsupported diagnose output");
        LOG.warn("aocl output is: " + output);
        return Collections.emptyList();
      }
      String section = output.substring(sectionStartIndex,
          headerEndMatcher.start());

      String[] fields = extractFields(section);

      String majorMinorNumber = shellExecutor
          .getMajorAndMinorNumber(fields[DEVICE_NAME_FIELD]);
      if (null != majorMinorNumber) {
        String[] mmn = majorMinorNumber.split(":");
        devices.add(new FpgaDevice(fpgaType,
            Integer.parseInt(mmn[0]),
            Integer.parseInt(mmn[1]), null,
            fields[0], aliasName, fields[1], fields[2], fields[3]));
      } else {
        LOG.warn("Failed to retrieve major/minor number for device");
      }
    }

    return devices;
  }

  /**
   * Extracts device name, bus number, temperature and card power usage
   * from one device section. A field which is not found is set to "".
   */
  private static String[] extractFields(String section) {
    String[] fields = new String[FIELD_PATTERNS.length];

    for (int i = 0; i < FIELD_PATTERNS.length; i++) {
      Matcher fieldMatcher = FIELD_PATTERNS[i].matcher(section);
      if (!fieldMatcher.find()) {
        LOG.warn("Couldn't find " + FIELD_PATTERNS[i].pattern() + " pattern");
        fields[i] = "";
        continue;
      }

      String matched = fieldMatcher.group().trim();
      if (i == DEVICE_NAME_FIELD) {
        // special case for Device name: strip the enclosing parentheses
        fields[i] = matched.substring(1, matched.length() - 1);
      } else {
        fields[i] = valueAfterEquals(matched);
      }
    }

    return fields;
  }

  /**
   * Returns the value of a "key = value" match without its last character
   * (the trailing ',' or '.'), or "" if there is no value at all.
   */
  private static String valueAfterEquals(String matched) {
    String[] parts = matched.split("=");
    if (parts.length < 2) {
      // nothing after '=', avoid an ArrayIndexOutOfBoundsException
      return "";
    }

    String value = parts[1].trim();
    if (value.isEmpty()) {
      return "";
    }
    return value.substring(0, value.length() - 1);
  }
}

View File

@ -23,126 +23,118 @@
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
/**
* Intel FPGA for OpenCL plugin.
* The key points are:
* 1. It uses Intel's toolchain "aocl" to discover devices/reprogram IP to the device
* before container launch to achieve a quickest reprogramming path
* 1. It uses Intel's toolchain "aocl" to discover devices/reprogram IP
* to the device before container launch to achieve a quickest
* reprogramming path
* 2. It avoids reprogramming by maintaining a mapping of device to FPGA IP ID
* 3. It assumes the IP file is distributed to the container directory
*/
public class IntelFpgaOpenclPlugin implements AbstractFpgaVendorPlugin {
public static final Logger LOG = LoggerFactory.getLogger(
private static final Logger LOG = LoggerFactory.getLogger(
IntelFpgaOpenclPlugin.class);
private boolean initialized = false;
private Configuration conf;
private InnerShellExecutor shell;
protected static final String DEFAULT_BINARY_NAME = "aocl";
private static final String DEFAULT_BINARY_NAME = "aocl";
protected static final String ALTERAOCLSDKROOT_NAME = "ALTERAOCLSDKROOT";
private static final String ALTERAOCLSDKROOT_NAME = "ALTERAOCLSDKROOT";
private Function<String, String> envProvider = System::getenv;
private String pathToExecutable = null;
// a mapping of major:minor number to acl0-31
private Map<String, String> aliasMap;
@VisibleForTesting
void setInnerShellExecutor(InnerShellExecutor shellExecutor) {
this.shell = shellExecutor;
}
@VisibleForTesting
String getPathToExecutable() {
return pathToExecutable;
}
@VisibleForTesting
void setEnvProvider(Function<String, String> envProvider) {
this.envProvider = envProvider;
}
public IntelFpgaOpenclPlugin() {
this.shell = new InnerShellExecutor();
}
public String getDefaultBinaryName() {
return DEFAULT_BINARY_NAME;
}
public String getDefaultPathToExecutable() {
return System.getenv(ALTERAOCLSDKROOT_NAME);
}
public static String getDefaultPathEnvName() {
return ALTERAOCLSDKROOT_NAME;
}
@VisibleForTesting
public String getPathToExecutable() {
return pathToExecutable;
}
public void setPathToExecutable(String pathToExecutable) {
this.pathToExecutable = pathToExecutable;
}
@VisibleForTesting
public void setShell(InnerShellExecutor shell) {
this.shell = shell;
}
public Map<String, String> getAliasMap() {
return aliasMap;
return envProvider.apply(ALTERAOCLSDKROOT_NAME);
}
/**
* Check the Intel FPGA for OpenCL toolchain
* Check the Intel FPGA for OpenCL toolchain.
* */
@Override
public boolean initPlugin(Configuration conf) {
this.aliasMap = new HashMap<>();
if (this.initialized) {
public boolean initPlugin(Configuration config) {
if (initialized) {
return true;
}
// Find the proper toolchain, mainly aocl
String pluginDefaultBinaryName = getDefaultBinaryName();
String pathToExecutable = conf.get(YarnConfiguration.NM_FPGA_PATH_TO_EXEC,
"");
if (pathToExecutable.isEmpty()) {
pathToExecutable = pluginDefaultBinaryName;
}
String pluginDefaultBinaryName = DEFAULT_BINARY_NAME;
String executable = config.get(YarnConfiguration.NM_FPGA_PATH_TO_EXEC,
pluginDefaultBinaryName);
// Validate file existence
File binaryPath = new File(pathToExecutable);
File binaryPath = new File(executable);
if (!binaryPath.exists()) {
// When binary not exist, fail
LOG.warn("Failed to find FPGA discoverer executable configured in " +
YarnConfiguration.NM_FPGA_PATH_TO_EXEC +
", please check! Try default path");
pathToExecutable = pluginDefaultBinaryName;
executable = pluginDefaultBinaryName;
// Try to find in plugin's preferred path
String pluginDefaultPreferredPath = getDefaultPathToExecutable();
if (null == pluginDefaultPreferredPath) {
LOG.warn("Failed to find FPGA discoverer executable from system environment " +
getDefaultPathEnvName()+
LOG.warn("Failed to find FPGA discoverer executable from system "
+ " environment " + ALTERAOCLSDKROOT_NAME +
", please check your environment!");
} else {
binaryPath = new File(pluginDefaultPreferredPath + "/bin", pluginDefaultBinaryName);
binaryPath = new File(pluginDefaultPreferredPath + "/bin",
pluginDefaultBinaryName);
if (binaryPath.exists()) {
pathToExecutable = binaryPath.getAbsolutePath();
executable = binaryPath.getAbsolutePath();
LOG.info("Succeed in finding FPGA discoverer executable: " +
pathToExecutable);
executable);
} else {
pathToExecutable = pluginDefaultBinaryName;
executable = pluginDefaultBinaryName;
LOG.warn("Failed to find FPGA discoverer executable in " +
pluginDefaultPreferredPath + ", file doesn't exists! Use default binary" + pathToExecutable);
pluginDefaultPreferredPath +
", file doesn't exists! Use default binary" + executable);
}
}
}
setPathToExecutable(pathToExecutable);
pathToExecutable = executable;
if (!diagnose(10*1000)) {
LOG.warn("Intel FPGA for OpenCL diagnose failed!");
this.initialized = false;
initialized = false;
} else {
this.initialized = true;
initialized = true;
}
return this.initialized;
return initialized;
}
@Override
@ -153,10 +145,16 @@ public List<FpgaResourceAllocator.FpgaDevice> discover(int timeout) {
if (null == output) {
return list;
}
parseDiagnoseInfo(output, list);
list = AoclDiagnosticOutputParser.parseDiagnosticOutput(output,
shell, getFpgaType());
return list;
}
/**
* Helper class to run aocl diagnose & determine major/minor numbers.
*/
public static class InnerShellExecutor {
// ls /dev/<devName>
@ -170,13 +168,13 @@ public String getMajorAndMinorNumber(String devName) {
shexec.execute();
String[] strs = shexec.getOutput().trim().split(":");
LOG.debug("stat output:" + shexec.getOutput());
output = Integer.parseInt(strs[0], 16) + ":" + Integer.parseInt(strs[1], 16);
output = Integer.parseInt(strs[0], 16) + ":" +
Integer.parseInt(strs[1], 16);
} catch (IOException e) {
String msg =
"Failed to get major-minor number from reading /dev/" + devName;
LOG.warn(msg);
LOG.debug("Command output:" + shexec.getOutput() + ", exit code:" +
shexec.getExitCode());
LOG.warn("Failed to get major-minor number from reading /dev/" +
devName);
LOG.warn("Command output:" + shexec.getOutput() + ", exit code: " +
shexec.getExitCode(), e);
}
return output;
}
@ -184,7 +182,7 @@ public String getMajorAndMinorNumber(String devName) {
public String runDiagnose(String binary, int timeout) {
String output = null;
Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor(
new String[]{binary, "diagnose"});
new String[]{binary, "diagnose"}, null, null, timeout);
try {
shexec.execute();
} catch (IOException e) {
@ -198,112 +196,6 @@ public String runDiagnose(String binary, int timeout) {
}
return shexec.getOutput();
}
}
/**
* One real sample output of Intel FPGA SDK 17.0's "aocl diagnose" is as below:
* "
* aocl diagnose: Running diagnose from /home/fpga/intelFPGA_pro/17.0/hld/board/nalla_pcie/linux64/libexec
*
* ------------------------- acl0 -------------------------
* Vendor: Nallatech ltd
*
* Phys Dev Name Status Information
*
* aclnalla_pcie0Passed nalla_pcie (aclnalla_pcie0)
* PCIe dev_id = 2494, bus:slot.func = 02:00.00, Gen3 x8
* FPGA temperature = 54.4 degrees C.
* Total Card Power Usage = 31.7 Watts.
* Device Power Usage = 0.0 Watts.
*
* DIAGNOSTIC_PASSED
* ---------------------------------------------------------
* "
*
* While per Intel's guide, the output(should be outdated or prior SDK version's) is as below:
*
* "
* aocl diagnose: Running diagnostic from ALTERAOCLSDKROOT/board/&lt;board_name&gt;/
* &lt;platform&gt;/libexec
* Verified that the kernel mode driver is installed on the host machine.
* Using board package from vendor: &lt;board_vendor_name&gt;
* Querying information for all supported devices that are installed on the host
* machine ...
*
* device_name Status Information
*
* acl0 Passed &lt;descriptive_board_name&gt;
* PCIe dev_id = &lt;device_ID&gt;, bus:slot.func = 02:00.00,
* at Gen 2 with 8 lanes.
* FPGA temperature=43.0 degrees C.
* acl1 Passed &lt;descriptive_board_name&gt;
* PCIe dev_id = &lt;device_ID&gt;, bus:slot.func = 03:00.00,
* at Gen 2 with 8 lanes.
* FPGA temperature = 35.0 degrees C.
*
* Found 2 active device(s) installed on the host machine, to perform a full
* diagnostic on a specific device, please run aocl diagnose &lt;device_name&gt;
*
* DIAGNOSTIC_PASSED
* "
* But this method only support the first output
* */
public void parseDiagnoseInfo(String output, List<FpgaResourceAllocator.FpgaDevice> list) {
if (output.contains("DIAGNOSTIC_PASSED")) {
Matcher headerStartMatcher = Pattern.compile("acl[0-31]").matcher(output);
Matcher headerEndMatcher = Pattern.compile("(?i)DIAGNOSTIC_PASSED").matcher(output);
int sectionStartIndex;
int sectionEndIndex;
String aliasName;
while (headerStartMatcher.find()) {
sectionStartIndex = headerStartMatcher.end();
String section = null;
aliasName = headerStartMatcher.group();
while (headerEndMatcher.find(sectionStartIndex)) {
sectionEndIndex = headerEndMatcher.start();
section = output.substring(sectionStartIndex, sectionEndIndex);
break;
}
if (null == section) {
LOG.warn("Unsupported diagnose output");
return;
}
// devName, \(.*\)
// busNum, bus:slot.func\s=\s.*,
// FPGA temperature\s=\s.*
// Total\sCard\sPower\sUsage\s=\s.*
String[] fieldRegexes = new String[]{"\\(.*\\)\n", "(?i)bus:slot.func\\s=\\s.*,",
"(?i)FPGA temperature\\s=\\s.*", "(?i)Total\\sCard\\sPower\\sUsage\\s=\\s.*"};
String[] fields = new String[4];
String tempFieldValue;
for (int i = 0; i < fieldRegexes.length; i++) {
Matcher fieldMatcher = Pattern.compile(fieldRegexes[i]).matcher(section);
if (!fieldMatcher.find()) {
LOG.warn("Couldn't find " + fieldRegexes[i] + " pattern");
fields[i] = "";
continue;
}
tempFieldValue = fieldMatcher.group().trim();
if (i == 0) {
// special case for Device name
fields[i] = tempFieldValue.substring(1, tempFieldValue.length() - 1);
} else {
String ss = tempFieldValue.split("=")[1].trim();
fields[i] = ss.substring(0, ss.length() - 1);
}
}
String majorMinorNumber = this.shell.getMajorAndMinorNumber(fields[0]);
if (null != majorMinorNumber) {
String[] mmn = majorMinorNumber.split(":");
this.aliasMap.put(majorMinorNumber, aliasName);
list.add(new FpgaResourceAllocator.FpgaDevice(getFpgaType(),
Integer.parseInt(mmn[0]),
Integer.parseInt(mmn[1]), null,
fields[0], aliasName, fields[1], fields[2], fields[3]));
}
}// end while
}// end if
}
public String getDiagnoseInfo(int timeout) {
@ -328,72 +220,78 @@ public String getFpgaType() {
}
@Override
public String downloadIP(String id, String dstDir, Map<Path, List<String>> localizedResources) {
public String retrieveIPfilePath(String id, String dstDir,
Map<Path, List<String>> localizedResources) {
// Assume .aocx IP file is distributed by DS to local dir
String r = "";
Path path;
LOG.info("Got environment: " + id + ", search IP file in localized resources");
String ipFilePath = null;
LOG.info("Got environment: " + id +
", search IP file in localized resources");
if (null == id || id.isEmpty()) {
LOG.warn("IP_ID environment is empty, skip downloading");
return r;
return null;
}
if (localizedResources != null) {
for (Map.Entry<Path, List<String>> resourceEntry :
localizedResources.entrySet()) {
path = resourceEntry.getKey();
LOG.debug("Check:" + path.toUri().toString());
if (path.getName().toLowerCase().contains(id.toLowerCase()) && path.getName().endsWith(".aocx")) {
r = path.toUri().toString();
LOG.debug("Found: " + r);
break;
}
Optional<Path> aocxPath = localizedResources
.keySet()
.stream()
.filter(path -> matchesIpid(path, id))
.findFirst();
if (aocxPath.isPresent()) {
ipFilePath = aocxPath.get().toUri().toString();
LOG.debug("Found: " + ipFilePath);
}
} else {
LOG.warn("Localized resource is null!");
}
return r;
return ipFilePath;
}
/**
 * Tells whether a localized resource is the IP file of the requested IP ID.
 *
 * The file must have the ".aocx" extension and its name without the
 * extension must be equal to the ID, ignoring case (e.g. ID "matrix_mul"
 * matches "matrix_mul.aocx"). An ID which already contains the extension
 * is accepted as well. Requiring the bare ID to be equal to the whole
 * file name would never match a file called "&lt;id&gt;.aocx".
 *
 * @param p path of the localized resource
 * @param id the requested IP ID, must not be null
 * @return true if the file is the IP file which belongs to the ID
 */
private boolean matchesIpid(Path p, String id) {
  final String extension = ".aocx";
  String fileName = p.getName();

  if (!fileName.endsWith(extension)) {
    return false;
  }

  String baseName = fileName.substring(0,
      fileName.length() - extension.length());
  return baseName.equalsIgnoreCase(id) || fileName.equalsIgnoreCase(id);
}
/**
* Program one device.
* It's ok for the offline "aocl program" failed because the application will always invoke API to program
* The reason we do offline reprogramming is to make the application's program process faster
* It's ok if the offline "aocl program" fails because the application will
* always invoke the API to program the device itself.
* The reason we do offline reprogramming is to make the application's
* programming step faster.
* @param ipPath the absolute path to the aocx IP file
* @param majorMinorNumber major:minor string
* @return True or False
* @param device Fpga device object which represents the card
* @return false if programming the card fails
* */
@Override
public boolean configureIP(String ipPath, String majorMinorNumber) {
public boolean configureIP(String ipPath, FpgaDevice device) {
// perform offline program the IP to get a quickest reprogramming sequence
// we need a mapping of "major:minor" to "acl0" to issue command "aocl program <acl0> <ipPath>"
// we need a mapping of "major:minor" to "acl0" to
// issue command "aocl program <acl0> <ipPath>"
Shell.ShellCommandExecutor shexec;
String aclName;
aclName = this.aliasMap.get(majorMinorNumber);
aclName = device.getAliasDevName();
shexec = new Shell.ShellCommandExecutor(
new String[]{this.pathToExecutable, "program", aclName, ipPath});
try {
shexec.execute();
if (0 == shexec.getExitCode()) {
LOG.debug(shexec.getOutput());
LOG.info("Intel aocl program " + ipPath + " to " + aclName + " successfully");
LOG.info("Intel aocl program " + ipPath + " to " +
aclName + " successfully");
} else {
LOG.error("Device programming failed, aocl output is:");
LOG.error(shexec.getOutput());
return false;
}
} catch (IOException e) {
LOG.error("Intel aocl program " + ipPath + " to " + aclName + " failed!");
e.printStackTrace();
LOG.error("Intel aocl program " + ipPath + " to " +
aclName + " failed!", e);
LOG.error("Aocl output: " + shexec.getOutput());
return false;
}
return true;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public Configuration getConf() {
return this.conf;
}
}

View File

@ -0,0 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga;

View File

@ -18,9 +18,35 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@ -30,6 +56,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDiscoverer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.IntelFpgaOpenclPlugin;
@ -38,15 +65,6 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static org.mockito.Mockito.*;
public class TestFpgaResourceHandler {
private Context mockContext;
@ -95,16 +113,20 @@ public void testBootstrap() throws ResourceHandlerException {
verify(mockVendorPlugin, times(1)).initPlugin(configuration);
verify(mockCGroupsHandler, times(1)).initializeCGroupController(
CGroupsHandler.CGroupController.DEVICES);
Assert.assertEquals(5, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
Assert.assertEquals(5, fpgaResourceHandler.getFpgaAllocator().getAllowedFpga().size());
Assert.assertEquals(5, fpgaResourceHandler.getFpgaAllocator()
.getAvailableFpgaCount());
Assert.assertEquals(5, fpgaResourceHandler.getFpgaAllocator()
.getAllowedFpga().size());
// Case 2. subset of devices
fpgaResourceHandler = new FpgaResourceHandlerImpl(mockContext,
mockCGroupsHandler, mockPrivilegedExecutor, mockVendorPlugin);
allowed = "0,1,2";
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, allowed);
fpgaResourceHandler.bootstrap(configuration);
Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAllowedFpga().size());
List<FpgaResourceAllocator.FpgaDevice> allowedDevices = fpgaResourceHandler.getFpgaAllocator().getAllowedFpga();
Assert.assertEquals(3,
fpgaResourceHandler.getFpgaAllocator().getAllowedFpga().size());
List<FpgaResourceAllocator.FpgaDevice> allowedDevices =
fpgaResourceHandler.getFpgaAllocator().getAllowedFpga();
for (String s : allowed.split(",")) {
boolean check = false;
for (FpgaResourceAllocator.FpgaDevice device : allowedDevices) {
@ -114,7 +136,8 @@ public void testBootstrap() throws ResourceHandlerException {
}
Assert.assertTrue("Minor:" + s +"found", check);
}
Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
Assert.assertEquals(3,
fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
// Case 3. User configuration contains invalid minor device number
fpgaResourceHandler = new FpgaResourceHandlerImpl(mockContext,
@ -122,18 +145,23 @@ public void testBootstrap() throws ResourceHandlerException {
allowed = "0,1,7";
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, allowed);
fpgaResourceHandler.bootstrap(configuration);
Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getAllowedFpga().size());
Assert.assertEquals(2,
fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
Assert.assertEquals(2,
fpgaResourceHandler.getFpgaAllocator().getAllowedFpga().size());
}
@Test
public void testBootstrapWithInvalidUserConfiguration() throws ResourceHandlerException {
public void testBootstrapWithInvalidUserConfiguration()
throws ResourceHandlerException {
// User configuration contains invalid minor device number
String allowed = "0,1,7";
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, allowed);
fpgaResourceHandler.bootstrap(configuration);
Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getAllowedFpga().size());
Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
Assert.assertEquals(2,
fpgaResourceHandler.getFpgaAllocator().getAllowedFpga().size());
Assert.assertEquals(2,
fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
String[] invalidAllowedStrings = {"a,1,2,", "a,1,2", "0,1,2,#", "a", "1,"};
for (String s : invalidAllowedStrings) {
@ -161,7 +189,8 @@ public void testBootstrapWithInvalidUserConfiguration() throws ResourceHandlerEx
}
@Test
public void testBootStrapWithEmptyUserConfiguration() throws ResourceHandlerException {
public void testBootStrapWithEmptyUserConfiguration()
throws ResourceHandlerException {
// User configuration is an empty string
String allowed = "";
boolean invalidConfiguration = false;
@ -175,7 +204,8 @@ public void testBootStrapWithEmptyUserConfiguration() throws ResourceHandlerExce
}
@Test
public void testAllocationWithPreference() throws ResourceHandlerException, PrivilegedOperationException {
public void testAllocationWithPreference()
throws ResourceHandlerException, PrivilegedOperationException {
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, "0,1,2");
fpgaResourceHandler.bootstrap(configuration);
// Case 1. The id-0 container request 1 FPGA of IntelOpenCL type and GEMM IP
@ -197,18 +227,24 @@ public void testAllocationWithPreference() throws ResourceHandlerException, Priv
Assert.assertTrue(flag);
// Case 3. Release the id-0 container
fpgaResourceHandler.postComplete(getContainerId(0));
Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
Assert.assertEquals(0,
fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(3,
fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
// Now we have enough devices, re-allocate for the id-1 container
fpgaResourceHandler.preStart(mockContainer(1, 3, "GEMM"));
// Id-1 container should have 0 denied devices
verifyDeniedDevices(getContainerId(1), new ArrayList<>());
Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
Assert.assertEquals(3,
fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(0,
fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
// Release container id-1
fpgaResourceHandler.postComplete(getContainerId(1));
Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
Assert.assertEquals(0,
fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(3,
fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
// Case 4. Now all 3 devices should have IPID GEMM
// Try container id-2 and id-3
fpgaResourceHandler.preStart(mockContainer(2, 1, "GZIP"));
@ -221,18 +257,24 @@ public void testAllocationWithPreference() throws ResourceHandlerException, Priv
for (FpgaResourceAllocator.FpgaDevice device : list) {
Assert.assertEquals("IPID should be GEMM", "GEMM", device.getIPID());
}
Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(1, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
Assert.assertEquals(2,
fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(1,
fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
fpgaResourceHandler.postComplete(getContainerId(3));
Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
Assert.assertEquals(0,
fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(3,
fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
// Case 5. id-4 request 0 FPGA device
fpgaResourceHandler.preStart(mockContainer(4, 0, ""));
// Deny all devices for id-4
verifyDeniedDevices(getContainerId(4), Arrays.asList(0, 1, 2));
Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
Assert.assertEquals(0,
fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(3,
fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
// Case 6. id-5 with invalid FPGA device
try {
@ -243,53 +285,68 @@ public void testAllocationWithPreference() throws ResourceHandlerException, Priv
}
/**
 * Exercises allocation when devices already carry an IP id: after container
 * id-0 has used all three allowed devices with the GEMM IP, later GEMM
 * requests must not add configureIP invocations on the vendor plugin, while
 * a request for a different IP (GZIP) must add exactly one.
 */
@Test
public void testsAllocationWithExistingIPIDDevices() throws ResourceHandlerException, PrivilegedOperationException {
public void testsAllocationWithExistingIPIDDevices()
throws ResourceHandlerException, PrivilegedOperationException {
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, "0,1,2");
fpgaResourceHandler.bootstrap(configuration);
// The id-0 container requests 3 FPGAs of IntelOpenCL type with the GEMM IP
fpgaResourceHandler.preStart(mockContainer(0, 3, "GEMM"));
Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
List<FpgaResourceAllocator.FpgaDevice> list = fpgaResourceHandler.getFpgaAllocator()
.getUsedFpga().get(getContainerId(0).toString());
Assert.assertEquals(3,
fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
// Keep a reference to the devices assigned to id-0 so their IP id can be
// inspected after the container has been released
List<FpgaResourceAllocator.FpgaDevice> list =
fpgaResourceHandler
.getFpgaAllocator()
.getUsedFpga()
.get(getContainerId(0).toString());
fpgaResourceHandler.postComplete(getContainerId(0));
for (FpgaResourceAllocator.FpgaDevice device : list) {
Assert.assertEquals("IP should be updated to GEMM", "GEMM", device.getIPID());
Assert.assertEquals("IP should be updated to GEMM", "GEMM",
device.getIPID());
}
// Case 1. containers id-1 and id-2 request the GEMM IP again; no additional
// plugin.configureIP call is expected
fpgaResourceHandler.preStart(mockContainer(1, 1, "GEMM"));
fpgaResourceHandler.preStart(mockContainer(2, 1, "GEMM"));
// The total number of configureIP invocations must still be 3
// (presumably the three made for id-0 - TODO confirm against preStart)
verify(mockVendorPlugin, times(3)).configureIP(anyString(),anyString());
verify(mockVendorPlugin, times(3)).configureIP(anyString(),
any(FpgaDevice.class));
fpgaResourceHandler.postComplete(getContainerId(1));
fpgaResourceHandler.postComplete(getContainerId(2));
// Case 2. container id-1 requests a different IP (GZIP); one more
// plugin.configureIP call is expected
fpgaResourceHandler.preStart(mockContainer(1, 1, "GZIP"));
// The total number of configureIP invocations must now be 4
verify(mockVendorPlugin, times(4)).configureIP(anyString(),anyString());
verify(mockVendorPlugin, times(4)).configureIP(anyString(),
any(FpgaDevice.class));
}
/**
 * Checks the zero-device request path: a container asking for 0 FPGAs (with
 * a null IP id) is expected to have minors 0, 1 and 2 on its denied list,
 * and the vendor plugin must never be asked to fetch or configure an IP.
 */
@Test
public void testAllocationWithZeroDevices() throws ResourceHandlerException, PrivilegedOperationException {
public void testAllocationWithZeroDevices()
throws ResourceHandlerException, PrivilegedOperationException {
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, "0,1,2");
fpgaResourceHandler.bootstrap(configuration);
// The id-0 container requests 0 FPGAs
fpgaResourceHandler.preStart(mockContainer(0, 0, null));
verifyDeniedDevices(getContainerId(0), Arrays.asList(0, 1, 2));
// No IP file lookup and no device programming may happen for this container
verify(mockVendorPlugin, times(0)).downloadIP(anyString(), anyString(), anyMap());
verify(mockVendorPlugin, times(0)).configureIP(anyString(), anyString());
verify(mockVendorPlugin, times(0)).retrieveIPfilePath(anyString(),
anyString(), anyMap());
verify(mockVendorPlugin, times(0)).configureIP(anyString(),
any(FpgaDevice.class));
}
@Test
public void testStateStore() throws ResourceHandlerException, IOException {
public void testStateStore()
throws ResourceHandlerException, IOException {
// Case 1. store 3 devices
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, "0,1,2");
fpgaResourceHandler.bootstrap(configuration);
Container container0 = mockContainer(0, 3, "GEMM");
fpgaResourceHandler.preStart(container0);
List<FpgaResourceAllocator.FpgaDevice> assigned =
fpgaResourceHandler.getFpgaAllocator().getUsedFpga().get(getContainerId(0).toString());
fpgaResourceHandler
.getFpgaAllocator()
.getUsedFpga()
.get(getContainerId(0).toString());
verify(mockNMStateStore).storeAssignedResources(container0,
ResourceInformation.FPGA_URI,
new ArrayList<>(assigned));
@ -303,22 +360,26 @@ public void testStateStore() throws ResourceHandlerException, IOException {
@Test
public void testReacquireContainer() throws ResourceHandlerException {
Container c0 = mockContainer(0, 2, "GEMM");
List<FpgaResourceAllocator.FpgaDevice> assigned = new ArrayList<>();
assigned.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 0, null));
assigned.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 1, null));
assigned.add(new
FpgaResourceAllocator.FpgaDevice(vendorType, 247, 0, null));
assigned.add(new
FpgaResourceAllocator.FpgaDevice(vendorType, 247, 1, null));
// Mock we've stored the c0 states
mockStateStoreForContainer(c0, assigned);
// NM start
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, "0,1,2");
fpgaResourceHandler.bootstrap(configuration);
Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
Assert.assertEquals(0,
fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(3,
fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
// Case 1. try recover state for id-0 container
fpgaResourceHandler.reacquireContainer(getContainerId(0));
// minor number matches
List<FpgaResourceAllocator.FpgaDevice> used = fpgaResourceHandler.getFpgaAllocator().
List<FpgaResourceAllocator.FpgaDevice> used =
fpgaResourceHandler.getFpgaAllocator().
getUsedFpga().get(getContainerId(0).toString());
int count = 0;
for (FpgaResourceAllocator.FpgaDevice device : used) {
@ -330,21 +391,26 @@ public void testReacquireContainer() throws ResourceHandlerException {
}
}
Assert.assertEquals("Unexpected used minor number in allocator",2, count);
List<FpgaResourceAllocator.FpgaDevice> available = fpgaResourceHandler.getFpgaAllocator().
getAvailableFpga().get(vendorType);
List<FpgaResourceAllocator.FpgaDevice> available =
fpgaResourceHandler
.getFpgaAllocator()
.getAvailableFpga()
.get(vendorType);
count = 0;
for (FpgaResourceAllocator.FpgaDevice device : available) {
if (device.getMinor().equals(2)) {
count++;
}
}
Assert.assertEquals("Unexpected available minor number in allocator", 1, count);
Assert.assertEquals("Unexpected available minor number in allocator",
1, count);
// Case 2. Recover a not allowed device with minor number 5
Container c1 = mockContainer(1, 1, "GEMM");
assigned = new ArrayList<>();
assigned.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 5, null));
assigned.add(new
FpgaResourceAllocator.FpgaDevice(vendorType, 247, 5, null));
// Mock we've stored the c1 states
mockStateStoreForContainer(c1, assigned);
boolean flag = false;
@ -354,13 +420,16 @@ public void testReacquireContainer() throws ResourceHandlerException {
flag = true;
}
Assert.assertTrue(flag);
Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(1, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
Assert.assertEquals(2,
fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(1,
fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
// Case 3. recover a already used device by other container
Container c2 = mockContainer(2, 1, "GEMM");
assigned = new ArrayList<>();
assigned.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 1, null));
assigned.add(new
FpgaResourceAllocator.FpgaDevice(vendorType, 247, 1, null));
// Mock we've stored the c2 states
mockStateStoreForContainer(c2, assigned);
flag = false;
@ -370,18 +439,23 @@ public void testReacquireContainer() throws ResourceHandlerException {
flag = true;
}
Assert.assertTrue(flag);
Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(1, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
Assert.assertEquals(2,
fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(1,
fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
// Case 4. recover a normal container c3 with remaining minor device number 2
Container c3 = mockContainer(3, 1, "GEMM");
assigned = new ArrayList<>();
assigned.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 2, null));
assigned.add(new
FpgaResourceAllocator.FpgaDevice(vendorType, 247, 2, null));
// Mock we've stored the c3 states
mockStateStoreForContainer(c3, assigned);
fpgaResourceHandler.reacquireContainer(getContainerId(3));
Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
Assert.assertEquals(3,
fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
Assert.assertEquals(0,
fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
}
private void verifyDeniedDevices(ContainerId containerId,
@ -405,14 +479,16 @@ private void verifyDeniedDevices(ContainerId containerId,
}
}
/**
 * Builds a Mockito mock of IntelFpgaOpenclPlugin with canned answers:
 * initialization and IP configuration report success, the IP file lookup
 * yields "/tmp", and discovery returns the supplied device list.
 *
 * @param type value the mock returns from getFpgaType()
 * @param list devices the mock returns from discover() for any int argument
 * @return the stubbed plugin mock
 */
private static IntelFpgaOpenclPlugin mockPlugin(String type, List<FpgaResourceAllocator.FpgaDevice> list) {
private static IntelFpgaOpenclPlugin mockPlugin(String type,
List<FpgaResourceAllocator.FpgaDevice> list) {
IntelFpgaOpenclPlugin plugin = mock(IntelFpgaOpenclPlugin.class);
// Plugin initialization always succeeds
when(plugin.initPlugin(Mockito.any())).thenReturn(true);
when(plugin.initPlugin(any())).thenReturn(true);
when(plugin.getFpgaType()).thenReturn(type);
// Any IP file request resolves to the fixed path "/tmp"
when(plugin.downloadIP(Mockito.anyString(), Mockito.anyString(), Mockito.anyMap())).thenReturn("/tmp");
when(plugin.configureIP(Mockito.anyString(), Mockito.any()))
when(plugin.retrieveIPfilePath(anyString(),
anyString(), anyMap())).thenReturn("/tmp");
// Programming a device with an IP always succeeds
when(plugin.configureIP(anyString(), any()))
.thenReturn(true);
when(plugin.discover(Mockito.anyInt())).thenReturn(list);
when(plugin.discover(anyInt())).thenReturn(list);
return plugin;
}

View File

@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.List;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.IntelFpgaOpenclPlugin.InnerShellExecutor;
import org.junit.Test;
/**
 * Tests for AoclDiagnosticOutputParser.
 *
 * <p>A canned diagnostic dump describing three boards (acl0, acl1, acl2) is
 * handed to {@code AoclDiagnosticOutputParser.parseDiagnosticOutput} together
 * with a mocked {@code InnerShellExecutor} that supplies the "major:minor"
 * numbers per device name. Every field of each resulting {@code FpgaDevice}
 * is then verified.
 */
@SuppressWarnings("checkstyle:linelength")
public class TestAoclOutputParser {

  /** FPGA type passed to the parser; every parsed device must report it. */
  private static final String FPGA_TYPE = "IntelOpenCL";

  /**
   * Parses the three-board sample output and checks type, major/minor
   * numbers, alias, device name, bus number, temperature and card power
   * usage of every device, in the order the boards appear in the output.
   */
  @Test
  public void testParsing() {
    // Board acl0: note that DIAGNOSTIC_PASSED is deliberately not followed
    // by a line break in this sample.
    String acl0Section =
        "------------------------- acl0 -------------------------\n" +
        "Vendor: Nallatech ltd\n" +
        "Phys Dev Name Status Information\n" +
        "aclnalla_pcie0Passed nalla_pcie (aclnalla_pcie0)\n" +
        " PCIe dev_id = 2494, bus:slot.func = 02:00.00, Gen3 x8\n" +
        " FPGA temperature = 53.1 degrees C.\n" +
        " Total Card Power Usage = 31.7 Watts.\n" +
        " Device Power Usage = 0.0 Watts.\n" +
        "DIAGNOSTIC_PASSED" +
        "---------------------------------------------------------\n";

    // Board acl1: same layout as acl0 with different readings.
    String acl1Section =
        "------------------------- acl1 -------------------------\n" +
        "Vendor: Nallatech ltd\n" +
        "Phys Dev Name Status Information\n" +
        "aclnalla_pcie1Passed nalla_pcie (aclnalla_pcie1)\n" +
        " PCIe dev_id = 2495, bus:slot.func = 03:00.00, Gen3 x8\n" +
        " FPGA temperature = 43.1 degrees C.\n" +
        " Total Card Power Usage = 11.7 Watts.\n" +
        " Device Power Usage = 0.0 Watts.\n" +
        "DIAGNOSTIC_PASSED" +
        "---------------------------------------------------------\n";

    // Board acl2: different vendor, blank lines in between and no power
    // usage lines at all.
    String acl2Section =
        "------------------------- acl2 -------------------------\n" +
        "Vendor: Intel(R) Corporation\n" +
        "\n" +
        "Phys Dev Name Status Information\n" +
        "\n" +
        "acla10_ref0 Passed Arria 10 Reference Platform (acla10_ref0)\n" +
        " PCIe dev_id = 2494, bus:slot.func = 09:00.00, Gen2 x8\n" +
        " FPGA temperature = 50.5781 degrees C.\n" +
        "\n" +
        "DIAGNOSTIC_PASSED\n" +
        "---------------------------------------------------------\n";

    String output = acl0Section + acl1Section + acl2Section;

    // The shell executor is mocked so that no real command is run; it only
    // maps a device name to its "major:minor" string.
    InnerShellExecutor shellExecutor = mock(InnerShellExecutor.class);
    when(shellExecutor.getMajorAndMinorNumber("aclnalla_pcie0"))
        .thenReturn("247:0");
    when(shellExecutor.getMajorAndMinorNumber("aclnalla_pcie1"))
        .thenReturn("247:1");
    when(shellExecutor.getMajorAndMinorNumber("acla10_ref0"))
        .thenReturn("246:0");

    List<FpgaDevice> devices =
        AoclDiagnosticOutputParser.parseDiagnosticOutput(
            output, shellExecutor, FPGA_TYPE);

    assertEquals(3, devices.size());

    // The alias (aclN) is a property of the device itself, so it is checked
    // together with the other fields rather than in a separate pass.
    assertDevice(devices.get(0), "247", "0", "acl0", "aclnalla_pcie0",
        "02:00.00", "53.1 degrees C", "31.7 Watts");
    assertDevice(devices.get(1), "247", "1", "acl1", "aclnalla_pcie1",
        "03:00.00", "43.1 degrees C", "11.7 Watts");
    // acl2 has no "Total Card Power Usage" line, hence the empty string.
    assertDevice(devices.get(2), "246", "0", "acl2", "acla10_ref0",
        "09:00.00", "50.5781 degrees C", "");
  }

  /**
   * Asserts every parsed attribute of a single device.
   *
   * @param device the parsed device under inspection
   * @param major expected major number, in string form
   * @param minor expected minor number, in string form
   * @param alias expected alias device name (e.g. "acl0")
   * @param devName expected physical device name
   * @param busNum expected bus:slot.func value
   * @param temperature expected temperature text
   * @param cardPowerUsage expected card power usage text
   */
  private static void assertDevice(FpgaDevice device, String major,
      String minor, String alias, String devName, String busNum,
      String temperature, String cardPowerUsage) {
    assertEquals(FPGA_TYPE, device.getType());
    assertEquals(major, device.getMajor().toString());
    assertEquals(minor, device.getMinor().toString());
    assertEquals(alias, device.getAliasDevName());
    assertEquals(devName, device.getDevName());
    assertEquals(busNum, device.getBusNum());
    assertEquals(temperature, device.getTemperature());
    assertEquals(cardPowerUsage, device.getCardPowerUsage());
  }
}

View File

@ -20,17 +20,11 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileOutputStream;
@ -38,16 +32,20 @@
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class TestFpgaDiscoverer {
@Rule
@ -114,7 +112,7 @@ public void testLinuxFpgaResourceDiscoverPluginConfig() throws Exception {
// FpgaDiscoverer.getInstance().diagnose() work in openclPlugin.initPlugin()
discoverer.setResourceHanderPlugin(openclPlugin);
openclPlugin.initPlugin(conf);
openclPlugin.setShell(mockPuginShell());
openclPlugin.setInnerShellExecutor(mockPuginShell());
discoverer.initialize(conf);
// Case 1. No configuration set for binary(no environment "ALTERAOCLSDKROOT" set)
@ -157,89 +155,6 @@ public void testLinuxFpgaResourceDiscoverPluginConfig() throws Exception {
}
@Test
public void testDiscoverPluginParser() throws YarnException {
String output = "------------------------- acl0 -------------------------\n" +
"Vendor: Nallatech ltd\n" +
"Phys Dev Name Status Information\n" +
"aclnalla_pcie0Passed nalla_pcie (aclnalla_pcie0)\n" +
" PCIe dev_id = 2494, bus:slot.func = 02:00.00, Gen3 x8\n" +
" FPGA temperature = 53.1 degrees C.\n" +
" Total Card Power Usage = 31.7 Watts.\n" +
" Device Power Usage = 0.0 Watts.\n" +
"DIAGNOSTIC_PASSED" +
"---------------------------------------------------------\n";
output = output +
"------------------------- acl1 -------------------------\n" +
"Vendor: Nallatech ltd\n" +
"Phys Dev Name Status Information\n" +
"aclnalla_pcie1Passed nalla_pcie (aclnalla_pcie1)\n" +
" PCIe dev_id = 2495, bus:slot.func = 03:00.00, Gen3 x8\n" +
" FPGA temperature = 43.1 degrees C.\n" +
" Total Card Power Usage = 11.7 Watts.\n" +
" Device Power Usage = 0.0 Watts.\n" +
"DIAGNOSTIC_PASSED" +
"---------------------------------------------------------\n";
output = output +
"------------------------- acl2 -------------------------\n" +
"Vendor: Intel(R) Corporation\n" +
"\n" +
"Phys Dev Name Status Information\n" +
"\n" +
"acla10_ref0 Passed Arria 10 Reference Platform (acla10_ref0)\n" +
" PCIe dev_id = 2494, bus:slot.func = 09:00.00, Gen2 x8\n" +
" FPGA temperature = 50.5781 degrees C.\n" +
"\n" +
"DIAGNOSTIC_PASSED\n" +
"---------------------------------------------------------\n";
Configuration conf = new Configuration(false);
IntelFpgaOpenclPlugin openclPlugin = new IntelFpgaOpenclPlugin();
FpgaDiscoverer.getInstance().setResourceHanderPlugin(openclPlugin);
openclPlugin.initPlugin(conf);
openclPlugin.setShell(mockPuginShell());
FpgaDiscoverer.getInstance().initialize(conf);
List<FpgaResourceAllocator.FpgaDevice> list = new LinkedList<>();
// Case 1. core parsing
openclPlugin.parseDiagnoseInfo(output, list);
assertEquals(3, list.size());
assertEquals("IntelOpenCL", list.get(0).getType());
assertEquals("247", list.get(0).getMajor().toString());
assertEquals("0", list.get(0).getMinor().toString());
assertEquals("acl0", list.get(0).getAliasDevName());
assertEquals("aclnalla_pcie0", list.get(0).getDevName());
assertEquals("02:00.00", list.get(0).getBusNum());
assertEquals("53.1 degrees C", list.get(0).getTemperature());
assertEquals("31.7 Watts", list.get(0).getCardPowerUsage());
assertEquals("IntelOpenCL", list.get(1).getType());
assertEquals("247", list.get(1).getMajor().toString());
assertEquals("1", list.get(1).getMinor().toString());
assertEquals("acl1", list.get(1).getAliasDevName());
assertEquals("aclnalla_pcie1", list.get(1).getDevName());
assertEquals("03:00.00", list.get(1).getBusNum());
assertEquals("43.1 degrees C", list.get(1).getTemperature());
assertEquals("11.7 Watts", list.get(1).getCardPowerUsage());
assertEquals("IntelOpenCL", list.get(2).getType());
assertEquals("246", list.get(2).getMajor().toString());
assertEquals("0", list.get(2).getMinor().toString());
assertEquals("acl2", list.get(2).getAliasDevName());
assertEquals("acla10_ref0", list.get(2).getDevName());
assertEquals("09:00.00", list.get(2).getBusNum());
assertEquals("50.5781 degrees C", list.get(2).getTemperature());
assertEquals("", list.get(2).getCardPowerUsage());
// Case 2. check alias map
Map<String, String> aliasMap = openclPlugin.getAliasMap();
assertEquals("acl0", aliasMap.get("247:0"));
assertEquals("acl1", aliasMap.get("247:1"));
assertEquals("acl2", aliasMap.get("246:0"));
}
@Test
public void testDiscoveryWhenAvailableDevicesDefined()
throws YarnException {
@ -251,7 +166,7 @@ public void testDiscoveryWhenAvailableDevicesDefined()
IntelFpgaOpenclPlugin openclPlugin = new IntelFpgaOpenclPlugin();
discoverer.setResourceHanderPlugin(openclPlugin);
openclPlugin.initPlugin(conf);
openclPlugin.setShell(mockPuginShell());
openclPlugin.setInnerShellExecutor(mockPuginShell());
discoverer.initialize(conf);
List<FpgaDevice> devices = discoverer.discover();
@ -282,7 +197,7 @@ public void testDiscoveryWhenAvailableDevicesEmpty()
IntelFpgaOpenclPlugin openclPlugin = new IntelFpgaOpenclPlugin();
discoverer.setResourceHanderPlugin(openclPlugin);
openclPlugin.initPlugin(conf);
openclPlugin.setShell(mockPuginShell());
openclPlugin.setInnerShellExecutor(mockPuginShell());
discoverer.initialize(conf);
discoverer.discover();
@ -302,7 +217,7 @@ public void testDiscoveryWhenAvailableDevicesAreIllegalString()
IntelFpgaOpenclPlugin openclPlugin = new IntelFpgaOpenclPlugin();
discoverer.setResourceHanderPlugin(openclPlugin);
openclPlugin.initPlugin(conf);
openclPlugin.setShell(mockPuginShell());
openclPlugin.setInnerShellExecutor(mockPuginShell());
discoverer.initialize(conf);
discoverer.discover();
@ -319,7 +234,7 @@ public void testDiscoveryWhenExternalScriptDefined()
IntelFpgaOpenclPlugin openclPlugin = new IntelFpgaOpenclPlugin();
discoverer.setResourceHanderPlugin(openclPlugin);
openclPlugin.initPlugin(conf);
openclPlugin.setShell(mockPuginShell());
openclPlugin.setInnerShellExecutor(mockPuginShell());
discoverer.setScriptRunner(s -> {
return Optional.of("acl0/243:0,acl1/244:1"); });
@ -352,7 +267,7 @@ public void testDiscoveryWhenExternalScriptReturnsEmptyString()
IntelFpgaOpenclPlugin openclPlugin = new IntelFpgaOpenclPlugin();
discoverer.setResourceHanderPlugin(openclPlugin);
openclPlugin.initPlugin(conf);
openclPlugin.setShell(mockPuginShell());
openclPlugin.setInnerShellExecutor(mockPuginShell());
discoverer.setScriptRunner(s -> {
return Optional.of(""); });
@ -361,7 +276,6 @@ public void testDiscoveryWhenExternalScriptReturnsEmptyString()
}
@Test
public void testDiscoveryWhenExternalScriptFails()
throws YarnException {
expected.expect(ResourceHandlerException.class);
@ -375,7 +289,7 @@ public void testDiscoveryWhenExternalScriptFails()
IntelFpgaOpenclPlugin openclPlugin = new IntelFpgaOpenclPlugin();
discoverer.setResourceHanderPlugin(openclPlugin);
openclPlugin.initPlugin(conf);
openclPlugin.setShell(mockPuginShell());
openclPlugin.setInnerShellExecutor(mockPuginShell());
discoverer.setScriptRunner(s -> {
return Optional.empty(); });
@ -396,7 +310,7 @@ public void testDiscoveryWhenExternalScriptUndefined()
IntelFpgaOpenclPlugin openclPlugin = new IntelFpgaOpenclPlugin();
discoverer.setResourceHanderPlugin(openclPlugin);
openclPlugin.initPlugin(conf);
openclPlugin.setShell(mockPuginShell());
openclPlugin.setInnerShellExecutor(mockPuginShell());
discoverer.initialize(conf);
discoverer.discover();
@ -421,7 +335,7 @@ public void testDiscoveryWhenExternalScriptCannotBeExecuted()
IntelFpgaOpenclPlugin openclPlugin = new IntelFpgaOpenclPlugin();
discoverer.setResourceHanderPlugin(openclPlugin);
openclPlugin.initPlugin(conf);
openclPlugin.setShell(mockPuginShell());
openclPlugin.setInnerShellExecutor(mockPuginShell());
discoverer.initialize(conf);
discoverer.discover();