From 8ade73c0cb05b6b2d88d838f642cc286b7cf90ed Mon Sep 17 00:00:00 2001 From: tedyu Date: Fri, 4 Nov 2016 03:36:17 -0700 Subject: [PATCH] HBASE-16946 Provide Raw scan as an option in VerifyReplication (Sreekar Pallapothu) --- .../replication/VerifyReplication.java | 13 +++ .../TestReplicationSmallTests.java | 104 +++++++++++++++++- 2 files changed, 111 insertions(+), 6 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index bf320eedcb3..e4d266f8e0e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -87,6 +87,7 @@ public class VerifyReplication extends Configured implements Tool { static String rowPrefixes = null; static int sleepMsBeforeReCompare = 0; static boolean verbose = false; + static boolean includeDeletedCells = false; /** * Map-only comparator for 2 tables @@ -135,6 +136,8 @@ public class VerifyReplication extends Configured implements Tool { scan.addFamily(Bytes.toBytes(fam)); } } + boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false); + scan.setRaw(includeDeletedCells); String rowPrefixes = conf.get(NAME + ".rowPrefixes", null); setRowPrefixFilter(scan, rowPrefixes); scan.setTimeRange(startTime, endTime); @@ -314,6 +317,7 @@ public class VerifyReplication extends Configured implements Tool { conf.setInt(NAME +".sleepMsBeforeReCompare", sleepMsBeforeReCompare); conf.set(NAME + ".delimiter", delimiter); conf.setBoolean(NAME +".verbose", verbose); + conf.setBoolean(NAME +".includeDeletedCells", includeDeletedCells); if (families != null) { conf.set(NAME+".families", families); } @@ -338,6 +342,7 @@ public class VerifyReplication extends Configured implements Tool { Scan scan = new Scan(); scan.setTimeRange(startTime, endTime); + scan.setRaw(includeDeletedCells); if (versions >= 0) { scan.setMaxVersions(versions); LOG.info("Number of versions set to " + versions); @@ -415,6 +420,12 @@ public class VerifyReplication extends Configured implements Tool { continue; } + final String includeDeletedCellsArgKey = "--raw"; + if (cmd.equals(includeDeletedCellsArgKey)) { + includeDeletedCells = true; + continue; + } + final String versionsArgKey = "--versions="; if (cmd.startsWith(versionsArgKey)) { versions = Integer.parseInt(cmd.substring(versionsArgKey.length())); @@ -485,6 +496,7 @@ public class VerifyReplication extends Configured implements Tool { families = null; peerId = null; rowPrefixes = null; + includeDeletedCells = false; } /* @@ -503,6 +515,7 @@ public class VerifyReplication extends Configured implements Tool { System.err.println(" without endtime means from starttime to forever"); System.err.println(" endtime end of the time range"); System.err.println(" versions number of cell versions to verify"); + System.err.println(" raw includes raw scan if given in options"); System.err.println(" families comma-separated list of families to copy"); System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on "); System.err.println(" delimiter the delimiter used in display around rowkey"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 5e3c9b1d519..d56834c94f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -33,33 +33,34 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.ServerLoad; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; import org.apache.hadoop.hbase.protobuf.generated.WALProtos; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.regionserver.Replication; +import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.mapreduce.Job; import org.junit.Before; import org.junit.Test; @@ -487,6 +488,97 @@ public class TestReplicationSmallTests extends TestReplicationBase { runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); } + /** + * Load a row into a table, make sure the data is really the same, + * delete the row, make sure the delete marker is replicated, + * run verify replication with and without raw to check the results. + * @throws Exception + */ + @Test(timeout=300000) + public void testVerifyRepJobWithRawOptions() throws Exception { + LOG.info("testVerifyRepJobWithRawOptions"); + + TableName tablename = TableName.valueOf("test_raw"); + byte[] familyname = Bytes.toBytes("fam_raw"); + byte[] row = Bytes.toBytes("row_raw"); + + Table lHtable1 = null; + Table lHtable2 = null; + + try { + HTableDescriptor table = new HTableDescriptor(tablename); + HColumnDescriptor fam = new HColumnDescriptor(familyname); + fam.setMaxVersions(100); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + table.addFamily(fam); + + Connection connection1 = ConnectionFactory.createConnection(conf1); + Connection connection2 = ConnectionFactory.createConnection(conf2); + try (Admin admin1 = connection1.getAdmin()) { + admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + } + try (Admin admin2 = connection2.getAdmin()) { + admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + } + utility1.waitUntilAllRegionsAssigned(tablename); + utility2.waitUntilAllRegionsAssigned(tablename); + + lHtable1 = utility1.getConnection().getTable(tablename); + lHtable2 = utility2.getConnection().getTable(tablename); + + Put put = new Put(row); + put.addColumn(familyname, row, row); + lHtable1.put(put); + + Get get = new Get(row); + for (int i = 0; i < NB_RETRIES; i++) { + if (i==NB_RETRIES-1) { + fail("Waited too much time for put replication"); + } + Result res = lHtable2.get(get); + if (res.size() == 0) { + LOG.info("Row not available"); + Thread.sleep(SLEEP_TIME); + } else { + assertArrayEquals(res.value(), row); + break; + } + } + + Delete del = new Delete(row); + lHtable1.delete(del); + + get = new Get(row); + for (int i = 0; i < NB_RETRIES; i++) { + if (i==NB_RETRIES-1) { + fail("Waited too much time for del replication"); + } + Result res = lHtable2.get(get); + if (res.size() >= 1) { + LOG.info("Row not deleted"); + Thread.sleep(SLEEP_TIME); + } else { + break; + } + } + + // Checking verifyReplication for the default behavior. + String[] argsWithoutRaw = new String[] {PEER_ID, tablename.getNameAsString()}; + runVerifyReplication(argsWithoutRaw, 0, 0); + + // Checking verifyReplication with raw + String[] argsWithRawAsTrue = new String[] {"--raw", PEER_ID, tablename.getNameAsString()}; + runVerifyReplication(argsWithRawAsTrue, 1, 0); + } finally { + if (lHtable1 != null) { + lHtable1.close(); + } + if (lHtable2 != null) { + lHtable2.close(); + } + } + } + private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows) throws IOException, InterruptedException, ClassNotFoundException { Job job = VerifyReplication.createSubmittableJob(new Configuration(CONF_WITH_LOCALFS), args);