Merge r1588365 from trunk: YARN-1947. TestRMDelegationTokens#testRMDTMasterKeyStateOnRollingMasterKey is failing intermittently. (Jian He via junping_du)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1588367 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8c23c3295a
commit
e39ce638b6
|
@ -123,6 +123,9 @@ Release 2.4.1 - UNRELEASED
|
|||
YARN-1750. TestNodeStatusUpdater#testNMRegistration is incorrect in test
|
||||
case. (Wangda Tan via junping_du)
|
||||
|
||||
YARN-1947. TestRMDelegationTokens#testRMDTMasterKeyStateOnRollingMasterKey
|
||||
is failing intermittently. (Jian He via junping_du)
|
||||
|
||||
Release 2.4.0 - 2014-04-07
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -190,6 +190,9 @@ public class MemoryRMStateStore extends RMStateStore {
|
|||
}
|
||||
rmDTState.put(rmDTIdentifier, renewDate);
|
||||
state.rmSecretManagerState.dtSequenceNumber = latestSequenceNumber;
|
||||
LOG.info("Store RMDT with sequence number "
|
||||
+ rmDTIdentifier.getSequenceNumber()
|
||||
+ ". And the latest sequence number is " + latestSequenceNumber);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -198,6 +201,8 @@ public class MemoryRMStateStore extends RMStateStore {
|
|||
Map<RMDelegationTokenIdentifier, Long> rmDTState =
|
||||
state.rmSecretManagerState.getTokenState();
|
||||
rmDTState.remove(rmDTIdentifier);
|
||||
LOG.info("Remove RMDT with sequence number "
|
||||
+ rmDTIdentifier.getSequenceNumber());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -207,6 +212,8 @@ public class MemoryRMStateStore extends RMStateStore {
|
|||
removeRMDelegationTokenState(rmDTIdentifier);
|
||||
storeRMDelegationTokenAndSequenceNumberState(
|
||||
rmDTIdentifier, renewDate, latestSequenceNumber);
|
||||
LOG.info("Update RMDT with sequence number "
|
||||
+ rmDTIdentifier.getSequenceNumber());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -223,12 +230,14 @@ public class MemoryRMStateStore extends RMStateStore {
|
|||
throw e;
|
||||
}
|
||||
state.getRMDTSecretManagerState().getMasterKeyState().add(delegationKey);
|
||||
LOG.info("rmDTMasterKeyState SIZE: " + rmDTMasterKeyState.size());
|
||||
LOG.info("Store RMDT master key with key id: " + delegationKey.getKeyId()
|
||||
+ ". Currently rmDTMasterKeyState size: " + rmDTMasterKeyState.size());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void removeRMDTMasterKeyState(DelegationKey delegationKey)
|
||||
throws Exception {
|
||||
LOG.info("Remove RMDT master key with key id: " + delegationKey.getKeyId());
|
||||
Set<DelegationKey> rmDTMasterKeyState =
|
||||
state.rmSecretManagerState.getMasterKeyState();
|
||||
rmDTMasterKeyState.remove(delegationKey);
|
||||
|
|
|
@ -67,6 +67,7 @@ public class TestRMDelegationTokens {
|
|||
conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
|
||||
}
|
||||
|
||||
// Test the DT mast key in the state-store when the mast key is being rolled.
|
||||
@Test(timeout = 15000)
|
||||
public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception {
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
|
@ -92,9 +93,6 @@ public class TestRMDelegationTokens {
|
|||
Set<DelegationKey> expiringKeys = new HashSet<DelegationKey>();
|
||||
expiringKeys.addAll(dtSecretManager.getAllMasterKeys());
|
||||
|
||||
// record the current key
|
||||
DelegationKey oldCurrentKey =
|
||||
((TestRMDelegationTokenSecretManager) dtSecretManager).getCurrentKey();
|
||||
|
||||
// request to generate a RMDelegationToken
|
||||
GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class);
|
||||
|
@ -107,29 +105,26 @@ public class TestRMDelegationTokens {
|
|||
ConverterUtils.convertFromYarn(delegationToken, (Text) null);
|
||||
RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier();
|
||||
|
||||
// wait for the first rollMasterKey
|
||||
// For all keys that still remain in memory, we should have them stored
|
||||
// in state-store also.
|
||||
while (((TestRMDelegationTokenSecretManager) dtSecretManager).numUpdatedKeys
|
||||
.get() < 1){
|
||||
Thread.sleep(200);
|
||||
.get() < 3) {
|
||||
((TestRMDelegationTokenSecretManager) dtSecretManager)
|
||||
.checkCurrentKeyInStateStore(rmDTMasterKeyState);
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
// assert old-current-key and new-current-key exist
|
||||
Assert.assertTrue(rmDTMasterKeyState.contains(oldCurrentKey));
|
||||
DelegationKey newCurrentKey =
|
||||
((TestRMDelegationTokenSecretManager) dtSecretManager).getCurrentKey();
|
||||
Assert.assertTrue(rmDTMasterKeyState.contains(newCurrentKey));
|
||||
|
||||
// wait for token to expire
|
||||
// wait for token to expire and remove from state-store
|
||||
// rollMasterKey is called every 1 second.
|
||||
while (((TestRMDelegationTokenSecretManager) dtSecretManager).numUpdatedKeys
|
||||
.get() < 6) {
|
||||
Thread.sleep(200);
|
||||
int count = 0;
|
||||
while (rmDTState.containsKey(dtId1) && count < 100) {
|
||||
Thread.sleep(100);
|
||||
count++;
|
||||
}
|
||||
|
||||
Assert.assertFalse(rmDTState.containsKey(dtId1));
|
||||
rm1.stop();
|
||||
}
|
||||
|
||||
// Test all expired keys are removed from state-store.
|
||||
@Test(timeout = 15000)
|
||||
public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception {
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
|
@ -205,10 +200,13 @@ public class TestRMDelegationTokens {
|
|||
numUpdatedKeys.incrementAndGet();
|
||||
}
|
||||
|
||||
public DelegationKey getCurrentKey() {
|
||||
public synchronized DelegationKey checkCurrentKeyInStateStore(
|
||||
Set<DelegationKey> rmDTMasterKeyState) {
|
||||
for (int keyId : allKeys.keySet()) {
|
||||
if (keyId == currentId) {
|
||||
return allKeys.get(keyId);
|
||||
DelegationKey currentKey = allKeys.get(keyId);
|
||||
Assert.assertTrue(rmDTMasterKeyState.contains(currentKey));
|
||||
return currentKey;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
|
|
Loading…
Reference in New Issue