diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java index f752743545c..2ce101daab1 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestZKAndFSPermissions.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.util.AbstractHBaseTool; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.hadoop.util.ToolRunner; @@ -142,7 +141,7 @@ public class IntegrationTestZKAndFSPermissions extends AbstractHBaseTool { private void testZNodeACLs() throws IOException, KeeperException, InterruptedException { ZKWatcher watcher = new ZKWatcher(conf, "IntegrationTestZnodeACLs", null); - RecoverableZooKeeper zk = ZKUtil.connect(this.conf, watcher); + RecoverableZooKeeper zk = RecoverableZooKeeper.connect(this.conf, watcher); String baseZNode = watcher.getZNodePaths().baseZNode; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java index 11b0c7ca806..dc4ee347e20 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.zookeeper.ZKDump; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.util.Tool; @@ -238,7 +239,7 @@ public class DumpReplicationQueues extends Configured implements Tool { } else { // use ZK instead System.out.print("Dumping replication znodes via ZooKeeper:"); - System.out.println(ZKUtil.getReplicationZnodesDump(zkw)); + System.out.println(ZKDump.getReplicationZnodesDump(zkw)); } return (0); } catch (IOException e) { diff --git a/hbase-server/src/main/resources/hbase-webapps/master/zk.jsp b/hbase-server/src/main/resources/hbase-webapps/master/zk.jsp index 742dd514392..c6ae44fc86d 100644 --- a/hbase-server/src/main/resources/hbase-webapps/master/zk.jsp +++ b/hbase-server/src/main/resources/hbase-webapps/master/zk.jsp @@ -19,9 +19,9 @@ --%> <%@ page contentType="text/html;charset=UTF-8" import="org.apache.commons.lang3.StringEscapeUtils" - import="org.apache.hadoop.hbase.zookeeper.ZKUtil" - import="org.apache.hadoop.hbase.zookeeper.ZKWatcher" import="org.apache.hadoop.hbase.master.HMaster" + import="org.apache.hadoop.hbase.zookeeper.ZKDump" + import="org.apache.hadoop.hbase.zookeeper.ZKWatcher" %> <% HMaster master = (HMaster)getServletContext().getAttribute(HMaster.MASTER); @@ -38,7 +38,7 @@
-
<%= StringEscapeUtils.escapeHtml4(ZKUtil.dump(watcher).trim()) %>
+
<%= StringEscapeUtils.escapeHtml4(ZKDump.dump(watcher).trim()) %>
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplicationWithBulkLoadedData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplicationWithBulkLoadedData.java index db4286125b9..ec9c40d1817 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplicationWithBulkLoadedData.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplicationWithBulkLoadedData.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.junit.After; @@ -282,7 +281,7 @@ public final class TestNamespaceReplicationWithBulkLoadedData extends TestBulkLo // Verify hfile-refs for 1:ns_peer1, expect is empty MiniZooKeeperCluster zkCluster = UTIL1.getZkCluster(); ZKWatcher watcher = new ZKWatcher(UTIL1.getConfiguration(), "TestZnodeHFiles-refs", null); - RecoverableZooKeeper zk = ZKUtil.connect(UTIL1.getConfiguration(), watcher); + RecoverableZooKeeper zk = RecoverableZooKeeper.connect(UTIL1.getConfiguration(), watcher); ZKReplicationQueueStorage replicationQueueStorage = new ZKReplicationQueueStorage(watcher, UTIL1.getConfiguration()); Set hfiles = replicationQueueStorage.getAllHFileRefs(); diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index 8f3a264bca6..62fc7fed7d5 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -477,7 +477,7 @@ module Hbase ) zk = @zk_wrapper.getRecoverableZooKeeper.getZooKeeper @zk_main = org.apache.zookeeper.ZooKeeperMain.new(zk) - org.apache.hadoop.hbase.zookeeper.ZKUtil.dump(@zk_wrapper) + org.apache.hadoop.hbase.zookeeper.ZKDump.dump(@zk_wrapper) end #---------------------------------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java index e1e9482511e..57f5f3ec489 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java @@ -58,7 +58,7 @@ public final class MetaTableLocator { * @return server name or null if we failed to get the data. */ @RestrictedApi(explanation = "Should only be called in tests or ZKUtil", link = "", - allowedOnPath = ".*/src/test/.*|.*/ZKUtil\\.java") + allowedOnPath = ".*/src/test/.*|.*/ZKDump\\.java") public static ServerName getMetaRegionLocation(final ZKWatcher zkw) { try { RegionState state = getMetaRegionState(zkw); @@ -75,7 +75,7 @@ public final class MetaTableLocator { * @return server name */ @RestrictedApi(explanation = "Should only be called in self or ZKUtil", link = "", - allowedOnPath = ".*(MetaTableLocator|ZKUtil)\\.java") + allowedOnPath = ".*(MetaTableLocator|ZKDump)\\.java") public static ServerName getMetaRegionLocation(final ZKWatcher zkw, int replicaId) { try { RegionState state = getMetaRegionState(zkw, replicaId); diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index df2fede1b56..66ef868fdfd 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -25,6 +25,8 @@ import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.RetryCounter; @@ -83,6 +85,57 @@ public class RecoverableZooKeeper { private final String quorumServers; private final int maxMultiSize; + /** + * See {@link #connect(Configuration, String, Watcher, String)} + */ + public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher) + throws IOException { + String ensemble = ZKConfig.getZKQuorumServersString(conf); + return connect(conf, ensemble, watcher); + } + + /** + * See {@link #connect(Configuration, String, Watcher, String)} + */ + public static RecoverableZooKeeper connect(Configuration conf, String ensemble, + Watcher watcher) + throws IOException { + return connect(conf, ensemble, watcher, null); + } + + /** + * Creates a new connection to ZooKeeper, pulling settings and ensemble config + * from the specified configuration object using methods from {@link ZKConfig}. + * + * Sets the connection status monitoring watcher to the specified watcher. + * + * @param conf configuration to pull ensemble and other settings from + * @param watcher watcher to monitor connection changes + * @param ensemble ZooKeeper servers quorum string + * @param identifier value used to identify this client instance. + * @return connection to zookeeper + * @throws IOException if unable to connect to zk or config problem + */ + public static RecoverableZooKeeper connect(Configuration conf, String ensemble, + Watcher watcher, final String identifier) + throws IOException { + if(ensemble == null) { + throw new IOException("Unable to determine ZooKeeper ensemble"); + } + int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, + HConstants.DEFAULT_ZK_SESSION_TIMEOUT); + if (LOG.isTraceEnabled()) { + LOG.trace("{} opening connection to ZooKeeper ensemble={}", identifier, ensemble); + } + int retry = conf.getInt("zookeeper.recovery.retry", 3); + int retryIntervalMillis = + conf.getInt("zookeeper.recovery.retry.intervalmill", 1000); + int maxSleepTime = conf.getInt("zookeeper.recovery.retry.maxsleeptime", 60000); + int multiMaxSize = conf.getInt("zookeeper.multi.max.size", 1024*1024); + return new RecoverableZooKeeper(ensemble, timeout, watcher, + retry, retryIntervalMillis, maxSleepTime, identifier, multiMaxSize); + } + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE", justification="None. Its always been this way.") public RecoverableZooKeeper(String quorumServers, int sessionTimeout, diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKDump.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKDump.java new file mode 100644 index 00000000000..4d93d143a90 --- /dev/null +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKDump.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; + +/** + * Builds a string containing everything in ZooKeeper. This is inherently invasive into the + * structures of other components' logical responsibilities. + */ +@InterfaceAudience.Private +public final class ZKDump { + private static final Logger LOG = LoggerFactory.getLogger(ZKDump.class); + + private ZKDump() {} + + public static String dump(final ZKWatcher zkWatcher) { + final int zkDumpConnectionTimeOut = zkWatcher.getConfiguration() + .getInt("zookeeper.dump.connection.timeout", 1000); + StringBuilder sb = new StringBuilder(); + try { + sb.append("HBase is rooted at ").append(zkWatcher.getZNodePaths().baseZNode); + sb.append("\nActive master address: "); + try { + sb.append("\n ").append(MasterAddressTracker.getMasterAddress(zkWatcher)); + } catch (IOException e) { + sb.append("<>"); + } + sb.append("\nBackup master addresses:"); + final List backupMasterChildrenNoWatchList = ZKUtil.listChildrenNoWatch(zkWatcher, + zkWatcher.getZNodePaths().backupMasterAddressesZNode); + if (backupMasterChildrenNoWatchList != null) { + for (String child : backupMasterChildrenNoWatchList) { + sb.append("\n ").append(child); + } + } + sb.append("\nRegion server holding hbase:meta:"); + sb.append("\n ").append(MetaTableLocator.getMetaRegionLocation(zkWatcher)); + int numMetaReplicas = zkWatcher.getMetaReplicaNodes().size(); + for (int i = 1; i < numMetaReplicas; i++) { + sb.append("\n") + .append(" replica").append(i).append(": ") + .append(MetaTableLocator.getMetaRegionLocation(zkWatcher, i)); + } + sb.append("\nRegion servers:"); + final List rsChildrenNoWatchList = + ZKUtil.listChildrenNoWatch(zkWatcher, zkWatcher.getZNodePaths().rsZNode); + if (rsChildrenNoWatchList != null) { + for (String child : rsChildrenNoWatchList) { + sb.append("\n ").append(child); + } + } + try { + getReplicationZnodesDump(zkWatcher, sb); + } catch (KeeperException ke) { + LOG.warn("Couldn't get the replication znode dump", ke); + } + sb.append("\nQuorum Server Statistics:"); + String[] servers = zkWatcher.getQuorum().split(","); + for (String server : servers) { + sb.append("\n ").append(server); + try { + String[] stat = getServerStats(server, zkDumpConnectionTimeOut); + + if (stat == null) { + sb.append("[Error] invalid quorum server: ").append(server); + break; + } + + for (String s : stat) { + sb.append("\n ").append(s); + } + } catch (Exception e) { + sb.append("\n ERROR: ").append(e.getMessage()); + } + } + } catch (KeeperException ke) { + sb.append("\nFATAL ZooKeeper Exception!\n"); + sb.append("\n").append(ke.getMessage()); + } + return sb.toString(); + } + + /** + * Appends replication znodes to the passed StringBuilder. + * + * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation + * @param sb the {@link StringBuilder} to append to + * @throws KeeperException if a ZooKeeper operation fails + */ + private static void getReplicationZnodesDump(ZKWatcher zkw, StringBuilder sb) + throws KeeperException { + String replicationZnode = zkw.getZNodePaths().replicationZNode; + + if (ZKUtil.checkExists(zkw, replicationZnode) == -1) { + return; + } + + // do a ls -r on this znode + sb.append("\n").append(replicationZnode).append(": "); + List children = ZKUtil.listChildrenNoWatch(zkw, replicationZnode); + if (children != null) { + Collections.sort(children); + for (String child : children) { + String zNode = ZNodePaths.joinZNode(replicationZnode, child); + if (zNode.equals(zkw.getZNodePaths().peersZNode)) { + appendPeersZnodes(zkw, zNode, sb); + } else if (zNode.equals(zkw.getZNodePaths().queuesZNode)) { + appendRSZnodes(zkw, zNode, sb); + } else if (zNode.equals(zkw.getZNodePaths().hfileRefsZNode)) { + appendHFileRefsZNodes(zkw, zNode, sb); + } + } + } + } + + private static void appendHFileRefsZNodes(ZKWatcher zkw, String hFileRefsZNode, + StringBuilder sb) throws KeeperException { + sb.append("\n").append(hFileRefsZNode).append(": "); + final List hFileRefChildrenNoWatchList = + ZKUtil.listChildrenNoWatch(zkw, hFileRefsZNode); + if (hFileRefChildrenNoWatchList != null) { + for (String peerIdZNode : hFileRefChildrenNoWatchList) { + String zNodeToProcess = ZNodePaths.joinZNode(hFileRefsZNode, peerIdZNode); + sb.append("\n").append(zNodeToProcess).append(": "); + List peerHFileRefsZNodes = ZKUtil.listChildrenNoWatch(zkw, zNodeToProcess); + if (peerHFileRefsZNodes != null) { + sb.append(String.join(", ", peerHFileRefsZNodes)); + } + } + } + } + + /** + * Returns a string with replication znodes and position of the replication log + * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation + * @return aq string of replication znodes and log positions + */ + public static String getReplicationZnodesDump(ZKWatcher zkw) throws KeeperException { + StringBuilder sb = new StringBuilder(); + getReplicationZnodesDump(zkw, sb); + return sb.toString(); + } + + private static void appendRSZnodes(ZKWatcher zkw, String znode, StringBuilder sb) + throws KeeperException { + List stack = new LinkedList<>(); + stack.add(znode); + do { + String znodeToProcess = stack.remove(stack.size() - 1); + sb.append("\n").append(znodeToProcess).append(": "); + byte[] data; + try { + data = ZKUtil.getData(zkw, znodeToProcess); + } catch (InterruptedException e) { + zkw.interruptedException(e); + return; + } + if (data != null && data.length > 0) { // log position + long position = 0; + try { + position = ZKUtil.parseWALPositionFrom(ZKUtil.getData(zkw, znodeToProcess)); + sb.append(position); + } catch (DeserializationException ignored) { + } catch (InterruptedException e) { + zkw.interruptedException(e); + return; + } + } + for (String zNodeChild : ZKUtil.listChildrenNoWatch(zkw, znodeToProcess)) { + stack.add(ZNodePaths.joinZNode(znodeToProcess, zNodeChild)); + } + } while (stack.size() > 0); + } + + private static void appendPeersZnodes(ZKWatcher zkw, String peersZnode, + StringBuilder sb) throws KeeperException { + int pblen = ProtobufUtil.lengthOfPBMagic(); + sb.append("\n").append(peersZnode).append(": "); + for (String peerIdZnode : ZKUtil.listChildrenNoWatch(zkw, peersZnode)) { + String znodeToProcess = ZNodePaths.joinZNode(peersZnode, peerIdZnode); + byte[] data; + try { + data = ZKUtil.getData(zkw, znodeToProcess); + } catch (InterruptedException e) { + zkw.interruptedException(e); + return; + } + // parse the data of the above peer znode. + try { + ReplicationProtos.ReplicationPeer.Builder builder = + ReplicationProtos.ReplicationPeer.newBuilder(); + ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen); + String clusterKey = builder.getClusterkey(); + sb.append("\n").append(znodeToProcess).append(": ").append(clusterKey); + // add the peer-state. + appendPeerState(zkw, znodeToProcess, sb); + } catch (IOException ipbe) { + LOG.warn("Got Exception while parsing peer: " + znodeToProcess, ipbe); + } + } + } + + private static void appendPeerState(ZKWatcher zkw, String znodeToProcess, StringBuilder sb) + throws KeeperException, InvalidProtocolBufferException { + String peerState = zkw.getConfiguration().get("zookeeper.znode.replication.peers.state", + "peer-state"); + int pblen = ProtobufUtil.lengthOfPBMagic(); + for (String child : ZKUtil.listChildrenNoWatch(zkw, znodeToProcess)) { + if (!child.equals(peerState)) { + continue; + } + + String peerStateZnode = ZNodePaths.joinZNode(znodeToProcess, child); + sb.append("\n").append(peerStateZnode).append(": "); + byte[] peerStateData; + try { + peerStateData = ZKUtil.getData(zkw, peerStateZnode); + ReplicationProtos.ReplicationState.Builder builder = + ReplicationProtos.ReplicationState.newBuilder(); + ProtobufUtil.mergeFrom(builder, peerStateData, pblen, peerStateData.length - pblen); + sb.append(builder.getState().name()); + } catch (IOException ipbe) { + LOG.warn("Got Exception while parsing peer: " + znodeToProcess, ipbe); + } catch (InterruptedException e) { + zkw.interruptedException(e); + return; + } + } + } + + /** + * Gets the statistics from the given server. + * + * @param server The server to get the statistics from. + * @param timeout The socket timeout to use. + * @return The array of response strings. + * @throws IOException When the socket communication fails. + */ + private static String[] getServerStats(String server, int timeout) + throws IOException { + String[] sp = server.split(":"); + if (sp.length == 0) { + return null; + } + + String host = sp[0]; + int port = sp.length > 1 ? Integer.parseInt(sp[1]) + : HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT; + + try (Socket socket = new Socket()) { + InetSocketAddress sockAddr = new InetSocketAddress(host, port); + if (sockAddr.isUnresolved()) { + throw new UnknownHostException(host + " cannot be resolved"); + } + socket.connect(sockAddr, timeout); + socket.setSoTimeout(timeout); + try (PrintWriter out = new PrintWriter(new BufferedWriter( + new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)), true); + BufferedReader in = new BufferedReader( + new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) { + out.println("stat"); + out.flush(); + ArrayList res = new ArrayList<>(); + while (true) { + String line = in.readLine(); + if (line != null) { + res.add(line); + } else { + break; + } + } + return res.toArray(new String[res.size()]); + } + } + } +} diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index d0569dd9f1c..acce316bae3 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,16 +17,7 @@ */ package org.apache.hadoop.hbase.zookeeper; -import java.io.BufferedReader; -import java.io.BufferedWriter; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.UnknownHostException; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -51,7 +41,6 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.Op; -import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.proto.CreateRequest; @@ -59,7 +48,6 @@ import org.apache.zookeeper.proto.DeleteRequest; import org.apache.zookeeper.proto.SetDataRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; @@ -76,56 +64,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; public final class ZKUtil { private static final Logger LOG = LoggerFactory.getLogger(ZKUtil.class); - private static int zkDumpConnectionTimeOut; - private ZKUtil() { } - /** - * Creates a new connection to ZooKeeper, pulling settings and ensemble config - * from the specified configuration object using methods from {@link ZKConfig}. - * - * Sets the connection status monitoring watcher to the specified watcher. - * - * @param conf configuration to pull ensemble and other settings from - * @param watcher watcher to monitor connection changes - * @return connection to zookeeper - * @throws IOException if unable to connect to zk or config problem - */ - public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher) - throws IOException { - String ensemble = ZKConfig.getZKQuorumServersString(conf); - return connect(conf, ensemble, watcher); - } - - public static RecoverableZooKeeper connect(Configuration conf, String ensemble, - Watcher watcher) - throws IOException { - return connect(conf, ensemble, watcher, null); - } - - public static RecoverableZooKeeper connect(Configuration conf, String ensemble, - Watcher watcher, final String identifier) - throws IOException { - if(ensemble == null) { - throw new IOException("Unable to determine ZooKeeper ensemble"); - } - int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, - HConstants.DEFAULT_ZK_SESSION_TIMEOUT); - if (LOG.isTraceEnabled()) { - LOG.trace("{} opening connection to ZooKeeper ensemble={}", identifier, ensemble); - } - int retry = conf.getInt("zookeeper.recovery.retry", 3); - int retryIntervalMillis = - conf.getInt("zookeeper.recovery.retry.intervalmill", 1000); - int maxSleepTime = conf.getInt("zookeeper.recovery.retry.maxsleeptime", 60000); - zkDumpConnectionTimeOut = conf.getInt("zookeeper.dump.connection.timeout", - 1000); - int multiMaxSize = conf.getInt("zookeeper.multi.max.size", 1024*1024); - return new RecoverableZooKeeper(ensemble, timeout, watcher, - retry, retryIntervalMillis, maxSleepTime, identifier, multiMaxSize); - } - // // Helper methods // @@ -1577,265 +1518,6 @@ public final class ZKUtil { // ZooKeeper cluster information // - /** @return String dump of everything in ZooKeeper. */ - public static String dump(ZKWatcher zkw) { - StringBuilder sb = new StringBuilder(); - try { - sb.append("HBase is rooted at ").append(zkw.getZNodePaths().baseZNode); - sb.append("\nActive master address: "); - try { - sb.append("\n ").append(MasterAddressTracker.getMasterAddress(zkw)); - } catch (IOException e) { - sb.append("<>"); - } - sb.append("\nBackup master addresses:"); - final List backupMasterChildrenNoWatchList = listChildrenNoWatch(zkw, - zkw.getZNodePaths().backupMasterAddressesZNode); - if (backupMasterChildrenNoWatchList != null) { - for (String child : backupMasterChildrenNoWatchList) { - sb.append("\n ").append(child); - } - } - sb.append("\nRegion server holding hbase:meta:"); - sb.append("\n ").append(MetaTableLocator.getMetaRegionLocation(zkw)); - int numMetaReplicas = zkw.getMetaReplicaNodes().size(); - for (int i = 1; i < numMetaReplicas; i++) { - sb.append("\n replica" + i + ": " - + MetaTableLocator.getMetaRegionLocation(zkw, i)); - } - sb.append("\nRegion servers:"); - final List rsChildrenNoWatchList = - listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode); - if (rsChildrenNoWatchList != null) { - for (String child : rsChildrenNoWatchList) { - sb.append("\n ").append(child); - } - } - try { - getReplicationZnodesDump(zkw, sb); - } catch (KeeperException ke) { - LOG.warn("Couldn't get the replication znode dump", ke); - } - sb.append("\nQuorum Server Statistics:"); - String[] servers = zkw.getQuorum().split(","); - for (String server : servers) { - sb.append("\n ").append(server); - try { - String[] stat = getServerStats(server, ZKUtil.zkDumpConnectionTimeOut); - - if (stat == null) { - sb.append("[Error] invalid quorum server: " + server); - break; - } - - for (String s : stat) { - sb.append("\n ").append(s); - } - } catch (Exception e) { - sb.append("\n ERROR: ").append(e.getMessage()); - } - } - } catch (KeeperException ke) { - sb.append("\nFATAL ZooKeeper Exception!\n"); - sb.append("\n" + ke.getMessage()); - } - return sb.toString(); - } - - /** - * Appends replication znodes to the passed StringBuilder. - * - * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation - * @param sb the {@link StringBuilder} to append to - * @throws KeeperException if a ZooKeeper operation fails - */ - private static void getReplicationZnodesDump(ZKWatcher zkw, StringBuilder sb) - throws KeeperException { - String replicationZnode = zkw.getZNodePaths().replicationZNode; - - if (ZKUtil.checkExists(zkw, replicationZnode) == -1) { - return; - } - - // do a ls -r on this znode - sb.append("\n").append(replicationZnode).append(": "); - List children = ZKUtil.listChildrenNoWatch(zkw, replicationZnode); - if (children != null) { - Collections.sort(children); - for (String child : children) { - String zNode = ZNodePaths.joinZNode(replicationZnode, child); - if (zNode.equals(zkw.getZNodePaths().peersZNode)) { - appendPeersZnodes(zkw, zNode, sb); - } else if (zNode.equals(zkw.getZNodePaths().queuesZNode)) { - appendRSZnodes(zkw, zNode, sb); - } else if (zNode.equals(zkw.getZNodePaths().hfileRefsZNode)) { - appendHFileRefsZNodes(zkw, zNode, sb); - } - } - } - } - - private static void appendHFileRefsZNodes(ZKWatcher zkw, String hFileRefsZNode, - StringBuilder sb) throws KeeperException { - sb.append("\n").append(hFileRefsZNode).append(": "); - final List hFileRefChildrenNoWatchList = - ZKUtil.listChildrenNoWatch(zkw, hFileRefsZNode); - if (hFileRefChildrenNoWatchList != null) { - for (String peerIdZNode : hFileRefChildrenNoWatchList) { - String zNodeToProcess = ZNodePaths.joinZNode(hFileRefsZNode, peerIdZNode); - sb.append("\n").append(zNodeToProcess).append(": "); - List peerHFileRefsZNodes = ZKUtil.listChildrenNoWatch(zkw, zNodeToProcess); - if (peerHFileRefsZNodes != null) { - sb.append(String.join(", ", peerHFileRefsZNodes)); - } - } - } - } - - /** - * Returns a string with replication znodes and position of the replication log - * @param zkw reference to the {@link ZKWatcher} which also contains configuration and operation - * @return aq string of replication znodes and log positions - */ - public static String getReplicationZnodesDump(ZKWatcher zkw) throws KeeperException { - StringBuilder sb = new StringBuilder(); - getReplicationZnodesDump(zkw, sb); - return sb.toString(); - } - - private static void appendRSZnodes(ZKWatcher zkw, String znode, StringBuilder sb) - throws KeeperException { - List stack = new LinkedList<>(); - stack.add(znode); - do { - String znodeToProcess = stack.remove(stack.size() - 1); - sb.append("\n").append(znodeToProcess).append(": "); - byte[] data; - try { - data = ZKUtil.getData(zkw, znodeToProcess); - } catch (InterruptedException e) { - zkw.interruptedException(e); - return; - } - if (data != null && data.length > 0) { // log position - long position = 0; - try { - position = ZKUtil.parseWALPositionFrom(ZKUtil.getData(zkw, znodeToProcess)); - sb.append(position); - } catch (DeserializationException ignored) { - } catch (InterruptedException e) { - zkw.interruptedException(e); - return; - } - } - for (String zNodeChild : ZKUtil.listChildrenNoWatch(zkw, znodeToProcess)) { - stack.add(ZNodePaths.joinZNode(znodeToProcess, zNodeChild)); - } - } while (stack.size() > 0); - } - - private static void appendPeersZnodes(ZKWatcher zkw, String peersZnode, - StringBuilder sb) throws KeeperException { - int pblen = ProtobufUtil.lengthOfPBMagic(); - sb.append("\n").append(peersZnode).append(": "); - for (String peerIdZnode : ZKUtil.listChildrenNoWatch(zkw, peersZnode)) { - String znodeToProcess = ZNodePaths.joinZNode(peersZnode, peerIdZnode); - byte[] data; - try { - data = ZKUtil.getData(zkw, znodeToProcess); - } catch (InterruptedException e) { - zkw.interruptedException(e); - return; - } - // parse the data of the above peer znode. - try { - ReplicationProtos.ReplicationPeer.Builder builder = - ReplicationProtos.ReplicationPeer.newBuilder(); - ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen); - String clusterKey = builder.getClusterkey(); - sb.append("\n").append(znodeToProcess).append(": ").append(clusterKey); - // add the peer-state. - appendPeerState(zkw, znodeToProcess, sb); - } catch (IOException ipbe) { - LOG.warn("Got Exception while parsing peer: " + znodeToProcess, ipbe); - } - } - } - - private static void appendPeerState(ZKWatcher zkw, String znodeToProcess, StringBuilder sb) - throws KeeperException, InvalidProtocolBufferException { - String peerState = zkw.getConfiguration().get("zookeeper.znode.replication.peers.state", - "peer-state"); - int pblen = ProtobufUtil.lengthOfPBMagic(); - for (String child : ZKUtil.listChildrenNoWatch(zkw, znodeToProcess)) { - if (!child.equals(peerState)) { - continue; - } - - String peerStateZnode = ZNodePaths.joinZNode(znodeToProcess, child); - sb.append("\n").append(peerStateZnode).append(": "); - byte[] peerStateData; - try { - peerStateData = ZKUtil.getData(zkw, peerStateZnode); - ReplicationProtos.ReplicationState.Builder builder = - ReplicationProtos.ReplicationState.newBuilder(); - ProtobufUtil.mergeFrom(builder, peerStateData, pblen, peerStateData.length - pblen); - sb.append(builder.getState().name()); - } catch (IOException ipbe) { - LOG.warn("Got Exception while parsing peer: " + znodeToProcess, ipbe); - } catch (InterruptedException e) { - zkw.interruptedException(e); - return; - } - } - } - - /** - * Gets the statistics from the given server. - * - * @param server The server to get the statistics from. - * @param timeout The socket timeout to use. - * @return The array of response strings. - * @throws IOException When the socket communication fails. - */ - private static String[] getServerStats(String server, int timeout) - throws IOException { - String[] sp = server.split(":"); - if (sp.length == 0) { - return null; - } - - String host = sp[0]; - int port = sp.length > 1 ? Integer.parseInt(sp[1]) - : HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT; - - try (Socket socket = new Socket()) { - InetSocketAddress sockAddr = new InetSocketAddress(host, port); - if (sockAddr.isUnresolved()) { - throw new UnknownHostException(host + " cannot be resolved"); - } - socket.connect(sockAddr, timeout); - socket.setSoTimeout(timeout); - try (PrintWriter out = new PrintWriter(new BufferedWriter( - new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)), true); - BufferedReader in = new BufferedReader( - new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) { - out.println("stat"); - out.flush(); - ArrayList res = new ArrayList<>(); - while (true) { - String line = in.readLine(); - if (line != null) { - res.add(line); - } else { - break; - } - } - return res.toArray(new String[res.size()]); - } - } - } - private static void logRetrievedMsg(final ZKWatcher zkw, final String znode, final byte [] data, final boolean watcherSet) { if (!LOG.isTraceEnabled()) { diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java index ed43fa54378..fb086c19380 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java @@ -177,7 +177,8 @@ public class ZKWatcher implements Watcher, Abortable, Closeable { this.abortable = abortable; this.znodePaths = new ZNodePaths(conf); PendingWatcher pendingWatcher = new PendingWatcher(); - this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier); + this.recoverableZooKeeper = + RecoverableZooKeeper.connect(conf, quorum, pendingWatcher, identifier); pendingWatcher.prepare(this); if (canCreateBaseZNode) { try { diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZooKeeper.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZooKeeper.java index 7e0a4132954..700781c849f 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZooKeeper.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZooKeeper.java @@ -77,7 +77,7 @@ public class TestRecoverableZooKeeper { ZKWatcher zkw = new ZKWatcher(conf, "testSetDataVersionMismatchInLoop", abortable, true); String ensemble = ZKConfig.getZKQuorumServersString(conf); - RecoverableZooKeeper rzk = ZKUtil.connect(conf, ensemble, zkw); + RecoverableZooKeeper rzk = RecoverableZooKeeper.connect(conf, ensemble, zkw); rzk.create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); rzk.setData(znode, Bytes.toBytes("OPENING"), 0); Field zkField = RecoverableZooKeeper.class.getDeclaredField("zk");