diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java index f8e8814377a..2b9b25ac090 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java @@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; +import java.io.IOException; + /** * Interface for allowing changing scheduler configurations. */ @@ -55,4 +57,10 @@ public interface MutableConfigurationProvider { * @throws Exception if confirming mutation fails */ void confirmPendingMutation(boolean isValid) throws Exception; + + /** + * Closes the configuration provider, releasing any required resources. + * @throws IOException on failure to close + */ + void close() throws IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 16b27c100dd..de951795eb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -411,6 +411,9 @@ public class CapacityScheduler extends writeLock.unlock(); } + if (isConfigurationMutable()) { + ((MutableConfigurationProvider) csConfProvider).close(); + } super.serviceStop(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java index 1b0eb9f1a9c..21de7a26d61 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -72,7 +73,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { private long maxLogs; private Configuration conf; private LogMutation pendingMutation; - private static final Version CURRENT_VERSION_INFO = Version + @VisibleForTesting + protected static final Version CURRENT_VERSION_INFO = Version .newInstance(0, 1); private Timer compactionTimer; private long compactionIntervalMsec; @@ -82,7 +84,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { RMContext rmContext) throws IOException { this.conf = config; try { - this.db = initDatabase(schedConf); + initDatabase(schedConf); this.maxLogs = config.getLong( YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS); @@ -96,7 +98,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { } } - private DB initDatabase(Configuration config) throws Exception { + private void initDatabase(Configuration config) throws Exception { Path storeRoot = createStorageDir(); Options options = new Options(); options.createIfMissing(false); @@ -108,13 +110,13 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { if (key1Str.equals(key2Str)) { return 0; } else if (key1Str.equals(VERSION_KEY)) { - return -1; + return 1; } else if (key2Str.equals(VERSION_KEY)) { - return 1; - } else if (key1Str.equals(LOG_KEY)) { return -1; - } else if (key2Str.equals(LOG_KEY)) { + } else if (key1Str.equals(LOG_KEY)) { return 1; + } else if (key2Str.equals(LOG_KEY)) { + return -1; } return key1Str.compareTo(key2Str); } @@ -156,7 +158,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { throw e; } } - return db; } private Path createStorageDir() throws IOException { @@ -175,6 +176,13 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { return new Path(storePath, DB_NAME); } + @Override + public void close() throws IOException { + if (db != null) { + db.close(); + } + } + @Override public void logMutation(LogMutation logMutation) throws IOException { LinkedList logs = deserLogMutations(db.get(bytes(LOG_KEY))); @@ -212,8 +220,12 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { return baos.toByteArray(); } } + private LinkedList deserLogMutations(byte[] mutations) throws IOException { + if (mutations == null) { + return new LinkedList<>(); + } try (ObjectInput input = new ObjectInputStream( new ByteArrayInputStream(mutations))) { return (LinkedList) input.readObject(); @@ -225,13 +237,16 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { @Override public synchronized Configuration retrieve() { DBIterator itr = db.iterator(); - itr.seek(bytes(LOG_KEY)); + itr.seekToFirst(); Configuration config = new Configuration(false); - itr.next(); while (itr.hasNext()) { Map.Entry entry = itr.next(); - config.set(new String(entry.getKey(), StandardCharsets.UTF_8), - new String(entry.getValue(), StandardCharsets.UTF_8)); + String key = new String(entry.getKey(), StandardCharsets.UTF_8); + String value = new String(entry.getValue(), StandardCharsets.UTF_8); + if (key.equals(LOG_KEY) || key.equals(VERSION_KEY)) { + break; + } + config.set(key, value); } return config; } @@ -268,6 +283,11 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { return version; } + @VisibleForTesting + protected LinkedList getLogs() throws Exception { + return deserLogMutations(db.get(bytes(LOG_KEY))); + } + @Override public void storeVersion() throws Exception { String key = VERSION_KEY; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java index 70d18403b51..ccadf76f950 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java @@ -92,6 +92,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, } try { confStore.initialize(config, schedConf, rmContext); + confStore.checkVersion(); } catch (Exception e) { throw new IOException(e); } @@ -103,6 +104,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, aclMutationPolicy.init(config, rmContext); } + @Override + public void close() throws IOException { + confStore.close(); + } + @VisibleForTesting public YarnConfigurationStore getConfStore() { return confStore; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java index 13565356ee0..7fb52fcccd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java @@ -95,6 +95,12 @@ public abstract class YarnConfigurationStore { public abstract void initialize(Configuration conf, Configuration schedConf, RMContext rmContext) throws Exception; + /** + * Closes the configuration store, releasing any required resources. + * @throws IOException on failure to close + */ + public void close() throws IOException {} + /** * Logs the configuration change to backing store. * @param logMutation configuration change to be persisted in write ahead log diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java index bbe95700f18..8f3bc710837 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java @@ -71,6 +71,7 @@ public abstract class ConfigurationStoreBaseTest { confStore.confirmMutation(false); assertNull("Configuration should not be updated", confStore.retrieve().get("keyUpdate2")); + confStore.close(); } @Test @@ -86,5 +87,6 @@ public abstract class ConfigurationStoreBaseTest { confStore.logMutation(mutation); confStore.confirmMutation(true); assertNull(confStore.retrieve().get("key")); + confStore.close(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java new file mode 100644 index 00000000000..779208a3fb2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** + * Tests {@link LeveldbConfigurationStore}. + */ +public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest { + + public static final Log LOG = + LogFactory.getLog(TestLeveldbConfigurationStore.class); + private static final File TEST_DIR = new File( + System.getProperty("test.build.data", + System.getProperty("java.io.tmpdir")), + TestLeveldbConfigurationStore.class.getName()); + + private ResourceManager rm; + + @Before + public void setUp() throws Exception { + super.setUp(); + FileUtil.fullyDelete(TEST_DIR); + conf.set(CapacitySchedulerConfiguration.CS_CONF_PROVIDER, + CapacitySchedulerConfiguration.STORE_CS_CONF_PROVIDER); + conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS, + YarnConfiguration.LEVELDB_CONFIGURATION_STORE); + conf.set(YarnConfiguration.RM_SCHEDCONF_STORE_PATH, TEST_DIR.toString()); + } + + @Test + public void testVersioning() throws Exception { + confStore.initialize(conf, schedConf, rmContext); + assertNull(confStore.getConfStoreVersion()); + confStore.checkVersion(); + assertEquals(LeveldbConfigurationStore.CURRENT_VERSION_INFO, + confStore.getConfStoreVersion()); + confStore.close(); + } + + @Test + public void testPersistConfiguration() throws Exception { + schedConf.set("key", "val"); + confStore.initialize(conf, schedConf, rmContext); + assertEquals("val", confStore.retrieve().get("key")); + confStore.close(); + + // Create a new configuration store, and check for old configuration + confStore = createConfStore(); + schedConf.set("key", "badVal"); + // Should ignore passed-in scheduler configuration. + confStore.initialize(conf, schedConf, rmContext); + assertEquals("val", confStore.retrieve().get("key")); + confStore.close(); + } + + @Test + public void testPersistUpdatedConfiguration() throws Exception { + confStore.initialize(conf, schedConf, rmContext); + assertNull(confStore.retrieve().get("key")); + + Map update = new HashMap<>(); + update.put("key", "val"); + YarnConfigurationStore.LogMutation mutation = + new YarnConfigurationStore.LogMutation(update, TEST_USER); + confStore.logMutation(mutation); + confStore.confirmMutation(true); + assertEquals("val", confStore.retrieve().get("key")); + confStore.close(); + + // Create a new configuration store, and check for updated configuration + confStore = createConfStore(); + schedConf.set("key", "badVal"); + // Should ignore passed-in scheduler configuration. + confStore.initialize(conf, schedConf, rmContext); + assertEquals("val", confStore.retrieve().get("key")); + confStore.close(); + } + + @Test + public void testMaxLogs() throws Exception { + conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2); + confStore.initialize(conf, schedConf, rmContext); + LinkedList logs = + ((LeveldbConfigurationStore) confStore).getLogs(); + assertEquals(0, logs.size()); + + Map update1 = new HashMap<>(); + update1.put("key1", "val1"); + YarnConfigurationStore.LogMutation mutation = + new YarnConfigurationStore.LogMutation(update1, TEST_USER); + confStore.logMutation(mutation); + logs = ((LeveldbConfigurationStore) confStore).getLogs(); + assertEquals(1, logs.size()); + assertEquals("val1", logs.get(0).getUpdates().get("key1")); + confStore.confirmMutation(true); + assertEquals(1, logs.size()); + assertEquals("val1", logs.get(0).getUpdates().get("key1")); + + Map update2 = new HashMap<>(); + update2.put("key2", "val2"); + mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER); + confStore.logMutation(mutation); + logs = ((LeveldbConfigurationStore) confStore).getLogs(); + assertEquals(2, logs.size()); + assertEquals("val1", logs.get(0).getUpdates().get("key1")); + assertEquals("val2", logs.get(1).getUpdates().get("key2")); + confStore.confirmMutation(true); + assertEquals(2, logs.size()); + assertEquals("val1", logs.get(0).getUpdates().get("key1")); + assertEquals("val2", logs.get(1).getUpdates().get("key2")); + + // Next update should purge first update from logs. + Map update3 = new HashMap<>(); + update3.put("key3", "val3"); + mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER); + confStore.logMutation(mutation); + logs = ((LeveldbConfigurationStore) confStore).getLogs(); + assertEquals(2, logs.size()); + assertEquals("val2", logs.get(0).getUpdates().get("key2")); + assertEquals("val3", logs.get(1).getUpdates().get("key3")); + confStore.confirmMutation(true); + assertEquals(2, logs.size()); + assertEquals("val2", logs.get(0).getUpdates().get("key2")); + assertEquals("val3", logs.get(1).getUpdates().get("key3")); + confStore.close(); + } + + /** + * When restarting, RM should read from current state of store, including + * any updates from the previous RM instance. + * @throws Exception + */ + @Test + public void testRestartReadsFromUpdatedStore() throws Exception { + ResourceManager rm1 = new MockRM(conf); + rm1.start(); + assertNull(((MutableConfScheduler) rm1.getResourceScheduler()) + .getConfiguration().get("key")); + + // Update configuration on RM + SchedConfUpdateInfo schedConfUpdateInfo = new SchedConfUpdateInfo(); + schedConfUpdateInfo.getGlobalParams().put("key", "val"); + MutableConfigurationProvider confProvider = ((MutableConfScheduler) + rm1.getResourceScheduler()).getMutableConfProvider(); + UserGroupInformation user = UserGroupInformation + .createUserForTesting(TEST_USER, new String[0]); + confProvider.logAndApplyMutation(user, schedConfUpdateInfo); + rm1.getResourceScheduler().reinitialize(conf, rm1.getRMContext()); + assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler()) + .getConfiguration().get("key")); + confProvider.confirmPendingMutation(true); + assertEquals("val", ((MutableCSConfigurationProvider) confProvider) + .getConfStore().retrieve().get("key")); + // Next update is not persisted, it should not be recovered + schedConfUpdateInfo.getGlobalParams().put("key", "badVal"); + confProvider.logAndApplyMutation(user, schedConfUpdateInfo); + rm1.close(); + + // Start RM2 and verifies it starts with updated configuration + ResourceManager rm2 = new MockRM(conf); + rm2.start(); + assertEquals("val", ((MutableCSConfigurationProvider) ( + (CapacityScheduler) rm2.getResourceScheduler()) + .getMutableConfProvider()).getConfStore().retrieve().get("key")); + assertEquals("val", ((MutableConfScheduler) rm2.getResourceScheduler()) + .getConfiguration().get("key")); + rm2.close(); + } + + @Override + public YarnConfigurationStore createConfStore() { + return new LeveldbConfigurationStore(); + } +}