HBASE-10153 improve VerifyReplication to compute BADROWS more accurately (Jianwei)
This commit is contained in:
parent
a43f111f0d
commit
a2fe4d6700
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
|
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
|
||||||
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
|
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
|
||||||
import org.apache.hadoop.hbase.mapreduce.TableMapper;
|
import org.apache.hadoop.hbase.mapreduce.TableMapper;
|
||||||
|
import org.apache.hadoop.hbase.mapreduce.TableSplit;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
|
||||||
|
@ -81,9 +82,11 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
public static class Verifier
|
public static class Verifier
|
||||||
extends TableMapper<ImmutableBytesWritable, Put> {
|
extends TableMapper<ImmutableBytesWritable, Put> {
|
||||||
|
|
||||||
public static enum Counters {GOODROWS, BADROWS}
|
public static enum Counters {
|
||||||
|
GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS}
|
||||||
|
|
||||||
private ResultScanner replicatedScanner;
|
private ResultScanner replicatedScanner;
|
||||||
|
private Result currentCompareRowInPeerTable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Map method that compares every scanned row with the equivalent from
|
* Map method that compares every scanned row with the equivalent from
|
||||||
|
@ -114,6 +117,8 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
if (versions >= 0) {
|
if (versions >= 0) {
|
||||||
scan.setMaxVersions(versions);
|
scan.setMaxVersions(versions);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
|
||||||
HConnectionManager.execute(new HConnectable<Void>(conf) {
|
HConnectionManager.execute(new HConnectable<Void>(conf) {
|
||||||
@Override
|
@Override
|
||||||
public Void connect(HConnection conn) throws IOException {
|
public Void connect(HConnection conn) throws IOException {
|
||||||
|
@ -124,26 +129,64 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
|
TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
|
||||||
Table replicatedTable = new HTable(peerConf, tableName);
|
Table replicatedTable = new HTable(peerConf, tableName);
|
||||||
scan.setStartRow(value.getRow());
|
scan.setStartRow(value.getRow());
|
||||||
|
scan.setStopRow(tableSplit.getEndRow());
|
||||||
replicatedScanner = replicatedTable.getScanner(scan);
|
replicatedScanner = replicatedTable.getScanner(scan);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
currentCompareRowInPeerTable = replicatedScanner.next();
|
||||||
}
|
}
|
||||||
Result res = replicatedScanner.next();
|
while (true) {
|
||||||
try {
|
if (currentCompareRowInPeerTable == null) {
|
||||||
Result.compareResults(value, res);
|
// reach the region end of peer table, row only in source table
|
||||||
context.getCounter(Counters.GOODROWS).increment(1);
|
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
|
||||||
} catch (Exception e) {
|
break;
|
||||||
LOG.warn("Bad row", e);
|
}
|
||||||
context.getCounter(Counters.BADROWS).increment(1);
|
int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
|
||||||
|
if (rowCmpRet == 0) {
|
||||||
|
// rowkey is same, need to compare the content of the row
|
||||||
|
try {
|
||||||
|
Result.compareResults(value, currentCompareRowInPeerTable);
|
||||||
|
context.getCounter(Counters.GOODROWS).increment(1);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
|
||||||
|
}
|
||||||
|
currentCompareRowInPeerTable = replicatedScanner.next();
|
||||||
|
break;
|
||||||
|
} else if (rowCmpRet < 0) {
|
||||||
|
// row only exists in source table
|
||||||
|
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
// row only exists in peer table
|
||||||
|
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
|
||||||
|
currentCompareRowInPeerTable);
|
||||||
|
currentCompareRowInPeerTable = replicatedScanner.next();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
|
||||||
|
context.getCounter(counter).increment(1);
|
||||||
|
context.getCounter(Counters.BADROWS).increment(1);
|
||||||
|
LOG.error(counter.toString() + ", rowkey=" + Bytes.toString(row.getRow()));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void cleanup(Context context) {
|
protected void cleanup(Context context) {
|
||||||
if (replicatedScanner != null) {
|
if (replicatedScanner != null) {
|
||||||
replicatedScanner.close();
|
try {
|
||||||
replicatedScanner = null;
|
while (currentCompareRowInPeerTable != null) {
|
||||||
|
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
|
||||||
|
currentCompareRowInPeerTable);
|
||||||
|
currentCompareRowInPeerTable = replicatedScanner.next();
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("fail to scan peer table in cleanup", e);
|
||||||
|
} finally {
|
||||||
|
replicatedScanner.close();
|
||||||
|
replicatedScanner = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue