HBASE-18628 Fix event pre-emption in ZKPermWatcher

Instead of using an Atomic Reference to data and aborting when we detect
that new data comes in, use the native cancellation/pre-emption features
of Java Future.
This commit is contained in:
Mike Drob 2017-08-18 11:11:46 -05:00
parent 1653b54de9
commit d159478956
2 changed files with 29 additions and 32 deletions

View File

@ -38,6 +38,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -55,11 +56,11 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
private static final Log LOG = LogFactory.getLog(ZKPermissionWatcher.class); private static final Log LOG = LogFactory.getLog(ZKPermissionWatcher.class);
// parent node for permissions lists // parent node for permissions lists
static final String ACL_NODE = "acl"; static final String ACL_NODE = "acl";
TableAuthManager authManager; private final TableAuthManager authManager;
String aclZNode; private final String aclZNode;
CountDownLatch initialized = new CountDownLatch(1); private final CountDownLatch initialized = new CountDownLatch(1);
AtomicReference<List<ZKUtil.NodeAndData>> nodes = new AtomicReference<>(null); private final ExecutorService executor;
ExecutorService executor; private Future<?> childrenChangedFuture;
public ZKPermissionWatcher(ZooKeeperWatcher watcher, public ZKPermissionWatcher(ZooKeeperWatcher watcher,
TableAuthManager authManager, Configuration conf) { TableAuthManager authManager, Configuration conf) {
@ -82,7 +83,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
List<ZKUtil.NodeAndData> existing = List<ZKUtil.NodeAndData> existing =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
if (existing != null) { if (existing != null) {
refreshNodes(existing, null); refreshNodes(existing);
} }
return null; return null;
} }
@ -126,7 +127,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
try { try {
List<ZKUtil.NodeAndData> nodes = List<ZKUtil.NodeAndData> nodes =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
refreshNodes(nodes, null); refreshNodes(nodes);
} catch (KeeperException ke) { } catch (KeeperException ke) {
LOG.error("Error reading data from zookeeper", ke); LOG.error("Error reading data from zookeeper", ke);
// only option is to abort // only option is to abort
@ -179,42 +180,37 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
} }
} }
@Override @Override
public void nodeChildrenChanged(final String path) { public void nodeChildrenChanged(final String path) {
waitUntilStarted(); waitUntilStarted();
if (path.equals(aclZNode)) { if (path.equals(aclZNode)) {
try { try {
List<ZKUtil.NodeAndData> nodeList = final List<ZKUtil.NodeAndData> nodeList =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
while (!nodes.compareAndSet(null, nodeList)) { // preempt any existing nodeChildrenChanged event processing
try { if (childrenChangedFuture != null && !childrenChangedFuture.isDone()) {
Thread.sleep(20); boolean cancelled = childrenChangedFuture.cancel(true);
} catch (InterruptedException e) { if (!cancelled) {
LOG.warn("Interrupted while setting node list", e); // task may have finished between our check and attempted cancel, this is fine.
Thread.currentThread().interrupt(); if (! childrenChangedFuture.isDone()) {
LOG.warn("Could not cancel processing node children changed event, " +
"please file a JIRA and attach logs if possible.");
}
} }
} }
childrenChangedFuture = asyncProcessNodeUpdate(() -> refreshNodes(nodeList));
} catch (KeeperException ke) { } catch (KeeperException ke) {
LOG.error("Error reading data from zookeeper for path "+path, ke); LOG.error("Error reading data from zookeeper for path "+path, ke);
watcher.abort("ZooKeeper error get node children for path "+path, ke); watcher.abort("ZooKeeper error get node children for path "+path, ke);
} }
asyncProcessNodeUpdate(new Runnable() {
// allows subsequent nodeChildrenChanged event to preempt current processing of
// nodeChildrenChanged event
@Override
public void run() {
List<ZKUtil.NodeAndData> nodeList = nodes.get();
nodes.set(null);
refreshNodes(nodeList, nodes);
}
});
} }
} }
private void asyncProcessNodeUpdate(Runnable runnable) { private Future<?> asyncProcessNodeUpdate(Runnable runnable) {
if (!executor.isShutdown()) { if (!executor.isShutdown()) {
try { try {
executor.submit(runnable); return executor.submit(runnable);
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
if (executor.isShutdown()) { if (executor.isShutdown()) {
LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown"); LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown");
@ -223,12 +219,13 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
} }
} }
} }
return null; // No task launched so there will be nothing to cancel later
} }
private void refreshNodes(List<ZKUtil.NodeAndData> nodes, AtomicReference ref) { private void refreshNodes(List<ZKUtil.NodeAndData> nodes) {
for (ZKUtil.NodeAndData n : nodes) { for (ZKUtil.NodeAndData n : nodes) {
if (ref != null && ref.get() != null) { if (Thread.interrupted()) {
// there is a newer list // Use Thread.interrupted so that we clear interrupt status
break; break;
} }
if (n.isEmpty()) continue; if (n.isEmpty()) continue;

View File

@ -44,8 +44,8 @@ import org.junit.experimental.categories.Category;
* Test the reading and writing of access permissions to and from zookeeper. * Test the reading and writing of access permissions to and from zookeeper.
*/ */
@Category({SecurityTests.class, LargeTests.class}) @Category({SecurityTests.class, LargeTests.class})
public class TestZKPermissionsWatcher { public class TestZKPermissionWatcher {
private static final Log LOG = LogFactory.getLog(TestZKPermissionsWatcher.class); private static final Log LOG = LogFactory.getLog(TestZKPermissionWatcher.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static TableAuthManager AUTH_A; private static TableAuthManager AUTH_A;
private static TableAuthManager AUTH_B; private static TableAuthManager AUTH_B;
@ -91,7 +91,7 @@ public class TestZKPermissionsWatcher {
Configuration conf = UTIL.getConfiguration(); Configuration conf = UTIL.getConfiguration();
User george = User.createUserForTesting(conf, "george", new String[] { }); User george = User.createUserForTesting(conf, "george", new String[] { });
User hubert = User.createUserForTesting(conf, "hubert", new String[] { }); User hubert = User.createUserForTesting(conf, "hubert", new String[] { });
assertFalse(AUTH_A.authorizeUser(george, TEST_TABLE, null, assertFalse(AUTH_A.authorizeUser(george, TEST_TABLE, null,
TablePermission.Action.READ)); TablePermission.Action.READ));
assertFalse(AUTH_A.authorizeUser(george, TEST_TABLE, null, assertFalse(AUTH_A.authorizeUser(george, TEST_TABLE, null,