YARN-9135. NM State store ResourceMappings serialization is tested with Strings instead of real Device objects. Contributed by Peter Bacsko

(cherry picked from commit 8b3c6791b1)
This commit is contained in:
Szilard Nemeth 2019-07-12 17:20:42 +02:00
parent a762a6be29
commit cb91ab73b0
4 changed files with 89 additions and 57 deletions

View File

@ -17,9 +17,11 @@
*/ */
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa; package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
import com.google.common.collect.ImmutableMap;
import java.io.Serializable; import java.io.Serializable;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
/** /**
@ -28,27 +30,18 @@ import java.util.Set;
*/ */
public class NumaResourceAllocation implements Serializable { public class NumaResourceAllocation implements Serializable {
private static final long serialVersionUID = 6339719798446595123L; private static final long serialVersionUID = 6339719798446595123L;
private Map<String, Long> nodeVsMemory; private final ImmutableMap<String, Long> nodeVsMemory;
private Map<String, Integer> nodeVsCpus; private final ImmutableMap<String, Integer> nodeVsCpus;
public NumaResourceAllocation() { public NumaResourceAllocation(Map<String, Long> memoryAllocations,
nodeVsMemory = new HashMap<>(); Map<String, Integer> cpuAllocations) {
nodeVsCpus = new HashMap<>(); nodeVsMemory = ImmutableMap.copyOf(memoryAllocations);
nodeVsCpus = ImmutableMap.copyOf(cpuAllocations);
} }
public NumaResourceAllocation(String memNodeId, long memory, String cpuNodeId, public NumaResourceAllocation(String memNodeId, long memory, String cpuNodeId,
int cpus) { int cpus) {
this(); this(ImmutableMap.of(memNodeId, memory), ImmutableMap.of(cpuNodeId, cpus));
nodeVsMemory.put(memNodeId, memory);
nodeVsCpus.put(cpuNodeId, cpus);
}
public void addMemoryNode(String memNodeId, long memory) {
nodeVsMemory.put(memNodeId, memory);
}
public void addCpuNode(String cpuNodeId, int cpus) {
nodeVsCpus.put(cpuNodeId, cpus);
} }
public Set<String> getMemNodes() { public Set<String> getMemNodes() {
@ -59,11 +52,37 @@ public class NumaResourceAllocation implements Serializable {
return nodeVsCpus.keySet(); return nodeVsCpus.keySet();
} }
public Map<String, Long> getNodeVsMemory() { public ImmutableMap<String, Long> getNodeVsMemory() {
return nodeVsMemory; return nodeVsMemory;
} }
public Map<String, Integer> getNodeVsCpus() { public ImmutableMap<String, Integer> getNodeVsCpus() {
return nodeVsCpus; return nodeVsCpus;
} }
}
@Override
public String toString() {
return "NumaResourceAllocation{" +
"nodeVsMemory=" + nodeVsMemory +
", nodeVsCpus=" + nodeVsCpus +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
NumaResourceAllocation that = (NumaResourceAllocation) o;
return Objects.equals(nodeVsMemory, that.nodeVsMemory) &&
Objects.equals(nodeVsCpus, that.nodeVsCpus);
}
@Override
public int hashCode() {
return Objects.hash(nodeVsMemory, nodeVsCpus);
}
}

View File

@ -29,6 +29,7 @@ import java.util.Map.Entry;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -247,17 +248,19 @@ public class NumaResourceAllocator {
// If there is no single node matched for the container resource // If there is no single node matched for the container resource
// Check the NUMA nodes for Memory resources // Check the NUMA nodes for Memory resources
NumaResourceAllocation assignedNumaNodeInfo = new NumaResourceAllocation(); long memoryRequirement = resource.getMemorySize();
long memreq = resource.getMemorySize(); Map<String, Long> memoryAllocations = Maps.newHashMap();
for (NumaNodeResource numaNode : numaNodesList) { for (NumaNodeResource numaNode : numaNodesList) {
long memrem = numaNode.assignAvailableMemory(memreq, containerId); long memoryRemaining = numaNode.
assignedNumaNodeInfo.addMemoryNode(numaNode.getNodeId(), memreq - memrem); assignAvailableMemory(memoryRequirement, containerId);
memreq = memrem; memoryAllocations.put(numaNode.getNodeId(),
if (memreq == 0) { memoryRequirement - memoryRemaining);
memoryRequirement = memoryRemaining;
if (memoryRequirement == 0) {
break; break;
} }
} }
if (memreq != 0) { if (memoryRequirement != 0) {
LOG.info("There is no available memory:" + resource.getMemorySize() LOG.info("There is no available memory:" + resource.getMemorySize()
+ " in numa nodes for " + containerId); + " in numa nodes for " + containerId);
releaseNumaResource(containerId); releaseNumaResource(containerId);
@ -265,26 +268,31 @@ public class NumaResourceAllocator {
} }
// Check the NUMA nodes for CPU resources // Check the NUMA nodes for CPU resources
int cpusreq = resource.getVirtualCores(); int cpusRequirement = resource.getVirtualCores();
Map<String, Integer> cpuAllocations = Maps.newHashMap();
for (int index = 0; index < numaNodesList.size(); index++) { for (int index = 0; index < numaNodesList.size(); index++) {
NumaNodeResource numaNode = numaNodesList NumaNodeResource numaNode = numaNodesList
.get((currentAssignNode + index) % numaNodesList.size()); .get((currentAssignNode + index) % numaNodesList.size());
int cpusrem = numaNode.assignAvailableCpus(cpusreq, containerId); int cpusRemaining = numaNode.
assignedNumaNodeInfo.addCpuNode(numaNode.getNodeId(), cpusreq - cpusrem); assignAvailableCpus(cpusRequirement, containerId);
cpusreq = cpusrem; cpuAllocations.put(numaNode.getNodeId(), cpusRequirement - cpusRemaining);
if (cpusreq == 0) { cpusRequirement = cpusRemaining;
if (cpusRequirement == 0) {
currentAssignNode = (currentAssignNode + index + 1) currentAssignNode = (currentAssignNode + index + 1)
% numaNodesList.size(); % numaNodesList.size();
break; break;
} }
} }
if (cpusreq != 0) { if (cpusRequirement != 0) {
LOG.info("There are no available cpus:" + resource.getVirtualCores() LOG.info("There are no available cpus:" + resource.getVirtualCores()
+ " in numa nodes for " + containerId); + " in numa nodes for " + containerId);
releaseNumaResource(containerId); releaseNumaResource(containerId);
return null; return null;
} }
NumaResourceAllocation assignedNumaNodeInfo =
new NumaResourceAllocation(memoryAllocations, cpuAllocations);
LOG.info("Assigning multiple NUMA nodes (" LOG.info("Assigning multiple NUMA nodes ("
+ StringUtils.join(",", assignedNumaNodeInfo.getMemNodes()) + StringUtils.join(",", assignedNumaNodeInfo.getMemNodes())
+ ") for memory, (" + ") for memory, ("

View File

@ -1459,8 +1459,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
String keyResChng = CONTAINERS_KEY_PREFIX + container.getContainerId().toString() String keyResChng = CONTAINERS_KEY_PREFIX + container.getContainerId().toString()
+ CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX + resourceType; + CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX + resourceType;
try { try {
WriteBatch batch = db.createWriteBatch(); try (WriteBatch batch = db.createWriteBatch()) {
try {
ResourceMappings.AssignedResources res = ResourceMappings.AssignedResources res =
new ResourceMappings.AssignedResources(); new ResourceMappings.AssignedResources();
res.updateAssignedResources(assignedResources); res.updateAssignedResources(assignedResources);
@ -1468,8 +1467,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
// New value will overwrite old values for the same key // New value will overwrite old values for the same key
batch.put(bytes(keyResChng), res.toBytes()); batch.put(bytes(keyResChng), res.toBytes());
db.write(batch); db.write(batch);
} finally {
batch.close();
} }
} catch (DBException e) { } catch (DBException e) {
markStoreUnHealthy(e); markStoreUnHealthy(e);

View File

@ -75,6 +75,9 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDevice;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
@ -1448,7 +1451,7 @@ public class TestNMLeveldbStateStoreService {
@Test @Test
public void testStateStoreForResourceMapping() throws IOException { public void testStateStoreForResourceMapping() throws IOException {
// test empty when no state // test that stateStore is initially empty
List<RecoveredContainerState> recoveredContainers = List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator()); loadContainersState(stateStore.getContainerStateIterator());
assertTrue(recoveredContainers.isEmpty()); assertTrue(recoveredContainers.isEmpty());
@ -1464,38 +1467,43 @@ public class TestNMLeveldbStateStoreService {
ResourceMappings resourceMappings = new ResourceMappings(); ResourceMappings resourceMappings = new ResourceMappings();
when(container.getResourceMappings()).thenReturn(resourceMappings); when(container.getResourceMappings()).thenReturn(resourceMappings);
// Store ResourceMapping
stateStore.storeAssignedResources(container, "gpu", stateStore.storeAssignedResources(container, "gpu",
Arrays.asList("1", "2", "3")); Arrays.asList(new GpuDevice(1, 1), new GpuDevice(2, 2),
// This will overwrite above new GpuDevice(3, 3)));
List<Serializable> gpuRes1 = Arrays.asList("1", "2", "4");
// This will overwrite the above
List<Serializable> gpuRes1 = Arrays.asList(
new GpuDevice(1, 1), new GpuDevice(2, 2), new GpuDevice(4, 4));
stateStore.storeAssignedResources(container, "gpu", gpuRes1); stateStore.storeAssignedResources(container, "gpu", gpuRes1);
List<Serializable> fpgaRes = Arrays.asList("3", "4", "5", "6");
List<Serializable> fpgaRes = Arrays.asList(
new FpgaDevice("testType", 3, 3, "testIPID"),
new FpgaDevice("testType", 4, 4, "testIPID"),
new FpgaDevice("testType", 5, 5, "testIPID"),
new FpgaDevice("testType", 6, 6, "testIPID"));
stateStore.storeAssignedResources(container, "fpga", fpgaRes); stateStore.storeAssignedResources(container, "fpga", fpgaRes);
List<Serializable> numaRes = Arrays.asList("numa1");
List<Serializable> numaRes = Arrays.asList(
new NumaResourceAllocation("testmemNodeId", 2048, "testCpuNodeId", 10));
stateStore.storeAssignedResources(container, "numa", numaRes); stateStore.storeAssignedResources(container, "numa", numaRes);
// add a invalid key
restartStateStore(); restartStateStore();
recoveredContainers = recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator()); loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size()); assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0); RecoveredContainerState rcs = recoveredContainers.get(0);
List<Serializable> res = rcs.getResourceMappings() List<Serializable> resources = rcs.getResourceMappings()
.getAssignedResources("gpu"); .getAssignedResources("gpu");
Assert.assertTrue(res.equals(gpuRes1)); Assert.assertEquals(gpuRes1, resources);
Assert.assertTrue( Assert.assertEquals(gpuRes1, resourceMappings.getAssignedResources("gpu"));
resourceMappings.getAssignedResources("gpu").equals(gpuRes1));
res = rcs.getResourceMappings().getAssignedResources("fpga"); resources = rcs.getResourceMappings().getAssignedResources("fpga");
Assert.assertTrue(res.equals(fpgaRes)); Assert.assertEquals(fpgaRes, resources);
Assert.assertTrue( Assert.assertEquals(fpgaRes, resourceMappings.getAssignedResources("fpga"));
resourceMappings.getAssignedResources("fpga").equals(fpgaRes));
res = rcs.getResourceMappings().getAssignedResources("numa"); resources = rcs.getResourceMappings().getAssignedResources("numa");
Assert.assertTrue(res.equals(numaRes)); Assert.assertEquals(numaRes, resources);
Assert.assertTrue( Assert.assertEquals(numaRes, resourceMappings.getAssignedResources("numa"));
resourceMappings.getAssignedResources("numa").equals(numaRes));
} }
@Test @Test