diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 0cfca203805..8922e060f8c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1134,6 +1134,13 @@ public class YarnConfiguration extends Configuration {
public static final String NM_RECOVERY_DIR = NM_RECOVERY_PREFIX + "dir";
+ /** The time in seconds between full compactions of the NM state database.
+ * Setting the interval to zero disables the full compaction cycles.
+ */
+ public static final String NM_RECOVERY_COMPACTION_INTERVAL_SECS =
+ NM_RECOVERY_PREFIX + "compaction-interval-secs";
+ public static final int DEFAULT_NM_RECOVERY_COMPACTION_INTERVAL_SECS = 3600;
+
////////////////////////////////
// Web Proxy Configs
////////////////////////////////
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 5664242993a..12e42f8aee2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1240,6 +1240,14 @@
${hadoop.tmp.dir}/yarn-nm-recovery
+
+ The time in seconds between full compactions of the NM state
+ database. Setting the interval to zero disables the full compaction
+ cycles.
+ yarn.nodemanager.recovery.compaction-interval-secs
+ 3600
+
+
The delay time ms to unregister container metrics after completion.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 0c9901ce9cf..36b7f817428 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -28,6 +28,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -123,6 +126,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private DB db;
private boolean isNewlyCreated;
+ private Timer compactionTimer;
public NMLeveldbStateStoreService() {
super(NMLeveldbStateStoreService.class.getName());
@@ -134,6 +138,10 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
@Override
protected void closeStorage() throws IOException {
+ if (compactionTimer != null) {
+ compactionTimer.cancel();
+ compactionTimer = null;
+ }
if (db != null) {
db.close();
}
@@ -942,6 +950,12 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
@Override
protected void initStorage(Configuration conf)
throws IOException {
+ db = openDatabase(conf);
+ checkVersion();
+ startCompactionTimer(conf);
+ }
+
+ protected DB openDatabase(Configuration conf) throws IOException {
Path storeRoot = createStorageDir(conf);
Options options = new Options();
options.createIfMissing(false);
@@ -966,7 +980,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
throw e;
}
}
- checkVersion();
+ return db;
}
private Path createStorageDir(Configuration conf) throws IOException {
@@ -982,6 +996,33 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
return root;
}
+ private void startCompactionTimer(Configuration conf) {
+ long intervalMsec = conf.getLong(
+ YarnConfiguration.NM_RECOVERY_COMPACTION_INTERVAL_SECS,
+ YarnConfiguration.DEFAULT_NM_RECOVERY_COMPACTION_INTERVAL_SECS) * 1000;
+ if (intervalMsec > 0) {
+ compactionTimer = new Timer(
+ this.getClass().getSimpleName() + " compaction timer", true);
+ compactionTimer.schedule(new CompactionTimerTask(),
+ intervalMsec, intervalMsec);
+ }
+ }
+
+
+ private class CompactionTimerTask extends TimerTask {
+ @Override
+ public void run() {
+ long start = Time.monotonicNow();
+ LOG.info("Starting full compaction cycle");
+ try {
+ db.compactRange(null, null);
+ } catch (DBException e) {
+ LOG.error("Error compacting database", e);
+ }
+ long duration = Time.monotonicNow() - start;
+ LOG.info("Full compaction cycle completed in " + duration + " msec");
+ }
+ }
private static class LeveldbLogger implements Logger {
private static final Log LOG = LogFactory.getLog(LeveldbLogger.class);
@@ -1039,7 +1080,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
* throw exception and indicate user to use a separate upgrade tool to
* upgrade NM state or remove incompatible old state.
*/
- private void checkVersion() throws IOException {
+ protected void checkVersion() throws IOException {
Version loadedVersion = loadVersion();
LOG.info("Loaded NM state version info " + loadedVersion);
if (loadedVersion.equals(getCurrentVersion())) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 41ec2d55550..f2f43a9118e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -23,6 +23,10 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
import java.io.File;
import java.io.IOException;
@@ -75,6 +79,7 @@ import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.iq80.leveldb.DB;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -874,6 +879,26 @@ public class TestNMLeveldbStateStoreService {
assertTrue(state.getLogDeleterMap().isEmpty());
}
+ @Test
+ public void testCompactionCycle() throws IOException {
+ final DB mockdb = mock(DB.class);
+ conf.setInt(YarnConfiguration.NM_RECOVERY_COMPACTION_INTERVAL_SECS, 1);
+ NMLeveldbStateStoreService store = new NMLeveldbStateStoreService() {
+ @Override
+ protected void checkVersion() {}
+
+ @Override
+ protected DB openDatabase(Configuration conf) {
+ return mockdb;
+ }
+ };
+ store.init(conf);
+ store.start();
+ verify(mockdb, timeout(10000)).compactRange(
+ (byte[]) isNull(), (byte[]) isNull());
+ store.close();
+ }
+
private static class NMTokenSecretManagerForTest extends
BaseNMTokenSecretManager {
public MasterKey generateKey() {