HADOOP-6573. Support for persistent delegation tokens. Contributed by Jitendra Pandey.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@916468 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7785232da5
commit
27ab7f3db9
|
@ -170,6 +170,9 @@ Trunk (unreleased changes)
|
|||
HADOOP-6596. Add a version field to the AbstractDelegationTokenIdentifier's
|
||||
serialized value. (omalley)
|
||||
|
||||
HADOOP-6573. Support for persistent delegation tokens.
|
||||
(Jitendra Pandey via shv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HADOOP-6467. Improve the performance on HarFileSystem.listStatus(..).
|
||||
|
|
|
@ -52,23 +52,30 @@ extends AbstractDelegationTokenIdentifier>
|
|||
|
||||
/**
|
||||
* Cache of currently valid tokens, mapping from DelegationTokenIdentifier
|
||||
* to DelegationTokenInformation. Protected by its own lock.
|
||||
* to DelegationTokenInformation. Protected by this object lock.
|
||||
*/
|
||||
private final Map<TokenIdent, DelegationTokenInformation> currentTokens
|
||||
protected final Map<TokenIdent, DelegationTokenInformation> currentTokens
|
||||
= new HashMap<TokenIdent, DelegationTokenInformation>();
|
||||
|
||||
/**
|
||||
* Sequence number to create DelegationTokenIdentifier
|
||||
* Sequence number to create DelegationTokenIdentifier.
|
||||
* Protected by this object lock.
|
||||
*/
|
||||
private int delegationTokenSequenceNumber = 0;
|
||||
protected int delegationTokenSequenceNumber = 0;
|
||||
|
||||
private final Map<Integer, DelegationKey> allKeys
|
||||
/**
|
||||
* Access to allKeys is protected by this object lock
|
||||
*/
|
||||
protected final Map<Integer, DelegationKey> allKeys
|
||||
= new HashMap<Integer, DelegationKey>();
|
||||
|
||||
/**
|
||||
* Access to currentId and currentKey is protected by this object lock.
|
||||
* Access to currentId is protected by this object lock.
|
||||
*/
|
||||
protected int currentId = 0;
|
||||
/**
|
||||
* Access to currentKey is protected by this object lock
|
||||
*/
|
||||
private int currentId = 0;
|
||||
private DelegationKey currentKey;
|
||||
|
||||
private long keyUpdateInterval;
|
||||
|
@ -76,7 +83,7 @@ extends AbstractDelegationTokenIdentifier>
|
|||
private long tokenRemoverScanInterval;
|
||||
private long tokenRenewInterval;
|
||||
private Thread tokenRemoverThread;
|
||||
private volatile boolean running;
|
||||
protected volatile boolean running;
|
||||
|
||||
public AbstractDelegationTokenSecretManager(long delegationKeyUpdateInterval,
|
||||
long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
|
||||
|
@ -112,27 +119,50 @@ extends AbstractDelegationTokenIdentifier>
|
|||
return allKeys.values().toArray(new DelegationKey[0]);
|
||||
}
|
||||
|
||||
/** Update the current master key */
|
||||
private synchronized void updateCurrentKey() throws IOException {
|
||||
LOG.info("Updating the current master key for generating delegation tokens");
|
||||
/* Create a new currentKey with an estimated expiry date. */
|
||||
currentId++;
|
||||
currentKey = new DelegationKey(currentId, System.currentTimeMillis()
|
||||
+ keyUpdateInterval + tokenMaxLifetime, generateSecret());
|
||||
allKeys.put(currentKey.getKeyId(), currentKey);
|
||||
protected void logUpdateMasterKey(DelegationKey key) throws IOException {
|
||||
return;
|
||||
}
|
||||
|
||||
/** Update the current master key for generating delegation tokens */
|
||||
public synchronized void rollMasterKey() throws IOException {
|
||||
removeExpiredKeys();
|
||||
/* set final expiry date for retiring currentKey */
|
||||
currentKey.setExpiryDate(System.currentTimeMillis() + tokenMaxLifetime);
|
||||
/*
|
||||
* currentKey might have been removed by removeExpiredKeys(), if
|
||||
* updateMasterKey() isn't called at expected interval. Add it back to
|
||||
* allKeys just in case.
|
||||
*/
|
||||
allKeys.put(currentKey.getKeyId(), currentKey);
|
||||
/**
|
||||
* Update the current master key
|
||||
* This is called once by startThreads before tokenRemoverThread is created,
|
||||
* and only by tokenRemoverThread afterwards.
|
||||
*/
|
||||
private void updateCurrentKey() throws IOException {
|
||||
LOG.info("Updating the current master key for generating delegation tokens");
|
||||
/* Create a new currentKey with an estimated expiry date. */
|
||||
int newCurrentId;
|
||||
synchronized (this) {
|
||||
newCurrentId = currentId+1;
|
||||
}
|
||||
DelegationKey newKey = new DelegationKey(newCurrentId, System
|
||||
.currentTimeMillis()
|
||||
+ keyUpdateInterval + tokenMaxLifetime, generateSecret());
|
||||
//Log must be invoked outside the lock on 'this'
|
||||
logUpdateMasterKey(newKey);
|
||||
synchronized (this) {
|
||||
currentId = newKey.getKeyId();
|
||||
currentKey = newKey;
|
||||
allKeys.put(currentKey.getKeyId(), currentKey);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the current master key for generating delegation tokens
|
||||
* It should be called only by tokenRemoverThread.
|
||||
*/
|
||||
void rollMasterKey() throws IOException {
|
||||
synchronized (this) {
|
||||
removeExpiredKeys();
|
||||
/* set final expiry date for retiring currentKey */
|
||||
currentKey.setExpiryDate(System.currentTimeMillis() + tokenMaxLifetime);
|
||||
/*
|
||||
* currentKey might have been removed by removeExpiredKeys(), if
|
||||
* updateMasterKey() isn't called at expected interval. Add it back to
|
||||
* allKeys just in case.
|
||||
*/
|
||||
allKeys.put(currentKey.getKeyId(), currentKey);
|
||||
}
|
||||
updateCurrentKey();
|
||||
}
|
||||
|
||||
|
@ -148,35 +178,24 @@ extends AbstractDelegationTokenIdentifier>
|
|||
}
|
||||
|
||||
@Override
|
||||
protected byte[] createPassword(TokenIdent identifier) {
|
||||
protected synchronized byte[] createPassword(TokenIdent identifier) {
|
||||
int sequenceNum;
|
||||
int id;
|
||||
DelegationKey key;
|
||||
long now = System.currentTimeMillis();
|
||||
synchronized (this) {
|
||||
id = currentId;
|
||||
key = currentKey;
|
||||
sequenceNum = ++delegationTokenSequenceNumber;
|
||||
}
|
||||
long now = System.currentTimeMillis();
|
||||
sequenceNum = ++delegationTokenSequenceNumber;
|
||||
identifier.setIssueDate(now);
|
||||
identifier.setMaxDate(now + tokenMaxLifetime);
|
||||
identifier.setMasterKeyId(id);
|
||||
identifier.setMasterKeyId(currentId);
|
||||
identifier.setSequenceNumber(sequenceNum);
|
||||
byte[] password = createPassword(identifier.getBytes(), key.getKey());
|
||||
synchronized (currentTokens) {
|
||||
currentTokens.put(identifier, new DelegationTokenInformation(now
|
||||
+ tokenRenewInterval, password));
|
||||
}
|
||||
byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
|
||||
currentTokens.put(identifier, new DelegationTokenInformation(now
|
||||
+ tokenRenewInterval, password));
|
||||
return password;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] retrievePassword(TokenIdent identifier
|
||||
) throws InvalidToken {
|
||||
DelegationTokenInformation info = null;
|
||||
synchronized (currentTokens) {
|
||||
info = currentTokens.get(identifier);
|
||||
}
|
||||
public synchronized byte[] retrievePassword(TokenIdent identifier)
|
||||
throws InvalidToken {
|
||||
DelegationTokenInformation info = currentTokens.get(identifier);
|
||||
if (info == null) {
|
||||
throw new InvalidToken("token is expired or doesn't exist");
|
||||
}
|
||||
|
@ -195,18 +214,14 @@ extends AbstractDelegationTokenIdentifier>
|
|||
* @throws InvalidToken if the token is invalid
|
||||
* @throws AccessControlException if the user can't renew token
|
||||
*/
|
||||
public long renewToken(Token<TokenIdent> token,
|
||||
public synchronized long renewToken(Token<TokenIdent> token,
|
||||
String renewer) throws InvalidToken, IOException {
|
||||
long now = System.currentTimeMillis();
|
||||
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
|
||||
DataInputStream in = new DataInputStream(buf);
|
||||
TokenIdent id = createIdentifier();
|
||||
id.readFields(in);
|
||||
synchronized (currentTokens) {
|
||||
if (currentTokens.get(id) == null) {
|
||||
throw new InvalidToken("Renewal request for unknown token");
|
||||
}
|
||||
}
|
||||
|
||||
if (id.getMaxDate() < now) {
|
||||
throw new InvalidToken("User " + renewer +
|
||||
" tried to renew an expired token");
|
||||
|
@ -222,36 +237,36 @@ extends AbstractDelegationTokenIdentifier>
|
|||
"renewer specified as " +
|
||||
id.getRenewer());
|
||||
}
|
||||
DelegationKey key = null;
|
||||
synchronized (this) {
|
||||
key = allKeys.get(id.getMasterKeyId());
|
||||
}
|
||||
DelegationKey key = allKeys.get(id.getMasterKeyId());
|
||||
if (key == null) {
|
||||
throw new InvalidToken("Unable to find master key for keyId=" +
|
||||
id.getMasterKeyId() +
|
||||
" from cache. Failed to renew an unexpired token"+
|
||||
" with sequenceNumber=" + id.getSequenceNumber());
|
||||
throw new InvalidToken("Unable to find master key for keyId="
|
||||
+ id.getMasterKeyId()
|
||||
+ " from cache. Failed to renew an unexpired token"
|
||||
+ " with sequenceNumber=" + id.getSequenceNumber());
|
||||
}
|
||||
byte[] password = createPassword(token.getIdentifier(), key.getKey());
|
||||
if (!Arrays.equals(password, token.getPassword())) {
|
||||
throw new AccessControlException("Client " + renewer +
|
||||
" is trying to renew a token with " +
|
||||
"wrong password");
|
||||
throw new AccessControlException("Client " + renewer
|
||||
+ " is trying to renew a token with " + "wrong password");
|
||||
}
|
||||
DelegationTokenInformation info = new DelegationTokenInformation(
|
||||
Math.min(id.getMaxDate(), now + tokenRenewInterval), password);
|
||||
synchronized (currentTokens) {
|
||||
currentTokens.put(id, info);
|
||||
long renewTime = Math.min(id.getMaxDate(), now + tokenRenewInterval);
|
||||
DelegationTokenInformation info = new DelegationTokenInformation(renewTime,
|
||||
password);
|
||||
|
||||
if (currentTokens.get(id) == null) {
|
||||
throw new InvalidToken("Renewal request for unknown token");
|
||||
}
|
||||
return info.getRenewDate();
|
||||
currentTokens.put(id, info);
|
||||
return renewTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel a token by removing it from cache.
|
||||
* @return Identifier of the canceled token
|
||||
* @throws InvalidToken for invalid token
|
||||
* @throws AccessControlException if the user isn't allowed to cancel
|
||||
*/
|
||||
public void cancelToken(Token<TokenIdent> token,
|
||||
public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
|
||||
String canceller) throws IOException {
|
||||
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
|
||||
DataInputStream in = new DataInputStream(buf);
|
||||
|
@ -262,18 +277,17 @@ extends AbstractDelegationTokenIdentifier>
|
|||
}
|
||||
String owner = id.getUser().getUserName();
|
||||
Text renewer = id.getRenewer();
|
||||
if (!canceller.equals(owner) &&
|
||||
(renewer == null || !canceller.equals(renewer.toString()))) {
|
||||
throw new AccessControlException(canceller +
|
||||
" is not authorized to cancel the token");
|
||||
if (!canceller.equals(owner)
|
||||
&& (renewer == null || !canceller.equals(renewer.toString()))) {
|
||||
throw new AccessControlException(canceller
|
||||
+ " is not authorized to cancel the token");
|
||||
}
|
||||
DelegationTokenInformation info = null;
|
||||
synchronized (currentTokens) {
|
||||
info = currentTokens.remove(id);
|
||||
}
|
||||
info = currentTokens.remove(id);
|
||||
if (info == null) {
|
||||
throw new InvalidToken("Token not found");
|
||||
}
|
||||
return id;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -285,16 +299,16 @@ extends AbstractDelegationTokenIdentifier>
|
|||
return SecretManager.createSecretKey(key);
|
||||
}
|
||||
|
||||
/** Utility class to encapsulate a token's renew date and password. */
|
||||
private static class DelegationTokenInformation {
|
||||
/** Class to encapsulate a token's renew date and password. */
|
||||
public static class DelegationTokenInformation {
|
||||
long renewDate;
|
||||
byte[] password;
|
||||
DelegationTokenInformation(long renewDate, byte[] password) {
|
||||
public DelegationTokenInformation(long renewDate, byte[] password) {
|
||||
this.renewDate = renewDate;
|
||||
this.password = password;
|
||||
}
|
||||
/** returns renew date */
|
||||
long getRenewDate() {
|
||||
public long getRenewDate() {
|
||||
return renewDate;
|
||||
}
|
||||
/** returns password */
|
||||
|
@ -304,15 +318,13 @@ extends AbstractDelegationTokenIdentifier>
|
|||
}
|
||||
|
||||
/** Remove expired delegation tokens from cache */
|
||||
private void removeExpiredToken() {
|
||||
private synchronized void removeExpiredToken() {
|
||||
long now = System.currentTimeMillis();
|
||||
synchronized (currentTokens) {
|
||||
Iterator<DelegationTokenInformation> i = currentTokens.values().iterator();
|
||||
while (i.hasNext()) {
|
||||
long renewDate = i.next().getRenewDate();
|
||||
if (now > renewDate) {
|
||||
i.remove();
|
||||
}
|
||||
Iterator<DelegationTokenInformation> i = currentTokens.values().iterator();
|
||||
while (i.hasNext()) {
|
||||
long renewDate = i.next().getRenewDate();
|
||||
if (now > renewDate) {
|
||||
i.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -321,7 +333,9 @@ extends AbstractDelegationTokenIdentifier>
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Stopping expired delegation token remover thread");
|
||||
running = false;
|
||||
tokenRemoverThread.interrupt();
|
||||
if (tokenRemoverThread != null) {
|
||||
tokenRemoverThread.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
private class ExpiredTokenRemover extends Thread {
|
||||
|
|
|
@ -25,7 +25,10 @@ import java.io.DataOutput;
|
|||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
|
@ -36,8 +39,11 @@ import org.apache.hadoop.io.DataOutputBuffer;
|
|||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -91,6 +97,18 @@ public class TestDelegationToken {
|
|||
protected byte[] createPassword(TestDelegationTokenIdentifier t) {
|
||||
return super.createPassword(t);
|
||||
}
|
||||
|
||||
public byte[] createPassword(TestDelegationTokenIdentifier t, DelegationKey key) {
|
||||
return SecretManager.createPassword(t.getBytes(), key.getKey());
|
||||
}
|
||||
|
||||
public Map<TestDelegationTokenIdentifier, DelegationTokenInformation> getAllTokens() {
|
||||
return currentTokens;
|
||||
}
|
||||
|
||||
public DelegationKey getKey(TestDelegationTokenIdentifier id) {
|
||||
return allKeys.get(id.getMasterKeyId());
|
||||
}
|
||||
}
|
||||
|
||||
public static class TokenSelector extends
|
||||
|
@ -299,4 +317,52 @@ public class TestDelegationToken {
|
|||
dtSecretManager.stopThreads();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParallelDelegationTokenCreation() throws Exception {
|
||||
final TestDelegationTokenSecretManager dtSecretManager =
|
||||
new TestDelegationTokenSecretManager(2000, 24 * 60 * 60 * 1000,
|
||||
7 * 24 * 60 * 60 * 1000, 2000);
|
||||
try {
|
||||
dtSecretManager.startThreads();
|
||||
int numThreads = 100;
|
||||
final int numTokensPerThread = 100;
|
||||
class tokenIssuerThread implements Runnable {
|
||||
|
||||
public void run() {
|
||||
for(int i =0;i <numTokensPerThread; i++) {
|
||||
generateDelegationToken(dtSecretManager, "auser", "arenewer");
|
||||
try {
|
||||
Thread.sleep(250);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Thread[] issuers = new Thread[numThreads];
|
||||
for (int i =0; i <numThreads; i++) {
|
||||
issuers[i] = new Daemon(new tokenIssuerThread());
|
||||
issuers[i].start();
|
||||
}
|
||||
for (int i =0; i <numThreads; i++) {
|
||||
issuers[i].join();
|
||||
}
|
||||
Map<TestDelegationTokenIdentifier, DelegationTokenInformation> tokenCache = dtSecretManager
|
||||
.getAllTokens();
|
||||
Assert.assertEquals(numTokensPerThread*numThreads, tokenCache.size());
|
||||
Iterator<TestDelegationTokenIdentifier> iter = tokenCache.keySet().iterator();
|
||||
while (iter.hasNext()) {
|
||||
TestDelegationTokenIdentifier id = iter.next();
|
||||
DelegationTokenInformation info = tokenCache.get(id);
|
||||
Assert.assertTrue(info != null);
|
||||
DelegationKey key = dtSecretManager.getKey(id);
|
||||
Assert.assertTrue(key != null);
|
||||
byte[] storedPassword = dtSecretManager.retrievePassword(id);
|
||||
byte[] password = dtSecretManager.createPassword(id, key);
|
||||
Assert.assertTrue(Arrays.equals(password, storedPassword));
|
||||
}
|
||||
} finally {
|
||||
dtSecretManager.stopThreads();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue