HBASE-16423 Add re-compare option to VerifyReplication to avoid occasional inconsistent rows (Jianwei Cui)

This commit is contained in:
tedyu 2016-09-22 21:01:22 -07:00
parent 191afc8eb1
commit 946c1ed8f8
1 changed files with 54 additions and 8 deletions

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
@ -51,6 +52,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
@ -83,6 +85,7 @@ public class VerifyReplication extends Configured implements Tool {
static String delimiter = ""; static String delimiter = "";
static String peerId = null; static String peerId = null;
static String rowPrefixes = null; static String rowPrefixes = null;
static int sleepMsBeforeReCompare = 0;
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
@ -97,10 +100,13 @@ public class VerifyReplication extends Configured implements Tool {
public static enum Counters { public static enum Counters {
GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS} GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS}
private Connection connection; private Connection sourceConnection;
private Table sourceTable;
private Connection replicatedConnection;
private Table replicatedTable; private Table replicatedTable;
private ResultScanner replicatedScanner; private ResultScanner replicatedScanner;
private Result currentCompareRowInPeerTable; private Result currentCompareRowInPeerTable;
private int sleepMsBeforeReCompare;
/** /**
* Map method that compares every scanned row with the equivalent from * Map method that compares every scanned row with the equivalent from
@ -116,6 +122,7 @@ public class VerifyReplication extends Configured implements Tool {
throws IOException { throws IOException {
if (replicatedScanner == null) { if (replicatedScanner == null) {
Configuration conf = context.getConfiguration(); Configuration conf = context.getConfiguration();
sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 0);
final Scan scan = new Scan(); final Scan scan = new Scan();
scan.setBatch(batch); scan.setBatch(batch);
scan.setCacheBlocks(false); scan.setCacheBlocks(false);
@ -137,6 +144,9 @@ public class VerifyReplication extends Configured implements Tool {
if (versions >= 0) { if (versions >= 0) {
scan.setMaxVersions(versions); scan.setMaxVersions(versions);
} }
TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
sourceConnection = ConnectionFactory.createConnection(conf);
sourceTable = sourceConnection.getTable(tableName);
final TableSplit tableSplit = (TableSplit)(context.getInputSplit()); final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
@ -144,9 +154,8 @@ public class VerifyReplication extends Configured implements Tool {
Configuration peerConf = HBaseConfiguration.createClusterConf(conf, Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
zkClusterKey, PEER_CONFIG_PREFIX); zkClusterKey, PEER_CONFIG_PREFIX);
TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName")); replicatedConnection = ConnectionFactory.createConnection(peerConf);
connection = ConnectionFactory.createConnection(peerConf); replicatedTable = replicatedConnection.getTable(tableName);
replicatedTable = connection.getTable(tableName);
scan.setStartRow(value.getRow()); scan.setStartRow(value.getRow());
scan.setStopRow(tableSplit.getEndRow()); scan.setStopRow(tableSplit.getEndRow());
replicatedScanner = replicatedTable.getScanner(scan); replicatedScanner = replicatedTable.getScanner(scan);
@ -184,6 +193,18 @@ public class VerifyReplication extends Configured implements Tool {
} }
private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) { private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
if (sleepMsBeforeReCompare > 0) {
Threads.sleep(sleepMsBeforeReCompare);
try {
Result sourceResult = sourceTable.get(new Get(row.getRow()));
Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
Result.compareResults(sourceResult, replicatedResult);
return;
} catch (Exception e) {
LOG.error("recompare fail after sleep, rowkey=" + delimiter +
Bytes.toString(row.getRow()) + delimiter);
}
}
context.getCounter(counter).increment(1); context.getCounter(counter).increment(1);
context.getCounter(Counters.BADROWS).increment(1); context.getCounter(Counters.BADROWS).increment(1);
LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toString(row.getRow()) + LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toString(row.getRow()) +
@ -206,18 +227,34 @@ public class VerifyReplication extends Configured implements Tool {
replicatedScanner = null; replicatedScanner = null;
} }
} }
if (sourceTable != null) {
try {
sourceTable.close();
} catch (IOException e) {
LOG.error("fail to close source table in cleanup", e);
}
}
if(sourceConnection != null){
try {
sourceConnection.close();
} catch (Exception e) {
LOG.error("fail to close source connection in cleanup", e);
}
}
if(replicatedTable != null){ if(replicatedTable != null){
try{ try{
replicatedTable.close(); replicatedTable.close();
} catch (Exception e) { } catch (Exception e) {
LOG.error("fail to close table in cleanup", e); LOG.error("fail to close replicated table in cleanup", e);
} }
} }
if(connection != null){ if(replicatedConnection != null){
try { try {
connection.close(); replicatedConnection.close();
} catch (Exception e) { } catch (Exception e) {
LOG.error("fail to close connection in cleanup", e); LOG.error("fail to close replicated connection in cleanup", e);
} }
} }
} }
@ -273,6 +310,7 @@ public class VerifyReplication extends Configured implements Tool {
conf.set(NAME+".tableName", tableName); conf.set(NAME+".tableName", tableName);
conf.setLong(NAME+".startTime", startTime); conf.setLong(NAME+".startTime", startTime);
conf.setLong(NAME+".endTime", endTime); conf.setLong(NAME+".endTime", endTime);
conf.setInt(NAME +".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
if (families != null) { if (families != null) {
conf.set(NAME+".families", families); conf.set(NAME+".families", families);
} }
@ -408,6 +446,12 @@ public class VerifyReplication extends Configured implements Tool {
continue; continue;
} }
final String sleepToReCompareKey = "--recomparesleep=";
if (cmd.startsWith(sleepToReCompareKey)) {
sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
continue;
}
if (i == args.length-2) { if (i == args.length-2) {
peerId = cmd; peerId = cmd;
} }
@ -453,6 +497,8 @@ public class VerifyReplication extends Configured implements Tool {
System.err.println(" families comma-separated list of families to copy"); System.err.println(" families comma-separated list of families to copy");
System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on "); System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
System.err.println(" delimiter the delimiter used in display around rowkey"); System.err.println(" delimiter the delimiter used in display around rowkey");
System.err.println(" recomparesleep milliseconds to sleep before recompare row, " +
"default value is 0 which disables the recompare.");
System.err.println(); System.err.println();
System.err.println("Args:"); System.err.println("Args:");
System.err.println(" peerid Id of the peer used for verification, must match the one given for replication"); System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");