diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index cdfaaf1c3a9..6f24fd291b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -644,6 +644,13 @@ public class YarnConfiguration extends Configuration { public static final String RM_LEVELDB_STORE_PATH = RM_PREFIX + "leveldb-state-store.path"; + /** The time in seconds between full compactions of the leveldb database. + * Setting the interval to zero disables the full compaction cycles. + */ + public static final String RM_LEVELDB_COMPACTION_INTERVAL_SECS = RM_PREFIX + + "leveldb-state-store.compaction-interval-secs"; + public static final long DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS = 3600; + /** The maximum number of completed applications RM keeps. */ public static final String RM_MAX_COMPLETED_APPLICATIONS = RM_PREFIX + "max-completed-applications"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 560d5481e90..b55684cd950 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -554,6 +554,14 @@ ${hadoop.tmp.dir}/yarn/system/rmstore + + The time in seconds between full compactions of the leveldb + database. Setting the interval to zero disables the full compaction + cycles. + yarn.resourcemanager.leveldb-state-store.compaction-interval-secs + 3600 + + Enable RM high-availability. When enabled, (1) The RM starts in the Standby mode by default, and transitions to diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java index 0dfd1bcdc88..a6f096930e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java @@ -29,6 +29,8 @@ import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.Map.Entry; +import java.util.Timer; +import java.util.TimerTask; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,6 +40,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; @@ -97,6 +100,8 @@ public class LeveldbRMStateStore extends RMStateStore { .newInstance(1, 1); private DB db; + private Timer compactionTimer; + private long compactionIntervalMsec; private String getApplicationNodeKey(ApplicationId appId) { return RM_APP_ROOT + SEPARATOR + appId; @@ -128,6 +133,9 @@ public class LeveldbRMStateStore extends RMStateStore { @Override protected void initInternal(Configuration conf) throws Exception { + compactionIntervalMsec = conf.getLong( + YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS, + YarnConfiguration.DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000; } private Path getStorageDir() throws IOException { @@ -149,6 +157,11 @@ public class LeveldbRMStateStore extends RMStateStore { @Override protected void startInternal() throws Exception { + db = openDatabase(); + startCompactionTimer(); + } + + protected DB openDatabase() throws Exception { Path storeRoot = createStorageDir(); Options options = new Options(); options.createIfMissing(false); @@ -172,10 +185,24 @@ public class LeveldbRMStateStore extends RMStateStore { throw e; } } + return db; + } + + private void startCompactionTimer() { + if (compactionIntervalMsec > 0) { + compactionTimer = new Timer( + this.getClass().getSimpleName() + " compaction timer", true); + compactionTimer.schedule(new CompactionTimerTask(), + compactionIntervalMsec, compactionIntervalMsec); + } } @Override protected void closeInternal() throws Exception { + if (compactionTimer != null) { + compactionTimer.cancel(); + compactionTimer = null; + } if (db != null) { db.close(); db = null; @@ -825,6 +852,21 @@ public class LeveldbRMStateStore extends RMStateStore { return numEntries; } + private class CompactionTimerTask extends TimerTask { + @Override + public void run() { + long start = Time.monotonicNow(); + LOG.info("Starting full compaction cycle"); + try { + db.compactRange(null, null); + } catch (DBException e) { + LOG.error("Error compacting database", e); + } + long duration = Time.monotonicNow() - start; + LOG.info("Full compaction cycle completed in " + duration + " msec"); + } + } + private static class LeveldbLogger implements Logger { private static final Log LOG = LogFactory.getLog(LeveldbLogger.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java index ce186e6bec8..4297e732e77 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java @@ -18,6 +18,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import static org.mockito.Mockito.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + import java.io.File; import java.io.IOException; @@ -26,6 +31,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.iq80.leveldb.DB; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -115,6 +121,23 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase { testReservationStateStore(tester); } + @Test(timeout = 60000) + public void testCompactionCycle() throws Exception { + final DB mockdb = mock(DB.class); + conf.setLong(YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS, 1); + LeveldbRMStateStore store = new LeveldbRMStateStore() { + @Override + protected DB openDatabase() throws Exception { + return mockdb; + } + }; + store.init(conf); + store.start(); + verify(mockdb, timeout(10000)).compactRange( + (byte[]) isNull(), (byte[]) isNull()); + store.close(); + } + class LeveldbStateStoreTester implements RMStateStoreHelper { @Override