HBASE-16947 Some improvements for DumpReplicationQueues tool

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Guanghao Zhang 2016-10-27 09:00:37 +08:00 committed by Michael Stack
parent d0e61b0e9a
commit 7b74dd0374
3 changed files with 107 additions and 31 deletions

View File

@ -51,7 +51,9 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
@Override
public void init() throws ReplicationException {
try {
ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
if (ZKUtil.checkExists(this.zookeeper, this.queuesZNode) < 0) {
ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
}
} catch (KeeperException e) {
throw new ReplicationException("Internal error while initializing a queues client", e);
}

View File

@ -82,14 +82,18 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
public void init(String serverName) throws ReplicationException {
this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
try {
ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
if (ZKUtil.checkExists(this.zookeeper, this.myQueuesZnode) < 0) {
ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
}
} catch (KeeperException e) {
throw new ReplicationException("Could not initialize replication queues.", e);
}
if (conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
try {
ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
if (ZKUtil.checkExists(this.zookeeper, this.hfileRefsZNode) < 0) {
ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
}
} catch (KeeperException e) {
throw new ReplicationException("Could not initialize hfile references replication queue.",
e);

View File

@ -40,6 +40,9 @@ import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.KeeperException;
import org.mortbay.util.IO;
import com.google.common.util.concurrent.AtomicLongMap;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.*;
@ -55,6 +58,20 @@ public class DumpReplicationQueues extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(DumpReplicationQueues.class.getName());
private List<String> deadRegionServers;
private List<String> deletedQueues;
private AtomicLongMap<String> peersQueueSize;
private long totalSizeOfWALs;
private long numWalsNotFound;
public DumpReplicationQueues() {
deadRegionServers = new ArrayList<String>();
deletedQueues = new ArrayList<String>();
peersQueueSize = AtomicLongMap.create();
totalSizeOfWALs = 0;
numWalsNotFound = 0;
}
static class DumpOptions {
boolean hdfs = false;
boolean distributed = false;
@ -155,14 +172,16 @@ public class DumpReplicationQueues extends Configured implements Tool {
if (message != null && message.length() > 0) {
System.err.println(message);
}
System.err.println("Usage: java " + className + " \\");
System.err.println("Usage: bin/hbase " + className + " \\");
System.err.println(" <OPTIONS> [-D<property=value>]*");
System.err.println();
System.err.println("General Options:");
System.err.println(" distributed Poll each RS and print its own replication queue. "
System.err.println(" -h|--h|--help Show this help and exit.");
System.err.println(" --distributed Poll each RS and print its own replication queue. "
+ "Default only polls ZooKeeper");
System.err.println(" hdfs Use HDFS to calculate usage of WALs by replication. It could be overestimated"
+ " if replicating to multiple peers. --distributed flag is also needed.");
System.err.println(" --hdfs Use HDFS to calculate usage of WALs by replication."
+ " It could be overestimated if replicating to multiple peers."
+ " --distributed flag is also needed.");
}
protected static void printUsageAndExit(final String message, final int exitCode) {
@ -176,7 +195,6 @@ public class DumpReplicationQueues extends Configured implements Tool {
HBaseAdmin.available(conf);
ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf);
ClusterConnection connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
long deleted = 0;
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "DumpReplicationQueues" + System.currentTimeMillis(),
new WarnOnlyAbortable(), true);
@ -203,11 +221,8 @@ public class DumpReplicationQueues extends Configured implements Tool {
if (opts.isDistributed()) {
LOG.info("Found [--distributed], will poll each RegionServer.");
System.out.println(dumpQueues(connection, zkw, opts.isHdfs(), deleted));
if (deleted > 0) {
LOG.warn("Found " + deleted +" deleted queues"
+ ", run hbck -fixReplication in order to remove the deleted replication queues");
}
System.out.println(dumpQueues(connection, zkw, peerConfigs.keySet(), opts.isHdfs()));
System.out.println(dumpReplicationSummary());
} else {
// use ZK instead
System.out.print("Dumping replication znodes via ZooKeeper:");
@ -221,21 +236,54 @@ public class DumpReplicationQueues extends Configured implements Tool {
}
}
public String dumpReplicationSummary() {
StringBuilder sb = new StringBuilder();
if (!deletedQueues.isEmpty()) {
sb.append("Found " + deletedQueues.size() + " deleted queues"
+ ", run hbck -fixReplication in order to remove the deleted replication queues\n");
for (String deletedQueue : deletedQueues) {
sb.append(" " + deletedQueue + "\n");
}
}
if (!deadRegionServers.isEmpty()) {
sb.append("Found " + deadRegionServers.size() + " dead regionservers"
+ ", restart one regionserver to transfer the queues of dead regionservers\n");
for (String deadRs : deadRegionServers) {
sb.append(" " + deadRs + "\n");
}
}
if (!peersQueueSize.isEmpty()) {
sb.append("Dumping all peers's number of WALs in replication queue\n");
for (Map.Entry<String, Long> entry : peersQueueSize.asMap().entrySet()) {
sb.append(" PeerId: " + entry.getKey() + " , sizeOfLogQueue: " + entry.getValue() + "\n");
}
}
sb.append(" Total size of WALs on HDFS: " + StringUtils.humanSize(totalSizeOfWALs) + "\n");
if (numWalsNotFound > 0) {
sb.append(" ERROR: There are " + numWalsNotFound + " WALs not found!!!\n");
}
return sb.toString();
}
public String dumpPeersState(ReplicationAdmin replicationAdmin,
Map<String, ReplicationPeerConfig> peerConfigs) throws Exception {
Map<String, ReplicationPeerConfig> peerConfigs) throws Exception {
Map<String, String> currentConf;
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, ReplicationPeerConfig> peer : peerConfigs.entrySet()) {
try {
ReplicationPeerConfig peerConfig = peer.getValue();
sb.append("Peer: " + peer.getKey() + "\n");
sb.append(" " + "State: " + (replicationAdmin.getPeerState(peer.getKey()) ? "ENABLED" : "DISABLED") + "\n");
sb.append(" " + "Cluster Name: " + peer.getValue() + "\n");
currentConf = peer.getValue().getConfiguration();
sb.append(" " + "State: "
+ (replicationAdmin.getPeerState(peer.getKey()) ? "ENABLED" : "DISABLED") + "\n");
sb.append(" " + "Cluster Name: " + peerConfig.getClusterKey() + "\n");
sb.append(" " + "Replication Endpoint: " + peerConfig.getReplicationEndpointImpl() + "\n");
currentConf = peerConfig.getConfiguration();
// Only show when we have a custom configuration for the peer
if (currentConf.size() > 1) {
sb.append(" " + "Peer Configuration: " + currentConf + "\n");
}
sb.append(" " + "Peer Table CFs: " + replicationAdmin.getPeerTableCFs(peer.getKey()) + "\n");
sb.append(" " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n");
sb.append(" " + "Peer Namespaces: " + peerConfig.getNamespaces() + "\n");
} catch (ReplicationException re) {
sb.append("Got an exception while invoking ReplicationAdmin: " + re + "\n");
}
@ -243,11 +291,12 @@ public class DumpReplicationQueues extends Configured implements Tool {
return sb.toString();
}
public String dumpQueues(ClusterConnection connection, ZooKeeperWatcher zkw, boolean hdfs, long deleted)
throws Exception {
public String dumpQueues(ClusterConnection connection, ZooKeeperWatcher zkw, Set<String> peerIds,
boolean hdfs) throws Exception {
ReplicationQueuesClient queuesClient;
ReplicationPeers replicationPeers;
ReplicationQueues replicationQueues;
ReplicationTracker replicationTracker;
ReplicationQueuesClientArguments replicationArgs =
new ReplicationQueuesClientArguments(getConf(), new WarnOnlyAbortable(), zkw);
StringBuilder sb = new StringBuilder();
@ -256,24 +305,29 @@ public class DumpReplicationQueues extends Configured implements Tool {
queuesClient.init();
replicationQueues = ReplicationFactory.getReplicationQueues(replicationArgs);
replicationPeers = ReplicationFactory.getReplicationPeers(zkw, getConf(), queuesClient, connection);
replicationPeers.init();
replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(),
new WarnOnlyAbortable(), new WarnOnlyStoppable());
List<String> liveRegionServers = replicationTracker.getListOfRegionServers();
// Loops each peer on each RS and dumps the queues
Set<String> peerIds = new HashSet<String>(replicationPeers.getAllPeerIds());
try {
List<String> regionservers = queuesClient.getListOfReplicators();
for (String regionserver : regionservers) {
List<String> queueIds = queuesClient.getAllQueues(regionserver);
replicationQueues.init(regionserver);
if (!liveRegionServers.contains(regionserver)) {
deadRegionServers.add(regionserver);
}
for (String queueId : queueIds) {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
List<String> wals = queuesClient.getLogsInQueue(regionserver, queueId);
if (!peerIds.contains(queueInfo.getPeerId())) {
deleted++;
sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true, hdfs));
deletedQueues.add(regionserver + "/" + queueId);
sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true,
hdfs));
} else {
sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false, hdfs));
sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false,
hdfs));
}
}
}
@ -291,6 +345,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
List<String> deadServers ;
sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
sb.append(" Queue znode: " + queueId + "\n");
sb.append(" PeerID: " + queueInfo.getPeerId() + "\n");
sb.append(" Recovered: " + queueInfo.isQueueRecovered() + "\n");
deadServers = queueInfo.getDeadRegionServers();
@ -301,6 +356,8 @@ public class DumpReplicationQueues extends Configured implements Tool {
}
sb.append(" Was deleted: " + isDeleted + "\n");
sb.append(" Number of WALs in replication queue: " + wals.size() + "\n");
peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());
for (String wal : wals) {
long position = replicationQueues.getLogPosition(queueInfo.getPeerId(), wal);
sb.append(" Replication position for " + wal + ": " + (position > 0 ? position : "0"
@ -325,11 +382,18 @@ public class DumpReplicationQueues extends Configured implements Tool {
try {
fileStatus = (new WALLink(getConf(), server, wal)).getFileStatus(fs);
} catch (IOException e) {
LOG.warn("WAL " + wal + " couldn't be found, skipping");
break;
if (e instanceof FileNotFoundException) {
numWalsNotFound++;
LOG.warn("WAL " + wal + " couldn't be found, skipping", e);
} else {
LOG.warn("Can't get file status of WAL " + wal + ", skipping", e);
}
continue;
}
size += fileStatus.getLen();
}
totalSizeOfWALs += size;
return size;
}
@ -348,9 +412,15 @@ public class DumpReplicationQueues extends Configured implements Tool {
}
}
private static void usage(final String errorMsg) {
if (errorMsg != null && errorMsg.length() > 0) {
LOG.error(errorMsg);
private static class WarnOnlyStoppable implements Stoppable {
@Override
public void stop(String why) {
LOG.warn("DumpReplicationQueue received stop, ignoring. Reason: " + why);
}
@Override
public boolean isStopped() {
return false;
}
}
}