diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 37bb412b07c..20f77de5a11 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -96,6 +97,7 @@ public class VerifyReplication extends Configured implements Tool { String families = null; String delimiter = ""; String peerId = null; + String peerQuorumAddress = null; String rowPrefixes = null; int sleepMsBeforeReCompare = 0; boolean verbose = false; @@ -385,7 +387,6 @@ public class VerifyReplication extends Configured implements Tool { if (!doCommandLine(args)) { return null; } - conf.set(NAME+".peerId", peerId); conf.set(NAME+".tableName", tableName); conf.setLong(NAME+".startTime", startTime); conf.setLong(NAME+".endTime", endTime); @@ -401,14 +402,23 @@ public class VerifyReplication extends Configured implements Tool { conf.set(NAME+".rowPrefixes", rowPrefixes); } - Pair peerConfigPair = getPeerQuorumConfig(conf, peerId); - ReplicationPeerConfig peerConfig = peerConfigPair.getFirst(); - String peerQuorumAddress = peerConfig.getClusterKey(); - LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " + + String peerQuorumAddress; + Pair peerConfigPair = null; + if (peerId != null) { + peerConfigPair = getPeerQuorumConfig(conf, peerId); + ReplicationPeerConfig peerConfig = peerConfigPair.getFirst(); + peerQuorumAddress = peerConfig.getClusterKey(); + LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " + peerConfig.getConfiguration()); - conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); - HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX, + conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); + HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX, peerConfig.getConfiguration().entrySet()); + } else { + assert this.peerQuorumAddress != null; + peerQuorumAddress = this.peerQuorumAddress; + LOG.info("Peer Quorum Address: " + peerQuorumAddress); + conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); + } conf.setInt(NAME + ".versions", versions); LOG.info("Number of version: " + versions); @@ -463,9 +473,13 @@ public class VerifyReplication extends Configured implements Tool { } else { TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job); } - Configuration peerClusterConf = peerConfigPair.getSecond(); - // Obtain the auth token from peer cluster - TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf); + + if (peerId != null) { + assert peerConfigPair != null; + Configuration peerClusterConf = peerConfigPair.getSecond(); + // Obtain the auth token from peer cluster + TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf); + } job.setOutputFormatClass(NullOutputFormat.class); job.setNumReduceTasks(0); @@ -610,7 +624,11 @@ public class VerifyReplication extends Configured implements Tool { } if (i == args.length-2) { - peerId = cmd; + if (isPeerQuorumAddress(cmd)) { + peerQuorumAddress = cmd; + } else { + peerId = cmd; + } } if (i == args.length-1) { @@ -651,6 +669,16 @@ public class VerifyReplication extends Configured implements Tool { return true; } + private boolean isPeerQuorumAddress(String cmd) { + try { + ZKConfig.validateClusterKey(cmd); + } catch (IOException e) { + // not a quorum address + return false; + } + return true; + } + /* * @param errorMsg Error message. Can be null. */ @@ -660,8 +688,9 @@ public class VerifyReplication extends Configured implements Tool { } System.err.println("Usage: verifyrep [--starttime=X]" + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " + - "[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] " - + "[--peerSnapshotTmpDir=S] [--peerFSAddress=T] [--peerHBaseRootAddress=U] "); + "[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] " + + "[--peerSnapshotName=R] [--peerSnapshotTmpDir=S] [--peerFSAddress=T] " + + "[--peerHBaseRootAddress=U] "); System.err.println(); System.err.println("Options:"); System.err.println(" starttime beginning of the time range"); @@ -686,6 +715,8 @@ public class VerifyReplication extends Configured implements Tool { System.err.println(); System.err.println("Args:"); System.err.println(" peerid Id of the peer used for verification, must match the one given for replication"); + System.err.println(" peerQuorumAddress quorumAdress of the peer used for verification. The " + + "format is zk_quorum:zk_port:zk_hbase_path"); System.err.println(" tablename Name of the table to verify"); System.err.println(); System.err.println("Examples:"); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java index e1fda4eb0b8..8b52390bb0a 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java @@ -422,36 +422,24 @@ public class TestVerifyReplication extends TestReplicationBase { FileSystem fs = rootDir.getFileSystem(conf1); String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis(); SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName, - new String(famName), sourceSnapshotName, rootDir, fs, true); + Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true); // Take target snapshot Path peerRootDir = FSUtils.getRootDir(conf2); FileSystem peerFs = peerRootDir.getFileSystem(conf2); String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis(); SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName, - new String(famName), peerSnapshotName, peerRootDir, peerFs, true); + Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true); String peerFSAddress = peerFs.getUri().toString(); String temPath1 = utility1.getRandomDir().toString(); - String temPath2 = "/tmp2"; + String temPath2 = "/tmp" + System.currentTimeMillis(); String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress, "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() }; - - Job job = new VerifyReplication().createSubmittableJob(conf1, args); - if (job == null) { - fail("Job wasn't created, see the log"); - } - if (!job.waitForCompletion(true)) { - fail("Job failed, see the log"); - } - assertEquals(NB_ROWS_IN_BATCH, - job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); - assertEquals(0, - job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); - + runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); checkRestoreTmpDir(conf1, temPath1, 1); checkRestoreTmpDir(conf2, temPath2, 1); @@ -470,30 +458,107 @@ public class TestVerifyReplication extends TestReplicationBase { sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis(); SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName, - new String(famName), sourceSnapshotName, rootDir, fs, true); + Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true); peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis(); SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName, - new String(famName), peerSnapshotName, peerRootDir, peerFs, true); + Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true); args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress, "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() }; - - job = new VerifyReplication().createSubmittableJob(conf1, args); - if (job == null) { - fail("Job wasn't created, see the log"); - } - if (!job.waitForCompletion(true)) { - fail("Job failed, see the log"); - } - assertEquals(0, - job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); - assertEquals(NB_ROWS_IN_BATCH, - job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); - + runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); checkRestoreTmpDir(conf1, temPath1, 2); checkRestoreTmpDir(conf2, temPath2, 2); } + + @Test + public void testVerifyRepJobWithQuorumAddress() throws Exception { + // Populate the tables, at the same time it guarantees that the tables are + // identical since it does the check + runSmallBatchTest(); + + // with a quorum address (a cluster key) + String[] args = new String[] { utility2.getClusterKey(), tableName.getNameAsString() }; + runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); + + Scan scan = new Scan(); + ResultScanner rs = htable2.getScanner(scan); + Put put = null; + for (Result result : rs) { + put = new Put(result.getRow()); + Cell firstVal = result.rawCells()[0]; + put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal), + Bytes.toBytes("diff data")); + htable2.put(put); + } + Delete delete = new Delete(put.getRow()); + htable2.delete(delete); + runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); + } + + @Test + public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Exception { + // Populate the tables, at the same time it guarantees that the tables are + // identical since it does the check + runSmallBatchTest(); + + // Take source and target tables snapshot + Path rootDir = FSUtils.getRootDir(conf1); + FileSystem fs = rootDir.getFileSystem(conf1); + String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis(); + SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName, + Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true); + + // Take target snapshot + Path peerRootDir = FSUtils.getRootDir(conf2); + FileSystem peerFs = peerRootDir.getFileSystem(conf2); + String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis(); + SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName, + Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true); + + String peerFSAddress = peerFs.getUri().toString(); + String tmpPath1 = utility1.getRandomDir().toString(); + String tmpPath2 = "/tmp" + System.currentTimeMillis(); + + String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, + "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName, + "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress, + "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(), + tableName.getNameAsString() }; + runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); + checkRestoreTmpDir(conf1, tmpPath1, 1); + checkRestoreTmpDir(conf2, tmpPath2, 1); + + Scan scan = new Scan(); + ResultScanner rs = htable2.getScanner(scan); + Put put = null; + for (Result result : rs) { + put = new Put(result.getRow()); + Cell firstVal = result.rawCells()[0]; + put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal), + Bytes.toBytes("diff data")); + htable2.put(put); + } + Delete delete = new Delete(put.getRow()); + htable2.delete(delete); + + sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis(); + SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName, + Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true); + + peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis(); + SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName, + Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true); + + args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, + "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName, + "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress, + "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(), + tableName.getNameAsString() }; + runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); + checkRestoreTmpDir(conf1, tmpPath1, 2); + checkRestoreTmpDir(conf2, tmpPath2, 2); + } }