HBASE-21871 Added support to specify a peer table name in VerifyReplication tool

Signed-off-by: Toshihiro Suzuki <brfrn169@gmail.com>
This commit is contained in:
subrat.mishra 2019-03-05 12:12:59 +05:30 committed by Toshihiro Suzuki
parent 11badde248
commit a51e584d7e
3 changed files with 163 additions and 7 deletions

View File

@ -114,6 +114,8 @@ public class VerifyReplication extends Configured implements Tool {
String peerFSAddress = null; String peerFSAddress = null;
//Peer cluster HBase root dir location //Peer cluster HBase root dir location
String peerHBaseRootAddress = null; String peerHBaseRootAddress = null;
//Peer Table Name
String peerTableName = null;
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
@ -192,8 +194,10 @@ public class VerifyReplication extends Configured implements Tool {
Configuration peerConf = HBaseConfiguration.createClusterConf(conf, Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
zkClusterKey, PEER_CONFIG_PREFIX); zkClusterKey, PEER_CONFIG_PREFIX);
String peerName = peerConf.get(NAME + ".peerTableName", tableName.getNameAsString());
TableName peerTableName = TableName.valueOf(peerName);
replicatedConnection = ConnectionFactory.createConnection(peerConf); replicatedConnection = ConnectionFactory.createConnection(peerConf);
replicatedTable = replicatedConnection.getTable(tableName); replicatedTable = replicatedConnection.getTable(peerTableName);
scan.setStartRow(value.getRow()); scan.setStartRow(value.getRow());
byte[] endRow = null; byte[] endRow = null;
@ -419,6 +423,11 @@ public class VerifyReplication extends Configured implements Tool {
conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
} }
if (peerTableName != null) {
LOG.info("Peer Table Name: " + peerTableName);
conf.set(NAME + ".peerTableName", peerTableName);
}
conf.setInt(NAME + ".versions", versions); conf.setInt(NAME + ".versions", versions);
LOG.info("Number of version: " + versions); LOG.info("Number of version: " + versions);
@ -617,6 +626,12 @@ public class VerifyReplication extends Configured implements Tool {
continue; continue;
} }
final String peerTableNameArgKey = "--peerTableName=";
if (cmd.startsWith(peerTableNameArgKey)) {
peerTableName = cmd.substring(peerTableNameArgKey.length());
continue;
}
if (cmd.startsWith("--")) { if (cmd.startsWith("--")) {
printUsage("Invalid argument '" + cmd + "'"); printUsage("Invalid argument '" + cmd + "'");
return false; return false;
@ -685,11 +700,11 @@ public class VerifyReplication extends Configured implements Tool {
if (errorMsg != null && errorMsg.length() > 0) { if (errorMsg != null && errorMsg.length() > 0) {
System.err.println("ERROR: " + errorMsg); System.err.println("ERROR: " + errorMsg);
} }
System.err.println("Usage: verifyrep [--starttime=X]" + System.err.println("Usage: verifyrep [--starttime=X]"
" [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " + + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] "
"[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] " + "[--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] "
+ "[--peerSnapshotName=R] [--peerSnapshotTmpDir=S] [--peerFSAddress=T] " + "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] "
+ "[--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>"); + "[--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>");
System.err.println(); System.err.println();
System.err.println("Options:"); System.err.println("Options:");
System.err.println(" starttime beginning of the time range"); System.err.println(" starttime beginning of the time range");
@ -705,6 +720,7 @@ public class VerifyReplication extends Configured implements Tool {
System.err.println(" recomparesleep milliseconds to sleep before recompare row, " + System.err.println(" recomparesleep milliseconds to sleep before recompare row, " +
"default value is 0 which disables the recompare."); "default value is 0 which disables the recompare.");
System.err.println(" verbose logs row keys of good rows"); System.err.println(" verbose logs row keys of good rows");
System.err.println(" peerTableName Peer Table Name");
System.err.println(" sourceSnapshotName Source Snapshot Name"); System.err.println(" sourceSnapshotName Source Snapshot Name");
System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot"); System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot");
System.err.println(" peerSnapshotName Peer Snapshot Name"); System.err.println(" peerSnapshotName Peer Snapshot Name");

View File

@ -25,7 +25,10 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap; import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -57,7 +60,9 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -78,6 +83,8 @@ public class TestVerifyReplication extends TestReplicationBase {
private static final Logger LOG = LoggerFactory.getLogger(TestVerifyReplication.class); private static final Logger LOG = LoggerFactory.getLogger(TestVerifyReplication.class);
private static final String PEER_ID = "2"; private static final String PEER_ID = "2";
private static final TableName peerTableName = TableName.valueOf("peerTest");
private static Table htable3;
@Rule @Rule
public TestName name = new TestName(); public TestName name = new TestName();
@ -85,6 +92,22 @@ public class TestVerifyReplication extends TestReplicationBase {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
cleanUp(); cleanUp();
utility2.deleteTableData(peerTableName);
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TestReplicationBase.setUpBeforeClass();
TableDescriptor peerTable = TableDescriptorBuilder.newBuilder(peerTableName).setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100)
.build()).build();
Connection connection2 = ConnectionFactory.createConnection(conf2);
try (Admin admin2 = connection2.getAdmin()) {
admin2.createTable(peerTable, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
}
htable3 = connection2.getTable(peerTableName);
} }
private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows) private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
@ -561,4 +584,117 @@ public class TestVerifyReplication extends TestReplicationBase {
checkRestoreTmpDir(conf1, tmpPath1, 2); checkRestoreTmpDir(conf1, tmpPath1, 2);
checkRestoreTmpDir(conf2, tmpPath2, 2); checkRestoreTmpDir(conf2, tmpPath2, 2);
} }
private static void runBatchCopyTest() throws Exception {
// normal Batch tests for htable1
loadData("", row, noRepfamName);
Scan scan1 = new Scan();
List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
ResultScanner scanner1 = htable1.getScanner(scan1);
Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
for (Result result : res1) {
Put put = new Put(result.getRow());
for (Cell cell : result.rawCells()) {
put.add(cell);
}
puts.add(put);
}
scanner1.close();
assertEquals(NB_ROWS_IN_BATCH, res1.length);
// Copy the data to htable3
htable3.put(puts);
Scan scan2 = new Scan();
ResultScanner scanner2 = htable3.getScanner(scan2);
Result[] res2 = scanner2.next(NB_ROWS_IN_BATCH);
scanner2.close();
assertEquals(NB_ROWS_IN_BATCH, res2.length);
}
@Test
public void testVerifyRepJobWithPeerTableName() throws Exception {
// Populate the tables with same data
runBatchCopyTest();
// with a peerTableName along with quorum address (a cluster key)
String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
utility2.getClusterKey(), tableName.getNameAsString() };
runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
utility2.deleteTableData(peerTableName);
runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
}
@Test
public void testVerifyRepJobWithPeerTableNameAndSnapshotSupport() throws Exception {
// Populate the tables with same data
runBatchCopyTest();
// Take source and target tables snapshot
Path rootDir = FSUtils.getRootDir(conf1);
FileSystem fs = rootDir.getFileSystem(conf1);
String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);
// Take target snapshot
Path peerRootDir = FSUtils.getRootDir(conf2);
FileSystem peerFs = peerRootDir.getFileSystem(conf2);
String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), peerTableName,
Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);
String peerFSAddress = peerFs.getUri().toString();
String tmpPath1 = utility1.getRandomDir().toString();
String tmpPath2 = "/tmp" + System.currentTimeMillis();
String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
"--sourceSnapshotName=" + sourceSnapshotName,
"--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
"--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
"--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
tableName.getNameAsString() };
runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
checkRestoreTmpDir(conf1, tmpPath1, 1);
checkRestoreTmpDir(conf2, tmpPath2, 1);
Scan scan = new Scan();
ResultScanner rs = htable3.getScanner(scan);
Put put = null;
for (Result result : rs) {
put = new Put(result.getRow());
Cell firstVal = result.rawCells()[0];
put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
Bytes.toBytes("diff data"));
htable3.put(put);
}
Delete delete = new Delete(put.getRow());
htable3.delete(delete);
sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);
peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), peerTableName,
Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);
args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
"--sourceSnapshotName=" + sourceSnapshotName,
"--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
"--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
"--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
tableName.getNameAsString() };
runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
checkRestoreTmpDir(conf1, tmpPath1, 2);
checkRestoreTmpDir(conf2, tmpPath2, 2);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
htable3.close();
TestReplicationBase.tearDownAfterClass();
}
} }

View File

@ -156,10 +156,14 @@ public class TestReplicationBase {
} }
protected static void loadData(String prefix, byte[] row) throws IOException { protected static void loadData(String prefix, byte[] row) throws IOException {
loadData(prefix, row, famName);
}
protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException {
List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH); List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i))); Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
put.addColumn(famName, row, row); put.addColumn(familyName, row, row);
puts.add(put); puts.add(put);
} }
htable1.put(puts); htable1.put(puts);