HBASE-22638 ZooKeeper Utility enhancements

Signed-off-by: Jan Hentschel <jan.hentschel@ultratendency.com>
This commit is contained in:
Viraj Jasani 2019-07-13 22:20:30 +05:30 committed by Jan Hentschel
parent c5f879483d
commit 202d2adba1
6 changed files with 88 additions and 74 deletions

View File

@ -56,21 +56,21 @@ public class MiniZooKeeperCluster {
private static final int TICK_TIME = 2000; private static final int TICK_TIME = 2000;
private static final int DEFAULT_CONNECTION_TIMEOUT = 30000; private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
private static final byte[] STATIC_BYTES = Bytes.toBytes("stat"); private static final byte[] STATIC_BYTES = Bytes.toBytes("stat");
private int connectionTimeout; private final int connectionTimeout;
private boolean started; private boolean started;
/** The default port. If zero, we use a random port. */ /** The default port. If zero, we use a random port. */
private int defaultClientPort = 0; private int defaultClientPort = 0;
private List<NIOServerCnxnFactory> standaloneServerFactoryList; private final List<NIOServerCnxnFactory> standaloneServerFactoryList;
private List<ZooKeeperServer> zooKeeperServers; private final List<ZooKeeperServer> zooKeeperServers;
private List<Integer> clientPortList; private final List<Integer> clientPortList;
private int activeZKServerIndex; private int activeZKServerIndex;
private int tickTime = 0; private int tickTime = 0;
private Configuration configuration; private final Configuration configuration;
public MiniZooKeeperCluster() { public MiniZooKeeperCluster() {
this(new Configuration()); this(new Configuration());
@ -98,6 +98,7 @@ public class MiniZooKeeperCluster {
/** /**
* Get the list of client ports. * Get the list of client ports.
*
* @return clientPortList the client port list * @return clientPortList the client port list
*/ */
@VisibleForTesting @VisibleForTesting
@ -143,7 +144,8 @@ public class MiniZooKeeperCluster {
} }
} }
// Make sure that the port is unused. // Make sure that the port is unused.
while (true) { // break when an unused port is found
do {
for (i = 0; i < clientPortList.size(); i++) { for (i = 0; i < clientPortList.size(); i++) {
if (returnClientPort == clientPortList.get(i)) { if (returnClientPort == clientPortList.get(i)) {
// Already used. Update the port and retry. // Already used. Update the port and retry.
@ -151,10 +153,7 @@ public class MiniZooKeeperCluster {
break; break;
} }
} }
if (i == clientPortList.size()) { } while (i != clientPortList.size());
break; // found a unused port, exit
}
}
return returnClientPort; return returnClientPort;
} }
@ -163,7 +162,7 @@ public class MiniZooKeeperCluster {
} }
public int getBackupZooKeeperServerNum() { public int getBackupZooKeeperServerNum() {
return zooKeeperServers.size()-1; return zooKeeperServers.size() - 1;
} }
public int getZooKeeperServerNum() { public int getZooKeeperServerNum() {
@ -179,7 +178,7 @@ public class MiniZooKeeperCluster {
System.setProperty("zookeeper.preAllocSize", "100"); System.setProperty("zookeeper.preAllocSize", "100");
FileTxnLog.setPreallocSize(100 * 1024); FileTxnLog.setPreallocSize(100 * 1024);
// allow all 4 letter words // allow all 4 letter words
System.setProperty("zookeeper.4lw.commands.whitelist","*"); System.setProperty("zookeeper.4lw.commands.whitelist", "*");
} }
public int startup(File baseDir) throws IOException, InterruptedException { public int startup(File baseDir) throws IOException, InterruptedException {
@ -212,7 +211,7 @@ public class MiniZooKeeperCluster {
// running all the ZK servers // running all the ZK servers
for (int i = 0; i < numZooKeeperServers; i++) { for (int i = 0; i < numZooKeeperServers; i++) {
File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile(); File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile();
createDir(dir); createDir(dir);
int tickTimeToUse; int tickTimeToUse;
if (this.tickTime > 0) { if (this.tickTime > 0) {
@ -268,8 +267,7 @@ public class MiniZooKeeperCluster {
// We have selected a port as a client port. Update clientPortList if necessary. // We have selected a port as a client port. Update clientPortList if necessary.
if (clientPortList.size() <= i) { // it is not in the list, add the port if (clientPortList.size() <= i) { // it is not in the list, add the port
clientPortList.add(currentClientPort); clientPortList.add(currentClientPort);
} } else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port
else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port
clientPortList.remove(i); clientPortList.remove(i);
clientPortList.add(i, currentClientPort); clientPortList.add(i, currentClientPort);
} }
@ -405,13 +403,10 @@ public class MiniZooKeeperCluster {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
while (true) { while (true) {
try { try {
Socket sock = new Socket("localhost", port); try (Socket sock = new Socket("localhost", port)) {
try {
OutputStream outstream = sock.getOutputStream(); OutputStream outstream = sock.getOutputStream();
outstream.write(STATIC_BYTES); outstream.write(STATIC_BYTES);
outstream.flush(); outstream.flush();
} finally {
sock.close();
} }
} catch (IOException e) { } catch (IOException e) {
return true; return true;

View File

@ -78,10 +78,10 @@ public class RecoverableZooKeeper {
// An identifier of this process in the cluster // An identifier of this process in the cluster
private final String identifier; private final String identifier;
private final byte[] id; private final byte[] id;
private Watcher watcher; private final Watcher watcher;
private int sessionTimeout; private final int sessionTimeout;
private String quorumServers; private final String quorumServers;
private int maxMultiSize; private final int maxMultiSize;
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE", @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
justification="None. Its always been this way.") justification="None. Its always been this way.")

View File

@ -67,13 +67,10 @@ public class ZKAclReset extends Configured implements Tool {
private static void resetAcls(final Configuration conf, boolean eraseAcls) private static void resetAcls(final Configuration conf, boolean eraseAcls)
throws Exception { throws Exception {
ZKWatcher zkw = new ZKWatcher(conf, "ZKAclReset", null); try (ZKWatcher zkw = new ZKWatcher(conf, "ZKAclReset", null)) {
try {
LOG.info((eraseAcls ? "Erase" : "Set") + " HBase ACLs for " + LOG.info((eraseAcls ? "Erase" : "Set") + " HBase ACLs for " +
zkw.getQuorum() + " " + zkw.getZNodePaths().baseZNode); zkw.getQuorum() + " " + zkw.getZNodePaths().baseZNode);
resetAcls(zkw, zkw.getZNodePaths().baseZNode, eraseAcls); resetAcls(zkw, zkw.getZNodePaths().baseZNode, eraseAcls);
} finally {
zkw.close();
} }
} }
@ -96,13 +93,20 @@ public class ZKAclReset extends Configured implements Tool {
public int run(String[] args) throws Exception { public int run(String[] args) throws Exception {
boolean eraseAcls = true; boolean eraseAcls = true;
for (int i = 0; i < args.length; ++i) { for (String arg : args) {
if (args[i].equals("-help")) { switch (arg) {
printUsageAndExit(); case "-help": {
} else if (args[i].equals("-set-acls")) { printUsageAndExit();
eraseAcls = false; break;
} else { }
printUsageAndExit(); case "-set-acls": {
eraseAcls = false;
break;
}
default: {
printUsageAndExit();
break;
}
} }
} }

View File

@ -35,8 +35,8 @@ import org.apache.zookeeper.KeeperException;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ZKClusterId { public class ZKClusterId {
private ZKWatcher watcher; private final ZKWatcher watcher;
private Abortable abortable; private final Abortable abortable;
private String id; private String id;
public ZKClusterId(ZKWatcher watcher, Abortable abortable) { public ZKClusterId(ZKWatcher watcher, Abortable abortable) {

View File

@ -45,9 +45,9 @@ public class ZKLeaderManager extends ZKListener {
private final Object lock = new Object(); private final Object lock = new Object();
private final AtomicBoolean leaderExists = new AtomicBoolean(); private final AtomicBoolean leaderExists = new AtomicBoolean();
private String leaderZNode; private final String leaderZNode;
private byte[] nodeId; private final byte[] nodeId;
private Stoppable candidate; private final Stoppable candidate;
public ZKLeaderManager(ZKWatcher watcher, String leaderZNode, public ZKLeaderManager(ZKWatcher watcher, String leaderZNode,
byte[] identifier, Stoppable candidate) { byte[] identifier, Stoppable candidate) {

View File

@ -246,7 +246,7 @@ public final class ZKUtil {
private static final Map<String, String> BASIC_JAAS_OPTIONS = new HashMap<>(); private static final Map<String, String> BASIC_JAAS_OPTIONS = new HashMap<>();
static { static {
String jaasEnvVar = System.getenv("HBASE_JAAS_DEBUG"); String jaasEnvVar = System.getenv("HBASE_JAAS_DEBUG");
if (jaasEnvVar != null && "true".equalsIgnoreCase(jaasEnvVar)) { if ("true".equalsIgnoreCase(jaasEnvVar)) {
BASIC_JAAS_OPTIONS.put("debug", "true"); BASIC_JAAS_OPTIONS.put("debug", "true");
} }
} }
@ -353,7 +353,7 @@ public final class ZKUtil {
throws KeeperException { throws KeeperException {
try { try {
Stat s = zkw.getRecoverableZooKeeper().exists(znode, zkw); Stat s = zkw.getRecoverableZooKeeper().exists(znode, zkw);
boolean exists = s != null ? true : false; boolean exists = s != null;
if (exists) { if (exists) {
LOG.debug(zkw.prefix("Set watcher on existing znode=" + znode)); LOG.debug(zkw.prefix("Set watcher on existing znode=" + znode));
} else { } else {
@ -443,8 +443,7 @@ public final class ZKUtil {
ZKWatcher zkw, String znode) ZKWatcher zkw, String znode)
throws KeeperException { throws KeeperException {
try { try {
List<String> children = zkw.getRecoverableZooKeeper().getChildren(znode, zkw); return zkw.getRecoverableZooKeeper().getChildren(znode, zkw);
return children;
} catch(KeeperException.NoNodeException ke) { } catch(KeeperException.NoNodeException ke) {
LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " + LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " +
"because node does not exist (not an error)")); "because node does not exist (not an error)"));
@ -1400,9 +1399,16 @@ public final class ZKUtil {
* Chunks the provided {@code ops} when their approximate size exceeds the the configured limit. * Chunks the provided {@code ops} when their approximate size exceeds the the configured limit.
* Take caution that this can ONLY be used for operations where atomicity is not important, * Take caution that this can ONLY be used for operations where atomicity is not important,
* e.g. deletions. It must not be used when atomicity of the operations is critical. * e.g. deletions. It must not be used when atomicity of the operations is critical.
*
* @param zkw reference to the {@link ZKWatcher} which contains configuration and constants
* @param runSequentialOnMultiFailure if true when we get a ZooKeeper exception that could
* retry the operations one-by-one (sequentially)
* @param ops list of ZKUtilOp {@link ZKUtilOp} to partition while submitting batched multi
* or sequential
* @throws KeeperException unexpected ZooKeeper Exception / Zookeeper unreachable
*/ */
static void submitBatchedMultiOrSequential(ZKWatcher zkw, boolean runSequentialOnMultiFailure, private static void submitBatchedMultiOrSequential(ZKWatcher zkw,
List<ZKUtilOp> ops) throws KeeperException { boolean runSequentialOnMultiFailure, List<ZKUtilOp> ops) throws KeeperException {
// at least one element should exist // at least one element should exist
if (ops.isEmpty()) { if (ops.isEmpty()) {
return; return;
@ -1794,9 +1800,12 @@ public final class ZKUtil {
sb.append("<<FAILED LOOKUP: " + e.getMessage() + ">>"); sb.append("<<FAILED LOOKUP: " + e.getMessage() + ">>");
} }
sb.append("\nBackup master addresses:"); sb.append("\nBackup master addresses:");
for (String child : listChildrenNoWatch(zkw, final List<String> backupMasterChildrenNoWatchList = listChildrenNoWatch(zkw,
zkw.getZNodePaths().backupMasterAddressesZNode)) { zkw.getZNodePaths().backupMasterAddressesZNode);
sb.append("\n ").append(child); if (backupMasterChildrenNoWatchList != null) {
for (String child : backupMasterChildrenNoWatchList) {
sb.append("\n ").append(child);
}
} }
sb.append("\nRegion server holding hbase:meta: " sb.append("\nRegion server holding hbase:meta: "
+ MetaTableLocator.getMetaRegionLocation(zkw)); + MetaTableLocator.getMetaRegionLocation(zkw));
@ -1808,8 +1817,12 @@ public final class ZKUtil {
+ MetaTableLocator.getMetaRegionLocation(zkw, i)); + MetaTableLocator.getMetaRegionLocation(zkw, i));
} }
sb.append("\nRegion servers:"); sb.append("\nRegion servers:");
for (String child : listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode)) { final List<String> rsChildrenNoWatchList =
sb.append("\n ").append(child); listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
if (rsChildrenNoWatchList != null) {
for (String child : rsChildrenNoWatchList) {
sb.append("\n ").append(child);
}
} }
try { try {
getReplicationZnodesDump(zkw, sb); getReplicationZnodesDump(zkw, sb);
@ -1860,31 +1873,33 @@ public final class ZKUtil {
// do a ls -r on this znode // do a ls -r on this znode
sb.append("\n").append(replicationZnode).append(": "); sb.append("\n").append(replicationZnode).append(": ");
List<String> children = ZKUtil.listChildrenNoWatch(zkw, replicationZnode); List<String> children = ZKUtil.listChildrenNoWatch(zkw, replicationZnode);
Collections.sort(children); if (children != null) {
for (String child : children) { Collections.sort(children);
String znode = ZNodePaths.joinZNode(replicationZnode, child); for (String child : children) {
if (znode.equals(zkw.getZNodePaths().peersZNode)) { String zNode = ZNodePaths.joinZNode(replicationZnode, child);
appendPeersZnodes(zkw, znode, sb); if (zNode.equals(zkw.getZNodePaths().peersZNode)) {
} else if (znode.equals(zkw.getZNodePaths().queuesZNode)) { appendPeersZnodes(zkw, zNode, sb);
appendRSZnodes(zkw, znode, sb); } else if (zNode.equals(zkw.getZNodePaths().queuesZNode)) {
} else if (znode.equals(zkw.getZNodePaths().hfileRefsZNode)) { appendRSZnodes(zkw, zNode, sb);
appendHFileRefsZnodes(zkw, znode, sb); } else if (zNode.equals(zkw.getZNodePaths().hfileRefsZNode)) {
appendHFileRefsZNodes(zkw, zNode, sb);
}
} }
} }
} }
private static void appendHFileRefsZnodes(ZKWatcher zkw, String hfileRefsZnode, private static void appendHFileRefsZNodes(ZKWatcher zkw, String hFileRefsZNode,
StringBuilder sb) throws KeeperException { StringBuilder sb) throws KeeperException {
sb.append("\n").append(hfileRefsZnode).append(": "); sb.append("\n").append(hFileRefsZNode).append(": ");
for (String peerIdZnode : ZKUtil.listChildrenNoWatch(zkw, hfileRefsZnode)) { final List<String> hFileRefChildrenNoWatchList =
String znodeToProcess = ZNodePaths.joinZNode(hfileRefsZnode, peerIdZnode); ZKUtil.listChildrenNoWatch(zkw, hFileRefsZNode);
sb.append("\n").append(znodeToProcess).append(": "); if (hFileRefChildrenNoWatchList != null) {
List<String> peerHFileRefsZnodes = ZKUtil.listChildrenNoWatch(zkw, znodeToProcess); for (String peerIdZNode : hFileRefChildrenNoWatchList) {
int size = peerHFileRefsZnodes.size(); String zNodeToProcess = ZNodePaths.joinZNode(hFileRefsZNode, peerIdZNode);
for (int i = 0; i < size; i++) { sb.append("\n").append(zNodeToProcess).append(": ");
sb.append(peerHFileRefsZnodes.get(i)); List<String> peerHFileRefsZNodes = ZKUtil.listChildrenNoWatch(zkw, zNodeToProcess);
if (i != size - 1) { if (peerHFileRefsZNodes != null) {
sb.append(", "); sb.append(String.join(", ", peerHFileRefsZNodes));
} }
} }
} }
@ -1996,10 +2011,10 @@ public final class ZKUtil {
* @return The array of response strings. * @return The array of response strings.
* @throws IOException When the socket communication fails. * @throws IOException When the socket communication fails.
*/ */
public static String[] getServerStats(String server, int timeout) private static String[] getServerStats(String server, int timeout)
throws IOException { throws IOException {
String[] sp = server.split(":"); String[] sp = server.split(":");
if (sp == null || sp.length == 0) { if (sp.length == 0) {
return null; return null;
} }
@ -2135,7 +2150,7 @@ public final class ZKUtil {
* @see #logZKTree(ZKWatcher, String) * @see #logZKTree(ZKWatcher, String)
* @throws KeeperException if an unexpected exception occurs * @throws KeeperException if an unexpected exception occurs
*/ */
protected static void logZKTree(ZKWatcher zkw, String root, String prefix) private static void logZKTree(ZKWatcher zkw, String root, String prefix)
throws KeeperException { throws KeeperException {
List<String> children = ZKUtil.listChildrenNoWatch(zkw, root); List<String> children = ZKUtil.listChildrenNoWatch(zkw, root);