diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java index e91ac3e041e..f8d47396e98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocation.java @@ -17,11 +17,9 @@ */ package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa; -import com.google.common.collect.ImmutableMap; - import java.io.Serializable; +import java.util.HashMap; import java.util.Map; -import java.util.Objects; import java.util.Set; /** @@ -30,18 +28,27 @@ import java.util.Set; */ public class NumaResourceAllocation implements Serializable { private static final long serialVersionUID = 6339719798446595123L; - private final ImmutableMap nodeVsMemory; - private final ImmutableMap nodeVsCpus; + private Map nodeVsMemory; + private Map nodeVsCpus; - public NumaResourceAllocation(Map memoryAllocations, - Map cpuAllocations) { - nodeVsMemory = ImmutableMap.copyOf(memoryAllocations); - nodeVsCpus = ImmutableMap.copyOf(cpuAllocations); + public NumaResourceAllocation() { + nodeVsMemory = new HashMap<>(); + nodeVsCpus = new HashMap<>(); } public NumaResourceAllocation(String memNodeId, long memory, String cpuNodeId, int cpus) { - this(ImmutableMap.of(memNodeId, memory), ImmutableMap.of(cpuNodeId, cpus)); + this(); + nodeVsMemory.put(memNodeId, memory); + nodeVsCpus.put(cpuNodeId, cpus); + } + + public void addMemoryNode(String memNodeId, long memory) { + nodeVsMemory.put(memNodeId, memory); + } + + public void addCpuNode(String cpuNodeId, int cpus) { + nodeVsCpus.put(cpuNodeId, cpus); } public Set getMemNodes() { @@ -52,37 +59,11 @@ public class NumaResourceAllocation implements Serializable { return nodeVsCpus.keySet(); } - public ImmutableMap getNodeVsMemory() { + public Map getNodeVsMemory() { return nodeVsMemory; } - public ImmutableMap getNodeVsCpus() { + public Map getNodeVsCpus() { return nodeVsCpus; } - - @Override - public String toString() { - return "NumaResourceAllocation{" + - "nodeVsMemory=" + nodeVsMemory + - ", nodeVsCpus=" + nodeVsCpus + - '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - NumaResourceAllocation that = (NumaResourceAllocation) o; - return Objects.equals(nodeVsMemory, that.nodeVsMemory) && - Objects.equals(nodeVsCpus, that.nodeVsCpus); - } - - @Override - public int hashCode() { - return Objects.hash(nodeVsMemory, nodeVsCpus); - } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java index f95e55ee0e3..e152bdab87f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java @@ -31,7 +31,6 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.StringUtils; @@ -248,19 +247,17 @@ public class NumaResourceAllocator { // If there is no single node matched for the container resource // Check the NUMA nodes for Memory resources - long memoryRequirement = resource.getMemorySize(); - Map memoryAllocations = Maps.newHashMap(); + NumaResourceAllocation assignedNumaNodeInfo = new NumaResourceAllocation(); + long memreq = resource.getMemorySize(); for (NumaNodeResource numaNode : numaNodesList) { - long memoryRemaining = numaNode. - assignAvailableMemory(memoryRequirement, containerId); - memoryAllocations.put(numaNode.getNodeId(), - memoryRequirement - memoryRemaining); - memoryRequirement = memoryRemaining; - if (memoryRequirement == 0) { + long memrem = numaNode.assignAvailableMemory(memreq, containerId); + assignedNumaNodeInfo.addMemoryNode(numaNode.getNodeId(), memreq - memrem); + memreq = memrem; + if (memreq == 0) { break; } } - if (memoryRequirement != 0) { + if (memreq != 0) { LOG.info("There is no available memory:" + resource.getMemorySize() + " in numa nodes for " + containerId); releaseNumaResource(containerId); @@ -268,31 +265,26 @@ public class NumaResourceAllocator { } // Check the NUMA nodes for CPU resources - int cpusRequirement = resource.getVirtualCores(); - Map cpuAllocations = Maps.newHashMap(); + int cpusreq = resource.getVirtualCores(); for (int index = 0; index < numaNodesList.size(); index++) { NumaNodeResource numaNode = numaNodesList .get((currentAssignNode + index) % numaNodesList.size()); - int cpusRemaining = numaNode. - assignAvailableCpus(cpusRequirement, containerId); - cpuAllocations.put(numaNode.getNodeId(), cpusRequirement - cpusRemaining); - cpusRequirement = cpusRemaining; - if (cpusRequirement == 0) { + int cpusrem = numaNode.assignAvailableCpus(cpusreq, containerId); + assignedNumaNodeInfo.addCpuNode(numaNode.getNodeId(), cpusreq - cpusrem); + cpusreq = cpusrem; + if (cpusreq == 0) { currentAssignNode = (currentAssignNode + index + 1) % numaNodesList.size(); break; } } - if (cpusRequirement != 0) { + if (cpusreq != 0) { LOG.info("There are no available cpus:" + resource.getVirtualCores() + " in numa nodes for " + containerId); releaseNumaResource(containerId); return null; } - - NumaResourceAllocation assignedNumaNodeInfo = - new NumaResourceAllocation(memoryAllocations, cpuAllocations); LOG.info("Assigning multiple NUMA nodes (" + StringUtils.join(",", assignedNumaNodeInfo.getMemNodes()) + ") for memory, (" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 8de94a54987..1d7771a9e1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -1459,7 +1459,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { String keyResChng = CONTAINERS_KEY_PREFIX + container.getContainerId().toString() + CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX + resourceType; try { - try (WriteBatch batch = db.createWriteBatch()) { + WriteBatch batch = db.createWriteBatch(); + try { ResourceMappings.AssignedResources res = new ResourceMappings.AssignedResources(); res.updateAssignedResources(assignedResources); @@ -1467,6 +1468,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { // New value will overwrite old values for the same key batch.put(bytes(keyResChng), res.toBytes()); db.write(batch); + } finally { + batch.close(); } } catch (DBException e) { markStoreUnHealthy(e); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index c4c194c3a94..87208f76497 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -75,9 +75,6 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocation; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDevice; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; @@ -1451,7 +1448,7 @@ public class TestNMLeveldbStateStoreService { @Test public void testStateStoreForResourceMapping() throws IOException { - // test that stateStore is initially empty + // test empty when no state List recoveredContainers = loadContainersState(stateStore.getContainerStateIterator()); assertTrue(recoveredContainers.isEmpty()); @@ -1467,43 +1464,38 @@ public class TestNMLeveldbStateStoreService { ResourceMappings resourceMappings = new ResourceMappings(); when(container.getResourceMappings()).thenReturn(resourceMappings); + // Store ResourceMapping stateStore.storeAssignedResources(container, "gpu", - Arrays.asList(new GpuDevice(1, 1), new GpuDevice(2, 2), - new GpuDevice(3, 3))); - - // This will overwrite the above - List gpuRes1 = Arrays.asList( - new GpuDevice(1, 1), new GpuDevice(2, 2), new GpuDevice(4, 4)); + Arrays.asList("1", "2", "3")); + // This will overwrite above + List gpuRes1 = Arrays.asList("1", "2", "4"); stateStore.storeAssignedResources(container, "gpu", gpuRes1); - - List fpgaRes = Arrays.asList( - new FpgaDevice("testType", 3, 3, "testIPID"), - new FpgaDevice("testType", 4, 4, "testIPID"), - new FpgaDevice("testType", 5, 5, "testIPID"), - new FpgaDevice("testType", 6, 6, "testIPID")); + List fpgaRes = Arrays.asList("3", "4", "5", "6"); stateStore.storeAssignedResources(container, "fpga", fpgaRes); - - List numaRes = Arrays.asList( - new NumaResourceAllocation("testmemNodeId", 2048, "testCpuNodeId", 10)); + List numaRes = Arrays.asList("numa1"); stateStore.storeAssignedResources(container, "numa", numaRes); + // add a invalid key restartStateStore(); recoveredContainers = loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = recoveredContainers.get(0); - List resources = rcs.getResourceMappings() + List res = rcs.getResourceMappings() .getAssignedResources("gpu"); - Assert.assertEquals(gpuRes1, resources); - Assert.assertEquals(gpuRes1, resourceMappings.getAssignedResources("gpu")); + Assert.assertTrue(res.equals(gpuRes1)); + Assert.assertTrue( + resourceMappings.getAssignedResources("gpu").equals(gpuRes1)); - resources = rcs.getResourceMappings().getAssignedResources("fpga"); - Assert.assertEquals(fpgaRes, resources); - Assert.assertEquals(fpgaRes, resourceMappings.getAssignedResources("fpga")); + res = rcs.getResourceMappings().getAssignedResources("fpga"); + Assert.assertTrue(res.equals(fpgaRes)); + Assert.assertTrue( + resourceMappings.getAssignedResources("fpga").equals(fpgaRes)); - resources = rcs.getResourceMappings().getAssignedResources("numa"); - Assert.assertEquals(numaRes, resources); - Assert.assertEquals(numaRes, resourceMappings.getAssignedResources("numa")); + res = rcs.getResourceMappings().getAssignedResources("numa"); + Assert.assertTrue(res.equals(numaRes)); + Assert.assertTrue( + resourceMappings.getAssignedResources("numa").equals(numaRes)); } @Test