Revert "YARN-9873. Mutation API Config Change updates Version Number. Contributed by Prabhu Joseph"
This reverts commit 4510970e2f
.
This commit is contained in:
parent
f209722a19
commit
fb1ecff6a2
|
@ -65,12 +65,6 @@ public interface MutableConfigurationProvider {
|
||||||
*/
|
*/
|
||||||
Configuration getConfiguration();
|
Configuration getConfiguration();
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the last updated scheduler config version.
|
|
||||||
* @return Last updated scheduler config version.
|
|
||||||
*/
|
|
||||||
long getConfigVersion() throws Exception;
|
|
||||||
|
|
||||||
void formatConfigurationInStore(Configuration conf) throws Exception;
|
void formatConfigurationInStore(Configuration conf) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -148,13 +148,6 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
|
||||||
tempConfigPath = null;
|
tempConfigPath = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getConfigVersion() throws Exception {
|
|
||||||
String version = getLatestConfigPath().getName().
|
|
||||||
substring(YarnConfiguration.CS_CONFIGURATION_FILE.length() + 1);
|
|
||||||
return Long.parseLong(version);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void finalizeFileSystemFile() throws IOException {
|
private void finalizeFileSystemFile() throws IOException {
|
||||||
// call confirmMutation() make sure tempConfigPath is not null
|
// call confirmMutation() make sure tempConfigPath is not null
|
||||||
Path finalConfigPath = getFinalConfigPath(tempConfigPath);
|
Path finalConfigPath = getFinalConfigPath(tempConfigPath);
|
||||||
|
|
|
@ -33,13 +33,11 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore {
|
||||||
|
|
||||||
private Configuration schedConf;
|
private Configuration schedConf;
|
||||||
private LogMutation pendingMutation;
|
private LogMutation pendingMutation;
|
||||||
private long configVersion;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initialize(Configuration conf, Configuration schedConf,
|
public void initialize(Configuration conf, Configuration schedConf,
|
||||||
RMContext rmContext) {
|
RMContext rmContext) {
|
||||||
this.schedConf = schedConf;
|
this.schedConf = schedConf;
|
||||||
this.configVersion = System.currentTimeMillis();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -59,7 +57,6 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.configVersion = System.currentTimeMillis();
|
|
||||||
pendingMutation = null;
|
pendingMutation = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,11 +70,6 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore {
|
||||||
return schedConf;
|
return schedConf;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getConfigVersion() throws Exception {
|
|
||||||
return configVersion;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<LogMutation> getConfirmedConfHistory(long fromId) {
|
public List<LogMutation> getConfirmedConfHistory(long fromId) {
|
||||||
// Unimplemented.
|
// Unimplemented.
|
||||||
|
|
|
@ -68,7 +68,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
||||||
private static final String DB_NAME = "yarn-conf-store";
|
private static final String DB_NAME = "yarn-conf-store";
|
||||||
private static final String LOG_KEY = "log";
|
private static final String LOG_KEY = "log";
|
||||||
private static final String VERSION_KEY = "version";
|
private static final String VERSION_KEY = "version";
|
||||||
private static final String CONF_VERSION_KEY = "conf-version";
|
|
||||||
|
|
||||||
private DB db;
|
private DB db;
|
||||||
private long maxLogs;
|
private long maxLogs;
|
||||||
|
@ -125,10 +124,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
||||||
return 1;
|
return 1;
|
||||||
} else if (key2Str.equals(LOG_KEY)) {
|
} else if (key2Str.equals(LOG_KEY)) {
|
||||||
return -1;
|
return -1;
|
||||||
} else if (key1Str.equals(CONF_VERSION_KEY)) {
|
|
||||||
return 1;
|
|
||||||
} else if (key2Str.equals(CONF_VERSION_KEY)) {
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
return key1Str.compareTo(key2Str);
|
return key1Str.compareTo(key2Str);
|
||||||
}
|
}
|
||||||
|
@ -151,10 +146,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
||||||
File dbfile = new File(storeRoot.toString());
|
File dbfile = new File(storeRoot.toString());
|
||||||
try {
|
try {
|
||||||
db = JniDBFactory.factory.open(dbfile, options);
|
db = JniDBFactory.factory.open(dbfile, options);
|
||||||
if (db.get(bytes(CONF_VERSION_KEY)) == null) {
|
|
||||||
db.put(bytes(CONF_VERSION_KEY),
|
|
||||||
bytes(String.valueOf(System.currentTimeMillis())));
|
|
||||||
}
|
|
||||||
} catch (NativeDB.DBException e) {
|
} catch (NativeDB.DBException e) {
|
||||||
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
|
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
|
||||||
LOG.info("Creating conf database at " + dbfile);
|
LOG.info("Creating conf database at " + dbfile);
|
||||||
|
@ -167,8 +158,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
||||||
initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
|
initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
|
||||||
}
|
}
|
||||||
db.write(initBatch);
|
db.write(initBatch);
|
||||||
db.put(bytes(CONF_VERSION_KEY),
|
|
||||||
bytes(String.valueOf(System.currentTimeMillis())));
|
|
||||||
} catch (DBException dbErr) {
|
} catch (DBException dbErr) {
|
||||||
throw new IOException(dbErr.getMessage(), dbErr);
|
throw new IOException(dbErr.getMessage(), dbErr);
|
||||||
}
|
}
|
||||||
|
@ -226,8 +215,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
db.write(updateBatch);
|
db.write(updateBatch);
|
||||||
db.put(bytes(CONF_VERSION_KEY),
|
|
||||||
bytes(String.valueOf(System.currentTimeMillis())));
|
|
||||||
pendingMutation = null;
|
pendingMutation = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -263,8 +250,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
||||||
Map.Entry<byte[], byte[]> entry = itr.next();
|
Map.Entry<byte[], byte[]> entry = itr.next();
|
||||||
String key = new String(entry.getKey(), StandardCharsets.UTF_8);
|
String key = new String(entry.getKey(), StandardCharsets.UTF_8);
|
||||||
String value = new String(entry.getValue(), StandardCharsets.UTF_8);
|
String value = new String(entry.getValue(), StandardCharsets.UTF_8);
|
||||||
if (key.equals(LOG_KEY) || key.equals(VERSION_KEY) ||
|
if (key.equals(LOG_KEY) || key.equals(VERSION_KEY)) {
|
||||||
key.equals(CONF_VERSION_KEY)) {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
config.set(key, value);
|
config.set(key, value);
|
||||||
|
@ -272,13 +258,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getConfigVersion() throws Exception {
|
|
||||||
String version = new String(db.get(bytes(CONF_VERSION_KEY)),
|
|
||||||
StandardCharsets.UTF_8);
|
|
||||||
return Long.parseLong(version);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<LogMutation> getConfirmedConfHistory(long fromId) {
|
public List<LogMutation> getConfirmedConfHistory(long fromId) {
|
||||||
return null; // unimplemented
|
return null; // unimplemented
|
||||||
|
|
|
@ -134,11 +134,6 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
||||||
return new Configuration(schedConf);
|
return new Configuration(schedConf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getConfigVersion() throws Exception {
|
|
||||||
return confStore.getConfigVersion();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ConfigurationMutationACLPolicy getAclMutationPolicy() {
|
public ConfigurationMutationACLPolicy getAclMutationPolicy() {
|
||||||
return aclMutationPolicy;
|
return aclMutationPolicy;
|
||||||
|
|
|
@ -132,12 +132,6 @@ public abstract class YarnConfigurationStore {
|
||||||
*/
|
*/
|
||||||
public abstract void format() throws Exception;
|
public abstract void format() throws Exception;
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the last updated config version.
|
|
||||||
* @return Last updated config version.
|
|
||||||
*/
|
|
||||||
public abstract long getConfigVersion() throws Exception;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a list of confirmed configuration mutations starting from a given id.
|
* Get a list of confirmed configuration mutations starting from a given id.
|
||||||
* @param fromId id from which to start getting mutations, inclusive
|
* @param fromId id from which to start getting mutations, inclusive
|
||||||
|
|
|
@ -62,13 +62,11 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
|
||||||
private static final String LOGS_PATH = "LOGS";
|
private static final String LOGS_PATH = "LOGS";
|
||||||
private static final String CONF_STORE_PATH = "CONF_STORE";
|
private static final String CONF_STORE_PATH = "CONF_STORE";
|
||||||
private static final String FENCING_PATH = "FENCING";
|
private static final String FENCING_PATH = "FENCING";
|
||||||
private static final String CONF_VERSION_PATH = "CONF_VERSION";
|
|
||||||
|
|
||||||
private String zkVersionPath;
|
private String zkVersionPath;
|
||||||
private String logsPath;
|
private String logsPath;
|
||||||
private String confStorePath;
|
private String confStorePath;
|
||||||
private String fencingNodePath;
|
private String fencingNodePath;
|
||||||
private String confVersionPath;
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected ZKCuratorManager zkManager;
|
protected ZKCuratorManager zkManager;
|
||||||
|
@ -91,7 +89,6 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
|
||||||
this.logsPath = getNodePath(znodeParentPath, LOGS_PATH);
|
this.logsPath = getNodePath(znodeParentPath, LOGS_PATH);
|
||||||
this.confStorePath = getNodePath(znodeParentPath, CONF_STORE_PATH);
|
this.confStorePath = getNodePath(znodeParentPath, CONF_STORE_PATH);
|
||||||
this.fencingNodePath = getNodePath(znodeParentPath, FENCING_PATH);
|
this.fencingNodePath = getNodePath(znodeParentPath, FENCING_PATH);
|
||||||
this.confVersionPath = getNodePath(znodeParentPath, CONF_VERSION_PATH);
|
|
||||||
|
|
||||||
zkManager.createRootDirRecursively(znodeParentPath, zkAcl);
|
zkManager.createRootDirRecursively(znodeParentPath, zkAcl);
|
||||||
zkManager.delete(fencingNodePath);
|
zkManager.delete(fencingNodePath);
|
||||||
|
@ -102,12 +99,6 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
|
||||||
serializeObject(new LinkedList<LogMutation>()), -1);
|
serializeObject(new LinkedList<LogMutation>()), -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!zkManager.exists(confVersionPath)) {
|
|
||||||
zkManager.create(confVersionPath);
|
|
||||||
zkManager.setData(confVersionPath,
|
|
||||||
String.valueOf(System.currentTimeMillis()), -1);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!zkManager.exists(confStorePath)) {
|
if (!zkManager.exists(confStorePath)) {
|
||||||
zkManager.create(confStorePath);
|
zkManager.create(confStorePath);
|
||||||
HashMap<String, String> mapSchedConf = new HashMap<>();
|
HashMap<String, String> mapSchedConf = new HashMap<>();
|
||||||
|
@ -115,8 +106,6 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
|
||||||
mapSchedConf.put(entry.getKey(), entry.getValue());
|
mapSchedConf.put(entry.getKey(), entry.getValue());
|
||||||
}
|
}
|
||||||
zkManager.setData(confStorePath, serializeObject(mapSchedConf), -1);
|
zkManager.setData(confStorePath, serializeObject(mapSchedConf), -1);
|
||||||
zkManager.setData(confVersionPath,
|
|
||||||
String.valueOf(System.currentTimeMillis()), -1);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -196,9 +185,6 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
|
||||||
}
|
}
|
||||||
zkManager.safeSetData(confStorePath, serializeObject(mapConf), -1,
|
zkManager.safeSetData(confStorePath, serializeObject(mapConf), -1,
|
||||||
zkAcl, fencingNodePath);
|
zkAcl, fencingNodePath);
|
||||||
zkManager.setData(confVersionPath,
|
|
||||||
String.valueOf(System.currentTimeMillis()), -1);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
pendingMutation = null;
|
pendingMutation = null;
|
||||||
}
|
}
|
||||||
|
@ -227,11 +213,6 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getConfigVersion() throws Exception {
|
|
||||||
return Long.parseLong(zkManager.getStringData(confVersionPath));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<LogMutation> getConfirmedConfHistory(long fromId) {
|
public List<LogMutation> getConfirmedConfHistory(long fromId) {
|
||||||
return null; // unimplemented
|
return null; // unimplemented
|
||||||
|
|
|
@ -51,9 +51,6 @@ public final class RMWSConsts {
|
||||||
/** Path for {@code RMWebServices#formatSchedulerConfiguration}. */
|
/** Path for {@code RMWebServices#formatSchedulerConfiguration}. */
|
||||||
public static final String FORMAT_SCHEDULER_CONF = "/scheduler-conf/format";
|
public static final String FORMAT_SCHEDULER_CONF = "/scheduler-conf/format";
|
||||||
|
|
||||||
/** Path for {@code RMWebServices#getSchedulerConfigurationVersion}. */
|
|
||||||
public static final String SCHEDULER_CONF_VERSION = "/scheduler-conf/version";
|
|
||||||
|
|
||||||
/** Path for {@code RMWebServiceProtocol#dumpSchedulerLogs}. */
|
/** Path for {@code RMWebServiceProtocol#dumpSchedulerLogs}. */
|
||||||
public static final String SCHEDULER_LOGS = "/scheduler/logs";
|
public static final String SCHEDULER_LOGS = "/scheduler/logs";
|
||||||
|
|
||||||
|
|
|
@ -2590,7 +2590,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return Response.status(Status.BAD_REQUEST)
|
return Response.status(Status.BAD_REQUEST)
|
||||||
.entity("Scheduler Configuration format only supported by " +
|
.entity("Configuration change only supported by " +
|
||||||
"MutableConfScheduler.").build();
|
"MutableConfScheduler.").build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2680,38 +2680,6 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@GET
|
|
||||||
@Path(RMWSConsts.SCHEDULER_CONF_VERSION)
|
|
||||||
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
|
||||||
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
|
||||||
public Response getSchedulerConfigurationVersion(@Context
|
|
||||||
HttpServletRequest hsr) throws AuthorizationException {
|
|
||||||
// Only admin user is allowed to get scheduler conf version
|
|
||||||
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
|
|
||||||
initForWritableEndpoints(callerUGI, true);
|
|
||||||
|
|
||||||
ResourceScheduler scheduler = rm.getResourceScheduler();
|
|
||||||
if (scheduler instanceof MutableConfScheduler
|
|
||||||
&& ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
|
|
||||||
MutableConfigurationProvider mutableConfigurationProvider =
|
|
||||||
((MutableConfScheduler) scheduler).getMutableConfProvider();
|
|
||||||
|
|
||||||
try {
|
|
||||||
long configVersion = mutableConfigurationProvider
|
|
||||||
.getConfigVersion();
|
|
||||||
return Response.status(Status.OK).entity(configVersion).build();
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("Exception thrown when fetching configuration version.", e);
|
|
||||||
return Response.status(Status.BAD_REQUEST).entity(e.getMessage())
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return Response.status(Status.BAD_REQUEST)
|
|
||||||
.entity("Configuration Version only supported by "
|
|
||||||
+ "MutableConfScheduler.").build();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@GET
|
@GET
|
||||||
@Path(RMWSConsts.CHECK_USER_ACCESS_TO_QUEUE)
|
@Path(RMWSConsts.CHECK_USER_ACCESS_TO_QUEUE)
|
||||||
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||||
|
|
|
@ -137,21 +137,6 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
|
||||||
assertNull(confStore.retrieve());
|
assertNull(confStore.retrieve());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGetConfigurationVersion() throws Exception {
|
|
||||||
confStore.initialize(conf, schedConf, rmContext);
|
|
||||||
long v1 = confStore.getConfigVersion();
|
|
||||||
Thread.sleep(2000);
|
|
||||||
Map<String, String> update = new HashMap<>();
|
|
||||||
update.put("keyver", "valver");
|
|
||||||
YarnConfigurationStore.LogMutation mutation =
|
|
||||||
new YarnConfigurationStore.LogMutation(update, TEST_USER);
|
|
||||||
confStore.logMutation(mutation);
|
|
||||||
confStore.confirmMutation(true);
|
|
||||||
long v2 = confStore.getConfigVersion();
|
|
||||||
assertTrue(v2 > v1);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPersistUpdatedConfiguration() throws Exception {
|
public void testPersistUpdatedConfiguration() throws Exception {
|
||||||
confStore.initialize(conf, schedConf, rmContext);
|
confStore.initialize(conf, schedConf, rmContext);
|
||||||
|
|
Loading…
Reference in New Issue