From e6827a9121f9ba8d8a387c001f9d75828eb2e56e Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Mon, 18 Sep 2017 09:53:42 -0700 Subject: [PATCH] YARN-6840. Implement zookeeper based store for scheduler configuration updates. (Jonathan Hung via wangda) Change-Id: I9debea674fe8c7e4109d4ca136965a1ea4c48bcc --- .../hadoop/yarn/conf/YarnConfiguration.java | 14 +- .../src/main/resources/yarn-default.xml | 15 +- .../server/resourcemanager/AdminService.java | 18 +- .../resourcemanager/ResourceManager.java | 24 +- .../RMStateVersionIncompatibleException.java | 2 +- .../recovery/ZKRMStateStore.java | 5 +- .../scheduler/MutableConfScheduler.java | 22 +- .../MutableConfigurationProvider.java | 36 +- .../scheduler/capacity/CapacityScheduler.java | 24 +- .../conf/InMemoryConfigurationStore.java | 73 ++-- .../conf/LeveldbConfigurationStore.java | 168 +++++----- .../conf/MutableCSConfigurationProvider.java | 150 ++++----- .../capacity/conf/YarnConfigurationStore.java | 132 ++++---- .../capacity/conf/ZKConfigurationStore.java | 235 +++++++++++++ .../resourcemanager/webapp/RMWebServices.java | 26 +- .../conf/ConfigurationStoreBaseTest.java | 90 +++++ .../conf/TestInMemoryConfigurationStore.java | 30 ++ .../TestMutableCSConfigurationProvider.java | 18 +- .../conf/TestYarnConfigurationStore.java | 71 ---- .../conf/TestZKConfigurationStore.java | 312 ++++++++++++++++++ 20 files changed, 1040 insertions(+), 425 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java create mode 100644 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 7da4772e74d..600154a95cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -649,6 +649,7 @@ public class YarnConfiguration extends Configuration { YARN_PREFIX + "scheduler.configuration.store.class"; public static final String MEMORY_CONFIGURATION_STORE = "memory"; public static final String LEVELDB_CONFIGURATION_STORE = "leveldb"; + public static final String ZK_CONFIGURATION_STORE = "zk"; public static final String DEFAULT_CONFIGURATION_STORE = MEMORY_CONFIGURATION_STORE; public static final String RM_SCHEDCONF_STORE_PATH = YARN_PREFIX @@ -660,9 +661,16 @@ public class YarnConfiguration extends Configuration { public static final long DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS = 60 * 60 * 24L; - public static final String RM_SCHEDCONF_LEVELDB_MAX_LOGS = - YARN_PREFIX + "scheduler.configuration.leveldb-store.max-logs"; - public static final int DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS = 
1000; + public static final String RM_SCHEDCONF_MAX_LOGS = + YARN_PREFIX + "scheduler.configuration.store.max-logs"; + public static final long DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS = 1000; + public static final long DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS = 1000; + + /** Parent znode path under which ZKConfigurationStore will create znodes. */ + public static final String RM_SCHEDCONF_STORE_ZK_PARENT_PATH = YARN_PREFIX + + "scheduler.configuration.zk-store.parent-path"; + public static final String DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH = + "/confstore"; public static final String RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS = YARN_PREFIX + "scheduler.configuration.mutation.acl-policy.class"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 02918407f43..937f87dab0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3385,11 +3385,20 @@ - The max number of configuration change log entries kept in LevelDB config + The max number of configuration change log entries kept in config store, when yarn.scheduler.configuration.store.class is configured to be - "leveldb". Default is 1000. + "leveldb" or "zk". Default is 1000 for either. - yarn.scheduler.configuration.leveldb-store.max-logs + yarn.scheduler.configuration.store.max-logs 1000 + + + + ZK root node path for configuration store when using zookeeper-based + configuration store. 
+ + yarn.scheduler.configuration.zk-store.parent-path + /confstore + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index fd9e849f3c4..6c0a8541223 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -387,9 +387,7 @@ public class AdminService extends CompositeService implements RefreshQueuesResponse response = recordFactory.newRecordInstance(RefreshQueuesResponse.class); try { - ResourceScheduler scheduler = rm.getRMContext().getScheduler(); - if (scheduler instanceof MutableConfScheduler - && ((MutableConfScheduler) scheduler).isConfigurationMutable()) { + if (isSchedulerMutable()) { throw new IOException("Scheduler configuration is mutable. 
" + operation + " is not allowed in this scenario."); } @@ -413,6 +411,12 @@ public class AdminService extends CompositeService implements } } + private boolean isSchedulerMutable() { + ResourceScheduler scheduler = rm.getRMContext().getScheduler(); + return (scheduler instanceof MutableConfScheduler + && ((MutableConfScheduler) scheduler).isConfigurationMutable()); + } + @Override public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) throws YarnException, StandbyException { @@ -721,6 +725,14 @@ public class AdminService extends CompositeService implements void refreshAll() throws ServiceFailedException { try { checkAcls("refreshAll"); + if (isSchedulerMutable()) { + try { + ((MutableConfScheduler) rm.getRMContext().getScheduler()) + .getMutableConfProvider().reloadConfigurationFromStore(); + } catch (Exception e) { + throw new IOException("Failed to refresh configuration:", e); + } + } refreshQueues(); refreshNodes(); refreshSuperUserGroupsConfiguration(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 8e8af331061..4277e4acdce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -342,7 +342,7 @@ public class ResourceManager extends CompositeService implements Recoverable { conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED); if (curatorEnabled) { - this.zkManager = 
createAndStartZKManager(conf); + this.zkManager = getAndStartZKManager(conf); elector = new CuratorBasedElectorService(this); } else { elector = new ActiveStandbyElectorBasedElectorService(this); @@ -351,13 +351,16 @@ public class ResourceManager extends CompositeService implements Recoverable { } /** - * Create and ZooKeeper Curator manager. + * Get ZooKeeper Curator manager, creating and starting if not exists. * @param config Configuration for the ZooKeeper curator. - * @return New ZooKeeper Curator manager. + * @return ZooKeeper Curator manager. * @throws IOException If it cannot create the manager. */ - public ZKCuratorManager createAndStartZKManager(Configuration config) - throws IOException { + public synchronized ZKCuratorManager getAndStartZKManager(Configuration + config) throws IOException { + if (this.zkManager != null) { + return zkManager; + } ZKCuratorManager manager = new ZKCuratorManager(config); // Get authentication @@ -377,15 +380,8 @@ public class ResourceManager extends CompositeService implements Recoverable { } manager.start(authInfos); - return manager; - } - - /** - * Get the ZooKeeper Curator manager. - * @return ZooKeeper Curator manager. 
- */ - public ZKCuratorManager getZKManager() { - return this.zkManager; + this.zkManager = manager; + return zkManager; } public CuratorFramework getCurator() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java index 135868f1a57..d5fce36efba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java @@ -22,7 +22,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; /** * This exception is thrown by ResourceManager if it's loading an incompatible - * version of state from state store on recovery. + * version of storage on recovery. 
*/ public class RMStateVersionIncompatibleException extends YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index ac67dcd9a86..5bff77f3416 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -327,10 +327,7 @@ public class ZKRMStateStore extends RMStateStore { amrmTokenSecretManagerRoot = getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT); reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT); - zkManager = resourceManager.getZKManager(); - if (zkManager == null) { - zkManager = resourceManager.createAndStartZKManager(conf); - } + zkManager = resourceManager.getAndStartZKManager(conf); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java index 313bf6aee1c..6f677fbfb3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java @@ -18,11 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; - -import java.io.IOException; /** * Interface for a scheduler that supports changing configuration at runtime. @@ -30,16 +25,6 @@ import java.io.IOException; */ public interface MutableConfScheduler extends ResourceScheduler { - /** - * Update the scheduler's configuration. - * @param user Caller of this update - * @param confUpdate configuration update - * @throws IOException if scheduler could not be reinitialized - * @throws YarnException if reservation system could not be reinitialized - */ - void updateConfiguration(UserGroupInformation user, - SchedConfUpdateInfo confUpdate) throws IOException, YarnException; - /** * Get the scheduler configuration. * @return the scheduler configuration @@ -58,4 +43,11 @@ public interface MutableConfScheduler extends ResourceScheduler { * @return whether scheduler configuration is mutable or not. */ boolean isConfigurationMutable(); + + /** + * Get scheduler's configuration provider, so other classes can directly + * call mutation APIs on configuration provider. 
+ * @return scheduler's configuration provider + */ + MutableConfigurationProvider getMutableConfProvider(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java index 9baf1ad1ee2..f8e8814377a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java @@ -19,30 +19,40 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; -import java.io.IOException; - /** * Interface for allowing changing scheduler configurations. */ public interface MutableConfigurationProvider { /** - * Apply transactions which were not committed. - * @throws IOException if recovery fails + * Get the acl mutation policy for this configuration provider. + * @return The acl mutation policy. */ - void recoverConf() throws IOException; + ConfigurationMutationACLPolicy getAclMutationPolicy(); /** - * Update the scheduler configuration with the provided key value pairs. - * @param user User issuing the request - * @param confUpdate Key-value pairs for configurations to be updated. 
- * @throws IOException if scheduler could not be reinitialized - * @throws YarnException if reservation system could not be reinitialized + * Called when a new ResourceManager is starting/becomes active. Ensures + * configuration is up-to-date. + * @throws Exception if configuration could not be refreshed from store */ - void mutateConfiguration(UserGroupInformation user, SchedConfUpdateInfo - confUpdate) throws IOException, YarnException; + void reloadConfigurationFromStore() throws Exception; + /** + * Log user's requested configuration mutation, and applies it in-memory. + * @param user User who requested the change + * @param confUpdate User's requested configuration change + * @throws Exception if logging the mutation fails + */ + void logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo + confUpdate) throws Exception; + + /** + * Confirm last logged mutation. + * @param isValid if the last logged mutation is applied to scheduler + * properly. + * @throws Exception if confirming mutation fails + */ + void confirmPendingMutation(boolean isValid) throws Exception; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index f46c41f5d93..fa206e161fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -141,7 +141,6 @@ import org.apache.hadoop.yarn.server.utils.Lock; import 
org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -393,9 +392,6 @@ public class CapacityScheduler extends @Override public void serviceStart() throws Exception { startSchedulerThreads(); - if (this.csConfProvider instanceof MutableConfigurationProvider) { - ((MutableConfigurationProvider) csConfProvider).recoverConf(); - } super.serviceStart(); } @@ -2618,20 +2614,16 @@ public class CapacityScheduler extends return ((LeafQueue) queue).getMaximumApplicationLifetime(); } - @Override - public void updateConfiguration(UserGroupInformation user, - SchedConfUpdateInfo confUpdate) throws IOException, YarnException { - if (isConfigurationMutable()) { - ((MutableConfigurationProvider) csConfProvider).mutateConfiguration( - user, confUpdate); - } else { - throw new UnsupportedOperationException("Configured CS configuration " + - "provider does not support updating configuration."); - } - } - @Override public boolean isConfigurationMutable() { return csConfProvider instanceof MutableConfigurationProvider; } + + @Override + public MutableConfigurationProvider getMutableConfProvider() { + if (isConfigurationMutable()) { + return (MutableConfigurationProvider) csConfProvider; + } + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java index c63734dfac8..d69c23690c9 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java @@ -19,8 +19,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -28,48 +29,35 @@ import java.util.Map; * A default implementation of {@link YarnConfigurationStore}. Doesn't offer * persistent configuration storage, just stores the configuration in memory. */ -public class InMemoryConfigurationStore implements YarnConfigurationStore { +public class InMemoryConfigurationStore extends YarnConfigurationStore { private Configuration schedConf; - private LinkedList pendingMutations; - private long pendingId; + private LogMutation pendingMutation; @Override - public void initialize(Configuration conf, Configuration schedConf) { + public void initialize(Configuration conf, Configuration schedConf, + RMContext rmContext) { this.schedConf = schedConf; - this.pendingMutations = new LinkedList<>(); - this.pendingId = 0; } @Override - public synchronized long logMutation(LogMutation logMutation) { - logMutation.setId(++pendingId); - pendingMutations.add(logMutation); - return pendingId; + public void logMutation(LogMutation logMutation) { + pendingMutation = logMutation; } @Override - public synchronized boolean confirmMutation(long id, boolean isValid) { - LogMutation mutation = pendingMutations.poll(); - // If confirmMutation is called out of order, discard mutations until id - // is reached. 
- while (mutation != null) { - if (mutation.getId() == id) { - if (isValid) { - Map mutations = mutation.getUpdates(); - for (Map.Entry kv : mutations.entrySet()) { - if (kv.getValue() == null) { - schedConf.unset(kv.getKey()); - } else { - schedConf.set(kv.getKey(), kv.getValue()); - } - } + public void confirmMutation(boolean isValid) { + if (isValid) { + for (Map.Entry kv : pendingMutation.getUpdates() + .entrySet()) { + if (kv.getValue() == null) { + schedConf.unset(kv.getKey()); + } else { + schedConf.set(kv.getKey(), kv.getValue()); } - return true; } - mutation = pendingMutations.poll(); } - return false; + pendingMutation = null; } @Override @@ -77,14 +65,31 @@ public class InMemoryConfigurationStore implements YarnConfigurationStore { return schedConf; } - @Override - public synchronized List getPendingMutations() { - return new LinkedList<>(pendingMutations); - } - @Override public List getConfirmedConfHistory(long fromId) { // Unimplemented. return null; } + + @Override + public Version getConfStoreVersion() throws Exception { + // Does nothing. + return null; + } + + @Override + public void storeVersion() throws Exception { + // Does nothing. + } + + @Override + public Version getCurrentVersion() { + // Does nothing. + return null; + } + + @Override + public void checkVersion() { + // Does nothing. 
(Version is always compatible since it's in memory) + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java index 1280fab931b..1b0eb9f1a9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java @@ -26,6 +26,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.fusesource.leveldbjni.JniDBFactory; import org.fusesource.leveldbjni.internal.NativeDB; import org.iq80.leveldb.DB; @@ -55,58 +59,32 @@ import static org.fusesource.leveldbjni.JniDBFactory.bytes; /** * A LevelDB implementation of {@link YarnConfigurationStore}. 
*/ -public class LeveldbConfigurationStore implements YarnConfigurationStore { +public class LeveldbConfigurationStore extends YarnConfigurationStore { public static final Log LOG = LogFactory.getLog(LeveldbConfigurationStore.class); private static final String DB_NAME = "yarn-conf-store"; - private static final String LOG_PREFIX = "log."; - private static final String LOG_COMMITTED_TXN = "committedTxn"; + private static final String LOG_KEY = "log"; + private static final String VERSION_KEY = "version"; private DB db; - // Txnid for the last transaction logged to the store. - private long txnId = 0; - private long minTxn = 0; private long maxLogs; private Configuration conf; - private LinkedList pendingMutations = new LinkedList<>(); + private LogMutation pendingMutation; + private static final Version CURRENT_VERSION_INFO = Version + .newInstance(0, 1); private Timer compactionTimer; private long compactionIntervalMsec; @Override - public void initialize(Configuration config, Configuration schedConf) - throws IOException { + public void initialize(Configuration config, Configuration schedConf, + RMContext rmContext) throws IOException { this.conf = config; try { this.db = initDatabase(schedConf); - this.txnId = Long.parseLong(new String(db.get(bytes(LOG_COMMITTED_TXN)), - StandardCharsets.UTF_8)); - DBIterator itr = db.iterator(); - itr.seek(bytes(LOG_PREFIX + txnId)); - // Seek to first uncommitted log - itr.next(); - while (itr.hasNext()) { - Map.Entry entry = itr.next(); - if (!new String(entry.getKey(), StandardCharsets.UTF_8) - .startsWith(LOG_PREFIX)) { - break; - } - pendingMutations.add(deserLogMutation(entry.getValue())); - txnId++; - } - // Get the earliest txnId stored in logs - itr.seekToFirst(); - if (itr.hasNext()) { - Map.Entry entry = itr.next(); - byte[] key = entry.getKey(); - String logId = new String(key, StandardCharsets.UTF_8); - if (logId.startsWith(LOG_PREFIX)) { - minTxn = Long.parseLong(logId.substring(logId.indexOf('.') + 1)); - } - } 
this.maxLogs = config.getLong( - YarnConfiguration.RM_SCHEDCONF_LEVELDB_MAX_LOGS, + YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS); this.compactionIntervalMsec = config.getLong( YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS, @@ -127,33 +105,23 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore { public int compare(byte[] key1, byte[] key2) { String key1Str = new String(key1, StandardCharsets.UTF_8); String key2Str = new String(key2, StandardCharsets.UTF_8); - int key1Txn = Integer.MAX_VALUE; - int key2Txn = Integer.MAX_VALUE; - if (key1Str.startsWith(LOG_PREFIX)) { - key1Txn = Integer.parseInt(key1Str.substring( - key1Str.indexOf('.') + 1)); + if (key1Str.equals(key2Str)) { + return 0; + } else if (key1Str.equals(VERSION_KEY)) { + return -1; + } else if (key2Str.equals(VERSION_KEY)) { + return 1; + } else if (key1Str.equals(LOG_KEY)) { + return -1; + } else if (key2Str.equals(LOG_KEY)) { + return 1; } - if (key2Str.startsWith(LOG_PREFIX)) { - key2Txn = Integer.parseInt(key2Str.substring( - key2Str.indexOf('.') + 1)); - } - // TODO txnId could overflow, in theory - if (key1Txn == Integer.MAX_VALUE && key2Txn == Integer.MAX_VALUE) { - if (key1Str.equals(key2Str) && key1Str.equals(LOG_COMMITTED_TXN)) { - return 0; - } else if (key1Str.equals(LOG_COMMITTED_TXN)) { - return -1; - } else if (key2Str.equals(LOG_COMMITTED_TXN)) { - return 1; - } - return key1Str.compareTo(key2Str); - } - return key1Txn - key2Txn; + return key1Str.compareTo(key2Str); } @Override public String name() { - return "logComparator"; + return "keyComparator"; } public byte[] findShortestSeparator(byte[] start, byte[] limit) { @@ -164,6 +132,7 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore { return key; } }); + LOG.info("Using conf database at " + storeRoot); File dbfile = new File(storeRoot.toString()); try { @@ -179,7 +148,6 @@ public class LeveldbConfigurationStore 
implements YarnConfigurationStore { for (Map.Entry kv : config) { initBatch.put(bytes(kv.getKey()), bytes(kv.getValue())); } - initBatch.put(bytes(LOG_COMMITTED_TXN), bytes("0")); db.write(initBatch); } catch (DBException dbErr) { throw new IOException(dbErr.getMessage(), dbErr); @@ -208,28 +176,22 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore { } @Override - public synchronized long logMutation(LogMutation logMutation) - throws IOException { - logMutation.setId(++txnId); - WriteBatch logBatch = db.createWriteBatch(); - logBatch.put(bytes(LOG_PREFIX + txnId), serLogMutation(logMutation)); - if (txnId - minTxn >= maxLogs) { - logBatch.delete(bytes(LOG_PREFIX + minTxn)); - minTxn++; + public void logMutation(LogMutation logMutation) throws IOException { + LinkedList logs = deserLogMutations(db.get(bytes(LOG_KEY))); + logs.add(logMutation); + if (logs.size() > maxLogs) { + logs.removeFirst(); } - db.write(logBatch); - pendingMutations.add(logMutation); - return txnId; + db.put(bytes(LOG_KEY), serLogMutations(logs)); + pendingMutation = logMutation; } @Override - public synchronized boolean confirmMutation(long id, boolean isValid) - throws IOException { + public void confirmMutation(boolean isValid) throws IOException { WriteBatch updateBatch = db.createWriteBatch(); if (isValid) { - LogMutation mutation = deserLogMutation(db.get(bytes(LOG_PREFIX + id))); for (Map.Entry changes : - mutation.getUpdates().entrySet()) { + pendingMutation.getUpdates().entrySet()) { if (changes.getValue() == null || changes.getValue().isEmpty()) { updateBatch.delete(bytes(changes.getKey())); } else { @@ -237,28 +199,24 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore { } } } - updateBatch.put(bytes(LOG_COMMITTED_TXN), bytes(String.valueOf(id))); db.write(updateBatch); - // Assumes logMutation and confirmMutation are done in the same - // synchronized method. 
For example, - // {@link MutableCSConfigurationProvider#mutateConfiguration( - // UserGroupInformation user, SchedConfUpdateInfo confUpdate)} - pendingMutations.removeFirst(); - return true; + pendingMutation = null; } - private byte[] serLogMutation(LogMutation mutation) throws IOException { + private byte[] serLogMutations(LinkedList mutations) throws + IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (ObjectOutput oos = new ObjectOutputStream(baos)) { - oos.writeObject(mutation); + oos.writeObject(mutations); oos.flush(); return baos.toByteArray(); } } - private LogMutation deserLogMutation(byte[] mutation) throws IOException { + private LinkedList deserLogMutations(byte[] mutations) throws + IOException { try (ObjectInput input = new ObjectInputStream( - new ByteArrayInputStream(mutation))) { - return (LogMutation) input.readObject(); + new ByteArrayInputStream(mutations))) { + return (LinkedList) input.readObject(); } catch (ClassNotFoundException e) { throw new IOException(e); } @@ -267,7 +225,7 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore { @Override public synchronized Configuration retrieve() { DBIterator itr = db.iterator(); - itr.seek(bytes(LOG_COMMITTED_TXN)); + itr.seek(bytes(LOG_KEY)); Configuration config = new Configuration(false); itr.next(); while (itr.hasNext()) { @@ -278,11 +236,6 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore { return config; } - @Override - public List getPendingMutations() { - return new LinkedList<>(pendingMutations); - } - @Override public List getConfirmedConfHistory(long fromId) { return null; // unimplemented @@ -299,6 +252,39 @@ public class LeveldbConfigurationStore implements YarnConfigurationStore { } } + // TODO: following is taken from LeveldbRMStateStore + @Override + public Version getConfStoreVersion() throws Exception { + Version version = null; + try { + byte[] data = db.get(bytes(VERSION_KEY)); + if (data != null) { + 
version = new VersionPBImpl(YarnServerCommonProtos.VersionProto + .parseFrom(data)); + } + } catch (DBException e) { + throw new IOException(e); + } + return version; + } + + @Override + public void storeVersion() throws Exception { + String key = VERSION_KEY; + byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto() + .toByteArray(); + try { + db.put(bytes(key), data); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public Version getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + private class CompactionTimerTask extends TimerTask { @Override public void run() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java index d03b2e23d27..70d18403b51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java @@ -18,20 +18,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import 
org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicyFactory; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; @@ -56,6 +53,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, LogFactory.getLog(MutableCSConfigurationProvider.class); private Configuration schedConf; + private Configuration oldConf; private YarnConfigurationStore confStore; private ConfigurationMutationACLPolicy aclMutationPolicy; private RMContext rmContext; @@ -76,6 +74,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, case YarnConfiguration.LEVELDB_CONFIGURATION_STORE: this.confStore = new LeveldbConfigurationStore(); break; + case YarnConfiguration.ZK_CONFIGURATION_STORE: + this.confStore = new ZKConfigurationStore(); + break; default: this.confStore = YarnConfigurationStoreFactory.getStore(config); break; @@ -89,7 +90,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, for (Map.Entry kv : initialSchedConf) { schedConf.set(kv.getKey(), kv.getValue()); } - confStore.initialize(config, schedConf); + try { + confStore.initialize(config, schedConf, rmContext); + } catch (Exception e) { + throw new IOException(e); + } // 
After initializing confStore, the store may already have an existing // configuration. Use this one. schedConf = confStore.retrieve(); @@ -98,6 +103,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, aclMutationPolicy.init(config, rmContext); } + @VisibleForTesting + public YarnConfigurationStore getConfStore() { + return confStore; + } + @Override public CapacitySchedulerConfiguration loadConfiguration(Configuration configuration) throws IOException { @@ -107,16 +117,17 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, } @Override - public synchronized void mutateConfiguration(UserGroupInformation user, - SchedConfUpdateInfo confUpdate) throws IOException, YarnException { - if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) { - throw new AccessControlException("User is not admin of all modified" + - " queues."); - } - Configuration oldConf = new Configuration(schedConf); + public ConfigurationMutationACLPolicy getAclMutationPolicy() { + return aclMutationPolicy; + } + + @Override + public void logAndApplyMutation(UserGroupInformation user, + SchedConfUpdateInfo confUpdate) throws Exception { + oldConf = new Configuration(schedConf); Map kvUpdate = constructKeyValueConfUpdate(confUpdate); LogMutation log = new LogMutation(kvUpdate, user.getShortUserName()); - long id = confStore.logMutation(log); + confStore.logMutation(log); for (Map.Entry kv : kvUpdate.entrySet()) { if (kv.getValue() == null) { schedConf.unset(kv.getKey()); @@ -124,47 +135,33 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, schedConf.set(kv.getKey(), kv.getValue()); } } - try { - rmContext.getRMAdminService().refreshQueues(); - } catch (IOException | YarnException e) { - schedConf = oldConf; - confStore.confirmMutation(id, false); - throw e; - } - confStore.confirmMutation(id, true); } @Override - public void recoverConf() throws IOException { - List uncommittedLogs = 
confStore.getPendingMutations(); - Configuration oldConf = new Configuration(schedConf); - for (LogMutation mutation : uncommittedLogs) { - for (Map.Entry kv : mutation.getUpdates().entrySet()) { - if (kv.getValue() == null) { - schedConf.unset(kv.getKey()); - } else { - schedConf.set(kv.getKey(), kv.getValue()); - } - } - try { - rmContext.getScheduler().reinitialize(schedConf, rmContext); - } catch (IOException e) { - schedConf = oldConf; - confStore.confirmMutation(mutation.getId(), false); - LOG.info("Configuration mutation " + mutation.getId() - + " was rejected", e); - continue; - } - confStore.confirmMutation(mutation.getId(), true); - LOG.info("Configuration mutation " + mutation.getId()+ " was accepted"); + public void confirmPendingMutation(boolean isValid) throws Exception { + confStore.confirmMutation(isValid); + if (!isValid) { + schedConf = oldConf; } } + @Override + public void reloadConfigurationFromStore() throws Exception { + schedConf = confStore.retrieve(); + } + + private List getSiblingQueues(String queuePath, Configuration conf) { + String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.')); + String childQueuesKey = CapacitySchedulerConfiguration.PREFIX + + parentQueue + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.QUEUES; + return new ArrayList<>(conf.getStringCollection(childQueuesKey)); + } + private Map constructKeyValueConfUpdate( SchedConfUpdateInfo mutationInfo) throws IOException { - CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler(); CapacitySchedulerConfiguration proposedConf = - new CapacitySchedulerConfiguration(cs.getConfiguration(), false); + new CapacitySchedulerConfiguration(schedConf, false); Map confUpdate = new HashMap<>(); for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) { removeQueue(queueToRemove, proposedConf, confUpdate); @@ -188,40 +185,35 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, if (queueToRemove == null) { 
return; } else { - CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler(); String queueName = queueToRemove.substring( queueToRemove.lastIndexOf('.') + 1); - CSQueue queue = cs.getQueue(queueName); - if (queue == null || - !queue.getQueuePath().equals(queueToRemove)) { - throw new IOException("Queue " + queueToRemove + " not found"); - } else if (queueToRemove.lastIndexOf('.') == -1) { + if (queueToRemove.lastIndexOf('.') == -1) { throw new IOException("Can't remove queue " + queueToRemove); - } - String parentQueuePath = queueToRemove.substring(0, queueToRemove - .lastIndexOf('.')); - String[] siblingQueues = proposedConf.getQueues(parentQueuePath); - List newSiblingQueues = new ArrayList<>(); - for (String siblingQueue : siblingQueues) { - if (!siblingQueue.equals(queueName)) { - newSiblingQueues.add(siblingQueue); - } - } - proposedConf.setQueues(parentQueuePath, newSiblingQueues - .toArray(new String[0])); - String queuesConfig = CapacitySchedulerConfiguration.PREFIX - + parentQueuePath + CapacitySchedulerConfiguration.DOT - + CapacitySchedulerConfiguration.QUEUES; - if (newSiblingQueues.size() == 0) { - confUpdate.put(queuesConfig, null); } else { - confUpdate.put(queuesConfig, Joiner.on(',').join(newSiblingQueues)); - } - for (Map.Entry confRemove : proposedConf.getValByRegex( - ".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*") - .entrySet()) { - proposedConf.unset(confRemove.getKey()); - confUpdate.put(confRemove.getKey(), null); + List siblingQueues = getSiblingQueues(queueToRemove, + proposedConf); + if (!siblingQueues.contains(queueName)) { + throw new IOException("Queue " + queueToRemove + " not found"); + } + siblingQueues.remove(queueName); + String parentQueuePath = queueToRemove.substring(0, queueToRemove + .lastIndexOf('.')); + proposedConf.setQueues(parentQueuePath, siblingQueues.toArray( + new String[0])); + String queuesConfig = CapacitySchedulerConfiguration.PREFIX + + parentQueuePath + CapacitySchedulerConfiguration.DOT + + 
CapacitySchedulerConfiguration.QUEUES; + if (siblingQueues.size() == 0) { + confUpdate.put(queuesConfig, null); + } else { + confUpdate.put(queuesConfig, Joiner.on(',').join(siblingQueues)); + } + for (Map.Entry confRemove : proposedConf.getValByRegex( + ".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*") + .entrySet()) { + proposedConf.unset(confRemove.getKey()); + confUpdate.put(confRemove.getKey(), null); + } } } } @@ -232,13 +224,13 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, if (addInfo == null) { return; } else { - CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler(); String queuePath = addInfo.getQueue(); String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1); - if (cs.getQueue(queueName) != null) { - throw new IOException("Can't add existing queue " + queuePath); - } else if (queuePath.lastIndexOf('.') == -1) { + if (queuePath.lastIndexOf('.') == -1) { throw new IOException("Can't add invalid queue " + queuePath); + } else if (getSiblingQueues(queuePath, proposedConf).contains( + queueName)) { + throw new IOException("Can't add existing queue " + queuePath); } String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.')); String[] siblings = proposedConf.getQueues(parentQueue); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java index 065c877eeb7..13565356ee0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java @@ -18,7 +18,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateVersionIncompatibleException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import java.io.IOException; @@ -39,36 +44,26 @@ import java.util.Map; * {@code getPendingMutations}, and replay/confirm them via * {@code confirmMutation} as in the normal case. */ -public interface YarnConfigurationStore { +public abstract class YarnConfigurationStore { + public static final Log LOG = + LogFactory.getLog(YarnConfigurationStore.class); /** * LogMutation encapsulates the fields needed for configuration mutation * audit logging and recovery. */ - class LogMutation implements Serializable { + static class LogMutation implements Serializable { private Map updates; private String user; - private long id; /** - * Create log mutation prior to logging. + * Create log mutation. * @param updates key-value configuration updates * @param user user who requested configuration change */ - public LogMutation(Map updates, String user) { - this(updates, user, 0); - } - - /** - * Create log mutation for recovery. 
- * @param updates key-value configuration updates - * @param user user who requested configuration change - * @param id transaction id of configuration change - */ - LogMutation(Map updates, String user, long id) { + LogMutation(Map updates, String user) { this.updates = updates; this.user = user; - this.id = id; } /** @@ -86,75 +81,92 @@ public interface YarnConfigurationStore { public String getUser() { return user; } - - /** - * Get transaction id of this configuration change. - * @return transaction id - */ - public long getId() { - return id; - } - - /** - * Set transaction id of this configuration change. - * @param id transaction id - */ - public void setId(long id) { - this.id = id; - } } /** - * Initialize the configuration store. + * Initialize the configuration store, with schedConf as the initial + * scheduler configuration. If a persisted store already exists, use the + * scheduler configuration stored there, and ignore schedConf. * @param conf configuration to initialize store with - * @param schedConf Initial key-value configuration to persist + * @param schedConf Initial key-value scheduler configuration to persist. + * @param rmContext RMContext for this configuration store * @throws IOException if initialization fails */ - void initialize(Configuration conf, Configuration schedConf) - throws IOException; + public abstract void initialize(Configuration conf, Configuration schedConf, + RMContext rmContext) throws Exception; /** - * Logs the configuration change to backing store. Generates an id associated - * with this mutation, sets it in {@code logMutation}, and returns it. + * Logs the configuration change to backing store. 
* @param logMutation configuration change to be persisted in write ahead log - * @return id which configuration store associates with this mutation * @throws IOException if logging fails */ - long logMutation(LogMutation logMutation) throws IOException; + public abstract void logMutation(LogMutation logMutation) throws Exception; /** * Should be called after {@code logMutation}. Gets the pending mutation - * associated with {@code id} and marks the mutation as persisted (no longer - * pending). If isValid is true, merge the mutation with the persisted + * last logged by {@code logMutation} and marks the mutation as persisted (no + * longer pending). If isValid is true, merge the mutation with the persisted * configuration. - * - * If {@code confirmMutation} is called with ids in a different order than - * was returned by {@code logMutation}, the result is implementation - * dependent. - * @param id id of mutation to be confirmed - * @param isValid if true, update persisted configuration with mutation - * associated with {@code id}. - * @return true on success - * @throws IOException if mutation confirmation fails + * @param isValid if true, update persisted configuration with pending + * mutation. + * @throws Exception if mutation confirmation fails */ - boolean confirmMutation(long id, boolean isValid) throws IOException; + public abstract void confirmMutation(boolean isValid) throws Exception; /** * Retrieve the persisted configuration. * @return configuration as key-value */ - Configuration retrieve(); - - /** - * Get the list of pending mutations, in the order they were logged. - * @return list of mutations - */ - List getPendingMutations(); + public abstract Configuration retrieve(); /** * Get a list of confirmed configuration mutations starting from a given id. 
* @param fromId id from which to start getting mutations, inclusive * @return list of configuration mutations */ - List getConfirmedConfHistory(long fromId); + public abstract List getConfirmedConfHistory(long fromId); + + /** + * Get schema version of persisted conf store, for detecting compatibility + * issues when changing conf store schema. + * @return Schema version currently used by the persisted configuration store. + * @throws Exception On version fetch failure + */ + protected abstract Version getConfStoreVersion() throws Exception; + + /** + * Persist the hard-coded schema version to the conf store. + * @throws Exception On storage failure + */ + protected abstract void storeVersion() throws Exception; + + /** + * Get the hard-coded schema version, for comparison against the schema + * version currently persisted. + * @return Current hard-coded schema version + */ + protected abstract Version getCurrentVersion(); + + public void checkVersion() throws Exception { + // TODO this was taken from RMStateStore. 
Should probably refactor + Version loadedVersion = getConfStoreVersion(); + LOG.info("Loaded configuration store version info " + loadedVersion); + if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) { + return; + } + // if there is no version info, treat it as CURRENT_VERSION_INFO; + if (loadedVersion == null) { + loadedVersion = getCurrentVersion(); + } + if (loadedVersion.isCompatibleTo(getCurrentVersion())) { + LOG.info("Storing configuration store version info " + + getCurrentVersion()); + storeVersion(); + } else { + throw new RMStateVersionIncompatibleException( + "Expecting configuration store version " + getCurrentVersion() + + ", but loading version " + loadedVersion); + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java new file mode 100644 index 00000000000..a0bba8c440d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.curator.ZKCuratorManager; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.ACL; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * A Zookeeper-based implementation of {@link YarnConfigurationStore}. 
+ */ +public class ZKConfigurationStore extends YarnConfigurationStore { + + public static final Log LOG = + LogFactory.getLog(ZKConfigurationStore.class); + + private long maxLogs; + + @VisibleForTesting + protected static final Version CURRENT_VERSION_INFO = Version + .newInstance(0, 1); + private Configuration conf; + private LogMutation pendingMutation; + + private String znodeParentPath; + + private static final String ZK_VERSION_PATH = "VERSION"; + private static final String LOGS_PATH = "LOGS"; + private static final String CONF_STORE_PATH = "CONF_STORE"; + private static final String FENCING_PATH = "FENCING"; + + private String zkVersionPath; + private String logsPath; + private String confStorePath; + private String fencingNodePath; + + @VisibleForTesting + protected ZKCuratorManager zkManager; + private List zkAcl; + + @Override + public void initialize(Configuration config, Configuration schedConf, + RMContext rmContext) throws Exception { + this.conf = config; + this.maxLogs = conf.getLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, + YarnConfiguration.DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS); + this.znodeParentPath = + conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_ZK_PARENT_PATH, + YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH); + this.zkManager = rmContext.getResourceManager().getAndStartZKManager(conf); + this.zkAcl = ZKCuratorManager.getZKAcls(conf); + + this.zkVersionPath = getNodePath(znodeParentPath, ZK_VERSION_PATH); + this.logsPath = getNodePath(znodeParentPath, LOGS_PATH); + this.confStorePath = getNodePath(znodeParentPath, CONF_STORE_PATH); + this.fencingNodePath = getNodePath(znodeParentPath, FENCING_PATH); + + zkManager.createRootDirRecursively(znodeParentPath); + zkManager.delete(fencingNodePath); + + if (!zkManager.exists(logsPath)) { + zkManager.create(logsPath); + zkManager.setData(logsPath, + serializeObject(new LinkedList()), -1); + } + + if (!zkManager.exists(confStorePath)) { + zkManager.create(confStorePath); + HashMap 
mapSchedConf = new HashMap<>(); + for (Map.Entry entry : schedConf) { + mapSchedConf.put(entry.getKey(), entry.getValue()); + } + zkManager.setData(confStorePath, serializeObject(mapSchedConf), -1); + } + } + + @VisibleForTesting + protected LinkedList getLogs() throws Exception { + return (LinkedList) + deserializeObject(zkManager.getData(logsPath)); + } + + // TODO: following version-related code is taken from ZKRMStateStore + @Override + public Version getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + + @Override + public Version getConfStoreVersion() throws Exception { + if (zkManager.exists(zkVersionPath)) { + byte[] data = zkManager.getData(zkVersionPath); + return new VersionPBImpl(YarnServerCommonProtos.VersionProto + .parseFrom(data)); + } + + return null; + } + + @Override + public synchronized void storeVersion() throws Exception { + byte[] data = + ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); + + if (zkManager.exists(zkVersionPath)) { + zkManager.safeSetData(zkVersionPath, data, -1, zkAcl, fencingNodePath); + } else { + zkManager.safeCreate(zkVersionPath, data, zkAcl, CreateMode.PERSISTENT, + zkAcl, fencingNodePath); + } + } + + @Override + public void logMutation(LogMutation logMutation) throws Exception { + byte[] storedLogs = zkManager.getData(logsPath); + LinkedList logs = new LinkedList<>(); + if (storedLogs != null) { + logs = (LinkedList) deserializeObject(storedLogs); + } + logs.add(logMutation); + if (logs.size() > maxLogs) { + logs.remove(logs.removeFirst()); + } + zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl, + fencingNodePath); + pendingMutation = logMutation; + } + + @Override + public void confirmMutation(boolean isValid) + throws Exception { + if (isValid) { + Configuration storedConfigs = retrieve(); + Map mapConf = new HashMap<>(); + for (Map.Entry storedConf : storedConfigs) { + mapConf.put(storedConf.getKey(), storedConf.getValue()); + } + for (Map.Entry confChange : + 
pendingMutation.getUpdates().entrySet()) { + if (confChange.getValue() == null || confChange.getValue().isEmpty()) { + mapConf.remove(confChange.getKey()); + } else { + mapConf.put(confChange.getKey(), confChange.getValue()); + } + } + zkManager.safeSetData(confStorePath, serializeObject(mapConf), -1, + zkAcl, fencingNodePath); + } + pendingMutation = null; + } + + @Override + public synchronized Configuration retrieve() { + byte[] serializedSchedConf; + try { + serializedSchedConf = zkManager.getData(confStorePath); + } catch (Exception e) { + LOG.error("Failed to retrieve configuration from zookeeper store", e); + return null; + } + try { + Map map = + (HashMap) deserializeObject(serializedSchedConf); + Configuration c = new Configuration(); + for (Map.Entry e : map.entrySet()) { + c.set(e.getKey(), e.getValue()); + } + return c; + } catch (Exception e) { + LOG.error("Exception while deserializing scheduler configuration " + + "from store", e); + } + return null; + } + + @Override + public List getConfirmedConfHistory(long fromId) { + return null; // unimplemented + } + + private static String getNodePath(String root, String nodeName) { + return ZKCuratorManager.getNodePath(root, nodeName); + } + + private static byte[] serializeObject(Object o) throws Exception { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos);) { + oos.writeObject(o); + oos.flush(); + baos.flush(); + return baos.toByteArray(); + } + } + + private static Object deserializeObject(byte[] bytes) throws Exception { + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + ObjectInputStream ois = new ObjectInputStream(bais);) { + return ois.readObject(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 1da4e653d58..d264c107d8a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -136,6 +136,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -2464,7 +2465,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol { @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) - public Response updateSchedulerConfiguration(SchedConfUpdateInfo + public synchronized Response updateSchedulerConfiguration(SchedConfUpdateInfo mutationInfo, @Context HttpServletRequest hsr) throws AuthorizationException, InterruptedException { init(); @@ -2479,17 +2480,32 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol { } ResourceScheduler scheduler = rm.getResourceScheduler(); - if (scheduler instanceof 
MutableConfScheduler) { + if (scheduler instanceof MutableConfScheduler && ((MutableConfScheduler) + scheduler).isConfigurationMutable()) { try { callerUGI.doAs(new PrivilegedExceptionAction() { @Override - public Void run() throws IOException, YarnException { - ((MutableConfScheduler) scheduler).updateConfiguration(callerUGI, - mutationInfo); + public Void run() throws Exception { + MutableConfigurationProvider provider = ((MutableConfScheduler) + scheduler).getMutableConfProvider(); + if (!provider.getAclMutationPolicy().isMutationAllowed(callerUGI, + mutationInfo)) { + throw new org.apache.hadoop.security.AccessControlException("User" + + " is not admin of all modified queues."); + } + provider.logAndApplyMutation(callerUGI, mutationInfo); + try { + rm.getRMContext().getRMAdminService().refreshQueues(); + } catch (IOException | YarnException e) { + provider.confirmPendingMutation(false); + throw e; + } + provider.confirmPendingMutation(true); return null; } }); } catch (IOException e) { + LOG.error("Exception thrown when modifying configuration.", e); return Response.status(Status.BAD_REQUEST).entity(e.getMessage()) .build(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java new file mode 100644 index 00000000000..bbe95700f18 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Base test class for {@link YarnConfigurationStore} implementations.
+ */
+public abstract class ConfigurationStoreBaseTest {
+
+  protected YarnConfigurationStore confStore = createConfStore();
+
+  protected abstract YarnConfigurationStore createConfStore();
+
+  protected Configuration conf;
+  protected Configuration schedConf;
+  protected RMContext rmContext;
+
+  protected static final String TEST_USER = "testUser";
+
+  @Before
+  public void setUp() throws Exception {
+    this.conf = new Configuration();
+    this.schedConf = new Configuration(false);
+  }
+
+  @Test
+  public void testConfigurationUpdate() throws Exception {
+    schedConf.set("key1", "val1");
+    confStore.initialize(conf, schedConf, rmContext);
+    assertEquals("val1", confStore.retrieve().get("key1"));
+
+    Map<String, String> update1 = new HashMap<>();
+    update1.put("keyUpdate1", "valUpdate1");
+    YarnConfigurationStore.LogMutation mutation1 =
+        new YarnConfigurationStore.LogMutation(update1, TEST_USER);
+    confStore.logMutation(mutation1);
+    confStore.confirmMutation(true);
+    assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1"));
+
+    Map<String, String> update2 = new HashMap<>();
+    update2.put("keyUpdate2", "valUpdate2");
+    YarnConfigurationStore.LogMutation mutation2 =
+        new YarnConfigurationStore.LogMutation(update2, TEST_USER);
+    confStore.logMutation(mutation2);
+    confStore.confirmMutation(false);
+    assertNull("Configuration should not be updated",
+        confStore.retrieve().get("keyUpdate2"));
+  }
+
+  @Test
+  public void testNullConfigurationUpdate() throws Exception {
+    schedConf.set("key", "val");
+    confStore.initialize(conf, schedConf, rmContext);
+    assertEquals("val", confStore.retrieve().get("key"));
+
+    Map<String, String> update = new HashMap<>();
+    update.put("key", null);
+    YarnConfigurationStore.LogMutation mutation =
+        new YarnConfigurationStore.LogMutation(update, TEST_USER);
+    confStore.logMutation(mutation);
+    confStore.confirmMutation(true);
+    assertNull(confStore.retrieve().get("key"));
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java
new file mode 100644
index 00000000000..c40d16a27cb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestInMemoryConfigurationStore.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+/**
+ * Tests {@link InMemoryConfigurationStore}.
+ */
+public class TestInMemoryConfigurationStore extends ConfigurationStoreBaseTest {
+
+  @Override
+  protected YarnConfigurationStore createConfStore() {
+    return new InMemoryConfigurationStore();
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
index 635a184e8ff..9b080cd1e2d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -30,14 +29,11 @@ import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -82,25 +78,21 @@ public class TestMutableCSConfigurationProvider {
   }
 
   @Test
-  public void testInMemoryBackedProvider() throws IOException, YarnException {
+  public void testInMemoryBackedProvider() throws Exception {
     Configuration conf = new Configuration();
     confProvider.init(conf);
     assertNull(confProvider.loadConfiguration(conf)
         .get("yarn.scheduler.capacity.root.a.goodKey"));
 
-    doNothing().when(adminService).refreshQueues();
-    confProvider.mutateConfiguration(TEST_USER, goodUpdate);
+    confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
+    confProvider.confirmPendingMutation(true);
     assertEquals("goodVal", confProvider.loadConfiguration(conf)
         .get("yarn.scheduler.capacity.root.a.goodKey"));
 
     assertNull(confProvider.loadConfiguration(conf).get(
         "yarn.scheduler.capacity.root.a.badKey"));
-    doThrow(new IOException()).when(adminService).refreshQueues();
-    try {
-      confProvider.mutateConfiguration(TEST_USER, badUpdate);
-    } catch (IOException e) {
-      // Expected exception.
-    }
+    confProvider.logAndApplyMutation(TEST_USER, badUpdate);
+    confProvider.confirmPendingMutation(false);
     assertNull(confProvider.loadConfiguration(conf).get(
         "yarn.scheduler.capacity.root.a.badKey"));
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java
deleted file mode 100644
index 631ce657e84..00000000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-public class TestYarnConfigurationStore {
-
-  private YarnConfigurationStore confStore;
-  private Configuration schedConf;
-
-  private static final String testUser = "testUser";
-
-  @Before
-  public void setUp() {
-    schedConf = new Configuration(false);
-    schedConf.set("key1", "val1");
-  }
-
-  @Test
-  public void testInMemoryConfigurationStore() throws IOException {
-    confStore = new InMemoryConfigurationStore();
-    confStore.initialize(new Configuration(), schedConf);
-    assertEquals("val1", confStore.retrieve().get("key1"));
-
-    Map<String, String> update1 = new HashMap<>();
-    update1.put("keyUpdate1", "valUpdate1");
-    LogMutation mutation1 = new LogMutation(update1, testUser);
-    long id = confStore.logMutation(mutation1);
-    assertEquals(1, confStore.getPendingMutations().size());
-    confStore.confirmMutation(id, true);
-    assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1"));
-    assertEquals(0, confStore.getPendingMutations().size());
-
-    Map<String, String> update2 = new HashMap<>();
-    update2.put("keyUpdate2", "valUpdate2");
-    LogMutation mutation2 = new LogMutation(update2, testUser);
-    id = confStore.logMutation(mutation2);
-    assertEquals(1, confStore.getPendingMutations().size());
-    confStore.confirmMutation(id, false);
-    assertNull("Configuration should not be updated",
-        confStore.retrieve().get("keyUpdate2"));
-    assertEquals(0, confStore.getPendingMutations().size());
-  }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
new file mode 100644
index 00000000000..3cfa8da8047
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests {@link ZKConfigurationStore}.
+ */
+public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
+
+  public static final Log LOG =
+      LogFactory.getLog(TestZKConfigurationStore.class);
+
+  private static final int ZK_TIMEOUT_MS = 10000;
+  private TestingServer curatorTestingServer;
+  private CuratorFramework curatorFramework;
+  private ResourceManager rm;
+
+  public static TestingServer setupCuratorServer() throws Exception {
+    TestingServer curatorTestingServer = new TestingServer();
+    curatorTestingServer.start();
+    return curatorTestingServer;
+  }
+
+  public static CuratorFramework setupCuratorFramework(
+      TestingServer curatorTestingServer) throws Exception {
+    CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
+        .connectString(curatorTestingServer.getConnectString())
+        .retryPolicy(new RetryNTimes(100, 100))
+        .build();
+    curatorFramework.start();
+    return curatorFramework;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    curatorTestingServer = setupCuratorServer();
+    curatorFramework = setupCuratorFramework(curatorTestingServer);
+
+    conf.set(CommonConfigurationKeys.ZK_ADDRESS,
+        curatorTestingServer.getConnectString());
+    rm = new MockRM(conf);
+    rm.start();
+    rmContext = rm.getRMContext();
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    rm.stop();
+    curatorFramework.close();
+    curatorTestingServer.stop();
+  }
+
+  @Test
+  public void testVersioning() throws Exception {
+    confStore.initialize(conf, schedConf, rmContext);
+    assertNull(confStore.getConfStoreVersion());
+    confStore.checkVersion();
+    assertEquals(ZKConfigurationStore.CURRENT_VERSION_INFO,
+        confStore.getConfStoreVersion());
+  }
+
+  @Test
+  public void testPersistConfiguration() throws Exception {
+    schedConf.set("key", "val");
+    confStore.initialize(conf, schedConf, rmContext);
+    assertEquals("val", confStore.retrieve().get("key"));
+
+    // Create a new configuration store, and check for old configuration
+    confStore = createConfStore();
+    schedConf.set("key", "badVal");
+    // Should ignore passed-in scheduler configuration.
+    confStore.initialize(conf, schedConf, rmContext);
+    assertEquals("val", confStore.retrieve().get("key"));
+  }
+
+
+  @Test
+  public void testPersistUpdatedConfiguration() throws Exception {
+    confStore.initialize(conf, schedConf, rmContext);
+    assertNull(confStore.retrieve().get("key"));
+
+    Map<String, String> update = new HashMap<>();
+    update.put("key", "val");
+    YarnConfigurationStore.LogMutation mutation =
+        new YarnConfigurationStore.LogMutation(update, TEST_USER);
+    confStore.logMutation(mutation);
+    confStore.confirmMutation(true);
+    assertEquals("val", confStore.retrieve().get("key"));
+
+    // Create a new configuration store, and check for updated configuration
+    confStore = createConfStore();
+    schedConf.set("key", "badVal");
+    // Should ignore passed-in scheduler configuration.
+    confStore.initialize(conf, schedConf, rmContext);
+    assertEquals("val", confStore.retrieve().get("key"));
+  }
+
+  @Test
+  public void testMaxLogs() throws Exception {
+    conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
+    confStore.initialize(conf, schedConf, rmContext);
+    LinkedList<YarnConfigurationStore.LogMutation> logs =
+        ((ZKConfigurationStore) confStore).getLogs();
+    assertEquals(0, logs.size());
+
+    Map<String, String> update1 = new HashMap<>();
+    update1.put("key1", "val1");
+    YarnConfigurationStore.LogMutation mutation =
+        new YarnConfigurationStore.LogMutation(update1, TEST_USER);
+    confStore.logMutation(mutation);
+    logs = ((ZKConfigurationStore) confStore).getLogs();
+    assertEquals(1, logs.size());
+    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+    confStore.confirmMutation(true);
+    assertEquals(1, logs.size());
+    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+
+    Map<String, String> update2 = new HashMap<>();
+    update2.put("key2", "val2");
+    mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER);
+    confStore.logMutation(mutation);
+    logs = ((ZKConfigurationStore) confStore).getLogs();
+    assertEquals(2, logs.size());
+    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+    assertEquals("val2", logs.get(1).getUpdates().get("key2"));
+    confStore.confirmMutation(true);
+    assertEquals(2, logs.size());
+    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+    assertEquals("val2", logs.get(1).getUpdates().get("key2"));
+
+    // Next update should purge first update from logs.
+    Map<String, String> update3 = new HashMap<>();
+    update3.put("key3", "val3");
+    mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER);
+    confStore.logMutation(mutation);
+    logs = ((ZKConfigurationStore) confStore).getLogs();
+    assertEquals(2, logs.size());
+    assertEquals("val2", logs.get(0).getUpdates().get("key2"));
+    assertEquals("val3", logs.get(1).getUpdates().get("key3"));
+    confStore.confirmMutation(true);
+    assertEquals(2, logs.size());
+    assertEquals("val2", logs.get(0).getUpdates().get("key2"));
+    assertEquals("val3", logs.get(1).getUpdates().get("key3"));
+  }
+
+  public Configuration createRMHAConf(String rmIds, String rmId,
+      int adminPort) {
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    conf.set(YarnConfiguration.RM_HA_IDS, rmIds);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.set(CapacitySchedulerConfiguration.CS_CONF_PROVIDER,
+        CapacitySchedulerConfiguration.STORE_CS_CONF_PROVIDER);
+    conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
+        YarnConfiguration.ZK_CONFIGURATION_STORE);
+    conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
+    conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+        curatorTestingServer.getConnectString());
+    conf.set(YarnConfiguration.RM_HA_ID, rmId);
+    conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
+    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+    for (String rpcAddress :
+        YarnConfiguration.getServiceAddressConfKeys(conf)) {
+      for (String id : HAUtil.getRMHAIds(conf)) {
+        conf.set(HAUtil.addSuffix(rpcAddress, id), "localhost:0");
+      }
+    }
+    conf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, rmId),
+        "localhost:" + adminPort);
+    return conf;
+  }
+
+  /**
+   * When failing over, new active RM should read from current state of store,
+   * including any updates when the new active RM was in standby.
+   * @throws Exception if any RM or configuration store operation fails
+   */
+  @Test
+  public void testFailoverReadsFromUpdatedStore() throws Exception {
+    HAServiceProtocol.StateChangeRequestInfo req =
+        new HAServiceProtocol.StateChangeRequestInfo(
+            HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+    Configuration conf1 = createRMHAConf("rm1,rm2", "rm1", 1234);
+    ResourceManager rm1 = new MockRM(conf1);
+    rm1.start();
+    rm1.getRMContext().getRMAdminService().transitionToActive(req);
+    assertEquals("RM with ZKStore didn't start",
+        Service.STATE.STARTED, rm1.getServiceState());
+    assertEquals("RM should be Active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
+    assertNull(((MutableConfScheduler) rm1.getResourceScheduler())
+        .getConfiguration().get("key"));
+
+    Configuration conf2 = createRMHAConf("rm1,rm2", "rm2", 5678);
+    ResourceManager rm2 = new MockRM(conf2);
+    rm2.start();
+    assertEquals("RM should be Standby",
+        HAServiceProtocol.HAServiceState.STANDBY,
+        rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+    // Update configuration on RM1
+    SchedConfUpdateInfo schedConfUpdateInfo = new SchedConfUpdateInfo();
+    schedConfUpdateInfo.getGlobalParams().put("key", "val");
+    MutableConfigurationProvider confProvider = ((MutableConfScheduler)
+        rm1.getResourceScheduler()).getMutableConfProvider();
+    UserGroupInformation user = UserGroupInformation
+        .createUserForTesting(TEST_USER, new String[0]);
+    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
+    assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler())
+        .getConfiguration().get("key"));
+    confProvider.confirmPendingMutation(true);
+    assertEquals("val", ((MutableCSConfigurationProvider) confProvider)
+        .getConfStore().retrieve().get("key"));
+    // Next update is not persisted, it should not be recovered
+    schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
+    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+
+    // Transition RM2 to active and verify it starts with updated configuration
+    rm2.getRMContext().getRMAdminService().transitionToActive(req);
+    assertEquals("RM with ZKStore didn't start",
+        Service.STATE.STARTED, rm2.getServiceState());
+    assertEquals("RM should be Active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+    // Poll until rm1 has been fenced; stop as soon as it leaves ACTIVE.
+    for (int i = 0; i < ZK_TIMEOUT_MS / 50
+        && HAServiceProtocol.HAServiceState.ACTIVE ==
+        rm1.getRMContext().getRMAdminService().getServiceStatus()
+            .getState(); i++) {
+      Thread.sleep(100);
+    }
+    assertEquals("RM should have been fenced",
+        HAServiceProtocol.HAServiceState.STANDBY,
+        rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
+    assertEquals("RM should be Active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+    assertEquals("val", ((MutableCSConfigurationProvider) (
+        (CapacityScheduler) rm2.getResourceScheduler())
+        .getMutableConfProvider()).getConfStore().retrieve().get("key"));
+    assertEquals("val", ((MutableConfScheduler) rm2.getResourceScheduler())
+        .getConfiguration().get("key"));
+    // Transition to standby will set RM's HA status and then reinitialize in
+    // a separate thread. Despite asserting for STANDBY state, it's
+    // possible for reinitialization to be unfinished. Wait here for it to
+    // finish, otherwise closing rm1 will close zkManager and the unfinished
+    // reinitialization will throw an exception.
+    Thread.sleep(10000);
+    rm1.close();
+    rm2.close();
+  }
+
+  @Override
+  public YarnConfigurationStore createConfStore() {
+    return new ZKConfigurationStore();
+  }
+}