YARN-10961. TestCapacityScheduler: reuse appHelper where feasible. Contributed by Tamas Domok

Co-authored-by: Tamas Domok <tdomok@cloudera.com>
This commit is contained in:
Tamas Domok 2021-09-21 16:13:04 +02:00 committed by GitHub
parent 9f6430c9ed
commit 8f4456d4a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 12 additions and 163 deletions

View File

@ -1185,32 +1185,7 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host);
cs.handle(new NodeAddedSchedulerEvent(node));
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);
RMAppAttemptMetrics attemptMetric =
new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
RMAppImpl app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(appId);
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
Container container = mock(Container.class);
when(attempt.getMasterContainer()).thenReturn(container);
ApplicationSubmissionContext submissionContext = mock(
ApplicationSubmissionContext.class);
when(attempt.getSubmissionContext()).thenReturn(submissionContext);
when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);
rm.getRMContext().getRMApps().put(appId, app);
SchedulerEvent addAppEvent =
new AppAddedSchedulerEvent(appId, "default", "user");
cs.handle(addAppEvent);
SchedulerEvent addAttemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
cs.handle(addAttemptEvent);
ApplicationAttemptId appAttemptId = appHelper(rm, cs, 100, 1, "default", "user");
// Verify the blacklist can be updated independent of requesting containers
cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), null,
@ -1251,60 +1226,8 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host);
cs.handle(new NodeAddedSchedulerEvent(node));
//add app begin
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
appId1, 1);
RMAppAttemptMetrics attemptMetric1 =
new RMAppAttemptMetrics(appAttemptId1, rm.getRMContext());
RMAppImpl app1 = mock(RMAppImpl.class);
when(app1.getApplicationId()).thenReturn(appId1);
RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class);
Container container = mock(Container.class);
when(attempt1.getMasterContainer()).thenReturn(container);
ApplicationSubmissionContext submissionContext = mock(
ApplicationSubmissionContext.class);
when(attempt1.getSubmissionContext()).thenReturn(submissionContext);
when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1);
when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1);
when(app1.getCurrentAppAttempt()).thenReturn(attempt1);
rm.getRMContext().getRMApps().put(appId1, app1);
SchedulerEvent addAppEvent1 =
new AppAddedSchedulerEvent(appId1, "default", "user");
cs.handle(addAppEvent1);
SchedulerEvent addAttemptEvent1 =
new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
cs.handle(addAttemptEvent1);
//add app end
//add app begin
ApplicationId appId2 = BuilderUtils.newApplicationId(100, 2);
ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId(
appId2, 1);
RMAppAttemptMetrics attemptMetric2 =
new RMAppAttemptMetrics(appAttemptId2, rm.getRMContext());
RMAppImpl app2 = mock(RMAppImpl.class);
when(app2.getApplicationId()).thenReturn(appId2);
RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class);
when(attempt2.getMasterContainer()).thenReturn(container);
when(attempt2.getSubmissionContext()).thenReturn(submissionContext);
when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2);
when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2);
when(app2.getCurrentAppAttempt()).thenReturn(attempt2);
rm.getRMContext().getRMApps().put(appId2, app2);
SchedulerEvent addAppEvent2 =
new AppAddedSchedulerEvent(appId2, "default", "user");
cs.handle(addAppEvent2);
SchedulerEvent addAttemptEvent2 =
new AppAttemptAddedSchedulerEvent(appAttemptId2, false);
cs.handle(addAttemptEvent2);
//add app end
ApplicationAttemptId appAttemptId1 = appHelper(rm, cs, 100, 1, "default", "user");
ApplicationAttemptId appAttemptId2 = appHelper(rm, cs, 100, 2, "default", "user");
RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@ -1326,7 +1249,7 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
//failling back to fifo (start) ordering
assertEquals(q.getOrderingPolicy().getAssignmentIterator(
IteratorSelector.EMPTY_ITERATOR_SELECTOR).next().getId(),
appId1.toString());
appAttemptId1.getApplicationId().toString());
//Now, allocate for app2 (this would be the first/AM allocation)
ResourceRequest r2 = TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory);
@ -1340,7 +1263,7 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
//Now, the first app for assignment is app2
assertEquals(q.getOrderingPolicy().getAssignmentIterator(
IteratorSelector.EMPTY_ITERATOR_SELECTOR).next().getId(),
appId2.toString());
appAttemptId2.getApplicationId().toString());
rm.stop();
}
@ -2850,34 +2773,9 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
SchedulerEvent addNode = new NodeAddedSchedulerEvent(node);
sch.handle(addNode);
// create appid
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);
RMAppAttemptMetrics attemptMetric =
new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
RMAppImpl app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(appId);
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
Container container = mock(Container.class);
when(attempt.getMasterContainer()).thenReturn(container);
ApplicationSubmissionContext submissionContext =
mock(ApplicationSubmissionContext.class);
when(attempt.getSubmissionContext()).thenReturn(submissionContext);
when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);
ApplicationAttemptId appAttemptId = appHelper(rm, sch, 100, 1, "a1", "user");
rm.getRMContext().getRMApps().put(appId, app);
// Add application
SchedulerEvent addAppEvent =
new AppAddedSchedulerEvent(appId, "a1", "user");
sch.handle(addAppEvent);
// Add application attempt
SchedulerEvent addAttemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
sch.handle(addAttemptEvent);
// get Queues
CSQueue queueA1 = sch.getQueue("a1");
CSQueue queueB = sch.getQueue("b");
@ -2908,7 +2806,7 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId,
RMAppAttemptState.KILLED, true));
// Move application to queue b1
sch.moveApplication(appId, "b1");
sch.moveApplication(appAttemptId.getApplicationId(), "b1");
// Check queue metrics after move
Assert.assertEquals(0, queueA1.getNumApplications());
Assert.assertEquals(1, queueB.getNumApplications());
@ -2916,7 +2814,7 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
// Release attempt add event
ApplicationAttemptId appAttemptId2 =
BuilderUtils.newApplicationAttemptId(appId, 2);
BuilderUtils.newApplicationAttemptId(appAttemptId.getApplicationId(), 2);
SchedulerEvent addAttemptEvent2 =
new AppAttemptAddedSchedulerEvent(appAttemptId2, true);
sch.handle(addAttemptEvent2);
@ -4259,57 +4157,8 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
LeafQueue qb = (LeafQueue)cs.getQueue("default");
qb.setUserLimitFactor((float)0.8);
// add app 1
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);
RMAppAttemptMetrics attemptMetric =
new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
RMAppImpl app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(appId);
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
Container container = mock(Container.class);
when(attempt.getMasterContainer()).thenReturn(container);
ApplicationSubmissionContext submissionContext = mock(
ApplicationSubmissionContext.class);
when(attempt.getSubmissionContext()).thenReturn(submissionContext);
when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);
rm.getRMContext().getRMApps().put(appId, app);
SchedulerEvent addAppEvent =
new AppAddedSchedulerEvent(appId, "default", "user1");
cs.handle(addAppEvent);
SchedulerEvent addAttemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
cs.handle(addAttemptEvent);
// add app 2
ApplicationId appId2 = BuilderUtils.newApplicationId(100, 2);
ApplicationAttemptId appAttemptId2 =
BuilderUtils.newApplicationAttemptId(appId2, 1);
RMAppAttemptMetrics attemptMetric2 =
new RMAppAttemptMetrics(appAttemptId2, rm.getRMContext());
RMAppImpl app2 = mock(RMAppImpl.class);
when(app2.getApplicationId()).thenReturn(appId2);
RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class);
when(attempt2.getMasterContainer()).thenReturn(container);
when(attempt2.getSubmissionContext()).thenReturn(submissionContext);
when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2);
when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2);
when(app2.getCurrentAppAttempt()).thenReturn(attempt2);
rm.getRMContext().getRMApps().put(appId2, app2);
addAppEvent =
new AppAddedSchedulerEvent(appId2, "default", "user2");
cs.handle(addAppEvent);
addAttemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId2, false);
cs.handle(addAttemptEvent);
ApplicationAttemptId appAttemptId = appHelper(rm, cs, 100, 1, "default", "user1");
ApplicationAttemptId appAttemptId2 = appHelper(rm, cs, 100, 2, "default", "user2");
// add nodes to cluster, so cluster have 20GB and 20 vcores
Resource newResource = Resource.newInstance(10 * GB, 10);
@ -4321,11 +4170,11 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
cs.handle(new NodeAddedSchedulerEvent(node2));
FiCaSchedulerApp fiCaApp1 =
cs.getSchedulerApplications().get(app.getApplicationId())
cs.getSchedulerApplications().get(appAttemptId.getApplicationId())
.getCurrentAppAttempt();
FiCaSchedulerApp fiCaApp2 =
cs.getSchedulerApplications().get(app2.getApplicationId())
cs.getSchedulerApplications().get(appAttemptId2.getApplicationId())
.getCurrentAppAttempt();
Priority u0Priority = TestUtils.createMockPriority(1);
RecordFactory recordFactory =