HADOOP-17835. Use CuratorCache implementation instead of PathChildrenCache / TreeCache (#3266)

Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
This commit is contained in:
Viraj Jasani 2021-08-07 07:50:35 +05:30 committed by GitHub
parent 4fd97e01e5
commit 23e2a0b202
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 113 additions and 179 deletions

View File

@ -23,13 +23,13 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.Executors; import java.util.stream.Stream;
import java.util.concurrent.TimeUnit;
import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.AppConfigurationEntry;
@ -40,10 +40,9 @@ import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider; import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.shared.SharedCount; import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.VersionedValue; import org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.curator.retry.RetryNTimes; import org.apache.curator.retry.RetryNTimes;
@ -113,7 +112,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
// by default it is still incrementing seq number by 1 each time // by default it is still incrementing seq number by 1 each time
public static final int ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT = 1; public static final int ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT = 1;
private static Logger LOG = LoggerFactory private static final Logger LOG = LoggerFactory
.getLogger(ZKDelegationTokenSecretManager.class); .getLogger(ZKDelegationTokenSecretManager.class);
private static final String JAAS_LOGIN_ENTRY_NAME = private static final String JAAS_LOGIN_ENTRY_NAME =
@ -139,10 +138,8 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
protected final CuratorFramework zkClient; protected final CuratorFramework zkClient;
private SharedCount delTokSeqCounter; private SharedCount delTokSeqCounter;
private SharedCount keyIdSeqCounter; private SharedCount keyIdSeqCounter;
private PathChildrenCache keyCache; private CuratorCacheBridge keyCache;
private PathChildrenCache tokenCache; private CuratorCacheBridge tokenCache;
private ExecutorService listenerThreadPool;
private final long shutdownTimeout;
private final int seqNumBatchSize; private final int seqNumBatchSize;
private int currentSeqNum; private int currentSeqNum;
private int currentMaxSeqNum; private int currentMaxSeqNum;
@ -158,8 +155,6 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
DelegationTokenManager.RENEW_INTERVAL_DEFAULT) * 1000, DelegationTokenManager.RENEW_INTERVAL_DEFAULT) * 1000,
conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL,
DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000); DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000);
shutdownTimeout = conf.getLong(ZK_DTSM_ZK_SHUTDOWN_TIMEOUT,
ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT);
seqNumBatchSize = conf.getInt(ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE, seqNumBatchSize = conf.getInt(ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT); ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED,
@ -333,7 +328,6 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
throw new IOException("Could not create namespace", e); throw new IOException("Could not create namespace", e);
} }
} }
listenerThreadPool = Executors.newSingleThreadExecutor();
try { try {
delTokSeqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0); delTokSeqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0);
if (delTokSeqCounter != null) { if (delTokSeqCounter != null) {
@ -363,71 +357,65 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
throw new RuntimeException("Could not create ZK paths"); throw new RuntimeException("Could not create ZK paths");
} }
try { try {
keyCache = new PathChildrenCache(zkClient, ZK_DTSM_MASTER_KEY_ROOT, true); keyCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_MASTER_KEY_ROOT)
if (keyCache != null) { .build();
keyCache.start(StartMode.BUILD_INITIAL_CACHE); CuratorCacheListener keyCacheListener = CuratorCacheListener.builder()
keyCache.getListenable().addListener(new PathChildrenCacheListener() { .forCreatesAndChanges((oldNode, node) -> {
@Override try {
public void childEvent(CuratorFramework client, processKeyAddOrUpdate(node.getData());
PathChildrenCacheEvent event) } catch (IOException e) {
throws Exception { LOG.error("Error while processing Curator keyCacheListener "
switch (event.getType()) { + "NODE_CREATED / NODE_CHANGED event");
case CHILD_ADDED: throw new UncheckedIOException(e);
processKeyAddOrUpdate(event.getData().getData());
break;
case CHILD_UPDATED:
processKeyAddOrUpdate(event.getData().getData());
break;
case CHILD_REMOVED:
processKeyRemoved(event.getData().getPath());
break;
default:
break;
} }
} })
}, listenerThreadPool); .forDeletes(childData -> processKeyRemoved(childData.getPath()))
loadFromZKCache(false); .build();
} keyCache.listenable().addListener(keyCacheListener);
keyCache.start();
loadFromZKCache(false);
} catch (Exception e) { } catch (Exception e) {
throw new IOException("Could not start PathChildrenCache for keys", e); throw new IOException("Could not start Curator keyCacheListener for keys",
e);
} }
if (isTokenWatcherEnabled) { if (isTokenWatcherEnabled) {
LOG.info("TokenCache is enabled"); LOG.info("TokenCache is enabled");
try { try {
tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true); tokenCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_TOKENS_ROOT)
if (tokenCache != null) { .build();
tokenCache.start(StartMode.BUILD_INITIAL_CACHE); CuratorCacheListener tokenCacheListener = CuratorCacheListener.builder()
tokenCache.getListenable().addListener(new PathChildrenCacheListener() { .forCreatesAndChanges((oldNode, node) -> {
try {
@Override processTokenAddOrUpdate(node.getData());
public void childEvent(CuratorFramework client, } catch (IOException e) {
PathChildrenCacheEvent event) throws Exception { LOG.error("Error while processing Curator tokenCacheListener "
switch (event.getType()) { + "NODE_CREATED / NODE_CHANGED event");
case CHILD_ADDED: throw new UncheckedIOException(e);
processTokenAddOrUpdate(event.getData().getData());
break;
case CHILD_UPDATED:
processTokenAddOrUpdate(event.getData().getData());
break;
case CHILD_REMOVED:
processTokenRemoved(event.getData());
break;
default:
break;
} }
} })
}, listenerThreadPool); .forDeletes(childData -> {
loadFromZKCache(true); try {
} processTokenRemoved(childData);
} catch (IOException e) {
LOG.error("Error while processing Curator tokenCacheListener "
+ "NODE_DELETED event");
throw new UncheckedIOException(e);
}
})
.build();
tokenCache.listenable().addListener(tokenCacheListener);
tokenCache.start();
loadFromZKCache(true);
} catch (Exception e) { } catch (Exception e) {
throw new IOException("Could not start PathChildrenCache for tokens", e); throw new IOException(
"Could not start Curator tokenCacheListener for tokens", e);
} }
} }
super.startThreads(); super.startThreads();
} }
/** /**
* Load the PathChildrenCache into the in-memory map. Possible caches to be * Load the CuratorCache into the in-memory map. Possible caches to be
* loaded are keyCache and tokenCache. * loaded are keyCache and tokenCache.
* *
* @param isTokenCache true if loading tokenCache, false if loading keyCache. * @param isTokenCache true if loading tokenCache, false if loading keyCache.
@ -435,33 +423,34 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
private void loadFromZKCache(final boolean isTokenCache) { private void loadFromZKCache(final boolean isTokenCache) {
final String cacheName = isTokenCache ? "token" : "key"; final String cacheName = isTokenCache ? "token" : "key";
LOG.info("Starting to load {} cache.", cacheName); LOG.info("Starting to load {} cache.", cacheName);
final List<ChildData> children; final Stream<ChildData> children;
if (isTokenCache) { if (isTokenCache) {
children = tokenCache.getCurrentData(); children = tokenCache.stream();
} else { } else {
children = keyCache.getCurrentData(); children = keyCache.stream();
} }
int count = 0; final AtomicInteger count = new AtomicInteger(0);
for (ChildData child : children) { children.forEach(childData -> {
try { try {
if (isTokenCache) { if (isTokenCache) {
processTokenAddOrUpdate(child.getData()); processTokenAddOrUpdate(childData.getData());
} else { } else {
processKeyAddOrUpdate(child.getData()); processKeyAddOrUpdate(childData.getData());
} }
} catch (Exception e) { } catch (Exception e) {
LOG.info("Ignoring node {} because it failed to load.", LOG.info("Ignoring node {} because it failed to load.",
child.getPath()); childData.getPath());
LOG.debug("Failure exception:", e); LOG.debug("Failure exception:", e);
++count; count.getAndIncrement();
} }
} });
if (isTokenCache) { if (isTokenCache) {
syncTokenOwnerStats(); syncTokenOwnerStats();
} }
if (count > 0) { if (count.get() > 0) {
LOG.warn("Ignored {} nodes while loading {} cache.", count, cacheName); LOG.warn("Ignored {} nodes while loading {} cache.", count.get(),
cacheName);
} }
LOG.info("Loaded {} cache.", cacheName); LOG.info("Loaded {} cache.", cacheName);
} }
@ -550,20 +539,6 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
} catch (Exception e) { } catch (Exception e) {
LOG.error("Could not stop Curator Framework", e); LOG.error("Could not stop Curator Framework", e);
} }
if (listenerThreadPool != null) {
listenerThreadPool.shutdown();
try {
// wait for existing tasks to terminate
if (!listenerThreadPool.awaitTermination(shutdownTimeout,
TimeUnit.MILLISECONDS)) {
LOG.error("Forcing Listener threadPool to shutdown !!");
listenerThreadPool.shutdownNow();
}
} catch (InterruptedException ie) {
listenerThreadPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
} }
private void createPersistentNode(String nodePath) throws Exception { private void createPersistentNode(String nodePath) throws Exception {
@ -992,11 +967,6 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
return (root + "/" + nodeName); return (root + "/" + nodeName);
} }
@VisibleForTesting
public ExecutorService getListenerThreadPool() {
return listenerThreadPool;
}
@VisibleForTesting @VisibleForTesting
DelegationTokenInformation getTokenInfoFromMemory(TokenIdent ident) { DelegationTokenInformation getTokenInfoFromMemory(TokenIdent ident) {
return currentTokens.get(ident); return currentTokens.get(ident);

View File

@ -21,8 +21,6 @@ package org.apache.hadoop.security.token.delegation;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -318,19 +316,13 @@ public class TestZKDelegationTokenSecretManager {
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
protected void verifyDestroy(DelegationTokenManager tm, Configuration conf) protected void verifyDestroy(DelegationTokenManager tm, Configuration conf)
throws Exception { throws Exception {
AbstractDelegationTokenSecretManager sm =
tm.getDelegationTokenSecretManager();
ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager) sm;
ExecutorService es = zksm.getListenerThreadPool();
tm.destroy(); tm.destroy();
Assert.assertTrue(es.isShutdown());
// wait for the pool to terminate // wait for the pool to terminate
long timeout = long timeout =
conf.getLong( conf.getLong(
ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT,
ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT); ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT);
Thread.sleep(timeout * 3); Thread.sleep(timeout * 3);
Assert.assertTrue(es.isTerminated());
} }
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
@ -357,17 +349,6 @@ public class TestZKDelegationTokenSecretManager {
(Token<DelegationTokenIdentifier>) (Token<DelegationTokenIdentifier>)
tm1.createToken(UserGroupInformation.getCurrentUser(), "foo"); tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token); Assert.assertNotNull(token);
AbstractDelegationTokenSecretManager sm = tm1.getDelegationTokenSecretManager();
ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager)sm;
ExecutorService es = zksm.getListenerThreadPool();
es.submit(new Callable<Void>() {
public Void call() throws Exception {
Thread.sleep(shutdownTimeoutMillis * 2); // force this to be shutdownNow
return null;
}
});
tm1.destroy(); tm1.destroy();
} }

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.registry.client.impl.zk; package org.apache.hadoop.registry.client.impl.zk;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.curator.ensemble.EnsembleProvider; import org.apache.curator.ensemble.EnsembleProvider;
@ -28,9 +31,6 @@ import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CreateBuilder; import org.apache.curator.framework.api.CreateBuilder;
import org.apache.curator.framework.api.DeleteBuilder; import org.apache.curator.framework.api.DeleteBuilder;
import org.apache.curator.framework.api.GetChildrenBuilder; import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.BoundedExponentialBackoffRetry; import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
@ -56,6 +56,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List; import java.util.List;
/** /**
@ -109,9 +110,9 @@ public class CuratorService extends CompositeService
private EnsembleProvider ensembleProvider; private EnsembleProvider ensembleProvider;
/** /**
* Registry tree cache. * Registry Curator cache.
*/ */
private TreeCache treeCache; private CuratorCacheBridge curatorCacheBridge;
/** /**
* Construct the service. * Construct the service.
@ -189,8 +190,8 @@ public class CuratorService extends CompositeService
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
IOUtils.closeStream(curator); IOUtils.closeStream(curator);
if (treeCache != null) { if (curatorCacheBridge != null) {
treeCache.close(); curatorCacheBridge.close();
} }
super.serviceStop(); super.serviceStop();
} }
@ -824,73 +825,54 @@ public class CuratorService extends CompositeService
* *
* @param listener the listener. * @param listener the listener.
* @return a handle allowing for the management of the listener. * @return a handle allowing for the management of the listener.
* @throws Exception if registration fails due to error.
*/ */
public ListenerHandle registerPathListener(final PathListener listener) public ListenerHandle registerPathListener(final PathListener listener) {
throws Exception {
final TreeCacheListener pathChildrenCacheListener = CuratorCacheListener cacheListener = CuratorCacheListener.builder()
new TreeCacheListener() { .forCreatesAndChanges((oldNode, node) -> {
final String path = node.getPath();
public void childEvent(CuratorFramework curatorFramework, LOG.info("Informing listener of added/updated node {}", path);
TreeCacheEvent event) try {
throws Exception { listener.nodeAdded(path);
String path = null; } catch (IOException e) {
if (event != null && event.getData() != null) { LOG.error("Error while processing Curator listener "
path = event.getData().getPath(); + "NODE_CREATED / NODE_CHANGED event");
} throw new UncheckedIOException(e);
assert event != null;
switch (event.getType()) {
case NODE_ADDED:
LOG.info("Informing listener of added node {}", path);
listener.nodeAdded(path);
break;
case NODE_REMOVED:
LOG.info("Informing listener of removed node {}", path);
listener.nodeRemoved(path);
break;
case NODE_UPDATED:
LOG.info("Informing listener of updated node {}", path);
listener.nodeAdded(path);
break;
default:
// do nothing
break;
}
} }
}; })
treeCache.getListenable().addListener(pathChildrenCacheListener); .forDeletes(childData -> {
final String path = childData.getPath();
return new ListenerHandle() { LOG.info("Informing listener of removed node {}", path);
@Override try {
public void remove() { listener.nodeRemoved(path);
treeCache.getListenable().removeListener(pathChildrenCacheListener); } catch (IOException e) {
} LOG.error("Error while processing Curator listener "
}; + "NODE_DELETED event");
throw new UncheckedIOException(e);
}
})
.build();
curatorCacheBridge.listenable().addListener(cacheListener);
return () -> curatorCacheBridge.listenable().removeListener(cacheListener);
} }
// TODO: should caches be stopped and then restarted if need be? // TODO: should caches be stopped and then restarted if need be?
/** /**
* Create the tree cache that monitors the registry for node addition, update, * Instantiate the Curator cache that monitors the registry for node
* and deletion. * addition, update and deletion.
*
* @throws Exception if any issue arises during monitoring.
*/ */
public void monitorRegistryEntries() public void instantiateCacheForRegistry() {
throws Exception {
String registryPath = String registryPath =
getConfig().get(RegistryConstants.KEY_REGISTRY_ZK_ROOT, getConfig().get(RegistryConstants.KEY_REGISTRY_ZK_ROOT,
RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT); RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
treeCache = new TreeCache(curator, registryPath); curatorCacheBridge = CuratorCache.bridgeBuilder(curator, registryPath)
treeCache.start(); .build();
} }
public void startCache() {
curatorCacheBridge.start();
}
} }

View File

@ -106,7 +106,7 @@ public class RegistryDNSServer extends CompositeService {
private void manageRegistryDNS() { private void manageRegistryDNS() {
try { try {
registryOperations.monitorRegistryEntries(); registryOperations.instantiateCacheForRegistry();
registryOperations.registerPathListener(new PathListener() { registryOperations.registerPathListener(new PathListener() {
private String registryRoot = getConfig(). private String registryRoot = getConfig().
get(RegistryConstants.KEY_REGISTRY_ZK_ROOT, get(RegistryConstants.KEY_REGISTRY_ZK_ROOT,
@ -157,6 +157,7 @@ public class RegistryDNSServer extends CompositeService {
} }
}); });
registryOperations.startCache();
// create listener for record deletions // create listener for record deletions