diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java index 1de786ebdc7..08d18218997 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java @@ -23,13 +23,13 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; import javax.security.auth.login.AppConfigurationEntry; @@ -40,10 +40,9 @@ import org.apache.curator.framework.CuratorFrameworkFactory.Builder; import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.imps.DefaultACLProvider; import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheBridge; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.curator.framework.recipes.shared.SharedCount; import org.apache.curator.framework.recipes.shared.VersionedValue; import org.apache.curator.retry.RetryNTimes; @@ -113,7 +112,7 @@ public abstract class ZKDelegationTokenSecretManager { + try { + processKeyAddOrUpdate(node.getData()); + } catch (IOException e) { + LOG.error("Error while processing Curator keyCacheListener " + + "NODE_CREATED / NODE_CHANGED event"); + throw new UncheckedIOException(e); } - } - }, listenerThreadPool); - loadFromZKCache(false); - } + }) + .forDeletes(childData -> processKeyRemoved(childData.getPath())) + .build(); + keyCache.listenable().addListener(keyCacheListener); + keyCache.start(); + loadFromZKCache(false); } catch (Exception e) { - throw new IOException("Could not start PathChildrenCache for keys", e); + throw new IOException("Could not start Curator keyCacheListener for keys", + e); } if (isTokenWatcherEnabled) { LOG.info("TokenCache is enabled"); try { - tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true); - if (tokenCache != null) { - tokenCache.start(StartMode.BUILD_INITIAL_CACHE); - tokenCache.getListenable().addListener(new PathChildrenCacheListener() { - - @Override - public void childEvent(CuratorFramework client, - PathChildrenCacheEvent event) throws Exception { - switch (event.getType()) { - case CHILD_ADDED: - processTokenAddOrUpdate(event.getData().getData()); - break; - case CHILD_UPDATED: - processTokenAddOrUpdate(event.getData().getData()); - break; - case CHILD_REMOVED: - processTokenRemoved(event.getData()); - break; - default: - break; + tokenCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_TOKENS_ROOT) + .build(); + CuratorCacheListener tokenCacheListener = CuratorCacheListener.builder() + .forCreatesAndChanges((oldNode, node) -> { + try { + processTokenAddOrUpdate(node.getData()); + } catch (IOException e) { + LOG.error("Error while processing Curator tokenCacheListener " + + "NODE_CREATED / NODE_CHANGED event"); + throw new UncheckedIOException(e); } - } - }, listenerThreadPool); - loadFromZKCache(true); - } + }) + .forDeletes(childData -> { + try { + processTokenRemoved(childData); + } catch (IOException e) { + LOG.error("Error while processing Curator tokenCacheListener " + + "NODE_DELETED event"); + throw new UncheckedIOException(e); + } + }) + .build(); + tokenCache.listenable().addListener(tokenCacheListener); + tokenCache.start(); + loadFromZKCache(true); } catch (Exception e) { - throw new IOException("Could not start PathChildrenCache for tokens", e); + throw new IOException( + "Could not start Curator tokenCacheListener for tokens", e); } } super.startThreads(); } /** - * Load the PathChildrenCache into the in-memory map. Possible caches to be + * Load the CuratorCache into the in-memory map. Possible caches to be * loaded are keyCache and tokenCache. * * @param isTokenCache true if loading tokenCache, false if loading keyCache. @@ -435,33 +423,34 @@ public abstract class ZKDelegationTokenSecretManager children; + final Stream children; if (isTokenCache) { - children = tokenCache.getCurrentData(); + children = tokenCache.stream(); } else { - children = keyCache.getCurrentData(); + children = keyCache.stream(); } - int count = 0; - for (ChildData child : children) { + final AtomicInteger count = new AtomicInteger(0); + children.forEach(childData -> { try { if (isTokenCache) { - processTokenAddOrUpdate(child.getData()); + processTokenAddOrUpdate(childData.getData()); } else { - processKeyAddOrUpdate(child.getData()); + processKeyAddOrUpdate(childData.getData()); } } catch (Exception e) { LOG.info("Ignoring node {} because it failed to load.", - child.getPath()); + childData.getPath()); LOG.debug("Failure exception:", e); - ++count; + count.getAndIncrement(); } - } + }); if (isTokenCache) { syncTokenOwnerStats(); } - if (count > 0) { - LOG.warn("Ignored {} nodes while loading {} cache.", count, cacheName); + if (count.get() > 0) { + LOG.warn("Ignored {} nodes while loading {} cache.", count.get(), + cacheName); } LOG.info("Loaded {} cache.", cacheName); } @@ -550,20 +539,6 @@ public abstract class ZKDelegationTokenSecretManager) tm1.createToken(UserGroupInformation.getCurrentUser(), "foo"); Assert.assertNotNull(token); - - AbstractDelegationTokenSecretManager sm = tm1.getDelegationTokenSecretManager(); - ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager)sm; - ExecutorService es = zksm.getListenerThreadPool(); - es.submit(new Callable() { - public Void call() throws Exception { - Thread.sleep(shutdownTimeoutMillis * 2); // force this to be shutdownNow - return null; - } - }); - tm1.destroy(); } diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java index a01b7151b69..3457fa28634 100644 --- a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java @@ -18,6 +18,9 @@ package org.apache.hadoop.registry.client.impl.zk; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheBridge; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.curator.ensemble.EnsembleProvider; @@ -28,9 +31,6 @@ import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CreateBuilder; import org.apache.curator.framework.api.DeleteBuilder; import org.apache.curator.framework.api.GetChildrenBuilder; -import org.apache.curator.framework.recipes.cache.TreeCache; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.curator.retry.BoundedExponentialBackoffRetry; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -56,6 +56,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.List; /** @@ -109,9 +110,9 @@ public class CuratorService extends CompositeService private EnsembleProvider ensembleProvider; /** - * Registry tree cache. + * Registry Curator cache. */ - private TreeCache treeCache; + private CuratorCacheBridge curatorCacheBridge; /** * Construct the service. @@ -189,8 +190,8 @@ public class CuratorService extends CompositeService protected void serviceStop() throws Exception { IOUtils.closeStream(curator); - if (treeCache != null) { - treeCache.close(); + if (curatorCacheBridge != null) { + curatorCacheBridge.close(); } super.serviceStop(); } @@ -824,73 +825,54 @@ public class CuratorService extends CompositeService * * @param listener the listener. * @return a handle allowing for the management of the listener. - * @throws Exception if registration fails due to error. */ - public ListenerHandle registerPathListener(final PathListener listener) - throws Exception { + public ListenerHandle registerPathListener(final PathListener listener) { - final TreeCacheListener pathChildrenCacheListener = - new TreeCacheListener() { - - public void childEvent(CuratorFramework curatorFramework, - TreeCacheEvent event) - throws Exception { - String path = null; - if (event != null && event.getData() != null) { - path = event.getData().getPath(); - } - assert event != null; - switch (event.getType()) { - case NODE_ADDED: - LOG.info("Informing listener of added node {}", path); - listener.nodeAdded(path); - - break; - - case NODE_REMOVED: - LOG.info("Informing listener of removed node {}", path); - listener.nodeRemoved(path); - - break; - - case NODE_UPDATED: - LOG.info("Informing listener of updated node {}", path); - listener.nodeAdded(path); - - break; - - default: - // do nothing - break; - - } + CuratorCacheListener cacheListener = CuratorCacheListener.builder() + .forCreatesAndChanges((oldNode, node) -> { + final String path = node.getPath(); + LOG.info("Informing listener of added/updated node {}", path); + try { + listener.nodeAdded(path); + } catch (IOException e) { + LOG.error("Error while processing Curator listener " + + "NODE_CREATED / NODE_CHANGED event"); + throw new UncheckedIOException(e); } - }; - treeCache.getListenable().addListener(pathChildrenCacheListener); - - return new ListenerHandle() { - @Override - public void remove() { - treeCache.getListenable().removeListener(pathChildrenCacheListener); - } - }; + }) + .forDeletes(childData -> { + final String path = childData.getPath(); + LOG.info("Informing listener of removed node {}", path); + try { + listener.nodeRemoved(path); + } catch (IOException e) { + LOG.error("Error while processing Curator listener " + + "NODE_DELETED event"); + throw new UncheckedIOException(e); + } + }) + .build(); + curatorCacheBridge.listenable().addListener(cacheListener); + return () -> curatorCacheBridge.listenable().removeListener(cacheListener); } // TODO: should caches be stopped and then restarted if need be? /** - * Create the tree cache that monitors the registry for node addition, update, - * and deletion. - * - * @throws Exception if any issue arises during monitoring. + * Instantiate the Curator cache that monitors the registry for node + * addition, update and deletion. */ - public void monitorRegistryEntries() - throws Exception { + public void instantiateCacheForRegistry() { String registryPath = getConfig().get(RegistryConstants.KEY_REGISTRY_ZK_ROOT, RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT); - treeCache = new TreeCache(curator, registryPath); - treeCache.start(); + curatorCacheBridge = CuratorCache.bridgeBuilder(curator, registryPath) + .build(); } + + public void startCache() { + curatorCacheBridge.start(); + } + } diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNSServer.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNSServer.java index 1ff5f26b472..8d0a38cfd47 100644 --- a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNSServer.java +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNSServer.java @@ -106,7 +106,7 @@ public class RegistryDNSServer extends CompositeService { private void manageRegistryDNS() { try { - registryOperations.monitorRegistryEntries(); + registryOperations.instantiateCacheForRegistry(); registryOperations.registerPathListener(new PathListener() { private String registryRoot = getConfig(). get(RegistryConstants.KEY_REGISTRY_ZK_ROOT, @@ -157,6 +157,7 @@ public class RegistryDNSServer extends CompositeService { } }); + registryOperations.startCache(); // create listener for record deletions