HBASE-27217 Revisit the DumpReplicationQueues tool (#4810)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
0d57ee147e
commit
ffad1ff727
@ -19,8 +19,12 @@ package org.apache.hadoop.hbase.replication.regionserver;
|
|||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.URLEncoder;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -31,7 +35,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||||||
import org.apache.hadoop.conf.Configured;
|
import org.apache.hadoop.conf.Configured;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
@ -40,28 +44,33 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
|
|||||||
import org.apache.hadoop.hbase.client.replication.TableCFs;
|
import org.apache.hadoop.hbase.client.replication.TableCFs;
|
||||||
import org.apache.hadoop.hbase.io.WALLink;
|
import org.apache.hadoop.hbase.io.WALLink;
|
||||||
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
|
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKDump;
|
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
|
||||||
import org.apache.hadoop.util.Tool;
|
import org.apache.hadoop.util.Tool;
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap;
|
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* TODO: reimplement this tool
|
|
||||||
* <p/>
|
* <p/>
|
||||||
* Provides information about the existing states of replication, replication peers and queues.
|
* Provides information about the existing states of replication, replication peers and queues.
|
||||||
* Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args]
|
* Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args]
|
||||||
* Arguments: --distributed Polls each RS to dump information about the queue --hdfs Reports HDFS
|
* Arguments: --distributed Polls each RS to dump information about the queue --hdfs Reports HDFS
|
||||||
* usage by the replication queues (note: can be overestimated).
|
* usage by the replication queues (note: can be overestimated). In the new version, we
|
||||||
|
* reimplemented the DumpReplicationQueues tool to support obtaining information from replication
|
||||||
|
* table.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class DumpReplicationQueues extends Configured implements Tool {
|
public class DumpReplicationQueues extends Configured implements Tool {
|
||||||
@ -185,7 +194,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
|
|||||||
System.err.println("General Options:");
|
System.err.println("General Options:");
|
||||||
System.err.println(" -h|--h|--help Show this help and exit.");
|
System.err.println(" -h|--h|--help Show this help and exit.");
|
||||||
System.err.println(" --distributed Poll each RS and print its own replication queue. "
|
System.err.println(" --distributed Poll each RS and print its own replication queue. "
|
||||||
+ "Default only polls ZooKeeper");
|
+ "Default only polls replication table.");
|
||||||
System.err.println(" --hdfs Use HDFS to calculate usage of WALs by replication."
|
System.err.println(" --hdfs Use HDFS to calculate usage of WALs by replication."
|
||||||
+ " It could be overestimated if replicating to multiple peers."
|
+ " It could be overestimated if replicating to multiple peers."
|
||||||
+ " --distributed flag is also needed.");
|
+ " --distributed flag is also needed.");
|
||||||
@ -201,13 +210,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
|
|||||||
Connection connection = ConnectionFactory.createConnection(conf);
|
Connection connection = ConnectionFactory.createConnection(conf);
|
||||||
Admin admin = connection.getAdmin();
|
Admin admin = connection.getAdmin();
|
||||||
|
|
||||||
ZKWatcher zkw =
|
|
||||||
new ZKWatcher(conf, "DumpReplicationQueues" + EnvironmentEdgeManager.currentTime(),
|
|
||||||
new WarnOnlyAbortable(), true);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Our zk watcher
|
|
||||||
LOG.info("Our Quorum: " + zkw.getQuorum());
|
|
||||||
List<TableCFs> replicatedTableCFs = admin.listReplicatedTableCFs();
|
List<TableCFs> replicatedTableCFs = admin.listReplicatedTableCFs();
|
||||||
if (replicatedTableCFs.isEmpty()) {
|
if (replicatedTableCFs.isEmpty()) {
|
||||||
LOG.info("No tables with a configured replication peer were found.");
|
LOG.info("No tables with a configured replication peer were found.");
|
||||||
@ -229,21 +232,72 @@ public class DumpReplicationQueues extends Configured implements Tool {
|
|||||||
LOG.info("Found [--distributed], will poll each RegionServer.");
|
LOG.info("Found [--distributed], will poll each RegionServer.");
|
||||||
Set<String> peerIds =
|
Set<String> peerIds =
|
||||||
peers.stream().map((peer) -> peer.getPeerId()).collect(Collectors.toSet());
|
peers.stream().map((peer) -> peer.getPeerId()).collect(Collectors.toSet());
|
||||||
System.out.println(dumpQueues(zkw, peerIds, opts.isHdfs()));
|
System.out.println(dumpQueues(connection, peerIds, opts.isHdfs(), conf));
|
||||||
System.out.println(dumpReplicationSummary());
|
System.out.println(dumpReplicationSummary());
|
||||||
} else {
|
} else {
|
||||||
// use ZK instead
|
// use replication table instead
|
||||||
System.out.print("Dumping replication znodes via ZooKeeper:");
|
System.out.println("Dumping replication info via replication table.");
|
||||||
System.out.println(ZKDump.getReplicationZnodesDump(zkw));
|
System.out.println(dumpReplicationViaTable(connection, conf));
|
||||||
}
|
}
|
||||||
return (0);
|
return (0);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
return (-1);
|
return (-1);
|
||||||
} finally {
|
} finally {
|
||||||
zkw.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String dumpReplicationViaTable(Connection connection, Configuration conf)
|
||||||
|
throws ReplicationException, IOException {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
ReplicationQueueStorage queueStorage =
|
||||||
|
ReplicationStorageFactory.getReplicationQueueStorage(connection, conf);
|
||||||
|
|
||||||
|
// The dump info format is as follows:
|
||||||
|
// peers:
|
||||||
|
// peers/1: zk1:2181:/hbase
|
||||||
|
// peers/1/peer-state: ENABLED
|
||||||
|
// rs:
|
||||||
|
// rs/rs1,16020,1664092120094/1/rs1%2C16020%2C1664092120094.1664096778778: 123
|
||||||
|
// rs/rs2,16020,1664092120094/2/rs1%2C16020%2C1664092120094.1664096778778: 321
|
||||||
|
// hfile-refs:
|
||||||
|
// hfile-refs/1/hfile1,hfile2
|
||||||
|
// hfile-refs/2/hfile3,hfile4
|
||||||
|
String peersKey = "peers";
|
||||||
|
sb.append(peersKey).append(": ").append("\n");
|
||||||
|
List<ReplicationPeerDescription> repPeerDescs = connection.getAdmin().listReplicationPeers();
|
||||||
|
for (ReplicationPeerDescription repPeerDesc : repPeerDescs) {
|
||||||
|
sb.append(peersKey).append("/").append(repPeerDesc.getPeerId()).append(": ")
|
||||||
|
.append(repPeerDesc.getPeerConfig().getClusterKey()).append("\n");
|
||||||
|
sb.append(peersKey).append("/").append(repPeerDesc.getPeerId()).append("/peer-state: ")
|
||||||
|
.append(repPeerDesc.isEnabled() ? "ENABLED" : "DISABLED").append("\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
List<ReplicationQueueData> repQueueDataList = queueStorage.listAllQueues();
|
||||||
|
String rsKey = "rs";
|
||||||
|
sb.append(rsKey).append(": ").append("\n");
|
||||||
|
for (ReplicationQueueData repQueueData : repQueueDataList) {
|
||||||
|
String peerId = repQueueData.getId().getPeerId();
|
||||||
|
for (ImmutableMap.Entry<String, ReplicationGroupOffset> entry : repQueueData.getOffsets()
|
||||||
|
.entrySet()) {
|
||||||
|
sb.append(rsKey).append("/").append(entry.getKey()).append("/").append(peerId).append("/")
|
||||||
|
.append(entry.getValue().getWal()).append(": ").append(entry.getValue().getOffset())
|
||||||
|
.append("\n");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
List<String> peerIds = queueStorage.getAllPeersFromHFileRefsQueue();
|
||||||
|
String hfileKey = "hfile-refs";
|
||||||
|
sb.append(hfileKey).append(": ").append("\n");
|
||||||
|
for (String peerId : peerIds) {
|
||||||
|
List<String> hfiles = queueStorage.getReplicableHFiles(peerId);
|
||||||
|
sb.append(hfileKey).append("/").append(peerId).append("/").append(String.join(",", hfiles))
|
||||||
|
.append("\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
|
||||||
public String dumpReplicationSummary() {
|
public String dumpReplicationSummary() {
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
if (!deletedQueues.isEmpty()) {
|
if (!deletedQueues.isEmpty()) {
|
||||||
@ -294,71 +348,103 @@ public class DumpReplicationQueues extends Configured implements Tool {
|
|||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) throws Exception {
|
public String dumpQueues(Connection connection, Set<String> peerIds, boolean hdfs,
|
||||||
ReplicationQueueStorage queueStorage;
|
Configuration conf) throws Exception {
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
|
ReplicationQueueStorage queueStorage =
|
||||||
|
ReplicationStorageFactory.getReplicationQueueStorage(connection, conf);
|
||||||
|
|
||||||
// queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
|
Set<ServerName> liveRegionServers =
|
||||||
// Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw,
|
connection.getAdmin().getClusterMetrics().getLiveServerMetrics().keySet();
|
||||||
// zkw.getZNodePaths().rsZNode)
|
|
||||||
// .stream().map(ServerName::parseServerName).collect(Collectors.toSet());
|
List<ServerName> regionServers = queueStorage.listAllReplicators();
|
||||||
//
|
if (regionServers == null || regionServers.isEmpty()) {
|
||||||
// Loops each peer on each RS and dumps the queues
|
return sb.toString();
|
||||||
// List<ServerName> regionservers = queueStorage.getListOfReplicators();
|
}
|
||||||
// if (regionservers == null || regionservers.isEmpty()) {
|
for (ServerName regionServer : regionServers) {
|
||||||
// return sb.toString();
|
List<ReplicationQueueId> queueIds = queueStorage.listAllQueueIds(regionServer);
|
||||||
// }
|
|
||||||
// for (ServerName regionserver : regionservers) {
|
if (!liveRegionServers.contains(regionServer)) {
|
||||||
// List<String> queueIds = queueStorage.getAllQueues(regionserver);
|
deadRegionServers.add(regionServer.getServerName());
|
||||||
// if (!liveRegionServers.contains(regionserver)) {
|
}
|
||||||
// deadRegionServers.add(regionserver.getServerName());
|
for (ReplicationQueueId queueId : queueIds) {
|
||||||
// }
|
List<String> tmpWals = new ArrayList<>();
|
||||||
// for (String queueId : queueIds) {
|
// wals
|
||||||
// ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
|
AbstractFSWALProvider
|
||||||
// List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
|
.getWALFiles(connection.getConfiguration(), queueId.getServerWALsBelongTo()).stream()
|
||||||
// Collections.sort(wals);
|
.map(Path::toString).forEach(tmpWals::add);
|
||||||
// if (!peerIds.contains(queueInfo.getPeerId())) {
|
|
||||||
// deletedQueues.add(regionserver + "/" + queueId);
|
// old wals
|
||||||
// sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs));
|
AbstractFSWALProvider.getArchivedWALFiles(connection.getConfiguration(),
|
||||||
// } else {
|
queueId.getServerWALsBelongTo(), URLEncoder
|
||||||
// sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs));
|
.encode(queueId.getServerWALsBelongTo().toString(), StandardCharsets.UTF_8.name()))
|
||||||
// }
|
.stream().map(Path::toString).forEach(tmpWals::add);
|
||||||
// }
|
|
||||||
// }
|
Map<String, ReplicationGroupOffset> offsets = queueStorage.getOffsets(queueId);
|
||||||
|
// filter out the wal files that should replicate
|
||||||
|
List<String> wals = new ArrayList<>();
|
||||||
|
for (Map.Entry<String, ReplicationGroupOffset> entry : offsets.entrySet()) {
|
||||||
|
ReplicationGroupOffset offset = entry.getValue();
|
||||||
|
for (String wal : tmpWals) {
|
||||||
|
if (ReplicationOffsetUtil.shouldReplicate(offset, wal)) {
|
||||||
|
wals.add(wal);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Collections.sort(wals, Comparator.comparingLong(AbstractFSWALProvider::getTimestamp));
|
||||||
|
if (!peerIds.contains(queueId.getPeerId())) {
|
||||||
|
deletedQueues.add(regionServer + "/" + queueId);
|
||||||
|
sb.append(formatQueue(regionServer, offsets, wals, queueId, true, hdfs));
|
||||||
|
} else {
|
||||||
|
sb.append(formatQueue(regionServer, offsets, wals, queueId, false, hdfs));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage,
|
private String formatQueue(ServerName regionServer, Map<String, ReplicationGroupOffset> offsets,
|
||||||
ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
|
List<String> wals, ReplicationQueueId queueId, boolean isDeleted, boolean hdfs)
|
||||||
boolean hdfs) throws Exception {
|
throws Exception {
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
|
|
||||||
List<ServerName> deadServers;
|
sb.append("Dumping replication queue info for RegionServer: [" + regionServer + "]" + "\n");
|
||||||
|
sb.append(" Queue id: " + queueId + "\n");
|
||||||
sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
|
sb.append(" PeerID: " + queueId.getPeerId() + "\n");
|
||||||
sb.append(" Queue znode: " + queueId + "\n");
|
sb.append(" Recovered: " + queueId.isRecovered() + "\n");
|
||||||
sb.append(" PeerID: " + queueInfo.getPeerId() + "\n");
|
// In new version, we only record the first dead RegionServer in queueId.
|
||||||
sb.append(" Recovered: " + queueInfo.isQueueRecovered() + "\n");
|
if (queueId.getSourceServerName().isPresent()) {
|
||||||
deadServers = queueInfo.getDeadRegionServers();
|
sb.append(" Dead RegionServer: " + queueId.getSourceServerName().get() + "\n");
|
||||||
if (deadServers.isEmpty()) {
|
|
||||||
sb.append(" No dead RegionServers found in this queue." + "\n");
|
|
||||||
} else {
|
} else {
|
||||||
sb.append(" Dead RegionServers: " + deadServers + "\n");
|
sb.append(" No dead RegionServer found in this queue." + "\n");
|
||||||
}
|
}
|
||||||
sb.append(" Was deleted: " + isDeleted + "\n");
|
sb.append(" Was deleted: " + isDeleted + "\n");
|
||||||
sb.append(" Number of WALs in replication queue: " + wals.size() + "\n");
|
sb.append(" Number of WALs in replication queue: " + wals.size() + "\n");
|
||||||
peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());
|
peersQueueSize.addAndGet(queueId.getPeerId(), wals.size());
|
||||||
|
|
||||||
for (String wal : wals) {
|
for (Map.Entry<String, ReplicationGroupOffset> entry : offsets.entrySet()) {
|
||||||
// long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal);
|
String walGroup = entry.getKey();
|
||||||
// sb.append(" Replication position for " + wal + ": "
|
ReplicationGroupOffset offset = entry.getValue();
|
||||||
// + (position > 0 ? position : "0" + " (not started or nothing to replicate)") + "\n");
|
for (String wal : wals) {
|
||||||
|
long position = 0;
|
||||||
|
if (offset.getWal().equals(wal)) {
|
||||||
|
position = offset.getOffset();
|
||||||
|
}
|
||||||
|
sb.append(
|
||||||
|
" Replication position for " + (walGroup != null ? walGroup + "/" + wal : wal) + ": ");
|
||||||
|
if (position == 0) {
|
||||||
|
sb.append("0 (not started or nothing to replicate)");
|
||||||
|
} else if (position > 0) {
|
||||||
|
sb.append(position);
|
||||||
|
}
|
||||||
|
sb.append("\n");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hdfs) {
|
if (hdfs) {
|
||||||
FileSystem fs = FileSystem.get(getConf());
|
FileSystem fs = FileSystem.get(getConf());
|
||||||
sb.append(" Total size of WALs on HDFS for this queue: "
|
sb.append(" Total size of WALs on HDFS for this queue: "
|
||||||
+ StringUtils.humanSize(getTotalWALSize(fs, wals, regionserver)) + "\n");
|
+ StringUtils.humanSize(getTotalWALSize(fs, wals, regionServer)) + "\n");
|
||||||
}
|
}
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
@ -366,8 +452,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
|
|||||||
/**
|
/**
|
||||||
* return total size in bytes from a list of WALs
|
* return total size in bytes from a list of WALs
|
||||||
*/
|
*/
|
||||||
private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server)
|
private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server) {
|
||||||
throws IOException {
|
|
||||||
long size = 0;
|
long size = 0;
|
||||||
FileStatus fileStatus;
|
FileStatus fileStatus;
|
||||||
|
|
||||||
@ -389,19 +474,4 @@ public class DumpReplicationQueues extends Configured implements Tool {
|
|||||||
totalSizeOfWALs += size;
|
totalSizeOfWALs += size;
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class WarnOnlyAbortable implements Abortable {
|
|
||||||
@Override
|
|
||||||
public void abort(String why, Throwable e) {
|
|
||||||
LOG.warn("DumpReplicationQueue received abort, ignoring. Reason: " + why);
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug(e.toString(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isAborted() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -379,6 +379,26 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
|
|||||||
return archivedWalFiles;
|
return archivedWalFiles;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List all the wal files for a logPrefix.
|
||||||
|
*/
|
||||||
|
public static List<Path> getWALFiles(Configuration c, ServerName serverName) throws IOException {
|
||||||
|
Path walRoot = new Path(CommonFSUtils.getWALRootDir(c), HConstants.HREGION_LOGDIR_NAME);
|
||||||
|
FileSystem fs = walRoot.getFileSystem(c);
|
||||||
|
List<Path> walFiles = new ArrayList<>();
|
||||||
|
Path walDir = new Path(walRoot, serverName.toString());
|
||||||
|
try {
|
||||||
|
for (FileStatus status : fs.listStatus(walDir)) {
|
||||||
|
if (status.isFile()) {
|
||||||
|
walFiles.add(status.getPath());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (FileNotFoundException e) {
|
||||||
|
LOG.info("WAL dir {} not exists", walDir);
|
||||||
|
}
|
||||||
|
return walFiles;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pulls a ServerName out of a Path generated according to our layout rules. In the below layouts,
|
* Pulls a ServerName out of a Path generated according to our layout rules. In the below layouts,
|
||||||
* this method ignores the format of the logfile component. Current format: [base directory for
|
* this method ignores the format of the logfile component. Current format: [base directory for
|
||||||
|
@ -17,34 +17,43 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.replication.regionserver;
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
import org.junit.After;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
import org.junit.Before;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Ignore;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests for DumpReplicationQueues tool
|
* Tests for DumpReplicationQueues tool
|
||||||
*/
|
*/
|
||||||
// TODO: reimplement
|
|
||||||
@Ignore
|
|
||||||
@Category({ ReplicationTests.class, SmallTests.class })
|
@Category({ ReplicationTests.class, SmallTests.class })
|
||||||
public class TestDumpReplicationQueues {
|
public class TestDumpReplicationQueues {
|
||||||
|
|
||||||
@ -52,49 +61,99 @@ public class TestDumpReplicationQueues {
|
|||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestDumpReplicationQueues.class);
|
HBaseClassTestRule.forClass(TestDumpReplicationQueues.class);
|
||||||
|
|
||||||
/**
|
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
|
||||||
* Makes sure dumpQueues returns wals znodes ordered chronologically.
|
private static Configuration CONF;
|
||||||
* @throws Exception if dumpqueues finds any error while handling list of znodes.
|
private static FileSystem FS = null;
|
||||||
*/
|
private Path root;
|
||||||
@Test
|
private Path logDir;
|
||||||
public void testDumpReplicationReturnsWalSorted() throws Exception {
|
@Rule
|
||||||
Configuration config = HBaseConfiguration.create();
|
public final TestName name = new TestName();
|
||||||
ZKWatcher zkWatcherMock = mock(ZKWatcher.class);
|
|
||||||
ZNodePaths zNodePath = new ZNodePaths(config);
|
@Before
|
||||||
RecoverableZooKeeper recoverableZooKeeperMock = mock(RecoverableZooKeeper.class);
|
public void setup() throws Exception {
|
||||||
when(zkWatcherMock.getRecoverableZooKeeper()).thenReturn(recoverableZooKeeperMock);
|
UTIL.startMiniCluster(3);
|
||||||
when(zkWatcherMock.getZNodePaths()).thenReturn(zNodePath);
|
CONF = UTIL.getConfiguration();
|
||||||
List<String> nodes = new ArrayList<>();
|
TableName tableName = TableName.valueOf("replication_" + name.getMethodName());
|
||||||
String server = "rs1,60030," + EnvironmentEdgeManager.currentTime();
|
UTIL.getAdmin()
|
||||||
nodes.add(server);
|
.createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName));
|
||||||
when(recoverableZooKeeperMock.getChildren("/hbase/rs", null)).thenReturn(nodes);
|
CONF.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString());
|
||||||
when(recoverableZooKeeperMock.getChildren("/hbase/replication/rs", null)).thenReturn(nodes);
|
FS = FileSystem.get(CONF);
|
||||||
List<String> queuesIds = new ArrayList<>();
|
root = UTIL.getDataTestDirOnTestFS("hbase");
|
||||||
queuesIds.add("1");
|
logDir = new Path(root, HConstants.HREGION_LOGDIR_NAME);
|
||||||
when(recoverableZooKeeperMock.getChildren("/hbase/replication/rs/" + server, null))
|
FS.mkdirs(logDir);
|
||||||
.thenReturn(queuesIds);
|
CommonFSUtils.setRootDir(CONF, root);
|
||||||
List<String> wals = new ArrayList<>();
|
CommonFSUtils.setWALRootDir(CONF, root);
|
||||||
wals.add("rs1%2C60964%2C1549394085556.1549394101427");
|
|
||||||
wals.add("rs1%2C60964%2C1549394085556.1549394101426");
|
|
||||||
wals.add("rs1%2C60964%2C1549394085556.1549394101428");
|
|
||||||
when(recoverableZooKeeperMock.getChildren("/hbase/replication/rs/" + server + "/1", null))
|
|
||||||
.thenReturn(wals);
|
|
||||||
DumpReplicationQueues dumpQueues = new DumpReplicationQueues();
|
|
||||||
Set<String> peerIds = new HashSet<>();
|
|
||||||
peerIds.add("1");
|
|
||||||
dumpQueues.setConf(config);
|
|
||||||
String dump = dumpQueues.dumpQueues(zkWatcherMock, peerIds, false);
|
|
||||||
String[] parsedDump = dump.split("Replication position for");
|
|
||||||
assertEquals("Parsed dump should have 4 parts.", 4, parsedDump.length);
|
|
||||||
assertTrue(
|
|
||||||
"First wal should be rs1%2C60964%2C1549394085556.1549394101426, but got: " + parsedDump[1],
|
|
||||||
parsedDump[1].indexOf("rs1%2C60964%2C1549394085556.1549394101426") >= 0);
|
|
||||||
assertTrue(
|
|
||||||
"Second wal should be rs1%2C60964%2C1549394085556.1549394101427, but got: " + parsedDump[2],
|
|
||||||
parsedDump[2].indexOf("rs1%2C60964%2C1549394085556.1549394101427") >= 0);
|
|
||||||
assertTrue(
|
|
||||||
"Third wal should be rs1%2C60964%2C1549394085556.1549394101428, but got: " + parsedDump[3],
|
|
||||||
parsedDump[3].indexOf("rs1%2C60964%2C1549394085556.1549394101428") >= 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDumpReplication() throws Exception {
|
||||||
|
String peerId = "1";
|
||||||
|
String serverNameStr = "rs1,12345,123";
|
||||||
|
addPeer(peerId, "hbase");
|
||||||
|
ServerName serverName = ServerName.valueOf(serverNameStr);
|
||||||
|
String walName = "rs1%2C12345%2C123.10";
|
||||||
|
Path walPath = new Path(logDir, serverNameStr + "/" + walName);
|
||||||
|
FS.createNewFile(walPath);
|
||||||
|
|
||||||
|
ReplicationQueueId queueId = new ReplicationQueueId(serverName, peerId);
|
||||||
|
ReplicationQueueStorage queueStorage =
|
||||||
|
ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(), CONF);
|
||||||
|
queueStorage.setOffset(queueId, "wal-group",
|
||||||
|
new ReplicationGroupOffset(FS.listStatus(walPath)[0].getPath().toString(), 123),
|
||||||
|
Collections.emptyMap());
|
||||||
|
|
||||||
|
DumpReplicationQueues dumpQueues = new DumpReplicationQueues();
|
||||||
|
Set<String> peerIds = new HashSet<>();
|
||||||
|
peerIds.add(peerId);
|
||||||
|
List<String> wals = new ArrayList<>();
|
||||||
|
wals.add("rs1%2C12345%2C123.12");
|
||||||
|
wals.add("rs1%2C12345%2C123.15");
|
||||||
|
wals.add("rs1%2C12345%2C123.11");
|
||||||
|
for (String wal : wals) {
|
||||||
|
Path wPath = new Path(logDir, serverNameStr + "/" + wal);
|
||||||
|
FS.createNewFile(wPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
String dump = dumpQueues.dumpQueues(UTIL.getConnection(), peerIds, false, CONF);
|
||||||
|
assertTrue(dump.indexOf("Queue id: 1-rs1,12345,123") > 0);
|
||||||
|
assertTrue(dump.indexOf("Number of WALs in replication queue: 4") > 0);
|
||||||
|
// test for 'Returns wal sorted'
|
||||||
|
String[] parsedDump = dump.split("Replication position for");
|
||||||
|
assertTrue("First wal should be rs1%2C12345%2C123.10: 123, but got: " + parsedDump[1],
|
||||||
|
parsedDump[1].indexOf("rs1%2C12345%2C123.10: 123") >= 0);
|
||||||
|
assertTrue("Second wal should be rs1%2C12345%2C123.11: 0, but got: " + parsedDump[2],
|
||||||
|
parsedDump[2].indexOf("rs1%2C12345%2C123.11: 0 (not started or nothing to replicate)") >= 0);
|
||||||
|
assertTrue("Third wal should be rs1%2C12345%2C123.12: 0, but got: " + parsedDump[3],
|
||||||
|
parsedDump[3].indexOf("rs1%2C12345%2C123.12: 0 (not started or nothing to replicate)") >= 0);
|
||||||
|
assertTrue("Fourth wal should be rs1%2C12345%2C123.15: 0, but got: " + parsedDump[4],
|
||||||
|
parsedDump[4].indexOf("rs1%2C12345%2C123.15: 0 (not started or nothing to replicate)") >= 0);
|
||||||
|
|
||||||
|
Path file1 = new Path("testHFile1");
|
||||||
|
Path file2 = new Path("testHFile2");
|
||||||
|
List<Pair<Path, Path>> files = new ArrayList<>(1);
|
||||||
|
files.add(new Pair<>(null, file1));
|
||||||
|
files.add(new Pair<>(null, file2));
|
||||||
|
queueStorage.addHFileRefs(peerId, files);
|
||||||
|
// test for 'Dump Replication via replication table'
|
||||||
|
String dump2 = dumpQueues.dumpReplicationViaTable(UTIL.getConnection(), CONF);
|
||||||
|
assertTrue(dump2.indexOf("peers/1/peer-state: ENABLED") > 0);
|
||||||
|
assertTrue(dump2.indexOf("rs1,12345,123/rs1%2C12345%2C123.10: 123") >= 0);
|
||||||
|
assertTrue(dump2.indexOf("hfile-refs/1/testHFile1,testHFile2") >= 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a peer
|
||||||
|
*/
|
||||||
|
private void addPeer(String peerId, String clusterKey) throws IOException {
|
||||||
|
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
|
||||||
|
.setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKey)
|
||||||
|
.setReplicationEndpointImpl(
|
||||||
|
TestReplicationSourceManager.ReplicationEndpointForTest.class.getName());
|
||||||
|
UTIL.getAdmin().addReplicationPeer(peerId, builder.build(), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user