diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 63e1627eafa..2c338901bc8 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -114,6 +114,8 @@ public class VerifyReplication extends Configured implements Tool {
   String peerFSAddress = null;
   //Peer cluster HBase root dir location
   String peerHBaseRootAddress = null;
+  //Peer Table Name
+  String peerTableName = null;
 
   private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
 
@@ -192,8 +194,10 @@ public class VerifyReplication extends Configured implements Tool {
           Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
               zkClusterKey, PEER_CONFIG_PREFIX);
 
+          String peerName = peerConf.get(NAME + ".peerTableName", tableName.getNameAsString());
+          TableName peerTableName = TableName.valueOf(peerName);
           replicatedConnection = ConnectionFactory.createConnection(peerConf);
-          replicatedTable = replicatedConnection.getTable(tableName);
+          replicatedTable = replicatedConnection.getTable(peerTableName);
           scan.setStartRow(value.getRow());
 
           byte[] endRow = null;
@@ -419,6 +423,11 @@ public class VerifyReplication extends Configured implements Tool {
       conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
     }
 
+    if (peerTableName != null) {
+      LOG.info("Peer Table Name: " + peerTableName);
+      conf.set(NAME + ".peerTableName", peerTableName);
+    }
+
     conf.setInt(NAME + ".versions", versions);
     LOG.info("Number of version: " + versions);
 
@@ -617,6 +626,12 @@ public class VerifyReplication extends Configured implements Tool {
         continue;
       }
+      final String peerTableNameArgKey = "--peerTableName=";
+      if (cmd.startsWith(peerTableNameArgKey)) {
+        peerTableName = cmd.substring(peerTableNameArgKey.length());
+        continue;
+      }
+
       if (cmd.startsWith("--")) {
         printUsage("Invalid argument '" + cmd + "'");
         return false;
       }
@@ -685,11 +700,11 @@ public class VerifyReplication extends Configured implements Tool {
     if (errorMsg != null && errorMsg.length() > 0) {
       System.err.println("ERROR: " + errorMsg);
     }
-    System.err.println("Usage: verifyrep [--starttime=X]" +
-        " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " +
-        "[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] "
-        + "[--peerSnapshotName=R] [--peerSnapshotTmpDir=S] [--peerFSAddress=T] "
-        + "[--peerHBaseRootAddress=U] ");
+    System.err.println("Usage: verifyrep [--starttime=X]" +
+        " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " +
+        "[--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] " +
+        "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] " +
+        "[--peerFSAddress=T] [--peerHBaseRootAddress=U] ");
     System.err.println();
     System.err.println("Options:");
     System.err.println(" starttime    beginning of the time range");
@@ -705,6 +720,7 @@ public class VerifyReplication extends Configured implements Tool {
     System.err.println(" recomparesleep   milliseconds to sleep before recompare row, " +
         "default value is 0 which disables the recompare.");
     System.err.println(" verbose      logs row keys of good rows");
+    System.err.println(" peerTableName  Peer Table Name");
    System.err.println(" sourceSnapshotName  Source Snapshot Name");
    System.err.println(" sourceSnapshotTmpDir  Tmp location to restore source table snapshot");
    System.err.println(" peerSnapshotName  Peer Snapshot Name");
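Reviewer note, not part of the patch: since VerifyReplication extends Configured and implements Tool, the new flag can also be exercised programmatically. A minimal driver sketch; the class name is hypothetical and the cluster key and table names are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.util.ToolRunner;

public class VerifyRepDriverSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Compare 'sourceTable' on the local cluster against 'peerTest' on the peer.
    int exitCode = ToolRunner.run(conf, new VerifyReplication(), new String[] {
        "--peerTableName=peerTest",       // table name on the peer cluster (new flag)
        "zk1.example.com:2181:/hbase",    // peer cluster key (placeholder)
        "sourceTable" });                 // table name on the source cluster
    System.exit(exitCode);
  }
}

The positional arguments stay as before (peer id or cluster key, then source table name); only the optional flag is new.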
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java
index 8b52390bb0a..4ef1214e636 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java
@@ -25,7 +25,10 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.TreeMap;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -57,7 +60,9 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.mapreduce.Job;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -78,6 +83,8 @@ public class TestVerifyReplication extends TestReplicationBase {
   private static final Logger LOG = LoggerFactory.getLogger(TestVerifyReplication.class);
 
   private static final String PEER_ID = "2";
+  private static final TableName peerTableName = TableName.valueOf("peerTest");
+  private static Table htable3;
 
   @Rule
   public TestName name = new TestName();
@@ -85,6 +92,22 @@ public class TestVerifyReplication extends TestReplicationBase {
   @Before
   public void setUp() throws Exception {
     cleanUp();
+    utility2.deleteTableData(peerTableName);
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TestReplicationBase.setUpBeforeClass();
+
+    TableDescriptor peerTable = TableDescriptorBuilder.newBuilder(peerTableName).setColumnFamily(
+        ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100)
+            .build()).build();
+
+    Connection connection2 = ConnectionFactory.createConnection(conf2);
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.createTable(peerTable, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+    }
+    htable3 = connection2.getTable(peerTableName);
   }
 
   private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
@@ -561,4 +584,117 @@ public class TestVerifyReplication extends TestReplicationBase {
     checkRestoreTmpDir(conf1, tmpPath1, 2);
     checkRestoreTmpDir(conf2, tmpPath2, 2);
   }
+
+  private static void runBatchCopyTest() throws Exception {
+    // normal Batch tests for htable1
+    loadData("", row, noRepfamName);
+
+    Scan scan1 = new Scan();
+    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
+    ResultScanner scanner1 = htable1.getScanner(scan1);
+    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
+    for (Result result : res1) {
+      Put put = new Put(result.getRow());
+      for (Cell cell : result.rawCells()) {
+        put.add(cell);
+      }
+      puts.add(put);
+    }
+    scanner1.close();
+    assertEquals(NB_ROWS_IN_BATCH, res1.length);
+
+    // Copy the data to htable3
+    htable3.put(puts);
+
+    Scan scan2 = new Scan();
+    ResultScanner scanner2 = htable3.getScanner(scan2);
+    Result[] res2 = scanner2.next(NB_ROWS_IN_BATCH);
+    scanner2.close();
+    assertEquals(NB_ROWS_IN_BATCH, res2.length);
+  }
+
+  @Test
+  public void testVerifyRepJobWithPeerTableName() throws Exception {
+    // Populate the tables with same data
+    runBatchCopyTest();
+
+    // with a peerTableName along with quorum address (a cluster key)
+    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
+        utility2.getClusterKey(), tableName.getNameAsString() };
+    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
+
+    utility2.deleteTableData(peerTableName);
+    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
+  }
+
+  @Test
+  public void testVerifyRepJobWithPeerTableNameAndSnapshotSupport() throws Exception {
+    // Populate the tables with same data
+    runBatchCopyTest();
+
+    // Take source and target tables snapshot
+    Path rootDir = FSUtils.getRootDir(conf1);
+    FileSystem fs = rootDir.getFileSystem(conf1);
+    String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
+        Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);
+
+    // Take target snapshot
+    Path peerRootDir = FSUtils.getRootDir(conf2);
+    FileSystem peerFs = peerRootDir.getFileSystem(conf2);
+    String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), peerTableName,
+        Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);
+
+    String peerFSAddress = peerFs.getUri().toString();
+    String tmpPath1 = utility1.getRandomDir().toString();
+    String tmpPath2 = "/tmp" + System.currentTimeMillis();
+
+    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
+        "--sourceSnapshotName=" + sourceSnapshotName,
+        "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
+        "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
+        "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
+        tableName.getNameAsString() };
+    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
+    checkRestoreTmpDir(conf1, tmpPath1, 1);
+    checkRestoreTmpDir(conf2, tmpPath2, 1);
+
+    Scan scan = new Scan();
+    ResultScanner rs = htable3.getScanner(scan);
+    Put put = null;
+    for (Result result : rs) {
+      put = new Put(result.getRow());
+      Cell firstVal = result.rawCells()[0];
+      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
+          Bytes.toBytes("diff data"));
+      htable3.put(put);
+    }
+    Delete delete = new Delete(put.getRow());
+    htable3.delete(delete);
+
+    sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
+        Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);
+
+    peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), peerTableName,
+        Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);
+
+    args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
+        "--sourceSnapshotName=" + sourceSnapshotName,
+        "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
+        "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
+        "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
+        tableName.getNameAsString() };
+    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
+    checkRestoreTmpDir(conf1, tmpPath1, 2);
+    checkRestoreTmpDir(conf2, tmpPath2, 2);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    htable3.close();
+    TestReplicationBase.tearDownAfterClass();
+  }
 }
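Reviewer note, not part of the patch: the mapper-side lookup reads the peer table name from the job configuration and defaults to the source table name, so jobs that never pass --peerTableName behave exactly as before. A self-contained sketch of that fallback, assuming NAME expands to the "verifyrep" job prefix and using a placeholder table name:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;

public class PeerTableNameFallbackSketch {
  public static void main(String[] args) {
    Configuration peerConf = new Configuration(false);
    // The driver only sets "verifyrep.peerTableName" when --peerTableName was given,
    // so with an empty configuration the default (the source table's name) wins.
    String peerName = peerConf.get("verifyrep.peerTableName", "sourceTable");
    TableName peerTableName = TableName.valueOf(peerName);
    System.out.println(peerTableName); // prints "sourceTable"
  }
}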
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index f96dbe5dc17..72ab2466503 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -156,10 +156,14 @@ public class TestReplicationBase {
   }
 
   protected static void loadData(String prefix, byte[] row) throws IOException {
+    loadData(prefix, row, famName);
+  }
+
+  protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException {
     List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
     for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
       Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
-      put.addColumn(famName, row, row);
+      put.addColumn(familyName, row, row);
       puts.add(put);
     }
     htable1.put(puts);
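Reviewer note, not part of the patch: the two-argument loadData now delegates to the three-argument overload, so existing callers are unaffected. A usage sketch inside a TestReplicationBase subclass, where row, famName and noRepfamName are fields the base class already defines:

    loadData("", row);               // unchanged callers still write to famName
    loadData("", row, noRepfamName); // peer-table tests write to the non-replicated family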