YARN-9267. General improvements in FpgaResourceHandlerImpl. Contributed by Peter Bacsko.
This commit is contained in:
parent
9f1c017f44
commit
a99eb80659
|
@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDiscoverer;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
@ -143,6 +142,8 @@ public class FpgaResourceAllocator {
|
||||||
private Integer minor;
|
private Integer minor;
|
||||||
// IP file identifier. matrix multiplication for instance
|
// IP file identifier. matrix multiplication for instance
|
||||||
private String IPID;
|
private String IPID;
|
||||||
|
// SHA-256 hash of the uploaded aocx file
|
||||||
|
private String aocxHash;
|
||||||
// the device name under /dev
|
// the device name under /dev
|
||||||
private String devName;
|
private String devName;
|
||||||
// the alias device name. Intel uses acl numbers acl0 to acl31
|
// the alias device name. Intel uses acl numbers acl0 to acl31
|
||||||
|
@ -168,6 +169,14 @@ public class FpgaResourceAllocator {
|
||||||
return IPID;
|
return IPID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getAocxHash() {
|
||||||
|
return aocxHash;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAocxHash(String hash) {
|
||||||
|
this.aocxHash = hash;
|
||||||
|
}
|
||||||
|
|
||||||
public void setIPID(String IPID) {
|
public void setIPID(String IPID) {
|
||||||
this.IPID = IPID;
|
this.IPID = IPID;
|
||||||
}
|
}
|
||||||
|
@ -263,7 +272,8 @@ public class FpgaResourceAllocator {
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "FPGA Device:(Type: " + this.type + ", Major: " +
|
return "FPGA Device:(Type: " + this.type + ", Major: " +
|
||||||
this.major + ", Minor: " + this.minor + ", IPID: " + this.IPID + ")";
|
this.major + ", Minor: " + this.minor + ", IPID: " +
|
||||||
|
this.IPID + ", Hash: " + this.aocxHash + ")";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -279,11 +289,14 @@ public class FpgaResourceAllocator {
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void updateFpga(String requestor,
|
public synchronized void updateFpga(String requestor,
|
||||||
FpgaDevice device, String newIPID) {
|
FpgaDevice device, String newIPID, String newHash) {
|
||||||
List<FpgaDevice> usedFpgas = usedFpgaByRequestor.get(requestor);
|
List<FpgaDevice> usedFpgas = usedFpgaByRequestor.get(requestor);
|
||||||
int index = findMatchedFpga(usedFpgas, device);
|
int index = findMatchedFpga(usedFpgas, device);
|
||||||
if (-1 != index) {
|
if (-1 != index) {
|
||||||
usedFpgas.get(index).setIPID(newIPID);
|
usedFpgas.get(index).setIPID(newIPID);
|
||||||
|
FpgaDevice fpga = usedFpgas.get(index);
|
||||||
|
fpga.setIPID(newIPID);
|
||||||
|
fpga.setAocxHash(newHash);
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Failed to update FPGA due to unknown reason " +
|
LOG.warn("Failed to update FPGA due to unknown reason " +
|
||||||
"that no record for this allocated device:" + device);
|
"that no record for this allocated device:" + device);
|
||||||
|
@ -307,12 +320,12 @@ public class FpgaResourceAllocator {
|
||||||
* @param type vendor plugin supported FPGA device type
|
* @param type vendor plugin supported FPGA device type
|
||||||
* @param count requested FPGA slot count
|
* @param count requested FPGA slot count
|
||||||
* @param container container id
|
* @param container container id
|
||||||
* @param IPIDPreference allocate slot with this IPID first
|
* @param ipidHash hash of the localized aocx file
|
||||||
* @return Instance consisting of two lists of allowed and denied {@link FpgaDevice}
|
* @return Instance consisting of two lists of allowed and denied {@link FpgaDevice}
|
||||||
* @throws ResourceHandlerException When failed to allocate or write state store
|
* @throws ResourceHandlerException When failed to allocate or write state store
|
||||||
* */
|
* */
|
||||||
public synchronized FpgaAllocation assignFpga(String type, long count,
|
public synchronized FpgaAllocation assignFpga(String type, long count,
|
||||||
Container container, String IPIDPreference) throws ResourceHandlerException {
|
Container container, String ipidHash) throws ResourceHandlerException {
|
||||||
List<FpgaDevice> currentAvailableFpga = availableFpga.get(type);
|
List<FpgaDevice> currentAvailableFpga = availableFpga.get(type);
|
||||||
String requestor = container.getContainerId().toString();
|
String requestor = container.getContainerId().toString();
|
||||||
if (null == currentAvailableFpga) {
|
if (null == currentAvailableFpga) {
|
||||||
|
@ -327,8 +340,9 @@ public class FpgaResourceAllocator {
|
||||||
List<FpgaDevice> assignedFpgas = new LinkedList<>();
|
List<FpgaDevice> assignedFpgas = new LinkedList<>();
|
||||||
int matchIPCount = 0;
|
int matchIPCount = 0;
|
||||||
for (int i = 0; i < currentAvailableFpga.size(); i++) {
|
for (int i = 0; i < currentAvailableFpga.size(); i++) {
|
||||||
if ( null != currentAvailableFpga.get(i).getIPID() &&
|
String deviceIPIDhash = currentAvailableFpga.get(i).getAocxHash();
|
||||||
currentAvailableFpga.get(i).getIPID().equalsIgnoreCase(IPIDPreference)) {
|
if (deviceIPIDhash != null &&
|
||||||
|
deviceIPIDhash.equalsIgnoreCase(ipidHash)) {
|
||||||
assignedFpgas.add(currentAvailableFpga.get(i));
|
assignedFpgas.add(currentAvailableFpga.get(i));
|
||||||
currentAvailableFpga.remove(i);
|
currentAvailableFpga.remove(i);
|
||||||
matchIPCount++;
|
matchIPCount++;
|
||||||
|
|
|
@ -19,9 +19,15 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
import java.io.FileInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.codec.digest.DigestUtils;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -39,19 +45,16 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resource
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.AbstractFpgaVendorPlugin;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.AbstractFpgaVendorPlugin;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDiscoverer;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDiscoverer;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
|
|
||||||
|
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class FpgaResourceHandlerImpl implements ResourceHandler {
|
public class FpgaResourceHandlerImpl implements ResourceHandler {
|
||||||
|
private static final Logger LOG =
|
||||||
static final Logger LOG = LoggerFactory.
|
LoggerFactory.getLogger(FpgaResourceHandlerImpl.class);
|
||||||
getLogger(FpgaResourceHandlerImpl.class);
|
|
||||||
|
|
||||||
private final String REQUEST_FPGA_IP_ID_KEY = "REQUESTED_FPGA_IP_ID";
|
private final String REQUEST_FPGA_IP_ID_KEY = "REQUESTED_FPGA_IP_ID";
|
||||||
|
|
||||||
|
@ -78,14 +81,13 @@ public class FpgaResourceHandlerImpl implements ResourceHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public FpgaResourceAllocator getFpgaAllocator() {
|
FpgaResourceAllocator getFpgaAllocator() {
|
||||||
return allocator;
|
return allocator;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getRequestedIPID(Container container) {
|
public String getRequestedIPID(Container container) {
|
||||||
String r= container.getLaunchContext().getEnvironment().
|
return container.getLaunchContext().getEnvironment().
|
||||||
get(REQUEST_FPGA_IP_ID_KEY);
|
get(REQUEST_FPGA_IP_ID_KEY);
|
||||||
return r == null ? "" : r;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -125,9 +127,22 @@ public class FpgaResourceHandlerImpl implements ResourceHandler {
|
||||||
try {
|
try {
|
||||||
|
|
||||||
// allocate even when 0 FPGAs are requested, because we need to deny all device numbers for this container
|
// allocate even when 0 FPGAs are requested, because we need to deny all device numbers for this container
|
||||||
|
final String requestedIPID = getRequestedIPID(container);
|
||||||
|
String localizedIPIDHash = null;
|
||||||
|
ipFilePath = vendorPlugin.retrieveIPfilePath(
|
||||||
|
requestedIPID, container.getWorkDir(),
|
||||||
|
container.getResourceSet().getLocalizedResources());
|
||||||
|
if (ipFilePath != null) {
|
||||||
|
try (FileInputStream fis = new FileInputStream(ipFilePath)) {
|
||||||
|
localizedIPIDHash = DigestUtils.sha256Hex(fis);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ResourceHandlerException("Could not calculate SHA-256", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
FpgaResourceAllocator.FpgaAllocation allocation = allocator.assignFpga(
|
FpgaResourceAllocator.FpgaAllocation allocation = allocator.assignFpga(
|
||||||
vendorPlugin.getFpgaType(), deviceCount,
|
vendorPlugin.getFpgaType(), deviceCount,
|
||||||
container, getRequestedIPID(container));
|
container, localizedIPIDHash);
|
||||||
LOG.info("FpgaAllocation:" + allocation);
|
LOG.info("FpgaAllocation:" + allocation);
|
||||||
|
|
||||||
PrivilegedOperation privilegedOperation =
|
PrivilegedOperation privilegedOperation =
|
||||||
|
@ -173,18 +188,18 @@ public class FpgaResourceHandlerImpl implements ResourceHandler {
|
||||||
for (int i = 0; i < allowed.size(); i++) {
|
for (int i = 0; i < allowed.size(); i++) {
|
||||||
FpgaDevice device = allowed.get(i);
|
FpgaDevice device = allowed.get(i);
|
||||||
majorMinorNumber = device.getMajor() + ":" + device.getMinor();
|
majorMinorNumber = device.getMajor() + ":" + device.getMinor();
|
||||||
String currentIPID = device.getIPID();
|
String currentHash = allowed.get(i).getAocxHash();
|
||||||
if (null != currentIPID &&
|
if (currentHash != null &&
|
||||||
currentIPID.equalsIgnoreCase(getRequestedIPID(container))) {
|
currentHash.equalsIgnoreCase(localizedIPIDHash)) {
|
||||||
LOG.info("IP already in device \"" +
|
LOG.info("IP already in device \""
|
||||||
allowed.get(i).getAliasDevName() +
|
+ allowed.get(i).getAliasDevName() + "," +
|
||||||
"," + majorMinorNumber + "\", skip reprogramming");
|
majorMinorNumber + "\", skip reprogramming");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (vendorPlugin.configureIP(ipFilePath, device)) {
|
if (vendorPlugin.configureIP(ipFilePath, device)) {
|
||||||
// update the allocator that we update an IP of a device
|
// update the allocator that we update an IP of a device
|
||||||
allocator.updateFpga(containerIdStr, allowed.get(i),
|
allocator.updateFpga(containerIdStr, allowed.get(i),
|
||||||
getRequestedIPID(container));
|
requestedIPID, localizedIPIDHash);
|
||||||
//TODO: update the node constraint label
|
//TODO: update the node constraint label
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.anyInt;
|
import static org.mockito.ArgumentMatchers.anyInt;
|
||||||
import static org.mockito.ArgumentMatchers.anyList;
|
import static org.mockito.ArgumentMatchers.anyList;
|
||||||
|
@ -31,7 +32,9 @@ import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -62,11 +65,23 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.IntelFpgaOpenclPlugin;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.IntelFpgaOpenclPlugin;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||||
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
|
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.ExpectedException;
|
||||||
|
|
||||||
|
import com.google.common.io.Files;
|
||||||
|
|
||||||
public class TestFpgaResourceHandler {
|
public class TestFpgaResourceHandler {
|
||||||
|
@Rule
|
||||||
|
public ExpectedException expected = ExpectedException.none();
|
||||||
|
|
||||||
|
private static final String HASHABLE_STRING = "abcdef";
|
||||||
|
private static final String EXPECTED_HASH =
|
||||||
|
"bef57ec7f53a6d40beb640a780a639c83bc29ac8a9816f1fc6c5c6dcd93c4721";
|
||||||
|
|
||||||
private Context mockContext;
|
private Context mockContext;
|
||||||
private FpgaResourceHandlerImpl fpgaResourceHandler;
|
private FpgaResourceHandlerImpl fpgaResourceHandler;
|
||||||
private Configuration configuration;
|
private Configuration configuration;
|
||||||
|
@ -75,10 +90,17 @@ public class TestFpgaResourceHandler {
|
||||||
private NMStateStoreService mockNMStateStore;
|
private NMStateStoreService mockNMStateStore;
|
||||||
private ConcurrentHashMap<ContainerId, Container> runningContainersMap;
|
private ConcurrentHashMap<ContainerId, Container> runningContainersMap;
|
||||||
private IntelFpgaOpenclPlugin mockVendorPlugin;
|
private IntelFpgaOpenclPlugin mockVendorPlugin;
|
||||||
|
private List<FpgaDevice> deviceList;
|
||||||
private static final String vendorType = "IntelOpenCL";
|
private static final String vendorType = "IntelOpenCL";
|
||||||
|
private File dummyAocx;
|
||||||
|
|
||||||
|
private String getTestParentFolder() {
|
||||||
|
File f = new File("target/temp/" + TestFpgaResourceHandler.class.getName());
|
||||||
|
return f.getAbsolutePath();
|
||||||
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() throws IOException {
|
||||||
CustomResourceTypesConfigurationProvider.
|
CustomResourceTypesConfigurationProvider.
|
||||||
initResourceTypes(ResourceInformation.FPGA_URI);
|
initResourceTypes(ResourceInformation.FPGA_URI);
|
||||||
configuration = new YarnConfiguration();
|
configuration = new YarnConfiguration();
|
||||||
|
@ -88,13 +110,12 @@ public class TestFpgaResourceHandler {
|
||||||
mockNMStateStore = mock(NMStateStoreService.class);
|
mockNMStateStore = mock(NMStateStoreService.class);
|
||||||
mockContext = mock(Context.class);
|
mockContext = mock(Context.class);
|
||||||
// Assumed devices parsed from output
|
// Assumed devices parsed from output
|
||||||
List<FpgaResourceAllocator.FpgaDevice> list = new ArrayList<>();
|
deviceList = new ArrayList<>();
|
||||||
list.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 0, null));
|
for (int i = 0; i < 5; i++) {
|
||||||
list.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 1, null));
|
deviceList.add(new FpgaDevice(vendorType, 247, i, null));
|
||||||
list.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 2, null));
|
}
|
||||||
list.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 3, null));
|
String aocxPath = getTestParentFolder() + "/test.aocx";
|
||||||
list.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 4, null));
|
mockVendorPlugin = mockPlugin(vendorType, deviceList, aocxPath);
|
||||||
mockVendorPlugin = mockPlugin(vendorType, list);
|
|
||||||
FpgaDiscoverer.getInstance().setConf(configuration);
|
FpgaDiscoverer.getInstance().setConf(configuration);
|
||||||
when(mockContext.getNMStateStore()).thenReturn(mockNMStateStore);
|
when(mockContext.getNMStateStore()).thenReturn(mockNMStateStore);
|
||||||
runningContainersMap = new ConcurrentHashMap<>();
|
runningContainersMap = new ConcurrentHashMap<>();
|
||||||
|
@ -102,6 +123,18 @@ public class TestFpgaResourceHandler {
|
||||||
|
|
||||||
fpgaResourceHandler = new FpgaResourceHandlerImpl(mockContext,
|
fpgaResourceHandler = new FpgaResourceHandlerImpl(mockContext,
|
||||||
mockCGroupsHandler, mockPrivilegedExecutor, mockVendorPlugin);
|
mockCGroupsHandler, mockPrivilegedExecutor, mockVendorPlugin);
|
||||||
|
|
||||||
|
dummyAocx = new File(aocxPath);
|
||||||
|
Files.createParentDirs(dummyAocx);
|
||||||
|
Files.touch(dummyAocx);
|
||||||
|
Files.append(HASHABLE_STRING, dummyAocx, StandardCharsets.UTF_8);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void teardown() {
|
||||||
|
if (dummyAocx != null) {
|
||||||
|
dummyAocx.delete();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -125,11 +158,11 @@ public class TestFpgaResourceHandler {
|
||||||
fpgaResourceHandler.bootstrap(configuration);
|
fpgaResourceHandler.bootstrap(configuration);
|
||||||
Assert.assertEquals(3,
|
Assert.assertEquals(3,
|
||||||
fpgaResourceHandler.getFpgaAllocator().getAllowedFpga().size());
|
fpgaResourceHandler.getFpgaAllocator().getAllowedFpga().size());
|
||||||
List<FpgaResourceAllocator.FpgaDevice> allowedDevices =
|
List<FpgaDevice> allowedDevices =
|
||||||
fpgaResourceHandler.getFpgaAllocator().getAllowedFpga();
|
fpgaResourceHandler.getFpgaAllocator().getAllowedFpga();
|
||||||
for (String s : allowed.split(",")) {
|
for (String s : allowed.split(",")) {
|
||||||
boolean check = false;
|
boolean check = false;
|
||||||
for (FpgaResourceAllocator.FpgaDevice device : allowedDevices) {
|
for (FpgaDevice device : allowedDevices) {
|
||||||
if (device.getMinor().toString().equals(s)) {
|
if (device.getMinor().toString().equals(s)) {
|
||||||
check = true;
|
check = true;
|
||||||
}
|
}
|
||||||
|
@ -212,9 +245,9 @@ public class TestFpgaResourceHandler {
|
||||||
fpgaResourceHandler.preStart(mockContainer(0, 1, "GEMM"));
|
fpgaResourceHandler.preStart(mockContainer(0, 1, "GEMM"));
|
||||||
Assert.assertEquals(1, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
|
Assert.assertEquals(1, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
|
||||||
verifyDeniedDevices(getContainerId(0), Arrays.asList(1, 2));
|
verifyDeniedDevices(getContainerId(0), Arrays.asList(1, 2));
|
||||||
List<FpgaResourceAllocator.FpgaDevice> list = fpgaResourceHandler.getFpgaAllocator()
|
List<FpgaDevice> list = fpgaResourceHandler.getFpgaAllocator()
|
||||||
.getUsedFpga().get(getContainerId(0).toString());
|
.getUsedFpga().get(getContainerId(0).toString());
|
||||||
for (FpgaResourceAllocator.FpgaDevice device : list) {
|
for (FpgaDevice device : list) {
|
||||||
Assert.assertEquals("IP should be updated to GEMM", "GEMM", device.getIPID());
|
Assert.assertEquals("IP should be updated to GEMM", "GEMM", device.getIPID());
|
||||||
}
|
}
|
||||||
// Case 2. The id-1 container request 3 FPGA of IntelOpenCL and GEMM IP. this should fail
|
// Case 2. The id-1 container request 3 FPGA of IntelOpenCL and GEMM IP. this should fail
|
||||||
|
@ -254,7 +287,7 @@ public class TestFpgaResourceHandler {
|
||||||
// IPID should be GEMM for id-3 container
|
// IPID should be GEMM for id-3 container
|
||||||
list = fpgaResourceHandler.getFpgaAllocator()
|
list = fpgaResourceHandler.getFpgaAllocator()
|
||||||
.getUsedFpga().get(getContainerId(3).toString());
|
.getUsedFpga().get(getContainerId(3).toString());
|
||||||
for (FpgaResourceAllocator.FpgaDevice device : list) {
|
for (FpgaDevice device : list) {
|
||||||
Assert.assertEquals("IPID should be GEMM", "GEMM", device.getIPID());
|
Assert.assertEquals("IPID should be GEMM", "GEMM", device.getIPID());
|
||||||
}
|
}
|
||||||
Assert.assertEquals(2,
|
Assert.assertEquals(2,
|
||||||
|
@ -286,20 +319,21 @@ public class TestFpgaResourceHandler {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testsAllocationWithExistingIPIDDevices()
|
public void testsAllocationWithExistingIPIDDevices()
|
||||||
throws ResourceHandlerException, PrivilegedOperationException {
|
throws ResourceHandlerException, PrivilegedOperationException,
|
||||||
|
IOException {
|
||||||
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, "0,1,2");
|
configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, "0,1,2");
|
||||||
fpgaResourceHandler.bootstrap(configuration);
|
fpgaResourceHandler.bootstrap(configuration);
|
||||||
// The id-0 container request 3 FPGA of IntelOpenCL type and GEMM IP
|
// The id-0 container request 3 FPGA of IntelOpenCL type and GEMM IP
|
||||||
fpgaResourceHandler.preStart(mockContainer(0, 3, "GEMM"));
|
fpgaResourceHandler.preStart(mockContainer(0, 3, "GEMM"));
|
||||||
Assert.assertEquals(3,
|
Assert.assertEquals(3,
|
||||||
fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
|
fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
|
||||||
List<FpgaResourceAllocator.FpgaDevice> list =
|
List<FpgaDevice> list =
|
||||||
fpgaResourceHandler
|
fpgaResourceHandler
|
||||||
.getFpgaAllocator()
|
.getFpgaAllocator()
|
||||||
.getUsedFpga()
|
.getUsedFpga()
|
||||||
.get(getContainerId(0).toString());
|
.get(getContainerId(0).toString());
|
||||||
fpgaResourceHandler.postComplete(getContainerId(0));
|
fpgaResourceHandler.postComplete(getContainerId(0));
|
||||||
for (FpgaResourceAllocator.FpgaDevice device : list) {
|
for (FpgaDevice device : list) {
|
||||||
Assert.assertEquals("IP should be updated to GEMM", "GEMM",
|
Assert.assertEquals("IP should be updated to GEMM", "GEMM",
|
||||||
device.getIPID());
|
device.getIPID());
|
||||||
}
|
}
|
||||||
|
@ -314,6 +348,8 @@ public class TestFpgaResourceHandler {
|
||||||
fpgaResourceHandler.postComplete(getContainerId(2));
|
fpgaResourceHandler.postComplete(getContainerId(2));
|
||||||
|
|
||||||
// Case 2. id-2 container request preStart, with 1 plugin.configureIP called
|
// Case 2. id-2 container request preStart, with 1 plugin.configureIP called
|
||||||
|
// Add some characters to the dummy file to have its hash changed
|
||||||
|
Files.append("12345", dummyAocx, StandardCharsets.UTF_8);
|
||||||
fpgaResourceHandler.preStart(mockContainer(1, 1, "GZIP"));
|
fpgaResourceHandler.preStart(mockContainer(1, 1, "GZIP"));
|
||||||
// we should have 4 invocations
|
// we should have 4 invocations
|
||||||
verify(mockVendorPlugin, times(4)).configureIP(anyString(),
|
verify(mockVendorPlugin, times(4)).configureIP(anyString(),
|
||||||
|
@ -342,7 +378,7 @@ public class TestFpgaResourceHandler {
|
||||||
fpgaResourceHandler.bootstrap(configuration);
|
fpgaResourceHandler.bootstrap(configuration);
|
||||||
Container container0 = mockContainer(0, 3, "GEMM");
|
Container container0 = mockContainer(0, 3, "GEMM");
|
||||||
fpgaResourceHandler.preStart(container0);
|
fpgaResourceHandler.preStart(container0);
|
||||||
List<FpgaResourceAllocator.FpgaDevice> assigned =
|
List<FpgaDevice> assigned =
|
||||||
fpgaResourceHandler
|
fpgaResourceHandler
|
||||||
.getFpgaAllocator()
|
.getFpgaAllocator()
|
||||||
.getUsedFpga()
|
.getUsedFpga()
|
||||||
|
@ -361,11 +397,11 @@ public class TestFpgaResourceHandler {
|
||||||
@Test
|
@Test
|
||||||
public void testReacquireContainer() throws ResourceHandlerException {
|
public void testReacquireContainer() throws ResourceHandlerException {
|
||||||
Container c0 = mockContainer(0, 2, "GEMM");
|
Container c0 = mockContainer(0, 2, "GEMM");
|
||||||
List<FpgaResourceAllocator.FpgaDevice> assigned = new ArrayList<>();
|
List<FpgaDevice> assigned = new ArrayList<>();
|
||||||
assigned.add(new
|
assigned.add(new
|
||||||
FpgaResourceAllocator.FpgaDevice(vendorType, 247, 0, null));
|
FpgaDevice(vendorType, 247, 0, null));
|
||||||
assigned.add(new
|
assigned.add(new
|
||||||
FpgaResourceAllocator.FpgaDevice(vendorType, 247, 1, null));
|
FpgaDevice(vendorType, 247, 1, null));
|
||||||
// Mock we've stored the c0 states
|
// Mock we've stored the c0 states
|
||||||
mockStateStoreForContainer(c0, assigned);
|
mockStateStoreForContainer(c0, assigned);
|
||||||
// NM start
|
// NM start
|
||||||
|
@ -378,11 +414,11 @@ public class TestFpgaResourceHandler {
|
||||||
// Case 1. try recover state for id-0 container
|
// Case 1. try recover state for id-0 container
|
||||||
fpgaResourceHandler.reacquireContainer(getContainerId(0));
|
fpgaResourceHandler.reacquireContainer(getContainerId(0));
|
||||||
// minor number matches
|
// minor number matches
|
||||||
List<FpgaResourceAllocator.FpgaDevice> used =
|
List<FpgaDevice> used =
|
||||||
fpgaResourceHandler.getFpgaAllocator().
|
fpgaResourceHandler.getFpgaAllocator().
|
||||||
getUsedFpga().get(getContainerId(0).toString());
|
getUsedFpga().get(getContainerId(0).toString());
|
||||||
int count = 0;
|
int count = 0;
|
||||||
for (FpgaResourceAllocator.FpgaDevice device : used) {
|
for (FpgaDevice device : used) {
|
||||||
if (device.getMinor().equals(0)){
|
if (device.getMinor().equals(0)){
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
|
@ -391,13 +427,13 @@ public class TestFpgaResourceHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Assert.assertEquals("Unexpected used minor number in allocator",2, count);
|
Assert.assertEquals("Unexpected used minor number in allocator",2, count);
|
||||||
List<FpgaResourceAllocator.FpgaDevice> available =
|
List<FpgaDevice> available =
|
||||||
fpgaResourceHandler
|
fpgaResourceHandler
|
||||||
.getFpgaAllocator()
|
.getFpgaAllocator()
|
||||||
.getAvailableFpga()
|
.getAvailableFpga()
|
||||||
.get(vendorType);
|
.get(vendorType);
|
||||||
count = 0;
|
count = 0;
|
||||||
for (FpgaResourceAllocator.FpgaDevice device : available) {
|
for (FpgaDevice device : available) {
|
||||||
if (device.getMinor().equals(2)) {
|
if (device.getMinor().equals(2)) {
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
|
@ -410,7 +446,7 @@ public class TestFpgaResourceHandler {
|
||||||
Container c1 = mockContainer(1, 1, "GEMM");
|
Container c1 = mockContainer(1, 1, "GEMM");
|
||||||
assigned = new ArrayList<>();
|
assigned = new ArrayList<>();
|
||||||
assigned.add(new
|
assigned.add(new
|
||||||
FpgaResourceAllocator.FpgaDevice(vendorType, 247, 5, null));
|
FpgaDevice(vendorType, 247, 5, null));
|
||||||
// Mock we've stored the c1 states
|
// Mock we've stored the c1 states
|
||||||
mockStateStoreForContainer(c1, assigned);
|
mockStateStoreForContainer(c1, assigned);
|
||||||
boolean flag = false;
|
boolean flag = false;
|
||||||
|
@ -429,7 +465,7 @@ public class TestFpgaResourceHandler {
|
||||||
Container c2 = mockContainer(2, 1, "GEMM");
|
Container c2 = mockContainer(2, 1, "GEMM");
|
||||||
assigned = new ArrayList<>();
|
assigned = new ArrayList<>();
|
||||||
assigned.add(new
|
assigned.add(new
|
||||||
FpgaResourceAllocator.FpgaDevice(vendorType, 247, 1, null));
|
FpgaDevice(vendorType, 247, 1, null));
|
||||||
// Mock we've stored the c2 states
|
// Mock we've stored the c2 states
|
||||||
mockStateStoreForContainer(c2, assigned);
|
mockStateStoreForContainer(c2, assigned);
|
||||||
flag = false;
|
flag = false;
|
||||||
|
@ -448,7 +484,7 @@ public class TestFpgaResourceHandler {
|
||||||
Container c3 = mockContainer(3, 1, "GEMM");
|
Container c3 = mockContainer(3, 1, "GEMM");
|
||||||
assigned = new ArrayList<>();
|
assigned = new ArrayList<>();
|
||||||
assigned.add(new
|
assigned.add(new
|
||||||
FpgaResourceAllocator.FpgaDevice(vendorType, 247, 2, null));
|
FpgaDevice(vendorType, 247, 2, null));
|
||||||
// Mock we've stored the c3 states
|
// Mock we've stored the c3 states
|
||||||
mockStateStoreForContainer(c3, assigned);
|
mockStateStoreForContainer(c3, assigned);
|
||||||
fpgaResourceHandler.reacquireContainer(getContainerId(3));
|
fpgaResourceHandler.reacquireContainer(getContainerId(3));
|
||||||
|
@ -458,6 +494,33 @@ public class TestFpgaResourceHandler {
|
||||||
fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
|
fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSha256CalculationFails() throws ResourceHandlerException {
|
||||||
|
expected.expect(ResourceHandlerException.class);
|
||||||
|
expected.expectMessage("Could not calculate SHA-256");
|
||||||
|
|
||||||
|
dummyAocx.delete();
|
||||||
|
fpgaResourceHandler.preStart(mockContainer(0, 1, "GEMM"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSha256CalculationSucceeds()
|
||||||
|
throws IOException, ResourceHandlerException {
|
||||||
|
mockVendorPlugin =
|
||||||
|
mockPlugin(vendorType, deviceList, dummyAocx.getAbsolutePath());
|
||||||
|
fpgaResourceHandler = new FpgaResourceHandlerImpl(mockContext,
|
||||||
|
mockCGroupsHandler, mockPrivilegedExecutor, mockVendorPlugin);
|
||||||
|
|
||||||
|
fpgaResourceHandler.bootstrap(configuration);
|
||||||
|
fpgaResourceHandler.preStart(mockContainer(0, 1, "GEMM"));
|
||||||
|
|
||||||
|
// IP file is assigned to the first device
|
||||||
|
List<FpgaDevice> devices =
|
||||||
|
fpgaResourceHandler.getFpgaAllocator().getAllowedFpga();
|
||||||
|
FpgaDevice device = devices.get(0);
|
||||||
|
assertEquals("Hash value", EXPECTED_HASH, device.getAocxHash());
|
||||||
|
}
|
||||||
|
|
||||||
private void verifyDeniedDevices(ContainerId containerId,
|
private void verifyDeniedDevices(ContainerId containerId,
|
||||||
List<Integer> deniedDevices)
|
List<Integer> deniedDevices)
|
||||||
throws ResourceHandlerException, PrivilegedOperationException {
|
throws ResourceHandlerException, PrivilegedOperationException {
|
||||||
|
@ -480,19 +543,18 @@ public class TestFpgaResourceHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static IntelFpgaOpenclPlugin mockPlugin(String type,
|
private static IntelFpgaOpenclPlugin mockPlugin(String type,
|
||||||
List<FpgaResourceAllocator.FpgaDevice> list) {
|
List<FpgaDevice> list, String aocxPath) {
|
||||||
IntelFpgaOpenclPlugin plugin = mock(IntelFpgaOpenclPlugin.class);
|
IntelFpgaOpenclPlugin plugin = mock(IntelFpgaOpenclPlugin.class);
|
||||||
when(plugin.initPlugin(any())).thenReturn(true);
|
when(plugin.initPlugin(any())).thenReturn(true);
|
||||||
when(plugin.getFpgaType()).thenReturn(type);
|
when(plugin.getFpgaType()).thenReturn(type);
|
||||||
when(plugin.retrieveIPfilePath(anyString(),
|
when(plugin.retrieveIPfilePath(anyString(),
|
||||||
anyString(), anyMap())).thenReturn("/tmp");
|
anyString(), anyMap())).thenReturn(aocxPath);
|
||||||
when(plugin.configureIP(anyString(), any()))
|
when(plugin.configureIP(anyString(), any()))
|
||||||
.thenReturn(true);
|
.thenReturn(true);
|
||||||
when(plugin.discover(anyInt())).thenReturn(list);
|
when(plugin.discover(anyInt())).thenReturn(list);
|
||||||
return plugin;
|
return plugin;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static Container mockContainer(int id, int numFpga, String IPID) {
|
private static Container mockContainer(int id, int numFpga, String IPID) {
|
||||||
Container c = mock(Container.class);
|
Container c = mock(Container.class);
|
||||||
|
|
||||||
|
@ -519,7 +581,7 @@ public class TestFpgaResourceHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void mockStateStoreForContainer(Container container,
|
private void mockStateStoreForContainer(Container container,
|
||||||
List<FpgaResourceAllocator.FpgaDevice> assigned) {
|
List<FpgaDevice> assigned) {
|
||||||
ResourceMappings rmap = new ResourceMappings();
|
ResourceMappings rmap = new ResourceMappings();
|
||||||
ResourceMappings.AssignedResources ar =
|
ResourceMappings.AssignedResources ar =
|
||||||
new ResourceMappings.AssignedResources();
|
new ResourceMappings.AssignedResources();
|
||||||
|
|
Loading…
Reference in New Issue