YARN-5773. RM recovery too slow due to LeafQueue#activateApplications (Bibin A Chundatt via Varun Saxena)
This commit is contained in:
parent
9b6d27793d
commit
367cb029d4
|
@ -32,4 +32,6 @@ public interface CSAMContainerLaunchDiagnosticsConstants {
|
||||||
String USER_AM_RESOURCE_LIMIT_EXCEED = "User's AM resource limit exceeded. ";
|
String USER_AM_RESOURCE_LIMIT_EXCEED = "User's AM resource limit exceeded. ";
|
||||||
String LAST_NODE_PROCESSED_MSG =
|
String LAST_NODE_PROCESSED_MSG =
|
||||||
" Last Node which was processed for the application : ";
|
" Last Node which was processed for the application : ";
|
||||||
|
String CLUSTER_RESOURCE_EMPTY =
|
||||||
|
"Skipping AM assignment as cluster resource is empty. ";
|
||||||
}
|
}
|
||||||
|
|
|
@ -684,9 +684,11 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
} else {
|
} else {
|
||||||
application.updateAMContainerDiagnostics(AMState.INACTIVATED,
|
application.updateAMContainerDiagnostics(AMState.INACTIVATED,
|
||||||
CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED);
|
CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED);
|
||||||
LOG.info("Not activating application " + applicationId
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Not activating application " + applicationId
|
||||||
+ " as amIfStarted: " + amIfStarted + " exceeds amLimit: "
|
+ " as amIfStarted: " + amIfStarted + " exceeds amLimit: "
|
||||||
+ amLimit);
|
+ amLimit);
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -718,9 +720,11 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
} else {
|
} else {
|
||||||
application.updateAMContainerDiagnostics(AMState.INACTIVATED,
|
application.updateAMContainerDiagnostics(AMState.INACTIVATED,
|
||||||
CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED);
|
CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED);
|
||||||
LOG.info("Not activating application " + applicationId
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Not activating application " + applicationId
|
||||||
+ " for user: " + user + " as userAmIfStarted: "
|
+ " for user: " + user + " as userAmIfStarted: "
|
||||||
+ userAmIfStarted + " exceeds userAmLimit: " + userAMLimit);
|
+ userAmIfStarted + " exceeds userAmLimit: " + userAMLimit);
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -750,7 +754,16 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
applicationAttemptMap.put(application.getApplicationAttemptId(), application);
|
applicationAttemptMap.put(application.getApplicationAttemptId(), application);
|
||||||
|
|
||||||
// Activate applications
|
// Activate applications
|
||||||
|
if (Resources.greaterThan(resourceCalculator, lastClusterResource,
|
||||||
|
lastClusterResource, Resources.none())) {
|
||||||
activateApplications();
|
activateApplications();
|
||||||
|
} else {
|
||||||
|
application.updateAMContainerDiagnostics(AMState.INACTIVATED,
|
||||||
|
CSAMContainerLaunchDiagnosticsConstants.CLUSTER_RESOURCE_EMPTY);
|
||||||
|
LOG.info("Skipping activateApplications for "
|
||||||
|
+ application.getApplicationAttemptId()
|
||||||
|
+ " since cluster resource is " + Resources.none());
|
||||||
|
}
|
||||||
|
|
||||||
LOG.info("Application added -" +
|
LOG.info("Application added -" +
|
||||||
" appId: " + application.getApplicationId() +
|
" appId: " + application.getApplicationId() +
|
||||||
|
|
|
@ -1627,7 +1627,7 @@ public class TestClientRMService {
|
||||||
MockRM rm = new MockRM(conf);
|
MockRM rm = new MockRM(conf);
|
||||||
rm.init(conf);
|
rm.init(conf);
|
||||||
rm.start();
|
rm.start();
|
||||||
|
rm.registerNode("host1:1234", 1024);
|
||||||
// Start app1 with appPriority 5
|
// Start app1 with appPriority 5
|
||||||
RMApp app1 = rm.submitApp(1024, Priority.newInstance(appPriority));
|
RMApp app1 = rm.submitApp(1024, Priority.newInstance(appPriority));
|
||||||
|
|
||||||
|
|
|
@ -693,16 +693,18 @@ public class TestApplicationPriority {
|
||||||
Thread.sleep(500);
|
Thread.sleep(500);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Before NM registration, AMResourceLimit threshold is 0. So 1st
|
// Before NM registration, AMResourceLimit threshold is 0. So no
|
||||||
// applications get activated nevertheless of AMResourceLimit threshold
|
// applications get activated.
|
||||||
// Two applications are in pending
|
Assert.assertEquals(0, defaultQueue.getNumActiveApplications());
|
||||||
Assert.assertEquals(1, defaultQueue.getNumActiveApplications());
|
|
||||||
Assert.assertEquals(2, defaultQueue.getNumPendingApplications());
|
|
||||||
|
|
||||||
// NM resync to new RM
|
// NM resync to new RM
|
||||||
nm1.registerNode();
|
nm1.registerNode();
|
||||||
dispatcher1.await();
|
dispatcher1.await();
|
||||||
|
|
||||||
|
Assert.assertEquals(2, defaultQueue.getNumActiveApplications());
|
||||||
|
Assert.assertEquals(1, defaultQueue.getNumPendingApplications());
|
||||||
|
|
||||||
|
|
||||||
// wait for activating one applications
|
// wait for activating one applications
|
||||||
count = 5;
|
count = 5;
|
||||||
while (count-- > 0) {
|
while (count-- > 0) {
|
||||||
|
|
|
@ -2931,12 +2931,14 @@ public class TestCapacityScheduler {
|
||||||
@Test(timeout = 30000)
|
@Test(timeout = 30000)
|
||||||
public void testAMUsedResource() throws Exception {
|
public void testAMUsedResource() throws Exception {
|
||||||
MockRM rm = setUpMove();
|
MockRM rm = setUpMove();
|
||||||
|
rm.registerNode("127.0.0.1:1234", 4 * GB);
|
||||||
|
|
||||||
Configuration conf = rm.getConfig();
|
Configuration conf = rm.getConfig();
|
||||||
int minAllocMb =
|
int minAllocMb =
|
||||||
conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
|
||||||
int amMemory = 50;
|
int amMemory = 50;
|
||||||
assertTrue("AM memory is greater than or equql to minAllocation",
|
assertTrue("AM memory is greater than or equal to minAllocation",
|
||||||
amMemory < minAllocMb);
|
amMemory < minAllocMb);
|
||||||
Resource minAllocResource = Resource.newInstance(minAllocMb, 1);
|
Resource minAllocResource = Resource.newInstance(minAllocMb, 1);
|
||||||
String queueName = "a1";
|
String queueName = "a1";
|
||||||
|
@ -3285,6 +3287,7 @@ public class TestCapacityScheduler {
|
||||||
private void verifyAMLimitForLeafQueue(CapacitySchedulerConfiguration config)
|
private void verifyAMLimitForLeafQueue(CapacitySchedulerConfiguration config)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
MockRM rm = setUpMove(config);
|
MockRM rm = setUpMove(config);
|
||||||
|
rm.registerNode("127.0.0.1:1234", 2 * GB);
|
||||||
|
|
||||||
String queueName = "a1";
|
String queueName = "a1";
|
||||||
String userName = "user_0";
|
String userName = "user_0";
|
||||||
|
|
Loading…
Reference in New Issue