diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java index 9e843dfe89c..eff8aa86632 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java @@ -65,6 +65,12 @@ public interface MutableConfigurationProvider { */ Configuration getConfiguration(); + /** + * Get the last updated scheduler config version. + * @return Last updated scheduler config version. + */ + long getConfigVersion() throws Exception; + void formatConfigurationInStore(Configuration conf) throws Exception; /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java index ddc5c8acdd4..14cd0a42ecb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java @@ -148,6 +148,13 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore { tempConfigPath = null; } + @Override + public long getConfigVersion() throws Exception { + String version = getLatestConfigPath().getName(). + substring(YarnConfiguration.CS_CONFIGURATION_FILE.length() + 1); + return Long.parseLong(version); + } + private void finalizeFileSystemFile() throws IOException { // call confirmMutation() make sure tempConfigPath is not null Path finalConfigPath = getFinalConfigPath(tempConfigPath); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java index 4871443e54a..59d140ec4cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java @@ -33,11 +33,13 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore { private Configuration schedConf; private LogMutation pendingMutation; + private long configVersion; @Override public void initialize(Configuration conf, Configuration schedConf, RMContext rmContext) { this.schedConf = schedConf; + this.configVersion = System.currentTimeMillis(); } @Override @@ -57,6 +59,7 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore { } } } + this.configVersion = System.currentTimeMillis(); pendingMutation = null; } @@ -70,6 +73,11 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore { return schedConf; } + @Override + public long getConfigVersion() throws Exception { + return configVersion; + } + @Override public List getConfirmedConfHistory(long fromId) { // Unimplemented. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java index 0792d7f3dc3..19614c25b12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java @@ -68,6 +68,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { private static final String DB_NAME = "yarn-conf-store"; private static final String LOG_KEY = "log"; private static final String VERSION_KEY = "version"; + private static final String CONF_VERSION_KEY = "conf-version"; private DB db; private long maxLogs; @@ -124,6 +125,10 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { return 1; } else if (key2Str.equals(LOG_KEY)) { return -1; + } else if (key1Str.equals(CONF_VERSION_KEY)) { + return 1; + } else if (key2Str.equals(CONF_VERSION_KEY)) { + return -1; } return key1Str.compareTo(key2Str); } @@ -146,6 +151,10 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { File dbfile = new File(storeRoot.toString()); try { db = JniDBFactory.factory.open(dbfile, options); + if (db.get(bytes(CONF_VERSION_KEY)) == null) { + db.put(bytes(CONF_VERSION_KEY), + bytes(String.valueOf(System.currentTimeMillis()))); + } } catch (NativeDB.DBException e) { if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { LOG.info("Creating conf database at " + dbfile); @@ -158,6 +167,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { initBatch.put(bytes(kv.getKey()), bytes(kv.getValue())); } db.write(initBatch); + db.put(bytes(CONF_VERSION_KEY), + bytes(String.valueOf(System.currentTimeMillis()))); } catch (DBException dbErr) { throw new IOException(dbErr.getMessage(), dbErr); } @@ -215,6 +226,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { } } db.write(updateBatch); + db.put(bytes(CONF_VERSION_KEY), + bytes(String.valueOf(System.currentTimeMillis()))); pendingMutation = null; } @@ -250,7 +263,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { Map.Entry entry = itr.next(); String key = new String(entry.getKey(), StandardCharsets.UTF_8); String value = new String(entry.getValue(), StandardCharsets.UTF_8); - if (key.equals(LOG_KEY) || key.equals(VERSION_KEY)) { + if (key.equals(LOG_KEY) || key.equals(VERSION_KEY) || + key.equals(CONF_VERSION_KEY)) { break; } config.set(key, value); @@ -258,6 +272,13 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { return config; } + @Override + public long getConfigVersion() throws Exception { + String version = new String(db.get(bytes(CONF_VERSION_KEY)), + StandardCharsets.UTF_8); + return Long.parseLong(version); + } + @Override public List getConfirmedConfHistory(long fromId) { return null; // unimplemented diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java index 6d153dccaa1..ba325b314c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java @@ -134,6 +134,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, return new Configuration(schedConf); } + @Override + public long getConfigVersion() throws Exception { + return confStore.getConfigVersion(); + } + @Override public ConfigurationMutationACLPolicy getAclMutationPolicy() { return aclMutationPolicy; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java index 493f38e731f..bbeacc044c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java @@ -132,6 +132,12 @@ public abstract class YarnConfigurationStore { */ public abstract void format() throws Exception; + /** + * Get the last updated config version. + * @return Last updated config version. + */ + public abstract long getConfigVersion() throws Exception; + /** * Get a list of confirmed configuration mutations starting from a given id. * @param fromId id from which to start getting mutations, inclusive diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java index 766029b8de8..063910d0100 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java @@ -62,11 +62,13 @@ public class ZKConfigurationStore extends YarnConfigurationStore { private static final String LOGS_PATH = "LOGS"; private static final String CONF_STORE_PATH = "CONF_STORE"; private static final String FENCING_PATH = "FENCING"; + private static final String CONF_VERSION_PATH = "CONF_VERSION"; private String zkVersionPath; private String logsPath; private String confStorePath; private String fencingNodePath; + private String confVersionPath; @VisibleForTesting protected ZKCuratorManager zkManager; @@ -89,6 +91,7 @@ public class ZKConfigurationStore extends YarnConfigurationStore { this.logsPath = getNodePath(znodeParentPath, LOGS_PATH); this.confStorePath = getNodePath(znodeParentPath, CONF_STORE_PATH); this.fencingNodePath = getNodePath(znodeParentPath, FENCING_PATH); + this.confVersionPath = getNodePath(znodeParentPath, CONF_VERSION_PATH); zkManager.createRootDirRecursively(znodeParentPath, zkAcl); zkManager.delete(fencingNodePath); @@ -99,6 +102,12 @@ public class ZKConfigurationStore extends YarnConfigurationStore { serializeObject(new LinkedList()), -1); } + if (!zkManager.exists(confVersionPath)) { + zkManager.create(confVersionPath); + zkManager.setData(confVersionPath, + String.valueOf(System.currentTimeMillis()), -1); + } + if (!zkManager.exists(confStorePath)) { zkManager.create(confStorePath); HashMap mapSchedConf = new HashMap<>(); @@ -106,6 +115,8 @@ public class ZKConfigurationStore extends YarnConfigurationStore { mapSchedConf.put(entry.getKey(), entry.getValue()); } zkManager.setData(confStorePath, serializeObject(mapSchedConf), -1); + zkManager.setData(confVersionPath, + String.valueOf(System.currentTimeMillis()), -1); } } @@ -185,6 +196,9 @@ public class ZKConfigurationStore extends YarnConfigurationStore { } zkManager.safeSetData(confStorePath, serializeObject(mapConf), -1, zkAcl, fencingNodePath); + zkManager.setData(confVersionPath, + String.valueOf(System.currentTimeMillis()), -1); + } pendingMutation = null; } @@ -213,6 +227,11 @@ public class ZKConfigurationStore extends YarnConfigurationStore { return null; } + @Override + public long getConfigVersion() throws Exception { + return Long.parseLong(zkManager.getStringData(confVersionPath)); + } + @Override public List getConfirmedConfHistory(long fromId) { return null; // unimplemented diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java index 4479c949091..c02b3f46e71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java @@ -51,6 +51,9 @@ public final class RMWSConsts { /** Path for {@code RMWebServices#formatSchedulerConfiguration}. */ public static final String FORMAT_SCHEDULER_CONF = "/scheduler-conf/format"; + /** Path for {@code RMWebServices#getSchedulerConfigurationVersion}. */ + public static final String SCHEDULER_CONF_VERSION = "/scheduler-conf/version"; + /** Path for {@code RMWebServiceProtocol#dumpSchedulerLogs}. */ public static final String SCHEDULER_LOGS = "/scheduler/logs"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 56af548d4cd..facd83f1527 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -2345,7 +2345,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol { } } else { return Response.status(Status.BAD_REQUEST) - .entity("Configuration change only supported by " + + .entity("Scheduler Configuration format only supported by " + "MutableConfScheduler.").build(); } } @@ -2435,6 +2435,38 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol { } } + @GET + @Path(RMWSConsts.SCHEDULER_CONF_VERSION) + @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, + MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) + public Response getSchedulerConfigurationVersion(@Context + HttpServletRequest hsr) throws AuthorizationException { + // Only admin user is allowed to get scheduler conf version + UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); + initForWritableEndpoints(callerUGI, true); + + ResourceScheduler scheduler = rm.getResourceScheduler(); + if (scheduler instanceof MutableConfScheduler + && ((MutableConfScheduler) scheduler).isConfigurationMutable()) { + MutableConfigurationProvider mutableConfigurationProvider = + ((MutableConfScheduler) scheduler).getMutableConfProvider(); + + try { + long configVersion = mutableConfigurationProvider + .getConfigVersion(); + return Response.status(Status.OK).entity(configVersion).build(); + } catch (Exception e) { + LOG.error("Exception thrown when fetching configuration version.", e); + return Response.status(Status.BAD_REQUEST).entity(e.getMessage()) + .build(); + } + } else { + return Response.status(Status.BAD_REQUEST) + .entity("Configuration Version only supported by " + + "MutableConfScheduler.").build(); + } + } + @GET @Path(RMWSConsts.CHECK_USER_ACCESS_TO_QUEUE) @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java index 6a930e868a5..591d9781abc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java @@ -137,6 +137,21 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest { assertNull(confStore.retrieve()); } + @Test + public void testGetConfigurationVersion() throws Exception { + confStore.initialize(conf, schedConf, rmContext); + long v1 = confStore.getConfigVersion(); + Thread.sleep(2000); + Map update = new HashMap<>(); + update.put("keyver", "valver"); + YarnConfigurationStore.LogMutation mutation = + new YarnConfigurationStore.LogMutation(update, TEST_USER); + confStore.logMutation(mutation); + confStore.confirmMutation(true); + long v2 = confStore.getConfigVersion(); + assertTrue(v2 > v1); + } + @Test public void testPersistUpdatedConfiguration() throws Exception { confStore.initialize(conf, schedConf, rmContext);