YARN-9873. Mutation API Config Change need to update Version Number. Contributed by Prabhu Joseph
(cherry picked from commit be901f4962
)
This commit is contained in:
parent
a12ab911ee
commit
9cb3ab058f
|
@ -65,6 +65,12 @@ public interface MutableConfigurationProvider {
|
||||||
*/
|
*/
|
||||||
Configuration getConfiguration();
|
Configuration getConfiguration();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the last updated scheduler config version.
|
||||||
|
* @return Last updated scheduler config version.
|
||||||
|
*/
|
||||||
|
long getConfigVersion() throws Exception;
|
||||||
|
|
||||||
void formatConfigurationInStore(Configuration conf) throws Exception;
|
void formatConfigurationInStore(Configuration conf) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -29,6 +29,7 @@ import com.google.gson.GsonBuilder;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -62,6 +63,7 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
|
||||||
private volatile Configuration schedConf;
|
private volatile Configuration schedConf;
|
||||||
private volatile Configuration oldConf;
|
private volatile Configuration oldConf;
|
||||||
private Path tempConfigPath;
|
private Path tempConfigPath;
|
||||||
|
private Path configVersionFile;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initialize(Configuration conf, Configuration vSchedConf,
|
public void initialize(Configuration conf, Configuration vSchedConf,
|
||||||
|
@ -99,9 +101,17 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.configVersionFile = new Path(schedulerConfPathStr, "ConfigVersion");
|
||||||
|
if (!fileSystem.exists(configVersionFile)) {
|
||||||
|
fileSystem.createNewFile(configVersionFile);
|
||||||
|
writeConfigVersion(0L);
|
||||||
|
}
|
||||||
|
|
||||||
// create capacity-schedule.xml.ts file if not existing
|
// create capacity-schedule.xml.ts file if not existing
|
||||||
if (this.getConfigFileInputStream() == null) {
|
if (this.getConfigFileInputStream() == null) {
|
||||||
writeConfigurationToFileSystem(vSchedConf);
|
writeConfigurationToFileSystem(vSchedConf);
|
||||||
|
long configVersion = getConfigVersion() + 1L;
|
||||||
|
writeConfigVersion(configVersion);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.schedConf = this.getConfigurationFromFileSystem();
|
this.schedConf = this.getConfigurationFromFileSystem();
|
||||||
|
@ -141,6 +151,8 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
|
||||||
}
|
}
|
||||||
if (isValid) {
|
if (isValid) {
|
||||||
finalizeFileSystemFile();
|
finalizeFileSystemFile();
|
||||||
|
long configVersion = getConfigVersion() + 1L;
|
||||||
|
writeConfigVersion(configVersion);
|
||||||
} else {
|
} else {
|
||||||
schedConf = oldConf;
|
schedConf = oldConf;
|
||||||
removeTmpConfigFile();
|
removeTmpConfigFile();
|
||||||
|
@ -158,7 +170,15 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void format() throws Exception {
|
public void format() throws Exception {
|
||||||
fileSystem.delete(schedulerConfDir, true);
|
FileStatus[] fileStatuses = fileSystem.listStatus(this.schedulerConfDir,
|
||||||
|
this.configFilePathFilter);
|
||||||
|
if (fileStatuses == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (int i = 0; i < fileStatuses.length; i++) {
|
||||||
|
fileSystem.delete(fileStatuses[i].getPath(), false);
|
||||||
|
LOG.info("delete config file " + fileStatuses[i].getPath());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Path getFinalConfigPath(Path tempPath) {
|
private Path getFinalConfigPath(Path tempPath) {
|
||||||
|
@ -222,6 +242,27 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
|
||||||
return fileStatuses[fileStatuses.length - 1].getPath();
|
return fileStatuses[fileStatuses.length - 1].getPath();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void writeConfigVersion(long configVersion) throws IOException {
|
||||||
|
try (FSDataOutputStream out = fileSystem.create(configVersionFile, true)) {
|
||||||
|
out.writeLong(configVersion);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.info("Failed to write config version at {}", configVersionFile, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getConfigVersion() throws Exception {
|
||||||
|
try (FSDataInputStream in = fileSystem.open(configVersionFile)) {
|
||||||
|
return in.readLong();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.info("Failed to read config version at {}", configVersionFile, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
private Path writeTmpConfig(Configuration vSchedConf) throws IOException {
|
private Path writeTmpConfig(Configuration vSchedConf) throws IOException {
|
||||||
long start = Time.monotonicNow();
|
long start = Time.monotonicNow();
|
||||||
|
|
|
@ -33,11 +33,13 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore {
|
||||||
|
|
||||||
private Configuration schedConf;
|
private Configuration schedConf;
|
||||||
private LogMutation pendingMutation;
|
private LogMutation pendingMutation;
|
||||||
|
private long configVersion;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initialize(Configuration conf, Configuration schedConf,
|
public void initialize(Configuration conf, Configuration schedConf,
|
||||||
RMContext rmContext) {
|
RMContext rmContext) {
|
||||||
this.schedConf = schedConf;
|
this.schedConf = schedConf;
|
||||||
|
this.configVersion = 1L;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -56,6 +58,7 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore {
|
||||||
schedConf.set(kv.getKey(), kv.getValue());
|
schedConf.set(kv.getKey(), kv.getValue());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
this.configVersion = this.configVersion + 1L;
|
||||||
}
|
}
|
||||||
pendingMutation = null;
|
pendingMutation = null;
|
||||||
}
|
}
|
||||||
|
@ -70,6 +73,11 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore {
|
||||||
return schedConf;
|
return schedConf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getConfigVersion() {
|
||||||
|
return configVersion;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<LogMutation> getConfirmedConfHistory(long fromId) {
|
public List<LogMutation> getConfirmedConfHistory(long fromId) {
|
||||||
// Unimplemented.
|
// Unimplemented.
|
||||||
|
|
|
@ -68,8 +68,11 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
||||||
private static final String DB_NAME = "yarn-conf-store";
|
private static final String DB_NAME = "yarn-conf-store";
|
||||||
private static final String LOG_KEY = "log";
|
private static final String LOG_KEY = "log";
|
||||||
private static final String VERSION_KEY = "version";
|
private static final String VERSION_KEY = "version";
|
||||||
|
private static final String CONF_VERSION_NAME = "conf-version-store";
|
||||||
|
private static final String CONF_VERSION_KEY = "conf-version";
|
||||||
|
|
||||||
private DB db;
|
private DB db;
|
||||||
|
private DB versiondb;
|
||||||
private long maxLogs;
|
private long maxLogs;
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private LogMutation pendingMutation;
|
private LogMutation pendingMutation;
|
||||||
|
@ -102,11 +105,11 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
||||||
public void format() throws Exception {
|
public void format() throws Exception {
|
||||||
close();
|
close();
|
||||||
FileSystem fs = FileSystem.getLocal(conf);
|
FileSystem fs = FileSystem.getLocal(conf);
|
||||||
fs.delete(getStorageDir(), true);
|
fs.delete(getStorageDir(DB_NAME), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initDatabase(Configuration config) throws Exception {
|
private void initDatabase(Configuration config) throws Exception {
|
||||||
Path storeRoot = createStorageDir();
|
Path storeRoot = createStorageDir(DB_NAME);
|
||||||
Options options = new Options();
|
Options options = new Options();
|
||||||
options.createIfMissing(false);
|
options.createIfMissing(false);
|
||||||
options.comparator(new DBComparator() {
|
options.comparator(new DBComparator() {
|
||||||
|
@ -142,6 +145,29 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
Path confVersion = createStorageDir(CONF_VERSION_NAME);
|
||||||
|
Options confOptions = new Options();
|
||||||
|
confOptions.createIfMissing(false);
|
||||||
|
LOG.info("Using conf version at " + confVersion);
|
||||||
|
File confVersionFile = new File(confVersion.toString());
|
||||||
|
try {
|
||||||
|
versiondb = JniDBFactory.factory.open(confVersionFile, confOptions);
|
||||||
|
} catch (NativeDB.DBException e) {
|
||||||
|
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
|
||||||
|
LOG.info("Creating conf version at " + confVersionFile);
|
||||||
|
confOptions.createIfMissing(true);
|
||||||
|
try {
|
||||||
|
versiondb = JniDBFactory.factory.open(confVersionFile, confOptions);
|
||||||
|
versiondb.put(bytes(CONF_VERSION_KEY), bytes(String.valueOf(0)));
|
||||||
|
} catch (DBException dbErr) {
|
||||||
|
throw new IOException(dbErr.getMessage(), dbErr);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
LOG.info("Using conf database at " + storeRoot);
|
LOG.info("Using conf database at " + storeRoot);
|
||||||
File dbfile = new File(storeRoot.toString());
|
File dbfile = new File(storeRoot.toString());
|
||||||
try {
|
try {
|
||||||
|
@ -158,6 +184,9 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
||||||
initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
|
initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
|
||||||
}
|
}
|
||||||
db.write(initBatch);
|
db.write(initBatch);
|
||||||
|
long configVersion = getConfigVersion() + 1L;
|
||||||
|
versiondb.put(bytes(CONF_VERSION_KEY),
|
||||||
|
bytes(String.valueOf(configVersion)));
|
||||||
} catch (DBException dbErr) {
|
} catch (DBException dbErr) {
|
||||||
throw new IOException(dbErr.getMessage(), dbErr);
|
throw new IOException(dbErr.getMessage(), dbErr);
|
||||||
}
|
}
|
||||||
|
@ -167,20 +196,20 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Path createStorageDir() throws IOException {
|
private Path createStorageDir(String storageName) throws IOException {
|
||||||
Path root = getStorageDir();
|
Path root = getStorageDir(storageName);
|
||||||
FileSystem fs = FileSystem.getLocal(conf);
|
FileSystem fs = FileSystem.getLocal(conf);
|
||||||
fs.mkdirs(root, new FsPermission((short) 0700));
|
fs.mkdirs(root, new FsPermission((short) 0700));
|
||||||
return root;
|
return root;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Path getStorageDir() throws IOException {
|
private Path getStorageDir(String storageName) throws IOException {
|
||||||
String storePath = conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_PATH);
|
String storePath = conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_PATH);
|
||||||
if (storePath == null) {
|
if (storePath == null) {
|
||||||
throw new IOException("No store location directory configured in " +
|
throw new IOException("No store location directory configured in " +
|
||||||
YarnConfiguration.RM_SCHEDCONF_STORE_PATH);
|
YarnConfiguration.RM_SCHEDCONF_STORE_PATH);
|
||||||
}
|
}
|
||||||
return new Path(storePath, DB_NAME);
|
return new Path(storePath, storageName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -188,6 +217,9 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
||||||
if (db != null) {
|
if (db != null) {
|
||||||
db.close();
|
db.close();
|
||||||
}
|
}
|
||||||
|
if (versiondb != null) {
|
||||||
|
versiondb.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -213,6 +245,9 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
||||||
updateBatch.put(bytes(changes.getKey()), bytes(changes.getValue()));
|
updateBatch.put(bytes(changes.getKey()), bytes(changes.getValue()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
long configVersion = getConfigVersion() + 1L;
|
||||||
|
versiondb.put(bytes(CONF_VERSION_KEY),
|
||||||
|
bytes(String.valueOf(configVersion)));
|
||||||
}
|
}
|
||||||
db.write(updateBatch);
|
db.write(updateBatch);
|
||||||
pendingMutation = null;
|
pendingMutation = null;
|
||||||
|
@ -258,6 +293,13 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getConfigVersion() {
|
||||||
|
String version = new String(versiondb.get(bytes(CONF_VERSION_KEY)),
|
||||||
|
StandardCharsets.UTF_8);
|
||||||
|
return Long.parseLong(version);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<LogMutation> getConfirmedConfHistory(long fromId) {
|
public List<LogMutation> getConfirmedConfHistory(long fromId) {
|
||||||
return null; // unimplemented
|
return null; // unimplemented
|
||||||
|
|
|
@ -134,6 +134,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
||||||
return new Configuration(schedConf);
|
return new Configuration(schedConf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getConfigVersion() throws Exception {
|
||||||
|
return confStore.getConfigVersion();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ConfigurationMutationACLPolicy getAclMutationPolicy() {
|
public ConfigurationMutationACLPolicy getAclMutationPolicy() {
|
||||||
return aclMutationPolicy;
|
return aclMutationPolicy;
|
||||||
|
|
|
@ -132,6 +132,12 @@ public abstract class YarnConfigurationStore {
|
||||||
*/
|
*/
|
||||||
public abstract void format() throws Exception;
|
public abstract void format() throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the last updated config version.
|
||||||
|
* @return Last updated config version.
|
||||||
|
*/
|
||||||
|
public abstract long getConfigVersion() throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a list of confirmed configuration mutations starting from a given id.
|
* Get a list of confirmed configuration mutations starting from a given id.
|
||||||
* @param fromId id from which to start getting mutations, inclusive
|
* @param fromId id from which to start getting mutations, inclusive
|
||||||
|
|
|
@ -62,11 +62,13 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
|
||||||
private static final String LOGS_PATH = "LOGS";
|
private static final String LOGS_PATH = "LOGS";
|
||||||
private static final String CONF_STORE_PATH = "CONF_STORE";
|
private static final String CONF_STORE_PATH = "CONF_STORE";
|
||||||
private static final String FENCING_PATH = "FENCING";
|
private static final String FENCING_PATH = "FENCING";
|
||||||
|
private static final String CONF_VERSION_PATH = "CONF_VERSION";
|
||||||
|
|
||||||
private String zkVersionPath;
|
private String zkVersionPath;
|
||||||
private String logsPath;
|
private String logsPath;
|
||||||
private String confStorePath;
|
private String confStorePath;
|
||||||
private String fencingNodePath;
|
private String fencingNodePath;
|
||||||
|
private String confVersionPath;
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected ZKCuratorManager zkManager;
|
protected ZKCuratorManager zkManager;
|
||||||
|
@ -89,6 +91,7 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
|
||||||
this.logsPath = getNodePath(znodeParentPath, LOGS_PATH);
|
this.logsPath = getNodePath(znodeParentPath, LOGS_PATH);
|
||||||
this.confStorePath = getNodePath(znodeParentPath, CONF_STORE_PATH);
|
this.confStorePath = getNodePath(znodeParentPath, CONF_STORE_PATH);
|
||||||
this.fencingNodePath = getNodePath(znodeParentPath, FENCING_PATH);
|
this.fencingNodePath = getNodePath(znodeParentPath, FENCING_PATH);
|
||||||
|
this.confVersionPath = getNodePath(znodeParentPath, CONF_VERSION_PATH);
|
||||||
|
|
||||||
zkManager.createRootDirRecursively(znodeParentPath, zkAcl);
|
zkManager.createRootDirRecursively(znodeParentPath, zkAcl);
|
||||||
zkManager.delete(fencingNodePath);
|
zkManager.delete(fencingNodePath);
|
||||||
|
@ -99,6 +102,11 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
|
||||||
serializeObject(new LinkedList<LogMutation>()), -1);
|
serializeObject(new LinkedList<LogMutation>()), -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!zkManager.exists(confVersionPath)) {
|
||||||
|
zkManager.create(confVersionPath);
|
||||||
|
zkManager.setData(confVersionPath, String.valueOf(0), -1);
|
||||||
|
}
|
||||||
|
|
||||||
if (!zkManager.exists(confStorePath)) {
|
if (!zkManager.exists(confStorePath)) {
|
||||||
zkManager.create(confStorePath);
|
zkManager.create(confStorePath);
|
||||||
HashMap<String, String> mapSchedConf = new HashMap<>();
|
HashMap<String, String> mapSchedConf = new HashMap<>();
|
||||||
|
@ -106,6 +114,8 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
|
||||||
mapSchedConf.put(entry.getKey(), entry.getValue());
|
mapSchedConf.put(entry.getKey(), entry.getValue());
|
||||||
}
|
}
|
||||||
zkManager.setData(confStorePath, serializeObject(mapSchedConf), -1);
|
zkManager.setData(confStorePath, serializeObject(mapSchedConf), -1);
|
||||||
|
long configVersion = getConfigVersion() + 1L;
|
||||||
|
zkManager.setData(confVersionPath, String.valueOf(configVersion), -1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,6 +195,9 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
|
||||||
}
|
}
|
||||||
zkManager.safeSetData(confStorePath, serializeObject(mapConf), -1,
|
zkManager.safeSetData(confStorePath, serializeObject(mapConf), -1,
|
||||||
zkAcl, fencingNodePath);
|
zkAcl, fencingNodePath);
|
||||||
|
long configVersion = getConfigVersion() + 1L;
|
||||||
|
zkManager.setData(confVersionPath, String.valueOf(configVersion), -1);
|
||||||
|
|
||||||
}
|
}
|
||||||
pendingMutation = null;
|
pendingMutation = null;
|
||||||
}
|
}
|
||||||
|
@ -213,6 +226,11 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getConfigVersion() throws Exception {
|
||||||
|
return Long.parseLong(zkManager.getStringData(confVersionPath));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<LogMutation> getConfirmedConfHistory(long fromId) {
|
public List<LogMutation> getConfirmedConfHistory(long fromId) {
|
||||||
return null; // unimplemented
|
return null; // unimplemented
|
||||||
|
|
|
@ -51,6 +51,9 @@ public final class RMWSConsts {
|
||||||
/** Path for {@code RMWebServices#formatSchedulerConfiguration}. */
|
/** Path for {@code RMWebServices#formatSchedulerConfiguration}. */
|
||||||
public static final String FORMAT_SCHEDULER_CONF = "/scheduler-conf/format";
|
public static final String FORMAT_SCHEDULER_CONF = "/scheduler-conf/format";
|
||||||
|
|
||||||
|
/** Path for {@code RMWebServices#getSchedulerConfigurationVersion}. */
|
||||||
|
public static final String SCHEDULER_CONF_VERSION = "/scheduler-conf/version";
|
||||||
|
|
||||||
/** Path for {@code RMWebServiceProtocol#dumpSchedulerLogs}. */
|
/** Path for {@code RMWebServiceProtocol#dumpSchedulerLogs}. */
|
||||||
public static final String SCHEDULER_LOGS = "/scheduler/logs";
|
public static final String SCHEDULER_LOGS = "/scheduler/logs";
|
||||||
|
|
||||||
|
|
|
@ -188,6 +188,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ConfigVersionInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ConfInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ConfInfo;
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
@ -2345,7 +2346,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return Response.status(Status.BAD_REQUEST)
|
return Response.status(Status.BAD_REQUEST)
|
||||||
.entity("Configuration change only supported by " +
|
.entity("Scheduler Configuration format only supported by " +
|
||||||
"MutableConfScheduler.").build();
|
"MutableConfScheduler.").build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2435,6 +2436,39 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@GET
|
||||||
|
@Path(RMWSConsts.SCHEDULER_CONF_VERSION)
|
||||||
|
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||||
|
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
||||||
|
public Response getSchedulerConfigurationVersion(@Context
|
||||||
|
HttpServletRequest hsr) throws AuthorizationException {
|
||||||
|
// Only admin user is allowed to get scheduler conf version
|
||||||
|
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
|
||||||
|
initForWritableEndpoints(callerUGI, true);
|
||||||
|
|
||||||
|
ResourceScheduler scheduler = rm.getResourceScheduler();
|
||||||
|
if (scheduler instanceof MutableConfScheduler
|
||||||
|
&& ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
|
||||||
|
MutableConfigurationProvider mutableConfigurationProvider =
|
||||||
|
((MutableConfScheduler) scheduler).getMutableConfProvider();
|
||||||
|
|
||||||
|
try {
|
||||||
|
long configVersion = mutableConfigurationProvider
|
||||||
|
.getConfigVersion();
|
||||||
|
return Response.status(Status.OK)
|
||||||
|
.entity(new ConfigVersionInfo(configVersion)).build();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Exception thrown when fetching configuration version.", e);
|
||||||
|
return Response.status(Status.BAD_REQUEST).entity(e.getMessage())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return Response.status(Status.BAD_REQUEST)
|
||||||
|
.entity("Configuration Version only supported by "
|
||||||
|
+ "MutableConfScheduler.").build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@GET
|
@GET
|
||||||
@Path(RMWSConsts.CHECK_USER_ACCESS_TO_QUEUE)
|
@Path(RMWSConsts.CHECK_USER_ACCESS_TO_QUEUE)
|
||||||
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||||
|
|
|
@ -0,0 +1,44 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
|
||||||
|
|
||||||
|
import javax.xml.bind.annotation.XmlAccessType;
|
||||||
|
import javax.xml.bind.annotation.XmlAccessorType;
|
||||||
|
import javax.xml.bind.annotation.XmlRootElement;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Version of Scheduler Config.
|
||||||
|
*/
|
||||||
|
@XmlRootElement(name = "configversion")
|
||||||
|
@XmlAccessorType(XmlAccessType.FIELD)
|
||||||
|
public class ConfigVersionInfo {
|
||||||
|
|
||||||
|
private long versionID;
|
||||||
|
|
||||||
|
public ConfigVersionInfo() {
|
||||||
|
} // JAXB needs this
|
||||||
|
|
||||||
|
public ConfigVersionInfo(long version) {
|
||||||
|
this.versionID = version;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getVersionID() {
|
||||||
|
return this.versionID;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -37,7 +37,6 @@ import org.junit.Test;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -140,7 +139,6 @@ public class TestFSSchedulerConfigurationStore {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFormatConfiguration() throws Exception {
|
public void testFormatConfiguration() throws Exception {
|
||||||
assertTrue(testSchedulerConfigurationDir.exists());
|
|
||||||
Configuration schedulerConf = new Configuration();
|
Configuration schedulerConf = new Configuration();
|
||||||
schedulerConf.set("a", "a");
|
schedulerConf.set("a", "a");
|
||||||
writeConf(schedulerConf);
|
writeConf(schedulerConf);
|
||||||
|
@ -148,7 +146,15 @@ public class TestFSSchedulerConfigurationStore {
|
||||||
Configuration storedConfig = configurationStore.retrieve();
|
Configuration storedConfig = configurationStore.retrieve();
|
||||||
assertEquals("a", storedConfig.get("a"));
|
assertEquals("a", storedConfig.get("a"));
|
||||||
configurationStore.format();
|
configurationStore.format();
|
||||||
assertFalse(testSchedulerConfigurationDir.exists());
|
boolean exceptionCaught = false;
|
||||||
|
try {
|
||||||
|
storedConfig = configurationStore.retrieve();
|
||||||
|
} catch (IOException e) {
|
||||||
|
if (e.getMessage().contains("no capacity scheduler file in")) {
|
||||||
|
exceptionCaught = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertTrue(exceptionCaught);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -137,6 +137,21 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
|
||||||
assertNull(confStore.retrieve());
|
assertNull(confStore.retrieve());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetConfigurationVersion() throws Exception {
|
||||||
|
confStore.initialize(conf, schedConf, rmContext);
|
||||||
|
long v1 = confStore.getConfigVersion();
|
||||||
|
assertEquals(1, v1);
|
||||||
|
Map<String, String> update = new HashMap<>();
|
||||||
|
update.put("keyver", "valver");
|
||||||
|
YarnConfigurationStore.LogMutation mutation =
|
||||||
|
new YarnConfigurationStore.LogMutation(update, TEST_USER);
|
||||||
|
confStore.logMutation(mutation);
|
||||||
|
confStore.confirmMutation(true);
|
||||||
|
long v2 = confStore.getConfigVersion();
|
||||||
|
assertEquals(2, v2);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPersistUpdatedConfiguration() throws Exception {
|
public void testPersistUpdatedConfiguration() throws Exception {
|
||||||
confStore.initialize(conf, schedConf, rmContext);
|
confStore.initialize(conf, schedConf, rmContext);
|
||||||
|
|
|
@ -202,6 +202,25 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
assertEquals(3, orgConf.getQueues("root").length);
|
assertEquals(3, orgConf.getQueues("root").length);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private long getConfigVersion() throws Exception {
|
||||||
|
WebResource r = resource();
|
||||||
|
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||||
|
.queryParam("user.name", userName)
|
||||||
|
.path(RMWSConsts.SCHEDULER_CONF_VERSION)
|
||||||
|
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||||
|
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||||
|
|
||||||
|
JSONObject json = response.getEntity(JSONObject.class);
|
||||||
|
return Long.parseLong(json.get("versionID").toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSchedulerConfigVersion() throws Exception {
|
||||||
|
assertEquals(1, getConfigVersion());
|
||||||
|
testAddNestedQueue();
|
||||||
|
assertEquals(2, getConfigVersion());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAddNestedQueue() throws Exception {
|
public void testAddNestedQueue() throws Exception {
|
||||||
CapacitySchedulerConfiguration orgConf = getSchedulerConf();
|
CapacitySchedulerConfiguration orgConf = getSchedulerConf();
|
||||||
|
|
Loading…
Reference in New Issue