From a4cd75e09c934699ec5e2fa969f1c8d0a14c1d49 Mon Sep 17 00:00:00 2001 From: Devaraj K Date: Wed, 27 Mar 2019 10:08:07 -0700 Subject: [PATCH] YARN-9269. Minor cleanup in FpgaResourceAllocator. Contributed by Peter Bacsko. --- .../resources/fpga/FpgaResourceAllocator.java | 104 +++++++++--------- .../fpga/FpgaResourceHandlerImpl.java | 2 +- 2 files changed, 53 insertions(+), 53 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java index b64ffd04d87..01036db4be8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java @@ -33,11 +33,15 @@ import java.io.IOException; import java.io.Serializable; -import java.util.*; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI; - /** * This FPGA resource allocator tends to be used by different FPGA vendor's plugin * A "type" parameter is taken into consideration when allocation @@ -50,20 +54,21 @@ public class FpgaResourceAllocator { private List allowedFpgas = new LinkedList<>(); //key is resource type of FPGA, vendor plugin supported ID - private LinkedHashMap> availableFpga = new LinkedHashMap<>(); + private Map> availableFpgas = new HashMap<>(); - //key is requestor, aka. container ID - private LinkedHashMap> usedFpgaByRequestor = new LinkedHashMap<>(); + //key is the container ID + private Map> containerToFpgaMapping = + new HashMap<>(); private Context nmContext; @VisibleForTesting - public HashMap> getAvailableFpga() { - return availableFpga; + Map> getAvailableFpga() { + return availableFpgas; } @VisibleForTesting - public List getAllowedFpga() { + List getAllowedFpga() { return allowedFpgas; } @@ -72,25 +77,31 @@ public FpgaResourceAllocator(Context ctx) { } @VisibleForTesting - public int getAvailableFpgaCount() { + int getAvailableFpgaCount() { int count = 0; - for (List l : availableFpga.values()) { - count += l.size(); - } + + count = availableFpgas.values() + .stream() + .mapToInt(i -> i.size()) + .sum(); + return count; } @VisibleForTesting - public HashMap> getUsedFpga() { - return usedFpgaByRequestor; + Map> getUsedFpga() { + return containerToFpgaMapping; } @VisibleForTesting - public int getUsedFpgaCount() { + int getUsedFpgaCount() { int count = 0; - for (List l : usedFpgaByRequestor.values()) { - count += l.size(); - } + + count = containerToFpgaMapping.values() + .stream() + .mapToInt(i -> i.size()) + .sum(); + return count; } @@ -252,42 +263,31 @@ public String toString() { } } - public synchronized void addFpga(String type, List list) { - availableFpga.putIfAbsent(type, new LinkedList<>()); + // called once during initialization + public synchronized void addFpgaDevices(String type, List list) { + availableFpgas.putIfAbsent(type, new LinkedList<>()); + List fpgaDevices = new LinkedList<>(); + for (FpgaDevice device : list) { if (!allowedFpgas.contains(device)) { - allowedFpgas.add(device); - availableFpga.get(type).add(device); + fpgaDevices.add(device); + availableFpgas.get(type).add(device); + } else { + LOG.warn("Duplicate device found: " + device + ". Ignored"); } } - LOG.info("Add a list of FPGA Devices: " + list); + + allowedFpgas = ImmutableList.copyOf(fpgaDevices); + LOG.info("Added a list of FPGA Devices: " + allowedFpgas); } public synchronized void updateFpga(String requestor, FpgaDevice device, String newIPID, String newHash) { - List usedFpgas = usedFpgaByRequestor.get(requestor); - int index = findMatchedFpga(usedFpgas, device); - if (-1 != index) { - usedFpgas.get(index).setIPID(newIPID); - FpgaDevice fpga = usedFpgas.get(index); - fpga.setIPID(newIPID); - fpga.setAocxHash(newHash); - } else { - LOG.warn("Failed to update FPGA due to unknown reason " + - "that no record for this allocated device:" + device); - } + device.setIPID(newIPID); + device.setAocxHash(newHash); LOG.info("Update IPID to " + newIPID + - " for this allocated device:" + device); - } - - private synchronized int findMatchedFpga(List devices, FpgaDevice item) { - int i = 0; - for (; i < devices.size(); i++) { - if (devices.get(i) == item) { - return i; - } - } - return -1; + " for this allocated device: " + device); + LOG.info("Update IP hash to " + newHash); } /** @@ -301,7 +301,8 @@ private synchronized int findMatchedFpga(List devices, FpgaDevice it * */ public synchronized FpgaAllocation assignFpga(String type, long count, Container container, String ipidHash) throws ResourceHandlerException { - List currentAvailableFpga = availableFpga.get(type); + List currentAvailableFpga = availableFpgas.get(type); + String requestor = container.getContainerId().toString(); if (null == currentAvailableFpga) { throw new ResourceHandlerException("No such type of FPGA resource available: " + type); @@ -341,8 +342,8 @@ public synchronized FpgaAllocation assignFpga(String type, long count, } // update state store success, update internal used FPGAs - usedFpgaByRequestor.putIfAbsent(requestor, new LinkedList<>()); - usedFpgaByRequestor.get(requestor).addAll(assignedFpgas); + containerToFpgaMapping.putIfAbsent(requestor, new LinkedList<>()); + containerToFpgaMapping.get(requestor).addAll(assignedFpgas); } return new FpgaAllocation(assignedFpgas, currentAvailableFpga); @@ -390,14 +391,13 @@ public synchronized void recoverAssignedFpgas(ContainerId containerId) throws Re } public synchronized void cleanupAssignFpgas(String requestor) { - List usedFpgas = usedFpgaByRequestor.get(requestor); + List usedFpgas = containerToFpgaMapping.get(requestor); if (usedFpgas != null) { for (FpgaDevice device : usedFpgas) { // Add back to availableFpga - availableFpga.get(device.getType()).add(device); + availableFpgas.get(device.getType()).add(device); } - usedFpgaByRequestor.remove(requestor); + containerToFpgaMapping.remove(requestor); } } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java index 1a9d6088777..cd1ea13bc4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java @@ -101,7 +101,7 @@ public List bootstrap(Configuration configuration) // Get avialable devices minor numbers from toolchain or static configuration List fpgaDeviceList = FpgaDiscoverer.getInstance().discover(); - allocator.addFpga(vendorPlugin.getFpgaType(), fpgaDeviceList); + allocator.addFpgaDevices(vendorPlugin.getFpgaType(), fpgaDeviceList); this.cGroupsHandler.initializeCGroupController( CGroupsHandler.CGroupController.DEVICES); return null;