YARN-9269. Minor cleanup in FpgaResourceAllocator. Contributed by Peter Bacsko.

This commit is contained in:
Devaraj K 2019-03-27 10:08:07 -07:00
parent b4ed81c4e6
commit a4cd75e09c
2 changed files with 53 additions and 53 deletions

View File

@ -33,11 +33,15 @@
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.*; import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI; import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
/** /**
* This FPGA resource allocator tends to be used by different FPGA vendor's plugin * This FPGA resource allocator tends to be used by different FPGA vendor's plugin
* A "type" parameter is taken into consideration when allocation * A "type" parameter is taken into consideration when allocation
@ -50,20 +54,21 @@ public class FpgaResourceAllocator {
private List<FpgaDevice> allowedFpgas = new LinkedList<>(); private List<FpgaDevice> allowedFpgas = new LinkedList<>();
//key is resource type of FPGA, vendor plugin supported ID //key is resource type of FPGA, vendor plugin supported ID
private LinkedHashMap<String, List<FpgaDevice>> availableFpga = new LinkedHashMap<>(); private Map<String, List<FpgaDevice>> availableFpgas = new HashMap<>();
//key is requestor, aka. container ID //key is the container ID
private LinkedHashMap<String, List<FpgaDevice>> usedFpgaByRequestor = new LinkedHashMap<>(); private Map<String, List<FpgaDevice>> containerToFpgaMapping =
new HashMap<>();
private Context nmContext; private Context nmContext;
@VisibleForTesting @VisibleForTesting
public HashMap<String, List<FpgaDevice>> getAvailableFpga() { Map<String, List<FpgaDevice>> getAvailableFpga() {
return availableFpga; return availableFpgas;
} }
@VisibleForTesting @VisibleForTesting
public List<FpgaDevice> getAllowedFpga() { List<FpgaDevice> getAllowedFpga() {
return allowedFpgas; return allowedFpgas;
} }
@ -72,25 +77,31 @@ public FpgaResourceAllocator(Context ctx) {
} }
@VisibleForTesting @VisibleForTesting
public int getAvailableFpgaCount() { int getAvailableFpgaCount() {
int count = 0; int count = 0;
for (List<FpgaDevice> l : availableFpga.values()) {
count += l.size(); count = availableFpgas.values()
} .stream()
.mapToInt(i -> i.size())
.sum();
return count; return count;
} }
@VisibleForTesting @VisibleForTesting
public HashMap<String, List<FpgaDevice>> getUsedFpga() { Map<String, List<FpgaDevice>> getUsedFpga() {
return usedFpgaByRequestor; return containerToFpgaMapping;
} }
@VisibleForTesting @VisibleForTesting
public int getUsedFpgaCount() { int getUsedFpgaCount() {
int count = 0; int count = 0;
for (List<FpgaDevice> l : usedFpgaByRequestor.values()) {
count += l.size(); count = containerToFpgaMapping.values()
} .stream()
.mapToInt(i -> i.size())
.sum();
return count; return count;
} }
@ -252,42 +263,31 @@ public String toString() {
} }
} }
public synchronized void addFpga(String type, List<FpgaDevice> list) { // called once during initialization
availableFpga.putIfAbsent(type, new LinkedList<>()); public synchronized void addFpgaDevices(String type, List<FpgaDevice> list) {
availableFpgas.putIfAbsent(type, new LinkedList<>());
List<FpgaDevice> fpgaDevices = new LinkedList<>();
for (FpgaDevice device : list) { for (FpgaDevice device : list) {
if (!allowedFpgas.contains(device)) { if (!allowedFpgas.contains(device)) {
allowedFpgas.add(device); fpgaDevices.add(device);
availableFpga.get(type).add(device); availableFpgas.get(type).add(device);
} else {
LOG.warn("Duplicate device found: " + device + ". Ignored");
} }
} }
LOG.info("Add a list of FPGA Devices: " + list);
allowedFpgas = ImmutableList.copyOf(fpgaDevices);
LOG.info("Added a list of FPGA Devices: " + allowedFpgas);
} }
public synchronized void updateFpga(String requestor, public synchronized void updateFpga(String requestor,
FpgaDevice device, String newIPID, String newHash) { FpgaDevice device, String newIPID, String newHash) {
List<FpgaDevice> usedFpgas = usedFpgaByRequestor.get(requestor); device.setIPID(newIPID);
int index = findMatchedFpga(usedFpgas, device); device.setAocxHash(newHash);
if (-1 != index) {
usedFpgas.get(index).setIPID(newIPID);
FpgaDevice fpga = usedFpgas.get(index);
fpga.setIPID(newIPID);
fpga.setAocxHash(newHash);
} else {
LOG.warn("Failed to update FPGA due to unknown reason " +
"that no record for this allocated device:" + device);
}
LOG.info("Update IPID to " + newIPID + LOG.info("Update IPID to " + newIPID +
" for this allocated device:" + device); " for this allocated device: " + device);
} LOG.info("Update IP hash to " + newHash);
private synchronized int findMatchedFpga(List<FpgaDevice> devices, FpgaDevice item) {
int i = 0;
for (; i < devices.size(); i++) {
if (devices.get(i) == item) {
return i;
}
}
return -1;
} }
/** /**
@ -301,7 +301,8 @@ private synchronized int findMatchedFpga(List<FpgaDevice> devices, FpgaDevice it
* */ * */
public synchronized FpgaAllocation assignFpga(String type, long count, public synchronized FpgaAllocation assignFpga(String type, long count,
Container container, String ipidHash) throws ResourceHandlerException { Container container, String ipidHash) throws ResourceHandlerException {
List<FpgaDevice> currentAvailableFpga = availableFpga.get(type); List<FpgaDevice> currentAvailableFpga = availableFpgas.get(type);
String requestor = container.getContainerId().toString(); String requestor = container.getContainerId().toString();
if (null == currentAvailableFpga) { if (null == currentAvailableFpga) {
throw new ResourceHandlerException("No such type of FPGA resource available: " + type); throw new ResourceHandlerException("No such type of FPGA resource available: " + type);
@ -341,8 +342,8 @@ public synchronized FpgaAllocation assignFpga(String type, long count,
} }
// update state store success, update internal used FPGAs // update state store success, update internal used FPGAs
usedFpgaByRequestor.putIfAbsent(requestor, new LinkedList<>()); containerToFpgaMapping.putIfAbsent(requestor, new LinkedList<>());
usedFpgaByRequestor.get(requestor).addAll(assignedFpgas); containerToFpgaMapping.get(requestor).addAll(assignedFpgas);
} }
return new FpgaAllocation(assignedFpgas, currentAvailableFpga); return new FpgaAllocation(assignedFpgas, currentAvailableFpga);
@ -390,14 +391,13 @@ public synchronized void recoverAssignedFpgas(ContainerId containerId) throws Re
} }
public synchronized void cleanupAssignFpgas(String requestor) { public synchronized void cleanupAssignFpgas(String requestor) {
List<FpgaDevice> usedFpgas = usedFpgaByRequestor.get(requestor); List<FpgaDevice> usedFpgas = containerToFpgaMapping.get(requestor);
if (usedFpgas != null) { if (usedFpgas != null) {
for (FpgaDevice device : usedFpgas) { for (FpgaDevice device : usedFpgas) {
// Add back to availableFpga // Add back to availableFpga
availableFpga.get(device.getType()).add(device); availableFpgas.get(device.getType()).add(device);
} }
usedFpgaByRequestor.remove(requestor); containerToFpgaMapping.remove(requestor);
} }
} }
} }

View File

@ -101,7 +101,7 @@ public List<PrivilegedOperation> bootstrap(Configuration configuration)
// Get avialable devices minor numbers from toolchain or static configuration // Get avialable devices minor numbers from toolchain or static configuration
List<FpgaResourceAllocator.FpgaDevice> fpgaDeviceList = List<FpgaResourceAllocator.FpgaDevice> fpgaDeviceList =
FpgaDiscoverer.getInstance().discover(); FpgaDiscoverer.getInstance().discover();
allocator.addFpga(vendorPlugin.getFpgaType(), fpgaDeviceList); allocator.addFpgaDevices(vendorPlugin.getFpgaType(), fpgaDeviceList);
this.cGroupsHandler.initializeCGroupController( this.cGroupsHandler.initializeCGroupController(
CGroupsHandler.CGroupController.DEVICES); CGroupsHandler.CGroupController.DEVICES);
return null; return null;