diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java index 334c6bd4c30..e5622f92221 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java @@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDiscoverer; import java.io.IOException; import java.io.Serializable; @@ -143,6 +142,8 @@ public class FpgaResourceAllocator { private Integer minor; // IP file identifier. matrix multiplication for instance private String IPID; + // SHA-256 hash of the uploaded aocx file + private String aocxHash; // the device name under /dev private String devName; // the alias device name. Intel use acl number acl0 to acl31 @@ -168,6 +169,14 @@ public class FpgaResourceAllocator { return IPID; } + public String getAocxHash() { + return aocxHash; + } + + public void setAocxHash(String hash) { + this.aocxHash = hash; + } + public void setIPID(String IPID) { this.IPID = IPID; } @@ -263,7 +272,8 @@ public class FpgaResourceAllocator { @Override public String toString() { return "FPGA Device:(Type: " + this.type + ", Major: " + - this.major + ", Minor: " + this.minor + ", IPID: " + this.IPID + ")"; + this.major + ", Minor: " + this.minor + ", IPID: " + + this.IPID + ", Hash: " + this.aocxHash + ")"; } } @@ -279,11 +289,14 @@ public class FpgaResourceAllocator { } public synchronized void updateFpga(String requestor, - FpgaDevice device, String newIPID) { + FpgaDevice device, String newIPID, String newHash) { List usedFpgas = usedFpgaByRequestor.get(requestor); int index = findMatchedFpga(usedFpgas, device); if (-1 != index) { usedFpgas.get(index).setIPID(newIPID); + FpgaDevice fpga = usedFpgas.get(index); + fpga.setIPID(newIPID); + fpga.setAocxHash(newHash); } else { LOG.warn("Failed to update FPGA due to unknown reason " + "that no record for this allocated device:" + device); @@ -307,12 +320,12 @@ public class FpgaResourceAllocator { * @param type vendor plugin supported FPGA device type * @param count requested FPGA slot count * @param container container id - * @param IPIDPreference allocate slot with this IPID first + * @param ipidHash hash of the localized aocx file * @return Instance consists two List of allowed and denied {@link FpgaDevice} * @throws ResourceHandlerException When failed to allocate or write state store * */ public synchronized FpgaAllocation assignFpga(String type, long count, - Container container, String IPIDPreference) throws ResourceHandlerException { + Container container, String ipidHash) throws ResourceHandlerException { List currentAvailableFpga = availableFpga.get(type); String requestor = container.getContainerId().toString(); if (null == currentAvailableFpga) { @@ -327,8 +340,9 @@ public class FpgaResourceAllocator { List assignedFpgas = new LinkedList<>(); int matchIPCount = 0; for (int i = 0; i < currentAvailableFpga.size(); i++) { - if ( null != currentAvailableFpga.get(i).getIPID() && - currentAvailableFpga.get(i).getIPID().equalsIgnoreCase(IPIDPreference)) { + String deviceIPIDhash = currentAvailableFpga.get(i).getAocxHash(); + if (deviceIPIDhash != null && + deviceIPIDhash.equalsIgnoreCase(ipidHash)) { assignedFpgas.add(currentAvailableFpga.get(i)); currentAvailableFpga.remove(i); matchIPCount++; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java index d9ca8d1041e..1a9d6088777 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java @@ -19,9 +19,15 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga; -import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -39,19 +45,16 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resource import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.AbstractFpgaVendorPlugin; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDiscoverer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI; +import com.google.common.annotations.VisibleForTesting; @InterfaceStability.Unstable @InterfaceAudience.Private public class FpgaResourceHandlerImpl implements ResourceHandler { - - static final Logger LOG = LoggerFactory. - getLogger(FpgaResourceHandlerImpl.class); + private static final Logger LOG = + LoggerFactory.getLogger(FpgaResourceHandlerImpl.class); private final String REQUEST_FPGA_IP_ID_KEY = "REQUESTED_FPGA_IP_ID"; @@ -78,14 +81,13 @@ public class FpgaResourceHandlerImpl implements ResourceHandler { } @VisibleForTesting - public FpgaResourceAllocator getFpgaAllocator() { + FpgaResourceAllocator getFpgaAllocator() { return allocator; } public String getRequestedIPID(Container container) { - String r= container.getLaunchContext().getEnvironment(). + return container.getLaunchContext().getEnvironment(). get(REQUEST_FPGA_IP_ID_KEY); - return r == null ? "" : r; } @Override @@ -125,9 +127,22 @@ public class FpgaResourceHandlerImpl implements ResourceHandler { try { // allocate even request 0 FPGA because we need to deny all device numbers for this container + final String requestedIPID = getRequestedIPID(container); + String localizedIPIDHash = null; + ipFilePath = vendorPlugin.retrieveIPfilePath( + requestedIPID, container.getWorkDir(), + container.getResourceSet().getLocalizedResources()); + if (ipFilePath != null) { + try (FileInputStream fis = new FileInputStream(ipFilePath)) { + localizedIPIDHash = DigestUtils.sha256Hex(fis); + } catch (IOException e) { + throw new ResourceHandlerException("Could not calculate SHA-256", e); + } + } + FpgaResourceAllocator.FpgaAllocation allocation = allocator.assignFpga( vendorPlugin.getFpgaType(), deviceCount, - container, getRequestedIPID(container)); + container, localizedIPIDHash); LOG.info("FpgaAllocation:" + allocation); PrivilegedOperation privilegedOperation = @@ -173,18 +188,18 @@ public class FpgaResourceHandlerImpl implements ResourceHandler { for (int i = 0; i < allowed.size(); i++) { FpgaDevice device = allowed.get(i); majorMinorNumber = device.getMajor() + ":" + device.getMinor(); - String currentIPID = device.getIPID(); - if (null != currentIPID && - currentIPID.equalsIgnoreCase(getRequestedIPID(container))) { - LOG.info("IP already in device \"" + - allowed.get(i).getAliasDevName() + - "," + majorMinorNumber + "\", skip reprogramming"); + String currentHash = allowed.get(i).getAocxHash(); + if (currentHash != null && + currentHash.equalsIgnoreCase(localizedIPIDHash)) { + LOG.info("IP already in device \"" + + allowed.get(i).getAliasDevName() + "," + + majorMinorNumber + "\", skip reprogramming"); continue; } if (vendorPlugin.configureIP(ipFilePath, device)) { // update the allocator that we update an IP of a device allocator.updateFpga(containerIdStr, allowed.get(i), - getRequestedIPID(container)); + requestedIPID, localizedIPIDHash); //TODO: update the node constraint label } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/TestFpgaResourceHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/TestFpgaResourceHandler.java index 1048fecd108..1660b2e635d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/TestFpgaResourceHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/TestFpgaResourceHandler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga; +import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyList; @@ -31,7 +32,9 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -62,11 +65,23 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.IntelFpgaOpenclPlugin; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider; +import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; + +import com.google.common.io.Files; public class TestFpgaResourceHandler { + @Rule + public ExpectedException expected = ExpectedException.none(); + + private static final String HASHABLE_STRING = "abcdef"; + private static final String EXPECTED_HASH = + "bef57ec7f53a6d40beb640a780a639c83bc29ac8a9816f1fc6c5c6dcd93c4721"; + private Context mockContext; private FpgaResourceHandlerImpl fpgaResourceHandler; private Configuration configuration; @@ -75,10 +90,17 @@ public class TestFpgaResourceHandler { private NMStateStoreService mockNMStateStore; private ConcurrentHashMap runningContainersMap; private IntelFpgaOpenclPlugin mockVendorPlugin; + private List deviceList; private static final String vendorType = "IntelOpenCL"; + private File dummyAocx; + + private String getTestParentFolder() { + File f = new File("target/temp/" + TestFpgaResourceHandler.class.getName()); + return f.getAbsolutePath(); + } @Before - public void setup() { + public void setup() throws IOException { CustomResourceTypesConfigurationProvider. initResourceTypes(ResourceInformation.FPGA_URI); configuration = new YarnConfiguration(); @@ -88,13 +110,12 @@ public class TestFpgaResourceHandler { mockNMStateStore = mock(NMStateStoreService.class); mockContext = mock(Context.class); // Assumed devices parsed from output - List list = new ArrayList<>(); - list.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 0, null)); - list.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 1, null)); - list.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 2, null)); - list.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 3, null)); - list.add(new FpgaResourceAllocator.FpgaDevice(vendorType, 247, 4, null)); - mockVendorPlugin = mockPlugin(vendorType, list); + deviceList = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + deviceList.add(new FpgaDevice(vendorType, 247, i, null)); + } + String aocxPath = getTestParentFolder() + "/test.aocx"; + mockVendorPlugin = mockPlugin(vendorType, deviceList, aocxPath); FpgaDiscoverer.getInstance().setConf(configuration); when(mockContext.getNMStateStore()).thenReturn(mockNMStateStore); runningContainersMap = new ConcurrentHashMap<>(); @@ -102,6 +123,18 @@ public class TestFpgaResourceHandler { fpgaResourceHandler = new FpgaResourceHandlerImpl(mockContext, mockCGroupsHandler, mockPrivilegedExecutor, mockVendorPlugin); + + dummyAocx = new File(aocxPath); + Files.createParentDirs(dummyAocx); + Files.touch(dummyAocx); + Files.append(HASHABLE_STRING, dummyAocx, StandardCharsets.UTF_8); + } + + @After + public void teardown() { + if (dummyAocx != null) { + dummyAocx.delete(); + } } @Test @@ -125,11 +158,11 @@ public class TestFpgaResourceHandler { fpgaResourceHandler.bootstrap(configuration); Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAllowedFpga().size()); - List allowedDevices = + List allowedDevices = fpgaResourceHandler.getFpgaAllocator().getAllowedFpga(); for (String s : allowed.split(",")) { boolean check = false; - for (FpgaResourceAllocator.FpgaDevice device : allowedDevices) { + for (FpgaDevice device : allowedDevices) { if (device.getMinor().toString().equals(s)) { check = true; } @@ -212,9 +245,9 @@ public class TestFpgaResourceHandler { fpgaResourceHandler.preStart(mockContainer(0, 1, "GEMM")); Assert.assertEquals(1, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount()); verifyDeniedDevices(getContainerId(0), Arrays.asList(1, 2)); - List list = fpgaResourceHandler.getFpgaAllocator() + List list = fpgaResourceHandler.getFpgaAllocator() .getUsedFpga().get(getContainerId(0).toString()); - for (FpgaResourceAllocator.FpgaDevice device : list) { + for (FpgaDevice device : list) { Assert.assertEquals("IP should be updated to GEMM", "GEMM", device.getIPID()); } // Case 2. The id-1 container request 3 FPGA of IntelOpenCL and GEMM IP. this should fail @@ -254,7 +287,7 @@ public class TestFpgaResourceHandler { // IPID should be GEMM for id-3 container list = fpgaResourceHandler.getFpgaAllocator() .getUsedFpga().get(getContainerId(3).toString()); - for (FpgaResourceAllocator.FpgaDevice device : list) { + for (FpgaDevice device : list) { Assert.assertEquals("IPID should be GEMM", "GEMM", device.getIPID()); } Assert.assertEquals(2, @@ -286,20 +319,21 @@ public class TestFpgaResourceHandler { @Test public void testsAllocationWithExistingIPIDDevices() - throws ResourceHandlerException, PrivilegedOperationException { + throws ResourceHandlerException, PrivilegedOperationException, + IOException { configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, "0,1,2"); fpgaResourceHandler.bootstrap(configuration); // The id-0 container request 3 FPGA of IntelOpenCL type and GEMM IP fpgaResourceHandler.preStart(mockContainer(0, 3, "GEMM")); Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount()); - List list = + List list = fpgaResourceHandler .getFpgaAllocator() .getUsedFpga() .get(getContainerId(0).toString()); fpgaResourceHandler.postComplete(getContainerId(0)); - for (FpgaResourceAllocator.FpgaDevice device : list) { + for (FpgaDevice device : list) { Assert.assertEquals("IP should be updated to GEMM", "GEMM", device.getIPID()); } @@ -314,6 +348,8 @@ public class TestFpgaResourceHandler { fpgaResourceHandler.postComplete(getContainerId(2)); // Case 2. id-2 container request preStart, with 1 plugin.configureIP called + // Add some characters to the dummy file to have its hash changed + Files.append("12345", dummyAocx, StandardCharsets.UTF_8); fpgaResourceHandler.preStart(mockContainer(1, 1, "GZIP")); // we should have 4 times invocation verify(mockVendorPlugin, times(4)).configureIP(anyString(), @@ -342,7 +378,7 @@ public class TestFpgaResourceHandler { fpgaResourceHandler.bootstrap(configuration); Container container0 = mockContainer(0, 3, "GEMM"); fpgaResourceHandler.preStart(container0); - List assigned = + List assigned = fpgaResourceHandler .getFpgaAllocator() .getUsedFpga() @@ -361,11 +397,11 @@ public class TestFpgaResourceHandler { @Test public void testReacquireContainer() throws ResourceHandlerException { Container c0 = mockContainer(0, 2, "GEMM"); - List assigned = new ArrayList<>(); + List assigned = new ArrayList<>(); assigned.add(new - FpgaResourceAllocator.FpgaDevice(vendorType, 247, 0, null)); + FpgaDevice(vendorType, 247, 0, null)); assigned.add(new - FpgaResourceAllocator.FpgaDevice(vendorType, 247, 1, null)); + FpgaDevice(vendorType, 247, 1, null)); // Mock we've stored the c0 states mockStateStoreForContainer(c0, assigned); // NM start @@ -378,11 +414,11 @@ public class TestFpgaResourceHandler { // Case 1. try recover state for id-0 container fpgaResourceHandler.reacquireContainer(getContainerId(0)); // minor number matches - List used = + List used = fpgaResourceHandler.getFpgaAllocator(). getUsedFpga().get(getContainerId(0).toString()); int count = 0; - for (FpgaResourceAllocator.FpgaDevice device : used) { + for (FpgaDevice device : used) { if (device.getMinor().equals(0)){ count++; } @@ -391,13 +427,13 @@ public class TestFpgaResourceHandler { } } Assert.assertEquals("Unexpected used minor number in allocator",2, count); - List available = + List available = fpgaResourceHandler .getFpgaAllocator() .getAvailableFpga() .get(vendorType); count = 0; - for (FpgaResourceAllocator.FpgaDevice device : available) { + for (FpgaDevice device : available) { if (device.getMinor().equals(2)) { count++; } @@ -410,7 +446,7 @@ public class TestFpgaResourceHandler { Container c1 = mockContainer(1, 1, "GEMM"); assigned = new ArrayList<>(); assigned.add(new - FpgaResourceAllocator.FpgaDevice(vendorType, 247, 5, null)); + FpgaDevice(vendorType, 247, 5, null)); // Mock we've stored the c1 states mockStateStoreForContainer(c1, assigned); boolean flag = false; @@ -429,7 +465,7 @@ public class TestFpgaResourceHandler { Container c2 = mockContainer(2, 1, "GEMM"); assigned = new ArrayList<>(); assigned.add(new - FpgaResourceAllocator.FpgaDevice(vendorType, 247, 1, null)); + FpgaDevice(vendorType, 247, 1, null)); // Mock we've stored the c2 states mockStateStoreForContainer(c2, assigned); flag = false; @@ -448,7 +484,7 @@ public class TestFpgaResourceHandler { Container c3 = mockContainer(3, 1, "GEMM"); assigned = new ArrayList<>(); assigned.add(new - FpgaResourceAllocator.FpgaDevice(vendorType, 247, 2, null)); + FpgaDevice(vendorType, 247, 2, null)); // Mock we've stored the c2 states mockStateStoreForContainer(c3, assigned); fpgaResourceHandler.reacquireContainer(getContainerId(3)); @@ -458,6 +494,33 @@ public class TestFpgaResourceHandler { fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount()); } + @Test + public void testSha256CalculationFails() throws ResourceHandlerException { + expected.expect(ResourceHandlerException.class); + expected.expectMessage("Could not calculate SHA-256"); + + dummyAocx.delete(); + fpgaResourceHandler.preStart(mockContainer(0, 1, "GEMM")); + } + + @Test + public void testSha256CalculationSucceeds() + throws IOException, ResourceHandlerException { + mockVendorPlugin = + mockPlugin(vendorType, deviceList, dummyAocx.getAbsolutePath()); + fpgaResourceHandler = new FpgaResourceHandlerImpl(mockContext, + mockCGroupsHandler, mockPrivilegedExecutor, mockVendorPlugin); + + fpgaResourceHandler.bootstrap(configuration); + fpgaResourceHandler.preStart(mockContainer(0, 1, "GEMM")); + + // IP file is assigned to the first device + List devices = + fpgaResourceHandler.getFpgaAllocator().getAllowedFpga(); + FpgaDevice device = devices.get(0); + assertEquals("Hash value", EXPECTED_HASH, device.getAocxHash()); + } + private void verifyDeniedDevices(ContainerId containerId, List deniedDevices) throws ResourceHandlerException, PrivilegedOperationException { @@ -480,19 +543,18 @@ public class TestFpgaResourceHandler { } private static IntelFpgaOpenclPlugin mockPlugin(String type, - List list) { + List list, String aocxPath) { IntelFpgaOpenclPlugin plugin = mock(IntelFpgaOpenclPlugin.class); when(plugin.initPlugin(any())).thenReturn(true); when(plugin.getFpgaType()).thenReturn(type); when(plugin.retrieveIPfilePath(anyString(), - anyString(), anyMap())).thenReturn("/tmp"); + anyString(), anyMap())).thenReturn(aocxPath); when(plugin.configureIP(anyString(), any())) .thenReturn(true); when(plugin.discover(anyInt())).thenReturn(list); return plugin; } - private static Container mockContainer(int id, int numFpga, String IPID) { Container c = mock(Container.class); @@ -519,7 +581,7 @@ public class TestFpgaResourceHandler { } private void mockStateStoreForContainer(Container container, - List assigned) { + List assigned) { ResourceMappings rmap = new ResourceMappings(); ResourceMappings.AssignedResources ar = new ResourceMappings.AssignedResources();