diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 1e268c1858b..c74e71525df 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -20,6 +20,11 @@ package org.apache.hadoop.hbase.mapreduce.replication; import java.io.IOException; import java.util.Arrays; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; @@ -30,7 +35,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -46,6 +50,7 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat; import org.apache.hadoop.hbase.mapreduce.TableSplit; +import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication.Verifier.Counters; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; @@ -55,12 +60,12 @@ import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -84,6 +89,11 @@ public class VerifyReplication extends Configured implements Tool { public final static String NAME = "verifyrep"; private final static String PEER_CONFIG_PREFIX = NAME + ".peer."; + private static ExecutorService reCompareExecutor = null; + int reCompareTries = 1; + int reCompareBackoffExponent = 0; + int reCompareThreads = 1; + int sleepMsBeforeReCompare = 0; long startTime = 0; long endTime = Long.MAX_VALUE; int batch = -1; @@ -94,7 +104,6 @@ public class VerifyReplication extends Configured implements Tool { String peerId = null; String peerQuorumAddress = null; String rowPrefixes = null; - int sleepMsBeforeReCompare = 0; boolean verbose = false; boolean includeDeletedCells = false; // Source table snapshot name @@ -124,7 +133,12 @@ public class VerifyReplication extends Configured implements Tool { BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, - CONTENT_DIFFERENT_ROWS + CONTENT_DIFFERENT_ROWS, + RE_COMPARES, + REJECTED_RE_COMPARES, + SOURCE_ROW_CHANGED, + PEER_ROW_CHANGED, + FAILED_RECOMPARE } private Connection sourceConnection; @@ -133,6 +147,9 @@ public class VerifyReplication extends Configured implements Tool { private Table replicatedTable; private ResultScanner replicatedScanner; private Result currentCompareRowInPeerTable; + private Scan tableScan; + private int reCompareTries; + private int reCompareBackoffExponent; private int sleepMsBeforeReCompare; private String delimiter = ""; private boolean verbose = false; @@ -150,6 +167,8 @@ public class VerifyReplication extends Configured implements Tool { throws IOException { if (replicatedScanner == null) { Configuration conf = context.getConfiguration(); + reCompareTries = conf.getInt(NAME + ".reCompareTries", 0); + reCompareBackoffExponent = conf.getInt(NAME + ".reCompareBackoffExponent", 1); sleepMsBeforeReCompare = conf.getInt(NAME + ".sleepMsBeforeReCompare", 0); delimiter = conf.get(NAME + ".delimiter", ""); verbose = conf.getBoolean(NAME + ".verbose", false); @@ -179,9 +198,12 @@ public class VerifyReplication extends Configured implements Tool { if (versions >= 0) { scan.readVersions(versions); } + int reCompareThreads = conf.getInt(NAME + ".reCompareThreads", 1); + reCompareExecutor = buildReCompareExecutor(reCompareThreads, context); TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName")); sourceConnection = ConnectionFactory.createConnection(conf); sourceTable = sourceConnection.getTable(tableName); + tableScan = scan; final InputSplit tableSplit = context.getInputSplit(); @@ -226,7 +248,7 @@ public class VerifyReplication extends Configured implements Tool { while (true) { if (currentCompareRowInPeerTable == null) { // reach the region end of peer table, row only in source table - logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null); break; } int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow()); @@ -240,47 +262,39 @@ public class VerifyReplication extends Configured implements Tool { "Good row key: " + delimiter + Bytes.toStringBinary(value.getRow()) + delimiter); } } catch (Exception e) { - logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value); + logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value, + currentCompareRowInPeerTable); } currentCompareRowInPeerTable = replicatedScanner.next(); break; } else if (rowCmpRet < 0) { // row only exists in source table - logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null); break; } else { // row only exists in peer table - logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null, currentCompareRowInPeerTable); currentCompareRowInPeerTable = replicatedScanner.next(); } } } - private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) { - if (sleepMsBeforeReCompare > 0) { - Threads.sleep(sleepMsBeforeReCompare); - try { - Result sourceResult = sourceTable.get(new Get(row.getRow())); - Result replicatedResult = replicatedTable.get(new Get(row.getRow())); - Result.compareResults(sourceResult, replicatedResult, false); - if (!sourceResult.isEmpty()) { - context.getCounter(Counters.GOODROWS).increment(1); - if (verbose) { - LOG.info("Good row key (with recompare): " + delimiter - + Bytes.toStringBinary(row.getRow()) + delimiter); - } - } - return; - } catch (Exception e) { - LOG.error("recompare fail after sleep, rowkey=" + delimiter - + Bytes.toStringBinary(row.getRow()) + delimiter); - } + @SuppressWarnings("FutureReturnValueIgnored") + private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row, + Result replicatedRow) { + if (reCompareTries > 0 && sleepMsBeforeReCompare > 0) { + VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable( + context, row, replicatedRow, counter, delimiter, tableScan, sourceTable, replicatedTable, + reCompareTries, sleepMsBeforeReCompare, reCompareBackoffExponent, verbose); + reCompareExecutor.submit(runnable); + return; } + + byte[] rowKey = getRow(row, replicatedRow); context.getCounter(counter).increment(1); context.getCounter(Counters.BADROWS).increment(1); - LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) - + delimiter); + LOG.error("{}, rowkey={}{}{}", counter, delimiter, Bytes.toStringBinary(rowKey), delimiter); } @Override @@ -288,7 +302,7 @@ public class VerifyReplication extends Configured implements Tool { if (replicatedScanner != null) { try { while (currentCompareRowInPeerTable != null) { - logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null, currentCompareRowInPeerTable); currentCompareRowInPeerTable = replicatedScanner.next(); } @@ -329,6 +343,17 @@ public class VerifyReplication extends Configured implements Tool { LOG.error("fail to close replicated connection in cleanup", e); } } + if (reCompareExecutor != null && !reCompareExecutor.isShutdown()) { + reCompareExecutor.shutdown(); + try { + boolean terminated = reCompareExecutor.awaitTermination(10, TimeUnit.SECONDS); + if (!terminated) { + LOG.warn("Wait termination timed out"); + } + } catch (InterruptedException e) { + LOG.error("fail to await executor termination in cleanup", e); + } + } } } @@ -424,6 +449,10 @@ public class VerifyReplication extends Configured implements Tool { conf.setInt(NAME + ".versions", versions); LOG.info("Number of version: " + versions); + conf.setInt(NAME + ".reCompareTries", reCompareTries); + conf.setInt(NAME + ".reCompareBackoffExponent", reCompareBackoffExponent); + conf.setInt(NAME + ".reCompareThreads", reCompareThreads); + // Set Snapshot specific parameters if (peerSnapshotName != null) { conf.set(NAME + ".peerSnapshotName", peerSnapshotName); @@ -491,6 +520,15 @@ public class VerifyReplication extends Configured implements Tool { return job; } + protected static byte[] getRow(Result sourceResult, Result replicatedResult) { + if (sourceResult != null) { + return sourceResult.getRow(); + } else if (replicatedResult != null) { + return replicatedResult.getRow(); + } + throw new RuntimeException("Both sourceResult and replicatedResult are null!"); + } + private static void setRowPrefixFilter(Scan scan, String rowPrefixes) { if (rowPrefixes != null && !rowPrefixes.isEmpty()) { String[] rowPrefixArray = rowPrefixes.split(","); @@ -628,6 +666,25 @@ public class VerifyReplication extends Configured implements Tool { continue; } + final String reCompareThreadArgs = "--reCompareThreads="; + if (cmd.startsWith(reCompareThreadArgs)) { + reCompareThreads = Integer.parseInt(cmd.substring(reCompareThreadArgs.length())); + continue; + } + + final String reCompareTriesKey = "--reCompareTries="; + if (cmd.startsWith(reCompareTriesKey)) { + reCompareTries = Integer.parseInt(cmd.substring(reCompareTriesKey.length())); + continue; + } + + final String reCompareBackoffExponentKey = "--reCompareBackoffExponent="; + if (cmd.startsWith(reCompareBackoffExponentKey)) { + reCompareBackoffExponent = + Integer.parseInt(cmd.substring(reCompareBackoffExponentKey.length())); + continue; + } + if (cmd.startsWith("--")) { printUsage("Invalid argument '" + cmd + "'"); return false; @@ -705,6 +762,7 @@ public class VerifyReplication extends Configured implements Tool { } System.err.println("Usage: verifyrep [--starttime=X]" + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " + + "[--reCompareThreads=] [--reCompareTries=] [--reCompareBackoffExponent=]" + "[--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] " + "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] " + "[--peerFSAddress=T] [--peerHBaseRootAddress=U] "); @@ -722,6 +780,12 @@ public class VerifyReplication extends Configured implements Tool { System.err.println(" delimiter the delimiter used in display around rowkey"); System.err.println(" recomparesleep milliseconds to sleep before recompare row, " + "default value is 0 which disables the recompare."); + System.err.println(" reCompareThreads number of threads to run recompares in"); + System.err.println(" reCompareTries number of recompare attempts before incrementing " + + "the BADROWS counter. Defaults to 1 recompare"); + System.out.println(" reCompareBackoffExponent exponential multiplier to increase " + + "recomparesleep after each recompare attempt, " + + "default value is 0 which results in a constant sleep time"); System.err.println(" verbose logs row keys of good rows"); System.err.println(" peerTableName Peer Table Name"); System.err.println(" sourceSnapshotName Source Snapshot Name"); @@ -788,6 +852,23 @@ public class VerifyReplication extends Configured implements Tool { + "2181:/cluster-b \\\n" + " TestTable"); } + private static ExecutorService buildReCompareExecutor(int maxThreads, Mapper.Context context) { + return new ThreadPoolExecutor(0, maxThreads, 1L, TimeUnit.SECONDS, new SynchronousQueue<>(), + buildRejectedReComparePolicy(context)); + } + + private static CallerRunsPolicy buildRejectedReComparePolicy(Mapper.Context context) { + return new CallerRunsPolicy() { + @Override + public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) { + LOG.debug("Re-comparison execution rejected. Running in main thread."); + context.getCounter(Counters.REJECTED_RE_COMPARES).increment(1); + // will run in the current thread + super.rejectedExecution(runnable, e); + } + }; + } + @Override public int run(String[] args) throws Exception { Configuration conf = this.getConf(); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplicationRecompareRunnable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplicationRecompareRunnable.java new file mode 100644 index 00000000000..c8988510ddf --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplicationRecompareRunnable.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce.replication; + +import java.io.IOException; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class VerifyReplicationRecompareRunnable implements Runnable { + + private static final Logger LOG = + LoggerFactory.getLogger(VerifyReplicationRecompareRunnable.class); + + private final Mapper.Context context; + private final VerifyReplication.Verifier.Counters originalCounter; + private final String delimiter; + private final byte[] row; + private final Scan tableScan; + private final Table sourceTable; + private final Table replicatedTable; + + private final int reCompareTries; + private final int sleepMsBeforeReCompare; + private final int reCompareBackoffExponent; + private final boolean verbose; + + private Result sourceResult; + private Result replicatedResult; + + public VerifyReplicationRecompareRunnable(Mapper.Context context, Result sourceResult, + Result replicatedResult, VerifyReplication.Verifier.Counters originalCounter, String delimiter, + Scan tableScan, Table sourceTable, Table replicatedTable, int reCompareTries, + int sleepMsBeforeReCompare, int reCompareBackoffExponent, boolean verbose) { + this.context = context; + this.sourceResult = sourceResult; + this.replicatedResult = replicatedResult; + this.originalCounter = originalCounter; + this.delimiter = delimiter; + this.tableScan = tableScan; + this.sourceTable = sourceTable; + this.replicatedTable = replicatedTable; + this.reCompareTries = reCompareTries; + this.sleepMsBeforeReCompare = sleepMsBeforeReCompare; + this.reCompareBackoffExponent = reCompareBackoffExponent; + this.verbose = verbose; + this.row = VerifyReplication.getRow(sourceResult, replicatedResult); + } + + @Override + public void run() { + Get get = new Get(row); + get.setCacheBlocks(tableScan.getCacheBlocks()); + get.setFilter(tableScan.getFilter()); + + int sleepMs = sleepMsBeforeReCompare; + int tries = 0; + + while (++tries <= reCompareTries) { + context.getCounter(VerifyReplication.Verifier.Counters.RE_COMPARES).increment(1); + + try { + Thread.sleep(sleepMs); + } catch (InterruptedException e) { + LOG.warn("Sleeping interrupted, incrementing bad rows and aborting"); + incrementOriginalAndBadCounter(); + Thread.currentThread().interrupt(); + } + + try { + if (fetchLatestRows(get)) { + if (matches(sourceResult, replicatedResult, null)) { + if (verbose) { + LOG.info("Good row key (with recompare): {}{}{}", delimiter, + Bytes.toStringBinary(row), delimiter); + } + context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).increment(1); + return; + } + } + } catch (IOException e) { + context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1); + if (verbose) { + LOG.info("Got an exception during recompare for rowkey={}", Bytes.toStringBinary(row), e); + } + } + + sleepMs = sleepMs * (2 ^ reCompareBackoffExponent); + } + + LOG.error("{}, rowkey={}{}{}", originalCounter, delimiter, Bytes.toStringBinary(row), + delimiter); + incrementOriginalAndBadCounter(); + } + + private boolean fetchLatestRows(Get get) throws IOException { + Result sourceResult = sourceTable.get(get); + Result replicatedResult = replicatedTable.get(get); + + boolean sourceMatches = matches(sourceResult, this.sourceResult, + VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED); + boolean replicatedMatches = matches(replicatedResult, this.replicatedResult, + VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED); + + if (sourceMatches && replicatedMatches) { + return true; + } + + this.sourceResult = sourceResult; + this.replicatedResult = replicatedResult; + return false; + } + + private boolean matches(Result original, Result updated, + VerifyReplication.Verifier.Counters failCounter) { + try { + Result.compareResults(original, updated); + return true; + } catch (Exception e) { + if (failCounter != null) { + context.getCounter(failCounter).increment(1); + if (LOG.isDebugEnabled()) { + LOG.debug("{} for rowkey={}", failCounter, Bytes.toStringBinary(row)); + } + } + return false; + } + } + + private void incrementOriginalAndBadCounter() { + context.getCounter(originalCounter).increment(1); + context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).increment(1); + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java index ee77d9f6fcc..b0a50951cf6 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java @@ -447,6 +447,32 @@ public class TestVerifyReplication extends TestReplicationBase { checkRestoreTmpDir(CONF2, tmpPath2, 2); } + @Test + public void testVerifyReplicationThreadedRecompares() throws Exception { + // Populate the tables with same data + runBatchCopyTest(); + + // ONLY_IN_PEER_TABLE_ROWS + Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH))); + put.addColumn(noRepfamName, row, row); + htable3.put(put); + + // CONTENT_DIFFERENT_ROWS + put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1))); + put.addColumn(noRepfamName, row, Bytes.toBytes("diff value")); + htable3.put(put); + + // ONLY_IN_SOURCE_TABLE_ROWS + put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1))); + put.addColumn(noRepfamName, row, row); + htable1.put(put); + + String[] args = new String[] { "--reCompareThreads=10", "--reCompareTries=3", + "--recomparesleep=1", "--peerTableName=" + peerTableName.getNameAsString(), + UTIL2.getClusterKey(), tableName.getNameAsString() }; + runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3); + } + @AfterClass public static void tearDownAfterClass() throws Exception { htable3.close(); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationRecompareRunnable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationRecompareRunnable.java new file mode 100644 index 00000000000..dbfbd1297e2 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationRecompareRunnable.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; +import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplicationRecompareRunnable; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.counters.GenericCounter; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@Category({ ReplicationTests.class, SmallTests.class }) +@RunWith(MockitoJUnitRunner.class) +public class TestVerifyReplicationRecompareRunnable { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestVerifyReplicationRecompareRunnable.class); + + @Mock + private Table sourceTable; + + @Mock + private Table replicatedTable; + + @Mock + private Mapper.Context context; + + static Result genResult(int cols) { + KeyValue[] kvs = new KeyValue[cols]; + + for (int i = 0; i < cols; ++i) { + kvs[i] = + new KeyValue(genBytes(), genBytes(), genBytes(), System.currentTimeMillis(), genBytes()); + } + + return Result.create(kvs); + } + + static byte[] genBytes() { + return Bytes.toBytes(ThreadLocalRandom.current().nextInt()); + } + + @Before + public void setUp() { + for (VerifyReplication.Verifier.Counters counter : VerifyReplication.Verifier.Counters + .values()) { + Counter emptyCounter = new GenericCounter(counter.name(), counter.name()); + when(context.getCounter(counter)).thenReturn(emptyCounter); + } + } + + @Test + public void itRecomparesGoodRow() throws IOException { + Result result = genResult(2); + + when(sourceTable.get(any(Get.class))).thenReturn(result); + when(replicatedTable.get(any(Get.class))).thenReturn(result); + + VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context, + genResult(5), null, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, "", + new Scan(), sourceTable, replicatedTable, 3, 1, 0, true); + + runnable.run(); + + assertEquals(0, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); + assertEquals(0, + context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue()); + assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); + assertEquals(1, + context.getCounter(VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED).getValue()); + assertEquals(1, + context.getCounter(VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED).getValue()); + assertEquals(2, context.getCounter(VerifyReplication.Verifier.Counters.RE_COMPARES).getValue()); + } + + @Test + public void itRecomparesBadRow() throws IOException { + Result replicatedResult = genResult(1); + when(sourceTable.get(any(Get.class))).thenReturn(genResult(5)); + when(replicatedTable.get(any(Get.class))).thenReturn(replicatedResult); + + VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context, + genResult(5), replicatedResult, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, + "", new Scan(), sourceTable, replicatedTable, 1, 1, 0, true); + + runnable.run(); + + assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); + assertEquals(1, + context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue()); + assertEquals(0, context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); + assertEquals(1, + context.getCounter(VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED).getValue()); + assertEquals(0, + context.getCounter(VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED).getValue()); + assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.RE_COMPARES).getValue()); + } + + @Test + public void itHandlesExceptionOnRecompare() throws IOException { + when(sourceTable.get(any(Get.class))).thenThrow(new IOException("Error!")); + when(replicatedTable.get(any(Get.class))).thenReturn(genResult(5)); + + VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context, + genResult(5), null, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, "", + new Scan(), sourceTable, replicatedTable, 1, 1, 0, true); + + runnable.run(); + + assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); + assertEquals(1, + context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue()); + assertEquals(1, + context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue()); + assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.RE_COMPARES).getValue()); + } +}