YARN-8151. Yarn RM Epoch should wrap around. Contributed by Young Chen.

This commit is contained in:
Inigo Goiri 2018-05-02 17:23:17 -07:00
parent 87c23ef643
commit e6a80e476d
11 changed files with 51 additions and 7 deletions

View File

@ -188,6 +188,10 @@ public class YarnConfiguration extends Configuration {
public static final String RM_EPOCH = RM_PREFIX + "epoch";
public static final long DEFAULT_RM_EPOCH = 0L;
/**
 * The range of epoch values above the base epoch that is used before the
 * epoch wraps around. A value of 0 disables wrap around.
 */
public static final String RM_EPOCH_RANGE = RM_EPOCH + ".range";
/** Default epoch range; 0 disables wrap around. */
public static final long DEFAULT_RM_EPOCH_RANGE = 0;
/** The address of the applications manager interface in the RM.*/
public static final String RM_ADDRESS =
RM_PREFIX + "address";

View File

@ -676,6 +676,13 @@
<!--value>yarn-cluster</value-->
</property>
<property>
<description>The range of values above the base epoch that the RM will use before
wrapping around. A value of 0 disables wrap around.</description>
<name>yarn.resourcemanager.epoch.range</name>
<value>0</value>
</property>
<property>
<description>The list of RM nodes in the cluster when HA is
enabled. See description of yarn.resourcemanager.ha

View File

@ -205,12 +205,12 @@ public class FileSystemRMStateStore extends RMStateStore {
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
currentEpoch = epoch.getEpoch();
// increment epoch and store it
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto()
.toByteArray();
updateFile(epochNodePath, storeData, false);
} else {
// initialize epoch file with 1 for the next time.
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto()
.toByteArray();
writeFileWithRetries(epochNodePath, storeData, false);
}

View File

@ -259,7 +259,7 @@ public class LeveldbRMStateStore extends RMStateStore {
if (data != null) {
currentEpoch = EpochProto.parseFrom(data).getEpoch();
}
EpochProto proto = Epoch.newInstance(currentEpoch + 1).getProto();
EpochProto proto = Epoch.newInstance(nextEpoch(currentEpoch)).getProto();
db.put(dbKeyBytes, proto.toByteArray());
} catch (DBException e) {
throw new IOException(e);

View File

@ -59,7 +59,7 @@ public class MemoryRMStateStore extends RMStateStore {
/**
 * Returns the epoch held in memory before this call and advances the
 * stored epoch for the next call.
 *
 * @return the epoch value as it was before being advanced
 * @throws Exception declared by the overridden method signature
 */
@Override
public synchronized long getAndIncrementEpoch() throws Exception {
// Remember the value to hand back before the stored epoch is advanced.
long currentEpoch = epoch;
// NOTE(review): this view shows a change without +/- markers; the next line
// looks like the pre-change statement and the one after it its replacement.
// Confirm that only one of them is present in the actual file.
epoch = epoch + 1;
epoch = nextEpoch(epoch);
return currentEpoch;
}

View File

@ -104,6 +104,7 @@ public abstract class RMStateStore extends AbstractService {
protected static final String VERSION_NODE = "RMVersionNode";
protected static final String EPOCH_NODE = "EpochNode";
protected long baseEpoch;
private long epochRange;
protected ResourceManager resourceManager;
private final ReadLock readLock;
private final WriteLock writeLock;
@ -732,6 +733,8 @@ public abstract class RMStateStore extends AbstractService {
// read the base epoch value from conf
baseEpoch = conf.getLong(YarnConfiguration.RM_EPOCH,
YarnConfiguration.DEFAULT_RM_EPOCH);
epochRange = conf.getLong(YarnConfiguration.RM_EPOCH_RANGE,
YarnConfiguration.DEFAULT_RM_EPOCH_RANGE);
initInternal(conf);
}
@ -819,6 +822,19 @@ public abstract class RMStateStore extends AbstractService {
*/
public abstract long getAndIncrementEpoch() throws Exception;
/**
 * Compute the next epoch value by incrementing by one.
 * Wraps around if the epoch range is exceeded so that
 * when federation is enabled epoch collisions can be avoided.
 *
 * <p>When the epoch range is positive the result always lies in
 * {@code [baseEpoch, baseEpoch + epochRange)}, including when the supplied
 * epoch is below the base epoch. When the epoch range is zero or negative
 * no wrap around is applied and the epoch is simply incremented.
 *
 * @param epoch the current epoch value
 * @return the epoch value that follows {@code epoch}
 */
protected long nextEpoch(long epoch){
  // Work with the offset from the base epoch so the wrap is relative to it.
  long epochVal = epoch - baseEpoch + 1;
  if (epochRange > 0) {
    epochVal %= epochRange;
    if (epochVal < 0) {
      // Java's % takes the sign of the dividend; shift a negative remainder
      // back into [0, epochRange) so the result never drops below baseEpoch.
      epochVal += epochRange;
    }
  }
  return epochVal + baseEpoch;
}
/**
* Blocking API
* The derived class must recover state from the store and return a new

View File

@ -491,13 +491,13 @@ public class ZKRMStateStore extends RMStateStore {
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
currentEpoch = epoch.getEpoch();
// increment epoch and store it
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto()
.toByteArray();
zkManager.safeSetData(epochNodePath, storeData, -1, zkAcl,
fencingNodePath);
} else {
// initialize epoch node with 1 for the next time.
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto()
.toByteArray();
zkManager.safeCreate(epochNodePath, storeData, zkAcl,
CreateMode.PERSISTENT, zkAcl, fencingNodePath);

View File

@ -94,6 +94,8 @@ public class RMStateStoreTestBase {
protected final long epoch = 10L;
private final long epochRange = 10L;
static class TestDispatcher implements Dispatcher, EventHandler<Event> {
ApplicationAttemptId attemptId;
@ -141,6 +143,10 @@ public class RMStateStoreTestBase {
boolean attemptExists(RMAppAttempt attempt) throws Exception;
}
/**
 * Exposes the epoch range that the state store tests put into their
 * configuration.
 *
 * @return the epoch range value
 */
public long getEpochRange() {
  return this.epochRange;
}
void waitNotify(TestDispatcher dispatcher) {
long startTime = System.currentTimeMillis();
while(!dispatcher.notified) {
@ -576,6 +582,14 @@ public class RMStateStoreTestBase {
long thirdTimeEpoch = store.getAndIncrementEpoch();
Assert.assertEquals(epoch + 2, thirdTimeEpoch);
for (int i = 0; i < epochRange; ++i) {
store.getAndIncrementEpoch();
}
long wrappedEpoch = store.getAndIncrementEpoch();
// Epoch should have wrapped around and then incremented once for a total
// of + 3
Assert.assertEquals(epoch + 3, wrappedEpoch);
}
public void testAppDeletion(RMStateStoreHelper stateStoreHelper)

View File

@ -118,6 +118,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
900L);
conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
conf.setLong(YarnConfiguration.RM_EPOCH_RANGE, getEpochRange());
if (adminCheckEnable) {
conf.setBoolean(
YarnConfiguration.YARN_INTERMEDIATE_DATA_ENCRYPTION, true);

View File

@ -83,6 +83,7 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
/**
 * Sets the base epoch and the epoch range in the configuration, then runs
 * the epoch test with a {@code LeveldbStateStoreTester}.
 */
@Test(timeout = 60000)
public void testEpoch() throws Exception {
  conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
  conf.setLong(YarnConfiguration.RM_EPOCH_RANGE, getEpochRange());
  // The tester is created only after the configuration has been populated.
  testEpoch(new LeveldbStateStoreTester());
}

View File

@ -210,6 +210,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
curatorTestingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
conf.setLong(YarnConfiguration.RM_EPOCH_RANGE, getEpochRange());
this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
return this.store;
}