HADOOP-11157. ZKDelegationTokenSecretManager never shuts down listenerThreadPool. Contributed by Arun Suresh.

(cherry picked from commit 07d489e6230682e0553840bb1a0e446acb9f8d19)
This commit is contained in:
Aaron T. Myers 2014-11-17 12:57:52 -08:00
parent 6eb88c278c
commit d35eba7b1f
3 changed files with 341 additions and 83 deletions

View File

@ -85,6 +85,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11294. Nfs3FileAttributes should not change the values of rdev, HADOOP-11294. Nfs3FileAttributes should not change the values of rdev,
nlink and size in the constructor. (Brandon Li via wheat9) nlink and size in the constructor. (Brandon Li via wheat9)
HADOOP-11157. ZKDelegationTokenSecretManager never shuts down
listenerThreadPool. (Arun Suresh via atm)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.AppConfigurationEntry;
@ -38,6 +39,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.CuratorFrameworkFactory.Builder; import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider; import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@ -48,6 +50,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
@ -80,6 +83,8 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
+ "zkSessionTimeout"; + "zkSessionTimeout";
public static final String ZK_DTSM_ZK_CONNECTION_TIMEOUT = ZK_CONF_PREFIX public static final String ZK_DTSM_ZK_CONNECTION_TIMEOUT = ZK_CONF_PREFIX
+ "zkConnectionTimeout"; + "zkConnectionTimeout";
public static final String ZK_DTSM_ZK_SHUTDOWN_TIMEOUT = ZK_CONF_PREFIX
+ "zkShutdownTimeout";
public static final String ZK_DTSM_ZNODE_WORKING_PATH = ZK_CONF_PREFIX public static final String ZK_DTSM_ZNODE_WORKING_PATH = ZK_CONF_PREFIX
+ "znodeWorkingPath"; + "znodeWorkingPath";
public static final String ZK_DTSM_ZK_AUTH_TYPE = ZK_CONF_PREFIX public static final String ZK_DTSM_ZK_AUTH_TYPE = ZK_CONF_PREFIX
@ -94,6 +99,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3; public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3;
public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000; public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000;
public static final int ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT = 10000; public static final int ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT = 10000;
public static final int ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT = 10000;
public static final String ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT = "zkdtsm"; public static final String ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT = "zkdtsm";
private static Logger LOG = LoggerFactory private static Logger LOG = LoggerFactory
@ -125,6 +131,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
private PathChildrenCache keyCache; private PathChildrenCache keyCache;
private PathChildrenCache tokenCache; private PathChildrenCache tokenCache;
private ExecutorService listenerThreadPool; private ExecutorService listenerThreadPool;
private final long shutdownTimeout;
public ZKDelegationTokenSecretManager(Configuration conf) { public ZKDelegationTokenSecretManager(Configuration conf) {
super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL, super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL,
@ -135,6 +142,8 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
DelegationTokenManager.RENEW_INTERVAL_DEFAULT * 1000), DelegationTokenManager.RENEW_INTERVAL_DEFAULT * 1000),
conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL,
DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000); DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000);
shutdownTimeout = conf.getLong(ZK_DTSM_ZK_SHUTDOWN_TIMEOUT,
ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT);
if (CURATOR_TL.get() != null) { if (CURATOR_TL.get() != null) {
zkClient = zkClient =
CURATOR_TL.get().usingNamespace( CURATOR_TL.get().usingNamespace(
@ -199,7 +208,6 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
.build(); .build();
isExternalClient = false; isExternalClient = false;
} }
listenerThreadPool = Executors.newFixedThreadPool(2);
} }
private String setJaasConfiguration(Configuration config) throws Exception { private String setJaasConfiguration(Configuration config) throws Exception {
@ -290,6 +298,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
throw new IOException("Could not start Curator Framework", e); throw new IOException("Could not start Curator Framework", e);
} }
} }
listenerThreadPool = Executors.newSingleThreadExecutor();
try { try {
delTokSeqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0); delTokSeqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0);
if (delTokSeqCounter != null) { if (delTokSeqCounter != null) {
@ -315,7 +324,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
try { try {
keyCache = new PathChildrenCache(zkClient, ZK_DTSM_MASTER_KEY_ROOT, true); keyCache = new PathChildrenCache(zkClient, ZK_DTSM_MASTER_KEY_ROOT, true);
if (keyCache != null) { if (keyCache != null) {
keyCache.start(StartMode.POST_INITIALIZED_EVENT); keyCache.start(StartMode.BUILD_INITIAL_CACHE);
keyCache.getListenable().addListener(new PathChildrenCacheListener() { keyCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override @Override
public void childEvent(CuratorFramework client, public void childEvent(CuratorFramework client,
@ -343,7 +352,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
try { try {
tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true); tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true);
if (tokenCache != null) { if (tokenCache != null) {
tokenCache.start(StartMode.POST_INITIALIZED_EVENT); tokenCache.start(StartMode.BUILD_INITIAL_CACHE);
tokenCache.getListenable().addListener(new PathChildrenCacheListener() { tokenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override @Override
@ -351,13 +360,13 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
PathChildrenCacheEvent event) throws Exception { PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) { switch (event.getType()) {
case CHILD_ADDED: case CHILD_ADDED:
processTokenAddOrUpdate(event.getData().getData()); processTokenAddOrUpdate(event.getData());
break; break;
case CHILD_UPDATED: case CHILD_UPDATED:
processTokenAddOrUpdate(event.getData().getData()); processTokenAddOrUpdate(event.getData());
break; break;
case CHILD_REMOVED: case CHILD_REMOVED:
processTokenRemoved(event.getData().getData()); processTokenRemoved(event.getData());
break; break;
default: default:
break; break;
@ -376,7 +385,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
DataInputStream din = new DataInputStream(bin); DataInputStream din = new DataInputStream(bin);
DelegationKey key = new DelegationKey(); DelegationKey key = new DelegationKey();
key.readFields(din); key.readFields(din);
allKeys.put(key.getKeyId(), key); synchronized (this) {
allKeys.put(key.getKeyId(), key);
}
} }
private void processKeyRemoved(String path) { private void processKeyRemoved(String path) {
@ -386,13 +397,15 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
int j = tokSeg.indexOf('_'); int j = tokSeg.indexOf('_');
if (j > 0) { if (j > 0) {
int keyId = Integer.parseInt(tokSeg.substring(j + 1)); int keyId = Integer.parseInt(tokSeg.substring(j + 1));
allKeys.remove(keyId); synchronized (this) {
allKeys.remove(keyId);
}
} }
} }
} }
private void processTokenAddOrUpdate(byte[] data) throws IOException { private void processTokenAddOrUpdate(ChildData data) throws IOException {
ByteArrayInputStream bin = new ByteArrayInputStream(data); ByteArrayInputStream bin = new ByteArrayInputStream(data.getData());
DataInputStream din = new DataInputStream(bin); DataInputStream din = new DataInputStream(bin);
TokenIdent ident = createIdentifier(); TokenIdent ident = createIdentifier();
ident.readFields(din); ident.readFields(din);
@ -403,41 +416,78 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
if (numRead > -1) { if (numRead > -1) {
DelegationTokenInformation tokenInfo = DelegationTokenInformation tokenInfo =
new DelegationTokenInformation(renewDate, password); new DelegationTokenInformation(renewDate, password);
currentTokens.put(ident, tokenInfo); synchronized (this) {
currentTokens.put(ident, tokenInfo);
// The cancel task might be waiting
notifyAll();
}
} }
} }
private void processTokenRemoved(byte[] data) throws IOException { private void processTokenRemoved(ChildData data) throws IOException {
ByteArrayInputStream bin = new ByteArrayInputStream(data); ByteArrayInputStream bin = new ByteArrayInputStream(data.getData());
DataInputStream din = new DataInputStream(bin); DataInputStream din = new DataInputStream(bin);
TokenIdent ident = createIdentifier(); TokenIdent ident = createIdentifier();
ident.readFields(din); ident.readFields(din);
currentTokens.remove(ident); synchronized (this) {
currentTokens.remove(ident);
// The cancel task might be waiting
notifyAll();
}
} }
@Override @Override
public void stopThreads() { public void stopThreads() {
super.stopThreads();
try { try {
if (!isExternalClient && (zkClient != null)) {
zkClient.close();
}
if (delTokSeqCounter != null) {
delTokSeqCounter.close();
}
if (keyIdSeqCounter != null) {
keyIdSeqCounter.close();
}
if (keyCache != null) {
keyCache.close();
}
if (tokenCache != null) { if (tokenCache != null) {
tokenCache.close(); tokenCache.close();
} }
} catch (Exception e) { } catch (Exception e) {
LOG.error("Could not stop Curator Framework", e); LOG.error("Could not stop Delegation Token Cache", e);
// Ignore }
try {
if (delTokSeqCounter != null) {
delTokSeqCounter.close();
}
} catch (Exception e) {
LOG.error("Could not stop Delegation Token Counter", e);
}
try {
if (keyIdSeqCounter != null) {
keyIdSeqCounter.close();
}
} catch (Exception e) {
LOG.error("Could not stop Key Id Counter", e);
}
try {
if (keyCache != null) {
keyCache.close();
}
} catch (Exception e) {
LOG.error("Could not stop KeyCache", e);
}
try {
if (!isExternalClient && (zkClient != null)) {
zkClient.close();
}
} catch (Exception e) {
LOG.error("Could not stop Curator Framework", e);
}
if (listenerThreadPool != null) {
listenerThreadPool.shutdown();
try {
// wait for existing tasks to terminate
if (!listenerThreadPool.awaitTermination(shutdownTimeout,
TimeUnit.MILLISECONDS)) {
LOG.error("Forcing Listener threadPool to shutdown !!");
listenerThreadPool.shutdownNow();
}
} catch (InterruptedException ie) {
listenerThreadPool.shutdownNow();
Thread.currentThread().interrupt();
}
} }
super.stopThreads();
} }
private void createPersistentNode(String nodePath) throws Exception { private void createPersistentNode(String nodePath) throws Exception {
@ -460,6 +510,10 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
try { try {
while (!delTokSeqCounter.trySetCount(delTokSeqCounter.getCount() + 1)) { while (!delTokSeqCounter.trySetCount(delTokSeqCounter.getCount() + 1)) {
} }
} catch (InterruptedException e) {
// The ExpirationThread is just finishing.. so dont do anything..
LOG.debug("Thread interrupted while performing token counter increment", e);
Thread.currentThread().interrupt();
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException("Could not increment shared counter !!", e); throw new RuntimeException("Could not increment shared counter !!", e);
} }
@ -485,6 +539,10 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
try { try {
while (!keyIdSeqCounter.trySetCount(keyIdSeqCounter.getCount() + 1)) { while (!keyIdSeqCounter.trySetCount(keyIdSeqCounter.getCount() + 1)) {
} }
} catch (InterruptedException e) {
// The ExpirationThread is just finishing.. so dont do anything..
LOG.debug("Thread interrupted while performing keyId increment", e);
Thread.currentThread().interrupt();
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException("Could not increment shared keyId counter !!", e); throw new RuntimeException("Could not increment shared keyId counter !!", e);
} }
@ -588,13 +646,11 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
@Override @Override
protected void storeDelegationKey(DelegationKey key) throws IOException { protected void storeDelegationKey(DelegationKey key) throws IOException {
allKeys.put(key.getKeyId(), key);
addOrUpdateDelegationKey(key, false); addOrUpdateDelegationKey(key, false);
} }
@Override @Override
protected void updateDelegationKey(DelegationKey key) throws IOException { protected void updateDelegationKey(DelegationKey key) throws IOException {
allKeys.put(key.getKeyId(), key);
addOrUpdateDelegationKey(key, true); addOrUpdateDelegationKey(key, true);
} }
@ -658,7 +714,6 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
@Override @Override
protected void storeToken(TokenIdent ident, protected void storeToken(TokenIdent ident,
DelegationTokenInformation tokenInfo) throws IOException { DelegationTokenInformation tokenInfo) throws IOException {
currentTokens.put(ident, tokenInfo);
try { try {
addOrUpdateToken(ident, tokenInfo, false); addOrUpdateToken(ident, tokenInfo, false);
} catch (Exception e) { } catch (Exception e) {
@ -669,7 +724,6 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
@Override @Override
protected void updateToken(TokenIdent ident, protected void updateToken(TokenIdent ident,
DelegationTokenInformation tokenInfo) throws IOException { DelegationTokenInformation tokenInfo) throws IOException {
currentTokens.put(ident, tokenInfo);
String nodeRemovePath = String nodeRemovePath =
getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
+ ident.getSequenceNumber()); + ident.getSequenceNumber());
@ -711,6 +765,25 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
} }
} }
@Override
public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
String canceller) throws IOException {
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
DataInputStream in = new DataInputStream(buf);
TokenIdent id = createIdentifier();
id.readFields(in);
try {
if (!currentTokens.containsKey(id)) {
// See if token can be retrieved and placed in currentTokens
getTokenInfo(id);
}
return super.cancelToken(token, canceller);
} catch (Exception e) {
LOG.error("Exception while checking if token exist !!", e);
return id;
}
}
private void addOrUpdateToken(TokenIdent ident, private void addOrUpdateToken(TokenIdent ident,
DelegationTokenInformation info, boolean isUpdate) throws Exception { DelegationTokenInformation info, boolean isUpdate) throws Exception {
String nodeCreatePath = String nodeCreatePath =
@ -772,4 +845,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
static String getNodePath(String root, String nodeName) { static String getNodePath(String root, String nodeName) {
return (root + "/" + nodeName); return (root + "/" + nodeName);
} }
@VisibleForTesting
public ExecutorService getListenerThreadPool() {
return listenerThreadPool;
}
} }

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.security.token.delegation; package org.apache.hadoop.security.token.delegation;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import org.apache.curator.test.TestingServer; import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
@ -36,6 +40,12 @@ import org.junit.Test;
public class TestZKDelegationTokenSecretManager { public class TestZKDelegationTokenSecretManager {
private static final int TEST_RETRIES = 2;
private static final int RETRY_COUNT = 5;
private static final int RETRY_WAIT = 1000;
private static final long DAY_IN_SECS = 86400; private static final long DAY_IN_SECS = 86400;
private TestingServer zkServer; private TestingServer zkServer;
@ -59,6 +69,7 @@ public class TestZKDelegationTokenSecretManager {
conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING, connectString); conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING, connectString);
conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "testPath"); conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "testPath");
conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE, "none"); conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE, "none");
conf.setLong(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, 100);
conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS); conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS); conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS); conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS);
@ -69,80 +80,246 @@ public class TestZKDelegationTokenSecretManager {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
public void testMultiNodeOperations() throws Exception { public void testMultiNodeOperations() throws Exception {
DelegationTokenManager tm1, tm2 = null; for (int i = 0; i < TEST_RETRIES; i++) {
String connectString = zkServer.getConnectString(); DelegationTokenManager tm1, tm2 = null;
Configuration conf = getSecretConf(connectString); String connectString = zkServer.getConnectString();
tm1 = new DelegationTokenManager(conf, new Text("bla")); Configuration conf = getSecretConf(connectString);
tm1.init(); tm1 = new DelegationTokenManager(conf, new Text("bla"));
tm2 = new DelegationTokenManager(conf, new Text("bla")); tm1.init();
tm2.init(); tm2 = new DelegationTokenManager(conf, new Text("bla"));
tm2.init();
Token<DelegationTokenIdentifier> token = Token<DelegationTokenIdentifier> token =
(Token<DelegationTokenIdentifier>) tm1.createToken( (Token<DelegationTokenIdentifier>) tm1.createToken(
UserGroupInformation.getCurrentUser(), "foo"); UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token); Assert.assertNotNull(token);
tm2.verifyToken(token);
tm2.renewToken(token, "foo");
tm1.verifyToken(token);
tm1.cancelToken(token, "foo");
try {
tm2.verifyToken(token); tm2.verifyToken(token);
fail("Expected InvalidToken"); tm2.renewToken(token, "foo");
} catch (SecretManager.InvalidToken it) {
// Ignore
}
token = (Token<DelegationTokenIdentifier>) tm2.createToken(
UserGroupInformation.getCurrentUser(), "bar");
Assert.assertNotNull(token);
tm1.verifyToken(token);
tm1.renewToken(token, "bar");
tm2.verifyToken(token);
tm2.cancelToken(token, "bar");
try {
tm1.verifyToken(token); tm1.verifyToken(token);
fail("Expected InvalidToken"); tm1.cancelToken(token, "foo");
} catch (SecretManager.InvalidToken it) { try {
// Ignore verifyTokenFail(tm2, token);
fail("Expected InvalidToken");
} catch (SecretManager.InvalidToken it) {
// Ignore
}
token = (Token<DelegationTokenIdentifier>) tm2.createToken(
UserGroupInformation.getCurrentUser(), "bar");
Assert.assertNotNull(token);
tm1.verifyToken(token);
tm1.renewToken(token, "bar");
tm2.verifyToken(token);
tm2.cancelToken(token, "bar");
try {
verifyTokenFail(tm1, token);
fail("Expected InvalidToken");
} catch (SecretManager.InvalidToken it) {
// Ignore
}
verifyDestroy(tm1, conf);
verifyDestroy(tm2, conf);
}
}
@SuppressWarnings("unchecked")
@Test
public void testNodeUpAferAWhile() throws Exception {
for (int i = 0; i < TEST_RETRIES; i++) {
String connectString = zkServer.getConnectString();
Configuration conf = getSecretConf(connectString);
DelegationTokenManager tm1 = new DelegationTokenManager(conf, new Text("bla"));
tm1.init();
Token<DelegationTokenIdentifier> token1 =
(Token<DelegationTokenIdentifier>) tm1.createToken(
UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token1);
Token<DelegationTokenIdentifier> token2 =
(Token<DelegationTokenIdentifier>) tm1.createToken(
UserGroupInformation.getCurrentUser(), "bar");
Assert.assertNotNull(token2);
Token<DelegationTokenIdentifier> token3 =
(Token<DelegationTokenIdentifier>) tm1.createToken(
UserGroupInformation.getCurrentUser(), "boo");
Assert.assertNotNull(token3);
tm1.verifyToken(token1);
tm1.verifyToken(token2);
tm1.verifyToken(token3);
// Cancel one token
tm1.cancelToken(token1, "foo");
// Start second node after some time..
Thread.sleep(1000);
DelegationTokenManager tm2 = new DelegationTokenManager(conf, new Text("bla"));
tm2.init();
tm2.verifyToken(token2);
tm2.verifyToken(token3);
try {
verifyTokenFail(tm2, token1);
fail("Expected InvalidToken");
} catch (SecretManager.InvalidToken it) {
// Ignore
}
// Create a new token thru the new ZKDTSM
Token<DelegationTokenIdentifier> token4 =
(Token<DelegationTokenIdentifier>) tm2.createToken(
UserGroupInformation.getCurrentUser(), "xyz");
Assert.assertNotNull(token4);
tm2.verifyToken(token4);
tm1.verifyToken(token4);
// Bring down tm2
verifyDestroy(tm2, conf);
// Start third node after some time..
Thread.sleep(1000);
DelegationTokenManager tm3 = new DelegationTokenManager(conf, new Text("bla"));
tm3.init();
tm3.verifyToken(token2);
tm3.verifyToken(token3);
tm3.verifyToken(token4);
try {
verifyTokenFail(tm3, token1);
fail("Expected InvalidToken");
} catch (SecretManager.InvalidToken it) {
// Ignore
}
verifyDestroy(tm3, conf);
verifyDestroy(tm1, conf);
} }
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
public void testRenewTokenSingleManager() throws Exception { public void testRenewTokenSingleManager() throws Exception {
DelegationTokenManager tm1 = null; for (int i = 0; i < TEST_RETRIES; i++) {
String connectString = zkServer.getConnectString(); DelegationTokenManager tm1 = null;
Configuration conf = getSecretConf(connectString); String connectString = zkServer.getConnectString();
tm1 = new DelegationTokenManager(conf, new Text("foo")); Configuration conf = getSecretConf(connectString);
tm1.init(); tm1 = new DelegationTokenManager(conf, new Text("foo"));
tm1.init();
Token<DelegationTokenIdentifier> token = Token<DelegationTokenIdentifier> token =
(Token<DelegationTokenIdentifier>) (Token<DelegationTokenIdentifier>)
tm1.createToken(UserGroupInformation.getCurrentUser(), "foo"); tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token); Assert.assertNotNull(token);
tm1.renewToken(token, "foo"); tm1.renewToken(token, "foo");
tm1.verifyToken(token); tm1.verifyToken(token);
verifyDestroy(tm1, conf);
}
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
public void testCancelTokenSingleManager() throws Exception { public void testCancelTokenSingleManager() throws Exception {
for (int i = 0; i < TEST_RETRIES; i++) {
DelegationTokenManager tm1 = null;
String connectString = zkServer.getConnectString();
Configuration conf = getSecretConf(connectString);
tm1 = new DelegationTokenManager(conf, new Text("foo"));
tm1.init();
Token<DelegationTokenIdentifier> token =
(Token<DelegationTokenIdentifier>)
tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token);
tm1.cancelToken(token, "foo");
try {
verifyTokenFail(tm1, token);
fail("Expected InvalidToken");
} catch (SecretManager.InvalidToken it) {
it.printStackTrace();
}
verifyDestroy(tm1, conf);
}
}
@SuppressWarnings("rawtypes")
protected void verifyDestroy(DelegationTokenManager tm, Configuration conf)
throws Exception {
AbstractDelegationTokenSecretManager sm =
tm.getDelegationTokenSecretManager();
ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager) sm;
ExecutorService es = zksm.getListenerThreadPool();
tm.destroy();
Assert.assertTrue(es.isShutdown());
// wait for the pool to terminate
long timeout =
conf.getLong(
ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT,
ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT);
Thread.sleep(timeout * 3);
Assert.assertTrue(es.isTerminated());
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testStopThreads() throws Exception {
DelegationTokenManager tm1 = null; DelegationTokenManager tm1 = null;
String connectString = zkServer.getConnectString(); String connectString = zkServer.getConnectString();
// let's make the update interval short and the shutdown interval
// comparatively longer, so if the update thread runs after shutdown,
// it will cause an error.
final long updateIntervalSeconds = 1;
final long shutdownTimeoutMillis = updateIntervalSeconds * 1000 * 5;
Configuration conf = getSecretConf(connectString); Configuration conf = getSecretConf(connectString);
conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, updateIntervalSeconds);
conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, updateIntervalSeconds);
conf.setLong(DelegationTokenManager.RENEW_INTERVAL, updateIntervalSeconds);
conf.setLong(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, shutdownTimeoutMillis);
tm1 = new DelegationTokenManager(conf, new Text("foo")); tm1 = new DelegationTokenManager(conf, new Text("foo"));
tm1.init(); tm1.init();
Token<DelegationTokenIdentifier> token = Token<DelegationTokenIdentifier> token =
(Token<DelegationTokenIdentifier>) (Token<DelegationTokenIdentifier>)
tm1.createToken(UserGroupInformation.getCurrentUser(), "foo"); tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token); Assert.assertNotNull(token);
tm1.cancelToken(token, "foo");
AbstractDelegationTokenSecretManager sm = tm1.getDelegationTokenSecretManager();
ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager)sm;
ExecutorService es = zksm.getListenerThreadPool();
es.submit(new Callable<Void>() {
public Void call() throws Exception {
Thread.sleep(shutdownTimeoutMillis * 2); // force this to be shutdownNow
return null;
}
});
tm1.destroy();
}
// Since it is possible that there can be a delay for the cancel token message
// initiated by one node to reach another node.. The second node can ofcourse
// verify with ZK directly if the token that needs verification has been
// cancelled but.. that would mean having to make an RPC call for every
// verification request.
// Thus, the eventual consistency tradef-off should be acceptable here...
private void verifyTokenFail(DelegationTokenManager tm,
Token<DelegationTokenIdentifier> token) throws IOException,
InterruptedException {
verifyTokenFailWithRetry(tm, token, RETRY_COUNT);
}
private void verifyTokenFailWithRetry(DelegationTokenManager tm,
Token<DelegationTokenIdentifier> token, int retryCount)
throws IOException, InterruptedException {
try { try {
tm1.verifyToken(token); tm.verifyToken(token);
fail("Expected InvalidToken"); } catch (SecretManager.InvalidToken er) {
} catch (SecretManager.InvalidToken it) { throw er;
it.printStackTrace(); }
if (retryCount > 0) {
Thread.sleep(RETRY_WAIT);
verifyTokenFailWithRetry(tm, token, retryCount - 1);
} }
} }
} }