HADOOP-11170. ZKDelegationTokenSecretManager fails to renewToken created by a peer. (Arun Suresh and Gregory Chanan via kasha)

(cherry picked from commit db45f047ab)
This commit is contained in:
Karthik Kambatla 2014-10-23 17:04:14 -07:00
parent 92ad210699
commit c99d89f2c6
4 changed files with 209 additions and 63 deletions

View File

@ -25,6 +25,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7227. Fix findbugs warning about NP_DEREFERENCE_OF_READLINE_VALUE in
SpanReceiverHost (cmccabe)
HADOOP-11170. ZKDelegationTokenSecretManager fails to renewToken created by
a peer. (Arun Suresh and Gregory Chanan via kasha)
Release 2.6.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -20,10 +20,13 @@ package org.apache.hadoop.security.token.delegation;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
@ -125,7 +128,7 @@ extends AbstractDelegationTokenIdentifier>
* Reset all data structures and mutable state.
*/
public synchronized void reset() {
currentId = 0;
setCurrentKeyId(0);
allKeys.clear();
setDelegationTokenSeqNum(0);
currentTokens.clear();
@ -138,8 +141,8 @@ extends AbstractDelegationTokenIdentifier>
public synchronized void addKey(DelegationKey key) throws IOException {
if (running) // a safety check
throw new IOException("Can't add delegation key to a running SecretManager.");
if (key.getKeyId() > currentId) {
currentId = key.getKeyId();
if (key.getKeyId() > getCurrentKeyId()) {
setCurrentKeyId(key.getKeyId());
}
allKeys.put(key.getKeyId(), key);
}
@ -182,6 +185,30 @@ extends AbstractDelegationTokenIdentifier>
return;
}
/**
* For subclasses externalizing the storage, for example Zookeeper
* based implementations
*/
protected synchronized int getCurrentKeyId() {
return currentId;
}
/**
* For subclasses externalizing the storage, for example Zookeeper
* based implementations
*/
protected synchronized int incrementCurrentKeyId() {
return ++currentId;
}
/**
* For subclasses externalizing the storage, for example Zookeeper
* based implementations
*/
protected synchronized void setCurrentKeyId(int keyId) {
currentId = keyId;
}
/**
* For subclasses externalizing the storage, for example Zookeeper
* based implementations
@ -282,8 +309,8 @@ extends AbstractDelegationTokenIdentifier>
return;
}
byte[] password = createPassword(identifier.getBytes(), dKey.getKey());
if (identifier.getSequenceNumber() > delegationTokenSequenceNumber) {
delegationTokenSequenceNumber = identifier.getSequenceNumber();
if (identifier.getSequenceNumber() > getDelegationTokenSeqNum()) {
setDelegationTokenSeqNum(identifier.getSequenceNumber());
}
if (getTokenInfo(identifier) == null) {
currentTokens.put(identifier, new DelegationTokenInformation(renewDate,
@ -303,7 +330,7 @@ extends AbstractDelegationTokenIdentifier>
/* Create a new currentKey with an estimated expiry date. */
int newCurrentId;
synchronized (this) {
newCurrentId = currentId+1;
newCurrentId = incrementCurrentKeyId();
}
DelegationKey newKey = new DelegationKey(newCurrentId, System
.currentTimeMillis()
@ -311,7 +338,6 @@ extends AbstractDelegationTokenIdentifier>
//Log must be invoked outside the lock on 'this'
logUpdateMasterKey(newKey);
synchronized (this) {
currentId = newKey.getKeyId();
currentKey = newKey;
storeDelegationKey(currentKey);
}
@ -358,9 +384,9 @@ extends AbstractDelegationTokenIdentifier>
sequenceNum = incrementDelegationTokenSeqNum();
identifier.setIssueDate(now);
identifier.setMaxDate(now + tokenMaxLifetime);
identifier.setMasterKeyId(currentId);
identifier.setMasterKeyId(currentKey.getKeyId());
identifier.setSequenceNumber(sequenceNum);
LOG.info("Creating password for identifier: " + identifier);
LOG.info("Creating password for identifier: [" + MD5Hash.digest(identifier.getBytes()) + ", " + currentKey.getKeyId() + "]");
byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));

View File

@ -48,7 +48,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@ -56,6 +55,7 @@ import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -104,6 +104,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
private static final String ZK_DTSM_NAMESPACE = "ZKDTSMRoot";
private static final String ZK_DTSM_SEQNUM_ROOT = "ZKDTSMSeqNumRoot";
private static final String ZK_DTSM_KEYID_ROOT = "ZKDTSMKeyIdRoot";
private static final String ZK_DTSM_TOKENS_ROOT = "ZKDTSMTokensRoot";
private static final String ZK_DTSM_MASTER_KEY_ROOT = "ZKDTSMMasterKeyRoot";
@ -119,7 +120,8 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
private final boolean isExternalClient;
private final CuratorFramework zkClient;
private SharedCount seqCounter;
private SharedCount delTokSeqCounter;
private SharedCount keyIdSeqCounter;
private PathChildrenCache keyCache;
private PathChildrenCache tokenCache;
private ExecutorService listenerThreadPool;
@ -276,7 +278,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
}
@Override
public synchronized void startThreads() throws IOException {
public void startThreads() throws IOException {
if (!isExternalClient) {
try {
zkClient.start();
@ -285,13 +287,21 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
}
}
try {
seqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0);
if (seqCounter != null) {
seqCounter.start();
delTokSeqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0);
if (delTokSeqCounter != null) {
delTokSeqCounter.start();
}
} catch (Exception e) {
throw new IOException("Could not start Sequence Counter", e);
}
try {
keyIdSeqCounter = new SharedCount(zkClient, ZK_DTSM_KEYID_ROOT, 0);
if (keyIdSeqCounter != null) {
keyIdSeqCounter.start();
}
} catch (Exception e) {
throw new IOException("Could not start KeyId Counter", e);
}
try {
createPersistentNode(ZK_DTSM_MASTER_KEY_ROOT);
createPersistentNode(ZK_DTSM_TOKENS_ROOT);
@ -402,13 +412,16 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
}
@Override
public synchronized void stopThreads() {
public void stopThreads() {
try {
if (!isExternalClient && (zkClient != null)) {
zkClient.close();
}
if (seqCounter != null) {
seqCounter.close();
if (delTokSeqCounter != null) {
delTokSeqCounter.close();
}
if (keyIdSeqCounter != null) {
keyIdSeqCounter.close();
}
if (keyCache != null) {
keyCache.close();
@ -434,30 +447,46 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
}
@Override
protected synchronized int getDelegationTokenSeqNum() {
return seqCounter.getCount();
protected int getDelegationTokenSeqNum() {
return delTokSeqCounter.getCount();
}
@Override
protected synchronized int incrementDelegationTokenSeqNum() {
protected int incrementDelegationTokenSeqNum() {
try {
while (!seqCounter.trySetCount(seqCounter.getCount() + 1)) {
while (!delTokSeqCounter.trySetCount(delTokSeqCounter.getCount() + 1)) {
}
} catch (Exception e) {
throw new RuntimeException("Could not increment shared counter !!", e);
}
return seqCounter.getCount();
return delTokSeqCounter.getCount();
}
@Override
protected synchronized void setDelegationTokenSeqNum(int seqNum) {
protected void setDelegationTokenSeqNum(int seqNum) {
try {
seqCounter.setCount(seqNum);
delTokSeqCounter.setCount(seqNum);
} catch (Exception e) {
throw new RuntimeException("Could not set shared counter !!", e);
}
}
@Override
protected int getCurrentKeyId() {
return keyIdSeqCounter.getCount();
}
@Override
protected int incrementCurrentKeyId() {
try {
while (!keyIdSeqCounter.trySetCount(keyIdSeqCounter.getCount() + 1)) {
}
} catch (Exception e) {
throw new RuntimeException("Could not increment shared keyId counter !!", e);
}
return keyIdSeqCounter.getCount();
}
@Override
protected DelegationKey getDelegationKey(int keyId) {
// First check if its I already have this key
@ -518,6 +547,11 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident)
throws IOException {
return getTokenInfoFromZK(ident, false);
}
private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident,
boolean quiet) throws IOException {
String nodePath =
getNodePath(ZK_DTSM_TOKENS_ROOT,
DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber());
@ -539,7 +573,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
return tokenInfo;
}
} catch (KeeperException.NoNodeException e) {
if (!quiet) {
LOG.error("No node in path [" + nodePath + "]");
}
} catch (Exception ex) {
throw new IOException(ex);
}
@ -604,7 +640,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
}
try {
if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
zkClient.delete().forPath(nodeRemovePath);
while(zkClient.checkExists().forPath(nodeRemovePath) != null){
zkClient.delete().guaranteed().forPath(nodeRemovePath);
}
} else {
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
}
@ -633,10 +671,10 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
+ ident.getSequenceNumber());
try {
if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
addOrUpdateToken(ident, tokenInfo, true);
} else {
addOrUpdateToken(ident, tokenInfo, false);
LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
} else {
addOrUpdateToken(ident, tokenInfo, true);
}
} catch (Exception e) {
throw new RuntimeException("Could not update Stored Token ZKDTSMDelegationToken_"
@ -656,9 +694,11 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
}
try {
if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
LOG.debug("Attempted to remove a non-existing znode " + nodeRemovePath);
while(zkClient.checkExists().forPath(nodeRemovePath) != null){
zkClient.delete().guaranteed().forPath(nodeRemovePath);
}
} else {
zkClient.delete().forPath(nodeRemovePath);
LOG.debug("Attempted to remove a non-existing znode " + nodeRemovePath);
}
} catch (Exception e) {
throw new RuntimeException(
@ -682,7 +722,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
tokenOut.writeInt(info.getPassword().length);
tokenOut.write(info.getPassword());
if (LOG.isDebugEnabled()) {
LOG.debug((isUpdate ? "Storing " : "Updating ")
LOG.debug((isUpdate ? "Updating " : "Storing ")
+ "ZKDTSMDelegationToken_" +
ident.getSequenceNumber());
}

View File

@ -22,24 +22,38 @@ import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import static org.junit.Assert.fail;
import org.junit.Test;
public class TestZKDelegationTokenSecretManager {
private static final long DAY_IN_SECS = 86400;
@SuppressWarnings("unchecked")
@Test
public void testZKDelTokSecretManager() throws Exception {
TestingServer zkServer = new TestingServer();
DelegationTokenManager tm1, tm2 = null;
private TestingServer zkServer;
@Before
public void setup() throws Exception {
zkServer = new TestingServer();
zkServer.start();
try {
String connectString = zkServer.getConnectString();
}
@After
public void tearDown() throws Exception {
if (zkServer != null) {
zkServer.close();
}
}
protected Configuration getSecretConf(String connectString) {
Configuration conf = new Configuration();
conf.setBoolean(DelegationTokenManager.ENABLE_ZK_KEY, true);
conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING, connectString);
@ -49,9 +63,18 @@ public class TestZKDelegationTokenSecretManager {
conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, DAY_IN_SECS);
tm1 = new DelegationTokenManager(conf, new Text("foo"));
return conf;
}
@SuppressWarnings("unchecked")
@Test
public void testMultiNodeOperations() throws Exception {
DelegationTokenManager tm1, tm2 = null;
String connectString = zkServer.getConnectString();
Configuration conf = getSecretConf(connectString);
tm1 = new DelegationTokenManager(conf, new Text("bla"));
tm1.init();
tm2 = new DelegationTokenManager(conf, new Text("foo"));
tm2 = new DelegationTokenManager(conf, new Text("bla"));
tm2.init();
Token<DelegationTokenIdentifier> token =
@ -59,13 +82,67 @@ public class TestZKDelegationTokenSecretManager {
UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token);
tm2.verifyToken(token);
tm2.renewToken(token, "foo");
tm1.verifyToken(token);
tm1.cancelToken(token, "foo");
try {
tm2.verifyToken(token);
fail("Expected InvalidToken");
} catch (SecretManager.InvalidToken it) {
// Ignore
}
token = (Token<DelegationTokenIdentifier>) tm2.createToken(
UserGroupInformation.getCurrentUser(), "bar");
Assert.assertNotNull(token);
tm1.verifyToken(token);
} finally {
zkServer.close();
tm1.renewToken(token, "bar");
tm2.verifyToken(token);
tm2.cancelToken(token, "bar");
try {
tm1.verifyToken(token);
fail("Expected InvalidToken");
} catch (SecretManager.InvalidToken it) {
// Ignore
}
}
@SuppressWarnings("unchecked")
@Test
public void testRenewTokenSingleManager() throws Exception {
DelegationTokenManager tm1 = null;
String connectString = zkServer.getConnectString();
Configuration conf = getSecretConf(connectString);
tm1 = new DelegationTokenManager(conf, new Text("foo"));
tm1.init();
Token<DelegationTokenIdentifier> token =
(Token<DelegationTokenIdentifier>)
tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token);
tm1.renewToken(token, "foo");
tm1.verifyToken(token);
}
@SuppressWarnings("unchecked")
@Test
public void testCancelTokenSingleManager() throws Exception {
DelegationTokenManager tm1 = null;
String connectString = zkServer.getConnectString();
Configuration conf = getSecretConf(connectString);
tm1 = new DelegationTokenManager(conf, new Text("foo"));
tm1.init();
Token<DelegationTokenIdentifier> token =
(Token<DelegationTokenIdentifier>)
tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token);
tm1.cancelToken(token, "foo");
try {
tm1.verifyToken(token);
fail("Expected InvalidToken");
} catch (SecretManager.InvalidToken it) {
it.printStackTrace();
}
}
}