HBASE-26614 Refactor code related to "dump"ing ZK nodes (#3969)
The code starting at `ZKUtil.dump(ZKWatcher)` is a small mess – it has cyclic dependencies woven through itself, `ZKWatcher` and `RecoverableZooKeeper`. It also initializes a static variable in `ZKUtil` through the factory for `RecoverableZooKeeper` instances. Let's decouple and clean it up.

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Josh Elser <elserj@apache.org>
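For orientation, here is a minimal sketch of the call-site change this commit makes for connection setup. It assumes an existing `Configuration` and `ZKWatcher`; the class and method names of the sketch itself (`ConnectMigrationSketch`, `connectAfterRefactor`, the `"example-client"` identifier) are illustrative and not part of the commit.

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;

public class ConnectMigrationSketch {

  // Old call site removed by this commit: ZKUtil.connect(conf, watcher).
  // New call site: the factory methods live on RecoverableZooKeeper itself,
  // so ZKUtil no longer initializes its static dump timeout as a side effect.
  static RecoverableZooKeeper connectAfterRefactor(Configuration conf, ZKWatcher watcher)
      throws IOException {
    return RecoverableZooKeeper.connect(conf, watcher);
  }

  // The ensemble string and a client identifier can also be passed explicitly,
  // mirroring how ZKWatcher now builds its own connection.
  static RecoverableZooKeeper connectWithExplicitEnsemble(Configuration conf, ZKWatcher watcher)
      throws IOException {
    String ensemble = ZKConfig.getZKQuorumServersString(conf);
    return RecoverableZooKeeper.connect(conf, ensemble, watcher, "example-client");
  }
}
```

The diff below shows the same move applied to the real call sites, plus the extraction of the dump logic into a new `ZKDump` class.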
commit 625d610bcc (parent 97f3c1cf7f)
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.util.ToolRunner;

@@ -142,7 +141,7 @@ public class IntegrationTestZKAndFSPermissions extends AbstractHBaseTool {
  private void testZNodeACLs() throws IOException, KeeperException, InterruptedException {

    ZKWatcher watcher = new ZKWatcher(conf, "IntegrationTestZnodeACLs", null);
    RecoverableZooKeeper zk = ZKUtil.connect(this.conf, watcher);
    RecoverableZooKeeper zk = RecoverableZooKeeper.connect(this.conf, watcher);

    String baseZNode = watcher.getZNodePaths().baseZNode;

@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKDump;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;

@@ -238,7 +239,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
      } else {
        // use ZK instead
        System.out.print("Dumping replication znodes via ZooKeeper:");
        System.out.println(ZKUtil.getReplicationZnodesDump(zkw));
        System.out.println(ZKDump.getReplicationZnodesDump(zkw));
      }
      return (0);
    } catch (IOException e) {

@@ -19,9 +19,9 @@
--%>
<%@ page contentType="text/html;charset=UTF-8"
  import="org.apache.commons.lang3.StringEscapeUtils"
  import="org.apache.hadoop.hbase.zookeeper.ZKUtil"
  import="org.apache.hadoop.hbase.zookeeper.ZKWatcher"
  import="org.apache.hadoop.hbase.master.HMaster"
  import="org.apache.hadoop.hbase.zookeeper.ZKDump"
  import="org.apache.hadoop.hbase.zookeeper.ZKWatcher"
%>
<%
  HMaster master = (HMaster)getServletContext().getAttribute(HMaster.MASTER);

@@ -38,7 +38,7 @@
    </div>
    <div class="row">
      <div class="span12">
        <pre><%= StringEscapeUtils.escapeHtml4(ZKUtil.dump(watcher).trim()) %></pre>
        <pre><%= StringEscapeUtils.escapeHtml4(ZKDump.dump(watcher).trim()) %></pre>
      </div>
    </div>
  </div>

@@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;

import org.junit.After;

@@ -282,7 +281,7 @@ public final class TestNamespaceReplicationWithBulkLoadedData extends TestBulkLo
    // Verify hfile-refs for 1:ns_peer1, expect is empty
    MiniZooKeeperCluster zkCluster = UTIL1.getZkCluster();
    ZKWatcher watcher = new ZKWatcher(UTIL1.getConfiguration(), "TestZnodeHFiles-refs", null);
    RecoverableZooKeeper zk = ZKUtil.connect(UTIL1.getConfiguration(), watcher);
    RecoverableZooKeeper zk = RecoverableZooKeeper.connect(UTIL1.getConfiguration(), watcher);
    ZKReplicationQueueStorage replicationQueueStorage =
      new ZKReplicationQueueStorage(watcher, UTIL1.getConfiguration());
    Set<String> hfiles = replicationQueueStorage.getAllHFileRefs();

@@ -477,7 +477,7 @@ module Hbase
      )
      zk = @zk_wrapper.getRecoverableZooKeeper.getZooKeeper
      @zk_main = org.apache.zookeeper.ZooKeeperMain.new(zk)
      org.apache.hadoop.hbase.zookeeper.ZKUtil.dump(@zk_wrapper)
      org.apache.hadoop.hbase.zookeeper.ZKDump.dump(@zk_wrapper)
    end

    #----------------------------------------------------------------------------------------------

@@ -58,7 +58,7 @@ public final class MetaTableLocator {
   * @return server name or null if we failed to get the data.
   */
  @RestrictedApi(explanation = "Should only be called in tests or ZKUtil", link = "",
    allowedOnPath = ".*/src/test/.*|.*/ZKUtil\\.java")
    allowedOnPath = ".*/src/test/.*|.*/ZKDump\\.java")
  public static ServerName getMetaRegionLocation(final ZKWatcher zkw) {
    try {
      RegionState state = getMetaRegionState(zkw);

@@ -75,7 +75,7 @@ public final class MetaTableLocator {
   * @return server name
   */
  @RestrictedApi(explanation = "Should only be called in self or ZKUtil", link = "",
    allowedOnPath = ".*(MetaTableLocator|ZKUtil)\\.java")
    allowedOnPath = ".*(MetaTableLocator|ZKDump)\\.java")
  public static ServerName getMetaRegionLocation(final ZKWatcher zkw, int replicaId) {
    try {
      RegionState state = getMetaRegionState(zkw, replicaId);

@@ -25,6 +25,8 @@ import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;

@@ -83,6 +85,57 @@ public class RecoverableZooKeeper {
  private final String quorumServers;
  private final int maxMultiSize;

  /**
   * See {@link #connect(Configuration, String, Watcher, String)}
   */
  public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher)
    throws IOException {
    String ensemble = ZKConfig.getZKQuorumServersString(conf);
    return connect(conf, ensemble, watcher);
  }

  /**
   * See {@link #connect(Configuration, String, Watcher, String)}
   */
  public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
      Watcher watcher)
    throws IOException {
    return connect(conf, ensemble, watcher, null);
  }

  /**
   * Creates a new connection to ZooKeeper, pulling settings and ensemble config
   * from the specified configuration object using methods from {@link ZKConfig}.
   *
   * Sets the connection status monitoring watcher to the specified watcher.
   *
   * @param conf configuration to pull ensemble and other settings from
   * @param watcher watcher to monitor connection changes
   * @param ensemble ZooKeeper servers quorum string
   * @param identifier value used to identify this client instance.
   * @return connection to zookeeper
   * @throws IOException if unable to connect to zk or config problem
   */
  public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
      Watcher watcher, final String identifier)
    throws IOException {
    if(ensemble == null) {
      throw new IOException("Unable to determine ZooKeeper ensemble");
    }
    int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
      HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
    if (LOG.isTraceEnabled()) {
      LOG.trace("{} opening connection to ZooKeeper ensemble={}", identifier, ensemble);
    }
    int retry = conf.getInt("zookeeper.recovery.retry", 3);
    int retryIntervalMillis =
      conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
    int maxSleepTime = conf.getInt("zookeeper.recovery.retry.maxsleeptime", 60000);
    int multiMaxSize = conf.getInt("zookeeper.multi.max.size", 1024*1024);
    return new RecoverableZooKeeper(ensemble, timeout, watcher,
      retry, retryIntervalMillis, maxSleepTime, identifier, multiMaxSize);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
      justification="None. Its always been this way.")
  public RecoverableZooKeeper(String quorumServers, int sessionTimeout,

@@ -0,0 +1,315 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.zookeeper;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;

/**
 * Builds a string containing everything in ZooKeeper. This is inherently invasive into the
 * structures of other components' logical responsibilities.
 */
@InterfaceAudience.Private
public final class ZKDump {
  private static final Logger LOG = LoggerFactory.getLogger(ZKDump.class);

  private ZKDump() {}

  public static String dump(final ZKWatcher zkWatcher) {
    final int zkDumpConnectionTimeOut = zkWatcher.getConfiguration()
      .getInt("zookeeper.dump.connection.timeout", 1000);
    StringBuilder sb = new StringBuilder();
    try {
      sb.append("HBase is rooted at ").append(zkWatcher.getZNodePaths().baseZNode);
      sb.append("\nActive master address: ");
      try {
        sb.append("\n ").append(MasterAddressTracker.getMasterAddress(zkWatcher));
      } catch (IOException e) {
        sb.append("<<FAILED LOOKUP: ").append(e.getMessage()).append(">>");
      }
      sb.append("\nBackup master addresses:");
      final List<String> backupMasterChildrenNoWatchList = ZKUtil.listChildrenNoWatch(zkWatcher,
        zkWatcher.getZNodePaths().backupMasterAddressesZNode);
      if (backupMasterChildrenNoWatchList != null) {
        for (String child : backupMasterChildrenNoWatchList) {
          sb.append("\n ").append(child);
        }
      }
      sb.append("\nRegion server holding hbase:meta:");
      sb.append("\n ").append(MetaTableLocator.getMetaRegionLocation(zkWatcher));
      int numMetaReplicas = zkWatcher.getMetaReplicaNodes().size();
      for (int i = 1; i < numMetaReplicas; i++) {
        sb.append("\n")
          .append(" replica").append(i).append(": ")
          .append(MetaTableLocator.getMetaRegionLocation(zkWatcher, i));
      }
      sb.append("\nRegion servers:");
      final List<String> rsChildrenNoWatchList =
        ZKUtil.listChildrenNoWatch(zkWatcher, zkWatcher.getZNodePaths().rsZNode);
      if (rsChildrenNoWatchList != null) {
        for (String child : rsChildrenNoWatchList) {
          sb.append("\n ").append(child);
        }
      }
      try {
        getReplicationZnodesDump(zkWatcher, sb);
      } catch (KeeperException ke) {
        LOG.warn("Couldn't get the replication znode dump", ke);
      }
      sb.append("\nQuorum Server Statistics:");
      String[] servers = zkWatcher.getQuorum().split(",");
      for (String server : servers) {
        sb.append("\n ").append(server);
        try {
          String[] stat = getServerStats(server, zkDumpConnectionTimeOut);

          if (stat == null) {
            sb.append("[Error] invalid quorum server: ").append(server);
            break;
          }

          for (String s : stat) {
            sb.append("\n ").append(s);
          }
        } catch (Exception e) {
          sb.append("\n ERROR: ").append(e.getMessage());
        }
      }
    } catch (KeeperException ke) {
      sb.append("\nFATAL ZooKeeper Exception!\n");
      sb.append("\n").append(ke.getMessage());
    }
    return sb.toString();
  }

  /**
   * Appends replication znodes to the passed StringBuilder.
   *
   * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
   * @param sb the {@link StringBuilder} to append to
   * @throws KeeperException if a ZooKeeper operation fails
   */
  private static void getReplicationZnodesDump(ZKWatcher zkw, StringBuilder sb)
      throws KeeperException {
    String replicationZnode = zkw.getZNodePaths().replicationZNode;

    if (ZKUtil.checkExists(zkw, replicationZnode) == -1) {
      return;
    }

    // do a ls -r on this znode
    sb.append("\n").append(replicationZnode).append(": ");
    List<String> children = ZKUtil.listChildrenNoWatch(zkw, replicationZnode);
    if (children != null) {
      Collections.sort(children);
      for (String child : children) {
        String zNode = ZNodePaths.joinZNode(replicationZnode, child);
        if (zNode.equals(zkw.getZNodePaths().peersZNode)) {
          appendPeersZnodes(zkw, zNode, sb);
        } else if (zNode.equals(zkw.getZNodePaths().queuesZNode)) {
          appendRSZnodes(zkw, zNode, sb);
        } else if (zNode.equals(zkw.getZNodePaths().hfileRefsZNode)) {
          appendHFileRefsZNodes(zkw, zNode, sb);
        }
      }
    }
  }

  private static void appendHFileRefsZNodes(ZKWatcher zkw, String hFileRefsZNode,
      StringBuilder sb) throws KeeperException {
    sb.append("\n").append(hFileRefsZNode).append(": ");
    final List<String> hFileRefChildrenNoWatchList =
      ZKUtil.listChildrenNoWatch(zkw, hFileRefsZNode);
    if (hFileRefChildrenNoWatchList != null) {
      for (String peerIdZNode : hFileRefChildrenNoWatchList) {
        String zNodeToProcess = ZNodePaths.joinZNode(hFileRefsZNode, peerIdZNode);
        sb.append("\n").append(zNodeToProcess).append(": ");
        List<String> peerHFileRefsZNodes = ZKUtil.listChildrenNoWatch(zkw, zNodeToProcess);
        if (peerHFileRefsZNodes != null) {
          sb.append(String.join(", ", peerHFileRefsZNodes));
        }
      }
    }
  }

  /**
   * Returns a string with replication znodes and position of the replication log
   * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
   * @return aq string of replication znodes and log positions
   */
  public static String getReplicationZnodesDump(ZKWatcher zkw) throws KeeperException {
    StringBuilder sb = new StringBuilder();
    getReplicationZnodesDump(zkw, sb);
    return sb.toString();
  }

  private static void appendRSZnodes(ZKWatcher zkw, String znode, StringBuilder sb)
      throws KeeperException {
    List<String> stack = new LinkedList<>();
    stack.add(znode);
    do {
      String znodeToProcess = stack.remove(stack.size() - 1);
      sb.append("\n").append(znodeToProcess).append(": ");
      byte[] data;
      try {
        data = ZKUtil.getData(zkw, znodeToProcess);
      } catch (InterruptedException e) {
        zkw.interruptedException(e);
        return;
      }
      if (data != null && data.length > 0) { // log position
        long position = 0;
        try {
          position = ZKUtil.parseWALPositionFrom(ZKUtil.getData(zkw, znodeToProcess));
          sb.append(position);
        } catch (DeserializationException ignored) {
        } catch (InterruptedException e) {
          zkw.interruptedException(e);
          return;
        }
      }
      for (String zNodeChild : ZKUtil.listChildrenNoWatch(zkw, znodeToProcess)) {
        stack.add(ZNodePaths.joinZNode(znodeToProcess, zNodeChild));
      }
    } while (stack.size() > 0);
  }

  private static void appendPeersZnodes(ZKWatcher zkw, String peersZnode,
      StringBuilder sb) throws KeeperException {
    int pblen = ProtobufUtil.lengthOfPBMagic();
    sb.append("\n").append(peersZnode).append(": ");
    for (String peerIdZnode : ZKUtil.listChildrenNoWatch(zkw, peersZnode)) {
      String znodeToProcess = ZNodePaths.joinZNode(peersZnode, peerIdZnode);
      byte[] data;
      try {
        data = ZKUtil.getData(zkw, znodeToProcess);
      } catch (InterruptedException e) {
        zkw.interruptedException(e);
        return;
      }
      // parse the data of the above peer znode.
      try {
        ReplicationProtos.ReplicationPeer.Builder builder =
          ReplicationProtos.ReplicationPeer.newBuilder();
        ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
        String clusterKey = builder.getClusterkey();
        sb.append("\n").append(znodeToProcess).append(": ").append(clusterKey);
        // add the peer-state.
        appendPeerState(zkw, znodeToProcess, sb);
      } catch (IOException ipbe) {
        LOG.warn("Got Exception while parsing peer: " + znodeToProcess, ipbe);
      }
    }
  }

  private static void appendPeerState(ZKWatcher zkw, String znodeToProcess, StringBuilder sb)
      throws KeeperException, InvalidProtocolBufferException {
    String peerState = zkw.getConfiguration().get("zookeeper.znode.replication.peers.state",
      "peer-state");
    int pblen = ProtobufUtil.lengthOfPBMagic();
    for (String child : ZKUtil.listChildrenNoWatch(zkw, znodeToProcess)) {
      if (!child.equals(peerState)) {
        continue;
      }

      String peerStateZnode = ZNodePaths.joinZNode(znodeToProcess, child);
      sb.append("\n").append(peerStateZnode).append(": ");
      byte[] peerStateData;
      try {
        peerStateData = ZKUtil.getData(zkw, peerStateZnode);
        ReplicationProtos.ReplicationState.Builder builder =
          ReplicationProtos.ReplicationState.newBuilder();
        ProtobufUtil.mergeFrom(builder, peerStateData, pblen, peerStateData.length - pblen);
        sb.append(builder.getState().name());
      } catch (IOException ipbe) {
        LOG.warn("Got Exception while parsing peer: " + znodeToProcess, ipbe);
      } catch (InterruptedException e) {
        zkw.interruptedException(e);
        return;
      }
    }
  }

  /**
   * Gets the statistics from the given server.
   *
   * @param server The server to get the statistics from.
   * @param timeout The socket timeout to use.
   * @return The array of response strings.
   * @throws IOException When the socket communication fails.
   */
  private static String[] getServerStats(String server, int timeout)
      throws IOException {
    String[] sp = server.split(":");
    if (sp.length == 0) {
      return null;
    }

    String host = sp[0];
    int port = sp.length > 1 ? Integer.parseInt(sp[1])
      : HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT;

    try (Socket socket = new Socket()) {
      InetSocketAddress sockAddr = new InetSocketAddress(host, port);
      if (sockAddr.isUnresolved()) {
        throw new UnknownHostException(host + " cannot be resolved");
      }
      socket.connect(sockAddr, timeout);
      socket.setSoTimeout(timeout);
      try (PrintWriter out = new PrintWriter(new BufferedWriter(
          new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)), true);
          BufferedReader in = new BufferedReader(
            new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
        out.println("stat");
        out.flush();
        ArrayList<String> res = new ArrayList<>();
        while (true) {
          String line = in.readLine();
          if (line != null) {
            res.add(line);
          } else {
            break;
          }
        }
        return res.toArray(new String[res.size()]);
      }
    }
  }
}

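As a usage illustration (not part of this commit; the master status page and the shell `zk_dump` call sites in this diff are the real consumers), the new `ZKDump` entry points can be exercised roughly as below. The class name `ZKDumpSketch` and the watcher identifier are made up for the example.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.zookeeper.ZKDump;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;

public class ZKDumpSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (ZKWatcher watcher = new ZKWatcher(conf, "zkdump-sketch", null)) {
      // Full dump: root znode, masters, hbase:meta location, region servers,
      // replication znodes and quorum server statistics, as assembled by ZKDump.dump().
      System.out.println(ZKDump.dump(watcher));

      // Replication subtree only; this is what DumpReplicationQueues now prints.
      try {
        System.out.println(ZKDump.getReplicationZnodesDump(watcher));
      } catch (KeeperException e) {
        System.err.println("Could not read replication znodes: " + e.getMessage());
      }
    }
  }
}
```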
@@ -1,5 +1,4 @@
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information

@@ -18,16 +17,7 @@
 */
package org.apache.hadoop.hbase.zookeeper;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;

@@ -51,7 +41,6 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.CreateRequest;

@@ -59,7 +48,6 @@ import org.apache.zookeeper.proto.DeleteRequest;
import org.apache.zookeeper.proto.SetDataRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;

@@ -76,56 +64,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
public final class ZKUtil {
  private static final Logger LOG = LoggerFactory.getLogger(ZKUtil.class);

  private static int zkDumpConnectionTimeOut;

  private ZKUtil() {
  }

  /**
   * Creates a new connection to ZooKeeper, pulling settings and ensemble config
   * from the specified configuration object using methods from {@link ZKConfig}.
   *
   * Sets the connection status monitoring watcher to the specified watcher.
   *
   * @param conf configuration to pull ensemble and other settings from
   * @param watcher watcher to monitor connection changes
   * @return connection to zookeeper
   * @throws IOException if unable to connect to zk or config problem
   */
  public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher)
    throws IOException {
    String ensemble = ZKConfig.getZKQuorumServersString(conf);
    return connect(conf, ensemble, watcher);
  }

  public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
      Watcher watcher)
    throws IOException {
    return connect(conf, ensemble, watcher, null);
  }

  public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
      Watcher watcher, final String identifier)
    throws IOException {
    if(ensemble == null) {
      throw new IOException("Unable to determine ZooKeeper ensemble");
    }
    int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
      HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
    if (LOG.isTraceEnabled()) {
      LOG.trace("{} opening connection to ZooKeeper ensemble={}", identifier, ensemble);
    }
    int retry = conf.getInt("zookeeper.recovery.retry", 3);
    int retryIntervalMillis =
      conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
    int maxSleepTime = conf.getInt("zookeeper.recovery.retry.maxsleeptime", 60000);
    zkDumpConnectionTimeOut = conf.getInt("zookeeper.dump.connection.timeout",
      1000);
    int multiMaxSize = conf.getInt("zookeeper.multi.max.size", 1024*1024);
    return new RecoverableZooKeeper(ensemble, timeout, watcher,
      retry, retryIntervalMillis, maxSleepTime, identifier, multiMaxSize);
  }

  //
  // Helper methods
  //

@@ -1577,265 +1518,6 @@ public final class ZKUtil {
  // ZooKeeper cluster information
  //

  /** @return String dump of everything in ZooKeeper. */
  public static String dump(ZKWatcher zkw) {
    StringBuilder sb = new StringBuilder();
    try {
      sb.append("HBase is rooted at ").append(zkw.getZNodePaths().baseZNode);
      sb.append("\nActive master address: ");
      try {
        sb.append("\n ").append(MasterAddressTracker.getMasterAddress(zkw));
      } catch (IOException e) {
        sb.append("<<FAILED LOOKUP: " + e.getMessage() + ">>");
      }
      sb.append("\nBackup master addresses:");
      final List<String> backupMasterChildrenNoWatchList = listChildrenNoWatch(zkw,
        zkw.getZNodePaths().backupMasterAddressesZNode);
      if (backupMasterChildrenNoWatchList != null) {
        for (String child : backupMasterChildrenNoWatchList) {
          sb.append("\n ").append(child);
        }
      }
      sb.append("\nRegion server holding hbase:meta:");
      sb.append("\n ").append(MetaTableLocator.getMetaRegionLocation(zkw));
      int numMetaReplicas = zkw.getMetaReplicaNodes().size();
      for (int i = 1; i < numMetaReplicas; i++) {
        sb.append("\n replica" + i + ": "
          + MetaTableLocator.getMetaRegionLocation(zkw, i));
      }
      sb.append("\nRegion servers:");
      final List<String> rsChildrenNoWatchList =
        listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
      if (rsChildrenNoWatchList != null) {
        for (String child : rsChildrenNoWatchList) {
          sb.append("\n ").append(child);
        }
      }
      try {
        getReplicationZnodesDump(zkw, sb);
      } catch (KeeperException ke) {
        LOG.warn("Couldn't get the replication znode dump", ke);
      }
      sb.append("\nQuorum Server Statistics:");
      String[] servers = zkw.getQuorum().split(",");
      for (String server : servers) {
        sb.append("\n ").append(server);
        try {
          String[] stat = getServerStats(server, ZKUtil.zkDumpConnectionTimeOut);

          if (stat == null) {
            sb.append("[Error] invalid quorum server: " + server);
            break;
          }

          for (String s : stat) {
            sb.append("\n ").append(s);
          }
        } catch (Exception e) {
          sb.append("\n ERROR: ").append(e.getMessage());
        }
      }
    } catch (KeeperException ke) {
      sb.append("\nFATAL ZooKeeper Exception!\n");
      sb.append("\n" + ke.getMessage());
    }
    return sb.toString();
  }

  /**
   * Appends replication znodes to the passed StringBuilder.
   *
   * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
   * @param sb the {@link StringBuilder} to append to
   * @throws KeeperException if a ZooKeeper operation fails
   */
  private static void getReplicationZnodesDump(ZKWatcher zkw, StringBuilder sb)
      throws KeeperException {
    String replicationZnode = zkw.getZNodePaths().replicationZNode;

    if (ZKUtil.checkExists(zkw, replicationZnode) == -1) {
      return;
    }

    // do a ls -r on this znode
    sb.append("\n").append(replicationZnode).append(": ");
    List<String> children = ZKUtil.listChildrenNoWatch(zkw, replicationZnode);
    if (children != null) {
      Collections.sort(children);
      for (String child : children) {
        String zNode = ZNodePaths.joinZNode(replicationZnode, child);
        if (zNode.equals(zkw.getZNodePaths().peersZNode)) {
          appendPeersZnodes(zkw, zNode, sb);
        } else if (zNode.equals(zkw.getZNodePaths().queuesZNode)) {
          appendRSZnodes(zkw, zNode, sb);
        } else if (zNode.equals(zkw.getZNodePaths().hfileRefsZNode)) {
          appendHFileRefsZNodes(zkw, zNode, sb);
        }
      }
    }
  }

  private static void appendHFileRefsZNodes(ZKWatcher zkw, String hFileRefsZNode,
      StringBuilder sb) throws KeeperException {
    sb.append("\n").append(hFileRefsZNode).append(": ");
    final List<String> hFileRefChildrenNoWatchList =
      ZKUtil.listChildrenNoWatch(zkw, hFileRefsZNode);
    if (hFileRefChildrenNoWatchList != null) {
      for (String peerIdZNode : hFileRefChildrenNoWatchList) {
        String zNodeToProcess = ZNodePaths.joinZNode(hFileRefsZNode, peerIdZNode);
        sb.append("\n").append(zNodeToProcess).append(": ");
        List<String> peerHFileRefsZNodes = ZKUtil.listChildrenNoWatch(zkw, zNodeToProcess);
        if (peerHFileRefsZNodes != null) {
          sb.append(String.join(", ", peerHFileRefsZNodes));
        }
      }
    }
  }

  /**
   * Returns a string with replication znodes and position of the replication log
   * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation
   * @return aq string of replication znodes and log positions
   */
  public static String getReplicationZnodesDump(ZKWatcher zkw) throws KeeperException {
    StringBuilder sb = new StringBuilder();
    getReplicationZnodesDump(zkw, sb);
    return sb.toString();
  }

  private static void appendRSZnodes(ZKWatcher zkw, String znode, StringBuilder sb)
      throws KeeperException {
    List<String> stack = new LinkedList<>();
    stack.add(znode);
    do {
      String znodeToProcess = stack.remove(stack.size() - 1);
      sb.append("\n").append(znodeToProcess).append(": ");
      byte[] data;
      try {
        data = ZKUtil.getData(zkw, znodeToProcess);
      } catch (InterruptedException e) {
        zkw.interruptedException(e);
        return;
      }
      if (data != null && data.length > 0) { // log position
        long position = 0;
        try {
          position = ZKUtil.parseWALPositionFrom(ZKUtil.getData(zkw, znodeToProcess));
          sb.append(position);
        } catch (DeserializationException ignored) {
        } catch (InterruptedException e) {
          zkw.interruptedException(e);
          return;
        }
      }
      for (String zNodeChild : ZKUtil.listChildrenNoWatch(zkw, znodeToProcess)) {
        stack.add(ZNodePaths.joinZNode(znodeToProcess, zNodeChild));
      }
    } while (stack.size() > 0);
  }

  private static void appendPeersZnodes(ZKWatcher zkw, String peersZnode,
      StringBuilder sb) throws KeeperException {
    int pblen = ProtobufUtil.lengthOfPBMagic();
    sb.append("\n").append(peersZnode).append(": ");
    for (String peerIdZnode : ZKUtil.listChildrenNoWatch(zkw, peersZnode)) {
      String znodeToProcess = ZNodePaths.joinZNode(peersZnode, peerIdZnode);
      byte[] data;
      try {
        data = ZKUtil.getData(zkw, znodeToProcess);
      } catch (InterruptedException e) {
        zkw.interruptedException(e);
        return;
      }
      // parse the data of the above peer znode.
      try {
        ReplicationProtos.ReplicationPeer.Builder builder =
          ReplicationProtos.ReplicationPeer.newBuilder();
        ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
        String clusterKey = builder.getClusterkey();
        sb.append("\n").append(znodeToProcess).append(": ").append(clusterKey);
        // add the peer-state.
        appendPeerState(zkw, znodeToProcess, sb);
      } catch (IOException ipbe) {
        LOG.warn("Got Exception while parsing peer: " + znodeToProcess, ipbe);
      }
    }
  }

  private static void appendPeerState(ZKWatcher zkw, String znodeToProcess, StringBuilder sb)
      throws KeeperException, InvalidProtocolBufferException {
    String peerState = zkw.getConfiguration().get("zookeeper.znode.replication.peers.state",
      "peer-state");
    int pblen = ProtobufUtil.lengthOfPBMagic();
    for (String child : ZKUtil.listChildrenNoWatch(zkw, znodeToProcess)) {
      if (!child.equals(peerState)) {
        continue;
      }

      String peerStateZnode = ZNodePaths.joinZNode(znodeToProcess, child);
      sb.append("\n").append(peerStateZnode).append(": ");
      byte[] peerStateData;
      try {
        peerStateData = ZKUtil.getData(zkw, peerStateZnode);
        ReplicationProtos.ReplicationState.Builder builder =
          ReplicationProtos.ReplicationState.newBuilder();
        ProtobufUtil.mergeFrom(builder, peerStateData, pblen, peerStateData.length - pblen);
        sb.append(builder.getState().name());
      } catch (IOException ipbe) {
        LOG.warn("Got Exception while parsing peer: " + znodeToProcess, ipbe);
      } catch (InterruptedException e) {
        zkw.interruptedException(e);
        return;
      }
    }
  }

  /**
   * Gets the statistics from the given server.
   *
   * @param server The server to get the statistics from.
   * @param timeout The socket timeout to use.
   * @return The array of response strings.
   * @throws IOException When the socket communication fails.
   */
  private static String[] getServerStats(String server, int timeout)
      throws IOException {
    String[] sp = server.split(":");
    if (sp.length == 0) {
      return null;
    }

    String host = sp[0];
    int port = sp.length > 1 ? Integer.parseInt(sp[1])
      : HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT;

    try (Socket socket = new Socket()) {
      InetSocketAddress sockAddr = new InetSocketAddress(host, port);
      if (sockAddr.isUnresolved()) {
        throw new UnknownHostException(host + " cannot be resolved");
      }
      socket.connect(sockAddr, timeout);
      socket.setSoTimeout(timeout);
      try (PrintWriter out = new PrintWriter(new BufferedWriter(
          new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)), true);
          BufferedReader in = new BufferedReader(
            new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
        out.println("stat");
        out.flush();
        ArrayList<String> res = new ArrayList<>();
        while (true) {
          String line = in.readLine();
          if (line != null) {
            res.add(line);
          } else {
            break;
          }
        }
        return res.toArray(new String[res.size()]);
      }
    }
  }

  private static void logRetrievedMsg(final ZKWatcher zkw,
      final String znode, final byte [] data, final boolean watcherSet) {
    if (!LOG.isTraceEnabled()) {

@@ -177,7 +177,8 @@ public class ZKWatcher implements Watcher, Abortable, Closeable {
    this.abortable = abortable;
    this.znodePaths = new ZNodePaths(conf);
    PendingWatcher pendingWatcher = new PendingWatcher();
    this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier);
    this.recoverableZooKeeper =
      RecoverableZooKeeper.connect(conf, quorum, pendingWatcher, identifier);
    pendingWatcher.prepare(this);
    if (canCreateBaseZNode) {
      try {

@@ -77,7 +77,7 @@ public class TestRecoverableZooKeeper {
    ZKWatcher zkw = new ZKWatcher(conf, "testSetDataVersionMismatchInLoop",
      abortable, true);
    String ensemble = ZKConfig.getZKQuorumServersString(conf);
    RecoverableZooKeeper rzk = ZKUtil.connect(conf, ensemble, zkw);
    RecoverableZooKeeper rzk = RecoverableZooKeeper.connect(conf, ensemble, zkw);
    rzk.create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    rzk.setData(znode, Bytes.toBytes("OPENING"), 0);
    Field zkField = RecoverableZooKeeper.class.getDeclaredField("zk");