YARN-5009. NMLeveldbStateStoreService database can grow substantially leading to longer recovery times. Contributed by Jason Lowe

This commit is contained in:
Jian He 2016-04-28 21:54:11 -07:00
parent dd80042c42
commit 4a8508501b
4 changed files with 83 additions and 2 deletions

View File

@ -1470,6 +1470,13 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String NM_RECOVERY_DIR = NM_RECOVERY_PREFIX + "dir";
/** The time in seconds between full compactions of the NM state database.
* Setting the interval to zero disables the full compaction cycles.
*/
public static final String NM_RECOVERY_COMPACTION_INTERVAL_SECS =
NM_RECOVERY_PREFIX + "compaction-interval-secs";
public static final int DEFAULT_NM_RECOVERY_COMPACTION_INTERVAL_SECS = 3600;
public static final String NM_RECOVERY_SUPERVISED =
NM_RECOVERY_PREFIX + "supervised";
public static final boolean DEFAULT_NM_RECOVERY_SUPERVISED = false;

View File

@ -1657,6 +1657,14 @@
<value>${hadoop.tmp.dir}/yarn-nm-recovery</value>
</property>
<property>
<description>The time in seconds between full compactions of the NM state
database. Setting the interval to zero disables the full compaction
cycles.</description>
<name>yarn.nodemanager.recovery.compaction-interval-secs</name>
<value>3600</value>
</property>
<property>
<description>Whether the nodemanager is running under supervision. A
nodemanager that supports recovery and is running under supervision

View File

@ -28,6 +28,8 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -35,6 +37,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -128,6 +131,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private DB db;
private boolean isNewlyCreated;
private Timer compactionTimer;
public NMLeveldbStateStoreService() {
super(NMLeveldbStateStoreService.class.getName());
@ -139,6 +143,10 @@ protected void startStorage() throws IOException {
@Override
protected void closeStorage() throws IOException {
if (compactionTimer != null) {
compactionTimer.cancel();
compactionTimer = null;
}
if (db != null) {
db.close();
}
@ -1004,6 +1012,12 @@ private String getLogDeleterKey(ApplicationId appId) {
@Override
protected void initStorage(Configuration conf)
throws IOException {
db = openDatabase(conf);
checkVersion();
startCompactionTimer(conf);
}
protected DB openDatabase(Configuration conf) throws IOException {
Path storeRoot = createStorageDir(conf);
Options options = new Options();
options.createIfMissing(false);
@ -1028,7 +1042,7 @@ protected void initStorage(Configuration conf)
throw e;
}
}
checkVersion();
return db;
}
private Path createStorageDir(Configuration conf) throws IOException {
@ -1044,6 +1058,33 @@ private Path createStorageDir(Configuration conf) throws IOException {
return root;
}
private void startCompactionTimer(Configuration conf) {
long intervalMsec = conf.getLong(
YarnConfiguration.NM_RECOVERY_COMPACTION_INTERVAL_SECS,
YarnConfiguration.DEFAULT_NM_RECOVERY_COMPACTION_INTERVAL_SECS) * 1000;
if (intervalMsec > 0) {
compactionTimer = new Timer(
this.getClass().getSimpleName() + " compaction timer", true);
compactionTimer.schedule(new CompactionTimerTask(),
intervalMsec, intervalMsec);
}
}
private class CompactionTimerTask extends TimerTask {
@Override
public void run() {
long start = Time.monotonicNow();
LOG.info("Starting full compaction cycle");
try {
db.compactRange(null, null);
} catch (DBException e) {
LOG.error("Error compacting database", e);
}
long duration = Time.monotonicNow() - start;
LOG.info("Full compaction cycle completed in " + duration + " msec");
}
}
private static class LeveldbLogger implements Logger {
private static final Log LOG = LogFactory.getLog(LeveldbLogger.class);
@ -1101,7 +1142,7 @@ Version getCurrentVersion() {
* throw exception and indicate user to use a separate upgrade tool to
* upgrade NM state or remove incompatible old state.
*/
private void checkVersion() throws IOException {
protected void checkVersion() throws IOException {
Version loadedVersion = loadVersion();
LOG.info("Loaded NM state version info " + loadedVersion);
if (loadedVersion.equals(getCurrentVersion())) {

View File

@ -23,6 +23,10 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import java.io.File;
import java.io.IOException;
@ -75,6 +79,7 @@
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.iq80.leveldb.DB;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -885,6 +890,26 @@ public void testLogDeleterStorage() throws IOException {
assertTrue(state.getLogDeleterMap().isEmpty());
}
@Test
public void testCompactionCycle() throws IOException {
final DB mockdb = mock(DB.class);
conf.setInt(YarnConfiguration.NM_RECOVERY_COMPACTION_INTERVAL_SECS, 1);
NMLeveldbStateStoreService store = new NMLeveldbStateStoreService() {
@Override
protected void checkVersion() {}
@Override
protected DB openDatabase(Configuration conf) {
return mockdb;
}
};
store.init(conf);
store.start();
verify(mockdb, timeout(10000)).compactRange(
(byte[]) isNull(), (byte[]) isNull());
store.close();
}
private static class NMTokenSecretManagerForTest extends
BaseNMTokenSecretManager {
public MasterKey generateKey() {