HBASE-10153 improve VerifyReplication to compute BADROWS more accurately (Jianwei)
This commit is contained in:
parent
7219471081
commit
8dbf7b2238
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
|||
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
|
||||
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
|
||||
import org.apache.hadoop.hbase.mapreduce.TableMapper;
|
||||
import org.apache.hadoop.hbase.mapreduce.TableSplit;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
|
||||
|
@ -83,9 +84,11 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
public static class Verifier
|
||||
extends TableMapper<ImmutableBytesWritable, Put> {
|
||||
|
||||
public static enum Counters {GOODROWS, BADROWS}
|
||||
public static enum Counters {
|
||||
GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS}
|
||||
|
||||
private ResultScanner replicatedScanner;
|
||||
private Result currentCompareRowInPeerTable;
|
||||
|
||||
/**
|
||||
* Map method that compares every scanned row with the equivalent from
|
||||
|
@ -116,6 +119,8 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
if (versions >= 0) {
|
||||
scan.setMaxVersions(versions);
|
||||
}
|
||||
|
||||
final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
|
||||
HConnectionManager.execute(new HConnectable<Void>(conf) {
|
||||
@Override
|
||||
public Void connect(HConnection conn) throws IOException {
|
||||
|
@ -126,29 +131,67 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
|
||||
Table replicatedTable = new HTable(peerConf, tableName);
|
||||
scan.setStartRow(value.getRow());
|
||||
scan.setStopRow(tableSplit.getEndRow());
|
||||
replicatedScanner = replicatedTable.getScanner(scan);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
currentCompareRowInPeerTable = replicatedScanner.next();
|
||||
}
|
||||
Result res = replicatedScanner.next();
|
||||
while (true) {
|
||||
if (currentCompareRowInPeerTable == null) {
|
||||
// reach the region end of peer table, row only in source table
|
||||
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
|
||||
break;
|
||||
}
|
||||
int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
|
||||
if (rowCmpRet == 0) {
|
||||
// rowkey is same, need to compare the content of the row
|
||||
try {
|
||||
Result.compareResults(value, res);
|
||||
Result.compareResults(value, currentCompareRowInPeerTable);
|
||||
context.getCounter(Counters.GOODROWS).increment(1);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Bad row", e);
|
||||
context.getCounter(Counters.BADROWS).increment(1);
|
||||
logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
|
||||
}
|
||||
currentCompareRowInPeerTable = replicatedScanner.next();
|
||||
break;
|
||||
} else if (rowCmpRet < 0) {
|
||||
// row only exists in source table
|
||||
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
|
||||
break;
|
||||
} else {
|
||||
// row only exists in peer table
|
||||
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
|
||||
currentCompareRowInPeerTable);
|
||||
currentCompareRowInPeerTable = replicatedScanner.next();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
|
||||
context.getCounter(counter).increment(1);
|
||||
context.getCounter(Counters.BADROWS).increment(1);
|
||||
LOG.error(counter.toString() + ", rowkey=" + Bytes.toString(row.getRow()));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void cleanup(Context context) {
|
||||
if (replicatedScanner != null) {
|
||||
try {
|
||||
while (currentCompareRowInPeerTable != null) {
|
||||
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
|
||||
currentCompareRowInPeerTable);
|
||||
currentCompareRowInPeerTable = replicatedScanner.next();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("fail to scan peer table in cleanup", e);
|
||||
} finally {
|
||||
replicatedScanner.close();
|
||||
replicatedScanner = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
|
||||
ZooKeeperWatcher localZKW = null;
|
||||
|
|
Loading…
Reference in New Issue