YARN-9873. Mutation API Config Change updates Version Number. Contributed by Prabhu Joseph

(cherry picked from commit 4510970e2f)
This commit is contained in:
Sunil G 2019-10-04 21:49:07 +05:30
parent 702572434c
commit 3a0afcfb7f
10 changed files with 124 additions and 2 deletions

View File

@ -65,6 +65,12 @@ public interface MutableConfigurationProvider {
*/ */
Configuration getConfiguration(); Configuration getConfiguration();
/**
* Get the last updated scheduler config version.
* @return Last updated scheduler config version.
*/
long getConfigVersion() throws Exception;
void formatConfigurationInStore(Configuration conf) throws Exception; void formatConfigurationInStore(Configuration conf) throws Exception;
/** /**

View File

@ -148,6 +148,13 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
tempConfigPath = null; tempConfigPath = null;
} }
@Override
public long getConfigVersion() throws Exception {
String version = getLatestConfigPath().getName().
substring(YarnConfiguration.CS_CONFIGURATION_FILE.length() + 1);
return Long.parseLong(version);
}
private void finalizeFileSystemFile() throws IOException { private void finalizeFileSystemFile() throws IOException {
// call confirmMutation() make sure tempConfigPath is not null // call confirmMutation() make sure tempConfigPath is not null
Path finalConfigPath = getFinalConfigPath(tempConfigPath); Path finalConfigPath = getFinalConfigPath(tempConfigPath);

View File

@ -33,11 +33,13 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore {
private Configuration schedConf; private Configuration schedConf;
private LogMutation pendingMutation; private LogMutation pendingMutation;
private long configVersion;
@Override @Override
public void initialize(Configuration conf, Configuration schedConf, public void initialize(Configuration conf, Configuration schedConf,
RMContext rmContext) { RMContext rmContext) {
this.schedConf = schedConf; this.schedConf = schedConf;
this.configVersion = System.currentTimeMillis();
} }
@Override @Override
@ -57,6 +59,7 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore {
} }
} }
} }
this.configVersion = System.currentTimeMillis();
pendingMutation = null; pendingMutation = null;
} }
@ -70,6 +73,11 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore {
return schedConf; return schedConf;
} }
@Override
public long getConfigVersion() throws Exception {
return configVersion;
}
@Override @Override
public List<LogMutation> getConfirmedConfHistory(long fromId) { public List<LogMutation> getConfirmedConfHistory(long fromId) {
// Unimplemented. // Unimplemented.

View File

@ -68,6 +68,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
private static final String DB_NAME = "yarn-conf-store"; private static final String DB_NAME = "yarn-conf-store";
private static final String LOG_KEY = "log"; private static final String LOG_KEY = "log";
private static final String VERSION_KEY = "version"; private static final String VERSION_KEY = "version";
private static final String CONF_VERSION_KEY = "conf-version";
private DB db; private DB db;
private long maxLogs; private long maxLogs;
@ -124,6 +125,10 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
return 1; return 1;
} else if (key2Str.equals(LOG_KEY)) { } else if (key2Str.equals(LOG_KEY)) {
return -1; return -1;
} else if (key1Str.equals(CONF_VERSION_KEY)) {
return 1;
} else if (key2Str.equals(CONF_VERSION_KEY)) {
return -1;
} }
return key1Str.compareTo(key2Str); return key1Str.compareTo(key2Str);
} }
@ -146,6 +151,10 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
File dbfile = new File(storeRoot.toString()); File dbfile = new File(storeRoot.toString());
try { try {
db = JniDBFactory.factory.open(dbfile, options); db = JniDBFactory.factory.open(dbfile, options);
if (db.get(bytes(CONF_VERSION_KEY)) == null) {
db.put(bytes(CONF_VERSION_KEY),
bytes(String.valueOf(System.currentTimeMillis())));
}
} catch (NativeDB.DBException e) { } catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
LOG.info("Creating conf database at " + dbfile); LOG.info("Creating conf database at " + dbfile);
@ -158,6 +167,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
initBatch.put(bytes(kv.getKey()), bytes(kv.getValue())); initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
} }
db.write(initBatch); db.write(initBatch);
db.put(bytes(CONF_VERSION_KEY),
bytes(String.valueOf(System.currentTimeMillis())));
} catch (DBException dbErr) { } catch (DBException dbErr) {
throw new IOException(dbErr.getMessage(), dbErr); throw new IOException(dbErr.getMessage(), dbErr);
} }
@ -215,6 +226,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
} }
} }
db.write(updateBatch); db.write(updateBatch);
db.put(bytes(CONF_VERSION_KEY),
bytes(String.valueOf(System.currentTimeMillis())));
pendingMutation = null; pendingMutation = null;
} }
@ -250,7 +263,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
Map.Entry<byte[], byte[]> entry = itr.next(); Map.Entry<byte[], byte[]> entry = itr.next();
String key = new String(entry.getKey(), StandardCharsets.UTF_8); String key = new String(entry.getKey(), StandardCharsets.UTF_8);
String value = new String(entry.getValue(), StandardCharsets.UTF_8); String value = new String(entry.getValue(), StandardCharsets.UTF_8);
if (key.equals(LOG_KEY) || key.equals(VERSION_KEY)) { if (key.equals(LOG_KEY) || key.equals(VERSION_KEY) ||
key.equals(CONF_VERSION_KEY)) {
break; break;
} }
config.set(key, value); config.set(key, value);
@ -258,6 +272,13 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
return config; return config;
} }
@Override
public long getConfigVersion() throws Exception {
String version = new String(db.get(bytes(CONF_VERSION_KEY)),
StandardCharsets.UTF_8);
return Long.parseLong(version);
}
@Override @Override
public List<LogMutation> getConfirmedConfHistory(long fromId) { public List<LogMutation> getConfirmedConfHistory(long fromId) {
return null; // unimplemented return null; // unimplemented

View File

@ -134,6 +134,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
return new Configuration(schedConf); return new Configuration(schedConf);
} }
@Override
public long getConfigVersion() throws Exception {
return confStore.getConfigVersion();
}
@Override @Override
public ConfigurationMutationACLPolicy getAclMutationPolicy() { public ConfigurationMutationACLPolicy getAclMutationPolicy() {
return aclMutationPolicy; return aclMutationPolicy;

View File

@ -132,6 +132,12 @@ public abstract class YarnConfigurationStore {
*/ */
public abstract void format() throws Exception; public abstract void format() throws Exception;
/**
* Get the last updated config version.
* @return Last updated config version.
*/
public abstract long getConfigVersion() throws Exception;
/** /**
* Get a list of confirmed configuration mutations starting from a given id. * Get a list of confirmed configuration mutations starting from a given id.
* @param fromId id from which to start getting mutations, inclusive * @param fromId id from which to start getting mutations, inclusive

View File

@ -62,11 +62,13 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
private static final String LOGS_PATH = "LOGS"; private static final String LOGS_PATH = "LOGS";
private static final String CONF_STORE_PATH = "CONF_STORE"; private static final String CONF_STORE_PATH = "CONF_STORE";
private static final String FENCING_PATH = "FENCING"; private static final String FENCING_PATH = "FENCING";
private static final String CONF_VERSION_PATH = "CONF_VERSION";
private String zkVersionPath; private String zkVersionPath;
private String logsPath; private String logsPath;
private String confStorePath; private String confStorePath;
private String fencingNodePath; private String fencingNodePath;
private String confVersionPath;
@VisibleForTesting @VisibleForTesting
protected ZKCuratorManager zkManager; protected ZKCuratorManager zkManager;
@ -89,6 +91,7 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
this.logsPath = getNodePath(znodeParentPath, LOGS_PATH); this.logsPath = getNodePath(znodeParentPath, LOGS_PATH);
this.confStorePath = getNodePath(znodeParentPath, CONF_STORE_PATH); this.confStorePath = getNodePath(znodeParentPath, CONF_STORE_PATH);
this.fencingNodePath = getNodePath(znodeParentPath, FENCING_PATH); this.fencingNodePath = getNodePath(znodeParentPath, FENCING_PATH);
this.confVersionPath = getNodePath(znodeParentPath, CONF_VERSION_PATH);
zkManager.createRootDirRecursively(znodeParentPath, zkAcl); zkManager.createRootDirRecursively(znodeParentPath, zkAcl);
zkManager.delete(fencingNodePath); zkManager.delete(fencingNodePath);
@ -99,6 +102,12 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
serializeObject(new LinkedList<LogMutation>()), -1); serializeObject(new LinkedList<LogMutation>()), -1);
} }
if (!zkManager.exists(confVersionPath)) {
zkManager.create(confVersionPath);
zkManager.setData(confVersionPath,
String.valueOf(System.currentTimeMillis()), -1);
}
if (!zkManager.exists(confStorePath)) { if (!zkManager.exists(confStorePath)) {
zkManager.create(confStorePath); zkManager.create(confStorePath);
HashMap<String, String> mapSchedConf = new HashMap<>(); HashMap<String, String> mapSchedConf = new HashMap<>();
@ -106,6 +115,8 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
mapSchedConf.put(entry.getKey(), entry.getValue()); mapSchedConf.put(entry.getKey(), entry.getValue());
} }
zkManager.setData(confStorePath, serializeObject(mapSchedConf), -1); zkManager.setData(confStorePath, serializeObject(mapSchedConf), -1);
zkManager.setData(confVersionPath,
String.valueOf(System.currentTimeMillis()), -1);
} }
} }
@ -185,6 +196,9 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
} }
zkManager.safeSetData(confStorePath, serializeObject(mapConf), -1, zkManager.safeSetData(confStorePath, serializeObject(mapConf), -1,
zkAcl, fencingNodePath); zkAcl, fencingNodePath);
zkManager.setData(confVersionPath,
String.valueOf(System.currentTimeMillis()), -1);
} }
pendingMutation = null; pendingMutation = null;
} }
@ -213,6 +227,11 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
return null; return null;
} }
@Override
public long getConfigVersion() throws Exception {
return Long.parseLong(zkManager.getStringData(confVersionPath));
}
@Override @Override
public List<LogMutation> getConfirmedConfHistory(long fromId) { public List<LogMutation> getConfirmedConfHistory(long fromId) {
return null; // unimplemented return null; // unimplemented

View File

@ -51,6 +51,9 @@ public final class RMWSConsts {
/** Path for {@code RMWebServices#formatSchedulerConfiguration}. */ /** Path for {@code RMWebServices#formatSchedulerConfiguration}. */
public static final String FORMAT_SCHEDULER_CONF = "/scheduler-conf/format"; public static final String FORMAT_SCHEDULER_CONF = "/scheduler-conf/format";
/** Path for {@code RMWebServices#getSchedulerConfigurationVersion}. */
public static final String SCHEDULER_CONF_VERSION = "/scheduler-conf/version";
/** Path for {@code RMWebServiceProtocol#dumpSchedulerLogs}. */ /** Path for {@code RMWebServiceProtocol#dumpSchedulerLogs}. */
public static final String SCHEDULER_LOGS = "/scheduler/logs"; public static final String SCHEDULER_LOGS = "/scheduler/logs";

View File

@ -2345,7 +2345,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
} }
} else { } else {
return Response.status(Status.BAD_REQUEST) return Response.status(Status.BAD_REQUEST)
.entity("Configuration change only supported by " + .entity("Scheduler Configuration format only supported by " +
"MutableConfScheduler.").build(); "MutableConfScheduler.").build();
} }
} }
@ -2435,6 +2435,38 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
} }
} }
@GET
@Path(RMWSConsts.SCHEDULER_CONF_VERSION)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
public Response getSchedulerConfigurationVersion(@Context
HttpServletRequest hsr) throws AuthorizationException {
// Only admin user is allowed to get scheduler conf version
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, true);
ResourceScheduler scheduler = rm.getResourceScheduler();
if (scheduler instanceof MutableConfScheduler
&& ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
MutableConfigurationProvider mutableConfigurationProvider =
((MutableConfScheduler) scheduler).getMutableConfProvider();
try {
long configVersion = mutableConfigurationProvider
.getConfigVersion();
return Response.status(Status.OK).entity(configVersion).build();
} catch (Exception e) {
LOG.error("Exception thrown when fetching configuration version.", e);
return Response.status(Status.BAD_REQUEST).entity(e.getMessage())
.build();
}
} else {
return Response.status(Status.BAD_REQUEST)
.entity("Configuration Version only supported by "
+ "MutableConfScheduler.").build();
}
}
@GET @GET
@Path(RMWSConsts.CHECK_USER_ACCESS_TO_QUEUE) @Path(RMWSConsts.CHECK_USER_ACCESS_TO_QUEUE)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,

View File

@ -137,6 +137,21 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
assertNull(confStore.retrieve()); assertNull(confStore.retrieve());
} }
@Test
public void testGetConfigurationVersion() throws Exception {
confStore.initialize(conf, schedConf, rmContext);
long v1 = confStore.getConfigVersion();
Thread.sleep(2000);
Map<String, String> update = new HashMap<>();
update.put("keyver", "valver");
YarnConfigurationStore.LogMutation mutation =
new YarnConfigurationStore.LogMutation(update, TEST_USER);
confStore.logMutation(mutation);
confStore.confirmMutation(true);
long v2 = confStore.getConfigVersion();
assertTrue(v2 > v1);
}
@Test @Test
public void testPersistUpdatedConfiguration() throws Exception { public void testPersistUpdatedConfiguration() throws Exception {
confStore.initialize(conf, schedConf, rmContext); confStore.initialize(conf, schedConf, rmContext);