HADOOP-11170. ZKDelegationTokenSecretManager fails to renewToken created by a peer. (Arun Suresh and Gregory Chanan via kasha)

This commit is contained in:
Karthik Kambatla 2014-10-23 17:04:14 -07:00
parent 670879ef41
commit db45f047ab
4 changed files with 209 additions and 63 deletions

View File

@ -368,6 +368,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7227. Fix findbugs warning about NP_DEREFERENCE_OF_READLINE_VALUE in HDFS-7227. Fix findbugs warning about NP_DEREFERENCE_OF_READLINE_VALUE in
SpanReceiverHost (cmccabe) SpanReceiverHost (cmccabe)
HADOOP-11170. ZKDelegationTokenSecretManager fails to renewToken created by
a peer. (Arun Suresh and Gregory Chanan via kasha)
Release 2.6.0 - UNRELEASED Release 2.6.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -20,10 +20,13 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
@ -125,7 +128,7 @@ public void startThreads() throws IOException {
* Reset all data structures and mutable state. * Reset all data structures and mutable state.
*/ */
public synchronized void reset() { public synchronized void reset() {
currentId = 0; setCurrentKeyId(0);
allKeys.clear(); allKeys.clear();
setDelegationTokenSeqNum(0); setDelegationTokenSeqNum(0);
currentTokens.clear(); currentTokens.clear();
@ -138,8 +141,8 @@ public synchronized void reset() {
public synchronized void addKey(DelegationKey key) throws IOException { public synchronized void addKey(DelegationKey key) throws IOException {
if (running) // a safety check if (running) // a safety check
throw new IOException("Can't add delegation key to a running SecretManager."); throw new IOException("Can't add delegation key to a running SecretManager.");
if (key.getKeyId() > currentId) { if (key.getKeyId() > getCurrentKeyId()) {
currentId = key.getKeyId(); setCurrentKeyId(key.getKeyId());
} }
allKeys.put(key.getKeyId(), key); allKeys.put(key.getKeyId(), key);
} }
@ -182,6 +185,30 @@ protected void updateStoredToken(TokenIdent ident, long renewDate) throws IOExce
return; return;
} }
/**
* For subclasses externalizing the storage, for example Zookeeper
* based implementations
*/
protected synchronized int getCurrentKeyId() {
return currentId;
}
/**
* For subclasses externalizing the storage, for example Zookeeper
* based implementations
*/
protected synchronized int incrementCurrentKeyId() {
return ++currentId;
}
/**
* For subclasses externalizing the storage, for example Zookeeper
* based implementations
*/
protected synchronized void setCurrentKeyId(int keyId) {
currentId = keyId;
}
/** /**
* For subclasses externalizing the storage, for example Zookeeper * For subclasses externalizing the storage, for example Zookeeper
* based implementations * based implementations
@ -282,8 +309,8 @@ public synchronized void addPersistedDelegationToken(
return; return;
} }
byte[] password = createPassword(identifier.getBytes(), dKey.getKey()); byte[] password = createPassword(identifier.getBytes(), dKey.getKey());
if (identifier.getSequenceNumber() > delegationTokenSequenceNumber) { if (identifier.getSequenceNumber() > getDelegationTokenSeqNum()) {
delegationTokenSequenceNumber = identifier.getSequenceNumber(); setDelegationTokenSeqNum(identifier.getSequenceNumber());
} }
if (getTokenInfo(identifier) == null) { if (getTokenInfo(identifier) == null) {
currentTokens.put(identifier, new DelegationTokenInformation(renewDate, currentTokens.put(identifier, new DelegationTokenInformation(renewDate,
@ -303,7 +330,7 @@ private void updateCurrentKey() throws IOException {
/* Create a new currentKey with an estimated expiry date. */ /* Create a new currentKey with an estimated expiry date. */
int newCurrentId; int newCurrentId;
synchronized (this) { synchronized (this) {
newCurrentId = currentId+1; newCurrentId = incrementCurrentKeyId();
} }
DelegationKey newKey = new DelegationKey(newCurrentId, System DelegationKey newKey = new DelegationKey(newCurrentId, System
.currentTimeMillis() .currentTimeMillis()
@ -311,7 +338,6 @@ private void updateCurrentKey() throws IOException {
//Log must be invoked outside the lock on 'this' //Log must be invoked outside the lock on 'this'
logUpdateMasterKey(newKey); logUpdateMasterKey(newKey);
synchronized (this) { synchronized (this) {
currentId = newKey.getKeyId();
currentKey = newKey; currentKey = newKey;
storeDelegationKey(currentKey); storeDelegationKey(currentKey);
} }
@ -358,9 +384,9 @@ protected synchronized byte[] createPassword(TokenIdent identifier) {
sequenceNum = incrementDelegationTokenSeqNum(); sequenceNum = incrementDelegationTokenSeqNum();
identifier.setIssueDate(now); identifier.setIssueDate(now);
identifier.setMaxDate(now + tokenMaxLifetime); identifier.setMaxDate(now + tokenMaxLifetime);
identifier.setMasterKeyId(currentId); identifier.setMasterKeyId(currentKey.getKeyId());
identifier.setSequenceNumber(sequenceNum); identifier.setSequenceNumber(sequenceNum);
LOG.info("Creating password for identifier: " + identifier); LOG.info("Creating password for identifier: [" + MD5Hash.digest(identifier.getBytes()) + ", " + currentKey.getKeyId() + "]");
byte[] password = createPassword(identifier.getBytes(), currentKey.getKey()); byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier)); + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));

View File

@ -48,7 +48,6 @@
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
@ -56,6 +55,7 @@
import org.apache.zookeeper.client.ZooKeeperSaslClient; import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -104,6 +104,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
private static final String ZK_DTSM_NAMESPACE = "ZKDTSMRoot"; private static final String ZK_DTSM_NAMESPACE = "ZKDTSMRoot";
private static final String ZK_DTSM_SEQNUM_ROOT = "ZKDTSMSeqNumRoot"; private static final String ZK_DTSM_SEQNUM_ROOT = "ZKDTSMSeqNumRoot";
private static final String ZK_DTSM_KEYID_ROOT = "ZKDTSMKeyIdRoot";
private static final String ZK_DTSM_TOKENS_ROOT = "ZKDTSMTokensRoot"; private static final String ZK_DTSM_TOKENS_ROOT = "ZKDTSMTokensRoot";
private static final String ZK_DTSM_MASTER_KEY_ROOT = "ZKDTSMMasterKeyRoot"; private static final String ZK_DTSM_MASTER_KEY_ROOT = "ZKDTSMMasterKeyRoot";
@ -119,7 +120,8 @@ public static void setCurator(CuratorFramework curator) {
private final boolean isExternalClient; private final boolean isExternalClient;
private final CuratorFramework zkClient; private final CuratorFramework zkClient;
private SharedCount seqCounter; private SharedCount delTokSeqCounter;
private SharedCount keyIdSeqCounter;
private PathChildrenCache keyCache; private PathChildrenCache keyCache;
private PathChildrenCache tokenCache; private PathChildrenCache tokenCache;
private ExecutorService listenerThreadPool; private ExecutorService listenerThreadPool;
@ -276,7 +278,7 @@ private String getKrb5LoginModuleName() {
} }
@Override @Override
public synchronized void startThreads() throws IOException { public void startThreads() throws IOException {
if (!isExternalClient) { if (!isExternalClient) {
try { try {
zkClient.start(); zkClient.start();
@ -285,13 +287,21 @@ public synchronized void startThreads() throws IOException {
} }
} }
try { try {
seqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0); delTokSeqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0);
if (seqCounter != null) { if (delTokSeqCounter != null) {
seqCounter.start(); delTokSeqCounter.start();
} }
} catch (Exception e) { } catch (Exception e) {
throw new IOException("Could not start Sequence Counter", e); throw new IOException("Could not start Sequence Counter", e);
} }
try {
keyIdSeqCounter = new SharedCount(zkClient, ZK_DTSM_KEYID_ROOT, 0);
if (keyIdSeqCounter != null) {
keyIdSeqCounter.start();
}
} catch (Exception e) {
throw new IOException("Could not start KeyId Counter", e);
}
try { try {
createPersistentNode(ZK_DTSM_MASTER_KEY_ROOT); createPersistentNode(ZK_DTSM_MASTER_KEY_ROOT);
createPersistentNode(ZK_DTSM_TOKENS_ROOT); createPersistentNode(ZK_DTSM_TOKENS_ROOT);
@ -402,13 +412,16 @@ private void processTokenRemoved(byte[] data) throws IOException {
} }
@Override @Override
public synchronized void stopThreads() { public void stopThreads() {
try { try {
if (!isExternalClient && (zkClient != null)) { if (!isExternalClient && (zkClient != null)) {
zkClient.close(); zkClient.close();
} }
if (seqCounter != null) { if (delTokSeqCounter != null) {
seqCounter.close(); delTokSeqCounter.close();
}
if (keyIdSeqCounter != null) {
keyIdSeqCounter.close();
} }
if (keyCache != null) { if (keyCache != null) {
keyCache.close(); keyCache.close();
@ -434,30 +447,46 @@ private void createPersistentNode(String nodePath) throws Exception {
} }
@Override @Override
protected synchronized int getDelegationTokenSeqNum() { protected int getDelegationTokenSeqNum() {
return seqCounter.getCount(); return delTokSeqCounter.getCount();
} }
@Override @Override
protected synchronized int incrementDelegationTokenSeqNum() { protected int incrementDelegationTokenSeqNum() {
try { try {
while (!seqCounter.trySetCount(seqCounter.getCount() + 1)) { while (!delTokSeqCounter.trySetCount(delTokSeqCounter.getCount() + 1)) {
} }
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException("Could not increment shared counter !!", e); throw new RuntimeException("Could not increment shared counter !!", e);
} }
return seqCounter.getCount(); return delTokSeqCounter.getCount();
} }
@Override @Override
protected synchronized void setDelegationTokenSeqNum(int seqNum) { protected void setDelegationTokenSeqNum(int seqNum) {
try { try {
seqCounter.setCount(seqNum); delTokSeqCounter.setCount(seqNum);
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException("Could not set shared counter !!", e); throw new RuntimeException("Could not set shared counter !!", e);
} }
} }
@Override
protected int getCurrentKeyId() {
return keyIdSeqCounter.getCount();
}
@Override
protected int incrementCurrentKeyId() {
try {
while (!keyIdSeqCounter.trySetCount(keyIdSeqCounter.getCount() + 1)) {
}
} catch (Exception e) {
throw new RuntimeException("Could not increment shared keyId counter !!", e);
}
return keyIdSeqCounter.getCount();
}
@Override @Override
protected DelegationKey getDelegationKey(int keyId) { protected DelegationKey getDelegationKey(int keyId) {
// First check if its I already have this key // First check if its I already have this key
@ -518,6 +547,11 @@ protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident) private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident)
throws IOException { throws IOException {
return getTokenInfoFromZK(ident, false);
}
private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident,
boolean quiet) throws IOException {
String nodePath = String nodePath =
getNodePath(ZK_DTSM_TOKENS_ROOT, getNodePath(ZK_DTSM_TOKENS_ROOT,
DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber()); DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber());
@ -539,7 +573,9 @@ private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident)
return tokenInfo; return tokenInfo;
} }
} catch (KeeperException.NoNodeException e) { } catch (KeeperException.NoNodeException e) {
LOG.error("No node in path [" + nodePath + "]"); if (!quiet) {
LOG.error("No node in path [" + nodePath + "]");
}
} catch (Exception ex) { } catch (Exception ex) {
throw new IOException(ex); throw new IOException(ex);
} }
@ -604,7 +640,9 @@ protected void removeStoredMasterKey(DelegationKey key) {
} }
try { try {
if (zkClient.checkExists().forPath(nodeRemovePath) != null) { if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
zkClient.delete().forPath(nodeRemovePath); while(zkClient.checkExists().forPath(nodeRemovePath) != null){
zkClient.delete().guaranteed().forPath(nodeRemovePath);
}
} else { } else {
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath); LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
} }
@ -633,10 +671,10 @@ protected void updateToken(TokenIdent ident,
+ ident.getSequenceNumber()); + ident.getSequenceNumber());
try { try {
if (zkClient.checkExists().forPath(nodeRemovePath) != null) { if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
addOrUpdateToken(ident, tokenInfo, true);
} else {
addOrUpdateToken(ident, tokenInfo, false); addOrUpdateToken(ident, tokenInfo, false);
LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath); LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
} else {
addOrUpdateToken(ident, tokenInfo, true);
} }
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException("Could not update Stored Token ZKDTSMDelegationToken_" throw new RuntimeException("Could not update Stored Token ZKDTSMDelegationToken_"
@ -656,9 +694,11 @@ protected void removeStoredToken(TokenIdent ident)
} }
try { try {
if (zkClient.checkExists().forPath(nodeRemovePath) != null) { if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
LOG.debug("Attempted to remove a non-existing znode " + nodeRemovePath); while(zkClient.checkExists().forPath(nodeRemovePath) != null){
zkClient.delete().guaranteed().forPath(nodeRemovePath);
}
} else { } else {
zkClient.delete().forPath(nodeRemovePath); LOG.debug("Attempted to remove a non-existing znode " + nodeRemovePath);
} }
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException( throw new RuntimeException(
@ -682,7 +722,7 @@ private void addOrUpdateToken(TokenIdent ident,
tokenOut.writeInt(info.getPassword().length); tokenOut.writeInt(info.getPassword().length);
tokenOut.write(info.getPassword()); tokenOut.write(info.getPassword());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug((isUpdate ? "Storing " : "Updating ") LOG.debug((isUpdate ? "Updating " : "Storing ")
+ "ZKDTSMDelegationToken_" + + "ZKDTSMDelegationToken_" +
ident.getSequenceNumber()); ident.getSequenceNumber());
} }

View File

@ -22,50 +22,127 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import static org.junit.Assert.fail;
import org.junit.Test; import org.junit.Test;
public class TestZKDelegationTokenSecretManager { public class TestZKDelegationTokenSecretManager {
private static final long DAY_IN_SECS = 86400; private static final long DAY_IN_SECS = 86400;
@SuppressWarnings("unchecked") private TestingServer zkServer;
@Test
public void testZKDelTokSecretManager() throws Exception { @Before
TestingServer zkServer = new TestingServer(); public void setup() throws Exception {
DelegationTokenManager tm1, tm2 = null; zkServer = new TestingServer();
zkServer.start(); zkServer.start();
try { }
String connectString = zkServer.getConnectString();
Configuration conf = new Configuration();
conf.setBoolean(DelegationTokenManager.ENABLE_ZK_KEY, true);
conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING, connectString);
conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "testPath");
conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE, "none");
conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, DAY_IN_SECS);
tm1 = new DelegationTokenManager(conf, new Text("foo"));
tm1.init();
tm2 = new DelegationTokenManager(conf, new Text("foo"));
tm2.init();
Token<DelegationTokenIdentifier> token = @After
(Token<DelegationTokenIdentifier>) tm1.createToken( public void tearDown() throws Exception {
UserGroupInformation.getCurrentUser(), "foo"); if (zkServer != null) {
Assert.assertNotNull(token);
tm2.verifyToken(token);
token = (Token<DelegationTokenIdentifier>) tm2.createToken(
UserGroupInformation.getCurrentUser(), "bar");
Assert.assertNotNull(token);
tm1.verifyToken(token);
} finally {
zkServer.close(); zkServer.close();
} }
} }
protected Configuration getSecretConf(String connectString) {
Configuration conf = new Configuration();
conf.setBoolean(DelegationTokenManager.ENABLE_ZK_KEY, true);
conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING, connectString);
conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "testPath");
conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE, "none");
conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, DAY_IN_SECS);
return conf;
}
@SuppressWarnings("unchecked")
@Test
public void testMultiNodeOperations() throws Exception {
DelegationTokenManager tm1, tm2 = null;
String connectString = zkServer.getConnectString();
Configuration conf = getSecretConf(connectString);
tm1 = new DelegationTokenManager(conf, new Text("bla"));
tm1.init();
tm2 = new DelegationTokenManager(conf, new Text("bla"));
tm2.init();
Token<DelegationTokenIdentifier> token =
(Token<DelegationTokenIdentifier>) tm1.createToken(
UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token);
tm2.verifyToken(token);
tm2.renewToken(token, "foo");
tm1.verifyToken(token);
tm1.cancelToken(token, "foo");
try {
tm2.verifyToken(token);
fail("Expected InvalidToken");
} catch (SecretManager.InvalidToken it) {
// Ignore
}
token = (Token<DelegationTokenIdentifier>) tm2.createToken(
UserGroupInformation.getCurrentUser(), "bar");
Assert.assertNotNull(token);
tm1.verifyToken(token);
tm1.renewToken(token, "bar");
tm2.verifyToken(token);
tm2.cancelToken(token, "bar");
try {
tm1.verifyToken(token);
fail("Expected InvalidToken");
} catch (SecretManager.InvalidToken it) {
// Ignore
}
}
@SuppressWarnings("unchecked")
@Test
public void testRenewTokenSingleManager() throws Exception {
DelegationTokenManager tm1 = null;
String connectString = zkServer.getConnectString();
Configuration conf = getSecretConf(connectString);
tm1 = new DelegationTokenManager(conf, new Text("foo"));
tm1.init();
Token<DelegationTokenIdentifier> token =
(Token<DelegationTokenIdentifier>)
tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token);
tm1.renewToken(token, "foo");
tm1.verifyToken(token);
}
@SuppressWarnings("unchecked")
@Test
public void testCancelTokenSingleManager() throws Exception {
DelegationTokenManager tm1 = null;
String connectString = zkServer.getConnectString();
Configuration conf = getSecretConf(connectString);
tm1 = new DelegationTokenManager(conf, new Text("foo"));
tm1.init();
Token<DelegationTokenIdentifier> token =
(Token<DelegationTokenIdentifier>)
tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token);
tm1.cancelToken(token, "foo");
try {
tm1.verifyToken(token);
fail("Expected InvalidToken");
} catch (SecretManager.InvalidToken it) {
it.printStackTrace();
}
}
} }