Revert "YARN-9135. NM State store ResourceMappings serialization are tested with Strings instead of real Device objects. Contributed by Peter Bacsko"

This reverts commit b20fd9e212.
Commit is reverted since unnecessary files were added, accidentally.
This commit is contained in:
Szilard Nemeth 2019-08-13 15:44:50 +02:00
parent b040eb91c7
commit a762a6be29
4 changed files with 57 additions and 89 deletions

View File

@ -17,11 +17,9 @@
*/ */
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa; package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa;
import com.google.common.collect.ImmutableMap;
import java.io.Serializable; import java.io.Serializable;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
/** /**
@ -30,18 +28,27 @@ import java.util.Set;
*/ */
public class NumaResourceAllocation implements Serializable { public class NumaResourceAllocation implements Serializable {
private static final long serialVersionUID = 6339719798446595123L; private static final long serialVersionUID = 6339719798446595123L;
private final ImmutableMap<String, Long> nodeVsMemory; private Map<String, Long> nodeVsMemory;
private final ImmutableMap<String, Integer> nodeVsCpus; private Map<String, Integer> nodeVsCpus;
public NumaResourceAllocation(Map<String, Long> memoryAllocations, public NumaResourceAllocation() {
Map<String, Integer> cpuAllocations) { nodeVsMemory = new HashMap<>();
nodeVsMemory = ImmutableMap.copyOf(memoryAllocations); nodeVsCpus = new HashMap<>();
nodeVsCpus = ImmutableMap.copyOf(cpuAllocations);
} }
public NumaResourceAllocation(String memNodeId, long memory, String cpuNodeId, public NumaResourceAllocation(String memNodeId, long memory, String cpuNodeId,
int cpus) { int cpus) {
this(ImmutableMap.of(memNodeId, memory), ImmutableMap.of(cpuNodeId, cpus)); this();
nodeVsMemory.put(memNodeId, memory);
nodeVsCpus.put(cpuNodeId, cpus);
}
public void addMemoryNode(String memNodeId, long memory) {
nodeVsMemory.put(memNodeId, memory);
}
public void addCpuNode(String cpuNodeId, int cpus) {
nodeVsCpus.put(cpuNodeId, cpus);
} }
public Set<String> getMemNodes() { public Set<String> getMemNodes() {
@ -52,37 +59,11 @@ public class NumaResourceAllocation implements Serializable {
return nodeVsCpus.keySet(); return nodeVsCpus.keySet();
} }
public ImmutableMap<String, Long> getNodeVsMemory() { public Map<String, Long> getNodeVsMemory() {
return nodeVsMemory; return nodeVsMemory;
} }
public ImmutableMap<String, Integer> getNodeVsCpus() { public Map<String, Integer> getNodeVsCpus() {
return nodeVsCpus; return nodeVsCpus;
} }
}
@Override
public String toString() {
return "NumaResourceAllocation{" +
"nodeVsMemory=" + nodeVsMemory +
", nodeVsCpus=" + nodeVsCpus +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
NumaResourceAllocation that = (NumaResourceAllocation) o;
return Objects.equals(nodeVsMemory, that.nodeVsMemory) &&
Objects.equals(nodeVsCpus, that.nodeVsCpus);
}
@Override
public int hashCode() {
return Objects.hash(nodeVsMemory, nodeVsCpus);
}
}

View File

@ -31,7 +31,6 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -248,19 +247,17 @@ public class NumaResourceAllocator {
// If there is no single node matched for the container resource // If there is no single node matched for the container resource
// Check the NUMA nodes for Memory resources // Check the NUMA nodes for Memory resources
long memoryRequirement = resource.getMemorySize(); NumaResourceAllocation assignedNumaNodeInfo = new NumaResourceAllocation();
Map<String, Long> memoryAllocations = Maps.newHashMap(); long memreq = resource.getMemorySize();
for (NumaNodeResource numaNode : numaNodesList) { for (NumaNodeResource numaNode : numaNodesList) {
long memoryRemaining = numaNode. long memrem = numaNode.assignAvailableMemory(memreq, containerId);
assignAvailableMemory(memoryRequirement, containerId); assignedNumaNodeInfo.addMemoryNode(numaNode.getNodeId(), memreq - memrem);
memoryAllocations.put(numaNode.getNodeId(), memreq = memrem;
memoryRequirement - memoryRemaining); if (memreq == 0) {
memoryRequirement = memoryRemaining;
if (memoryRequirement == 0) {
break; break;
} }
} }
if (memoryRequirement != 0) { if (memreq != 0) {
LOG.info("There is no available memory:" + resource.getMemorySize() LOG.info("There is no available memory:" + resource.getMemorySize()
+ " in numa nodes for " + containerId); + " in numa nodes for " + containerId);
releaseNumaResource(containerId); releaseNumaResource(containerId);
@ -268,31 +265,26 @@ public class NumaResourceAllocator {
} }
// Check the NUMA nodes for CPU resources // Check the NUMA nodes for CPU resources
int cpusRequirement = resource.getVirtualCores(); int cpusreq = resource.getVirtualCores();
Map<String, Integer> cpuAllocations = Maps.newHashMap();
for (int index = 0; index < numaNodesList.size(); index++) { for (int index = 0; index < numaNodesList.size(); index++) {
NumaNodeResource numaNode = numaNodesList NumaNodeResource numaNode = numaNodesList
.get((currentAssignNode + index) % numaNodesList.size()); .get((currentAssignNode + index) % numaNodesList.size());
int cpusRemaining = numaNode. int cpusrem = numaNode.assignAvailableCpus(cpusreq, containerId);
assignAvailableCpus(cpusRequirement, containerId); assignedNumaNodeInfo.addCpuNode(numaNode.getNodeId(), cpusreq - cpusrem);
cpuAllocations.put(numaNode.getNodeId(), cpusRequirement - cpusRemaining); cpusreq = cpusrem;
cpusRequirement = cpusRemaining; if (cpusreq == 0) {
if (cpusRequirement == 0) {
currentAssignNode = (currentAssignNode + index + 1) currentAssignNode = (currentAssignNode + index + 1)
% numaNodesList.size(); % numaNodesList.size();
break; break;
} }
} }
if (cpusRequirement != 0) { if (cpusreq != 0) {
LOG.info("There are no available cpus:" + resource.getVirtualCores() LOG.info("There are no available cpus:" + resource.getVirtualCores()
+ " in numa nodes for " + containerId); + " in numa nodes for " + containerId);
releaseNumaResource(containerId); releaseNumaResource(containerId);
return null; return null;
} }
NumaResourceAllocation assignedNumaNodeInfo =
new NumaResourceAllocation(memoryAllocations, cpuAllocations);
LOG.info("Assigning multiple NUMA nodes (" LOG.info("Assigning multiple NUMA nodes ("
+ StringUtils.join(",", assignedNumaNodeInfo.getMemNodes()) + StringUtils.join(",", assignedNumaNodeInfo.getMemNodes())
+ ") for memory, (" + ") for memory, ("

View File

@ -1459,7 +1459,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
String keyResChng = CONTAINERS_KEY_PREFIX + container.getContainerId().toString() String keyResChng = CONTAINERS_KEY_PREFIX + container.getContainerId().toString()
+ CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX + resourceType; + CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX + resourceType;
try { try {
try (WriteBatch batch = db.createWriteBatch()) { WriteBatch batch = db.createWriteBatch();
try {
ResourceMappings.AssignedResources res = ResourceMappings.AssignedResources res =
new ResourceMappings.AssignedResources(); new ResourceMappings.AssignedResources();
res.updateAssignedResources(assignedResources); res.updateAssignedResources(assignedResources);
@ -1467,6 +1468,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
// New value will overwrite old values for the same key // New value will overwrite old values for the same key
batch.put(bytes(keyResChng), res.toBytes()); batch.put(bytes(keyResChng), res.toBytes());
db.write(batch); db.write(batch);
} finally {
batch.close();
} }
} catch (DBException e) { } catch (DBException e) {
markStoreUnHealthy(e); markStoreUnHealthy(e);

View File

@ -75,9 +75,6 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator.FpgaDevice;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDevice;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
@ -1451,7 +1448,7 @@ public class TestNMLeveldbStateStoreService {
@Test @Test
public void testStateStoreForResourceMapping() throws IOException { public void testStateStoreForResourceMapping() throws IOException {
// test that stateStore is initially empty // test empty when no state
List<RecoveredContainerState> recoveredContainers = List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator()); loadContainersState(stateStore.getContainerStateIterator());
assertTrue(recoveredContainers.isEmpty()); assertTrue(recoveredContainers.isEmpty());
@ -1467,43 +1464,38 @@ public class TestNMLeveldbStateStoreService {
ResourceMappings resourceMappings = new ResourceMappings(); ResourceMappings resourceMappings = new ResourceMappings();
when(container.getResourceMappings()).thenReturn(resourceMappings); when(container.getResourceMappings()).thenReturn(resourceMappings);
// Store ResourceMapping
stateStore.storeAssignedResources(container, "gpu", stateStore.storeAssignedResources(container, "gpu",
Arrays.asList(new GpuDevice(1, 1), new GpuDevice(2, 2), Arrays.asList("1", "2", "3"));
new GpuDevice(3, 3))); // This will overwrite above
List<Serializable> gpuRes1 = Arrays.asList("1", "2", "4");
// This will overwrite the above
List<Serializable> gpuRes1 = Arrays.asList(
new GpuDevice(1, 1), new GpuDevice(2, 2), new GpuDevice(4, 4));
stateStore.storeAssignedResources(container, "gpu", gpuRes1); stateStore.storeAssignedResources(container, "gpu", gpuRes1);
List<Serializable> fpgaRes = Arrays.asList("3", "4", "5", "6");
List<Serializable> fpgaRes = Arrays.asList(
new FpgaDevice("testType", 3, 3, "testIPID"),
new FpgaDevice("testType", 4, 4, "testIPID"),
new FpgaDevice("testType", 5, 5, "testIPID"),
new FpgaDevice("testType", 6, 6, "testIPID"));
stateStore.storeAssignedResources(container, "fpga", fpgaRes); stateStore.storeAssignedResources(container, "fpga", fpgaRes);
List<Serializable> numaRes = Arrays.asList("numa1");
List<Serializable> numaRes = Arrays.asList(
new NumaResourceAllocation("testmemNodeId", 2048, "testCpuNodeId", 10));
stateStore.storeAssignedResources(container, "numa", numaRes); stateStore.storeAssignedResources(container, "numa", numaRes);
// add a invalid key
restartStateStore(); restartStateStore();
recoveredContainers = recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator()); loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size()); assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0); RecoveredContainerState rcs = recoveredContainers.get(0);
List<Serializable> resources = rcs.getResourceMappings() List<Serializable> res = rcs.getResourceMappings()
.getAssignedResources("gpu"); .getAssignedResources("gpu");
Assert.assertEquals(gpuRes1, resources); Assert.assertTrue(res.equals(gpuRes1));
Assert.assertEquals(gpuRes1, resourceMappings.getAssignedResources("gpu")); Assert.assertTrue(
resourceMappings.getAssignedResources("gpu").equals(gpuRes1));
resources = rcs.getResourceMappings().getAssignedResources("fpga"); res = rcs.getResourceMappings().getAssignedResources("fpga");
Assert.assertEquals(fpgaRes, resources); Assert.assertTrue(res.equals(fpgaRes));
Assert.assertEquals(fpgaRes, resourceMappings.getAssignedResources("fpga")); Assert.assertTrue(
resourceMappings.getAssignedResources("fpga").equals(fpgaRes));
resources = rcs.getResourceMappings().getAssignedResources("numa"); res = rcs.getResourceMappings().getAssignedResources("numa");
Assert.assertEquals(numaRes, resources); Assert.assertTrue(res.equals(numaRes));
Assert.assertEquals(numaRes, resourceMappings.getAssignedResources("numa")); Assert.assertTrue(
resourceMappings.getAssignedResources("numa").equals(numaRes));
} }
@Test @Test