HBASE-18628 Fix event pre-emption in ZKPermWatcher
Instead of using an Atomic Reference to data and aborting when we detect that new data comes in, use the native cancellation/pre-emption features of Java Future.
This commit is contained in:
parent
c90a4e8c63
commit
5b42d81950
|
@ -38,6 +38,7 @@ import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
@ -55,11 +56,11 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
|
||||||
private static final Log LOG = LogFactory.getLog(ZKPermissionWatcher.class);
|
private static final Log LOG = LogFactory.getLog(ZKPermissionWatcher.class);
|
||||||
// parent node for permissions lists
|
// parent node for permissions lists
|
||||||
static final String ACL_NODE = "acl";
|
static final String ACL_NODE = "acl";
|
||||||
TableAuthManager authManager;
|
private final TableAuthManager authManager;
|
||||||
String aclZNode;
|
private final String aclZNode;
|
||||||
CountDownLatch initialized = new CountDownLatch(1);
|
private final CountDownLatch initialized = new CountDownLatch(1);
|
||||||
AtomicReference<List<ZKUtil.NodeAndData>> nodes = new AtomicReference<>(null);
|
private final ExecutorService executor;
|
||||||
ExecutorService executor;
|
private Future<?> childrenChangedFuture;
|
||||||
|
|
||||||
public ZKPermissionWatcher(ZooKeeperWatcher watcher,
|
public ZKPermissionWatcher(ZooKeeperWatcher watcher,
|
||||||
TableAuthManager authManager, Configuration conf) {
|
TableAuthManager authManager, Configuration conf) {
|
||||||
|
@ -82,7 +83,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
|
||||||
List<ZKUtil.NodeAndData> existing =
|
List<ZKUtil.NodeAndData> existing =
|
||||||
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
|
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
|
||||||
if (existing != null) {
|
if (existing != null) {
|
||||||
refreshNodes(existing, null);
|
refreshNodes(existing);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -126,7 +127,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
|
||||||
try {
|
try {
|
||||||
List<ZKUtil.NodeAndData> nodes =
|
List<ZKUtil.NodeAndData> nodes =
|
||||||
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
|
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
|
||||||
refreshNodes(nodes, null);
|
refreshNodes(nodes);
|
||||||
} catch (KeeperException ke) {
|
} catch (KeeperException ke) {
|
||||||
LOG.error("Error reading data from zookeeper", ke);
|
LOG.error("Error reading data from zookeeper", ke);
|
||||||
// only option is to abort
|
// only option is to abort
|
||||||
|
@ -179,42 +180,37 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void nodeChildrenChanged(final String path) {
|
public void nodeChildrenChanged(final String path) {
|
||||||
waitUntilStarted();
|
waitUntilStarted();
|
||||||
if (path.equals(aclZNode)) {
|
if (path.equals(aclZNode)) {
|
||||||
try {
|
try {
|
||||||
List<ZKUtil.NodeAndData> nodeList =
|
final List<ZKUtil.NodeAndData> nodeList =
|
||||||
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
|
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
|
||||||
while (!nodes.compareAndSet(null, nodeList)) {
|
// preempt any existing nodeChildrenChanged event processing
|
||||||
try {
|
if (childrenChangedFuture != null && !childrenChangedFuture.isDone()) {
|
||||||
Thread.sleep(20);
|
boolean cancelled = childrenChangedFuture.cancel(true);
|
||||||
} catch (InterruptedException e) {
|
if (!cancelled) {
|
||||||
LOG.warn("Interrupted while setting node list", e);
|
// task may have finished between our check and attempted cancel, this is fine.
|
||||||
Thread.currentThread().interrupt();
|
if (! childrenChangedFuture.isDone()) {
|
||||||
|
LOG.warn("Could not cancel processing node children changed event, " +
|
||||||
|
"please file a JIRA and attach logs if possible.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
childrenChangedFuture = asyncProcessNodeUpdate(() -> refreshNodes(nodeList));
|
||||||
} catch (KeeperException ke) {
|
} catch (KeeperException ke) {
|
||||||
LOG.error("Error reading data from zookeeper for path "+path, ke);
|
LOG.error("Error reading data from zookeeper for path "+path, ke);
|
||||||
watcher.abort("ZooKeeper error get node children for path "+path, ke);
|
watcher.abort("ZooKeeper error get node children for path "+path, ke);
|
||||||
}
|
}
|
||||||
asyncProcessNodeUpdate(new Runnable() {
|
|
||||||
// allows subsequent nodeChildrenChanged event to preempt current processing of
|
|
||||||
// nodeChildrenChanged event
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
List<ZKUtil.NodeAndData> nodeList = nodes.get();
|
|
||||||
nodes.set(null);
|
|
||||||
refreshNodes(nodeList, nodes);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void asyncProcessNodeUpdate(Runnable runnable) {
|
private Future<?> asyncProcessNodeUpdate(Runnable runnable) {
|
||||||
if (!executor.isShutdown()) {
|
if (!executor.isShutdown()) {
|
||||||
try {
|
try {
|
||||||
executor.submit(runnable);
|
return executor.submit(runnable);
|
||||||
} catch (RejectedExecutionException e) {
|
} catch (RejectedExecutionException e) {
|
||||||
if (executor.isShutdown()) {
|
if (executor.isShutdown()) {
|
||||||
LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown");
|
LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown");
|
||||||
|
@ -223,12 +219,13 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return null; // No task launched so there will be nothing to cancel later
|
||||||
}
|
}
|
||||||
|
|
||||||
private void refreshNodes(List<ZKUtil.NodeAndData> nodes, AtomicReference ref) {
|
private void refreshNodes(List<ZKUtil.NodeAndData> nodes) {
|
||||||
for (ZKUtil.NodeAndData n : nodes) {
|
for (ZKUtil.NodeAndData n : nodes) {
|
||||||
if (ref != null && ref.get() != null) {
|
if (Thread.interrupted()) {
|
||||||
// there is a newer list
|
// Use Thread.interrupted so that we clear interrupt status
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (n.isEmpty()) continue;
|
if (n.isEmpty()) continue;
|
||||||
|
|
|
@ -44,8 +44,8 @@ import org.junit.experimental.categories.Category;
|
||||||
* Test the reading and writing of access permissions to and from zookeeper.
|
* Test the reading and writing of access permissions to and from zookeeper.
|
||||||
*/
|
*/
|
||||||
@Category({SecurityTests.class, LargeTests.class})
|
@Category({SecurityTests.class, LargeTests.class})
|
||||||
public class TestZKPermissionsWatcher {
|
public class TestZKPermissionWatcher {
|
||||||
private static final Log LOG = LogFactory.getLog(TestZKPermissionsWatcher.class);
|
private static final Log LOG = LogFactory.getLog(TestZKPermissionWatcher.class);
|
||||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
private static TableAuthManager AUTH_A;
|
private static TableAuthManager AUTH_A;
|
||||||
private static TableAuthManager AUTH_B;
|
private static TableAuthManager AUTH_B;
|
Loading…
Reference in New Issue