YARN-2308. Changed CapacityScheduler to explicitly throw exception if the queue
to which the apps were submitted is changed across RM restart. Contributed by Craig Welch & Chang Li
(cherry picked from commit f9680d9a16
)
This commit is contained in:
parent
9d73c1b15f
commit
29496c9be2
|
@ -648,6 +648,19 @@ public class CapacityScheduler extends
|
|||
// sanity checks.
|
||||
CSQueue queue = getQueue(queueName);
|
||||
if (queue == null) {
|
||||
//During a restart, this indicates a queue was removed, which is
|
||||
//not presently supported
|
||||
if (isAppRecovering) {
|
||||
//throwing RuntimeException because some other exceptions are caught
|
||||
//(including YarnRuntimeException) and we want this to force an exit
|
||||
String queueErrorMsg = "Queue named " + queueName
|
||||
+ " missing during application recovery."
|
||||
+ " Queue removal during recovery is not presently supported by the"
|
||||
+ " capacity scheduler, please restart with all queues configured"
|
||||
+ " which were present before shutdown/restart.";
|
||||
LOG.fatal(queueErrorMsg);
|
||||
throw new RuntimeException(queueErrorMsg);
|
||||
}
|
||||
String message = "Application " + applicationId +
|
||||
" submitted by user " + user + " to unknown queue: " + queueName;
|
||||
this.rmContext.getDispatcher().getEventHandler()
|
||||
|
|
|
@ -249,6 +249,15 @@ public class MockRM extends ResourceManager {
|
|||
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
|
||||
}
|
||||
|
||||
public RMApp submitApp(int masterMemory, String name, String user,
|
||||
Map<ApplicationAccessType, String> acls, String queue,
|
||||
boolean waitForAccepted) throws Exception {
|
||||
return submitApp(masterMemory, name, user, acls, false, queue,
|
||||
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
|
||||
waitForAccepted);
|
||||
}
|
||||
|
||||
public RMApp submitApp(int masterMemory, String name, String user,
|
||||
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
|
||||
|
|
|
@ -336,6 +336,8 @@ public class TestWorkPreservingRMRestart {
|
|||
private static final String R = "Default";
|
||||
private static final String A = "QueueA";
|
||||
private static final String B = "QueueB";
|
||||
//don't ever create the below queue ;-)
|
||||
private static final String QUEUE_DOESNT_EXIST = "NoSuchQueue";
|
||||
private static final String USER_1 = "user1";
|
||||
private static final String USER_2 = "user2";
|
||||
|
||||
|
@ -351,6 +353,18 @@ public class TestWorkPreservingRMRestart {
|
|||
conf.setDouble(CapacitySchedulerConfiguration
|
||||
.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.5f);
|
||||
}
|
||||
|
||||
private void setupQueueConfigurationOnlyA(
|
||||
CapacitySchedulerConfiguration conf) {
|
||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { R });
|
||||
final String Q_R = CapacitySchedulerConfiguration.ROOT + "." + R;
|
||||
conf.setCapacity(Q_R, 100);
|
||||
final String Q_A = Q_R + "." + A;
|
||||
conf.setQueues(Q_R, new String[] {A});
|
||||
conf.setCapacity(Q_A, 100);
|
||||
conf.setDouble(CapacitySchedulerConfiguration
|
||||
.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 1.0f);
|
||||
}
|
||||
|
||||
// Test CS recovery with multi-level queues and multi-users:
|
||||
// 1. setup 2 NMs each with 8GB memory;
|
||||
|
@ -470,6 +484,70 @@ public class TestWorkPreservingRMRestart {
|
|||
totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(),
|
||||
totalUsedResource.getVirtualCores());
|
||||
}
|
||||
|
||||
//Test that we receive a meaningful exit-causing exception if a queue
|
||||
//is removed during recovery
|
||||
//1. Add some apps to two queues, attempt to add an app to a non-existant
|
||||
// queue to verify that the new logic is not in effect during normal app
|
||||
// submission
|
||||
//2. Remove one of the queues, restart the RM
|
||||
//3. Verify that the expected exception was thrown
|
||||
@Test (timeout = 30000)
|
||||
public void testCapacitySchedulerQueueRemovedRecovery() throws Exception {
|
||||
if (!schedulerClass.equals(CapacityScheduler.class)) {
|
||||
return;
|
||||
}
|
||||
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
|
||||
conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
|
||||
DominantResourceCalculator.class.getName());
|
||||
CapacitySchedulerConfiguration csConf =
|
||||
new CapacitySchedulerConfiguration(conf);
|
||||
setupQueueConfiguration(csConf);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(csConf);
|
||||
rm1 = new MockRM(csConf, memStore);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
MockNM nm2 =
|
||||
new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
nm2.registerNode();
|
||||
RMApp app1_1 = rm1.submitApp(1024, "app1_1", USER_1, null, A);
|
||||
MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1);
|
||||
RMApp app1_2 = rm1.submitApp(1024, "app1_2", USER_1, null, A);
|
||||
MockAM am1_2 = MockRM.launchAndRegisterAM(app1_2, rm1, nm2);
|
||||
|
||||
RMApp app2 = rm1.submitApp(1024, "app2", USER_2, null, B);
|
||||
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
|
||||
|
||||
//Submit an app with a non existant queue to make sure it does not
|
||||
//cause a fatal failure in the non-recovery case
|
||||
RMApp appNA = rm1.submitApp(1024, "app1_2", USER_1, null,
|
||||
QUEUE_DOESNT_EXIST, false);
|
||||
|
||||
// clear queue metrics
|
||||
rm1.clearQueueMetrics(app1_1);
|
||||
rm1.clearQueueMetrics(app1_2);
|
||||
rm1.clearQueueMetrics(app2);
|
||||
|
||||
// Re-start RM
|
||||
csConf =
|
||||
new CapacitySchedulerConfiguration(conf);
|
||||
setupQueueConfigurationOnlyA(csConf);
|
||||
rm2 = new MockRM(csConf, memStore);
|
||||
boolean runtimeThrown = false;
|
||||
try {
|
||||
rm2.start();
|
||||
} catch (RuntimeException e) {
|
||||
//we're catching it because we want to verify the message
|
||||
//and we don't want to set it as an expected exception for the
|
||||
//test because we only want it to happen here
|
||||
assertTrue(e.getMessage().contains(B + " missing"));
|
||||
runtimeThrown = true;
|
||||
}
|
||||
assertTrue(runtimeThrown);
|
||||
}
|
||||
|
||||
private void checkParentQueue(ParentQueue parentQueue, int numContainers,
|
||||
Resource usedResource, float UsedCapacity, float absoluteUsedCapacity) {
|
||||
|
|
Loading…
Reference in New Issue