HBASE-17374 ZKPermissionWatcher crashed when grant after region close (Liu Junhong)

This commit is contained in:
tedyu 2016-12-29 05:46:40 -08:00
parent ce33cf2d3d
commit cec40f941f
1 changed files with 19 additions and 4 deletions

View File

@ -38,6 +38,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
/**
@ -120,7 +121,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
public void nodeCreated(String path) {
waitUntilStarted();
if (path.equals(aclZNode)) {
executor.submit(new Runnable() {
asyncProcessNodeUpdate(new Runnable() {
@Override
public void run() {
try {
@ -141,7 +142,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
public void nodeDeleted(final String path) {
waitUntilStarted();
if (aclZNode.equals(ZKUtil.getParent(path))) {
executor.submit(new Runnable() {
asyncProcessNodeUpdate(new Runnable() {
@Override
public void run() {
String table = ZKUtil.getNodeName(path);
@ -159,7 +160,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
public void nodeDataChanged(final String path) {
waitUntilStarted();
if (aclZNode.equals(ZKUtil.getParent(path))) {
executor.submit(new Runnable() {
asyncProcessNodeUpdate(new Runnable() {
@Override
public void run() {
// update cache on an existing table node
@ -198,7 +199,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
LOG.error("Error reading data from zookeeper for path "+path, ke);
watcher.abort("Zookeeper error get node children for path "+path, ke);
}
executor.submit(new Runnable() {
asyncProcessNodeUpdate(new Runnable() {
// allows subsequent nodeChildrenChanged event to preempt current processing of
// nodeChildrenChanged event
@Override
@ -211,6 +212,20 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
}
}
private void asyncProcessNodeUpdate(Runnable runnable) {
if (!executor.isShutdown()) {
try {
executor.submit(runnable);
} catch (RejectedExecutionException e) {
if (executor.isShutdown()) {
LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown");
} else {
throw e;
}
}
}
}
private void refreshNodes(List<ZKUtil.NodeAndData> nodes, AtomicReference ref) {
for (ZKUtil.NodeAndData n : nodes) {
if (ref != null && ref.get() != null) {