HBASE-18628 Fix event pre-emption in ZKPermWatcher

Instead of using an Atomic Reference to data and aborting when we detect
that new data comes in, use the native cancellation/pre-emption features
of Java Future.

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Mike Drob 2017-08-21 16:23:27 -05:00 committed by Andrew Purtell
parent 19a80c8234
commit 362a2924d2
1 changed files with 30 additions and 30 deletions

View File

@ -38,6 +38,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
@ -55,12 +56,11 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
private static final Log LOG = LogFactory.getLog(ZKPermissionWatcher.class);
// parent node for permissions lists
static final String ACL_NODE = "acl";
TableAuthManager authManager;
String aclZNode;
CountDownLatch initialized = new CountDownLatch(1);
AtomicReference<List<ZKUtil.NodeAndData>> nodes =
new AtomicReference<List<ZKUtil.NodeAndData>>(null);
ExecutorService executor;
private final TableAuthManager authManager;
private final String aclZNode;
private final CountDownLatch initialized = new CountDownLatch(1);
private final ExecutorService executor;
private Future<?> childrenChangedFuture;
public ZKPermissionWatcher(ZooKeeperWatcher watcher,
TableAuthManager authManager, Configuration conf) {
@ -83,7 +83,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
List<ZKUtil.NodeAndData> existing =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
if (existing != null) {
refreshNodes(existing, null);
refreshNodes(existing);
}
return null;
}
@ -127,7 +127,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
try {
List<ZKUtil.NodeAndData> nodes =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
refreshNodes(nodes, null);
refreshNodes(nodes);
} catch (KeeperException ke) {
LOG.error("Error reading data from zookeeper", ke);
// only option is to abort
@ -185,37 +185,36 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
waitUntilStarted();
if (path.equals(aclZNode)) {
try {
List<ZKUtil.NodeAndData> nodeList =
final List<ZKUtil.NodeAndData> nodeList =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
while (!nodes.compareAndSet(null, nodeList)) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
LOG.warn("Interrupted while setting node list", e);
Thread.currentThread().interrupt();
// preempt any existing nodeChildrenChanged event processing
if (childrenChangedFuture != null && !childrenChangedFuture.isDone()) {
boolean cancelled = childrenChangedFuture.cancel(true);
if (!cancelled) {
// task may have finished between our check and attempted cancel, this is fine.
if (! childrenChangedFuture.isDone()) {
LOG.warn("Could not cancel processing node children changed event, " +
"please file a JIRA and attach logs if possible.");
}
}
}
childrenChangedFuture = asyncProcessNodeUpdate(new Runnable() {
@Override
public void run() {
refreshNodes(nodeList);
}
});
} catch (KeeperException ke) {
LOG.error("Error reading data from zookeeper for path "+path, ke);
watcher.abort("Zookeeper error get node children for path "+path, ke);
}
asyncProcessNodeUpdate(new Runnable() {
// allows subsequent nodeChildrenChanged event to preempt current processing of
// nodeChildrenChanged event
@Override
public void run() {
List<ZKUtil.NodeAndData> nodeList = nodes.get();
nodes.set(null);
refreshNodes(nodeList, nodes);
}
});
}
}
private void asyncProcessNodeUpdate(Runnable runnable) {
private Future<?> asyncProcessNodeUpdate(Runnable runnable) {
if (!executor.isShutdown()) {
try {
executor.submit(runnable);
return executor.submit(runnable);
} catch (RejectedExecutionException e) {
if (executor.isShutdown()) {
LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown");
@ -224,12 +223,13 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
}
}
}
return null; // No task launched so there will be nothing to cancel later
}
private void refreshNodes(List<ZKUtil.NodeAndData> nodes, AtomicReference ref) {
private void refreshNodes(List<ZKUtil.NodeAndData> nodes) {
for (ZKUtil.NodeAndData n : nodes) {
if (ref != null && ref.get() != null) {
// there is a newer list
if (Thread.interrupted()) {
// Use Thread.interrupted so that we clear interrupt status
break;
}
if (n.isEmpty()) continue;