HBASE-15847 VerifyReplication prefix filtering (Geoffrey Jacoby)

tedyu 2016-05-19 06:38:27 -07:00
parent 460b41c800
commit a050e1d9f8
2 changed files with 117 additions and 58 deletions

VerifyReplication.java (package org.apache.hadoop.hbase.mapreduce.replication)

@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.mapreduce.replication;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +36,9 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
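
The three new filter imports carry the heart of the change: each requested prefix becomes a PrefixFilter, and a FilterList built with MUST_PASS_ONE combines them as a logical OR, so a row passes if any prefix matches. A minimal sketch of those semantics (illustrative, not part of the patch; assumes the imports above plus org.apache.hadoop.hbase.util.Bytes):

    // A row passes if ANY wrapped filter matches (logical OR).
    FilterList anyPrefix = new FilterList(FilterList.Operator.MUST_PASS_ONE);
    anyPrefix.addFilter(new PrefixFilter(Bytes.toBytes("prefixrow")));
    anyPrefix.addFilter(new PrefixFilter(Bytes.toBytes("secondrow")));
    Scan scan = new Scan();
    scan.setFilter(anyPrefix);
    // "prefixrow17" passes, "secondrow3" passes, "aaa0" is filtered out.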
@@ -77,6 +81,7 @@ public class VerifyReplication extends Configured implements Tool {
   static String tableName = null;
   static String families = null;
   static String peerId = null;
+  static String rowPrefixes = null;
 
   private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
@@ -123,6 +128,8 @@ public class VerifyReplication extends Configured implements Tool {
           scan.addFamily(Bytes.toBytes(fam));
         }
       }
+      String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
+      setRowPrefixFilter(scan, rowPrefixes);
       scan.setTimeRange(startTime, endTime);
       int versions = conf.getInt(NAME+".versions", -1);
       LOG.info("Setting number of version inside map as: " + versions);
@@ -271,6 +278,9 @@ public class VerifyReplication extends Configured implements Tool {
     if (families != null) {
       conf.set(NAME+".families", families);
     }
+    if (rowPrefixes != null) {
+      conf.set(NAME+".rowPrefixes", rowPrefixes);
+    }
 
     Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf);
     ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
@@ -299,6 +309,9 @@ public class VerifyReplication extends Configured implements Tool {
         scan.addFamily(Bytes.toBytes(fam));
       }
     }
+    setRowPrefixFilter(scan, rowPrefixes);
+
     TableMapReduceUtil.initTableMapperJob(tableName, scan,
         Verifier.class, null, null, job);
@@ -311,11 +324,38 @@
     return job;
   }
 
+  private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
+    if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
+      String[] rowPrefixArray = rowPrefixes.split(",");
+      Arrays.sort(rowPrefixArray);
+      FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
+      for (String prefix : rowPrefixArray) {
+        Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
+        filterList.addFilter(filter);
+      }
+      scan.setFilter(filterList);
+      byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
+      byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length - 1]);
+      setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
+    }
+  }
+
+  private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
+    scan.setStartRow(startPrefixRow);
+    byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1),
+        new byte[]{(byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1)});
+    scan.setStopRow(stopRow);
+  }
+
   private static boolean doCommandLine(final String[] args) {
     if (args.length < 2) {
       printUsage(null);
       return false;
     }
+    // In case we've been run before, restore all parameters to their initial states.
+    // Otherwise, if our previous run included a parameter not in args this time,
+    // we might hold on to the old value.
+    restoreDefaults();
     try {
       for (int i = 0; i < args.length; i++) {
         String cmd = args[i];
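
Beyond installing the filter, setRowPrefixFilter() also narrows the scan range itself: the prefixes are sorted, the smallest becomes the inclusive start row, and the stop row is the largest prefix with its final byte incremented, i.e. an exclusive bound just past every row sharing that prefix. A worked example with illustrative values (note the increment assumes the last byte of the largest prefix is not 0xFF, which holds for ASCII prefixes given on the command line):

    // --row-prefixes=zzz,aaa sorts to ["aaa", "zzz"], so the scan becomes:
    scan.setStartRow(Bytes.toBytes("aaa")); // first possible "aaa*" row, inclusive
    scan.setStopRow(Bytes.toBytes("zz{"));  // '{' is 'z' + 1; every "zzz*" row sorts below it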
@@ -354,6 +394,12 @@ public class VerifyReplication extends Configured implements Tool {
           continue;
         }
 
+        final String rowPrefixesKey = "--row-prefixes=";
+        if (cmd.startsWith(rowPrefixesKey)) {
+          rowPrefixes = cmd.substring(rowPrefixesKey.length());
+          continue;
+        }
+
         if (i == args.length-2) {
           peerId = cmd;
         }
@@ -370,6 +416,17 @@ public class VerifyReplication extends Configured implements Tool {
     return true;
   }
 
+  private static void restoreDefaults() {
+    startTime = 0;
+    endTime = Long.MAX_VALUE;
+    batch = Integer.MAX_VALUE;
+    versions = -1;
+    tableName = null;
+    families = null;
+    peerId = null;
+    rowPrefixes = null;
+  }
+
   /*
    * @param errorMsg Error message. Can be null.
    */
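
The reset is needed because these parameters are static fields, so they survive from one doCommandLine() call to the next inside the same JVM, which is exactly how the tests below drive the tool repeatedly. A hypothetical sequence showing the leak it prevents:

    doCommandLine(new String[] {"--versions=100", "2", "t"}); // versions == 100
    doCommandLine(new String[] {"2", "t"}); // without restoreDefaults(), versions would still be 100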
@@ -378,7 +435,7 @@
       System.err.println("ERROR: " + errorMsg);
     }
-    System.err.println("Usage: verifyrep [--starttime=X]" +
-        " [--stoptime=Y] [--families=A] <peerid> <tablename>");
+    System.err.println("Usage: verifyrep [--starttime=X]" +
+        " [--stoptime=Y] [--families=A] [--row-prefixes=B] <peerid> <tablename>");
     System.err.println();
     System.err.println("Options:");
     System.err.println(" starttime    beginning of the time range");
@@ -386,6 +443,7 @@
     System.err.println(" endtime      end of the time range");
     System.err.println(" versions     number of cell versions to verify");
     System.err.println(" families     comma-separated list of families to copy");
+    System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on");
     System.err.println();
     System.err.println("Args:");
     System.err.println(" peerid       Id of the peer used for verification, must match the one given for replication");

TestReplicationSmallTests.java

@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -31,6 +32,7 @@ import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.ClusterStatus;
@@ -72,6 +74,7 @@ import org.junit.experimental.categories.Category;
 public class TestReplicationSmallTests extends TestReplicationBase {
 
   private static final Log LOG = LogFactory.getLog(TestReplicationSmallTests.class);
+  private static final String PEER_ID = "2";
 
   /**
    * @throws java.lang.Exception
@@ -84,6 +87,7 @@
         utility1.getHBaseCluster().getRegionServerThreads()) {
       utility1.getHBaseAdmin().rollWALWriter(r.getRegionServer().getServerName());
     }
+    int rowCount = utility1.countRows(tableName);
     utility1.deleteTableData(tableName);
     // truncating the table will send one Delete per row to the slave cluster
     // in an async fashion, which is why we cannot just call deleteTableData on
@@ -97,7 +101,7 @@
         fail("Waited too much time for truncate");
       }
       ResultScanner scanner = htable2.getScanner(scan);
-      Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
+      Result[] res = scanner.next(rowCount);
       scanner.close();
       if (res.length != 0) {
         if (res.length < lastCount) {
@@ -254,13 +258,7 @@
   public void testSmallBatch() throws Exception {
     LOG.info("testSmallBatch");
     // normal Batch tests
-    List<Put> puts = new ArrayList<>();
-    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      Put put = new Put(Bytes.toBytes(i));
-      put.addColumn(famName, row, row);
-      puts.add(put);
-    }
-    htable1.put(puts);
+    loadData("", row);
 
     Scan scan = new Scan();
@@ -269,15 +267,20 @@
     scanner1.close();
     assertEquals(NB_ROWS_IN_BATCH, res1.length);
 
-    for (int i = 0; i < NB_RETRIES; i++) {
+    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
+  }
+
+  private void waitForReplication(int expectedRows, int retries) throws IOException, InterruptedException {
+    Scan scan;
+    for (int i = 0; i < retries; i++) {
       scan = new Scan();
-      if (i==NB_RETRIES-1) {
+      if (i == retries - 1) {
         fail("Waited too much time for normal batch replication");
       }
       ResultScanner scanner = htable2.getScanner(scan);
-      Result[] res = scanner.next(NB_ROWS_IN_BATCH);
+      Result[] res = scanner.next(expectedRows);
       scanner.close();
-      if (res.length != NB_ROWS_IN_BATCH) {
+      if (res.length != expectedRows) {
         LOG.info("Only got " + res.length + " rows");
         Thread.sleep(SLEEP_TIME);
       } else {
@@ -286,6 +289,16 @@
     }
   }
 
+  private void loadData(String prefix, byte[] row) throws IOException {
+    List<Put> puts = new ArrayList<>();
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
+      put.addColumn(famName, row, row);
+      puts.add(put);
+    }
+    htable1.put(puts);
+  }
+
   /**
    * Test disable/enable replication, trying to insert, make sure nothing's
    * replicated, enable it, the insert should be replicated
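
Note the subtle change from the inlined loop that loadData() replaces: row keys are now prefix + Integer.toString(i) rather than the raw int bytes of i, so each call lays down a batch of rows addressable by a string prefix. Illustratively (assuming NB_ROWS_IN_BATCH is 100; the constant's value is not shown in this diff):

    loadData("prefixrow", row); // writes rows "prefixrow0" ... "prefixrow99"
    loadData("", row);          // writes rows "0" ... "99", as testSmallBatch now does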
@@ -296,7 +309,7 @@
   public void testDisableEnable() throws Exception {
 
     // Test disabling replication
-    admin.disablePeer("2");
+    admin.disablePeer(PEER_ID);
 
     byte[] rowkey = Bytes.toBytes("disable enable");
     Put put = new Put(rowkey);
@@ -315,7 +328,7 @@
     }
 
     // Test enable replication
-    admin.enablePeer("2");
+    admin.enablePeer(PEER_ID);
 
     for (int i = 0; i < NB_RETRIES; i++) {
       Result res = htable2.get(get);
@@ -339,7 +352,7 @@
   @Test(timeout=300000)
   public void testAddAndRemoveClusters() throws Exception {
     LOG.info("testAddAndRemoveClusters");
-    admin.removePeer("2");
+    admin.removePeer(PEER_ID);
     Thread.sleep(SLEEP_TIME);
     byte[] rowKey = Bytes.toBytes("Won't be replicated");
     Put put = new Put(rowKey);
@@ -361,7 +374,7 @@
     }
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(utility2.getClusterKey());
-    admin.addPeer("2", rpc, null);
+    admin.addPeer(PEER_ID, rpc, null);
     Thread.sleep(SLEEP_TIME);
     rowKey = Bytes.toBytes("do rep");
     put = new Put(rowKey);
@@ -459,18 +472,8 @@
     // identical since it does the check
     testSmallBatch();
 
-    String[] args = new String[] {"2", tableName.getNameAsString()};
-    Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
-    if (job == null) {
-      fail("Job wasn't created, see the log");
-    }
-    if (!job.waitForCompletion(true)) {
-      fail("Job failed, see the log");
-    }
-    assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
-      findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-    assertEquals(0, job.getCounters().
-      findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+    String[] args = new String[] {PEER_ID, tableName.getNameAsString()};
+    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
 
     Scan scan = new Scan();
     ResultScanner rs = htable2.getScanner(scan);
@@ -484,16 +487,21 @@
     }
     Delete delete = new Delete(put.getRow());
     htable2.delete(delete);
-    job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
+    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
+  }
+
+  private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Job job = VerifyReplication.createSubmittableJob(new Configuration(CONF_WITH_LOCALFS), args);
     if (job == null) {
       fail("Job wasn't created, see the log");
     }
     if (!job.waitForCompletion(true)) {
       fail("Job failed, see the log");
     }
-    assertEquals(0, job.getCounters().
+    assertEquals(expectedGoodRows, job.getCounters().
       findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-    assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
+    assertEquals(expectedBadRows, job.getCounters().
       findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
   }
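
One detail worth flagging in the helper: it hands createSubmittableJob() a fresh new Configuration(CONF_WITH_LOCALFS) rather than the shared conf. createSubmittableJob() writes per-run keys into the conf it receives (the rowPrefixes key set in the VerifyReplication hunk above, for instance), so the copy keeps one run's settings from leaking into the next:

    Configuration perRun = new Configuration(CONF_WITH_LOCALFS); // copy; set() calls stay local to this run
    Job job = VerifyReplication.createSubmittableJob(perRun, args);
    // CONF_WITH_LOCALFS itself stays clean for subsequent runs.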
@@ -556,18 +564,8 @@
     assertEquals(1, res1.length);
     assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size());
 
-    String[] args = new String[] {"--versions=100", "2", tableName.getNameAsString()};
-    Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
-    if (job == null) {
-      fail("Job wasn't created, see the log");
-    }
-    if (!job.waitForCompletion(true)) {
-      fail("Job failed, see the log");
-    }
-    assertEquals(0, job.getCounters().
-      findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-    assertEquals(1, job.getCounters().
-      findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+    String[] args = new String[] {"--versions=100", PEER_ID, tableName.getNameAsString()};
+    runVerifyReplication(args, 0, 1);
   }
 
   @Test(timeout=300000)
@@ -618,7 +616,7 @@
     try {
       // Disabling replication and modifying the particular version of the cell to validate the feature.
-      admin.disablePeer("2");
+      admin.disablePeer(PEER_ID);
       Put put2 = new Put(Bytes.toBytes("r1"));
       put2.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v99"));
       htable2.put(put2);
@@ -631,21 +629,11 @@
       assertEquals(1, res1.length);
       assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
 
-      String[] args = new String[] {"--versions=100", "2", tableName.getNameAsString()};
-      Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
-      if (job == null) {
-        fail("Job wasn't created, see the log");
-      }
-      if (!job.waitForCompletion(true)) {
-        fail("Job failed, see the log");
-      }
-      assertEquals(0, job.getCounters().
-        findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
-      assertEquals(1, job.getCounters().
-        findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+      String[] args = new String[] {"--versions=100", PEER_ID, tableName.getNameAsString()};
+      runVerifyReplication(args, 0, 1);
     }
     finally {
-      admin.enablePeer("2");
+      admin.enablePeer(PEER_ID);
     }
   }
@@ -803,5 +791,18 @@
       }
     }
   }
+
+  @Test(timeout=300000)
+  public void testVerifyReplicationPrefixFiltering() throws Exception {
+    final byte[] prefixRow = Bytes.toBytes("prefixrow");
+    final byte[] prefixRow2 = Bytes.toBytes("secondrow");
+    loadData("prefixrow", prefixRow);
+    loadData("secondrow", prefixRow2);
+    loadData("aaa", row);
+    loadData("zzz", row);
+    waitForReplication(NB_ROWS_IN_BATCH * 4, NB_RETRIES * 4);
+    String[] args = new String[] {"--row-prefixes=prefixrow,secondrow", PEER_ID,
+        tableName.getNameAsString()};
+    runVerifyReplication(args, NB_ROWS_IN_BATCH * 2, 0);
+  }
 }