HBASE-17374 ZKPermissionWatcher crashed when grant after region close (Liu Junhong)

This commit is contained in:
tedyu 2016-12-28 19:54:01 -08:00
parent 05b1d918b0
commit 001a26d404
1 changed files with 19 additions and 4 deletions

View File

@ -38,6 +38,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/** /**
@ -120,7 +121,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
public void nodeCreated(String path) { public void nodeCreated(String path) {
waitUntilStarted(); waitUntilStarted();
if (path.equals(aclZNode)) { if (path.equals(aclZNode)) {
executor.submit(new Runnable() { asyncProcessNodeUpdate(new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
@ -141,7 +142,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
public void nodeDeleted(final String path) { public void nodeDeleted(final String path) {
waitUntilStarted(); waitUntilStarted();
if (aclZNode.equals(ZKUtil.getParent(path))) { if (aclZNode.equals(ZKUtil.getParent(path))) {
executor.submit(new Runnable() { asyncProcessNodeUpdate(new Runnable() {
@Override @Override
public void run() { public void run() {
String table = ZKUtil.getNodeName(path); String table = ZKUtil.getNodeName(path);
@ -159,7 +160,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
public void nodeDataChanged(final String path) { public void nodeDataChanged(final String path) {
waitUntilStarted(); waitUntilStarted();
if (aclZNode.equals(ZKUtil.getParent(path))) { if (aclZNode.equals(ZKUtil.getParent(path))) {
executor.submit(new Runnable() { asyncProcessNodeUpdate(new Runnable() {
@Override @Override
public void run() { public void run() {
// update cache on an existing table node // update cache on an existing table node
@ -198,7 +199,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
LOG.error("Error reading data from zookeeper for path "+path, ke); LOG.error("Error reading data from zookeeper for path "+path, ke);
watcher.abort("ZooKeeper error get node children for path "+path, ke); watcher.abort("ZooKeeper error get node children for path "+path, ke);
} }
executor.submit(new Runnable() { asyncProcessNodeUpdate(new Runnable() {
// allows subsequent nodeChildrenChanged event to preempt current processing of // allows subsequent nodeChildrenChanged event to preempt current processing of
// nodeChildrenChanged event // nodeChildrenChanged event
@Override @Override
@ -211,6 +212,20 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
} }
} }
private void asyncProcessNodeUpdate(Runnable runnable) {
if (!executor.isShutdown()) {
try {
executor.submit(runnable);
} catch (RejectedExecutionException e) {
if (executor.isShutdown()) {
LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown");
} else {
throw e;
}
}
}
}
private void refreshNodes(List<ZKUtil.NodeAndData> nodes, AtomicReference ref) { private void refreshNodes(List<ZKUtil.NodeAndData> nodes, AtomicReference ref) {
for (ZKUtil.NodeAndData n : nodes) { for (ZKUtil.NodeAndData n : nodes) {
if (ref != null && ref.get() != null) { if (ref != null && ref.get() != null) {