HBASE-7847 Use zookeeper multi to clear znodes (Rakesh R)
This commit is contained in:
parent
3bf34993a8
commit
80dfdc5244
|
@ -26,6 +26,7 @@ import java.net.InetSocketAddress;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Deque;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -1350,14 +1351,106 @@ public class ZKUtil {
|
||||||
*
|
*
|
||||||
* Sets no watches. Throws all exceptions besides dealing with deletion of
|
* Sets no watches. Throws all exceptions besides dealing with deletion of
|
||||||
* children.
|
* children.
|
||||||
|
*
|
||||||
|
* If hbase.zookeeper.useMulti is true, use ZooKeeper's multi-update functionality.
|
||||||
|
* Otherwise, run the list of operations sequentially.
|
||||||
|
*
|
||||||
|
* @throws KeeperException
|
||||||
*/
|
*/
|
||||||
public static void deleteChildrenRecursively(ZooKeeperWatcher zkw, String node)
|
public static void deleteChildrenRecursively(ZooKeeperWatcher zkw, String node)
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
List<String> children = ZKUtil.listChildrenNoWatch(zkw, node);
|
deleteChildrenRecursivelyMultiOrSequential(zkw, true, node);
|
||||||
if (children == null || children.isEmpty()) return;
|
|
||||||
for(String child : children) {
|
|
||||||
deleteNodeRecursively(zkw, joinZNode(node, child));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete all the children of the specified node but not the node itself. This
|
||||||
|
* will first traverse the znode tree for listing the children and then delete
|
||||||
|
* these znodes using multi-update api or sequential based on the specified
|
||||||
|
* configurations.
|
||||||
|
* <p>
|
||||||
|
* Sets no watches. Throws all exceptions besides dealing with deletion of
|
||||||
|
* children.
|
||||||
|
* <p>
|
||||||
|
* If hbase.zookeeper.useMulti is true, use ZooKeeper's multi-update
|
||||||
|
* functionality. Otherwise, run the list of operations sequentially.
|
||||||
|
* <p>
|
||||||
|
* If all of the following are true:
|
||||||
|
* <ul>
|
||||||
|
* <li>runSequentialOnMultiFailure is true
|
||||||
|
* <li>hbase.zookeeper.useMulti is true
|
||||||
|
* </ul>
|
||||||
|
* on calling multi, we get a ZooKeeper exception that can be handled by a
|
||||||
|
* sequential call(*), we retry the operations one-by-one (sequentially).
|
||||||
|
*
|
||||||
|
* @param zkw
|
||||||
|
* - zk reference
|
||||||
|
* @param runSequentialOnMultiFailure
|
||||||
|
* - if true when we get a ZooKeeper exception that could retry the
|
||||||
|
* operations one-by-one (sequentially)
|
||||||
|
* @param pathRoots
|
||||||
|
* - path of the parent node(s)
|
||||||
|
* @throws KeeperException.NotEmptyException
|
||||||
|
* if node has children while deleting
|
||||||
|
* @throws KeeperException
|
||||||
|
* if unexpected ZooKeeper exception
|
||||||
|
* @throws IllegalArgumentException
|
||||||
|
* if an invalid path is specified
|
||||||
|
*/
|
||||||
|
public static void deleteChildrenRecursivelyMultiOrSequential(
|
||||||
|
ZooKeeperWatcher zkw, boolean runSequentialOnMultiFailure,
|
||||||
|
String... pathRoots) throws KeeperException {
|
||||||
|
if (pathRoots == null || pathRoots.length <= 0) {
|
||||||
|
LOG.warn("Given path is not valid!");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
List<ZKUtilOp> ops = new ArrayList<ZKUtil.ZKUtilOp>();
|
||||||
|
for (String eachRoot : pathRoots) {
|
||||||
|
List<String> children = listChildrenBFSNoWatch(zkw, eachRoot);
|
||||||
|
// Delete the leaves first and eventually get rid of the root
|
||||||
|
for (int i = children.size() - 1; i >= 0; --i) {
|
||||||
|
ops.add(ZKUtilOp.deleteNodeFailSilent(children.get(i)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// atleast one element should exist
|
||||||
|
if (ops.size() > 0) {
|
||||||
|
multiOrSequential(zkw, ops, runSequentialOnMultiFailure);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* BFS Traversal of all the children under path, with the entries in the list,
|
||||||
|
* in the same order as that of the traversal. Lists all the children without
|
||||||
|
* setting any watches.
|
||||||
|
*
|
||||||
|
* @param zkw
|
||||||
|
* - zk reference
|
||||||
|
* @param znode
|
||||||
|
* - path of node
|
||||||
|
* @return list of children znodes under the path
|
||||||
|
* @throws KeeperException
|
||||||
|
* if unexpected ZooKeeper exception
|
||||||
|
*/
|
||||||
|
private static List<String> listChildrenBFSNoWatch(ZooKeeperWatcher zkw,
|
||||||
|
final String znode) throws KeeperException {
|
||||||
|
Deque<String> queue = new LinkedList<String>();
|
||||||
|
List<String> tree = new ArrayList<String>();
|
||||||
|
queue.add(znode);
|
||||||
|
while (true) {
|
||||||
|
String node = queue.pollFirst();
|
||||||
|
if (node == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
List<String> children = listChildrenNoWatch(zkw, node);
|
||||||
|
if (children == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
for (final String child : children) {
|
||||||
|
final String childPath = node + "/" + child;
|
||||||
|
queue.add(childPath);
|
||||||
|
tree.add(childPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tree;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -24,7 +24,9 @@ package org.apache.hadoop.hbase.zookeeper;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -34,7 +36,10 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
|
||||||
|
import org.apache.zookeeper.CreateMode;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.apache.zookeeper.Op;
|
||||||
|
import org.apache.zookeeper.ZooDefs.Ids;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -74,7 +79,7 @@ public class TestZKMulti {
|
||||||
TEST_UTIL.shutdownMiniZKCluster();
|
TEST_UTIL.shutdownMiniZKCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout=60000)
|
||||||
public void testSimpleMulti() throws Exception {
|
public void testSimpleMulti() throws Exception {
|
||||||
// null multi
|
// null multi
|
||||||
ZKUtil.multiOrSequential(zkw, null, false);
|
ZKUtil.multiOrSequential(zkw, null, false);
|
||||||
|
@ -103,7 +108,7 @@ public class TestZKMulti {
|
||||||
assertTrue(ZKUtil.checkExists(zkw, path) == -1);
|
assertTrue(ZKUtil.checkExists(zkw, path) == -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout=60000)
|
||||||
public void testComplexMulti() throws Exception {
|
public void testComplexMulti() throws Exception {
|
||||||
String path1 = ZKUtil.joinZNode(zkw.baseZNode, "testComplexMulti1");
|
String path1 = ZKUtil.joinZNode(zkw.baseZNode, "testComplexMulti1");
|
||||||
String path2 = ZKUtil.joinZNode(zkw.baseZNode, "testComplexMulti2");
|
String path2 = ZKUtil.joinZNode(zkw.baseZNode, "testComplexMulti2");
|
||||||
|
@ -145,7 +150,7 @@ public class TestZKMulti {
|
||||||
assertTrue(Bytes.equals(ZKUtil.getData(zkw, path6), Bytes.toBytes(path6)));
|
assertTrue(Bytes.equals(ZKUtil.getData(zkw, path6), Bytes.toBytes(path6)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout=60000)
|
||||||
public void testSingleFailure() throws Exception {
|
public void testSingleFailure() throws Exception {
|
||||||
// try to delete a node that doesn't exist
|
// try to delete a node that doesn't exist
|
||||||
boolean caughtNoNode = false;
|
boolean caughtNoNode = false;
|
||||||
|
@ -183,7 +188,7 @@ public class TestZKMulti {
|
||||||
assertTrue(caughtNodeExists);
|
assertTrue(caughtNodeExists);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout=60000)
|
||||||
public void testSingleFailureInMulti() throws Exception {
|
public void testSingleFailureInMulti() throws Exception {
|
||||||
// try a multi where all but one operation succeeds
|
// try a multi where all but one operation succeeds
|
||||||
String pathA = ZKUtil.joinZNode(zkw.baseZNode, "testSingleFailureInMultiA");
|
String pathA = ZKUtil.joinZNode(zkw.baseZNode, "testSingleFailureInMultiA");
|
||||||
|
@ -206,7 +211,7 @@ public class TestZKMulti {
|
||||||
assertTrue(ZKUtil.checkExists(zkw, pathC) == -1);
|
assertTrue(ZKUtil.checkExists(zkw, pathC) == -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout=60000)
|
||||||
public void testMultiFailure() throws Exception {
|
public void testMultiFailure() throws Exception {
|
||||||
String pathX = ZKUtil.joinZNode(zkw.baseZNode, "testMultiFailureX");
|
String pathX = ZKUtil.joinZNode(zkw.baseZNode, "testMultiFailureX");
|
||||||
String pathY = ZKUtil.joinZNode(zkw.baseZNode, "testMultiFailureY");
|
String pathY = ZKUtil.joinZNode(zkw.baseZNode, "testMultiFailureY");
|
||||||
|
@ -260,7 +265,7 @@ public class TestZKMulti {
|
||||||
assertTrue(ZKUtil.checkExists(zkw, pathV) == -1);
|
assertTrue(ZKUtil.checkExists(zkw, pathV) == -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout=60000)
|
||||||
public void testRunSequentialOnMultiFailure() throws Exception {
|
public void testRunSequentialOnMultiFailure() throws Exception {
|
||||||
String path1 = ZKUtil.joinZNode(zkw.baseZNode, "runSequential1");
|
String path1 = ZKUtil.joinZNode(zkw.baseZNode, "runSequential1");
|
||||||
String path2 = ZKUtil.joinZNode(zkw.baseZNode, "runSequential2");
|
String path2 = ZKUtil.joinZNode(zkw.baseZNode, "runSequential2");
|
||||||
|
@ -288,4 +293,72 @@ public class TestZKMulti {
|
||||||
assertTrue(ZKUtil.checkExists(zkw, path3) == -1);
|
assertTrue(ZKUtil.checkExists(zkw, path3) == -1);
|
||||||
assertFalse(ZKUtil.checkExists(zkw, path4) == -1);
|
assertFalse(ZKUtil.checkExists(zkw, path4) == -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies that for the given root node, it should delete all the child nodes
|
||||||
|
* recursively using multi-update api.
|
||||||
|
*/
|
||||||
|
@Test (timeout=60000)
|
||||||
|
public void testdeleteChildrenRecursivelyMulti() throws Exception {
|
||||||
|
String parentZNode = "/testRootMulti";
|
||||||
|
createZNodeTree(parentZNode);
|
||||||
|
|
||||||
|
ZKUtil.deleteChildrenRecursivelyMultiOrSequential(zkw, true, parentZNode);
|
||||||
|
|
||||||
|
assertTrue("Wrongly deleted parent znode!",
|
||||||
|
ZKUtil.checkExists(zkw, parentZNode) > -1);
|
||||||
|
List<String> children = zkw.getRecoverableZooKeeper().getChildren(
|
||||||
|
parentZNode, false);
|
||||||
|
assertTrue("Failed to delete child znodes!", 0 == children.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies that for the given root node, it should delete all the child nodes
|
||||||
|
* recursively using normal sequential way.
|
||||||
|
*/
|
||||||
|
@Test (timeout=60000)
|
||||||
|
public void testdeleteChildrenRecursivelySequential() throws Exception {
|
||||||
|
String parentZNode = "/testRootSeq";
|
||||||
|
createZNodeTree(parentZNode);
|
||||||
|
boolean useMulti = zkw.getConfiguration().getBoolean(
|
||||||
|
"hbase.zookeeper.useMulti", false);
|
||||||
|
zkw.getConfiguration().setBoolean("hbase.zookeeper.useMulti", false);
|
||||||
|
try {
|
||||||
|
// disables the multi-update api execution
|
||||||
|
ZKUtil.deleteChildrenRecursivelyMultiOrSequential(zkw, true, parentZNode);
|
||||||
|
|
||||||
|
assertTrue("Wrongly deleted parent znode!",
|
||||||
|
ZKUtil.checkExists(zkw, parentZNode) > -1);
|
||||||
|
List<String> children = zkw.getRecoverableZooKeeper().getChildren(
|
||||||
|
parentZNode, false);
|
||||||
|
assertTrue("Failed to delete child znodes!", 0 == children.size());
|
||||||
|
} finally {
|
||||||
|
// sets back the multi-update api execution
|
||||||
|
zkw.getConfiguration().setBoolean("hbase.zookeeper.useMulti", useMulti);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createZNodeTree(String rootZNode) throws KeeperException,
|
||||||
|
InterruptedException {
|
||||||
|
List<Op> opList = new ArrayList<Op>();
|
||||||
|
opList.add(Op.create(rootZNode, new byte[0], Ids.OPEN_ACL_UNSAFE,
|
||||||
|
CreateMode.PERSISTENT));
|
||||||
|
int level = 0;
|
||||||
|
String parentZNode = rootZNode;
|
||||||
|
while (level < 10) {
|
||||||
|
// define parent node
|
||||||
|
parentZNode = parentZNode + "/" + level;
|
||||||
|
opList.add(Op.create(parentZNode, new byte[0], Ids.OPEN_ACL_UNSAFE,
|
||||||
|
CreateMode.PERSISTENT));
|
||||||
|
int elements = 0;
|
||||||
|
// add elements to the parent node
|
||||||
|
while (elements < level) {
|
||||||
|
opList.add(Op.create(parentZNode + "/" + elements, new byte[0],
|
||||||
|
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
|
||||||
|
elements++;
|
||||||
|
}
|
||||||
|
level++;
|
||||||
|
}
|
||||||
|
zkw.getRecoverableZooKeeper().multi(opList);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue