From 946c1ed8f89967b1f036ee3b0dcc296082eee487 Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 22 Sep 2016 21:01:22 -0700 Subject: [PATCH] HBASE-16423 Add re-compare option to VerifyReplication to avoid occasional inconsistent rows (Jianwei Cui) --- .../replication/VerifyReplication.java | 62 ++++++++++++++++--- 1 file changed, 54 insertions(+), 8 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index c4dd3ad09b4..04ae18f2a1a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -51,6 +52,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; @@ -83,6 +85,7 @@ public class VerifyReplication extends Configured implements Tool { static String delimiter = ""; static String peerId = null; static String rowPrefixes = null; + static int sleepMsBeforeReCompare = 0; private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; @@ -97,10 +100,13 @@ public class VerifyReplication extends Configured implements Tool { public static enum Counters { GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS} - private Connection connection; + private Connection sourceConnection; + private Table sourceTable; + private Connection replicatedConnection; private Table replicatedTable; private ResultScanner replicatedScanner; private Result currentCompareRowInPeerTable; + private int sleepMsBeforeReCompare; /** * Map method that compares every scanned row with the equivalent from @@ -116,6 +122,7 @@ public class VerifyReplication extends Configured implements Tool { throws IOException { if (replicatedScanner == null) { Configuration conf = context.getConfiguration(); + sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 0); final Scan scan = new Scan(); scan.setBatch(batch); scan.setCacheBlocks(false); @@ -137,6 +144,9 @@ public class VerifyReplication extends Configured implements Tool { if (versions >= 0) { scan.setMaxVersions(versions); } + TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName")); + sourceConnection = ConnectionFactory.createConnection(conf); + sourceTable = sourceConnection.getTable(tableName); final TableSplit tableSplit = (TableSplit)(context.getInputSplit()); @@ -144,9 +154,8 @@ public class VerifyReplication extends Configured implements Tool { Configuration peerConf = HBaseConfiguration.createClusterConf(conf, zkClusterKey, PEER_CONFIG_PREFIX); - TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName")); - connection = ConnectionFactory.createConnection(peerConf); - replicatedTable = connection.getTable(tableName); + replicatedConnection = ConnectionFactory.createConnection(peerConf); + replicatedTable = replicatedConnection.getTable(tableName); scan.setStartRow(value.getRow()); scan.setStopRow(tableSplit.getEndRow()); replicatedScanner = replicatedTable.getScanner(scan); @@ -184,6 +193,18 @@ public class VerifyReplication extends Configured implements Tool { } private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) { + if (sleepMsBeforeReCompare > 0) { + Threads.sleep(sleepMsBeforeReCompare); + try { + Result sourceResult = sourceTable.get(new Get(row.getRow())); + Result replicatedResult = replicatedTable.get(new Get(row.getRow())); + Result.compareResults(sourceResult, replicatedResult); + return; + } catch (Exception e) { + LOG.error("recompare fail after sleep, rowkey=" + delimiter + + Bytes.toString(row.getRow()) + delimiter); + } + } context.getCounter(counter).increment(1); context.getCounter(Counters.BADROWS).increment(1); LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toString(row.getRow()) + @@ -206,18 +227,34 @@ public class VerifyReplication extends Configured implements Tool { replicatedScanner = null; } } + + if (sourceTable != null) { + try { + sourceTable.close(); + } catch (IOException e) { + LOG.error("fail to close source table in cleanup", e); + } + } + if(sourceConnection != null){ + try { + sourceConnection.close(); + } catch (Exception e) { + LOG.error("fail to close source connection in cleanup", e); + } + } + if(replicatedTable != null){ try{ replicatedTable.close(); } catch (Exception e) { - LOG.error("fail to close table in cleanup", e); + LOG.error("fail to close replicated table in cleanup", e); } } - if(connection != null){ + if(replicatedConnection != null){ try { - connection.close(); + replicatedConnection.close(); } catch (Exception e) { - LOG.error("fail to close connection in cleanup", e); + LOG.error("fail to close replicated connection in cleanup", e); } } } @@ -273,6 +310,7 @@ public class VerifyReplication extends Configured implements Tool { conf.set(NAME+".tableName", tableName); conf.setLong(NAME+".startTime", startTime); conf.setLong(NAME+".endTime", endTime); + conf.setInt(NAME +".sleepMsBeforeReCompare", sleepMsBeforeReCompare); if (families != null) { conf.set(NAME+".families", families); } @@ -408,6 +446,12 @@ public class VerifyReplication extends Configured implements Tool { continue; } + final String sleepToReCompareKey = "--recomparesleep="; + if (cmd.startsWith(sleepToReCompareKey)) { + sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length())); + continue; + } + if (i == args.length-2) { peerId = cmd; } @@ -453,6 +497,8 @@ public class VerifyReplication extends Configured implements Tool { System.err.println(" families comma-separated list of families to copy"); System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on "); System.err.println(" delimiter the delimiter used in display around rowkey"); + System.err.println(" recomparesleep milliseconds to sleep before recompare row, " + + "default value is 0 which disables the recompare."); System.err.println(); System.err.println("Args:"); System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");