diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java index 3324b90a3d0..fc94e067561 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java @@ -38,6 +38,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicReference; @@ -55,11 +56,11 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable private static final Log LOG = LogFactory.getLog(ZKPermissionWatcher.class); // parent node for permissions lists static final String ACL_NODE = "acl"; - TableAuthManager authManager; - String aclZNode; - CountDownLatch initialized = new CountDownLatch(1); - AtomicReference> nodes = new AtomicReference<>(null); - ExecutorService executor; + private final TableAuthManager authManager; + private final String aclZNode; + private final CountDownLatch initialized = new CountDownLatch(1); + private final ExecutorService executor; + private Future childrenChangedFuture; public ZKPermissionWatcher(ZooKeeperWatcher watcher, TableAuthManager authManager, Configuration conf) { @@ -82,7 +83,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable List existing = ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); if (existing != null) { - refreshNodes(existing, null); + refreshNodes(existing); } return null; } @@ -126,7 +127,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable try { List nodes = ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); - refreshNodes(nodes, null); + refreshNodes(nodes); } catch (KeeperException ke) { LOG.error("Error reading data from zookeeper", ke); // only option is to abort @@ -179,42 +180,37 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable } } + @Override public void nodeChildrenChanged(final String path) { waitUntilStarted(); if (path.equals(aclZNode)) { try { - List nodeList = + final List nodeList = ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); - while (!nodes.compareAndSet(null, nodeList)) { - try { - Thread.sleep(20); - } catch (InterruptedException e) { - LOG.warn("Interrupted while setting node list", e); - Thread.currentThread().interrupt(); + // preempt any existing nodeChildrenChanged event processing + if (childrenChangedFuture != null && !childrenChangedFuture.isDone()) { + boolean cancelled = childrenChangedFuture.cancel(true); + if (!cancelled) { + // task may have finished between our check and attempted cancel, this is fine. + if (! childrenChangedFuture.isDone()) { + LOG.warn("Could not cancel processing node children changed event, " + + "please file a JIRA and attach logs if possible."); + } } } + childrenChangedFuture = asyncProcessNodeUpdate(() -> refreshNodes(nodeList)); } catch (KeeperException ke) { LOG.error("Error reading data from zookeeper for path "+path, ke); watcher.abort("ZooKeeper error get node children for path "+path, ke); } - asyncProcessNodeUpdate(new Runnable() { - // allows subsequent nodeChildrenChanged event to preempt current processing of - // nodeChildrenChanged event - @Override - public void run() { - List nodeList = nodes.get(); - nodes.set(null); - refreshNodes(nodeList, nodes); - } - }); } } - private void asyncProcessNodeUpdate(Runnable runnable) { + private Future asyncProcessNodeUpdate(Runnable runnable) { if (!executor.isShutdown()) { try { - executor.submit(runnable); + return executor.submit(runnable); } catch (RejectedExecutionException e) { if (executor.isShutdown()) { LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown"); @@ -223,12 +219,13 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable } } } + return null; // No task launched so there will be nothing to cancel later } - private void refreshNodes(List nodes, AtomicReference ref) { + private void refreshNodes(List nodes) { for (ZKUtil.NodeAndData n : nodes) { - if (ref != null && ref.get() != null) { - // there is a newer list + if (Thread.interrupted()) { + // Use Thread.interrupted so that we clear interrupt status break; } if (n.isEmpty()) continue; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionWatcher.java similarity index 99% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionWatcher.java index cb36246e73a..76de0c6048c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionWatcher.java @@ -44,8 +44,8 @@ import org.junit.experimental.categories.Category; * Test the reading and writing of access permissions to and from zookeeper. */ @Category({SecurityTests.class, LargeTests.class}) -public class TestZKPermissionsWatcher { - private static final Log LOG = LogFactory.getLog(TestZKPermissionsWatcher.class); +public class TestZKPermissionWatcher { + private static final Log LOG = LogFactory.getLog(TestZKPermissionWatcher.class); private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static TableAuthManager AUTH_A; private static TableAuthManager AUTH_B; @@ -91,7 +91,7 @@ public class TestZKPermissionsWatcher { Configuration conf = UTIL.getConfiguration(); User george = User.createUserForTesting(conf, "george", new String[] { }); User hubert = User.createUserForTesting(conf, "hubert", new String[] { }); - + assertFalse(AUTH_A.authorizeUser(george, TEST_TABLE, null, TablePermission.Action.READ)); assertFalse(AUTH_A.authorizeUser(george, TEST_TABLE, null,