YARN-7046. Add closing logic to configuration store
This commit is contained in:
parent
887b366d5f
commit
c8f9fe88d5
|
@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Interface for allowing changing scheduler configurations.
|
||||
*/
|
||||
|
@ -55,4 +57,10 @@ public interface MutableConfigurationProvider {
|
|||
* @throws Exception if confirming mutation fails
|
||||
*/
|
||||
void confirmPendingMutation(boolean isValid) throws Exception;
|
||||
|
||||
/**
|
||||
* Closes the configuration provider, releasing any required resources.
|
||||
* @throws IOException on failure to close
|
||||
*/
|
||||
void close() throws IOException;
|
||||
}
|
||||
|
|
|
@ -412,6 +412,9 @@ public class CapacityScheduler extends
|
|||
writeLock.unlock();
|
||||
}
|
||||
|
||||
if (isConfigurationMutable()) {
|
||||
((MutableConfigurationProvider) csConfProvider).close();
|
||||
}
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -72,7 +73,8 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|||
private long maxLogs;
|
||||
private Configuration conf;
|
||||
private LogMutation pendingMutation;
|
||||
private static final Version CURRENT_VERSION_INFO = Version
|
||||
@VisibleForTesting
|
||||
protected static final Version CURRENT_VERSION_INFO = Version
|
||||
.newInstance(0, 1);
|
||||
private Timer compactionTimer;
|
||||
private long compactionIntervalMsec;
|
||||
|
@ -82,7 +84,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|||
RMContext rmContext) throws IOException {
|
||||
this.conf = config;
|
||||
try {
|
||||
this.db = initDatabase(schedConf);
|
||||
initDatabase(schedConf);
|
||||
this.maxLogs = config.getLong(
|
||||
YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
|
||||
|
@ -96,7 +98,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|||
}
|
||||
}
|
||||
|
||||
private DB initDatabase(Configuration config) throws Exception {
|
||||
private void initDatabase(Configuration config) throws Exception {
|
||||
Path storeRoot = createStorageDir();
|
||||
Options options = new Options();
|
||||
options.createIfMissing(false);
|
||||
|
@ -108,13 +110,13 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|||
if (key1Str.equals(key2Str)) {
|
||||
return 0;
|
||||
} else if (key1Str.equals(VERSION_KEY)) {
|
||||
return -1;
|
||||
return 1;
|
||||
} else if (key2Str.equals(VERSION_KEY)) {
|
||||
return 1;
|
||||
} else if (key1Str.equals(LOG_KEY)) {
|
||||
return -1;
|
||||
} else if (key2Str.equals(LOG_KEY)) {
|
||||
} else if (key1Str.equals(LOG_KEY)) {
|
||||
return 1;
|
||||
} else if (key2Str.equals(LOG_KEY)) {
|
||||
return -1;
|
||||
}
|
||||
return key1Str.compareTo(key2Str);
|
||||
}
|
||||
|
@ -156,7 +158,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
return db;
|
||||
}
|
||||
|
||||
private Path createStorageDir() throws IOException {
|
||||
|
@ -175,6 +176,13 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|||
return new Path(storePath, DB_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (db != null) {
|
||||
db.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void logMutation(LogMutation logMutation) throws IOException {
|
||||
LinkedList<LogMutation> logs = deserLogMutations(db.get(bytes(LOG_KEY)));
|
||||
|
@ -212,8 +220,12 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|||
return baos.toByteArray();
|
||||
}
|
||||
}
|
||||
|
||||
private LinkedList<LogMutation> deserLogMutations(byte[] mutations) throws
|
||||
IOException {
|
||||
if (mutations == null) {
|
||||
return new LinkedList<>();
|
||||
}
|
||||
try (ObjectInput input = new ObjectInputStream(
|
||||
new ByteArrayInputStream(mutations))) {
|
||||
return (LinkedList<LogMutation>) input.readObject();
|
||||
|
@ -225,13 +237,16 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|||
@Override
|
||||
public synchronized Configuration retrieve() {
|
||||
DBIterator itr = db.iterator();
|
||||
itr.seek(bytes(LOG_KEY));
|
||||
itr.seekToFirst();
|
||||
Configuration config = new Configuration(false);
|
||||
itr.next();
|
||||
while (itr.hasNext()) {
|
||||
Map.Entry<byte[], byte[]> entry = itr.next();
|
||||
config.set(new String(entry.getKey(), StandardCharsets.UTF_8),
|
||||
new String(entry.getValue(), StandardCharsets.UTF_8));
|
||||
String key = new String(entry.getKey(), StandardCharsets.UTF_8);
|
||||
String value = new String(entry.getValue(), StandardCharsets.UTF_8);
|
||||
if (key.equals(LOG_KEY) || key.equals(VERSION_KEY)) {
|
||||
break;
|
||||
}
|
||||
config.set(key, value);
|
||||
}
|
||||
return config;
|
||||
}
|
||||
|
@ -268,6 +283,11 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
|||
return version;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected LinkedList<LogMutation> getLogs() throws Exception {
|
||||
return deserLogMutations(db.get(bytes(LOG_KEY)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeVersion() throws Exception {
|
||||
String key = VERSION_KEY;
|
||||
|
|
|
@ -92,6 +92,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
|||
}
|
||||
try {
|
||||
confStore.initialize(config, schedConf, rmContext);
|
||||
confStore.checkVersion();
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
@ -103,6 +104,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
|||
aclMutationPolicy.init(config, rmContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
confStore.close();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public YarnConfigurationStore getConfStore() {
|
||||
return confStore;
|
||||
|
|
|
@ -95,6 +95,12 @@ public abstract class YarnConfigurationStore {
|
|||
public abstract void initialize(Configuration conf, Configuration schedConf,
|
||||
RMContext rmContext) throws Exception;
|
||||
|
||||
/**
|
||||
* Closes the configuration store, releasing any required resources.
|
||||
* @throws IOException on failure to close
|
||||
*/
|
||||
public void close() throws IOException {}
|
||||
|
||||
/**
|
||||
* Logs the configuration change to backing store.
|
||||
* @param logMutation configuration change to be persisted in write ahead log
|
||||
|
|
|
@ -71,6 +71,7 @@ public abstract class ConfigurationStoreBaseTest {
|
|||
confStore.confirmMutation(false);
|
||||
assertNull("Configuration should not be updated",
|
||||
confStore.retrieve().get("keyUpdate2"));
|
||||
confStore.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -86,5 +87,6 @@ public abstract class ConfigurationStoreBaseTest {
|
|||
confStore.logMutation(mutation);
|
||||
confStore.confirmMutation(true);
|
||||
assertNull(confStore.retrieve().get("key"));
|
||||
confStore.close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,213 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
/**
|
||||
* Tests {@link LeveldbConfigurationStore}.
|
||||
*/
|
||||
public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
|
||||
|
||||
public static final Log LOG =
|
||||
LogFactory.getLog(TestLeveldbConfigurationStore.class);
|
||||
private static final File TEST_DIR = new File(
|
||||
System.getProperty("test.build.data",
|
||||
System.getProperty("java.io.tmpdir")),
|
||||
TestLeveldbConfigurationStore.class.getName());
|
||||
|
||||
private ResourceManager rm;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
FileUtil.fullyDelete(TEST_DIR);
|
||||
conf.set(CapacitySchedulerConfiguration.CS_CONF_PROVIDER,
|
||||
CapacitySchedulerConfiguration.STORE_CS_CONF_PROVIDER);
|
||||
conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
|
||||
YarnConfiguration.LEVELDB_CONFIGURATION_STORE);
|
||||
conf.set(YarnConfiguration.RM_SCHEDCONF_STORE_PATH, TEST_DIR.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVersioning() throws Exception {
|
||||
confStore.initialize(conf, schedConf, rmContext);
|
||||
assertNull(confStore.getConfStoreVersion());
|
||||
confStore.checkVersion();
|
||||
assertEquals(LeveldbConfigurationStore.CURRENT_VERSION_INFO,
|
||||
confStore.getConfStoreVersion());
|
||||
confStore.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPersistConfiguration() throws Exception {
|
||||
schedConf.set("key", "val");
|
||||
confStore.initialize(conf, schedConf, rmContext);
|
||||
assertEquals("val", confStore.retrieve().get("key"));
|
||||
confStore.close();
|
||||
|
||||
// Create a new configuration store, and check for old configuration
|
||||
confStore = createConfStore();
|
||||
schedConf.set("key", "badVal");
|
||||
// Should ignore passed-in scheduler configuration.
|
||||
confStore.initialize(conf, schedConf, rmContext);
|
||||
assertEquals("val", confStore.retrieve().get("key"));
|
||||
confStore.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPersistUpdatedConfiguration() throws Exception {
|
||||
confStore.initialize(conf, schedConf, rmContext);
|
||||
assertNull(confStore.retrieve().get("key"));
|
||||
|
||||
Map<String, String> update = new HashMap<>();
|
||||
update.put("key", "val");
|
||||
YarnConfigurationStore.LogMutation mutation =
|
||||
new YarnConfigurationStore.LogMutation(update, TEST_USER);
|
||||
confStore.logMutation(mutation);
|
||||
confStore.confirmMutation(true);
|
||||
assertEquals("val", confStore.retrieve().get("key"));
|
||||
confStore.close();
|
||||
|
||||
// Create a new configuration store, and check for updated configuration
|
||||
confStore = createConfStore();
|
||||
schedConf.set("key", "badVal");
|
||||
// Should ignore passed-in scheduler configuration.
|
||||
confStore.initialize(conf, schedConf, rmContext);
|
||||
assertEquals("val", confStore.retrieve().get("key"));
|
||||
confStore.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxLogs() throws Exception {
|
||||
conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
|
||||
confStore.initialize(conf, schedConf, rmContext);
|
||||
LinkedList<YarnConfigurationStore.LogMutation> logs =
|
||||
((LeveldbConfigurationStore) confStore).getLogs();
|
||||
assertEquals(0, logs.size());
|
||||
|
||||
Map<String, String> update1 = new HashMap<>();
|
||||
update1.put("key1", "val1");
|
||||
YarnConfigurationStore.LogMutation mutation =
|
||||
new YarnConfigurationStore.LogMutation(update1, TEST_USER);
|
||||
confStore.logMutation(mutation);
|
||||
logs = ((LeveldbConfigurationStore) confStore).getLogs();
|
||||
assertEquals(1, logs.size());
|
||||
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
|
||||
confStore.confirmMutation(true);
|
||||
assertEquals(1, logs.size());
|
||||
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
|
||||
|
||||
Map<String, String> update2 = new HashMap<>();
|
||||
update2.put("key2", "val2");
|
||||
mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER);
|
||||
confStore.logMutation(mutation);
|
||||
logs = ((LeveldbConfigurationStore) confStore).getLogs();
|
||||
assertEquals(2, logs.size());
|
||||
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
|
||||
assertEquals("val2", logs.get(1).getUpdates().get("key2"));
|
||||
confStore.confirmMutation(true);
|
||||
assertEquals(2, logs.size());
|
||||
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
|
||||
assertEquals("val2", logs.get(1).getUpdates().get("key2"));
|
||||
|
||||
// Next update should purge first update from logs.
|
||||
Map<String, String> update3 = new HashMap<>();
|
||||
update3.put("key3", "val3");
|
||||
mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER);
|
||||
confStore.logMutation(mutation);
|
||||
logs = ((LeveldbConfigurationStore) confStore).getLogs();
|
||||
assertEquals(2, logs.size());
|
||||
assertEquals("val2", logs.get(0).getUpdates().get("key2"));
|
||||
assertEquals("val3", logs.get(1).getUpdates().get("key3"));
|
||||
confStore.confirmMutation(true);
|
||||
assertEquals(2, logs.size());
|
||||
assertEquals("val2", logs.get(0).getUpdates().get("key2"));
|
||||
assertEquals("val3", logs.get(1).getUpdates().get("key3"));
|
||||
confStore.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* When restarting, RM should read from current state of store, including
|
||||
* any updates from the previous RM instance.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testRestartReadsFromUpdatedStore() throws Exception {
|
||||
ResourceManager rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
assertNull(((MutableConfScheduler) rm1.getResourceScheduler())
|
||||
.getConfiguration().get("key"));
|
||||
|
||||
// Update configuration on RM
|
||||
SchedConfUpdateInfo schedConfUpdateInfo = new SchedConfUpdateInfo();
|
||||
schedConfUpdateInfo.getGlobalParams().put("key", "val");
|
||||
MutableConfigurationProvider confProvider = ((MutableConfScheduler)
|
||||
rm1.getResourceScheduler()).getMutableConfProvider();
|
||||
UserGroupInformation user = UserGroupInformation
|
||||
.createUserForTesting(TEST_USER, new String[0]);
|
||||
confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
|
||||
rm1.getResourceScheduler().reinitialize(conf, rm1.getRMContext());
|
||||
assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler())
|
||||
.getConfiguration().get("key"));
|
||||
confProvider.confirmPendingMutation(true);
|
||||
assertEquals("val", ((MutableCSConfigurationProvider) confProvider)
|
||||
.getConfStore().retrieve().get("key"));
|
||||
// Next update is not persisted, it should not be recovered
|
||||
schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
|
||||
confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
|
||||
rm1.close();
|
||||
|
||||
// Start RM2 and verifies it starts with updated configuration
|
||||
ResourceManager rm2 = new MockRM(conf);
|
||||
rm2.start();
|
||||
assertEquals("val", ((MutableCSConfigurationProvider) (
|
||||
(CapacityScheduler) rm2.getResourceScheduler())
|
||||
.getMutableConfProvider()).getConfStore().retrieve().get("key"));
|
||||
assertEquals("val", ((MutableConfScheduler) rm2.getResourceScheduler())
|
||||
.getConfiguration().get("key"));
|
||||
rm2.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public YarnConfigurationStore createConfStore() {
|
||||
return new LeveldbConfigurationStore();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue