From 696a51d3444827426f0b32aa3dc04d5ced427d3a Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 19 May 2016 10:43:49 -0700 Subject: [PATCH] HBASE-15847 VerifyReplication prefix filtering (Geoffrey Jacoby) --- .../replication/VerifyReplication.java | 60 ++++++++- .../TestReplicationSmallTests.java | 119 +++++++++--------- 2 files changed, 118 insertions(+), 61 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index a19cea5c5a3..eed0fa7bbf0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.mapreduce.replication; import java.io.IOException; +import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,6 +35,9 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; @@ -76,6 +80,7 @@ public class VerifyReplication extends Configured implements Tool { static String tableName = null; static String families = null; static String peerId = null; + static String rowPrefixes = null; /** * Map-only comparator for 2 tables @@ -117,6 +122,8 @@ public class VerifyReplication extends Configured implements Tool { scan.addFamily(Bytes.toBytes(fam)); } } + String rowPrefixes = conf.get(NAME + ".rowPrefixes", null); + setRowPrefixFilter(scan, rowPrefixes); scan.setTimeRange(startTime, endTime); int versions = conf.getInt(NAME+".versions", -1); LOG.info("Setting number of version inside map as: " + versions); @@ -263,6 +270,9 @@ public class VerifyReplication extends Configured implements Tool { if (families != null) { conf.set(NAME+".families", families); } + if (rowPrefixes != null){ + conf.set(NAME+".rowPrefixes", rowPrefixes); + } Pair peerConfigPair = getPeerQuorumConfig(conf); ReplicationPeerConfig peerConfig = peerConfigPair.getFirst(); @@ -291,6 +301,9 @@ public class VerifyReplication extends Configured implements Tool { scan.addFamily(Bytes.toBytes(fam)); } } + + setRowPrefixFilter(scan, rowPrefixes); + TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job); @@ -303,11 +316,38 @@ public class VerifyReplication extends Configured implements Tool { return job; } + private static void setRowPrefixFilter(Scan scan, String rowPrefixes) { + if (rowPrefixes != null && !rowPrefixes.isEmpty()) { + String[] rowPrefixArray = rowPrefixes.split(","); + Arrays.sort(rowPrefixArray); + FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE); + for (String prefix : rowPrefixArray) { + Filter filter = new PrefixFilter(Bytes.toBytes(prefix)); + filterList.addFilter(filter); + } + scan.setFilter(filterList); + byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]); + byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length -1]); + setStartAndStopRows(scan, startPrefixRow, lastPrefixRow); + } + } + + private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) { + scan.setStartRow(startPrefixRow); + byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1), + new byte[]{(byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1)}); + scan.setStopRow(stopRow); + } + private static boolean doCommandLine(final String[] args) { if (args.length < 2) { printUsage(null); return false; } + //in case we've been run before, restore all parameters to their initial states + //Otherwise, if our previous run included a parameter not in args this time, + //we might hold on to the old value. + restoreDefaults(); try { for (int i = 0; i < args.length; i++) { String cmd = args[i]; @@ -346,6 +386,12 @@ public class VerifyReplication extends Configured implements Tool { continue; } + final String rowPrefixesKey = "--row-prefixes="; + if (cmd.startsWith(rowPrefixesKey)){ + rowPrefixes = cmd.substring(rowPrefixesKey.length()); + continue; + } + if (i == args.length-2) { peerId = cmd; } @@ -362,6 +408,17 @@ public class VerifyReplication extends Configured implements Tool { return true; } + private static void restoreDefaults() { + startTime = 0; + endTime = Long.MAX_VALUE; + batch = Integer.MAX_VALUE; + versions = -1; + tableName = null; + families = null; + peerId = null; + rowPrefixes = null; + } + /* * @param errorMsg Error message. Can be null. */ @@ -370,7 +427,7 @@ public class VerifyReplication extends Configured implements Tool { System.err.println("ERROR: " + errorMsg); } System.err.println("Usage: verifyrep [--starttime=X]" + - " [--stoptime=Y] [--families=A] "); + " [--stoptime=Y] [--families=A] [--row-prefixes=B] "); System.err.println(); System.err.println("Options:"); System.err.println(" starttime beginning of the time range"); @@ -378,6 +435,7 @@ public class VerifyReplication extends Configured implements Tool { System.err.println(" endtime end of the time range"); System.err.println(" versions number of cell versions to verify"); System.err.println(" families comma-separated list of families to copy"); + System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on "); System.err.println(); System.err.println("Args:"); System.err.println(" peerid Id of the peer used for verification, must match the one given for replication"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 42a127f485d..c293444585d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -23,13 +23,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.ClusterStatus; @@ -64,13 +65,12 @@ import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.protobuf.ByteString; -import com.sun.tools.javac.code.Attribute.Array; @Category(LargeTests.class) public class TestReplicationSmallTests extends TestReplicationBase { private static final Log LOG = LogFactory.getLog(TestReplicationSmallTests.class); + private static final String PEER_ID = "2"; /** * @throws java.lang.Exception @@ -83,6 +83,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { utility1.getHBaseCluster().getRegionServerThreads()) { utility1.getHBaseAdmin().rollWALWriter(r.getRegionServer().getServerName()); } + int rowCount = utility1.countRows(tableName); utility1.deleteTableData(tableName); // truncating the table will send one Delete per row to the slave cluster // in an async fashion, which is why we cannot just call deleteTableData on @@ -96,7 +97,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { fail("Waited too much time for truncate"); } ResultScanner scanner = htable2.getScanner(scan); - Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH); + Result[] res = scanner.next(rowCount); scanner.close(); if (res.length != 0) { if (res.length < lastCount) { @@ -253,14 +254,8 @@ public class TestReplicationSmallTests extends TestReplicationBase { public void testSmallBatch() throws Exception { LOG.info("testSmallBatch"); // normal Batch tests - List puts = new ArrayList<>(); - for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { - Put put = new Put(Bytes.toBytes(i)); - put.add(famName, row, row); - puts.add(put); - } - htable1.put(puts); - + loadData("", row); + Scan scan = new Scan(); ResultScanner scanner1 = htable1.getScanner(scan); @@ -268,15 +263,20 @@ public class TestReplicationSmallTests extends TestReplicationBase { scanner1.close(); assertEquals(NB_ROWS_IN_BATCH, res1.length); - for (int i = 0; i < NB_RETRIES; i++) { + waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES); + } + + private void waitForReplication(int expectedRows, int retries) throws IOException, InterruptedException { + Scan scan; + for (int i = 0; i < retries; i++) { scan = new Scan(); - if (i==NB_RETRIES-1) { + if (i== retries -1) { fail("Waited too much time for normal batch replication"); } ResultScanner scanner = htable2.getScanner(scan); - Result[] res = scanner.next(NB_ROWS_IN_BATCH); + Result[] res = scanner.next(expectedRows); scanner.close(); - if (res.length != NB_ROWS_IN_BATCH) { + if (res.length != expectedRows) { LOG.info("Only got " + res.length + " rows"); Thread.sleep(SLEEP_TIME); } else { @@ -285,6 +285,16 @@ public class TestReplicationSmallTests extends TestReplicationBase { } } + private void loadData(String prefix, byte[] row) throws IOException { + List puts = new ArrayList<>(); + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i))); + put.addColumn(famName, row, row); + puts.add(put); + } + htable1.put(puts); + } + /** * Test disable/enable replication, trying to insert, make sure nothing's * replicated, enable it, the insert should be replicated @@ -295,7 +305,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { public void testDisableEnable() throws Exception { // Test disabling replication - admin.disablePeer("2"); + admin.disablePeer(PEER_ID); byte[] rowkey = Bytes.toBytes("disable enable"); Put put = new Put(rowkey); @@ -314,7 +324,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { } // Test enable replication - admin.enablePeer("2"); + admin.enablePeer(PEER_ID); for (int i = 0; i < NB_RETRIES; i++) { Result res = htable2.get(get); @@ -338,7 +348,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { @Test(timeout=300000) public void testAddAndRemoveClusters() throws Exception { LOG.info("testAddAndRemoveClusters"); - admin.removePeer("2"); + admin.removePeer(PEER_ID); Thread.sleep(SLEEP_TIME); byte[] rowKey = Bytes.toBytes("Won't be replicated"); Put put = new Put(rowKey); @@ -457,18 +467,8 @@ public class TestReplicationSmallTests extends TestReplicationBase { // identical since it does the check testSmallBatch(); - String[] args = new String[] {"2", tableName.getNameAsString()}; - Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args); - if (job == null) { - fail("Job wasn't created, see the log"); - } - if (!job.waitForCompletion(true)) { - fail("Job failed, see the log"); - } - assertEquals(NB_ROWS_IN_BATCH, job.getCounters(). - findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); - assertEquals(0, job.getCounters(). - findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); + String[] args = new String[] {PEER_ID, tableName.getNameAsString()}; + runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); Scan scan = new Scan(); ResultScanner rs = htable2.getScanner(scan); @@ -482,16 +482,21 @@ public class TestReplicationSmallTests extends TestReplicationBase { } Delete delete = new Delete(put.getRow()); htable2.delete(delete); - job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args); + runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); + } + + private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows) + throws IOException, InterruptedException, ClassNotFoundException { + Job job = VerifyReplication.createSubmittableJob(new Configuration(CONF_WITH_LOCALFS), args); if (job == null) { fail("Job wasn't created, see the log"); } if (!job.waitForCompletion(true)) { fail("Job failed, see the log"); } - assertEquals(0, job.getCounters(). + assertEquals(expectedGoodRows, job.getCounters(). findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); - assertEquals(NB_ROWS_IN_BATCH, job.getCounters(). + assertEquals(expectedBadRows, job.getCounters(). findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); } @@ -554,18 +559,8 @@ public class TestReplicationSmallTests extends TestReplicationBase { assertEquals(1, res1.length); assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size()); - String[] args = new String[] {"--versions=100", "2", tableName.getNameAsString()}; - Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args); - if (job == null) { - fail("Job wasn't created, see the log"); - } - if (!job.waitForCompletion(true)) { - fail("Job failed, see the log"); - } - assertEquals(0, job.getCounters(). - findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); - assertEquals(1, job.getCounters(). - findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); + String[] args = new String[] {"--versions=100", PEER_ID, tableName.getNameAsString()}; + runVerifyReplication(args, 0, 1); } @Test(timeout=300000) @@ -616,7 +611,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { try { // Disabling replication and modifying the particular version of the cell to validate the feature. - admin.disablePeer("2"); + admin.disablePeer(PEER_ID); Put put2 = new Put(Bytes.toBytes("r1")); put2.addColumn(famName, qualifierName, ts +2, Bytes.toBytes("v99")); htable2.put(put2); @@ -629,21 +624,11 @@ public class TestReplicationSmallTests extends TestReplicationBase { assertEquals(1, res1.length); assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size()); - String[] args = new String[] {"--versions=100", "2", tableName.getNameAsString()}; - Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args); - if (job == null) { - fail("Job wasn't created, see the log"); - } - if (!job.waitForCompletion(true)) { - fail("Job failed, see the log"); - } - assertEquals(0, job.getCounters(). - findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); - assertEquals(1, job.getCounters(). - findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); + String[] args = new String[] {"--versions=100", PEER_ID, tableName.getNameAsString()}; + runVerifyReplication(args, 0, 1); } finally { - admin.enablePeer("2"); + admin.enablePeer(PEER_ID); } } @@ -758,4 +743,18 @@ public class TestReplicationSmallTests extends TestReplicationBase { } } } + + @Test(timeout=300000) + public void testVerifyReplicationPrefixFiltering() throws Exception { + final byte[] prefixRow = Bytes.toBytes("prefixrow"); + final byte[] prefixRow2 = Bytes.toBytes("secondrow"); + loadData("prefixrow", prefixRow); + loadData("secondrow", prefixRow2); + loadData("aaa", row); + loadData("zzz", row); + waitForReplication(NB_ROWS_IN_BATCH * 4, NB_RETRIES * 4); + String[] args = new String[] {"--row-prefixes=prefixrow,secondrow", PEER_ID, + tableName.getNameAsString()}; + runVerifyReplication(args, NB_ROWS_IN_BATCH *2, 0); + } }