YARN-9900. Revert to previous state when Invalid Config is applied and Refresh Support in SchedulerConfig Format. Contributed by Prabhu Joseph.

(cherry picked from commit 090f73a9aa)
This commit is contained in:
Sunil G 2019-10-16 18:14:31 +05:30
parent 0bbd48c7a8
commit 9672b81fa3
4 changed files with 57 additions and 3 deletions

View File

@ -73,6 +73,8 @@ public interface MutableConfigurationProvider {
void formatConfigurationInStore(Configuration conf) throws Exception; void formatConfigurationInStore(Configuration conf) throws Exception;
void revertToOldConfig(Configuration config) throws Exception;
/** /**
* Closes the configuration provider, releasing any required resources. * Closes the configuration provider, releasing any required resources.
* @throws IOException on failure to close * @throws IOException on failure to close

View File

@ -166,6 +166,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
formatLock.writeLock().lock(); formatLock.writeLock().lock();
try { try {
confStore.format(); confStore.format();
oldConf = new Configuration(schedConf);
Configuration initialSchedConf = new Configuration(false); Configuration initialSchedConf = new Configuration(false);
initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE); initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE);
this.schedConf = new Configuration(false); this.schedConf = new Configuration(false);
@ -184,6 +185,21 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
} }
} }
@Override
public void revertToOldConfig(Configuration config) throws Exception {
formatLock.writeLock().lock();
try {
schedConf = oldConf;
confStore.format();
confStore.initialize(config, oldConf, rmContext);
confStore.checkVersion();
} catch (Exception e) {
throw new IOException(e);
} finally {
formatLock.writeLock().unlock();
}
}
@Override @Override
public void confirmPendingMutation(boolean isValid) throws Exception { public void confirmPendingMutation(boolean isValid) throws Exception {
formatLock.readLock().lock(); formatLock.readLock().lock();

View File

@ -2337,6 +2337,13 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
MutableConfigurationProvider mutableConfigurationProvider = MutableConfigurationProvider mutableConfigurationProvider =
((MutableConfScheduler) scheduler).getMutableConfProvider(); ((MutableConfScheduler) scheduler).getMutableConfProvider();
mutableConfigurationProvider.formatConfigurationInStore(conf); mutableConfigurationProvider.formatConfigurationInStore(conf);
try {
rm.getRMContext().getRMAdminService().refreshQueues();
} catch (IOException | YarnException e) {
LOG.error("Exception thrown when formatting configuration.", e);
mutableConfigurationProvider.revertToOldConfig(conf);
throw e;
}
return Response.status(Status.OK).entity("Configuration under " + return Response.status(Status.OK).entity("Configuration under " +
"store successfully formatted.").build(); "store successfully formatted.").build();
} catch (Exception e) { } catch (Exception e) {

View File

@ -191,15 +191,44 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
@Test @Test
public void testFormatSchedulerConf() throws Exception { public void testFormatSchedulerConf() throws Exception {
testAddNestedQueue(); CapacitySchedulerConfiguration newConf = getSchedulerConf();
assertNotNull(newConf);
assertEquals(3, newConf.getQueues("root").length);
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
Map<String, String> nearEmptyCapacity = new HashMap<>();
nearEmptyCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "1E-4");
QueueConfigInfo d = new QueueConfigInfo("root.formattest",
nearEmptyCapacity);
updateInfo.getAddQueueInfo().add(d);
Map<String, String> stoppedParam = new HashMap<>();
stoppedParam.put(CapacitySchedulerConfiguration.STATE,
QueueState.STOPPED.toString());
QueueConfigInfo stoppedInfo = new QueueConfigInfo("root.formattest",
stoppedParam);
updateInfo.getUpdateQueueInfo().add(stoppedInfo);
// Add a queue root.formattest to the existing three queues
WebResource r = resource(); WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("cluster") ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("scheduler-conf").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
.entity(YarnWebServiceUtils.toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
newConf = getSchedulerConf();
assertNotNull(newConf);
assertEquals(4, newConf.getQueues("root").length);
// Format the scheduler config and validate root.formattest is not present
response = r.path("ws").path("v1").path("cluster")
.queryParam("user.name", userName) .queryParam("user.name", userName)
.path(RMWSConsts.FORMAT_SCHEDULER_CONF) .path(RMWSConsts.FORMAT_SCHEDULER_CONF)
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus()); assertEquals(Status.OK.getStatusCode(), response.getStatus());
CapacitySchedulerConfiguration orgConf = getSchedulerConf(); newConf = getSchedulerConf();
assertEquals(3, orgConf.getQueues("root").length); assertEquals(3, newConf.getQueues("root").length);
} }
private long getConfigVersion() throws Exception { private long getConfigVersion() throws Exception {