YARN-7046. Add closing logic to configuration store

This commit is contained in:
Jonathan Hung 2017-09-22 10:18:27 -07:00
parent ff39c0de20
commit d6622daaa3
7 changed files with 270 additions and 12 deletions

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import java.io.IOException;
/** /**
* Interface for allowing changing scheduler configurations. * Interface for allowing changing scheduler configurations.
*/ */
@ -55,4 +57,10 @@ public interface MutableConfigurationProvider {
* @throws Exception if confirming mutation fails * @throws Exception if confirming mutation fails
*/ */
void confirmPendingMutation(boolean isValid) throws Exception; void confirmPendingMutation(boolean isValid) throws Exception;
/**
* Closes the configuration provider, releasing any required resources.
* @throws IOException on failure to close
*/
void close() throws IOException;
} }

View File

@ -411,6 +411,9 @@ public class CapacityScheduler extends
writeLock.unlock(); writeLock.unlock();
} }
if (isConfigurationMutable()) {
((MutableConfigurationProvider) csConfProvider).close();
}
super.serviceStop(); super.serviceStop();
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -72,7 +73,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
private long maxLogs; private long maxLogs;
private Configuration conf; private Configuration conf;
private LogMutation pendingMutation; private LogMutation pendingMutation;
private static final Version CURRENT_VERSION_INFO = Version @VisibleForTesting
protected static final Version CURRENT_VERSION_INFO = Version
.newInstance(0, 1); .newInstance(0, 1);
private Timer compactionTimer; private Timer compactionTimer;
private long compactionIntervalMsec; private long compactionIntervalMsec;
@ -82,7 +84,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
RMContext rmContext) throws IOException { RMContext rmContext) throws IOException {
this.conf = config; this.conf = config;
try { try {
this.db = initDatabase(schedConf); initDatabase(schedConf);
this.maxLogs = config.getLong( this.maxLogs = config.getLong(
YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS); YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
@ -96,7 +98,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
} }
} }
private DB initDatabase(Configuration config) throws Exception { private void initDatabase(Configuration config) throws Exception {
Path storeRoot = createStorageDir(); Path storeRoot = createStorageDir();
Options options = new Options(); Options options = new Options();
options.createIfMissing(false); options.createIfMissing(false);
@ -108,13 +110,13 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
if (key1Str.equals(key2Str)) { if (key1Str.equals(key2Str)) {
return 0; return 0;
} else if (key1Str.equals(VERSION_KEY)) { } else if (key1Str.equals(VERSION_KEY)) {
return -1; return 1;
} else if (key2Str.equals(VERSION_KEY)) { } else if (key2Str.equals(VERSION_KEY)) {
return 1;
} else if (key1Str.equals(LOG_KEY)) {
return -1; return -1;
} else if (key2Str.equals(LOG_KEY)) { } else if (key1Str.equals(LOG_KEY)) {
return 1; return 1;
} else if (key2Str.equals(LOG_KEY)) {
return -1;
} }
return key1Str.compareTo(key2Str); return key1Str.compareTo(key2Str);
} }
@ -156,7 +158,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
throw e; throw e;
} }
} }
return db;
} }
private Path createStorageDir() throws IOException { private Path createStorageDir() throws IOException {
@ -175,6 +176,13 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
return new Path(storePath, DB_NAME); return new Path(storePath, DB_NAME);
} }
@Override
public void close() throws IOException {
if (db != null) {
db.close();
}
}
@Override @Override
public void logMutation(LogMutation logMutation) throws IOException { public void logMutation(LogMutation logMutation) throws IOException {
LinkedList<LogMutation> logs = deserLogMutations(db.get(bytes(LOG_KEY))); LinkedList<LogMutation> logs = deserLogMutations(db.get(bytes(LOG_KEY)));
@ -212,8 +220,12 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
return baos.toByteArray(); return baos.toByteArray();
} }
} }
private LinkedList<LogMutation> deserLogMutations(byte[] mutations) throws private LinkedList<LogMutation> deserLogMutations(byte[] mutations) throws
IOException { IOException {
if (mutations == null) {
return new LinkedList<>();
}
try (ObjectInput input = new ObjectInputStream( try (ObjectInput input = new ObjectInputStream(
new ByteArrayInputStream(mutations))) { new ByteArrayInputStream(mutations))) {
return (LinkedList<LogMutation>) input.readObject(); return (LinkedList<LogMutation>) input.readObject();
@ -225,13 +237,16 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
@Override @Override
public synchronized Configuration retrieve() { public synchronized Configuration retrieve() {
DBIterator itr = db.iterator(); DBIterator itr = db.iterator();
itr.seek(bytes(LOG_KEY)); itr.seekToFirst();
Configuration config = new Configuration(false); Configuration config = new Configuration(false);
itr.next();
while (itr.hasNext()) { while (itr.hasNext()) {
Map.Entry<byte[], byte[]> entry = itr.next(); Map.Entry<byte[], byte[]> entry = itr.next();
config.set(new String(entry.getKey(), StandardCharsets.UTF_8), String key = new String(entry.getKey(), StandardCharsets.UTF_8);
new String(entry.getValue(), StandardCharsets.UTF_8)); String value = new String(entry.getValue(), StandardCharsets.UTF_8);
if (key.equals(LOG_KEY) || key.equals(VERSION_KEY)) {
break;
}
config.set(key, value);
} }
return config; return config;
} }
@ -268,6 +283,11 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
return version; return version;
} }
@VisibleForTesting
protected LinkedList<LogMutation> getLogs() throws Exception {
return deserLogMutations(db.get(bytes(LOG_KEY)));
}
@Override @Override
public void storeVersion() throws Exception { public void storeVersion() throws Exception {
String key = VERSION_KEY; String key = VERSION_KEY;

View File

@ -92,6 +92,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
} }
try { try {
confStore.initialize(config, schedConf, rmContext); confStore.initialize(config, schedConf, rmContext);
confStore.checkVersion();
} catch (Exception e) { } catch (Exception e) {
throw new IOException(e); throw new IOException(e);
} }
@ -103,6 +104,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
aclMutationPolicy.init(config, rmContext); aclMutationPolicy.init(config, rmContext);
} }
@Override
public void close() throws IOException {
confStore.close();
}
@VisibleForTesting @VisibleForTesting
public YarnConfigurationStore getConfStore() { public YarnConfigurationStore getConfStore() {
return confStore; return confStore;

View File

@ -95,6 +95,12 @@ public abstract class YarnConfigurationStore {
public abstract void initialize(Configuration conf, Configuration schedConf, public abstract void initialize(Configuration conf, Configuration schedConf,
RMContext rmContext) throws Exception; RMContext rmContext) throws Exception;
/**
* Closes the configuration store, releasing any required resources.
* @throws IOException on failure to close
*/
public void close() throws IOException {}
/** /**
* Logs the configuration change to backing store. * Logs the configuration change to backing store.
* @param logMutation configuration change to be persisted in write ahead log * @param logMutation configuration change to be persisted in write ahead log

View File

@ -71,6 +71,7 @@ public abstract class ConfigurationStoreBaseTest {
confStore.confirmMutation(false); confStore.confirmMutation(false);
assertNull("Configuration should not be updated", assertNull("Configuration should not be updated",
confStore.retrieve().get("keyUpdate2")); confStore.retrieve().get("keyUpdate2"));
confStore.close();
} }
@Test @Test
@ -86,5 +87,6 @@ public abstract class ConfigurationStoreBaseTest {
confStore.logMutation(mutation); confStore.logMutation(mutation);
confStore.confirmMutation(true); confStore.confirmMutation(true);
assertNull(confStore.retrieve().get("key")); assertNull(confStore.retrieve().get("key"));
confStore.close();
} }
} }

View File

@ -0,0 +1,213 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
/**
* Tests {@link LeveldbConfigurationStore}.
*/
public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
public static final Log LOG =
LogFactory.getLog(TestLeveldbConfigurationStore.class);
private static final File TEST_DIR = new File(
System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestLeveldbConfigurationStore.class.getName());
private ResourceManager rm;
@Before
public void setUp() throws Exception {
super.setUp();
FileUtil.fullyDelete(TEST_DIR);
conf.set(CapacitySchedulerConfiguration.CS_CONF_PROVIDER,
CapacitySchedulerConfiguration.STORE_CS_CONF_PROVIDER);
conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
YarnConfiguration.LEVELDB_CONFIGURATION_STORE);
conf.set(YarnConfiguration.RM_SCHEDCONF_STORE_PATH, TEST_DIR.toString());
}
@Test
public void testVersioning() throws Exception {
confStore.initialize(conf, schedConf, rmContext);
assertNull(confStore.getConfStoreVersion());
confStore.checkVersion();
assertEquals(LeveldbConfigurationStore.CURRENT_VERSION_INFO,
confStore.getConfStoreVersion());
confStore.close();
}
@Test
public void testPersistConfiguration() throws Exception {
schedConf.set("key", "val");
confStore.initialize(conf, schedConf, rmContext);
assertEquals("val", confStore.retrieve().get("key"));
confStore.close();
// Create a new configuration store, and check for old configuration
confStore = createConfStore();
schedConf.set("key", "badVal");
// Should ignore passed-in scheduler configuration.
confStore.initialize(conf, schedConf, rmContext);
assertEquals("val", confStore.retrieve().get("key"));
confStore.close();
}
@Test
public void testPersistUpdatedConfiguration() throws Exception {
confStore.initialize(conf, schedConf, rmContext);
assertNull(confStore.retrieve().get("key"));
Map<String, String> update = new HashMap<>();
update.put("key", "val");
YarnConfigurationStore.LogMutation mutation =
new YarnConfigurationStore.LogMutation(update, TEST_USER);
confStore.logMutation(mutation);
confStore.confirmMutation(true);
assertEquals("val", confStore.retrieve().get("key"));
confStore.close();
// Create a new configuration store, and check for updated configuration
confStore = createConfStore();
schedConf.set("key", "badVal");
// Should ignore passed-in scheduler configuration.
confStore.initialize(conf, schedConf, rmContext);
assertEquals("val", confStore.retrieve().get("key"));
confStore.close();
}
@Test
public void testMaxLogs() throws Exception {
conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
confStore.initialize(conf, schedConf, rmContext);
LinkedList<YarnConfigurationStore.LogMutation> logs =
((LeveldbConfigurationStore) confStore).getLogs();
assertEquals(0, logs.size());
Map<String, String> update1 = new HashMap<>();
update1.put("key1", "val1");
YarnConfigurationStore.LogMutation mutation =
new YarnConfigurationStore.LogMutation(update1, TEST_USER);
confStore.logMutation(mutation);
logs = ((LeveldbConfigurationStore) confStore).getLogs();
assertEquals(1, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
confStore.confirmMutation(true);
assertEquals(1, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
Map<String, String> update2 = new HashMap<>();
update2.put("key2", "val2");
mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER);
confStore.logMutation(mutation);
logs = ((LeveldbConfigurationStore) confStore).getLogs();
assertEquals(2, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
assertEquals("val2", logs.get(1).getUpdates().get("key2"));
confStore.confirmMutation(true);
assertEquals(2, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
assertEquals("val2", logs.get(1).getUpdates().get("key2"));
// Next update should purge first update from logs.
Map<String, String> update3 = new HashMap<>();
update3.put("key3", "val3");
mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER);
confStore.logMutation(mutation);
logs = ((LeveldbConfigurationStore) confStore).getLogs();
assertEquals(2, logs.size());
assertEquals("val2", logs.get(0).getUpdates().get("key2"));
assertEquals("val3", logs.get(1).getUpdates().get("key3"));
confStore.confirmMutation(true);
assertEquals(2, logs.size());
assertEquals("val2", logs.get(0).getUpdates().get("key2"));
assertEquals("val3", logs.get(1).getUpdates().get("key3"));
confStore.close();
}
/**
* When restarting, RM should read from current state of store, including
* any updates from the previous RM instance.
* @throws Exception
*/
@Test
public void testRestartReadsFromUpdatedStore() throws Exception {
ResourceManager rm1 = new MockRM(conf);
rm1.start();
assertNull(((MutableConfScheduler) rm1.getResourceScheduler())
.getConfiguration().get("key"));
// Update configuration on RM
SchedConfUpdateInfo schedConfUpdateInfo = new SchedConfUpdateInfo();
schedConfUpdateInfo.getGlobalParams().put("key", "val");
MutableConfigurationProvider confProvider = ((MutableConfScheduler)
rm1.getResourceScheduler()).getMutableConfProvider();
UserGroupInformation user = UserGroupInformation
.createUserForTesting(TEST_USER, new String[0]);
confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
rm1.getResourceScheduler().reinitialize(conf, rm1.getRMContext());
assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler())
.getConfiguration().get("key"));
confProvider.confirmPendingMutation(true);
assertEquals("val", ((MutableCSConfigurationProvider) confProvider)
.getConfStore().retrieve().get("key"));
// Next update is not persisted, it should not be recovered
schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
rm1.close();
// Start RM2 and verifies it starts with updated configuration
ResourceManager rm2 = new MockRM(conf);
rm2.start();
assertEquals("val", ((MutableCSConfigurationProvider) (
(CapacityScheduler) rm2.getResourceScheduler())
.getMutableConfProvider()).getConfStore().retrieve().get("key"));
assertEquals("val", ((MutableConfScheduler) rm2.getResourceScheduler())
.getConfiguration().get("key"));
rm2.close();
}
@Override
public YarnConfigurationStore createConfStore() {
return new LeveldbConfigurationStore();
}
}