From 9074ebc0d1d33e1759f426a53ba9af641a340249 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 13 Sep 2016 16:29:05 +0000 Subject: [PATCH] YARN-5009. NMLeveldbStateStoreService database can grow substantially leading to longer recovery times. Contributed by Jason Lowe --- hadoop-yarn-project/CHANGES.txt | 3 ++ .../hadoop/yarn/conf/YarnConfiguration.java | 7 +++ .../src/main/resources/yarn-default.xml | 8 ++++ .../recovery/NMLeveldbStateStoreService.java | 45 ++++++++++++++++++- .../TestNMLeveldbStateStoreService.java | 25 +++++++++++ 5 files changed, 86 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index e84914b3178..789251a8031 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -19,6 +19,9 @@ Release 2.6.5 - UNRELEASED YARN-5483. Optimize RMAppAttempt#pullJustFinishedContainers (sandflee via jlowe) + YARN-5009. NMLeveldbStateStoreService database can grow substantially + leading to longer recovery times (jlowe) + BUG FIXES YARN-2046. Out of band heartbeats are sent only on container kill and diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index ddcd7f6063e..a59eb085161 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1037,6 +1037,13 @@ private static void addDeprecatedKeys() { public static final String NM_RECOVERY_DIR = NM_RECOVERY_PREFIX + "dir"; + /** The time in seconds between full compactions of the NM state database. + * Setting the interval to zero disables the full compaction cycles. + */ + public static final String NM_RECOVERY_COMPACTION_INTERVAL_SECS = + NM_RECOVERY_PREFIX + "compaction-interval-secs"; + public static final int DEFAULT_NM_RECOVERY_COMPACTION_INTERVAL_SECS = 3600; + //////////////////////////////// // Web Proxy Configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index e83f56cafce..827161f0fbf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1173,6 +1173,14 @@ ${hadoop.tmp.dir}/yarn-nm-recovery + + The time in seconds between full compactions of the NM state + database. Setting the interval to zero disables the full compaction + cycles. + yarn.nodemanager.recovery.compaction-interval-secs + 3600 + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 5f349dbe359..c102596ccec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Timer; +import java.util.TimerTask; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -119,6 +122,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private DB db; private boolean isNewlyCreated; + private Timer compactionTimer; public NMLeveldbStateStoreService() { super(NMLeveldbStateStoreService.class.getName()); @@ -130,6 +134,10 @@ protected void startStorage() throws IOException { @Override protected void closeStorage() throws IOException { + if (compactionTimer != null) { + compactionTimer.cancel(); + compactionTimer = null; + } if (db != null) { db.close(); } @@ -854,6 +862,12 @@ public void removeContainerToken(ContainerId containerId) @Override protected void initStorage(Configuration conf) throws IOException { + db = openDatabase(conf); + checkVersion(); + startCompactionTimer(conf); + } + + protected DB openDatabase(Configuration conf) throws IOException { Path storeRoot = createStorageDir(conf); Options options = new Options(); options.createIfMissing(false); @@ -878,7 +892,7 @@ protected void initStorage(Configuration conf) throw e; } } - checkVersion(); + return db; } private Path createStorageDir(Configuration conf) throws IOException { @@ -894,6 +908,33 @@ private Path createStorageDir(Configuration conf) throws IOException { return root; } + private void startCompactionTimer(Configuration conf) { + long intervalMsec = conf.getLong( + YarnConfiguration.NM_RECOVERY_COMPACTION_INTERVAL_SECS, + YarnConfiguration.DEFAULT_NM_RECOVERY_COMPACTION_INTERVAL_SECS) * 1000; + if (intervalMsec > 0) { + compactionTimer = new Timer( + this.getClass().getSimpleName() + " compaction timer", true); + compactionTimer.schedule(new CompactionTimerTask(), + intervalMsec, intervalMsec); + } + } + + + private class CompactionTimerTask extends TimerTask { + @Override + public void run() { + long start = Time.monotonicNow(); + LOG.info("Starting full compaction cycle"); + try { + db.compactRange(null, null); + } catch (DBException e) { + LOG.error("Error compacting database", e); + } + long duration = Time.monotonicNow() - start; + LOG.info("Full compaction cycle completed in " + duration + " msec"); + } + } private static class LeveldbLogger implements Logger { private static final Log LOG = LogFactory.getLog(LeveldbLogger.class); @@ -951,7 +992,7 @@ Version getCurrentVersion() { * throw exception and indicate user to use a separate upgrade tool to * upgrade NM state or remove incompatible old state. */ - private void checkVersion() throws IOException { + protected void checkVersion() throws IOException { Version loadedVersion = loadVersion(); LOG.info("Loaded NM state version info " + loadedVersion); if (loadedVersion.equals(getCurrentVersion())) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index f7f43cc5aa4..72d1ac756f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -23,6 +23,10 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; import java.io.File; import java.io.IOException; @@ -73,6 +77,7 @@ import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.iq80.leveldb.DB; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -831,6 +836,26 @@ public void testContainerTokenStorage() throws IOException { assertEquals(expTime3, loadedActiveTokens.get(cid3)); } + @Test + public void testCompactionCycle() throws IOException { + final DB mockdb = mock(DB.class); + conf.setInt(YarnConfiguration.NM_RECOVERY_COMPACTION_INTERVAL_SECS, 1); + NMLeveldbStateStoreService store = new NMLeveldbStateStoreService() { + @Override + protected void checkVersion() {} + + @Override + protected DB openDatabase(Configuration conf) { + return mockdb; + } + }; + store.init(conf); + store.start(); + verify(mockdb, timeout(10000)).compactRange( + (byte[]) isNull(), (byte[]) isNull()); + store.close(); + } + private static class NMTokenSecretManagerForTest extends BaseNMTokenSecretManager { public MasterKey generateKey() {