HADOOP-9574. Added new methods in AbstractDelegationTokenSecretManager for helping YARN ResourceManager to reuse code for RM restart. Contributed by Jian He.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1487692 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5420f287cc
commit
fdfedf4c31
|
@ -447,6 +447,10 @@ Release 2.0.5-beta - UNRELEASED
|
|||
|
||||
HADOOP-9218 Document the Rpc-wrappers used internally (sanjay Radia)
|
||||
|
||||
HADOOP-9574. Added new methods in AbstractDelegationTokenSecretManager for
|
||||
helping YARN ResourceManager to reuse code for RM restart. (Jian He via
|
||||
vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs
|
||||
|
|
|
@ -141,15 +141,73 @@ extends AbstractDelegationTokenIdentifier>
|
|||
public synchronized DelegationKey[] getAllKeys() {
|
||||
return allKeys.values().toArray(new DelegationKey[0]);
|
||||
}
|
||||
|
||||
|
||||
// HDFS
|
||||
protected void logUpdateMasterKey(DelegationKey key) throws IOException {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// HDFS
|
||||
protected void logExpireToken(TokenIdent ident) throws IOException {
|
||||
return;
|
||||
}
|
||||
|
||||
// RM
|
||||
protected void storeNewMasterKey(DelegationKey key) throws IOException {
|
||||
return;
|
||||
}
|
||||
|
||||
// RM
|
||||
protected void removeStoredMasterKey(DelegationKey key) {
|
||||
return;
|
||||
}
|
||||
|
||||
// RM
|
||||
protected void storeNewToken(TokenIdent ident, long renewDate) {
|
||||
return;
|
||||
}
|
||||
// RM
|
||||
protected void removeStoredToken(TokenIdent ident) throws IOException {
|
||||
|
||||
}
|
||||
// RM
|
||||
protected void updateStoredToken(TokenIdent ident, long renewDate) {
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is intended to be used for recovering persisted delegation
|
||||
* tokens
|
||||
* @param identifier identifier read from persistent storage
|
||||
* @param renewDate token renew time
|
||||
* @throws IOException
|
||||
*/
|
||||
public synchronized void addPersistedDelegationToken(
|
||||
TokenIdent identifier, long renewDate) throws IOException {
|
||||
if (running) {
|
||||
// a safety check
|
||||
throw new IOException(
|
||||
"Can't add persisted delegation token to a running SecretManager.");
|
||||
}
|
||||
int keyId = identifier.getMasterKeyId();
|
||||
DelegationKey dKey = allKeys.get(keyId);
|
||||
if (dKey == null) {
|
||||
LOG.warn("No KEY found for persisted identifier " + identifier.toString());
|
||||
return;
|
||||
}
|
||||
byte[] password = createPassword(identifier.getBytes(), dKey.getKey());
|
||||
if (identifier.getSequenceNumber() > this.delegationTokenSequenceNumber) {
|
||||
this.delegationTokenSequenceNumber = identifier.getSequenceNumber();
|
||||
}
|
||||
if (currentTokens.get(identifier) == null) {
|
||||
currentTokens.put(identifier, new DelegationTokenInformation(renewDate,
|
||||
password));
|
||||
} else {
|
||||
throw new IOException(
|
||||
"Same delegation token being added twice.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the current master key
|
||||
* This is called once by startThreads before tokenRemoverThread is created,
|
||||
|
@ -167,6 +225,7 @@ extends AbstractDelegationTokenIdentifier>
|
|||
+ keyUpdateInterval + tokenMaxLifetime, generateSecret());
|
||||
//Log must be invoked outside the lock on 'this'
|
||||
logUpdateMasterKey(newKey);
|
||||
storeNewMasterKey(newKey);
|
||||
synchronized (this) {
|
||||
currentId = newKey.getKeyId();
|
||||
currentKey = newKey;
|
||||
|
@ -200,6 +259,10 @@ extends AbstractDelegationTokenIdentifier>
|
|||
Map.Entry<Integer, DelegationKey> e = it.next();
|
||||
if (e.getValue().getExpiryDate() < now) {
|
||||
it.remove();
|
||||
// ensure the tokens generated by this current key can be recovered
|
||||
// with this current key after this current key is rolled
|
||||
if(!e.getValue().equals(currentKey))
|
||||
removeStoredMasterKey(e.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -215,6 +278,7 @@ extends AbstractDelegationTokenIdentifier>
|
|||
identifier.setSequenceNumber(sequenceNum);
|
||||
LOG.info("Creating password for identifier: " + identifier);
|
||||
byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
|
||||
storeNewToken(identifier, now + tokenRenewInterval);
|
||||
currentTokens.put(identifier, new DelegationTokenInformation(now
|
||||
+ tokenRenewInterval, password));
|
||||
return password;
|
||||
|
@ -302,6 +366,7 @@ extends AbstractDelegationTokenIdentifier>
|
|||
throw new InvalidToken("Renewal request for unknown token");
|
||||
}
|
||||
currentTokens.put(id, info);
|
||||
updateStoredToken(id, renewTime);
|
||||
return renewTime;
|
||||
}
|
||||
|
||||
|
@ -337,6 +402,7 @@ extends AbstractDelegationTokenIdentifier>
|
|||
if (info == null) {
|
||||
throw new InvalidToken("Token not found");
|
||||
}
|
||||
removeStoredToken(id);
|
||||
return id;
|
||||
}
|
||||
|
||||
|
@ -387,6 +453,7 @@ extends AbstractDelegationTokenIdentifier>
|
|||
// don't hold lock on 'this' to avoid edit log updates blocking token ops
|
||||
for (TokenIdent ident : expiredTokens) {
|
||||
logExpireToken(ident);
|
||||
removeStoredToken(ident);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -18,18 +18,18 @@
|
|||
|
||||
package org.apache.hadoop.security.token.delegation;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
|
||||
import org.apache.avro.reflect.Nullable;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableUtils;
|
||||
import org.apache.avro.reflect.Nullable;
|
||||
|
||||
/**
|
||||
* Key used for generating and verifying delegation tokens
|
||||
|
@ -117,4 +117,29 @@ public class DelegationKey implements Writable {
|
|||
in.readFully(keyBytes);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + (int) (expiryDate ^ (expiryDate >>> 32));
|
||||
result = prime * result + Arrays.hashCode(keyBytes);
|
||||
result = prime * result + keyId;
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object right) {
|
||||
if (this == right) {
|
||||
return true;
|
||||
} else if (right == null || getClass() != right.getClass()) {
|
||||
return false;
|
||||
} else {
|
||||
DelegationKey r = (DelegationKey) right;
|
||||
return keyId == r.keyId &&
|
||||
expiryDate == r.expiryDate &&
|
||||
Arrays.equals(keyBytes, r.keyBytes);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -84,6 +84,11 @@ public class TestDelegationToken {
|
|||
public static class TestDelegationTokenSecretManager
|
||||
extends AbstractDelegationTokenSecretManager<TestDelegationTokenIdentifier> {
|
||||
|
||||
public boolean isStoreNewMasterKeyCalled = false;
|
||||
public boolean isRemoveStoredMasterKeyCalled = false;
|
||||
public boolean isStoreNewTokenCalled = false;
|
||||
public boolean isRemoveStoredTokenCalled = false;
|
||||
public boolean isUpdateStoredTokenCalled = false;
|
||||
public TestDelegationTokenSecretManager(long delegationKeyUpdateInterval,
|
||||
long delegationTokenMaxLifetime,
|
||||
long delegationTokenRenewInterval,
|
||||
|
@ -101,7 +106,40 @@ public class TestDelegationToken {
|
|||
protected byte[] createPassword(TestDelegationTokenIdentifier t) {
|
||||
return super.createPassword(t);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void storeNewMasterKey(DelegationKey key) throws IOException {
|
||||
isStoreNewMasterKeyCalled = true;
|
||||
super.storeNewMasterKey(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void removeStoredMasterKey(DelegationKey key) {
|
||||
isRemoveStoredMasterKeyCalled = true;
|
||||
Assert.assertFalse(key.equals(allKeys.get(currentId)));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void storeNewToken(TestDelegationTokenIdentifier ident,
|
||||
long renewDate) {
|
||||
super.storeNewToken(ident, renewDate);
|
||||
isStoreNewTokenCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void removeStoredToken(TestDelegationTokenIdentifier ident)
|
||||
throws IOException {
|
||||
super.removeStoredToken(ident);
|
||||
isRemoveStoredTokenCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updateStoredToken(TestDelegationTokenIdentifier ident,
|
||||
long renewDate) {
|
||||
super.updateStoredToken(ident, renewDate);
|
||||
isUpdateStoredTokenCalled = true;
|
||||
}
|
||||
|
||||
public byte[] createPassword(TestDelegationTokenIdentifier t, DelegationKey key) {
|
||||
return SecretManager.createPassword(t.getBytes(), key.getKey());
|
||||
}
|
||||
|
@ -229,6 +267,7 @@ public class TestDelegationToken {
|
|||
final Token<TestDelegationTokenIdentifier> token =
|
||||
generateDelegationToken(
|
||||
dtSecretManager, "SomeUser", "JobTracker");
|
||||
Assert.assertTrue(dtSecretManager.isStoreNewTokenCalled);
|
||||
// Fake renewer should not be able to renew
|
||||
shouldThrow(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
|
@ -238,6 +277,7 @@ public class TestDelegationToken {
|
|||
}
|
||||
}, AccessControlException.class);
|
||||
long time = dtSecretManager.renewToken(token, "JobTracker");
|
||||
Assert.assertTrue(dtSecretManager.isUpdateStoredTokenCalled);
|
||||
assertTrue("renew time is in future", time > Time.now());
|
||||
TestDelegationTokenIdentifier identifier =
|
||||
new TestDelegationTokenIdentifier();
|
||||
|
@ -289,6 +329,7 @@ public class TestDelegationToken {
|
|||
}
|
||||
}, AccessControlException.class);
|
||||
dtSecretManager.cancelToken(token, "JobTracker");
|
||||
Assert.assertTrue(dtSecretManager.isRemoveStoredTokenCalled);
|
||||
shouldThrow(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
|
@ -304,8 +345,8 @@ public class TestDelegationToken {
|
|||
@Test
|
||||
public void testRollMasterKey() throws Exception {
|
||||
TestDelegationTokenSecretManager dtSecretManager =
|
||||
new TestDelegationTokenSecretManager(24*60*60*1000,
|
||||
10*1000,1*1000,3600000);
|
||||
new TestDelegationTokenSecretManager(800,
|
||||
800,1*1000,3600000);
|
||||
try {
|
||||
dtSecretManager.startThreads();
|
||||
//generate a token and store the password
|
||||
|
@ -316,7 +357,8 @@ public class TestDelegationToken {
|
|||
int prevNumKeys = dtSecretManager.getAllKeys().length;
|
||||
|
||||
dtSecretManager.rollMasterKey();
|
||||
|
||||
Assert.assertTrue(dtSecretManager.isStoreNewMasterKeyCalled);
|
||||
|
||||
//after rolling, the length of the keys list must increase
|
||||
int currNumKeys = dtSecretManager.getAllKeys().length;
|
||||
Assert.assertEquals((currNumKeys - prevNumKeys) >= 1, true);
|
||||
|
@ -333,6 +375,10 @@ public class TestDelegationToken {
|
|||
dtSecretManager.retrievePassword(identifier);
|
||||
//compare the passwords
|
||||
Assert.assertEquals(oldPasswd, newPasswd);
|
||||
// wait for keys to exipire
|
||||
Thread.sleep(2200);
|
||||
Assert.assertTrue(dtSecretManager.isRemoveStoredMasterKeyCalled);
|
||||
|
||||
} finally {
|
||||
dtSecretManager.stopThreads();
|
||||
}
|
||||
|
@ -484,4 +530,13 @@ public class TestDelegationToken {
|
|||
assertFalse(testDelegationTokenIdentiferSerializationRoundTrip(
|
||||
new Text("owner"), new Text("renewer"), new Text(bigBuf)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelegationKeyEqualAndHash() {
|
||||
DelegationKey key1 = new DelegationKey(1111, 2222, "keyBytes".getBytes());
|
||||
DelegationKey key2 = new DelegationKey(1111, 2222, "keyBytes".getBytes());
|
||||
DelegationKey key3 = new DelegationKey(3333, 2222, "keyBytes".getBytes());
|
||||
Assert.assertEquals(key1, key2);
|
||||
Assert.assertFalse(key2.equals(key3));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue