YARN-5008. LeveldbRMStateStore database can grow substantially leading to long recovery times. Contributed by Jason Lowe
(cherry picked from commit dd80042c42
)
This commit is contained in:
parent
0d4fbf01be
commit
e62162db46
|
@ -576,6 +576,13 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final String RM_LEVELDB_STORE_PATH = RM_PREFIX
|
public static final String RM_LEVELDB_STORE_PATH = RM_PREFIX
|
||||||
+ "leveldb-state-store.path";
|
+ "leveldb-state-store.path";
|
||||||
|
|
||||||
|
/** The time in seconds between full compactions of the leveldb database.
|
||||||
|
* Setting the interval to zero disables the full compaction cycles.
|
||||||
|
*/
|
||||||
|
public static final String RM_LEVELDB_COMPACTION_INTERVAL_SECS = RM_PREFIX
|
||||||
|
+ "leveldb-state-store.compaction-interval-secs";
|
||||||
|
public static final long DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS = 3600;
|
||||||
|
|
||||||
/** The maximum number of completed applications RM keeps. */
|
/** The maximum number of completed applications RM keeps. */
|
||||||
public static final String RM_MAX_COMPLETED_APPLICATIONS =
|
public static final String RM_MAX_COMPLETED_APPLICATIONS =
|
||||||
RM_PREFIX + "max-completed-applications";
|
RM_PREFIX + "max-completed-applications";
|
||||||
|
|
|
@ -554,6 +554,14 @@
|
||||||
<value>${hadoop.tmp.dir}/yarn/system/rmstore</value>
|
<value>${hadoop.tmp.dir}/yarn/system/rmstore</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>The time in seconds between full compactions of the leveldb
|
||||||
|
database. Setting the interval to zero disables the full compaction
|
||||||
|
cycles.</description>
|
||||||
|
<name>yarn.resourcemanager.leveldb-state-store.compaction-interval-secs</name>
|
||||||
|
<value>3600</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>Enable RM high-availability. When enabled,
|
<description>Enable RM high-availability. When enabled,
|
||||||
(1) The RM starts in the Standby mode by default, and transitions to
|
(1) The RM starts in the Standby mode by default, and transitions to
|
||||||
|
|
|
@ -29,6 +29,8 @@ import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Timer;
|
||||||
|
import java.util.TimerTask;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -38,6 +40,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||||
|
@ -97,6 +100,8 @@ public class LeveldbRMStateStore extends RMStateStore {
|
||||||
.newInstance(1, 1);
|
.newInstance(1, 1);
|
||||||
|
|
||||||
private DB db;
|
private DB db;
|
||||||
|
private Timer compactionTimer;
|
||||||
|
private long compactionIntervalMsec;
|
||||||
|
|
||||||
private String getApplicationNodeKey(ApplicationId appId) {
|
private String getApplicationNodeKey(ApplicationId appId) {
|
||||||
return RM_APP_ROOT + SEPARATOR + appId;
|
return RM_APP_ROOT + SEPARATOR + appId;
|
||||||
|
@ -128,6 +133,9 @@ public class LeveldbRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void initInternal(Configuration conf) throws Exception {
|
protected void initInternal(Configuration conf) throws Exception {
|
||||||
|
compactionIntervalMsec = conf.getLong(
|
||||||
|
YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Path getStorageDir() throws IOException {
|
private Path getStorageDir() throws IOException {
|
||||||
|
@ -149,6 +157,11 @@ public class LeveldbRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void startInternal() throws Exception {
|
protected void startInternal() throws Exception {
|
||||||
|
db = openDatabase();
|
||||||
|
startCompactionTimer();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected DB openDatabase() throws Exception {
|
||||||
Path storeRoot = createStorageDir();
|
Path storeRoot = createStorageDir();
|
||||||
Options options = new Options();
|
Options options = new Options();
|
||||||
options.createIfMissing(false);
|
options.createIfMissing(false);
|
||||||
|
@ -172,10 +185,24 @@ public class LeveldbRMStateStore extends RMStateStore {
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return db;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startCompactionTimer() {
|
||||||
|
if (compactionIntervalMsec > 0) {
|
||||||
|
compactionTimer = new Timer(
|
||||||
|
this.getClass().getSimpleName() + " compaction timer", true);
|
||||||
|
compactionTimer.schedule(new CompactionTimerTask(),
|
||||||
|
compactionIntervalMsec, compactionIntervalMsec);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void closeInternal() throws Exception {
|
protected void closeInternal() throws Exception {
|
||||||
|
if (compactionTimer != null) {
|
||||||
|
compactionTimer.cancel();
|
||||||
|
compactionTimer = null;
|
||||||
|
}
|
||||||
if (db != null) {
|
if (db != null) {
|
||||||
db.close();
|
db.close();
|
||||||
db = null;
|
db = null;
|
||||||
|
@ -793,6 +820,21 @@ public class LeveldbRMStateStore extends RMStateStore {
|
||||||
return numEntries;
|
return numEntries;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class CompactionTimerTask extends TimerTask {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
long start = Time.monotonicNow();
|
||||||
|
LOG.info("Starting full compaction cycle");
|
||||||
|
try {
|
||||||
|
db.compactRange(null, null);
|
||||||
|
} catch (DBException e) {
|
||||||
|
LOG.error("Error compacting database", e);
|
||||||
|
}
|
||||||
|
long duration = Time.monotonicNow() - start;
|
||||||
|
LOG.info("Full compaction cycle completed in " + duration + " msec");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static class LeveldbLogger implements Logger {
|
private static class LeveldbLogger implements Logger {
|
||||||
private static final Log LOG = LogFactory.getLog(LeveldbLogger.class);
|
private static final Log LOG = LogFactory.getLog(LeveldbLogger.class);
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,11 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.isNull;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.timeout;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -25,6 +30,8 @@ import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.records.Version;
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
import org.iq80.leveldb.DB;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -108,6 +115,23 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
|
||||||
testReservationStateStore(tester);
|
testReservationStateStore(tester);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testCompactionCycle() throws Exception {
|
||||||
|
final DB mockdb = mock(DB.class);
|
||||||
|
conf.setLong(YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS, 1);
|
||||||
|
LeveldbRMStateStore store = new LeveldbRMStateStore() {
|
||||||
|
@Override
|
||||||
|
protected DB openDatabase() throws Exception {
|
||||||
|
return mockdb;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
store.init(conf);
|
||||||
|
store.start();
|
||||||
|
verify(mockdb, timeout(10000)).compactRange(
|
||||||
|
(byte[]) isNull(), (byte[]) isNull());
|
||||||
|
store.close();
|
||||||
|
}
|
||||||
|
|
||||||
class LeveldbStateStoreTester implements RMStateStoreHelper {
|
class LeveldbStateStoreTester implements RMStateStoreHelper {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue