From cd58f5da5a87f9d1d1f585ad639c06aac85dd775 Mon Sep 17 00:00:00 2001 From: Xuan Date: Mon, 31 Jul 2017 16:48:40 -0700 Subject: [PATCH] YARN-5947: Create LeveldbConfigurationStore class using Leveldb as backing store. Contributed by Jonathan Hung --- .../hadoop/yarn/conf/YarnConfiguration.java | 13 + .../src/main/resources/yarn-default.xml | 29 ++ .../MutableConfigurationProvider.java | 6 + .../scheduler/capacity/CapacityScheduler.java | 3 + .../conf/LeveldbConfigurationStore.java | 314 ++++++++++++++++++ .../conf/MutableCSConfigurationProvider.java | 38 ++- .../capacity/conf/YarnConfigurationStore.java | 14 +- .../conf/TestYarnConfigurationStore.java | 3 +- 8 files changed, 414 insertions(+), 6 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index cef3c741b51..84465eb5871 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -612,8 +612,21 @@ public class YarnConfiguration extends Configuration { public static final String SCHEDULER_CONFIGURATION_STORE_CLASS = YARN_PREFIX + "scheduler.configuration.store.class"; public static final String MEMORY_CONFIGURATION_STORE = "memory"; + public static final String LEVELDB_CONFIGURATION_STORE = "leveldb"; public static final String DEFAULT_CONFIGURATION_STORE = MEMORY_CONFIGURATION_STORE; + public static final String RM_SCHEDCONF_STORE_PATH = YARN_PREFIX + + "scheduler.configuration.leveldb-store.path"; + + public static final String RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS = + YARN_PREFIX + + "scheduler.configuration.leveldb-store.compaction-interval-secs"; + public static final long + DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS = 60 * 60 * 24L; + + public static final String RM_SCHEDCONF_LEVELDB_MAX_LOGS = + YARN_PREFIX + "scheduler.configuration.leveldb-store.max-logs"; + public static final int DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS = 1000; public static final String RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS = YARN_PREFIX + "scheduler.configuration.mutation.acl-policy.class"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 80984d93a7f..c4a761d3690 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3215,4 +3215,33 @@ org.apache.hadoop.yarn.server.resourcemanager.scheduler.DefaultConfigurationMutationACLPolicy + + + The storage path for LevelDB implementation of configuration store, + when yarn.scheduler.configuration.store.class is configured to be + "leveldb". + + yarn.scheduler.configuration.leveldb-store.path + ${hadoop.tmp.dir}/yarn/system/confstore + + + + + The compaction interval for LevelDB configuration store in secs, + when yarn.scheduler.configuration.store.class is configured to be + "leveldb". Default is one day. + + yarn.scheduler.configuration.leveldb-store.compaction-interval-secs + 86400 + + + + + The max number of configuration change log entries kept in LevelDB config + store, when yarn.scheduler.configuration.store.class is configured to be + "leveldb". Default is 1000. + + yarn.scheduler.configuration.leveldb-store.max-logs + 1000 + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java index 86be7c378a1..1f134677da0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java @@ -28,6 +28,12 @@ import java.io.IOException; */ public interface MutableConfigurationProvider { + /** + * Apply transactions which were not committed. + * @throws IOException if recovery fails + */ + void recoverConf() throws IOException; + /** * Update the scheduler configuration with the provided key value pairs. * @param user User issuing the request diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 4151fe7b884..4184917235e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -394,6 +394,9 @@ public class CapacityScheduler extends @Override public void serviceStart() throws Exception { startSchedulerThreads(); + if (this.csConfProvider instanceof MutableConfigurationProvider) { + ((MutableConfigurationProvider) csConfProvider).recoverConf(); + } super.serviceStart(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java new file mode 100644 index 00000000000..15346850c65 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.fusesource.leveldbjni.JniDBFactory; +import org.fusesource.leveldbjni.internal.NativeDB; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBComparator; +import org.iq80.leveldb.DBException; +import org.iq80.leveldb.DBIterator; +import org.iq80.leveldb.Options; +import org.iq80.leveldb.WriteBatch; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.nio.charset.StandardCharsets; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; + +import static org.fusesource.leveldbjni.JniDBFactory.bytes; + +/** + * A LevelDB implementation of {@link YarnConfigurationStore}. + */ +public class LeveldbConfigurationStore implements YarnConfigurationStore { + + public static final Log LOG = + LogFactory.getLog(LeveldbConfigurationStore.class); + + private static final String DB_NAME = "yarn-conf-store"; + private static final String LOG_PREFIX = "log."; + private static final String LOG_COMMITTED_TXN = "committedTxn"; + + private DB db; + private long txnId = 0; + private long minTxn = 0; + private long maxLogs; + private Configuration conf; + private LinkedList pendingMutations = new LinkedList<>(); + private Timer compactionTimer; + private long compactionIntervalMsec; + + @Override + public void initialize(Configuration config, Configuration schedConf) + throws IOException { + this.conf = config; + try { + this.db = initDatabase(schedConf); + this.txnId = Long.parseLong(new String(db.get(bytes(LOG_COMMITTED_TXN)), + StandardCharsets.UTF_8)); + DBIterator itr = db.iterator(); + itr.seek(bytes(LOG_PREFIX + txnId)); + // Seek to first uncommitted log + itr.next(); + while (itr.hasNext()) { + Map.Entry entry = itr.next(); + if (!new String(entry.getKey(), StandardCharsets.UTF_8) + .startsWith(LOG_PREFIX)) { + break; + } + pendingMutations.add(deserLogMutation(entry.getValue())); + } + // Get the earliest txnId stored in logs + itr.seekToFirst(); + if (itr.hasNext()) { + Map.Entry entry = itr.next(); + byte[] key = entry.getKey(); + String logId = new String(key, StandardCharsets.UTF_8); + if (logId.startsWith(LOG_PREFIX)) { + minTxn = Long.parseLong(logId.substring(logId.indexOf('.') + 1)); + } + } + this.maxLogs = config.getLong( + YarnConfiguration.RM_SCHEDCONF_LEVELDB_MAX_LOGS, + YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS); + this.compactionIntervalMsec = config.getLong( + YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS, + YarnConfiguration + .DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000; + startCompactionTimer(); + } catch (Exception e) { + throw new IOException(e); + } + } + + private DB initDatabase(Configuration config) throws Exception { + Path storeRoot = createStorageDir(); + Options options = new Options(); + options.createIfMissing(false); + options.comparator(new DBComparator() { + @Override + public int compare(byte[] key1, byte[] key2) { + String key1Str = new String(key1, StandardCharsets.UTF_8); + String key2Str = new String(key2, StandardCharsets.UTF_8); + int key1Txn = Integer.MAX_VALUE; + int key2Txn = Integer.MAX_VALUE; + if (key1Str.startsWith(LOG_PREFIX)) { + key1Txn = Integer.parseInt(key1Str.substring( + key1Str.indexOf('.') + 1)); + } + if (key2Str.startsWith(LOG_PREFIX)) { + key2Txn = Integer.parseInt(key2Str.substring( + key2Str.indexOf('.') + 1)); + } + // TODO txnId could overflow, in theory + if (key1Txn == Integer.MAX_VALUE && key2Txn == Integer.MAX_VALUE) { + if (key1Str.equals(key2Str) && key1Str.equals(LOG_COMMITTED_TXN)) { + return 0; + } else if (key1Str.equals(LOG_COMMITTED_TXN)) { + return -1; + } else if (key2Str.equals(LOG_COMMITTED_TXN)) { + return 1; + } + return key1Str.compareTo(key2Str); + } + return key1Txn - key2Txn; + } + + @Override + public String name() { + return "logComparator"; + } + + public byte[] findShortestSeparator(byte[] start, byte[] limit) { + return start; + } + + public byte[] findShortSuccessor(byte[] key) { + return key; + } + }); + LOG.info("Using conf database at " + storeRoot); + File dbfile = new File(storeRoot.toString()); + try { + db = JniDBFactory.factory.open(dbfile, options); + } catch (NativeDB.DBException e) { + if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { + LOG.info("Creating conf database at " + dbfile); + options.createIfMissing(true); + try { + db = JniDBFactory.factory.open(dbfile, options); + // Write the initial scheduler configuration + WriteBatch initBatch = db.createWriteBatch(); + for (Map.Entry kv : config) { + initBatch.put(bytes(kv.getKey()), bytes(kv.getValue())); + } + initBatch.put(bytes(LOG_COMMITTED_TXN), bytes("0")); + db.write(initBatch); + } catch (DBException dbErr) { + throw new IOException(dbErr.getMessage(), dbErr); + } + } else { + throw e; + } + } + return db; + } + + private Path createStorageDir() throws IOException { + Path root = getStorageDir(); + FileSystem fs = FileSystem.getLocal(conf); + fs.mkdirs(root, new FsPermission((short) 0700)); + return root; + } + + private Path getStorageDir() throws IOException { + String storePath = conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_PATH); + if (storePath == null) { + throw new IOException("No store location directory configured in " + + YarnConfiguration.RM_SCHEDCONF_STORE_PATH); + } + return new Path(storePath, DB_NAME); + } + + @Override + public synchronized long logMutation(LogMutation logMutation) + throws IOException { + logMutation.setId(++txnId); + WriteBatch logBatch = db.createWriteBatch(); + logBatch.put(bytes(LOG_PREFIX + txnId), serLogMutation(logMutation)); + if (txnId - minTxn >= maxLogs) { + logBatch.delete(bytes(LOG_PREFIX + minTxn)); + minTxn++; + } + db.write(logBatch); + pendingMutations.add(logMutation); + return txnId; + } + + @Override + public synchronized boolean confirmMutation(long id, boolean isValid) + throws IOException { + WriteBatch updateBatch = db.createWriteBatch(); + if (isValid) { + LogMutation mutation = deserLogMutation(db.get(bytes(LOG_PREFIX + id))); + for (Map.Entry changes : + mutation.getUpdates().entrySet()) { + if (changes.getValue() == null || changes.getValue().isEmpty()) { + updateBatch.delete(bytes(changes.getKey())); + } else { + updateBatch.put(bytes(changes.getKey()), bytes(changes.getValue())); + } + } + } + updateBatch.put(bytes(LOG_COMMITTED_TXN), bytes(String.valueOf(id))); + db.write(updateBatch); + // Assumes logMutation and confirmMutation are done in the same + // synchronized method. For example, + // {@link MutableCSConfigurationProvider#mutateConfiguration( + // UserGroupInformation user, SchedConfUpdateInfo confUpdate)} + pendingMutations.removeFirst(); + return true; + } + + private byte[] serLogMutation(LogMutation mutation) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ObjectOutput oos = new ObjectOutputStream(baos)) { + oos.writeObject(mutation); + oos.flush(); + return baos.toByteArray(); + } + } + private LogMutation deserLogMutation(byte[] mutation) throws IOException { + try (ObjectInput input = new ObjectInputStream( + new ByteArrayInputStream(mutation))) { + return (LogMutation) input.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + } + + @Override + public synchronized Configuration retrieve() { + DBIterator itr = db.iterator(); + itr.seek(bytes(LOG_COMMITTED_TXN)); + Configuration config = new Configuration(false); + itr.next(); + while (itr.hasNext()) { + Map.Entry entry = itr.next(); + config.set(new String(entry.getKey(), StandardCharsets.UTF_8), + new String(entry.getValue(), StandardCharsets.UTF_8)); + } + return config; + } + + @Override + public List getPendingMutations() { + return pendingMutations; + } + + @Override + public List getConfirmedConfHistory(long fromId) { + return null; // unimplemented + } + + // TODO below was taken from LeveldbRMStateStore, it can probably be + // refactored + private void startCompactionTimer() { + if (compactionIntervalMsec > 0) { + compactionTimer = new Timer( + this.getClass().getSimpleName() + " compaction timer", true); + compactionTimer.schedule(new CompactionTimerTask(), + compactionIntervalMsec, compactionIntervalMsec); + } + } + + private class CompactionTimerTask extends TimerTask { + @Override + public void run() { + long start = Time.monotonicNow(); + LOG.info("Starting full compaction cycle"); + try { + db.compactRange(null, null); + } catch (DBException e) { + LOG.error("Error compacting database", e); + } + long duration = Time.monotonicNow() - start; + LOG.info("Full compaction cycle completed in " + duration + " msec"); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java index 4a3088ca224..0160b551510 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; import com.google.common.base.Joiner; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; @@ -49,6 +51,9 @@ import java.util.Map; public class MutableCSConfigurationProvider implements CSConfigurationProvider, MutableConfigurationProvider { + public static final Log LOG = + LogFactory.getLog(MutableCSConfigurationProvider.class); + private Configuration schedConf; private YarnConfigurationStore confStore; private ConfigurationMutationACLPolicy aclMutationPolicy; @@ -68,6 +73,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, case YarnConfiguration.MEMORY_CONFIGURATION_STORE: this.confStore = new InMemoryConfigurationStore(); break; + case YarnConfiguration.LEVELDB_CONFIGURATION_STORE: + this.confStore = new LeveldbConfigurationStore(); + break; default: this.confStore = YarnConfigurationStoreFactory.getStore(config); break; @@ -82,6 +90,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, schedConf.set(kv.getKey(), kv.getValue()); } confStore.initialize(config, schedConf); + // After initializing confStore, the store may already have an existing + // configuration. Use this one. + schedConf = confStore.retrieve(); this.aclMutationPolicy = ConfigurationMutationACLPolicyFactory .getPolicy(config); aclMutationPolicy.init(config, rmContext); @@ -97,7 +108,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, } @Override - public void mutateConfiguration(UserGroupInformation user, + public synchronized void mutateConfiguration(UserGroupInformation user, SchedConfUpdateInfo confUpdate) throws IOException { if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) { throw new AccessControlException("User is not admin of all modified" + @@ -124,6 +135,31 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, confStore.confirmMutation(id, true); } + @Override + public void recoverConf() throws IOException { + List uncommittedLogs = confStore.getPendingMutations(); + Configuration oldConf = new Configuration(schedConf); + for (LogMutation mutation : uncommittedLogs) { + for (Map.Entry kv : mutation.getUpdates().entrySet()) { + if (kv.getValue() == null) { + schedConf.unset(kv.getKey()); + } else { + schedConf.set(kv.getKey(), kv.getValue()); + } + } + try { + rmContext.getScheduler().reinitialize(conf, rmContext); + } catch (IOException e) { + schedConf = oldConf; + confStore.confirmMutation(mutation.getId(), false); + LOG.info("Configuration mutation " + mutation.getId() + + " was rejected", e); + continue; + } + confStore.confirmMutation(mutation.getId(), true); + LOG.info("Configuration mutation " + mutation.getId()+ " was accepted"); + } + } private Map constructKeyValueConfUpdate( SchedConfUpdateInfo mutationInfo) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java index 22c0ef8f5ef..065c877eeb7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java @@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import java.io.IOException; +import java.io.Serializable; import java.util.List; import java.util.Map; @@ -43,7 +45,7 @@ public interface YarnConfigurationStore { * LogMutation encapsulates the fields needed for configuration mutation * audit logging and recovery. */ - class LogMutation { + class LogMutation implements Serializable { private Map updates; private String user; private long id; @@ -106,16 +108,19 @@ public interface YarnConfigurationStore { * Initialize the configuration store. * @param conf configuration to initialize store with * @param schedConf Initial key-value configuration to persist + * @throws IOException if initialization fails */ - void initialize(Configuration conf, Configuration schedConf); + void initialize(Configuration conf, Configuration schedConf) + throws IOException; /** * Logs the configuration change to backing store. Generates an id associated * with this mutation, sets it in {@code logMutation}, and returns it. * @param logMutation configuration change to be persisted in write ahead log * @return id which configuration store associates with this mutation + * @throws IOException if logging fails */ - long logMutation(LogMutation logMutation); + long logMutation(LogMutation logMutation) throws IOException; /** * Should be called after {@code logMutation}. Gets the pending mutation @@ -130,8 +135,9 @@ public interface YarnConfigurationStore { * @param isValid if true, update persisted configuration with mutation * associated with {@code id}. * @return true on success + * @throws IOException if mutation confirmation fails */ - boolean confirmMutation(long id, boolean isValid); + boolean confirmMutation(long id, boolean isValid) throws IOException; /** * Retrieve the persisted configuration. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java index dff4e774700..631ce657e84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestYarnConfigurationStore.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.Yar import org.junit.Before; import org.junit.Test; +import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -43,7 +44,7 @@ public class TestYarnConfigurationStore { } @Test - public void testInMemoryConfigurationStore() { + public void testInMemoryConfigurationStore() throws IOException { confStore = new InMemoryConfigurationStore(); confStore.initialize(new Configuration(), schedConf); assertEquals("val1", confStore.retrieve().get("key1"));