HBASE-21201 Support to run VerifyReplication MR tool without peerid

Signed-off-by: Josh Elser <elserj@apache.org>
This commit is contained in:
Toshihiro Suzuki 2019-01-25 00:24:39 +09:00
parent e0bf2b1dca
commit 3902693f61
2 changed files with 140 additions and 44 deletions

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@ -96,6 +97,7 @@ public class VerifyReplication extends Configured implements Tool {
String families = null;
String delimiter = "";
String peerId = null;
String peerQuorumAddress = null;
String rowPrefixes = null;
int sleepMsBeforeReCompare = 0;
boolean verbose = false;
@ -385,7 +387,6 @@ public class VerifyReplication extends Configured implements Tool {
if (!doCommandLine(args)) {
return null;
}
conf.set(NAME+".peerId", peerId);
conf.set(NAME+".tableName", tableName);
conf.setLong(NAME+".startTime", startTime);
conf.setLong(NAME+".endTime", endTime);
@ -401,14 +402,23 @@ public class VerifyReplication extends Configured implements Tool {
conf.set(NAME+".rowPrefixes", rowPrefixes);
}
Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf, peerId);
ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
String peerQuorumAddress = peerConfig.getClusterKey();
LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
String peerQuorumAddress;
Pair<ReplicationPeerConfig, Configuration> peerConfigPair = null;
if (peerId != null) {
peerConfigPair = getPeerQuorumConfig(conf, peerId);
ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
peerQuorumAddress = peerConfig.getClusterKey();
LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
peerConfig.getConfiguration());
conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
peerConfig.getConfiguration().entrySet());
} else {
assert this.peerQuorumAddress != null;
peerQuorumAddress = this.peerQuorumAddress;
LOG.info("Peer Quorum Address: " + peerQuorumAddress);
conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
}
conf.setInt(NAME + ".versions", versions);
LOG.info("Number of version: " + versions);
@ -463,9 +473,13 @@ public class VerifyReplication extends Configured implements Tool {
} else {
TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
}
Configuration peerClusterConf = peerConfigPair.getSecond();
// Obtain the auth token from peer cluster
TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
if (peerId != null) {
assert peerConfigPair != null;
Configuration peerClusterConf = peerConfigPair.getSecond();
// Obtain the auth token from peer cluster
TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
}
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
@ -610,7 +624,11 @@ public class VerifyReplication extends Configured implements Tool {
}
if (i == args.length-2) {
peerId = cmd;
if (isPeerQuorumAddress(cmd)) {
peerQuorumAddress = cmd;
} else {
peerId = cmd;
}
}
if (i == args.length-1) {
@ -651,6 +669,16 @@ public class VerifyReplication extends Configured implements Tool {
return true;
}
private boolean isPeerQuorumAddress(String cmd) {
try {
ZKConfig.validateClusterKey(cmd);
} catch (IOException e) {
// not a quorum address
return false;
}
return true;
}
/*
* @param errorMsg Error message. Can be null.
*/
@ -660,8 +688,9 @@ public class VerifyReplication extends Configured implements Tool {
}
System.err.println("Usage: verifyrep [--starttime=X]" +
" [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " +
"[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] "
+ "[--peerSnapshotTmpDir=S] [--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid> <tablename>");
"[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] "
+ "[--peerSnapshotName=R] [--peerSnapshotTmpDir=S] [--peerFSAddress=T] "
+ "[--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>");
System.err.println();
System.err.println("Options:");
System.err.println(" starttime beginning of the time range");
@ -686,6 +715,8 @@ public class VerifyReplication extends Configured implements Tool {
System.err.println();
System.err.println("Args:");
System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");
System.err.println(" peerQuorumAddress quorumAdress of the peer used for verification. The "
+ "format is zk_quorum:zk_port:zk_hbase_path");
System.err.println(" tablename Name of the table to verify");
System.err.println();
System.err.println("Examples:");

View File

@ -422,36 +422,24 @@ public class TestVerifyReplication extends TestReplicationBase {
FileSystem fs = rootDir.getFileSystem(conf1);
String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
new String(famName), sourceSnapshotName, rootDir, fs, true);
Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
// Take target snapshot
Path peerRootDir = FSUtils.getRootDir(conf2);
FileSystem peerFs = peerRootDir.getFileSystem(conf2);
String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
String peerFSAddress = peerFs.getUri().toString();
String temPath1 = utility1.getRandomDir().toString();
String temPath2 = "/tmp2";
String temPath2 = "/tmp" + System.currentTimeMillis();
String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
"--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
"--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
"--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
Job job = new VerifyReplication().createSubmittableJob(conf1, args);
if (job == null) {
fail("Job wasn't created, see the log");
}
if (!job.waitForCompletion(true)) {
fail("Job failed, see the log");
}
assertEquals(NB_ROWS_IN_BATCH,
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
assertEquals(0,
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
checkRestoreTmpDir(conf1, temPath1, 1);
checkRestoreTmpDir(conf2, temPath2, 1);
@ -470,30 +458,107 @@ public class TestVerifyReplication extends TestReplicationBase {
sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
new String(famName), sourceSnapshotName, rootDir, fs, true);
Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
"--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
"--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
"--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
job = new VerifyReplication().createSubmittableJob(conf1, args);
if (job == null) {
fail("Job wasn't created, see the log");
}
if (!job.waitForCompletion(true)) {
fail("Job failed, see the log");
}
assertEquals(0,
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
assertEquals(NB_ROWS_IN_BATCH,
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
checkRestoreTmpDir(conf1, temPath1, 2);
checkRestoreTmpDir(conf2, temPath2, 2);
}
@Test
public void testVerifyRepJobWithQuorumAddress() throws Exception {
// Populate the tables, at the same time it guarantees that the tables are
// identical since it does the check
runSmallBatchTest();
// with a quorum address (a cluster key)
String[] args = new String[] { utility2.getClusterKey(), tableName.getNameAsString() };
runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
Scan scan = new Scan();
ResultScanner rs = htable2.getScanner(scan);
Put put = null;
for (Result result : rs) {
put = new Put(result.getRow());
Cell firstVal = result.rawCells()[0];
put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
Bytes.toBytes("diff data"));
htable2.put(put);
}
Delete delete = new Delete(put.getRow());
htable2.delete(delete);
runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
}
@Test
public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Exception {
// Populate the tables, at the same time it guarantees that the tables are
// identical since it does the check
runSmallBatchTest();
// Take source and target tables snapshot
Path rootDir = FSUtils.getRootDir(conf1);
FileSystem fs = rootDir.getFileSystem(conf1);
String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
// Take target snapshot
Path peerRootDir = FSUtils.getRootDir(conf2);
FileSystem peerFs = peerRootDir.getFileSystem(conf2);
String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
String peerFSAddress = peerFs.getUri().toString();
String tmpPath1 = utility1.getRandomDir().toString();
String tmpPath2 = "/tmp" + System.currentTimeMillis();
String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
"--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
"--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
"--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
tableName.getNameAsString() };
runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
checkRestoreTmpDir(conf1, tmpPath1, 1);
checkRestoreTmpDir(conf2, tmpPath2, 1);
Scan scan = new Scan();
ResultScanner rs = htable2.getScanner(scan);
Put put = null;
for (Result result : rs) {
put = new Put(result.getRow());
Cell firstVal = result.rawCells()[0];
put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
Bytes.toBytes("diff data"));
htable2.put(put);
}
Delete delete = new Delete(put.getRow());
htable2.delete(delete);
sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
"--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
"--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
"--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
tableName.getNameAsString() };
runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
checkRestoreTmpDir(conf1, tmpPath1, 2);
checkRestoreTmpDir(conf2, tmpPath2, 2);
}
}