HBASE-16423 Add re-compare option to VerifyReplication to avoid occasional inconsistent rows (Jianwei Cui)

This commit is contained in:
tedyu 2016-09-23 05:08:19 -07:00
parent 8a797e81b8
commit b503843bae
1 changed files with 37 additions and 1 deletions

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.HConnectable;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@ -50,6 +51,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
@ -82,6 +84,7 @@ public class VerifyReplication extends Configured implements Tool {
static String delimiter = "";
static String peerId = null;
static String rowPrefixes = null;
static int sleepMsBeforeReCompare = 0;
/**
* Map-only comparator for 2 tables
@ -92,9 +95,11 @@ public class VerifyReplication extends Configured implements Tool {
/**
 * Counters published by the verification job.
 * When a row is reported as failed, BADROWS is incremented together with
 * one of the more specific counters passed in by the caller.
 */
public static enum Counters {
  GOODROWS,
  BADROWS,
  ONLY_IN_SOURCE_TABLE_ROWS,
  ONLY_IN_PEER_TABLE_ROWS,
  CONTENT_DIFFERENT_ROWS
}
private Table sourceTable;
private ResultScanner replicatedScanner;
private Result currentCompareRowInPeerTable;
private Table replicatedTable;
private int sleepMsBeforeReCompare;
/**
* Map method that compares every scanned row with the equivalent from
@ -110,6 +115,7 @@ public class VerifyReplication extends Configured implements Tool {
throws IOException {
if (replicatedScanner == null) {
Configuration conf = context.getConfiguration();
sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 0);
final Scan scan = new Scan();
scan.setBatch(batch);
scan.setCacheBlocks(false);
@ -131,6 +137,7 @@ public class VerifyReplication extends Configured implements Tool {
if (versions >= 0) {
scan.setMaxVersions(versions);
}
sourceTable = new HTable(conf, tableName);
final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
HConnectionManager.execute(new HConnectable<Void>(conf) {
@ -182,6 +189,18 @@ public class VerifyReplication extends Configured implements Tool {
}
private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
// Optional second look at a row that failed the scan-based comparison:
// wait sleepMsBeforeReCompare milliseconds, then fetch the row from both
// tables again. A value of 0 (the default) skips this step entirely.
if (sleepMsBeforeReCompare > 0) {
  Threads.sleep(sleepMsBeforeReCompare);
  try {
    Result sourceResult = sourceTable.get(new Get(row.getRow()));
    Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
    // compareResults signals a mismatch by throwing, so reaching the
    // return below means the re-read rows matched and the row is not
    // counted as bad.
    Result.compareResults(sourceResult, replicatedResult);
    return;
  } catch (Exception e) {
    // Pass the exception to the logger so the reason for the failed
    // re-compare (mismatch or I/O error) is visible in the task log.
    LOG.error("recompare fail after sleep, rowkey=" + delimiter +
        Bytes.toString(row.getRow()) + delimiter, e);
  }
}
context.getCounter(counter).increment(1);
context.getCounter(Counters.BADROWS).increment(1);
LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toString(row.getRow()) +
@ -204,12 +223,20 @@ public class VerifyReplication extends Configured implements Tool {
replicatedScanner = null;
}
}
// Close the source table; a close failure is logged and not propagated.
if (sourceTable != null) {
  // Read the name from sourceTable itself. replicatedTable has its own
  // null check further down and may be null here, so dereferencing it
  // could throw a NullPointerException and would log the wrong table.
  TableName tableName = sourceTable.getName();
  try {
    sourceTable.close();
  } catch (IOException ioe) {
    LOG.warn("Exception closing source table: " + tableName, ioe);
  }
}
// Close the replicated table; a close failure is logged and not propagated.
if (replicatedTable != null) {
TableName tableName = replicatedTable.getName();
try {
replicatedTable.close();
} catch (IOException ioe) {
// NOTE(review): the two warn calls below look like the removed/added pair
// of a diff hunk shown without +/- markers; confirm that only the second
// one is present in the resulting file, otherwise the failure is logged twice.
LOG.warn("Exception closing " + tableName, ioe);
LOG.warn("Exception closing replicated table: " + tableName, ioe);
}
}
}
@ -269,6 +296,7 @@ public class VerifyReplication extends Configured implements Tool {
conf.set(NAME+".tableName", tableName);
conf.setLong(NAME+".startTime", startTime);
conf.setLong(NAME+".endTime", endTime);
conf.setInt(NAME +".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
if (families != null) {
conf.set(NAME+".families", families);
}
@ -404,6 +432,12 @@ public class VerifyReplication extends Configured implements Tool {
continue;
}
// Optional "--recomparesleep=<ms>" argument: the text after '=' is stored in
// the static sleepMsBeforeReCompare field. Per the usage text, 0 (the default)
// disables the re-compare.
final String sleepToReCompareKey = "--recomparesleep=";
if (cmd.startsWith(sleepToReCompareKey)) {
// Integer.parseInt throws NumberFormatException when the value is not a valid int;
// that exception is not handled in this fragment.
sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
continue;
}
if (i == args.length-2) {
peerId = cmd;
}
@ -449,6 +483,8 @@ public class VerifyReplication extends Configured implements Tool {
System.err.println(" families comma-separated list of families to copy");
System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
System.err.println(" delimiter the delimiter used in display around rowkey");
System.err.println(" recomparesleep milliseconds to sleep before recompare row, " +
"default value is 0 which disables the recompare.");
System.err.println();
System.err.println("Args:");
System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");