YARN-5601. Make the RM epoch base value configurable. Contributed by Subru Krishnan.

This commit is contained in:
Jian He 2016-09-02 12:23:57 +08:00 committed by Carlo Curino
parent ac1ba2a304
commit 9ca2aba9cc
13 changed files with 32 additions and 7 deletions

View File

@ -293,7 +293,10 @@
</Match> </Match>
<Match> <Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore" /> <Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore" />
<Or>
<Field name="resourceManager"/> <Field name="resourceManager"/>
<Field name="baseEpoch"/>
</Or>
<Bug pattern="IS2_INCONSISTENT_SYNC" /> <Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match> </Match>
<Match> <Match>

View File

@ -143,6 +143,9 @@ public class YarnConfiguration extends Configuration {
public static final String RM_HOSTNAME = RM_PREFIX + "hostname"; public static final String RM_HOSTNAME = RM_PREFIX + "hostname";
public static final String RM_EPOCH = RM_PREFIX + "epoch";
public static final long DEFAULT_RM_EPOCH = 0L;
/** The address of the applications manager interface in the RM.*/ /** The address of the applications manager interface in the RM.*/
public static final String RM_ADDRESS = public static final String RM_ADDRESS =
RM_PREFIX + "address"; RM_PREFIX + "address";

View File

@ -75,6 +75,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
.add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED); .add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED);
configurationPropsToSkipCompare configurationPropsToSkipCompare
.add(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS); .add(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS);
configurationPropsToSkipCompare
.add(YarnConfiguration.RM_EPOCH);
// Ignore blacklisting nodes for AM failures feature since it is still a // Ignore blacklisting nodes for AM failures feature since it is still a
// "work in progress" // "work in progress"

View File

@ -772,6 +772,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
LOG.error("Failed to load/recover state", e); LOG.error("Failed to load/recover state", e);
throw e; throw e;
} }
} else {
if (HAUtil.isFederationEnabled(conf)) {
long epoch = conf.getLong(YarnConfiguration.RM_EPOCH,
YarnConfiguration.DEFAULT_RM_EPOCH);
rmContext.setEpoch(epoch);
LOG.info("Epoch set for Federation: " + epoch);
}
} }
super.serviceStart(); super.serviceStart();

View File

@ -197,7 +197,7 @@ public class FileSystemRMStateStore extends RMStateStore {
@Override @Override
public synchronized long getAndIncrementEpoch() throws Exception { public synchronized long getAndIncrementEpoch() throws Exception {
Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE); Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE);
long currentEpoch = 0; long currentEpoch = baseEpoch;
FileStatus status = getFileStatusWithRetries(epochNodePath); FileStatus status = getFileStatusWithRetries(epochNodePath);
if (status != null) { if (status != null) {
// load current epoch // load current epoch

View File

@ -255,7 +255,7 @@ public class LeveldbRMStateStore extends RMStateStore {
@Override @Override
public synchronized long getAndIncrementEpoch() throws Exception { public synchronized long getAndIncrementEpoch() throws Exception {
long currentEpoch = 0; long currentEpoch = baseEpoch;
byte[] dbKeyBytes = bytes(EPOCH_NODE); byte[] dbKeyBytes = bytes(EPOCH_NODE);
try { try {
byte[] data = db.get(dbKeyBytes); byte[] data = db.get(dbKeyBytes);

View File

@ -83,6 +83,7 @@ public class MemoryRMStateStore extends RMStateStore {
@Override @Override
public synchronized void initInternal(Configuration conf) { public synchronized void initInternal(Configuration conf) {
epoch = baseEpoch;
} }
@Override @Override

View File

@ -98,6 +98,7 @@ public abstract class RMStateStore extends AbstractService {
"ReservationSystemRoot"; "ReservationSystemRoot";
protected static final String VERSION_NODE = "RMVersionNode"; protected static final String VERSION_NODE = "RMVersionNode";
protected static final String EPOCH_NODE = "EpochNode"; protected static final String EPOCH_NODE = "EpochNode";
protected long baseEpoch;
protected ResourceManager resourceManager; protected ResourceManager resourceManager;
private final ReadLock readLock; private final ReadLock readLock;
private final WriteLock writeLock; private final WriteLock writeLock;
@ -690,6 +691,9 @@ public abstract class RMStateStore extends AbstractService {
dispatcher.register(RMStateStoreEventType.class, dispatcher.register(RMStateStoreEventType.class,
rmStateStoreEventHandler); rmStateStoreEventHandler);
dispatcher.setDrainEventsOnStop(); dispatcher.setDrainEventsOnStop();
// read the base epoch value from conf
baseEpoch = conf.getLong(YarnConfiguration.RM_EPOCH,
YarnConfiguration.DEFAULT_RM_EPOCH);
initInternal(conf); initInternal(conf);
} }

View File

@ -438,7 +438,7 @@ public class ZKRMStateStore extends RMStateStore {
@Override @Override
public synchronized long getAndIncrementEpoch() throws Exception { public synchronized long getAndIncrementEpoch() throws Exception {
String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE); String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
long currentEpoch = 0; long currentEpoch = baseEpoch;
if (exists(epochNodePath)) { if (exists(epochNodePath)) {
// load current epoch // load current epoch

View File

@ -91,6 +91,8 @@ public class RMStateStoreTestBase {
public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class); public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
protected final long epoch = 10L;
static class TestDispatcher implements Dispatcher, EventHandler<Event> { static class TestDispatcher implements Dispatcher, EventHandler<Event> {
ApplicationAttemptId attemptId; ApplicationAttemptId attemptId;
@ -564,13 +566,13 @@ public class RMStateStoreTestBase {
store.setRMDispatcher(new TestDispatcher()); store.setRMDispatcher(new TestDispatcher());
long firstTimeEpoch = store.getAndIncrementEpoch(); long firstTimeEpoch = store.getAndIncrementEpoch();
Assert.assertEquals(0, firstTimeEpoch); Assert.assertEquals(epoch, firstTimeEpoch);
long secondTimeEpoch = store.getAndIncrementEpoch(); long secondTimeEpoch = store.getAndIncrementEpoch();
Assert.assertEquals(1, secondTimeEpoch); Assert.assertEquals(epoch + 1, secondTimeEpoch);
long thirdTimeEpoch = store.getAndIncrementEpoch(); long thirdTimeEpoch = store.getAndIncrementEpoch();
Assert.assertEquals(2, thirdTimeEpoch); Assert.assertEquals(epoch + 2, thirdTimeEpoch);
} }
public void testAppDeletion(RMStateStoreHelper stateStoreHelper) public void testAppDeletion(RMStateStoreHelper stateStoreHelper)

View File

@ -117,6 +117,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 8); conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 8);
conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS, conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
900L); 900L);
conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
if (adminCheckEnable) { if (adminCheckEnable) {
conf.setBoolean( conf.setBoolean(
YarnConfiguration.YARN_INTERMEDIATE_DATA_ENCRYPTION, true); YarnConfiguration.YARN_INTERMEDIATE_DATA_ENCRYPTION, true);

View File

@ -82,6 +82,7 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
@Test(timeout = 60000) @Test(timeout = 60000)
public void testEpoch() throws Exception { public void testEpoch() throws Exception {
conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
LeveldbStateStoreTester tester = new LeveldbStateStoreTester(); LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
testEpoch(tester); testEpoch(tester);
} }

View File

@ -189,6 +189,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
conf.set(YarnConfiguration.RM_ZK_ADDRESS, conf.set(YarnConfiguration.RM_ZK_ADDRESS,
curatorTestingServer.getConnectString()); curatorTestingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
this.store = new TestZKRMStateStoreInternal(conf, workingZnode); this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
return this.store; return this.store;
} }