YARN-5601. Make the RM epoch base value configurable. Contributed by Subru Krishnan
(cherry picked from commit 9ca2aba9cc
)
This commit is contained in:
parent
1882bc10fb
commit
2797507d51
|
@ -293,7 +293,10 @@
|
||||||
</Match>
|
</Match>
|
||||||
<Match>
|
<Match>
|
||||||
<Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore" />
|
<Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore" />
|
||||||
|
<Or>
|
||||||
<Field name="resourceManager"/>
|
<Field name="resourceManager"/>
|
||||||
|
<Field name="baseEpoch"/>
|
||||||
|
</Or>
|
||||||
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||||
</Match>
|
</Match>
|
||||||
<Match>
|
<Match>
|
||||||
|
|
|
@ -143,6 +143,9 @@ public class YarnConfiguration extends Configuration {
|
||||||
|
|
||||||
public static final String RM_HOSTNAME = RM_PREFIX + "hostname";
|
public static final String RM_HOSTNAME = RM_PREFIX + "hostname";
|
||||||
|
|
||||||
|
public static final String RM_EPOCH = RM_PREFIX + "epoch";
|
||||||
|
public static final long DEFAULT_RM_EPOCH = 0L;
|
||||||
|
|
||||||
/** The address of the applications manager interface in the RM.*/
|
/** The address of the applications manager interface in the RM.*/
|
||||||
public static final String RM_ADDRESS =
|
public static final String RM_ADDRESS =
|
||||||
RM_PREFIX + "address";
|
RM_PREFIX + "address";
|
||||||
|
|
|
@ -75,6 +75,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
|
||||||
.add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED);
|
.add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED);
|
||||||
configurationPropsToSkipCompare
|
configurationPropsToSkipCompare
|
||||||
.add(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS);
|
.add(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS);
|
||||||
|
configurationPropsToSkipCompare
|
||||||
|
.add(YarnConfiguration.RM_EPOCH);
|
||||||
|
|
||||||
// Ignore blacklisting nodes for AM failures feature since it is still a
|
// Ignore blacklisting nodes for AM failures feature since it is still a
|
||||||
// "work in progress"
|
// "work in progress"
|
||||||
|
|
|
@ -772,6 +772,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
LOG.error("Failed to load/recover state", e);
|
LOG.error("Failed to load/recover state", e);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
if (HAUtil.isFederationEnabled(conf)) {
|
||||||
|
long epoch = conf.getLong(YarnConfiguration.RM_EPOCH,
|
||||||
|
YarnConfiguration.DEFAULT_RM_EPOCH);
|
||||||
|
rmContext.setEpoch(epoch);
|
||||||
|
LOG.info("Epoch set for Federation: " + epoch);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
|
|
|
@ -197,7 +197,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
@Override
|
@Override
|
||||||
public synchronized long getAndIncrementEpoch() throws Exception {
|
public synchronized long getAndIncrementEpoch() throws Exception {
|
||||||
Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE);
|
Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE);
|
||||||
long currentEpoch = 0;
|
long currentEpoch = baseEpoch;
|
||||||
FileStatus status = getFileStatusWithRetries(epochNodePath);
|
FileStatus status = getFileStatusWithRetries(epochNodePath);
|
||||||
if (status != null) {
|
if (status != null) {
|
||||||
// load current epoch
|
// load current epoch
|
||||||
|
|
|
@ -255,7 +255,7 @@ public class LeveldbRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized long getAndIncrementEpoch() throws Exception {
|
public synchronized long getAndIncrementEpoch() throws Exception {
|
||||||
long currentEpoch = 0;
|
long currentEpoch = baseEpoch;
|
||||||
byte[] dbKeyBytes = bytes(EPOCH_NODE);
|
byte[] dbKeyBytes = bytes(EPOCH_NODE);
|
||||||
try {
|
try {
|
||||||
byte[] data = db.get(dbKeyBytes);
|
byte[] data = db.get(dbKeyBytes);
|
||||||
|
|
|
@ -83,6 +83,7 @@ public class MemoryRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void initInternal(Configuration conf) {
|
public synchronized void initInternal(Configuration conf) {
|
||||||
|
epoch = baseEpoch;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -98,6 +98,7 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
"ReservationSystemRoot";
|
"ReservationSystemRoot";
|
||||||
protected static final String VERSION_NODE = "RMVersionNode";
|
protected static final String VERSION_NODE = "RMVersionNode";
|
||||||
protected static final String EPOCH_NODE = "EpochNode";
|
protected static final String EPOCH_NODE = "EpochNode";
|
||||||
|
protected long baseEpoch;
|
||||||
protected ResourceManager resourceManager;
|
protected ResourceManager resourceManager;
|
||||||
private final ReadLock readLock;
|
private final ReadLock readLock;
|
||||||
private final WriteLock writeLock;
|
private final WriteLock writeLock;
|
||||||
|
@ -690,6 +691,9 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
dispatcher.register(RMStateStoreEventType.class,
|
dispatcher.register(RMStateStoreEventType.class,
|
||||||
rmStateStoreEventHandler);
|
rmStateStoreEventHandler);
|
||||||
dispatcher.setDrainEventsOnStop();
|
dispatcher.setDrainEventsOnStop();
|
||||||
|
// read the base epoch value from conf
|
||||||
|
baseEpoch = conf.getLong(YarnConfiguration.RM_EPOCH,
|
||||||
|
YarnConfiguration.DEFAULT_RM_EPOCH);
|
||||||
initInternal(conf);
|
initInternal(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -438,7 +438,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
@Override
|
@Override
|
||||||
public synchronized long getAndIncrementEpoch() throws Exception {
|
public synchronized long getAndIncrementEpoch() throws Exception {
|
||||||
String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
|
String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
|
||||||
long currentEpoch = 0;
|
long currentEpoch = baseEpoch;
|
||||||
|
|
||||||
if (exists(epochNodePath)) {
|
if (exists(epochNodePath)) {
|
||||||
// load current epoch
|
// load current epoch
|
||||||
|
|
|
@ -91,6 +91,8 @@ public class RMStateStoreTestBase {
|
||||||
|
|
||||||
public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
|
public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
|
||||||
|
|
||||||
|
protected final long epoch = 10L;
|
||||||
|
|
||||||
static class TestDispatcher implements Dispatcher, EventHandler<Event> {
|
static class TestDispatcher implements Dispatcher, EventHandler<Event> {
|
||||||
|
|
||||||
ApplicationAttemptId attemptId;
|
ApplicationAttemptId attemptId;
|
||||||
|
@ -564,13 +566,13 @@ public class RMStateStoreTestBase {
|
||||||
store.setRMDispatcher(new TestDispatcher());
|
store.setRMDispatcher(new TestDispatcher());
|
||||||
|
|
||||||
long firstTimeEpoch = store.getAndIncrementEpoch();
|
long firstTimeEpoch = store.getAndIncrementEpoch();
|
||||||
Assert.assertEquals(0, firstTimeEpoch);
|
Assert.assertEquals(epoch, firstTimeEpoch);
|
||||||
|
|
||||||
long secondTimeEpoch = store.getAndIncrementEpoch();
|
long secondTimeEpoch = store.getAndIncrementEpoch();
|
||||||
Assert.assertEquals(1, secondTimeEpoch);
|
Assert.assertEquals(epoch + 1, secondTimeEpoch);
|
||||||
|
|
||||||
long thirdTimeEpoch = store.getAndIncrementEpoch();
|
long thirdTimeEpoch = store.getAndIncrementEpoch();
|
||||||
Assert.assertEquals(2, thirdTimeEpoch);
|
Assert.assertEquals(epoch + 2, thirdTimeEpoch);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testAppDeletion(RMStateStoreHelper stateStoreHelper)
|
public void testAppDeletion(RMStateStoreHelper stateStoreHelper)
|
||||||
|
|
|
@ -117,6 +117,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
|
||||||
conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 8);
|
conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 8);
|
||||||
conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
|
conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
|
||||||
900L);
|
900L);
|
||||||
|
conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
|
||||||
if (adminCheckEnable) {
|
if (adminCheckEnable) {
|
||||||
conf.setBoolean(
|
conf.setBoolean(
|
||||||
YarnConfiguration.YARN_INTERMEDIATE_DATA_ENCRYPTION, true);
|
YarnConfiguration.YARN_INTERMEDIATE_DATA_ENCRYPTION, true);
|
||||||
|
|
|
@ -82,6 +82,7 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testEpoch() throws Exception {
|
public void testEpoch() throws Exception {
|
||||||
|
conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
|
||||||
LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
|
LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
|
||||||
testEpoch(tester);
|
testEpoch(tester);
|
||||||
}
|
}
|
||||||
|
|
|
@ -189,6 +189,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
|
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
|
||||||
curatorTestingServer.getConnectString());
|
curatorTestingServer.getConnectString());
|
||||||
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
||||||
|
conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
|
||||||
this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
|
this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
|
||||||
return this.store;
|
return this.store;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue