YARN-8453. Additional Unit tests to verify queue limit and max-limit with multiple resource types. Contributed by Adam Antal
This commit is contained in:
parent
d1b30e8450
commit
b10fdd136a
|
@ -19,12 +19,21 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
|
@ -32,6 +41,7 @@ import org.junit.Assert;
|
|||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -41,29 +51,43 @@ import java.util.Map;
|
|||
public class TestCapacitySchedulerWithMultiResourceTypes {
|
||||
private static String RESOURCE_1 = "res1";
|
||||
|
||||
@Test
|
||||
public void testMaximumAllocationRefreshWithMultipleResourceTypes() throws Exception {
|
||||
private static final String A_QUEUE = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||
private static final String B_QUEUE = CapacitySchedulerConfiguration.ROOT + ".b";
|
||||
private static float A_CAPACITY = 50.0f;
|
||||
private static float B_CAPACITY = 50.0f;
|
||||
|
||||
private void setupResources(boolean withGpu) {
|
||||
// Initialize resource map
|
||||
Map<String, ResourceInformation> riMap = new HashMap<>();
|
||||
|
||||
// Initialize mandatory resources
|
||||
ResourceInformation memory = ResourceInformation.newInstance(
|
||||
ResourceInformation.MEMORY_MB.getName(),
|
||||
ResourceInformation.MEMORY_MB.getUnits(),
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
||||
ResourceInformation.MEMORY_MB.getName(),
|
||||
ResourceInformation.MEMORY_MB.getUnits(),
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
||||
ResourceInformation vcores = ResourceInformation.newInstance(
|
||||
ResourceInformation.VCORES.getName(),
|
||||
ResourceInformation.VCORES.getUnits(),
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
||||
ResourceInformation.VCORES.getName(),
|
||||
ResourceInformation.VCORES.getUnits(),
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
||||
riMap.put(ResourceInformation.MEMORY_URI, memory);
|
||||
riMap.put(ResourceInformation.VCORES_URI, vcores);
|
||||
riMap.put(RESOURCE_1, ResourceInformation.newInstance(RESOURCE_1, "", 0,
|
||||
ResourceTypes.COUNTABLE, 0, 3333L));
|
||||
if (withGpu) {
|
||||
riMap.put(ResourceInformation.GPU_URI,
|
||||
ResourceInformation.newInstance(ResourceInformation.GPU_URI, "", 0,
|
||||
ResourceTypes.COUNTABLE, 0, 3333L));
|
||||
} else {
|
||||
riMap.put(RESOURCE_1, ResourceInformation.newInstance(RESOURCE_1, "", 0,
|
||||
ResourceTypes.COUNTABLE, 0, 3333L));
|
||||
}
|
||||
|
||||
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaximumAllocationRefreshWithMultipleResourceTypes() throws Exception {
|
||||
setupResources(false);
|
||||
|
||||
CapacitySchedulerConfiguration csconf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
|
@ -213,4 +237,110 @@ public class TestCapacitySchedulerWithMultiResourceTypes {
|
|||
|
||||
Assert.assertTrue("Should have exception in CS", exception);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxLimitsOfQueueWithMultipleResources() throws Exception {
|
||||
setupResources(true);
|
||||
|
||||
int GB = 1024;
|
||||
|
||||
CapacitySchedulerConfiguration csConf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
csConf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
|
||||
csConf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
|
||||
csConf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
|
||||
100.0f);
|
||||
csConf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f);
|
||||
csConf.setResourceComparator(DominantResourceCalculator.class);
|
||||
csConf.set(YarnConfiguration.RESOURCE_TYPES, ResourceInformation.GPU_URI);
|
||||
|
||||
// Define top-level queues
|
||||
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||
new String[] {"a", "b"});
|
||||
|
||||
// Set each queue to consider 50% each.
|
||||
csConf.setCapacity(A_QUEUE, A_CAPACITY);
|
||||
csConf.setCapacity(B_QUEUE, B_CAPACITY);
|
||||
csConf.setMaximumCapacity(A_QUEUE, 100.0f);
|
||||
csConf.setUserLimitFactor(A_QUEUE, 2);
|
||||
|
||||
YarnConfiguration conf = new YarnConfiguration(csConf);
|
||||
// Don't reset resource types since we have already configured resource
|
||||
// types
|
||||
conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false);
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
|
||||
MockRM rm = new MockRM(conf);
|
||||
rm.start();
|
||||
|
||||
Map<String, Integer> nameToValues = new HashMap<>();
|
||||
nameToValues.put(ResourceInformation.GPU_URI, 4);
|
||||
// Register NM1 with 10GB memory, 4 CPU and 4 GPU
|
||||
MockNM nm1 = rm.registerNode("127.0.0.1:1234",
|
||||
TestUtils.createResource(10 * GB, 4, nameToValues));
|
||||
|
||||
nameToValues.clear();
|
||||
// Register NM2 with 10GB memory, 4 CPU and 0 GPU
|
||||
rm.registerNode("127.0.0.1:1235",
|
||||
TestUtils.createResource(10 * GB, 4, nameToValues));
|
||||
|
||||
RMApp app1 = rm.submitApp(1024, "app-1", "user1", null, "a");
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
|
||||
|
||||
SchedulerNodeReport report_nm1 =
|
||||
rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
|
||||
|
||||
// check node report
|
||||
Assert.assertEquals(1 * GB, report_nm1.getUsedResource().getMemorySize());
|
||||
Assert.assertEquals(9 * GB,
|
||||
report_nm1.getAvailableResource().getMemorySize());
|
||||
Assert.assertEquals(0, report_nm1.getUsedResource()
|
||||
.getResourceInformation(ResourceInformation.GPU_URI).getValue());
|
||||
Assert.assertEquals(4, report_nm1.getAvailableResource()
|
||||
.getResourceInformation(ResourceInformation.GPU_URI).getValue());
|
||||
|
||||
nameToValues.put(ResourceInformation.GPU_URI, 4);
|
||||
Resource containerGpuResource =
|
||||
TestUtils.createResource(1 * GB, 1, nameToValues);
|
||||
|
||||
// Allocate one container which takes all 4 GPU
|
||||
am1.allocate(
|
||||
Collections.singletonList(ResourceRequest.newInstance(
|
||||
Priority.newInstance(1), "*", containerGpuResource, 1)), null);
|
||||
ContainerId containerId2 =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
Assert.assertTrue(rm.waitForState(nm1, containerId2,
|
||||
RMContainerState.ALLOCATED));
|
||||
// Acquire this container
|
||||
am1.allocate(null, null);
|
||||
|
||||
report_nm1 =
|
||||
rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
|
||||
Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
|
||||
Assert.assertEquals(4, report_nm1.getUsedResource()
|
||||
.getResourceInformation(ResourceInformation.GPU_URI).getValue());
|
||||
Assert.assertEquals(0, report_nm1.getAvailableResource()
|
||||
.getResourceInformation(ResourceInformation.GPU_URI).getValue());
|
||||
|
||||
nameToValues.clear();
|
||||
Resource containerResource =
|
||||
TestUtils.createResource(1 * GB, 1, nameToValues);
|
||||
// Allocate one more container which doesnt need GPU
|
||||
am1.allocate(
|
||||
Collections.singletonList(ResourceRequest.newInstance(
|
||||
Priority.newInstance(1), "*", containerResource, 1)), null);
|
||||
ContainerId containerId3 =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
|
||||
Assert.assertTrue(rm.waitForState(nm1, containerId3,
|
||||
RMContainerState.ALLOCATED));
|
||||
|
||||
report_nm1 =
|
||||
rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
|
||||
Assert.assertEquals(3 * GB, report_nm1.getUsedResource().getMemorySize());
|
||||
Assert.assertEquals(4, report_nm1.getUsedResource()
|
||||
.getResourceInformation(ResourceInformation.GPU_URI).getValue());
|
||||
Assert.assertEquals(0, report_nm1.getAvailableResource()
|
||||
.getResourceInformation(ResourceInformation.GPU_URI).getValue());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue