YARN-11014. YARN incorrectly validates maximum capacity resources on the validation API. Contributed by Benjamin Teke

Change-Id: I5505e1b8aaa394dfac31dade7aed6013e0279adc
This commit is contained in:
Szilard Nemeth 2022-03-02 14:23:00 +01:00 committed by Steve Loughran
parent a981df3aec
commit cb40b7d741
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
3 changed files with 284 additions and 4 deletions

View File

@ -2056,6 +2056,22 @@ public class CapacityScheduler extends
} }
} }
/**
* Add node to nodeTracker. Used when validating CS configuration by instantiating a new
* CS instance.
* @param nodesToAdd node to be added
*/
public void addNodes(List<FiCaSchedulerNode> nodesToAdd) {
writeLock.lock();
try {
for (FiCaSchedulerNode node : nodesToAdd) {
nodeTracker.addNode(node);
}
} finally {
writeLock.unlock();
}
}
private void addNode(RMNode nodeManager) { private void addNode(RMNode nodeManager) {
writeLock.lock(); writeLock.lock();
try { try {

View File

@ -42,6 +42,7 @@ public final class CapacitySchedulerConfigValidator {
public static boolean validateCSConfiguration( public static boolean validateCSConfiguration(
final Configuration oldConf, final Configuration newConf, final Configuration oldConf, final Configuration newConf,
final RMContext rmContext) throws IOException { final RMContext rmContext) throws IOException {
CapacityScheduler liveScheduler = (CapacityScheduler) rmContext.getScheduler();
CapacityScheduler newCs = new CapacityScheduler(); CapacityScheduler newCs = new CapacityScheduler();
try { try {
//TODO: extract all the validation steps and replace reinitialize with //TODO: extract all the validation steps and replace reinitialize with
@ -49,6 +50,7 @@ public final class CapacitySchedulerConfigValidator {
newCs.setConf(oldConf); newCs.setConf(oldConf);
newCs.setRMContext(rmContext); newCs.setRMContext(rmContext);
newCs.init(oldConf); newCs.init(oldConf);
newCs.addNodes(liveScheduler.getAllNodes());
newCs.reinitialize(newConf, rmContext, true); newCs.reinitialize(newConf, rmContext, true);
return true; return true;
} finally { } finally {

View File

@ -19,13 +19,23 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.impl.LightWeightResource; import org.apache.hadoop.yarn.api.records.impl.LightWeightResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -34,9 +44,71 @@ import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
public class TestCapacitySchedulerConfigValidator { public class TestCapacitySchedulerConfigValidator {
public static final int NODE_MEMORY = 16;
public static final int NODE1_VCORES = 8;
public static final int NODE2_VCORES = 10;
public static final int NODE3_VCORES = 12;
public static final Map<String, Long> NODE_GPU = ImmutableMap.of(GPU_URI, 2L);
public static final int GB = 1024;
private static final String PARENT_A = "parentA";
private static final String PARENT_B = "parentB";
private static final String LEAF_A = "leafA";
private static final String LEAF_B = "leafB";
private static final String PARENT_A_FULL_PATH = CapacitySchedulerConfiguration.ROOT
+ "." + PARENT_A;
private static final String LEAF_A_FULL_PATH = PARENT_A_FULL_PATH
+ "." + LEAF_A;
private static final String PARENT_B_FULL_PATH = CapacitySchedulerConfiguration.ROOT
+ "." + PARENT_B;
private static final String LEAF_B_FULL_PATH = PARENT_B_FULL_PATH
+ "." + LEAF_B;
private final Resource A_MINRES = Resource.newInstance(16 * GB, 10);
private final Resource B_MINRES = Resource.newInstance(32 * GB, 5);
private final Resource FULL_MAXRES = Resource.newInstance(48 * GB, 30);
private final Resource PARTIAL_MAXRES = Resource.newInstance(16 * GB, 10);
private final Resource VCORE_EXCEEDED_MAXRES = Resource.newInstance(16 * GB, 50);
private Resource A_MINRES_GPU;
private Resource B_MINRES_GPU;
private Resource FULL_MAXRES_GPU;
private Resource PARTIAL_MAXRES_GPU;
private Resource GPU_EXCEEDED_MAXRES_GPU;
protected MockRM mockRM = null;
protected MockNM nm1 = null;
protected MockNM nm2 = null;
protected MockNM nm3 = null;
protected CapacityScheduler cs;
public static void setupResources(boolean useGpu) {
Map<String, ResourceInformation> riMap = new HashMap<>();
ResourceInformation memory = ResourceInformation.newInstance(
ResourceInformation.MEMORY_MB.getName(),
ResourceInformation.MEMORY_MB.getUnits(),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
ResourceInformation vcores = ResourceInformation.newInstance(
ResourceInformation.VCORES.getName(),
ResourceInformation.VCORES.getUnits(),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
riMap.put(ResourceInformation.MEMORY_URI, memory);
riMap.put(ResourceInformation.VCORES_URI, vcores);
if (useGpu) {
riMap.put(ResourceInformation.GPU_URI,
ResourceInformation.newInstance(ResourceInformation.GPU_URI, "", 0,
ResourceTypes.COUNTABLE, 0, 10L));
}
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
}
/** /**
* Test for the case when the scheduler.minimum-allocation-mb == 0. * Test for the case when the scheduler.minimum-allocation-mb == 0.
@ -69,7 +141,6 @@ public class TestCapacitySchedulerConfigValidator {
} }
@Test @Test
public void testValidateMemoryAllocation() { public void testValidateMemoryAllocation() {
Map<String, String> configs = new HashMap(); Map<String, String> configs = new HashMap();
@ -115,7 +186,6 @@ public class TestCapacitySchedulerConfigValidator {
} }
@Test @Test
public void testValidateVCores() { public void testValidateVCores() {
Map<String, String> configs = new HashMap(); Map<String, String> configs = new HashMap();
@ -147,6 +217,106 @@ public class TestCapacitySchedulerConfigValidator {
} }
} }
@Test
public void testValidateCSConfigDefaultRCAbsoluteModeParentMaxMemoryExceeded()
throws Exception {
setUpMockRM(false);
RMContext rmContext = mockRM.getRMContext();
CapacitySchedulerConfiguration oldConfiguration = cs.getConfiguration();
CapacitySchedulerConfiguration newConfiguration =
new CapacitySchedulerConfiguration(cs.getConfiguration());
newConfiguration.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, FULL_MAXRES);
try {
CapacitySchedulerConfigValidator
.validateCSConfiguration(oldConfiguration, newConfiguration, rmContext);
fail("Parent maximum capacity exceeded");
} catch (IOException e) {
Assert.assertTrue(e.getCause().getMessage()
.startsWith("Max resource configuration"));
} finally {
mockRM.stop();
}
}
@Test
public void testValidateCSConfigDefaultRCAbsoluteModeParentMaxVcoreExceeded() throws Exception {
setUpMockRM(false);
RMContext rmContext = mockRM.getRMContext();
CapacitySchedulerConfiguration oldConfiguration = cs.getConfiguration();
CapacitySchedulerConfiguration newConfiguration =
new CapacitySchedulerConfiguration(cs.getConfiguration());
newConfiguration.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, VCORE_EXCEEDED_MAXRES);
try {
CapacitySchedulerConfigValidator
.validateCSConfiguration(oldConfiguration, newConfiguration, rmContext);
} catch (IOException e) {
fail("In DefaultResourceCalculator vcore limits are not enforced");
} finally {
mockRM.stop();
}
}
@Test
public void testValidateCSConfigDominantRCAbsoluteModeParentMaxMemoryExceeded()
throws Exception {
setUpMockRM(true);
RMContext rmContext = mockRM.getRMContext();
CapacitySchedulerConfiguration oldConfiguration = cs.getConfiguration();
CapacitySchedulerConfiguration newConfiguration =
new CapacitySchedulerConfiguration(cs.getConfiguration());
newConfiguration.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, FULL_MAXRES);
try {
CapacitySchedulerConfigValidator
.validateCSConfiguration(oldConfiguration, newConfiguration, rmContext);
fail("Parent maximum capacity exceeded");
} catch (IOException e) {
Assert.assertTrue(e.getCause().getMessage()
.startsWith("Max resource configuration"));
} finally {
mockRM.stop();
}
}
@Test
public void testValidateCSConfigDominantRCAbsoluteModeParentMaxVcoreExceeded() throws Exception {
setUpMockRM(true);
RMContext rmContext = mockRM.getRMContext();
CapacitySchedulerConfiguration oldConfiguration = cs.getConfiguration();
CapacitySchedulerConfiguration newConfiguration =
new CapacitySchedulerConfiguration(cs.getConfiguration());
newConfiguration.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, VCORE_EXCEEDED_MAXRES);
try {
CapacitySchedulerConfigValidator
.validateCSConfiguration(oldConfiguration, newConfiguration, rmContext);
fail("Parent maximum capacity exceeded");
} catch (IOException e) {
Assert.assertTrue(e.getCause().getMessage()
.startsWith("Max resource configuration"));
} finally {
mockRM.stop();
}
}
@Test
public void testValidateCSConfigDominantRCAbsoluteModeParentMaxGPUExceeded() throws Exception {
setUpMockRM(true);
RMContext rmContext = mockRM.getRMContext();
CapacitySchedulerConfiguration oldConfiguration = cs.getConfiguration();
CapacitySchedulerConfiguration newConfiguration =
new CapacitySchedulerConfiguration(cs.getConfiguration());
newConfiguration.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, GPU_EXCEEDED_MAXRES_GPU);
try {
CapacitySchedulerConfigValidator
.validateCSConfiguration(oldConfiguration, newConfiguration, rmContext);
fail("Parent maximum capacity exceeded");
} catch (IOException e) {
Assert.assertTrue(e.getCause().getMessage()
.startsWith("Max resource configuration"));
} finally {
mockRM.stop();
}
}
@Test @Test
public void testValidateCSConfigStopALeafQueue() throws IOException { public void testValidateCSConfigStopALeafQueue() throws IOException {
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
@ -155,7 +325,7 @@ public class TestCapacitySchedulerConfigValidator {
newConfig newConfig
.set("yarn.scheduler.capacity.root.test1.state", "STOPPED"); .set("yarn.scheduler.capacity.root.test1.state", "STOPPED");
RMContext rmContext = prepareRMContext(); RMContext rmContext = prepareRMContext();
Boolean isValidConfig = CapacitySchedulerConfigValidator boolean isValidConfig = CapacitySchedulerConfigValidator
.validateCSConfiguration(oldConfig, newConfig, rmContext); .validateCSConfiguration(oldConfig, newConfig, rmContext);
Assert.assertTrue(isValidConfig); Assert.assertTrue(isValidConfig);
} }
@ -340,9 +510,11 @@ public class TestCapacitySchedulerConfigValidator {
Assert.assertTrue(isValidConfig); Assert.assertTrue(isValidConfig);
} }
public static RMContext prepareRMContext() { public static RMContext prepareRMContext() {
setupResources(false);
RMContext rmContext = Mockito.mock(RMContext.class); RMContext rmContext = Mockito.mock(RMContext.class);
CapacityScheduler mockCs = Mockito.mock(CapacityScheduler.class);
Mockito.when(rmContext.getScheduler()).thenReturn(mockCs);
LocalConfigurationProvider configProvider = Mockito LocalConfigurationProvider configProvider = Mockito
.mock(LocalConfigurationProvider.class); .mock(LocalConfigurationProvider.class);
Mockito.when(rmContext.getConfigurationProvider()) Mockito.when(rmContext.getConfigurationProvider())
@ -361,4 +533,94 @@ public class TestCapacitySchedulerConfigValidator {
.thenReturn(queuePlacementManager); .thenReturn(queuePlacementManager);
return rmContext; return rmContext;
} }
private void setUpMockRM(boolean useDominantRC) throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
setupResources(useDominantRC);
CapacitySchedulerConfiguration csConf = setupCSConfiguration(conf, useDominantRC);
mockRM = new MockRM(csConf);
cs = (CapacityScheduler) mockRM.getResourceScheduler();
mockRM.start();
cs.start();
setupNodes(mockRM);
}
private void setupNodes(MockRM newMockRM) throws Exception {
nm1 = new MockNM("h1:1234",
Resource.newInstance(NODE_MEMORY * GB, NODE1_VCORES, NODE_GPU),
newMockRM.getResourceTrackerService(),
YarnVersionInfo.getVersion());
nm1.registerNode();
nm2 = new MockNM("h2:1234",
Resource.newInstance(NODE_MEMORY * GB, NODE2_VCORES, NODE_GPU),
newMockRM.getResourceTrackerService(),
YarnVersionInfo.getVersion());
nm2.registerNode();
nm3 = new MockNM("h3:1234",
Resource.newInstance(NODE_MEMORY * GB, NODE3_VCORES, NODE_GPU),
newMockRM.getResourceTrackerService(),
YarnVersionInfo.getVersion());
nm3.registerNode();
}
private void setupGpuResourceValues() {
A_MINRES_GPU = Resource.newInstance(A_MINRES.getMemorySize(), A_MINRES.getVirtualCores(),
ImmutableMap.of(GPU_URI, 2L));
B_MINRES_GPU = Resource.newInstance(B_MINRES.getMemorySize(), B_MINRES.getVirtualCores(),
ImmutableMap.of(GPU_URI, 2L));
FULL_MAXRES_GPU = Resource.newInstance(FULL_MAXRES.getMemorySize(),
FULL_MAXRES.getVirtualCores(), ImmutableMap.of(GPU_URI, 6L));
PARTIAL_MAXRES_GPU = Resource.newInstance(PARTIAL_MAXRES.getMemorySize(),
PARTIAL_MAXRES.getVirtualCores(), ImmutableMap.of(GPU_URI, 4L));
GPU_EXCEEDED_MAXRES_GPU = Resource.newInstance(PARTIAL_MAXRES.getMemorySize(),
PARTIAL_MAXRES.getVirtualCores(), ImmutableMap.of(GPU_URI, 50L));
}
private CapacitySchedulerConfiguration setupCSConfiguration(YarnConfiguration configuration,
boolean useDominantRC) {
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(configuration);
if (useDominantRC) {
csConf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
DominantResourceCalculator.class.getName());
csConf.set(YarnConfiguration.RESOURCE_TYPES, ResourceInformation.GPU_URI);
}
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[]{PARENT_A, PARENT_B});
csConf.setQueues(PARENT_A_FULL_PATH, new String[]{LEAF_A});
csConf.setQueues(PARENT_B_FULL_PATH, new String[]{LEAF_B});
if (useDominantRC) {
setupGpuResourceValues();
csConf.setMinimumResourceRequirement("", PARENT_A_FULL_PATH, A_MINRES_GPU);
csConf.setMinimumResourceRequirement("", PARENT_B_FULL_PATH, B_MINRES_GPU);
csConf.setMinimumResourceRequirement("", LEAF_A_FULL_PATH, A_MINRES_GPU);
csConf.setMinimumResourceRequirement("", LEAF_B_FULL_PATH, B_MINRES_GPU);
csConf.setMaximumResourceRequirement("", PARENT_A_FULL_PATH, PARTIAL_MAXRES_GPU);
csConf.setMaximumResourceRequirement("", PARENT_B_FULL_PATH, FULL_MAXRES_GPU);
csConf.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, PARTIAL_MAXRES_GPU);
csConf.setMaximumResourceRequirement("", LEAF_B_FULL_PATH, FULL_MAXRES_GPU);
} else {
csConf.setMinimumResourceRequirement("", PARENT_A_FULL_PATH, A_MINRES);
csConf.setMinimumResourceRequirement("", PARENT_B_FULL_PATH, B_MINRES);
csConf.setMinimumResourceRequirement("", LEAF_A_FULL_PATH, A_MINRES);
csConf.setMinimumResourceRequirement("", LEAF_B_FULL_PATH, B_MINRES);
csConf.setMaximumResourceRequirement("", PARENT_A_FULL_PATH, PARTIAL_MAXRES);
csConf.setMaximumResourceRequirement("", PARENT_B_FULL_PATH, FULL_MAXRES);
csConf.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, PARTIAL_MAXRES);
csConf.setMaximumResourceRequirement("", LEAF_B_FULL_PATH, FULL_MAXRES);
}
return csConf;
}
} }