From 362a2924d2a61630b5644a2776cf8a6a96b03954 Mon Sep 17 00:00:00 2001 From: Mike Drob Date: Mon, 21 Aug 2017 16:23:27 -0500 Subject: [PATCH] HBASE-18628 Fix event pre-emption in ZKPermWatcher Instead of using an Atomic Reference to data and aborting when we detect that new data comes in, use the native cancellation/pre-emption features of Java Future. Signed-off-by: Andrew Purtell --- .../security/access/ZKPermissionWatcher.java | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java index b4bf510c222..4c37b522620 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java @@ -38,6 +38,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicReference; @@ -55,12 +56,11 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable private static final Log LOG = LogFactory.getLog(ZKPermissionWatcher.class); // parent node for permissions lists static final String ACL_NODE = "acl"; - TableAuthManager authManager; - String aclZNode; - CountDownLatch initialized = new CountDownLatch(1); - AtomicReference> nodes = - new AtomicReference>(null); - ExecutorService executor; + private final TableAuthManager authManager; + private final String aclZNode; + private final CountDownLatch initialized = new CountDownLatch(1); + private final ExecutorService executor; + private Future childrenChangedFuture; public ZKPermissionWatcher(ZooKeeperWatcher watcher, TableAuthManager authManager, Configuration conf) { @@ -83,7 +83,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable List existing = ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); if (existing != null) { - refreshNodes(existing, null); + refreshNodes(existing); } return null; } @@ -127,7 +127,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable try { List nodes = ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); - refreshNodes(nodes, null); + refreshNodes(nodes); } catch (KeeperException ke) { LOG.error("Error reading data from zookeeper", ke); // only option is to abort @@ -185,37 +185,36 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable waitUntilStarted(); if (path.equals(aclZNode)) { try { - List nodeList = + final List nodeList = ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); - while (!nodes.compareAndSet(null, nodeList)) { - try { - Thread.sleep(20); - } catch (InterruptedException e) { - LOG.warn("Interrupted while setting node list", e); - Thread.currentThread().interrupt(); + // preempt any existing nodeChildrenChanged event processing + if (childrenChangedFuture != null && !childrenChangedFuture.isDone()) { + boolean cancelled = childrenChangedFuture.cancel(true); + if (!cancelled) { + // task may have finished between our check and attempted cancel, this is fine. + if (! childrenChangedFuture.isDone()) { + LOG.warn("Could not cancel processing node children changed event, " + + "please file a JIRA and attach logs if possible."); + } } } + childrenChangedFuture = asyncProcessNodeUpdate(new Runnable() { + @Override + public void run() { + refreshNodes(nodeList); + } + }); } catch (KeeperException ke) { LOG.error("Error reading data from zookeeper for path "+path, ke); watcher.abort("Zookeeper error get node children for path "+path, ke); } - asyncProcessNodeUpdate(new Runnable() { - // allows subsequent nodeChildrenChanged event to preempt current processing of - // nodeChildrenChanged event - @Override - public void run() { - List nodeList = nodes.get(); - nodes.set(null); - refreshNodes(nodeList, nodes); - } - }); } } - private void asyncProcessNodeUpdate(Runnable runnable) { + private Future asyncProcessNodeUpdate(Runnable runnable) { if (!executor.isShutdown()) { try { - executor.submit(runnable); + return executor.submit(runnable); } catch (RejectedExecutionException e) { if (executor.isShutdown()) { LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown"); @@ -224,12 +223,13 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable } } } + return null; // No task launched so there will be nothing to cancel later } - private void refreshNodes(List nodes, AtomicReference ref) { + private void refreshNodes(List nodes) { for (ZKUtil.NodeAndData n : nodes) { - if (ref != null && ref.get() != null) { - // there is a newer list + if (Thread.interrupted()) { + // Use Thread.interrupted so that we clear interrupt status break; } if (n.isEmpty()) continue;