HBASE-21201 Support to run VerifyReplication MR tool without peerid
Signed-off-by: Josh Elser <elserj@apache.org>
This commit is contained in:
parent
28bc9a51e1
commit
b322d0a3e5
|
@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.mapreduce.InputSplit;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
|
@ -96,6 +97,7 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
String families = null;
|
||||
String delimiter = "";
|
||||
String peerId = null;
|
||||
String peerQuorumAddress = null;
|
||||
String rowPrefixes = null;
|
||||
int sleepMsBeforeReCompare = 0;
|
||||
boolean verbose = false;
|
||||
|
@ -385,7 +387,6 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
if (!doCommandLine(args)) {
|
||||
return null;
|
||||
}
|
||||
conf.set(NAME+".peerId", peerId);
|
||||
conf.set(NAME+".tableName", tableName);
|
||||
conf.setLong(NAME+".startTime", startTime);
|
||||
conf.setLong(NAME+".endTime", endTime);
|
||||
|
@ -401,14 +402,23 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
conf.set(NAME+".rowPrefixes", rowPrefixes);
|
||||
}
|
||||
|
||||
Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf, peerId);
|
||||
ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
|
||||
String peerQuorumAddress = peerConfig.getClusterKey();
|
||||
LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
|
||||
String peerQuorumAddress;
|
||||
Pair<ReplicationPeerConfig, Configuration> peerConfigPair = null;
|
||||
if (peerId != null) {
|
||||
peerConfigPair = getPeerQuorumConfig(conf, peerId);
|
||||
ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
|
||||
peerQuorumAddress = peerConfig.getClusterKey();
|
||||
LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
|
||||
peerConfig.getConfiguration());
|
||||
conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
|
||||
HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
|
||||
conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
|
||||
HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
|
||||
peerConfig.getConfiguration().entrySet());
|
||||
} else {
|
||||
assert this.peerQuorumAddress != null;
|
||||
peerQuorumAddress = this.peerQuorumAddress;
|
||||
LOG.info("Peer Quorum Address: " + peerQuorumAddress);
|
||||
conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
|
||||
}
|
||||
|
||||
conf.setInt(NAME + ".versions", versions);
|
||||
LOG.info("Number of version: " + versions);
|
||||
|
@ -463,9 +473,13 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
} else {
|
||||
TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
|
||||
}
|
||||
Configuration peerClusterConf = peerConfigPair.getSecond();
|
||||
// Obtain the auth token from peer cluster
|
||||
TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
|
||||
|
||||
if (peerId != null) {
|
||||
assert peerConfigPair != null;
|
||||
Configuration peerClusterConf = peerConfigPair.getSecond();
|
||||
// Obtain the auth token from peer cluster
|
||||
TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
|
||||
}
|
||||
|
||||
job.setOutputFormatClass(NullOutputFormat.class);
|
||||
job.setNumReduceTasks(0);
|
||||
|
@ -610,7 +624,11 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
}
|
||||
|
||||
if (i == args.length-2) {
|
||||
peerId = cmd;
|
||||
if (isPeerQuorumAddress(cmd)) {
|
||||
peerQuorumAddress = cmd;
|
||||
} else {
|
||||
peerId = cmd;
|
||||
}
|
||||
}
|
||||
|
||||
if (i == args.length-1) {
|
||||
|
@ -651,6 +669,16 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
return true;
|
||||
}
|
||||
|
||||
private boolean isPeerQuorumAddress(String cmd) {
|
||||
try {
|
||||
ZKConfig.validateClusterKey(cmd);
|
||||
} catch (IOException e) {
|
||||
// not a quorum address
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* @param errorMsg Error message. Can be null.
|
||||
*/
|
||||
|
@ -660,8 +688,9 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
}
|
||||
System.err.println("Usage: verifyrep [--starttime=X]" +
|
||||
" [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " +
|
||||
"[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] "
|
||||
+ "[--peerSnapshotTmpDir=S] [--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid> <tablename>");
|
||||
"[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] "
|
||||
+ "[--peerSnapshotName=R] [--peerSnapshotTmpDir=S] [--peerFSAddress=T] "
|
||||
+ "[--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>");
|
||||
System.err.println();
|
||||
System.err.println("Options:");
|
||||
System.err.println(" starttime beginning of the time range");
|
||||
|
@ -686,6 +715,8 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
System.err.println();
|
||||
System.err.println("Args:");
|
||||
System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");
|
||||
System.err.println(" peerQuorumAddress quorumAdress of the peer used for verification. The "
|
||||
+ "format is zk_quorum:zk_port:zk_hbase_path");
|
||||
System.err.println(" tablename Name of the table to verify");
|
||||
System.err.println();
|
||||
System.err.println("Examples:");
|
||||
|
|
|
@ -422,36 +422,24 @@ public class TestVerifyReplication extends TestReplicationBase {
|
|||
FileSystem fs = rootDir.getFileSystem(conf1);
|
||||
String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
|
||||
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
|
||||
new String(famName), sourceSnapshotName, rootDir, fs, true);
|
||||
Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
|
||||
|
||||
// Take target snapshot
|
||||
Path peerRootDir = FSUtils.getRootDir(conf2);
|
||||
FileSystem peerFs = peerRootDir.getFileSystem(conf2);
|
||||
String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
|
||||
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
|
||||
new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
|
||||
Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
|
||||
|
||||
String peerFSAddress = peerFs.getUri().toString();
|
||||
String temPath1 = utility1.getRandomDir().toString();
|
||||
String temPath2 = "/tmp2";
|
||||
String temPath2 = "/tmp" + System.currentTimeMillis();
|
||||
|
||||
String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
|
||||
"--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
|
||||
"--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
|
||||
"--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
|
||||
|
||||
Job job = new VerifyReplication().createSubmittableJob(conf1, args);
|
||||
if (job == null) {
|
||||
fail("Job wasn't created, see the log");
|
||||
}
|
||||
if (!job.waitForCompletion(true)) {
|
||||
fail("Job failed, see the log");
|
||||
}
|
||||
assertEquals(NB_ROWS_IN_BATCH,
|
||||
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
|
||||
assertEquals(0,
|
||||
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
|
||||
|
||||
runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
|
||||
checkRestoreTmpDir(conf1, temPath1, 1);
|
||||
checkRestoreTmpDir(conf2, temPath2, 1);
|
||||
|
||||
|
@ -470,30 +458,107 @@ public class TestVerifyReplication extends TestReplicationBase {
|
|||
|
||||
sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
|
||||
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
|
||||
new String(famName), sourceSnapshotName, rootDir, fs, true);
|
||||
Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
|
||||
|
||||
peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
|
||||
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
|
||||
new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
|
||||
Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
|
||||
|
||||
args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
|
||||
"--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
|
||||
"--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
|
||||
"--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
|
||||
|
||||
job = new VerifyReplication().createSubmittableJob(conf1, args);
|
||||
if (job == null) {
|
||||
fail("Job wasn't created, see the log");
|
||||
}
|
||||
if (!job.waitForCompletion(true)) {
|
||||
fail("Job failed, see the log");
|
||||
}
|
||||
assertEquals(0,
|
||||
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
|
||||
assertEquals(NB_ROWS_IN_BATCH,
|
||||
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
|
||||
|
||||
runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
|
||||
checkRestoreTmpDir(conf1, temPath1, 2);
|
||||
checkRestoreTmpDir(conf2, temPath2, 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVerifyRepJobWithQuorumAddress() throws Exception {
|
||||
// Populate the tables, at the same time it guarantees that the tables are
|
||||
// identical since it does the check
|
||||
runSmallBatchTest();
|
||||
|
||||
// with a quorum address (a cluster key)
|
||||
String[] args = new String[] { utility2.getClusterKey(), tableName.getNameAsString() };
|
||||
runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
|
||||
|
||||
Scan scan = new Scan();
|
||||
ResultScanner rs = htable2.getScanner(scan);
|
||||
Put put = null;
|
||||
for (Result result : rs) {
|
||||
put = new Put(result.getRow());
|
||||
Cell firstVal = result.rawCells()[0];
|
||||
put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
|
||||
Bytes.toBytes("diff data"));
|
||||
htable2.put(put);
|
||||
}
|
||||
Delete delete = new Delete(put.getRow());
|
||||
htable2.delete(delete);
|
||||
runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Exception {
|
||||
// Populate the tables, at the same time it guarantees that the tables are
|
||||
// identical since it does the check
|
||||
runSmallBatchTest();
|
||||
|
||||
// Take source and target tables snapshot
|
||||
Path rootDir = FSUtils.getRootDir(conf1);
|
||||
FileSystem fs = rootDir.getFileSystem(conf1);
|
||||
String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
|
||||
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
|
||||
Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
|
||||
|
||||
// Take target snapshot
|
||||
Path peerRootDir = FSUtils.getRootDir(conf2);
|
||||
FileSystem peerFs = peerRootDir.getFileSystem(conf2);
|
||||
String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
|
||||
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
|
||||
Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
|
||||
|
||||
String peerFSAddress = peerFs.getUri().toString();
|
||||
String tmpPath1 = utility1.getRandomDir().toString();
|
||||
String tmpPath2 = "/tmp" + System.currentTimeMillis();
|
||||
|
||||
String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
|
||||
"--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
|
||||
"--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
|
||||
"--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
|
||||
tableName.getNameAsString() };
|
||||
runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
|
||||
checkRestoreTmpDir(conf1, tmpPath1, 1);
|
||||
checkRestoreTmpDir(conf2, tmpPath2, 1);
|
||||
|
||||
Scan scan = new Scan();
|
||||
ResultScanner rs = htable2.getScanner(scan);
|
||||
Put put = null;
|
||||
for (Result result : rs) {
|
||||
put = new Put(result.getRow());
|
||||
Cell firstVal = result.rawCells()[0];
|
||||
put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
|
||||
Bytes.toBytes("diff data"));
|
||||
htable2.put(put);
|
||||
}
|
||||
Delete delete = new Delete(put.getRow());
|
||||
htable2.delete(delete);
|
||||
|
||||
sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
|
||||
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
|
||||
Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
|
||||
|
||||
peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
|
||||
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
|
||||
Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
|
||||
|
||||
args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
|
||||
"--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
|
||||
"--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
|
||||
"--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
|
||||
tableName.getNameAsString() };
|
||||
runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
|
||||
checkRestoreTmpDir(conf1, tmpPath1, 2);
|
||||
checkRestoreTmpDir(conf2, tmpPath2, 2);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue