YARN-5947: Create LeveldbConfigurationStore class using Leveldb as backing store. Contributed by Jonathan Hung
This commit is contained in:
parent
16b27573ef
commit
cd58f5da5a
|
@ -612,8 +612,21 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String SCHEDULER_CONFIGURATION_STORE_CLASS =
|
||||
YARN_PREFIX + "scheduler.configuration.store.class";
|
||||
public static final String MEMORY_CONFIGURATION_STORE = "memory";
|
||||
public static final String LEVELDB_CONFIGURATION_STORE = "leveldb";
|
||||
public static final String DEFAULT_CONFIGURATION_STORE =
|
||||
MEMORY_CONFIGURATION_STORE;
|
||||
public static final String RM_SCHEDCONF_STORE_PATH = YARN_PREFIX
|
||||
+ "scheduler.configuration.leveldb-store.path";
|
||||
|
||||
public static final String RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS =
|
||||
YARN_PREFIX
|
||||
+ "scheduler.configuration.leveldb-store.compaction-interval-secs";
|
||||
public static final long
|
||||
DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS = 60 * 60 * 24L;
|
||||
|
||||
public static final String RM_SCHEDCONF_LEVELDB_MAX_LOGS =
|
||||
YARN_PREFIX + "scheduler.configuration.leveldb-store.max-logs";
|
||||
public static final int DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS = 1000;
|
||||
|
||||
public static final String RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS =
|
||||
YARN_PREFIX + "scheduler.configuration.mutation.acl-policy.class";
|
||||
|
|
|
@ -3215,4 +3215,33 @@
|
|||
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.DefaultConfigurationMutationACLPolicy</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
The storage path for LevelDB implementation of configuration store,
|
||||
when yarn.scheduler.configuration.store.class is configured to be
|
||||
"leveldb".
|
||||
</description>
|
||||
<name>yarn.scheduler.configuration.leveldb-store.path</name>
|
||||
<value>${hadoop.tmp.dir}/yarn/system/confstore</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
The compaction interval for LevelDB configuration store in secs,
|
||||
when yarn.scheduler.configuration.store.class is configured to be
|
||||
"leveldb". Default is one day.
|
||||
</description>
|
||||
<name>yarn.scheduler.configuration.leveldb-store.compaction-interval-secs</name>
|
||||
<value>86400</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
The max number of configuration change log entries kept in LevelDB config
|
||||
store, when yarn.scheduler.configuration.store.class is configured to be
|
||||
"leveldb". Default is 1000.
|
||||
</description>
|
||||
<name>yarn.scheduler.configuration.leveldb-store.max-logs</name>
|
||||
<value>1000</value>
|
||||
</property>
|
||||
</configuration>
|
||||
|
|
|
@ -28,6 +28,12 @@ import java.io.IOException;
|
|||
*/
|
||||
public interface MutableConfigurationProvider {
|
||||
|
||||
/**
|
||||
* Apply transactions which were not committed.
|
||||
* @throws IOException if recovery fails
|
||||
*/
|
||||
void recoverConf() throws IOException;
|
||||
|
||||
/**
|
||||
* Update the scheduler configuration with the provided key value pairs.
|
||||
* @param user User issuing the request
|
||||
|
|
|
@ -394,6 +394,9 @@ public class CapacityScheduler extends
|
|||
@Override
|
||||
public void serviceStart() throws Exception {
|
||||
startSchedulerThreads();
|
||||
if (this.csConfProvider instanceof MutableConfigurationProvider) {
|
||||
((MutableConfigurationProvider) csConfProvider).recoverConf();
|
||||
}
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,314 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.fusesource.leveldbjni.JniDBFactory;
|
||||
import org.fusesource.leveldbjni.internal.NativeDB;
|
||||
import org.iq80.leveldb.DB;
|
||||
import org.iq80.leveldb.DBComparator;
|
||||
import org.iq80.leveldb.DBException;
|
||||
import org.iq80.leveldb.DBIterator;
|
||||
import org.iq80.leveldb.Options;
|
||||
import org.iq80.leveldb.WriteBatch;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectInput;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutput;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
|
||||
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
|
||||
|
||||
/**
|
||||
* A LevelDB implementation of {@link YarnConfigurationStore}.
|
||||
*/
|
||||
public class LeveldbConfigurationStore implements YarnConfigurationStore {
|
||||
|
||||
public static final Log LOG =
|
||||
LogFactory.getLog(LeveldbConfigurationStore.class);
|
||||
|
||||
private static final String DB_NAME = "yarn-conf-store";
|
||||
private static final String LOG_PREFIX = "log.";
|
||||
private static final String LOG_COMMITTED_TXN = "committedTxn";
|
||||
|
||||
private DB db;
|
||||
private long txnId = 0;
|
||||
private long minTxn = 0;
|
||||
private long maxLogs;
|
||||
private Configuration conf;
|
||||
private LinkedList<LogMutation> pendingMutations = new LinkedList<>();
|
||||
private Timer compactionTimer;
|
||||
private long compactionIntervalMsec;
|
||||
|
||||
@Override
|
||||
public void initialize(Configuration config, Configuration schedConf)
|
||||
throws IOException {
|
||||
this.conf = config;
|
||||
try {
|
||||
this.db = initDatabase(schedConf);
|
||||
this.txnId = Long.parseLong(new String(db.get(bytes(LOG_COMMITTED_TXN)),
|
||||
StandardCharsets.UTF_8));
|
||||
DBIterator itr = db.iterator();
|
||||
itr.seek(bytes(LOG_PREFIX + txnId));
|
||||
// Seek to first uncommitted log
|
||||
itr.next();
|
||||
while (itr.hasNext()) {
|
||||
Map.Entry<byte[], byte[]> entry = itr.next();
|
||||
if (!new String(entry.getKey(), StandardCharsets.UTF_8)
|
||||
.startsWith(LOG_PREFIX)) {
|
||||
break;
|
||||
}
|
||||
pendingMutations.add(deserLogMutation(entry.getValue()));
|
||||
}
|
||||
// Get the earliest txnId stored in logs
|
||||
itr.seekToFirst();
|
||||
if (itr.hasNext()) {
|
||||
Map.Entry<byte[], byte[]> entry = itr.next();
|
||||
byte[] key = entry.getKey();
|
||||
String logId = new String(key, StandardCharsets.UTF_8);
|
||||
if (logId.startsWith(LOG_PREFIX)) {
|
||||
minTxn = Long.parseLong(logId.substring(logId.indexOf('.') + 1));
|
||||
}
|
||||
}
|
||||
this.maxLogs = config.getLong(
|
||||
YarnConfiguration.RM_SCHEDCONF_LEVELDB_MAX_LOGS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
|
||||
this.compactionIntervalMsec = config.getLong(
|
||||
YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS,
|
||||
YarnConfiguration
|
||||
.DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000;
|
||||
startCompactionTimer();
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private DB initDatabase(Configuration config) throws Exception {
|
||||
Path storeRoot = createStorageDir();
|
||||
Options options = new Options();
|
||||
options.createIfMissing(false);
|
||||
options.comparator(new DBComparator() {
|
||||
@Override
|
||||
public int compare(byte[] key1, byte[] key2) {
|
||||
String key1Str = new String(key1, StandardCharsets.UTF_8);
|
||||
String key2Str = new String(key2, StandardCharsets.UTF_8);
|
||||
int key1Txn = Integer.MAX_VALUE;
|
||||
int key2Txn = Integer.MAX_VALUE;
|
||||
if (key1Str.startsWith(LOG_PREFIX)) {
|
||||
key1Txn = Integer.parseInt(key1Str.substring(
|
||||
key1Str.indexOf('.') + 1));
|
||||
}
|
||||
if (key2Str.startsWith(LOG_PREFIX)) {
|
||||
key2Txn = Integer.parseInt(key2Str.substring(
|
||||
key2Str.indexOf('.') + 1));
|
||||
}
|
||||
// TODO txnId could overflow, in theory
|
||||
if (key1Txn == Integer.MAX_VALUE && key2Txn == Integer.MAX_VALUE) {
|
||||
if (key1Str.equals(key2Str) && key1Str.equals(LOG_COMMITTED_TXN)) {
|
||||
return 0;
|
||||
} else if (key1Str.equals(LOG_COMMITTED_TXN)) {
|
||||
return -1;
|
||||
} else if (key2Str.equals(LOG_COMMITTED_TXN)) {
|
||||
return 1;
|
||||
}
|
||||
return key1Str.compareTo(key2Str);
|
||||
}
|
||||
return key1Txn - key2Txn;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return "logComparator";
|
||||
}
|
||||
|
||||
public byte[] findShortestSeparator(byte[] start, byte[] limit) {
|
||||
return start;
|
||||
}
|
||||
|
||||
public byte[] findShortSuccessor(byte[] key) {
|
||||
return key;
|
||||
}
|
||||
});
|
||||
LOG.info("Using conf database at " + storeRoot);
|
||||
File dbfile = new File(storeRoot.toString());
|
||||
try {
|
||||
db = JniDBFactory.factory.open(dbfile, options);
|
||||
} catch (NativeDB.DBException e) {
|
||||
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
|
||||
LOG.info("Creating conf database at " + dbfile);
|
||||
options.createIfMissing(true);
|
||||
try {
|
||||
db = JniDBFactory.factory.open(dbfile, options);
|
||||
// Write the initial scheduler configuration
|
||||
WriteBatch initBatch = db.createWriteBatch();
|
||||
for (Map.Entry<String, String> kv : config) {
|
||||
initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
|
||||
}
|
||||
initBatch.put(bytes(LOG_COMMITTED_TXN), bytes("0"));
|
||||
db.write(initBatch);
|
||||
} catch (DBException dbErr) {
|
||||
throw new IOException(dbErr.getMessage(), dbErr);
|
||||
}
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
return db;
|
||||
}
|
||||
|
||||
private Path createStorageDir() throws IOException {
|
||||
Path root = getStorageDir();
|
||||
FileSystem fs = FileSystem.getLocal(conf);
|
||||
fs.mkdirs(root, new FsPermission((short) 0700));
|
||||
return root;
|
||||
}
|
||||
|
||||
private Path getStorageDir() throws IOException {
|
||||
String storePath = conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_PATH);
|
||||
if (storePath == null) {
|
||||
throw new IOException("No store location directory configured in " +
|
||||
YarnConfiguration.RM_SCHEDCONF_STORE_PATH);
|
||||
}
|
||||
return new Path(storePath, DB_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized long logMutation(LogMutation logMutation)
|
||||
throws IOException {
|
||||
logMutation.setId(++txnId);
|
||||
WriteBatch logBatch = db.createWriteBatch();
|
||||
logBatch.put(bytes(LOG_PREFIX + txnId), serLogMutation(logMutation));
|
||||
if (txnId - minTxn >= maxLogs) {
|
||||
logBatch.delete(bytes(LOG_PREFIX + minTxn));
|
||||
minTxn++;
|
||||
}
|
||||
db.write(logBatch);
|
||||
pendingMutations.add(logMutation);
|
||||
return txnId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean confirmMutation(long id, boolean isValid)
|
||||
throws IOException {
|
||||
WriteBatch updateBatch = db.createWriteBatch();
|
||||
if (isValid) {
|
||||
LogMutation mutation = deserLogMutation(db.get(bytes(LOG_PREFIX + id)));
|
||||
for (Map.Entry<String, String> changes :
|
||||
mutation.getUpdates().entrySet()) {
|
||||
if (changes.getValue() == null || changes.getValue().isEmpty()) {
|
||||
updateBatch.delete(bytes(changes.getKey()));
|
||||
} else {
|
||||
updateBatch.put(bytes(changes.getKey()), bytes(changes.getValue()));
|
||||
}
|
||||
}
|
||||
}
|
||||
updateBatch.put(bytes(LOG_COMMITTED_TXN), bytes(String.valueOf(id)));
|
||||
db.write(updateBatch);
|
||||
// Assumes logMutation and confirmMutation are done in the same
|
||||
// synchronized method. For example,
|
||||
// {@link MutableCSConfigurationProvider#mutateConfiguration(
|
||||
// UserGroupInformation user, SchedConfUpdateInfo confUpdate)}
|
||||
pendingMutations.removeFirst();
|
||||
return true;
|
||||
}
|
||||
|
||||
private byte[] serLogMutation(LogMutation mutation) throws IOException {
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
try (ObjectOutput oos = new ObjectOutputStream(baos)) {
|
||||
oos.writeObject(mutation);
|
||||
oos.flush();
|
||||
return baos.toByteArray();
|
||||
}
|
||||
}
|
||||
private LogMutation deserLogMutation(byte[] mutation) throws IOException {
|
||||
try (ObjectInput input = new ObjectInputStream(
|
||||
new ByteArrayInputStream(mutation))) {
|
||||
return (LogMutation) input.readObject();
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Configuration retrieve() {
|
||||
DBIterator itr = db.iterator();
|
||||
itr.seek(bytes(LOG_COMMITTED_TXN));
|
||||
Configuration config = new Configuration(false);
|
||||
itr.next();
|
||||
while (itr.hasNext()) {
|
||||
Map.Entry<byte[], byte[]> entry = itr.next();
|
||||
config.set(new String(entry.getKey(), StandardCharsets.UTF_8),
|
||||
new String(entry.getValue(), StandardCharsets.UTF_8));
|
||||
}
|
||||
return config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<LogMutation> getPendingMutations() {
|
||||
return pendingMutations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<LogMutation> getConfirmedConfHistory(long fromId) {
|
||||
return null; // unimplemented
|
||||
}
|
||||
|
||||
// TODO below was taken from LeveldbRMStateStore, it can probably be
|
||||
// refactored
|
||||
private void startCompactionTimer() {
|
||||
if (compactionIntervalMsec > 0) {
|
||||
compactionTimer = new Timer(
|
||||
this.getClass().getSimpleName() + " compaction timer", true);
|
||||
compactionTimer.schedule(new CompactionTimerTask(),
|
||||
compactionIntervalMsec, compactionIntervalMsec);
|
||||
}
|
||||
}
|
||||
|
||||
private class CompactionTimerTask extends TimerTask {
|
||||
@Override
|
||||
public void run() {
|
||||
long start = Time.monotonicNow();
|
||||
LOG.info("Starting full compaction cycle");
|
||||
try {
|
||||
db.compactRange(null, null);
|
||||
} catch (DBException e) {
|
||||
LOG.error("Error compacting database", e);
|
||||
}
|
||||
long duration = Time.monotonicNow() - start;
|
||||
LOG.info("Full compaction cycle completed in " + duration + " msec");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,6 +19,8 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
@ -49,6 +51,9 @@ import java.util.Map;
|
|||
public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
||||
MutableConfigurationProvider {
|
||||
|
||||
public static final Log LOG =
|
||||
LogFactory.getLog(MutableCSConfigurationProvider.class);
|
||||
|
||||
private Configuration schedConf;
|
||||
private YarnConfigurationStore confStore;
|
||||
private ConfigurationMutationACLPolicy aclMutationPolicy;
|
||||
|
@ -68,6 +73,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
|||
case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
|
||||
this.confStore = new InMemoryConfigurationStore();
|
||||
break;
|
||||
case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
|
||||
this.confStore = new LeveldbConfigurationStore();
|
||||
break;
|
||||
default:
|
||||
this.confStore = YarnConfigurationStoreFactory.getStore(config);
|
||||
break;
|
||||
|
@ -82,6 +90,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
|||
schedConf.set(kv.getKey(), kv.getValue());
|
||||
}
|
||||
confStore.initialize(config, schedConf);
|
||||
// After initializing confStore, the store may already have an existing
|
||||
// configuration. Use this one.
|
||||
schedConf = confStore.retrieve();
|
||||
this.aclMutationPolicy = ConfigurationMutationACLPolicyFactory
|
||||
.getPolicy(config);
|
||||
aclMutationPolicy.init(config, rmContext);
|
||||
|
@ -97,7 +108,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
|||
}
|
||||
|
||||
@Override
|
||||
public void mutateConfiguration(UserGroupInformation user,
|
||||
public synchronized void mutateConfiguration(UserGroupInformation user,
|
||||
SchedConfUpdateInfo confUpdate) throws IOException {
|
||||
if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) {
|
||||
throw new AccessControlException("User is not admin of all modified" +
|
||||
|
@ -124,6 +135,31 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
|||
confStore.confirmMutation(id, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recoverConf() throws IOException {
|
||||
List<LogMutation> uncommittedLogs = confStore.getPendingMutations();
|
||||
Configuration oldConf = new Configuration(schedConf);
|
||||
for (LogMutation mutation : uncommittedLogs) {
|
||||
for (Map.Entry<String, String> kv : mutation.getUpdates().entrySet()) {
|
||||
if (kv.getValue() == null) {
|
||||
schedConf.unset(kv.getKey());
|
||||
} else {
|
||||
schedConf.set(kv.getKey(), kv.getValue());
|
||||
}
|
||||
}
|
||||
try {
|
||||
rmContext.getScheduler().reinitialize(conf, rmContext);
|
||||
} catch (IOException e) {
|
||||
schedConf = oldConf;
|
||||
confStore.confirmMutation(mutation.getId(), false);
|
||||
LOG.info("Configuration mutation " + mutation.getId()
|
||||
+ " was rejected", e);
|
||||
continue;
|
||||
}
|
||||
confStore.confirmMutation(mutation.getId(), true);
|
||||
LOG.info("Configuration mutation " + mutation.getId()+ " was accepted");
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, String> constructKeyValueConfUpdate(
|
||||
SchedConfUpdateInfo mutationInfo) throws IOException {
|
||||
|
|
|
@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -43,7 +45,7 @@ public interface YarnConfigurationStore {
|
|||
* LogMutation encapsulates the fields needed for configuration mutation
|
||||
* audit logging and recovery.
|
||||
*/
|
||||
class LogMutation {
|
||||
class LogMutation implements Serializable {
|
||||
private Map<String, String> updates;
|
||||
private String user;
|
||||
private long id;
|
||||
|
@ -106,16 +108,19 @@ public interface YarnConfigurationStore {
|
|||
* Initialize the configuration store.
|
||||
* @param conf configuration to initialize store with
|
||||
* @param schedConf Initial key-value configuration to persist
|
||||
* @throws IOException if initialization fails
|
||||
*/
|
||||
void initialize(Configuration conf, Configuration schedConf);
|
||||
void initialize(Configuration conf, Configuration schedConf)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Logs the configuration change to backing store. Generates an id associated
|
||||
* with this mutation, sets it in {@code logMutation}, and returns it.
|
||||
* @param logMutation configuration change to be persisted in write ahead log
|
||||
* @return id which configuration store associates with this mutation
|
||||
* @throws IOException if logging fails
|
||||
*/
|
||||
long logMutation(LogMutation logMutation);
|
||||
long logMutation(LogMutation logMutation) throws IOException;
|
||||
|
||||
/**
|
||||
* Should be called after {@code logMutation}. Gets the pending mutation
|
||||
|
@ -130,8 +135,9 @@ public interface YarnConfigurationStore {
|
|||
* @param isValid if true, update persisted configuration with mutation
|
||||
* associated with {@code id}.
|
||||
* @return true on success
|
||||
* @throws IOException if mutation confirmation fails
|
||||
*/
|
||||
boolean confirmMutation(long id, boolean isValid);
|
||||
boolean confirmMutation(long id, boolean isValid) throws IOException;
|
||||
|
||||
/**
|
||||
* Retrieve the persisted configuration.
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.Yar
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -43,7 +44,7 @@ public class TestYarnConfigurationStore {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testInMemoryConfigurationStore() {
|
||||
public void testInMemoryConfigurationStore() throws IOException {
|
||||
confStore = new InMemoryConfigurationStore();
|
||||
confStore.initialize(new Configuration(), schedConf);
|
||||
assertEquals("val1", confStore.retrieve().get("key1"));
|
||||
|
|
Loading…
Reference in New Issue