YARN-9894. CapacitySchedulerPerf test for measuring hundreds of apps in a large number of queues. Contributed by Eric Payne

(cherry picked from commit 7b93575b92)
(cherry picked from commit 0707d0a0ae)
(cherry picked from commit 750fb4c321)
This commit is contained in:
Jonathan Hung 2019-12-18 13:18:11 -08:00
parent c4a8c834e5
commit 041fe5fb57
1 changed files with 136 additions and 40 deletions

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
@ -60,6 +61,9 @@ import java.util.Map;
import java.util.PriorityQueue;
import static org.apache.hadoop.yarn.util.resource.TestResourceUtils.TEST_CONF_RESET_RESOURCE_TYPES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -70,9 +74,22 @@ public class TestCapacitySchedulerPerf {
return "resource-" + idx;
}
// This test is run only when when -DRunCapacitySchedulerPerfTests=true is set
// on the command line. In addition, this test has tunables for the following:
// Number of queues: -DNumberOfQueues (default=100)
// Number of total apps: -DNumberOfApplications (default=200)
// Percentage of queues with apps: -DPercentActiveQueues (default=100)
// E.G.:
// mvn test -Dtest=TestCapacitySchedulerPerf -Dsurefire.fork.timeout=1800 \
// -DRunCapacitySchedulerPerfTests=true -DNumberOfQueues=50 \
// -DNumberOfApplications=200 -DPercentActiveQueues=100
// Note that the surefire.fork.timeout flag is added because these tests could
// take longer than the surefire timeout.
private void testUserLimitThroughputWithNumberOfResourceTypes(
int numOfResourceTypes)
int numOfResourceTypes, int numQueues, int pctActiveQueues, int appCount)
throws Exception {
Assume.assumeTrue(Boolean.valueOf(
System.getProperty("RunCapacitySchedulerPerfTests")));
if (numOfResourceTypes > 2) {
// Initialize resource map
Map<String, ResourceInformation> riMap = new HashMap<>();
@ -91,22 +108,16 @@ public class TestCapacitySchedulerPerf {
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
}
// Since this is more of a performance unit test, only run if
// RunUserLimitThroughput is set (-DRunUserLimitThroughput=true)
Assume.assumeTrue(Boolean.valueOf(
System.getProperty("RunCapacitySchedulerPerfTests")));
final int activeQueues = (int) (numQueues * (pctActiveQueues/100f));
final int totalApps = appCount + activeQueues;
// extra apps to get started with user limit
CapacitySchedulerConfiguration csconf =
new CapacitySchedulerConfiguration();
csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
100.0f);
csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f);
csconf.setResourceComparator(DominantResourceCalculator.class);
createCSConfWithManyQueues(numQueues);
YarnConfiguration conf = new YarnConfiguration(csconf);
// Don't reset resource types since we have already configured resource types
// Don't reset resource types since we have already configured resource
// types
conf.setBoolean(TEST_CONF_RESET_RESOURCE_TYPES, false);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
@ -115,11 +126,16 @@ public class TestCapacitySchedulerPerf {
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
LeafQueue qb = (LeafQueue)cs.getQueue("default");
// For now make user limit large so we can activate all applications
qb.setUserLimitFactor((float)100.0);
qb.setupConfigurableCapacities();
LeafQueue[] lqs = new LeafQueue[numQueues];
for (int i = 0; i < numQueues; i++) {
String queueName = String.format("%03d", i);
LeafQueue qb = (LeafQueue)cs.getQueue(queueName);
// For now make user limit large so we can activate all applications
qb.setUserLimitFactor((float)100.0);
qb.setupConfigurableCapacities();
lqs[i] = qb;
}
SchedulerEvent addAppEvent;
SchedulerEvent addAttemptEvent;
@ -127,13 +143,12 @@ public class TestCapacitySchedulerPerf {
ApplicationSubmissionContext submissionContext =
mock(ApplicationSubmissionContext.class);
final int appCount = 100;
ApplicationId[] appids = new ApplicationId[appCount];
RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[appCount];
ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[appCount];
RMAppImpl[] apps = new RMAppImpl[appCount];
RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[appCount];
for (int i=0; i<appCount; i++) {
ApplicationId[] appids = new ApplicationId[totalApps];
RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[totalApps];
ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[totalApps];
RMAppImpl[] apps = new RMAppImpl[totalApps];
RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[totalApps];
for (int i=0; i<totalApps; i++) {
appids[i] = BuilderUtils.newApplicationId(100, i);
appAttemptIds[i] =
BuilderUtils.newApplicationAttemptId(appids[i], 1);
@ -150,34 +165,34 @@ public class TestCapacitySchedulerPerf {
when(apps[i].getCurrentAppAttempt()).thenReturn(attempts[i]);
rm.getRMContext().getRMApps().put(appids[i], apps[i]);
String queueName = lqs[i % activeQueues].getQueueName();
addAppEvent =
new AppAddedSchedulerEvent(appids[i], "default", "user1");
new AppAddedSchedulerEvent(appids[i], queueName, "user1");
cs.handle(addAppEvent);
addAttemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptIds[i], false);
cs.handle(addAttemptEvent);
}
// add nodes to cluster, so cluster has 20GB and 20 vcores
Resource nodeResource = Resource.newInstance(10 * GB, 10);
// add nodes to cluster with enough resources to satisfy all apps
Resource newResource = Resource.newInstance(totalApps * GB, totalApps);
if (numOfResourceTypes > 2) {
for (int i = 2; i < numOfResourceTypes; i++) {
nodeResource.setResourceValue(getResourceName(i), 10);
newResource.setResourceValue(getResourceName(i), totalApps);
}
}
RMNode node = MockNodes.newNodeInfo(0, nodeResource, 1, "127.0.0.1");
RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
cs.handle(new NodeAddedSchedulerEvent(node));
RMNode node2 = MockNodes.newNodeInfo(0, nodeResource, 1, "127.0.0.2");
RMNode node2 = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.2");
cs.handle(new NodeAddedSchedulerEvent(node2));
Priority u0Priority = TestUtils.createMockPriority(1);
RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[appCount];
for (int i=0;i<appCount;i++) {
FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[totalApps];
for (int i=0;i<totalApps;i++) {
fiCaApps[i] =
cs.getSchedulerApplications().get(apps[i].getApplicationId())
.getCurrentAppAttempt();
@ -195,8 +210,30 @@ public class TestCapacitySchedulerPerf {
fiCaApps[i].updateResourceRequests(
Collections.singletonList(resourceRequest));
}
// Now force everything to be over user limit
qb.setUserLimitFactor((float)0.0);
// Now force everything to be at user limit
for (int i = 0; i < numQueues; i++) {
lqs[i].setUserLimitFactor((float)0.0);
}
// allocate one container for each extra apps since
// LeafQueue.canAssignToUser() checks for used > limit, not used >= limit
cs.handle(new NodeUpdateSchedulerEvent(node));
cs.handle(new NodeUpdateSchedulerEvent(node2));
// make sure only the extra apps have allocated containers
for (int i=0;i<totalApps;i++) {
boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
if (i < activeQueues) {
assertFalse(pending);
assertEquals(0,
fiCaApps[i].getTotalPendingRequestsPerPartition().size());
} else {
assertTrue(pending);
assertEquals(1*GB,
fiCaApps[i].getTotalPendingRequestsPerPartition()
.get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
}
}
// Quiet the loggers while measuring throughput
for (Enumeration<?> loggers = LogManager.getCurrentLoggers();
@ -239,27 +276,86 @@ public class TestCapacitySchedulerPerf {
}
System.out.println(
"#ResourceTypes = " + numOfResourceTypes + ". Avg of fastest " + entries
+ ": " + numerator / (timespent / entries));
+ ": " + numerator / (timespent / entries) + " ops/sec of "
+ appCount + " apps on " + pctActiveQueues + "% of " + numQueues
+ " queues.");
// make sure only the extra apps have allocated containers
for (int i=0;i<totalApps;i++) {
boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
if (i < activeQueues) {
assertFalse(pending);
assertEquals(0,
fiCaApps[i].getTotalPendingRequestsPerPartition().size());
} else {
assertTrue(pending);
assertEquals(1*GB,
fiCaApps[i].getTotalPendingRequestsPerPartition()
.get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
}
}
rm.close();
rm.stop();
}
@Test(timeout = 300000)
public void testUserLimitThroughputForTwoResources() throws Exception {
testUserLimitThroughputWithNumberOfResourceTypes(2);
testUserLimitThroughputWithNumberOfResourceTypes(2, 1, 100, 100);
}
@Test(timeout = 300000)
public void testUserLimitThroughputForThreeResources() throws Exception {
testUserLimitThroughputWithNumberOfResourceTypes(3);
testUserLimitThroughputWithNumberOfResourceTypes(3, 1, 100, 100);
}
@Test(timeout = 300000)
public void testUserLimitThroughputForFourResources() throws Exception {
testUserLimitThroughputWithNumberOfResourceTypes(4);
testUserLimitThroughputWithNumberOfResourceTypes(4, 1, 100, 100);
}
@Test(timeout = 300000)
public void testUserLimitThroughputForFiveResources() throws Exception {
testUserLimitThroughputWithNumberOfResourceTypes(5);
testUserLimitThroughputWithNumberOfResourceTypes(5, 1, 100, 100);
}
@Test(timeout = 1800000)
public void testUserLimitThroughputWithManyQueues() throws Exception {
int numQueues = Integer.getInteger("NumberOfQueues", 40);
int pctActiveQueues = Integer.getInteger("PercentActiveQueues", 100);
int appCount = Integer.getInteger("NumberOfApplications", 100);
testUserLimitThroughputWithNumberOfResourceTypes(
2, numQueues, pctActiveQueues, appCount);
}
CapacitySchedulerConfiguration createCSConfWithManyQueues(int numQueues)
throws Exception {
CapacitySchedulerConfiguration csconf =
new CapacitySchedulerConfiguration();
csconf.setResourceComparator(DominantResourceCalculator.class);
csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
csconf.setCapacity("root.default", 0.0f);
csconf.setOffSwitchPerHeartbeatLimit(numQueues);
float capacity = 100.0f / numQueues;
String[] subQueues = new String[numQueues];
for (int i = 0; i < numQueues; i++) {
String queueName = String.format("%03d", i);
String queuePath = "root." + queueName;
subQueues[i] = queueName;
csconf.setMaximumApplicationMasterResourcePerQueuePercent(
queuePath, 100.0f);
csconf.setMaximumAMResourcePercentPerPartition(queuePath, "", 100.0f);
csconf.setCapacity(queuePath, capacity);
csconf.setUserLimitFactor(queuePath, 100.0f);
csconf.setMaximumCapacity(queuePath, 100.0f);
}
csconf.setQueues("root", subQueues);
return csconf;
}
}