diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index f425ba80c1c..01d489357f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; +import org.apache.hadoop.hbase.mapreduce.TableSplit; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl; @@ -83,9 +84,11 @@ public class VerifyReplication extends Configured implements Tool { public static class Verifier extends TableMapper { - public static enum Counters {GOODROWS, BADROWS} + public static enum Counters { + GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS} private ResultScanner replicatedScanner; + private Result currentCompareRowInPeerTable; /** * Map method that compares every scanned row with the equivalent from @@ -116,6 +119,8 @@ public class VerifyReplication extends Configured implements Tool { if (versions >= 0) { scan.setMaxVersions(versions); } + + final TableSplit tableSplit = (TableSplit)(context.getInputSplit()); HConnectionManager.execute(new HConnectable(conf) { @Override public Void connect(HConnection conn) throws IOException { @@ -126,26 +131,64 @@ public class VerifyReplication extends Configured implements Tool { TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName")); Table replicatedTable = new HTable(peerConf, tableName); scan.setStartRow(value.getRow()); + scan.setStopRow(tableSplit.getEndRow()); replicatedScanner = replicatedTable.getScanner(scan); return null; } }); + currentCompareRowInPeerTable = replicatedScanner.next(); } - Result res = replicatedScanner.next(); - try { - Result.compareResults(value, res); - context.getCounter(Counters.GOODROWS).increment(1); - } catch (Exception e) { - LOG.warn("Bad row", e); - context.getCounter(Counters.BADROWS).increment(1); + while (true) { + if (currentCompareRowInPeerTable == null) { + // reach the region end of peer table, row only in source table + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); + break; + } + int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow()); + if (rowCmpRet == 0) { + // rowkey is same, need to compare the content of the row + try { + Result.compareResults(value, currentCompareRowInPeerTable); + context.getCounter(Counters.GOODROWS).increment(1); + } catch (Exception e) { + logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value); + } + currentCompareRowInPeerTable = replicatedScanner.next(); + break; + } else if (rowCmpRet < 0) { + // row only exists in source table + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); + break; + } else { + // row only exists in peer table + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, + currentCompareRowInPeerTable); + currentCompareRowInPeerTable = replicatedScanner.next(); + } } } + private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) { + context.getCounter(counter).increment(1); + context.getCounter(Counters.BADROWS).increment(1); + LOG.error(counter.toString() + ", rowkey=" + Bytes.toString(row.getRow())); + } + @Override protected void cleanup(Context context) { if (replicatedScanner != null) { - replicatedScanner.close(); - replicatedScanner = null; + try { + while (currentCompareRowInPeerTable != null) { + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, + currentCompareRowInPeerTable); + currentCompareRowInPeerTable = replicatedScanner.next(); + } + } catch (Exception e) { + LOG.error("fail to scan peer table in cleanup", e); + } finally { + replicatedScanner.close(); + replicatedScanner = null; + } } } }