HADOOP-18519. Backport HDFS-15383 and HADOOP-17835 to branch-3.3 (#5112)

* HDFS-15383. RBF: Add support for router delegation token without watch (#2047)

Improves the router's performance for delegation-token-related operations. It achieves this by removing the router's watchers on tokens because, in our experience, the huge number of watches inside ZooKeeper severely degrades ZooKeeper's performance. The current limit is about 1.2-1.5 million watches.

* HADOOP-17835. Use CuratorCache implementation instead of PathChildrenCache / TreeCache (#3266)

Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
Co-authored-by: lfengnan <lfengnan@uber.com>
Co-authored-by: Viraj Jasani <vjasani@apache.org>
Co-authored-by: Melissa You <myou@myou-mn1.linkedin.biz>
This commit is contained in:
Melissa You 2022-11-07 13:29:50 -08:00 committed by GitHub
parent 853ffb545a
commit 02aedd7811
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 615 additions and 253 deletions

View File

@ -23,11 +23,11 @@ import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.crypto.SecretKey; import javax.crypto.SecretKey;
@ -58,10 +58,9 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Evolving @InterfaceStability.Evolving
public abstract public abstract class AbstractDelegationTokenSecretManager<TokenIdent
class AbstractDelegationTokenSecretManager<TokenIdent extends AbstractDelegationTokenIdentifier>
extends AbstractDelegationTokenIdentifier> extends SecretManager<TokenIdent> {
extends SecretManager<TokenIdent> {
private static final Logger LOG = LoggerFactory private static final Logger LOG = LoggerFactory
.getLogger(AbstractDelegationTokenSecretManager.class); .getLogger(AbstractDelegationTokenSecretManager.class);
@ -80,7 +79,7 @@ extends AbstractDelegationTokenIdentifier>
* to DelegationTokenInformation. Protected by this object lock. * to DelegationTokenInformation. Protected by this object lock.
*/ */
protected final Map<TokenIdent, DelegationTokenInformation> currentTokens protected final Map<TokenIdent, DelegationTokenInformation> currentTokens
= new HashMap<TokenIdent, DelegationTokenInformation>(); = new ConcurrentHashMap<>();
/** /**
* Sequence number to create DelegationTokenIdentifier. * Sequence number to create DelegationTokenIdentifier.
@ -89,17 +88,17 @@ extends AbstractDelegationTokenIdentifier>
protected int delegationTokenSequenceNumber = 0; protected int delegationTokenSequenceNumber = 0;
/** /**
* Access to allKeys is protected by this object lock * Access to allKeys is protected by this object lock.
*/ */
protected final Map<Integer, DelegationKey> allKeys protected final Map<Integer, DelegationKey> allKeys
= new HashMap<Integer, DelegationKey>(); = new ConcurrentHashMap<>();
/** /**
* Access to currentId is protected by this object lock. * Access to currentId is protected by this object lock.
*/ */
protected int currentId = 0; protected int currentId = 0;
/** /**
* Access to currentKey is protected by this object lock * Access to currentKey is protected by this object lock.
*/ */
private DelegationKey currentKey; private DelegationKey currentKey;
@ -122,7 +121,7 @@ extends AbstractDelegationTokenIdentifier>
protected Object noInterruptsLock = new Object(); protected Object noInterruptsLock = new Object();
/** /**
* Create a secret manager * Create a secret manager.
* @param delegationKeyUpdateInterval the number of milliseconds for rolling * @param delegationKeyUpdateInterval the number of milliseconds for rolling
* new secret keys. * new secret keys.
* @param delegationTokenMaxLifetime the maximum lifetime of the delegation * @param delegationTokenMaxLifetime the maximum lifetime of the delegation
@ -183,8 +182,9 @@ extends AbstractDelegationTokenIdentifier>
* @throws IOException raised on errors performing I/O. * @throws IOException raised on errors performing I/O.
*/ */
public synchronized void addKey(DelegationKey key) throws IOException { public synchronized void addKey(DelegationKey key) throws IOException {
if (running) // a safety check if (running) { // a safety check
throw new IOException("Can't add delegation key to a running SecretManager."); throw new IOException("Can't add delegation key to a running SecretManager.");
}
if (key.getKeyId() > getCurrentKeyId()) { if (key.getKeyId() > getCurrentKeyId()) {
setCurrentKeyId(key.getKeyId()); setCurrentKeyId(key.getKeyId());
} }
@ -453,8 +453,9 @@ extends AbstractDelegationTokenIdentifier>
it.remove(); it.remove();
// ensure the tokens generated by this current key can be recovered // ensure the tokens generated by this current key can be recovered
// with this current key after this current key is rolled // with this current key after this current key is rolled
if(!e.getValue().equals(currentKey)) if(!e.getValue().equals(currentKey)) {
removeStoredMasterKey(e.getValue()); removeStoredMasterKey(e.getValue());
}
} }
} }
} }
@ -729,8 +730,9 @@ extends AbstractDelegationTokenIdentifier>
} }
public void stopThreads() { public void stopThreads() {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled()) {
LOG.debug("Stopping expired delegation token remover thread"); LOG.debug("Stopping expired delegation token remover thread");
}
running = false; running = false;
if (tokenRemoverThread != null) { if (tokenRemoverThread != null) {

View File

@ -23,12 +23,11 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ExecutorService; import java.util.stream.Stream;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
@ -37,10 +36,9 @@ import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider; import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.shared.SharedCount; import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.VersionedValue; import org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.curator.retry.RetryNTimes; import org.apache.curator.retry.RetryNTimes;
@ -54,6 +52,7 @@ import org.apache.hadoop.security.authentication.util.JaasConfiguration;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
import org.apache.hadoop.util.curator.ZKCuratorManager; import org.apache.hadoop.util.curator.ZKCuratorManager;
import static org.apache.hadoop.util.Time.now;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NoNodeException;
@ -78,7 +77,7 @@ import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
public abstract class ZKDelegationTokenSecretManager<TokenIdent extends AbstractDelegationTokenIdentifier> public abstract class ZKDelegationTokenSecretManager<TokenIdent extends AbstractDelegationTokenIdentifier>
extends AbstractDelegationTokenSecretManager<TokenIdent> { extends AbstractDelegationTokenSecretManager<TokenIdent> {
private static final String ZK_CONF_PREFIX = "zk-dt-secret-manager."; public static final String ZK_CONF_PREFIX = "zk-dt-secret-manager.";
public static final String ZK_DTSM_ZK_NUM_RETRIES = ZK_CONF_PREFIX public static final String ZK_DTSM_ZK_NUM_RETRIES = ZK_CONF_PREFIX
+ "zkNumRetries"; + "zkNumRetries";
public static final String ZK_DTSM_ZK_SESSION_TIMEOUT = ZK_CONF_PREFIX public static final String ZK_DTSM_ZK_SESSION_TIMEOUT = ZK_CONF_PREFIX
@ -101,6 +100,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
+ "kerberos.server.principal"; + "kerberos.server.principal";
public static final String ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE = ZK_CONF_PREFIX public static final String ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE = ZK_CONF_PREFIX
+ "token.seqnum.batch.size"; + "token.seqnum.batch.size";
public static final String ZK_DTSM_TOKEN_WATCHER_ENABLED = ZK_CONF_PREFIX
+ "token.watcher.enabled";
public static final boolean ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT = true;
public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3; public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3;
public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000; public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000;
@ -110,7 +112,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
// by default it is still incrementing seq number by 1 each time // by default it is still incrementing seq number by 1 each time
public static final int ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT = 1; public static final int ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT = 1;
private static Logger LOG = LoggerFactory private static final Logger LOG = LoggerFactory
.getLogger(ZKDelegationTokenSecretManager.class); .getLogger(ZKDelegationTokenSecretManager.class);
private static final String JAAS_LOGIN_ENTRY_NAME = private static final String JAAS_LOGIN_ENTRY_NAME =
@ -119,7 +121,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
private static final String ZK_DTSM_NAMESPACE = "ZKDTSMRoot"; private static final String ZK_DTSM_NAMESPACE = "ZKDTSMRoot";
private static final String ZK_DTSM_SEQNUM_ROOT = "/ZKDTSMSeqNumRoot"; private static final String ZK_DTSM_SEQNUM_ROOT = "/ZKDTSMSeqNumRoot";
private static final String ZK_DTSM_KEYID_ROOT = "/ZKDTSMKeyIdRoot"; private static final String ZK_DTSM_KEYID_ROOT = "/ZKDTSMKeyIdRoot";
private static final String ZK_DTSM_TOKENS_ROOT = "/ZKDTSMTokensRoot"; protected static final String ZK_DTSM_TOKENS_ROOT = "/ZKDTSMTokensRoot";
private static final String ZK_DTSM_MASTER_KEY_ROOT = "/ZKDTSMMasterKeyRoot"; private static final String ZK_DTSM_MASTER_KEY_ROOT = "/ZKDTSMMasterKeyRoot";
private static final String DELEGATION_KEY_PREFIX = "DK_"; private static final String DELEGATION_KEY_PREFIX = "DK_";
@ -133,17 +135,17 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
} }
private final boolean isExternalClient; private final boolean isExternalClient;
private final CuratorFramework zkClient; protected final CuratorFramework zkClient;
private SharedCount delTokSeqCounter; private SharedCount delTokSeqCounter;
private SharedCount keyIdSeqCounter; private SharedCount keyIdSeqCounter;
private PathChildrenCache keyCache; private CuratorCacheBridge keyCache;
private PathChildrenCache tokenCache; private CuratorCacheBridge tokenCache;
private ExecutorService listenerThreadPool;
private final long shutdownTimeout;
private final int seqNumBatchSize; private final int seqNumBatchSize;
private int currentSeqNum; private int currentSeqNum;
private int currentMaxSeqNum; private int currentMaxSeqNum;
private final boolean isTokenWatcherEnabled;
public ZKDelegationTokenSecretManager(Configuration conf) { public ZKDelegationTokenSecretManager(Configuration conf) {
super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL, super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL,
DelegationTokenManager.UPDATE_INTERVAL_DEFAULT) * 1000, DelegationTokenManager.UPDATE_INTERVAL_DEFAULT) * 1000,
@ -153,10 +155,10 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
DelegationTokenManager.RENEW_INTERVAL_DEFAULT) * 1000, DelegationTokenManager.RENEW_INTERVAL_DEFAULT) * 1000,
conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL,
DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000); DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000);
shutdownTimeout = conf.getLong(ZK_DTSM_ZK_SHUTDOWN_TIMEOUT,
ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT);
seqNumBatchSize = conf.getInt(ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE, seqNumBatchSize = conf.getInt(ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT); ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED,
ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT);
if (CURATOR_TL.get() != null) { if (CURATOR_TL.get() != null) {
zkClient = zkClient =
CURATOR_TL.get().usingNamespace( CURATOR_TL.get().usingNamespace(
@ -259,14 +261,13 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
// So, let's explicitly create them. // So, let's explicitly create them.
CuratorFramework nullNsFw = zkClient.usingNamespace(null); CuratorFramework nullNsFw = zkClient.usingNamespace(null);
EnsurePath ensureNs = EnsurePath ensureNs =
nullNsFw.newNamespaceAwareEnsurePath("/" + zkClient.getNamespace()); nullNsFw.newNamespaceAwareEnsurePath("/" + zkClient.getNamespace());
try { try {
ensureNs.ensure(nullNsFw.getZookeeperClient()); ensureNs.ensure(nullNsFw.getZookeeperClient());
} catch (Exception e) { } catch (Exception e) {
throw new IOException("Could not create namespace", e); throw new IOException("Could not create namespace", e);
} }
} }
listenerThreadPool = Executors.newSingleThreadExecutor();
try { try {
delTokSeqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0); delTokSeqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0);
if (delTokSeqCounter != null) { if (delTokSeqCounter != null) {
@ -296,68 +297,65 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
throw new RuntimeException("Could not create ZK paths"); throw new RuntimeException("Could not create ZK paths");
} }
try { try {
keyCache = new PathChildrenCache(zkClient, ZK_DTSM_MASTER_KEY_ROOT, true); keyCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_MASTER_KEY_ROOT)
if (keyCache != null) { .build();
keyCache.start(StartMode.BUILD_INITIAL_CACHE); CuratorCacheListener keyCacheListener = CuratorCacheListener.builder()
keyCache.getListenable().addListener(new PathChildrenCacheListener() { .forCreatesAndChanges((oldNode, node) -> {
@Override try {
public void childEvent(CuratorFramework client, processKeyAddOrUpdate(node.getData());
PathChildrenCacheEvent event) } catch (IOException e) {
throws Exception { LOG.error("Error while processing Curator keyCacheListener "
switch (event.getType()) { + "NODE_CREATED / NODE_CHANGED event");
case CHILD_ADDED: throw new UncheckedIOException(e);
processKeyAddOrUpdate(event.getData().getData());
break;
case CHILD_UPDATED:
processKeyAddOrUpdate(event.getData().getData());
break;
case CHILD_REMOVED:
processKeyRemoved(event.getData().getPath());
break;
default:
break;
} }
} })
}, listenerThreadPool); .forDeletes(childData -> processKeyRemoved(childData.getPath()))
loadFromZKCache(false); .build();
} keyCache.listenable().addListener(keyCacheListener);
keyCache.start();
loadFromZKCache(false);
} catch (Exception e) { } catch (Exception e) {
throw new IOException("Could not start PathChildrenCache for keys", e); throw new IOException("Could not start Curator keyCacheListener for keys",
e);
} }
try { if (isTokenWatcherEnabled) {
tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true); LOG.info("TokenCache is enabled");
if (tokenCache != null) { try {
tokenCache.start(StartMode.BUILD_INITIAL_CACHE); tokenCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_TOKENS_ROOT)
tokenCache.getListenable().addListener(new PathChildrenCacheListener() { .build();
CuratorCacheListener tokenCacheListener = CuratorCacheListener.builder()
@Override .forCreatesAndChanges((oldNode, node) -> {
public void childEvent(CuratorFramework client, try {
PathChildrenCacheEvent event) throws Exception { processTokenAddOrUpdate(node.getData());
switch (event.getType()) { } catch (IOException e) {
case CHILD_ADDED: LOG.error("Error while processing Curator tokenCacheListener "
processTokenAddOrUpdate(event.getData()); + "NODE_CREATED / NODE_CHANGED event");
break; throw new UncheckedIOException(e);
case CHILD_UPDATED: }
processTokenAddOrUpdate(event.getData()); })
break; .forDeletes(childData -> {
case CHILD_REMOVED: try {
processTokenRemoved(event.getData()); processTokenRemoved(childData);
break; } catch (IOException e) {
default: LOG.error("Error while processing Curator tokenCacheListener "
break; + "NODE_DELETED event");
} throw new UncheckedIOException(e);
} }
}, listenerThreadPool); })
.build();
tokenCache.listenable().addListener(tokenCacheListener);
tokenCache.start();
loadFromZKCache(true); loadFromZKCache(true);
} catch (Exception e) {
throw new IOException(
"Could not start Curator tokenCacheListener for tokens", e);
} }
} catch (Exception e) {
throw new IOException("Could not start PathChildrenCache for tokens", e);
} }
super.startThreads(); super.startThreads();
} }
/** /**
* Load the PathChildrenCache into the in-memory map. Possible caches to be * Load the CuratorCache into the in-memory map. Possible caches to be
* loaded are keyCache and tokenCache. * loaded are keyCache and tokenCache.
* *
* @param isTokenCache true if loading tokenCache, false if loading keyCache. * @param isTokenCache true if loading tokenCache, false if loading keyCache.
@ -365,30 +363,31 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
private void loadFromZKCache(final boolean isTokenCache) { private void loadFromZKCache(final boolean isTokenCache) {
final String cacheName = isTokenCache ? "token" : "key"; final String cacheName = isTokenCache ? "token" : "key";
LOG.info("Starting to load {} cache.", cacheName); LOG.info("Starting to load {} cache.", cacheName);
final List<ChildData> children; final Stream<ChildData> children;
if (isTokenCache) { if (isTokenCache) {
children = tokenCache.getCurrentData(); children = tokenCache.stream();
} else { } else {
children = keyCache.getCurrentData(); children = keyCache.stream();
} }
int count = 0; final AtomicInteger count = new AtomicInteger(0);
for (ChildData child : children) { children.forEach(childData -> {
try { try {
if (isTokenCache) { if (isTokenCache) {
processTokenAddOrUpdate(child); processTokenAddOrUpdate(childData.getData());
} else { } else {
processKeyAddOrUpdate(child.getData()); processKeyAddOrUpdate(childData.getData());
} }
} catch (Exception e) { } catch (Exception e) {
LOG.info("Ignoring node {} because it failed to load.", LOG.info("Ignoring node {} because it failed to load.",
child.getPath()); childData.getPath());
LOG.debug("Failure exception:", e); LOG.debug("Failure exception:", e);
++count; count.getAndIncrement();
} }
} });
if (count > 0) { if (count.get() > 0) {
LOG.warn("Ignored {} nodes while loading {} cache.", count, cacheName); LOG.warn("Ignored {} nodes while loading {} cache.", count.get(),
cacheName);
} }
LOG.info("Loaded {} cache.", cacheName); LOG.info("Loaded {} cache.", cacheName);
} }
@ -398,9 +397,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
DataInputStream din = new DataInputStream(bin); DataInputStream din = new DataInputStream(bin);
DelegationKey key = new DelegationKey(); DelegationKey key = new DelegationKey();
key.readFields(din); key.readFields(din);
synchronized (this) { allKeys.put(key.getKeyId(), key);
allKeys.put(key.getKeyId(), key);
}
} }
private void processKeyRemoved(String path) { private void processKeyRemoved(String path) {
@ -410,15 +407,13 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
int j = tokSeg.indexOf('_'); int j = tokSeg.indexOf('_');
if (j > 0) { if (j > 0) {
int keyId = Integer.parseInt(tokSeg.substring(j + 1)); int keyId = Integer.parseInt(tokSeg.substring(j + 1));
synchronized (this) { allKeys.remove(keyId);
allKeys.remove(keyId);
}
} }
} }
} }
private void processTokenAddOrUpdate(ChildData data) throws IOException { protected TokenIdent processTokenAddOrUpdate(byte[] data) throws IOException {
ByteArrayInputStream bin = new ByteArrayInputStream(data.getData()); ByteArrayInputStream bin = new ByteArrayInputStream(data);
DataInputStream din = new DataInputStream(bin); DataInputStream din = new DataInputStream(bin);
TokenIdent ident = createIdentifier(); TokenIdent ident = createIdentifier();
ident.readFields(din); ident.readFields(din);
@ -429,12 +424,10 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
if (numRead > -1) { if (numRead > -1) {
DelegationTokenInformation tokenInfo = DelegationTokenInformation tokenInfo =
new DelegationTokenInformation(renewDate, password); new DelegationTokenInformation(renewDate, password);
synchronized (this) { currentTokens.put(ident, tokenInfo);
currentTokens.put(ident, tokenInfo); return ident;
// The cancel task might be waiting
notifyAll();
}
} }
return null;
} }
private void processTokenRemoved(ChildData data) throws IOException { private void processTokenRemoved(ChildData data) throws IOException {
@ -442,11 +435,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
DataInputStream din = new DataInputStream(bin); DataInputStream din = new DataInputStream(bin);
TokenIdent ident = createIdentifier(); TokenIdent ident = createIdentifier();
ident.readFields(din); ident.readFields(din);
synchronized (this) { currentTokens.remove(ident);
currentTokens.remove(ident);
// The cancel task might be waiting
notifyAll();
}
} }
@Override @Override
@ -487,20 +476,6 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
} catch (Exception e) { } catch (Exception e) {
LOG.error("Could not stop Curator Framework", e); LOG.error("Could not stop Curator Framework", e);
} }
if (listenerThreadPool != null) {
listenerThreadPool.shutdown();
try {
// wait for existing tasks to terminate
if (!listenerThreadPool.awaitTermination(shutdownTimeout,
TimeUnit.MILLISECONDS)) {
LOG.error("Forcing Listener threadPool to shutdown !!");
listenerThreadPool.shutdownNow();
}
} catch (InterruptedException ie) {
listenerThreadPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
} }
private void createPersistentNode(String nodePath) throws Exception { private void createPersistentNode(String nodePath) throws Exception {
@ -647,7 +622,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
* *
* @param ident Identifier of the token * @param ident Identifier of the token
*/ */
private synchronized void syncLocalCacheWithZk(TokenIdent ident) { protected void syncLocalCacheWithZk(TokenIdent ident) {
try { try {
DelegationTokenInformation tokenInfo = getTokenInfoFromZK(ident); DelegationTokenInformation tokenInfo = getTokenInfoFromZK(ident);
if (tokenInfo != null && !currentTokens.containsKey(ident)) { if (tokenInfo != null && !currentTokens.containsKey(ident)) {
@ -661,16 +636,21 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
} }
} }
private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident) protected DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident)
throws IOException { throws IOException {
return getTokenInfoFromZK(ident, false); return getTokenInfoFromZK(ident, false);
} }
private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident, protected DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident,
boolean quiet) throws IOException { boolean quiet) throws IOException {
String nodePath = String nodePath =
getNodePath(ZK_DTSM_TOKENS_ROOT, getNodePath(ZK_DTSM_TOKENS_ROOT,
DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber()); DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber());
return getTokenInfoFromZK(nodePath, quiet);
}
protected DelegationTokenInformation getTokenInfoFromZK(String nodePath,
boolean quiet) throws IOException {
try { try {
byte[] data = zkClient.getData().forPath(nodePath); byte[] data = zkClient.getData().forPath(nodePath);
if ((data == null) || (data.length == 0)) { if ((data == null) || (data.length == 0)) {
@ -805,15 +785,30 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
@Override @Override
protected void removeStoredToken(TokenIdent ident) protected void removeStoredToken(TokenIdent ident)
throws IOException { throws IOException {
removeStoredToken(ident, false);
}
protected void removeStoredToken(TokenIdent ident,
boolean checkAgainstZkBeforeDeletion) throws IOException {
String nodeRemovePath = String nodeRemovePath =
getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
+ ident.getSequenceNumber()); + ident.getSequenceNumber());
if (LOG.isDebugEnabled()) {
LOG.debug("Removing ZKDTSMDelegationToken_"
+ ident.getSequenceNumber());
}
try { try {
if (zkClient.checkExists().forPath(nodeRemovePath) != null) { DelegationTokenInformation dtInfo = getTokenInfoFromZK(ident, true);
if (dtInfo != null) {
// For the case there is no sync or watch miss, it is possible that the
// local storage has expired tokens which have been renewed by peer
// so double check again to avoid accidental delete
if (checkAgainstZkBeforeDeletion
&& dtInfo.getRenewDate() > now()) {
LOG.info("Node already renewed by peer " + nodeRemovePath +
" so this token should not be deleted");
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Removing ZKDTSMDelegationToken_"
+ ident.getSequenceNumber());
}
while(zkClient.checkExists().forPath(nodeRemovePath) != null){ while(zkClient.checkExists().forPath(nodeRemovePath) != null){
try { try {
zkClient.delete().guaranteed().forPath(nodeRemovePath); zkClient.delete().guaranteed().forPath(nodeRemovePath);
@ -836,7 +831,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
} }
@Override @Override
public synchronized TokenIdent cancelToken(Token<TokenIdent> token, public TokenIdent cancelToken(Token<TokenIdent> token,
String canceller) throws IOException { String canceller) throws IOException {
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
DataInputStream in = new DataInputStream(buf); DataInputStream in = new DataInputStream(buf);
@ -847,7 +842,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
return super.cancelToken(token, canceller); return super.cancelToken(token, canceller);
} }
private void addOrUpdateToken(TokenIdent ident, protected void addOrUpdateToken(TokenIdent ident,
DelegationTokenInformation info, boolean isUpdate) throws Exception { DelegationTokenInformation info, boolean isUpdate) throws Exception {
String nodeCreatePath = String nodeCreatePath =
getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
@ -874,6 +869,10 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
} }
} }
public boolean isTokenWatcherEnabled() {
return isTokenWatcherEnabled;
}
/** /**
* Simple implementation of an {@link ACLProvider} that simply returns an ACL * Simple implementation of an {@link ACLProvider} that simply returns an ACL
* that gives all permissions only to a single principal. * that gives all permissions only to a single principal.
@ -905,11 +904,6 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
return (root + "/" + nodeName); return (root + "/" + nodeName);
} }
@VisibleForTesting
public ExecutorService getListenerThreadPool() {
return listenerThreadPool;
}
@VisibleForTesting @VisibleForTesting
DelegationTokenInformation getTokenInfoFromMemory(TokenIdent ident) { DelegationTokenInformation getTokenInfoFromMemory(TokenIdent ident) {
return currentTokens.get(ident); return currentTokens.get(ident);

View File

@ -21,8 +21,6 @@ package org.apache.hadoop.security.token.delegation;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.apache.curator.RetryPolicy; import org.apache.curator.RetryPolicy;
@ -59,15 +57,15 @@ public class TestZKDelegationTokenSecretManager {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(TestZKDelegationTokenSecretManager.class); LoggerFactory.getLogger(TestZKDelegationTokenSecretManager.class);
private static final int TEST_RETRIES = 2; protected static final int TEST_RETRIES = 2;
private static final int RETRY_COUNT = 5; protected static final int RETRY_COUNT = 5;
private static final int RETRY_WAIT = 1000; protected static final int RETRY_WAIT = 1000;
private static final long DAY_IN_SECS = 86400; protected static final long DAY_IN_SECS = 86400;
private TestingServer zkServer; protected TestingServer zkServer;
@Rule @Rule
public Timeout globalTimeout = new Timeout(300000); public Timeout globalTimeout = new Timeout(300000);
@ -86,17 +84,17 @@ public class TestZKDelegationTokenSecretManager {
} }
protected Configuration getSecretConf(String connectString) { protected Configuration getSecretConf(String connectString) {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setBoolean(DelegationTokenManager.ENABLE_ZK_KEY, true); conf.setBoolean(DelegationTokenManager.ENABLE_ZK_KEY, true);
conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING, connectString); conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING, connectString);
conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "testPath"); conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "testPath");
conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE, "none"); conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE, "none");
conf.setLong(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, 100); conf.setLong(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, 100);
conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS); conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS); conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS); conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS);
conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, DAY_IN_SECS); conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, DAY_IN_SECS);
return conf; return conf;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -317,19 +315,13 @@ public class TestZKDelegationTokenSecretManager {
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
protected void verifyDestroy(DelegationTokenManager tm, Configuration conf) protected void verifyDestroy(DelegationTokenManager tm, Configuration conf)
throws Exception { throws Exception {
AbstractDelegationTokenSecretManager sm =
tm.getDelegationTokenSecretManager();
ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager) sm;
ExecutorService es = zksm.getListenerThreadPool();
tm.destroy(); tm.destroy();
Assert.assertTrue(es.isShutdown());
// wait for the pool to terminate // wait for the pool to terminate
long timeout = long timeout =
conf.getLong( conf.getLong(
ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT,
ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT); ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT);
Thread.sleep(timeout * 3); Thread.sleep(timeout * 3);
Assert.assertTrue(es.isTerminated());
} }
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
@ -353,20 +345,9 @@ public class TestZKDelegationTokenSecretManager {
tm1.init(); tm1.init();
Token<DelegationTokenIdentifier> token = Token<DelegationTokenIdentifier> token =
(Token<DelegationTokenIdentifier>) (Token<DelegationTokenIdentifier>)
tm1.createToken(UserGroupInformation.getCurrentUser(), "foo"); tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token); Assert.assertNotNull(token);
AbstractDelegationTokenSecretManager sm = tm1.getDelegationTokenSecretManager();
ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager)sm;
ExecutorService es = zksm.getListenerThreadPool();
es.submit(new Callable<Void>() {
public Void call() throws Exception {
Thread.sleep(shutdownTimeoutMillis * 2); // force this to be shutdownNow
return null;
}
});
tm1.destroy(); tm1.destroy();
} }
@ -378,7 +359,7 @@ public class TestZKDelegationTokenSecretManager {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
String userPass = "myuser:mypass"; String userPass = "myuser:mypass";
final ACL digestACL = new ACL(ZooDefs.Perms.ALL, new Id("digest", final ACL digestACL = new ACL(ZooDefs.Perms.ALL, new Id("digest",
DigestAuthenticationProvider.generateDigest(userPass))); DigestAuthenticationProvider.generateDigest(userPass)));
ACLProvider digestAclProvider = new ACLProvider() { ACLProvider digestAclProvider = new ACLProvider() {
@Override @Override
public List<ACL> getAclForPath(String path) { return getDefaultAcl(); } public List<ACL> getAclForPath(String path) { return getDefaultAcl(); }
@ -392,12 +373,12 @@ public class TestZKDelegationTokenSecretManager {
}; };
CuratorFramework curatorFramework = CuratorFramework curatorFramework =
CuratorFrameworkFactory.builder() CuratorFrameworkFactory.builder()
.connectString(connectString) .connectString(connectString)
.retryPolicy(retryPolicy) .retryPolicy(retryPolicy)
.aclProvider(digestAclProvider) .aclProvider(digestAclProvider)
.authorization("digest", userPass.getBytes("UTF-8")) .authorization("digest", userPass.getBytes("UTF-8"))
.build(); .build();
curatorFramework.start(); curatorFramework.start();
ZKDelegationTokenSecretManager.setCurator(curatorFramework); ZKDelegationTokenSecretManager.setCurator(curatorFramework);
tm1 = new DelegationTokenManager(conf, new Text("bla")); tm1 = new DelegationTokenManager(conf, new Text("bla"));
@ -425,7 +406,7 @@ public class TestZKDelegationTokenSecretManager {
// cancelled but.. that would mean having to make an RPC call for every // cancelled but.. that would mean having to make an RPC call for every
// verification request. // verification request.
// Thus, the eventual consistency trade-off should be acceptable here... // Thus, the eventual consistency trade-off should be acceptable here...
private void verifyTokenFail(DelegationTokenManager tm, protected void verifyTokenFail(DelegationTokenManager tm,
Token<DelegationTokenIdentifier> token) throws IOException, Token<DelegationTokenIdentifier> token) throws IOException,
InterruptedException { InterruptedException {
verifyTokenFailWithRetry(tm, token, RETRY_COUNT); verifyTokenFailWithRetry(tm, token, RETRY_COUNT);

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.registry.client.impl.zk; package org.apache.hadoop.registry.client.impl.zk;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.curator.ensemble.EnsembleProvider; import org.apache.curator.ensemble.EnsembleProvider;
@ -28,9 +31,6 @@ import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CreateBuilder; import org.apache.curator.framework.api.CreateBuilder;
import org.apache.curator.framework.api.DeleteBuilder; import org.apache.curator.framework.api.DeleteBuilder;
import org.apache.curator.framework.api.GetChildrenBuilder; import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.BoundedExponentialBackoffRetry; import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
@ -56,6 +56,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List; import java.util.List;
/** /**
@ -109,9 +110,9 @@ public class CuratorService extends CompositeService
private EnsembleProvider ensembleProvider; private EnsembleProvider ensembleProvider;
/** /**
* Registry tree cache. * Registry Curator cache.
*/ */
private TreeCache treeCache; private CuratorCacheBridge curatorCacheBridge;
/** /**
* Construct the service. * Construct the service.
@ -189,8 +190,8 @@ public class CuratorService extends CompositeService
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
IOUtils.closeStream(curator); IOUtils.closeStream(curator);
if (treeCache != null) { if (curatorCacheBridge != null) {
treeCache.close(); curatorCacheBridge.close();
} }
super.serviceStop(); super.serviceStop();
} }
@ -824,73 +825,54 @@ public class CuratorService extends CompositeService
* *
* @param listener the listener. * @param listener the listener.
* @return a handle allowing for the management of the listener. * @return a handle allowing for the management of the listener.
* @throws Exception if registration fails due to error.
*/ */
public ListenerHandle registerPathListener(final PathListener listener) public ListenerHandle registerPathListener(final PathListener listener) {
throws Exception {
final TreeCacheListener pathChildrenCacheListener = CuratorCacheListener cacheListener = CuratorCacheListener.builder()
new TreeCacheListener() { .forCreatesAndChanges((oldNode, node) -> {
final String path = node.getPath();
public void childEvent(CuratorFramework curatorFramework, LOG.info("Informing listener of added/updated node {}", path);
TreeCacheEvent event) try {
throws Exception { listener.nodeAdded(path);
String path = null; } catch (IOException e) {
if (event != null && event.getData() != null) { LOG.error("Error while processing Curator listener "
path = event.getData().getPath(); + "NODE_CREATED / NODE_CHANGED event");
} throw new UncheckedIOException(e);
assert event != null;
switch (event.getType()) {
case NODE_ADDED:
LOG.info("Informing listener of added node {}", path);
listener.nodeAdded(path);
break;
case NODE_REMOVED:
LOG.info("Informing listener of removed node {}", path);
listener.nodeRemoved(path);
break;
case NODE_UPDATED:
LOG.info("Informing listener of updated node {}", path);
listener.nodeAdded(path);
break;
default:
// do nothing
break;
}
} }
}; })
treeCache.getListenable().addListener(pathChildrenCacheListener); .forDeletes(childData -> {
final String path = childData.getPath();
return new ListenerHandle() { LOG.info("Informing listener of removed node {}", path);
@Override try {
public void remove() { listener.nodeRemoved(path);
treeCache.getListenable().removeListener(pathChildrenCacheListener); } catch (IOException e) {
} LOG.error("Error while processing Curator listener "
}; + "NODE_DELETED event");
throw new UncheckedIOException(e);
}
})
.build();
curatorCacheBridge.listenable().addListener(cacheListener);
return () -> curatorCacheBridge.listenable().removeListener(cacheListener);
} }
// TODO: should caches be stopped and then restarted if need be? // TODO: should caches be stopped and then restarted if need be?
/** /**
* Create the tree cache that monitors the registry for node addition, update, * Instantiate the Curator cache that monitors the registry for node
* and deletion. * addition, update and deletion.
*
* @throws Exception if any issue arises during monitoring.
*/ */
public void monitorRegistryEntries() public void instantiateCacheForRegistry() {
throws Exception {
String registryPath = String registryPath =
getConfig().get(RegistryConstants.KEY_REGISTRY_ZK_ROOT, getConfig().get(RegistryConstants.KEY_REGISTRY_ZK_ROOT,
RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT); RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
treeCache = new TreeCache(curator, registryPath); curatorCacheBridge = CuratorCache.bridgeBuilder(curator, registryPath)
treeCache.start(); .build();
} }
public void startCache() {
curatorCacheBridge.start();
}
} }

View File

@ -106,7 +106,7 @@ public class RegistryDNSServer extends CompositeService {
private void manageRegistryDNS() { private void manageRegistryDNS() {
try { try {
registryOperations.monitorRegistryEntries(); registryOperations.instantiateCacheForRegistry();
registryOperations.registerPathListener(new PathListener() { registryOperations.registerPathListener(new PathListener() {
private String registryRoot = getConfig(). private String registryRoot = getConfig().
get(RegistryConstants.KEY_REGISTRY_ZK_ROOT, get(RegistryConstants.KEY_REGISTRY_ZK_ROOT,
@ -157,6 +157,7 @@ public class RegistryDNSServer extends CompositeService {
} }
}); });
registryOperations.startCache();
// create listener for record deletions // create listener for record deletions

View File

@ -19,13 +19,24 @@
package org.apache.hadoop.hdfs.server.federation.router.security.token; package org.apache.hadoop.hdfs.server.federation.router.security.token;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager; import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
import org.apache.hadoop.util.Time;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import java.io.IOException; import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/** /**
* Zookeeper based router delegation token store implementation. * Zookeeper based router delegation token store implementation.
@ -33,24 +44,181 @@ import java.io.IOException;
public class ZKDelegationTokenSecretManagerImpl extends public class ZKDelegationTokenSecretManagerImpl extends
ZKDelegationTokenSecretManager<AbstractDelegationTokenIdentifier> { ZKDelegationTokenSecretManager<AbstractDelegationTokenIdentifier> {
public static final String ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL =
ZK_CONF_PREFIX + "router.token.sync.interval";
public static final int ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL_DEFAULT = 5;
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(ZKDelegationTokenSecretManagerImpl.class); LoggerFactory.getLogger(ZKDelegationTokenSecretManagerImpl.class);
private Configuration conf = null; private Configuration conf;
private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
// Local cache of delegation tokens, used for deprecating tokens from
// currentTokenMap
private final Set<AbstractDelegationTokenIdentifier> localTokenCache =
new HashSet<>();
// Native zk client for getting all tokens
private ZooKeeper zookeeper;
private final String TOKEN_PATH = "/" + zkClient.getNamespace()
+ ZK_DTSM_TOKENS_ROOT;
// Per-thread flag deciding whether removeStoredToken asks for an extra
// check against ZK before a token is deleted. It starts out as true for
// every thread; cancelToken switches it off around its own call.
// Rationale: cancel token and the token remover thread go through the same
// removal API, and one router could hold a token that was renewed by
// another router, so the token remover should always check ZK to confirm
// whether the token has been renewed or not.
private final ThreadLocal<Boolean> checkAgainstZkBeforeDeletion =
    ThreadLocal.withInitial(() -> Boolean.TRUE);
public ZKDelegationTokenSecretManagerImpl(Configuration conf) { public ZKDelegationTokenSecretManagerImpl(Configuration conf) {
super(conf); super(conf);
this.conf = conf; this.conf = conf;
try { try {
super.startThreads(); startThreads();
} catch (IOException e) { } catch (IOException e) {
LOG.error("Error starting threads for zkDelegationTokens", e); LOG.error("Error starting threads for zkDelegationTokens", e);
} }
LOG.info("Zookeeper delegation token secret manager instantiated"); LOG.info("Zookeeper delegation token secret manager instantiated");
} }
/**
 * Starts the parent secret manager threads and, when the token watcher is
 * disabled, loads the local token cache from ZooKeeper and schedules a
 * periodic re-sync of that cache.
 *
 * Failures while preparing the cache or scheduling the sync are logged and
 * not propagated to the caller.
 *
 * @throws IOException as declared by the parent implementation
 */
@Override
public void startThreads() throws IOException {
  super.startThreads();
  // start token cache related work when watcher is disabled
  if (!isTokenWatcherEnabled()) {
    LOG.info("Watcher for tokens is disabled in this secret manager");
    try {
      // By default set this variable
      checkAgainstZkBeforeDeletion.set(true);
      // Ensure the token root path exists
      if (zkClient.checkExists().forPath(ZK_DTSM_TOKENS_ROOT) == null) {
        zkClient.create().creatingParentsIfNeeded()
            .withMode(CreateMode.PERSISTENT)
            .forPath(ZK_DTSM_TOKENS_ROOT);
      }
      // Set up zookeeper client
      try {
        zookeeper = zkClient.getZookeeperClient().getZooKeeper();
      } catch (Exception e) {
        LOG.info("Cannot get zookeeper client ", e);
      }
      // Checked after the try block rather than inside a finally, so that
      // an in-flight Error is never replaced by this IOException.
      if (zookeeper == null) {
        throw new IOException("Zookeeper client is null");
      }

      LOG.info("Start loading token cache");
      long start = Time.now();
      rebuildTokenCache(true);
      LOG.info("Loaded token cache in {} milliseconds", Time.now() - start);

      int syncInterval = conf.getInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL,
          ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL_DEFAULT);
      scheduler.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
          try {
            rebuildTokenCache(false);
          } catch (Exception e) {
            // Must not escape: an exception thrown from a fixed-rate task
            // suppresses all subsequent executions. Log it so that a
            // failing sync is visible, and try again at the next interval.
            LOG.warn("Error syncing local token cache from zookeeper, "
                + "will retry at the next sync interval", e);
          }
        }
      }, syncInterval, syncInterval, TimeUnit.SECONDS);
    } catch (Exception e) {
      LOG.error("Error rebuilding local cache for zkDelegationTokens ", e);
    }
  }
}
/**
 * Stops the parent secret manager threads and then shuts down the
 * scheduler used for the periodic token cache sync.
 */
@Override
public void stopThreads() {
  super.stopThreads();
  // shutdown() stops accepting new runs; it does not interrupt a sync
  // that is already executing.
  scheduler.shutdown();
}
@Override @Override
public DelegationTokenIdentifier createIdentifier() { public DelegationTokenIdentifier createIdentifier() {
return new DelegationTokenIdentifier(); return new DelegationTokenIdentifier();
} }
/**
 * This function will rebuild local token cache from zk storage.
 * It is first called when the secret manager is initialized and
 * then regularly at a configured interval.
 *
 * @param initial whether this is called during initialization; when
 *                false, tokens that are no longer present in zk are also
 *                dropped from currentTokens
 * @throws IOException if the list of tokens or the data of a token cannot
 *                     be read from zk, or the calling thread is
 *                     interrupted while listing the tokens
 */
private void rebuildTokenCache(boolean initial) throws IOException {
  localTokenCache.clear();
  // Use bare zookeeper client to get all children since curator will
  // wrap the same API with a sorting process. This is time consuming given
  // millions of tokens
  List<String> zkTokens;
  try {
    zkTokens = zookeeper.getChildren(TOKEN_PATH, false);
  } catch (InterruptedException e) {
    // Restore the interrupt status so callers can still observe it
    Thread.currentThread().interrupt();
    throw new IOException("Tokens cannot be fetched from path "
        + TOKEN_PATH, e);
  } catch (KeeperException e) {
    throw new IOException("Tokens cannot be fetched from path "
        + TOKEN_PATH, e);
  }

  for (String tokenPath : zkTokens) {
    byte[] data;
    try {
      data = zkClient.getData().forPath(
          ZK_DTSM_TOKENS_ROOT + "/" + tokenPath);
    } catch (KeeperException.NoNodeException e) {
      // The node disappeared between listing and reading; skip it
      LOG.debug("No node in path [{}]", tokenPath);
      continue;
    } catch (Exception ex) {
      throw new IOException(ex);
    }
    // Store data to currentTokenMap
    AbstractDelegationTokenIdentifier ident = processTokenAddOrUpdate(data);
    // Store data to localTokenCache for sync
    localTokenCache.add(ident);
  }

  if (!initial) {
    // Sync zkTokens with local cache, specifically
    // 1) add/update tokens to local cache from zk, which is done through
    //    processTokenAddOrUpdate above
    // 2) remove tokens in local cache but not in zk anymore
    currentTokens.keySet().retainAll(localTokenCache);
  }
}
/**
 * Cancels the token through the parent implementation with the
 * check-against-ZK-before-deletion flag switched off for the calling
 * thread, and switches the flag back on afterwards.
 *
 * @param token the token to cancel
 * @param canceller the name of the user requesting the cancellation
 * @return the identifier returned by the parent implementation
 * @throws IOException if the parent implementation fails to cancel
 */
@Override
public AbstractDelegationTokenIdentifier cancelToken(
    Token<AbstractDelegationTokenIdentifier> token, String canceller)
    throws IOException {
  checkAgainstZkBeforeDeletion.set(false);
  try {
    return super.cancelToken(token, canceller);
  } finally {
    // Restore the flag even when the cancellation throws, so a failed
    // cancel does not leave this thread with the check disabled.
    checkAgainstZkBeforeDeletion.set(true);
  }
}
/**
 * Removes the stored token through the parent implementation, passing
 * along the calling thread's current check-against-ZK-before-deletion
 * flag.
 *
 * @param ident identifier of the token to remove
 * @throws IOException as declared by the parent implementation
 */
@Override
protected void removeStoredToken(AbstractDelegationTokenIdentifier ident)
    throws IOException {
  final boolean verifyWithZk = checkAgainstZkBeforeDeletion.get();
  super.removeStoredToken(ident, verifyWithZk);
}
/**
 * Records the token in the local currentTokens map and then delegates to
 * the parent implementation to add or update it.
 *
 * @param ident identifier of the token
 * @param info token information to associate with the identifier
 * @param isUpdate passed through unchanged to the parent implementation
 * @throws Exception as declared by the parent implementation
 */
@Override
protected void addOrUpdateToken(AbstractDelegationTokenIdentifier ident,
    DelegationTokenInformation info, boolean isUpdate) throws Exception {
  // Store the data in local memory first
  currentTokens.put(ident, info);
  super.addOrUpdateToken(ident, info, isUpdate);
}
} }

View File

@ -0,0 +1,234 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.security.token;
import static org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl.ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL;
import static org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.ZK_DTSM_TOKEN_WATCHER_ENABLED;
import static org.apache.hadoop.security.token.delegation.web.DelegationTokenManager.REMOVAL_SCAN_INTERVAL;
import static org.apache.hadoop.security.token.delegation.web.DelegationTokenManager.RENEW_INTERVAL;
import static org.junit.Assert.fail;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.TestZKDelegationTokenSecretManager;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Tests for {@link ZKDelegationTokenSecretManagerImpl} running with the
 * token watcher disabled, where several secret managers share the same
 * ZooKeeper test server.
 */
public class TestZKDelegationTokenSecretManagerImpl
    extends TestZKDelegationTokenSecretManager {
  private static final Logger LOG =
      LoggerFactory.getLogger(TestZKDelegationTokenSecretManagerImpl.class);

  /**
   * Creates a token manager that is backed by the given secret manager.
   *
   * @param conf configuration for the token manager
   * @param dtsm secret manager to plug into the token manager
   * @return the wired token manager
   */
  private static DelegationTokenManager newTokenManager(Configuration conf,
      ZKDelegationTokenSecretManagerImpl dtsm) {
    DelegationTokenManager tm =
        new DelegationTokenManager(conf, new Text("bla"));
    tm.setExternalDelegationTokenSecretManager(dtsm);
    return tm;
  }

  /**
   * Create, verify, renew and cancel tokens across two managers while the
   * watcher is disabled.
   */
  @SuppressWarnings("unchecked")
  @Test
  public void testMultiNodeOperationWithoutWatch() throws Exception {
    String connectString = zkServer.getConnectString();
    Configuration conf = getSecretConf(connectString);
    // disable watch
    conf.setBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, false);
    conf.setInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL, 3);

    for (int i = 0; i < TEST_RETRIES; i++) {
      ZKDelegationTokenSecretManagerImpl dtsm1 =
          new ZKDelegationTokenSecretManagerImpl(conf);
      ZKDelegationTokenSecretManagerImpl dtsm2 =
          new ZKDelegationTokenSecretManagerImpl(conf);
      DelegationTokenManager tm1 = newTokenManager(conf, dtsm1);
      DelegationTokenManager tm2 = newTokenManager(conf, dtsm2);

      // common token operation without watchers should still be working
      Token<DelegationTokenIdentifier> token =
          (Token<DelegationTokenIdentifier>) tm1.createToken(
              UserGroupInformation.getCurrentUser(), "foo");
      Assert.assertNotNull(token);
      tm2.verifyToken(token);
      tm2.renewToken(token, "foo");
      tm1.verifyToken(token);
      tm1.cancelToken(token, "foo");
      try {
        verifyTokenFail(tm2, token);
        fail("Expected InvalidToken");
      } catch (SecretManager.InvalidToken it) {
        // Ignore
      }

      token = (Token<DelegationTokenIdentifier>) tm2.createToken(
          UserGroupInformation.getCurrentUser(), "bar");
      Assert.assertNotNull(token);
      tm1.verifyToken(token);
      tm1.renewToken(token, "bar");
      tm2.verifyToken(token);
      tm2.cancelToken(token, "bar");
      try {
        verifyTokenFail(tm1, token);
        fail("Expected InvalidToken");
      } catch (SecretManager.InvalidToken it) {
        // Ignore
      }

      dtsm1.stopThreads();
      dtsm2.stopThreads();
      verifyDestroy(tm1, conf);
      verifyDestroy(tm2, conf);
    }
  }

  /**
   * A token renewed through one manager must stay valid on the other one
   * when the sync interval is shorter than the renew interval.
   */
  @SuppressWarnings("unchecked")
  @Test
  public void testMultiNodeTokenRemovalShortSyncWithoutWatch()
      throws Exception {
    String connectString = zkServer.getConnectString();
    Configuration conf = getSecretConf(connectString);
    // disable watch
    conf.setBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, false);
    // make sync quick
    conf.setInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL, 3);
    // set the renew window and removal interval to be a
    // short time to trigger the background cleanup
    conf.setInt(RENEW_INTERVAL, 10);
    conf.setInt(REMOVAL_SCAN_INTERVAL, 10);

    for (int i = 0; i < TEST_RETRIES; i++) {
      ZKDelegationTokenSecretManagerImpl dtsm1 =
          new ZKDelegationTokenSecretManagerImpl(conf);
      ZKDelegationTokenSecretManagerImpl dtsm2 =
          new ZKDelegationTokenSecretManagerImpl(conf);
      DelegationTokenManager tm1 = newTokenManager(conf, dtsm1);
      DelegationTokenManager tm2 = newTokenManager(conf, dtsm2);

      // time: X
      // token expiry time:
      //   tm1: X + 10
      //   tm2: X + 10
      Token<DelegationTokenIdentifier> token =
          (Token<DelegationTokenIdentifier>) tm1.createToken(
              UserGroupInformation.getCurrentUser(), "foo");
      Assert.assertNotNull(token);
      tm2.verifyToken(token);

      // time: X + 9
      // token expiry time:
      //   tm1: X + 10
      //   tm2: X + 19
      Thread.sleep(9 * 1000);
      tm2.renewToken(token, "foo");
      tm1.verifyToken(token);

      // time: X + 13
      // token expiry time: (sync happened)
      //   tm1: X + 19
      //   tm2: X + 19
      Thread.sleep(4 * 1000);
      tm1.verifyToken(token);
      tm2.verifyToken(token);

      dtsm1.stopThreads();
      dtsm2.stopThreads();
      verifyDestroy(tm1, conf);
      verifyDestroy(tm2, conf);
    }
  }

  // This is very unlikely to happen in real case, but worth putting
  // the case out
  @SuppressWarnings("unchecked")
  @Test
  public void testMultiNodeTokenRemovalLongSyncWithoutWatch()
      throws Exception {
    String connectString = zkServer.getConnectString();
    Configuration conf = getSecretConf(connectString);
    // disable watch
    conf.setBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, false);
    // make the sync interval longer than one iteration of this test, so
    // that no periodic sync happens while the token is being exercised
    conf.setInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL, 20);
    // set the renew window and removal interval to be a
    // short time to trigger the background cleanup
    conf.setInt(RENEW_INTERVAL, 10);
    conf.setInt(REMOVAL_SCAN_INTERVAL, 10);

    for (int i = 0; i < TEST_RETRIES; i++) {
      ZKDelegationTokenSecretManagerImpl dtsm1 =
          new ZKDelegationTokenSecretManagerImpl(conf);
      ZKDelegationTokenSecretManagerImpl dtsm2 =
          new ZKDelegationTokenSecretManagerImpl(conf);
      ZKDelegationTokenSecretManagerImpl dtsm3 =
          new ZKDelegationTokenSecretManagerImpl(conf);
      DelegationTokenManager tm1 = newTokenManager(conf, dtsm1);
      DelegationTokenManager tm2 = newTokenManager(conf, dtsm2);
      DelegationTokenManager tm3 = newTokenManager(conf, dtsm3);

      // time: X
      // token expiry time:
      //   tm1: X + 10
      //   tm2: X + 10
      //   tm3: No token due to no sync
      Token<DelegationTokenIdentifier> token =
          (Token<DelegationTokenIdentifier>) tm1.createToken(
              UserGroupInformation.getCurrentUser(), "foo");
      Assert.assertNotNull(token);
      tm2.verifyToken(token);

      // time: X + 9
      // token expiry time:
      //   tm1: X + 10
      //   tm2: X + 19
      //   tm3: No token due to no sync
      Thread.sleep(9 * 1000);
      long renewalTime = tm2.renewToken(token, "foo");
      LOG.info("Renew for token {} at current time {} renewal time {}",
          token.getIdentifier(), Time.formatTime(Time.now()),
          Time.formatTime(renewalTime));
      tm1.verifyToken(token);

      // time: X + 13
      // token expiry time: (sync didn't happen)
      //   tm1: X + 10
      //   tm2: X + 19
      //   tm3: X + 19 due to fetch from zk
      Thread.sleep(4 * 1000);
      tm2.verifyToken(token);
      tm3.verifyToken(token);

      dtsm1.stopThreads();
      dtsm2.stopThreads();
      dtsm3.stopThreads();
      verifyDestroy(tm1, conf);
      verifyDestroy(tm2, conf);
      verifyDestroy(tm3, conf);
    }
  }
}