HBASE-15847 VerifyReplication prefix filtering (Geoffrey Jacoby)

This commit is contained in:
tedyu 2016-05-19 10:43:49 -07:00
parent 5eed14272b
commit 696a51d344
2 changed files with 118 additions and 61 deletions

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.mapreduce.replication;
import java.io.IOException;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -34,6 +35,9 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
@ -76,6 +80,7 @@ public class VerifyReplication extends Configured implements Tool {
static String tableName = null;
static String families = null;
static String peerId = null;
static String rowPrefixes = null;
/**
* Map-only comparator for 2 tables
@ -117,6 +122,8 @@ public class VerifyReplication extends Configured implements Tool {
scan.addFamily(Bytes.toBytes(fam));
}
}
String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
setRowPrefixFilter(scan, rowPrefixes);
scan.setTimeRange(startTime, endTime);
int versions = conf.getInt(NAME+".versions", -1);
LOG.info("Setting number of version inside map as: " + versions);
@ -263,6 +270,9 @@ public class VerifyReplication extends Configured implements Tool {
if (families != null) {
conf.set(NAME+".families", families);
}
if (rowPrefixes != null){
conf.set(NAME+".rowPrefixes", rowPrefixes);
}
Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf);
ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
@ -291,6 +301,9 @@ public class VerifyReplication extends Configured implements Tool {
scan.addFamily(Bytes.toBytes(fam));
}
}
setRowPrefixFilter(scan, rowPrefixes);
TableMapReduceUtil.initTableMapperJob(tableName, scan,
Verifier.class, null, null, job);
@ -303,11 +316,38 @@ public class VerifyReplication extends Configured implements Tool {
return job;
}
private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
String[] rowPrefixArray = rowPrefixes.split(",");
Arrays.sort(rowPrefixArray);
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
for (String prefix : rowPrefixArray) {
Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
filterList.addFilter(filter);
}
scan.setFilter(filterList);
byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length -1]);
setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
}
}
private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
scan.setStartRow(startPrefixRow);
byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1),
new byte[]{(byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1)});
scan.setStopRow(stopRow);
}
private static boolean doCommandLine(final String[] args) {
if (args.length < 2) {
printUsage(null);
return false;
}
//in case we've been run before, restore all parameters to their initial states
//Otherwise, if our previous run included a parameter not in args this time,
//we might hold on to the old value.
restoreDefaults();
try {
for (int i = 0; i < args.length; i++) {
String cmd = args[i];
@ -346,6 +386,12 @@ public class VerifyReplication extends Configured implements Tool {
continue;
}
final String rowPrefixesKey = "--row-prefixes=";
if (cmd.startsWith(rowPrefixesKey)){
rowPrefixes = cmd.substring(rowPrefixesKey.length());
continue;
}
if (i == args.length-2) {
peerId = cmd;
}
@ -362,6 +408,17 @@ public class VerifyReplication extends Configured implements Tool {
return true;
}
private static void restoreDefaults() {
startTime = 0;
endTime = Long.MAX_VALUE;
batch = Integer.MAX_VALUE;
versions = -1;
tableName = null;
families = null;
peerId = null;
rowPrefixes = null;
}
/*
* @param errorMsg Error message. Can be null.
*/
@ -370,7 +427,7 @@ public class VerifyReplication extends Configured implements Tool {
System.err.println("ERROR: " + errorMsg);
}
System.err.println("Usage: verifyrep [--starttime=X]" +
" [--stoptime=Y] [--families=A] <peerid> <tablename>");
" [--stoptime=Y] [--families=A] [--row-prefixes=B] <peerid> <tablename>");
System.err.println();
System.err.println("Options:");
System.err.println(" starttime beginning of the time range");
@ -378,6 +435,7 @@ public class VerifyReplication extends Configured implements Tool {
System.err.println(" endtime end of the time range");
System.err.println(" versions number of cell versions to verify");
System.err.println(" families comma-separated list of families to copy");
System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
System.err.println();
System.err.println("Args:");
System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");

View File

@ -23,13 +23,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ClusterStatus;
@ -64,13 +65,12 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.protobuf.ByteString;
import com.sun.tools.javac.code.Attribute.Array;
@Category(LargeTests.class)
public class TestReplicationSmallTests extends TestReplicationBase {
private static final Log LOG = LogFactory.getLog(TestReplicationSmallTests.class);
private static final String PEER_ID = "2";
/**
* @throws java.lang.Exception
@ -83,6 +83,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
utility1.getHBaseCluster().getRegionServerThreads()) {
utility1.getHBaseAdmin().rollWALWriter(r.getRegionServer().getServerName());
}
int rowCount = utility1.countRows(tableName);
utility1.deleteTableData(tableName);
// truncating the table will send one Delete per row to the slave cluster
// in an async fashion, which is why we cannot just call deleteTableData on
@ -96,7 +97,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
fail("Waited too much time for truncate");
}
ResultScanner scanner = htable2.getScanner(scan);
Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
Result[] res = scanner.next(rowCount);
scanner.close();
if (res.length != 0) {
if (res.length < lastCount) {
@ -253,13 +254,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
public void testSmallBatch() throws Exception {
LOG.info("testSmallBatch");
// normal Batch tests
List<Put> puts = new ArrayList<>();
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put put = new Put(Bytes.toBytes(i));
put.add(famName, row, row);
puts.add(put);
}
htable1.put(puts);
loadData("", row);
Scan scan = new Scan();
@ -268,15 +263,20 @@ public class TestReplicationSmallTests extends TestReplicationBase {
scanner1.close();
assertEquals(NB_ROWS_IN_BATCH, res1.length);
for (int i = 0; i < NB_RETRIES; i++) {
waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
}
private void waitForReplication(int expectedRows, int retries) throws IOException, InterruptedException {
Scan scan;
for (int i = 0; i < retries; i++) {
scan = new Scan();
if (i==NB_RETRIES-1) {
if (i== retries -1) {
fail("Waited too much time for normal batch replication");
}
ResultScanner scanner = htable2.getScanner(scan);
Result[] res = scanner.next(NB_ROWS_IN_BATCH);
Result[] res = scanner.next(expectedRows);
scanner.close();
if (res.length != NB_ROWS_IN_BATCH) {
if (res.length != expectedRows) {
LOG.info("Only got " + res.length + " rows");
Thread.sleep(SLEEP_TIME);
} else {
@ -285,6 +285,16 @@ public class TestReplicationSmallTests extends TestReplicationBase {
}
}
private void loadData(String prefix, byte[] row) throws IOException {
List<Put> puts = new ArrayList<>();
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
put.addColumn(famName, row, row);
puts.add(put);
}
htable1.put(puts);
}
/**
* Test disable/enable replication, trying to insert, make sure nothing's
* replicated, enable it, the insert should be replicated
@ -295,7 +305,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
public void testDisableEnable() throws Exception {
// Test disabling replication
admin.disablePeer("2");
admin.disablePeer(PEER_ID);
byte[] rowkey = Bytes.toBytes("disable enable");
Put put = new Put(rowkey);
@ -314,7 +324,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
}
// Test enable replication
admin.enablePeer("2");
admin.enablePeer(PEER_ID);
for (int i = 0; i < NB_RETRIES; i++) {
Result res = htable2.get(get);
@ -338,7 +348,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
@Test(timeout=300000)
public void testAddAndRemoveClusters() throws Exception {
LOG.info("testAddAndRemoveClusters");
admin.removePeer("2");
admin.removePeer(PEER_ID);
Thread.sleep(SLEEP_TIME);
byte[] rowKey = Bytes.toBytes("Won't be replicated");
Put put = new Put(rowKey);
@ -457,18 +467,8 @@ public class TestReplicationSmallTests extends TestReplicationBase {
// identical since it does the check
testSmallBatch();
String[] args = new String[] {"2", tableName.getNameAsString()};
Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
if (job == null) {
fail("Job wasn't created, see the log");
}
if (!job.waitForCompletion(true)) {
fail("Job failed, see the log");
}
assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
assertEquals(0, job.getCounters().
findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
String[] args = new String[] {PEER_ID, tableName.getNameAsString()};
runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
Scan scan = new Scan();
ResultScanner rs = htable2.getScanner(scan);
@ -482,16 +482,21 @@ public class TestReplicationSmallTests extends TestReplicationBase {
}
Delete delete = new Delete(put.getRow());
htable2.delete(delete);
job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
}
private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
throws IOException, InterruptedException, ClassNotFoundException {
Job job = VerifyReplication.createSubmittableJob(new Configuration(CONF_WITH_LOCALFS), args);
if (job == null) {
fail("Job wasn't created, see the log");
}
if (!job.waitForCompletion(true)) {
fail("Job failed, see the log");
}
assertEquals(0, job.getCounters().
assertEquals(expectedGoodRows, job.getCounters().
findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
assertEquals(expectedBadRows, job.getCounters().
findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
}
@ -554,18 +559,8 @@ public class TestReplicationSmallTests extends TestReplicationBase {
assertEquals(1, res1.length);
assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size());
String[] args = new String[] {"--versions=100", "2", tableName.getNameAsString()};
Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
if (job == null) {
fail("Job wasn't created, see the log");
}
if (!job.waitForCompletion(true)) {
fail("Job failed, see the log");
}
assertEquals(0, job.getCounters().
findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
assertEquals(1, job.getCounters().
findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
String[] args = new String[] {"--versions=100", PEER_ID, tableName.getNameAsString()};
runVerifyReplication(args, 0, 1);
}
@Test(timeout=300000)
@ -616,7 +611,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
try {
// Disabling replication and modifying the particular version of the cell to validate the feature.
admin.disablePeer("2");
admin.disablePeer(PEER_ID);
Put put2 = new Put(Bytes.toBytes("r1"));
put2.addColumn(famName, qualifierName, ts +2, Bytes.toBytes("v99"));
htable2.put(put2);
@ -629,21 +624,11 @@ public class TestReplicationSmallTests extends TestReplicationBase {
assertEquals(1, res1.length);
assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
String[] args = new String[] {"--versions=100", "2", tableName.getNameAsString()};
Job job = VerifyReplication.createSubmittableJob(CONF_WITH_LOCALFS, args);
if (job == null) {
fail("Job wasn't created, see the log");
}
if (!job.waitForCompletion(true)) {
fail("Job failed, see the log");
}
assertEquals(0, job.getCounters().
findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
assertEquals(1, job.getCounters().
findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
String[] args = new String[] {"--versions=100", PEER_ID, tableName.getNameAsString()};
runVerifyReplication(args, 0, 1);
}
finally {
admin.enablePeer("2");
admin.enablePeer(PEER_ID);
}
}
@ -758,4 +743,18 @@ public class TestReplicationSmallTests extends TestReplicationBase {
}
}
}
@Test(timeout=300000)
public void testVerifyReplicationPrefixFiltering() throws Exception {
final byte[] prefixRow = Bytes.toBytes("prefixrow");
final byte[] prefixRow2 = Bytes.toBytes("secondrow");
loadData("prefixrow", prefixRow);
loadData("secondrow", prefixRow2);
loadData("aaa", row);
loadData("zzz", row);
waitForReplication(NB_ROWS_IN_BATCH * 4, NB_RETRIES * 4);
String[] args = new String[] {"--row-prefixes=prefixrow,secondrow", PEER_ID,
tableName.getNameAsString()};
runVerifyReplication(args, NB_ROWS_IN_BATCH *2, 0);
}
}