async VerifyReplication recompares

This commit is contained in:
Hernan Gelaf-Romer 2023-02-21 11:21:48 -05:00
parent a71105997f
commit d99c9a5c24
4 changed files with 447 additions and 30 deletions

View File

@ -20,6 +20,11 @@ package org.apache.hadoop.hbase.mapreduce.replication;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -30,7 +35,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
@ -46,6 +50,7 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat; import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit; import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication.Verifier.Counters;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
@ -55,12 +60,12 @@ import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
@ -84,6 +89,11 @@ public class VerifyReplication extends Configured implements Tool {
public final static String NAME = "verifyrep"; public final static String NAME = "verifyrep";
private final static String PEER_CONFIG_PREFIX = NAME + ".peer."; private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
private static ExecutorService reCompareExecutor = null;
int reCompareTries = 1;
int reCompareBackoffExponent = 0;
int reCompareThreads = 1;
int sleepMsBeforeReCompare = 0;
long startTime = 0; long startTime = 0;
long endTime = Long.MAX_VALUE; long endTime = Long.MAX_VALUE;
int batch = -1; int batch = -1;
@ -94,7 +104,6 @@ public class VerifyReplication extends Configured implements Tool {
String peerId = null; String peerId = null;
String peerQuorumAddress = null; String peerQuorumAddress = null;
String rowPrefixes = null; String rowPrefixes = null;
int sleepMsBeforeReCompare = 0;
boolean verbose = false; boolean verbose = false;
boolean includeDeletedCells = false; boolean includeDeletedCells = false;
// Source table snapshot name // Source table snapshot name
@ -124,7 +133,12 @@ public class VerifyReplication extends Configured implements Tool {
BADROWS, BADROWS,
ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_SOURCE_TABLE_ROWS,
ONLY_IN_PEER_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS,
CONTENT_DIFFERENT_ROWS CONTENT_DIFFERENT_ROWS,
RE_COMPARES,
REJECTED_RE_COMPARES,
SOURCE_ROW_CHANGED,
PEER_ROW_CHANGED,
FAILED_RECOMPARE
} }
private Connection sourceConnection; private Connection sourceConnection;
@ -133,6 +147,9 @@ public class VerifyReplication extends Configured implements Tool {
private Table replicatedTable; private Table replicatedTable;
private ResultScanner replicatedScanner; private ResultScanner replicatedScanner;
private Result currentCompareRowInPeerTable; private Result currentCompareRowInPeerTable;
private Scan tableScan;
private int reCompareTries;
private int reCompareBackoffExponent;
private int sleepMsBeforeReCompare; private int sleepMsBeforeReCompare;
private String delimiter = ""; private String delimiter = "";
private boolean verbose = false; private boolean verbose = false;
@ -150,6 +167,8 @@ public class VerifyReplication extends Configured implements Tool {
throws IOException { throws IOException {
if (replicatedScanner == null) { if (replicatedScanner == null) {
Configuration conf = context.getConfiguration(); Configuration conf = context.getConfiguration();
reCompareTries = conf.getInt(NAME + ".reCompareTries", 0);
reCompareBackoffExponent = conf.getInt(NAME + ".reCompareBackoffExponent", 1);
sleepMsBeforeReCompare = conf.getInt(NAME + ".sleepMsBeforeReCompare", 0); sleepMsBeforeReCompare = conf.getInt(NAME + ".sleepMsBeforeReCompare", 0);
delimiter = conf.get(NAME + ".delimiter", ""); delimiter = conf.get(NAME + ".delimiter", "");
verbose = conf.getBoolean(NAME + ".verbose", false); verbose = conf.getBoolean(NAME + ".verbose", false);
@ -179,9 +198,12 @@ public class VerifyReplication extends Configured implements Tool {
if (versions >= 0) { if (versions >= 0) {
scan.readVersions(versions); scan.readVersions(versions);
} }
int reCompareThreads = conf.getInt(NAME + ".reCompareThreads", 1);
reCompareExecutor = buildReCompareExecutor(reCompareThreads, context);
TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName")); TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
sourceConnection = ConnectionFactory.createConnection(conf); sourceConnection = ConnectionFactory.createConnection(conf);
sourceTable = sourceConnection.getTable(tableName); sourceTable = sourceConnection.getTable(tableName);
tableScan = scan;
final InputSplit tableSplit = context.getInputSplit(); final InputSplit tableSplit = context.getInputSplit();
@ -226,7 +248,7 @@ public class VerifyReplication extends Configured implements Tool {
while (true) { while (true) {
if (currentCompareRowInPeerTable == null) { if (currentCompareRowInPeerTable == null) {
// reach the region end of peer table, row only in source table // reach the region end of peer table, row only in source table
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
break; break;
} }
int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow()); int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
@ -240,47 +262,39 @@ public class VerifyReplication extends Configured implements Tool {
"Good row key: " + delimiter + Bytes.toStringBinary(value.getRow()) + delimiter); "Good row key: " + delimiter + Bytes.toStringBinary(value.getRow()) + delimiter);
} }
} catch (Exception e) { } catch (Exception e) {
logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value); logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value,
currentCompareRowInPeerTable);
} }
currentCompareRowInPeerTable = replicatedScanner.next(); currentCompareRowInPeerTable = replicatedScanner.next();
break; break;
} else if (rowCmpRet < 0) { } else if (rowCmpRet < 0) {
// row only exists in source table // row only exists in source table
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
break; break;
} else { } else {
// row only exists in peer table // row only exists in peer table
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null,
currentCompareRowInPeerTable); currentCompareRowInPeerTable);
currentCompareRowInPeerTable = replicatedScanner.next(); currentCompareRowInPeerTable = replicatedScanner.next();
} }
} }
} }
private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) { @SuppressWarnings("FutureReturnValueIgnored")
if (sleepMsBeforeReCompare > 0) { private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row,
Threads.sleep(sleepMsBeforeReCompare); Result replicatedRow) {
try { if (reCompareTries > 0 && sleepMsBeforeReCompare > 0) {
Result sourceResult = sourceTable.get(new Get(row.getRow())); VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(
Result replicatedResult = replicatedTable.get(new Get(row.getRow())); context, row, replicatedRow, counter, delimiter, tableScan, sourceTable, replicatedTable,
Result.compareResults(sourceResult, replicatedResult, false); reCompareTries, sleepMsBeforeReCompare, reCompareBackoffExponent, verbose);
if (!sourceResult.isEmpty()) { reCompareExecutor.submit(runnable);
context.getCounter(Counters.GOODROWS).increment(1);
if (verbose) {
LOG.info("Good row key (with recompare): " + delimiter
+ Bytes.toStringBinary(row.getRow()) + delimiter);
}
}
return; return;
} catch (Exception e) {
LOG.error("recompare fail after sleep, rowkey=" + delimiter
+ Bytes.toStringBinary(row.getRow()) + delimiter);
}
} }
byte[] rowKey = getRow(row, replicatedRow);
context.getCounter(counter).increment(1); context.getCounter(counter).increment(1);
context.getCounter(Counters.BADROWS).increment(1); context.getCounter(Counters.BADROWS).increment(1);
LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) LOG.error("{}, rowkey={}{}{}", counter, delimiter, Bytes.toStringBinary(rowKey), delimiter);
+ delimiter);
} }
@Override @Override
@ -288,7 +302,7 @@ public class VerifyReplication extends Configured implements Tool {
if (replicatedScanner != null) { if (replicatedScanner != null) {
try { try {
while (currentCompareRowInPeerTable != null) { while (currentCompareRowInPeerTable != null) {
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null,
currentCompareRowInPeerTable); currentCompareRowInPeerTable);
currentCompareRowInPeerTable = replicatedScanner.next(); currentCompareRowInPeerTable = replicatedScanner.next();
} }
@ -329,6 +343,17 @@ public class VerifyReplication extends Configured implements Tool {
LOG.error("fail to close replicated connection in cleanup", e); LOG.error("fail to close replicated connection in cleanup", e);
} }
} }
if (reCompareExecutor != null && !reCompareExecutor.isShutdown()) {
reCompareExecutor.shutdown();
try {
boolean terminated = reCompareExecutor.awaitTermination(10, TimeUnit.SECONDS);
if (!terminated) {
LOG.warn("Wait termination timed out");
}
} catch (InterruptedException e) {
LOG.error("fail to await executor termination in cleanup", e);
}
}
} }
} }
@ -424,6 +449,10 @@ public class VerifyReplication extends Configured implements Tool {
conf.setInt(NAME + ".versions", versions); conf.setInt(NAME + ".versions", versions);
LOG.info("Number of version: " + versions); LOG.info("Number of version: " + versions);
conf.setInt(NAME + ".reCompareTries", reCompareTries);
conf.setInt(NAME + ".reCompareBackoffExponent", reCompareBackoffExponent);
conf.setInt(NAME + ".reCompareThreads", reCompareThreads);
// Set Snapshot specific parameters // Set Snapshot specific parameters
if (peerSnapshotName != null) { if (peerSnapshotName != null) {
conf.set(NAME + ".peerSnapshotName", peerSnapshotName); conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
@ -491,6 +520,15 @@ public class VerifyReplication extends Configured implements Tool {
return job; return job;
} }
protected static byte[] getRow(Result sourceResult, Result replicatedResult) {
if (sourceResult != null) {
return sourceResult.getRow();
} else if (replicatedResult != null) {
return replicatedResult.getRow();
}
throw new RuntimeException("Both sourceResult and replicatedResult are null!");
}
private static void setRowPrefixFilter(Scan scan, String rowPrefixes) { private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
if (rowPrefixes != null && !rowPrefixes.isEmpty()) { if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
String[] rowPrefixArray = rowPrefixes.split(","); String[] rowPrefixArray = rowPrefixes.split(",");
@ -628,6 +666,25 @@ public class VerifyReplication extends Configured implements Tool {
continue; continue;
} }
final String reCompareThreadArgs = "--reCompareThreads=";
if (cmd.startsWith(reCompareThreadArgs)) {
reCompareThreads = Integer.parseInt(cmd.substring(reCompareThreadArgs.length()));
continue;
}
final String reCompareTriesKey = "--reCompareTries=";
if (cmd.startsWith(reCompareTriesKey)) {
reCompareTries = Integer.parseInt(cmd.substring(reCompareTriesKey.length()));
continue;
}
final String reCompareBackoffExponentKey = "--reCompareBackoffExponent=";
if (cmd.startsWith(reCompareBackoffExponentKey)) {
reCompareBackoffExponent =
Integer.parseInt(cmd.substring(reCompareBackoffExponentKey.length()));
continue;
}
if (cmd.startsWith("--")) { if (cmd.startsWith("--")) {
printUsage("Invalid argument '" + cmd + "'"); printUsage("Invalid argument '" + cmd + "'");
return false; return false;
@ -705,6 +762,7 @@ public class VerifyReplication extends Configured implements Tool {
} }
System.err.println("Usage: verifyrep [--starttime=X]" System.err.println("Usage: verifyrep [--starttime=X]"
+ " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] "
+ "[--reCompareThreads=] [--reCompareTries=] [--reCompareBackoffExponent=]"
+ "[--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] " + "[--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] "
+ "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] " + "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] "
+ "[--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>"); + "[--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>");
@ -722,6 +780,12 @@ public class VerifyReplication extends Configured implements Tool {
System.err.println(" delimiter the delimiter used in display around rowkey"); System.err.println(" delimiter the delimiter used in display around rowkey");
System.err.println(" recomparesleep milliseconds to sleep before recompare row, " System.err.println(" recomparesleep milliseconds to sleep before recompare row, "
+ "default value is 0 which disables the recompare."); + "default value is 0 which disables the recompare.");
System.err.println(" reCompareThreads number of threads to run recompares in");
System.err.println(" reCompareTries number of recompare attempts before incrementing "
+ "the BADROWS counter. Defaults to 1 recompare");
System.out.println(" reCompareBackoffExponent exponential multiplier to increase "
+ "recomparesleep after each recompare attempt, "
+ "default value is 0 which results in a constant sleep time");
System.err.println(" verbose logs row keys of good rows"); System.err.println(" verbose logs row keys of good rows");
System.err.println(" peerTableName Peer Table Name"); System.err.println(" peerTableName Peer Table Name");
System.err.println(" sourceSnapshotName Source Snapshot Name"); System.err.println(" sourceSnapshotName Source Snapshot Name");
@ -788,6 +852,23 @@ public class VerifyReplication extends Configured implements Tool {
+ "2181:/cluster-b \\\n" + " TestTable"); + "2181:/cluster-b \\\n" + " TestTable");
} }
private static ExecutorService buildReCompareExecutor(int maxThreads, Mapper.Context context) {
return new ThreadPoolExecutor(0, maxThreads, 1L, TimeUnit.SECONDS, new SynchronousQueue<>(),
buildRejectedReComparePolicy(context));
}
private static CallerRunsPolicy buildRejectedReComparePolicy(Mapper.Context context) {
return new CallerRunsPolicy() {
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
LOG.debug("Re-comparison execution rejected. Running in main thread.");
context.getCounter(Counters.REJECTED_RE_COMPARES).increment(1);
// will run in the current thread
super.rejectedExecution(runnable, e);
}
};
}
@Override @Override
public int run(String[] args) throws Exception { public int run(String[] args) throws Exception {
Configuration conf = this.getConf(); Configuration conf = this.getConf();

View File

@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce.replication;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class VerifyReplicationRecompareRunnable implements Runnable {
private static final Logger LOG =
LoggerFactory.getLogger(VerifyReplicationRecompareRunnable.class);
private final Mapper.Context context;
private final VerifyReplication.Verifier.Counters originalCounter;
private final String delimiter;
private final byte[] row;
private final Scan tableScan;
private final Table sourceTable;
private final Table replicatedTable;
private final int reCompareTries;
private final int sleepMsBeforeReCompare;
private final int reCompareBackoffExponent;
private final boolean verbose;
private Result sourceResult;
private Result replicatedResult;
public VerifyReplicationRecompareRunnable(Mapper.Context context, Result sourceResult,
Result replicatedResult, VerifyReplication.Verifier.Counters originalCounter, String delimiter,
Scan tableScan, Table sourceTable, Table replicatedTable, int reCompareTries,
int sleepMsBeforeReCompare, int reCompareBackoffExponent, boolean verbose) {
this.context = context;
this.sourceResult = sourceResult;
this.replicatedResult = replicatedResult;
this.originalCounter = originalCounter;
this.delimiter = delimiter;
this.tableScan = tableScan;
this.sourceTable = sourceTable;
this.replicatedTable = replicatedTable;
this.reCompareTries = reCompareTries;
this.sleepMsBeforeReCompare = sleepMsBeforeReCompare;
this.reCompareBackoffExponent = reCompareBackoffExponent;
this.verbose = verbose;
this.row = VerifyReplication.getRow(sourceResult, replicatedResult);
}
@Override
public void run() {
Get get = new Get(row);
get.setCacheBlocks(tableScan.getCacheBlocks());
get.setFilter(tableScan.getFilter());
int sleepMs = sleepMsBeforeReCompare;
int tries = 0;
while (++tries <= reCompareTries) {
context.getCounter(VerifyReplication.Verifier.Counters.RE_COMPARES).increment(1);
try {
Thread.sleep(sleepMs);
} catch (InterruptedException e) {
LOG.warn("Sleeping interrupted, incrementing bad rows and aborting");
incrementOriginalAndBadCounter();
Thread.currentThread().interrupt();
}
try {
if (fetchLatestRows(get)) {
if (matches(sourceResult, replicatedResult, null)) {
if (verbose) {
LOG.info("Good row key (with recompare): {}{}{}", delimiter,
Bytes.toStringBinary(row), delimiter);
}
context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).increment(1);
return;
}
}
} catch (IOException e) {
context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1);
if (verbose) {
LOG.info("Got an exception during recompare for rowkey={}", Bytes.toStringBinary(row), e);
}
}
sleepMs = sleepMs * (2 ^ reCompareBackoffExponent);
}
LOG.error("{}, rowkey={}{}{}", originalCounter, delimiter, Bytes.toStringBinary(row),
delimiter);
incrementOriginalAndBadCounter();
}
private boolean fetchLatestRows(Get get) throws IOException {
Result sourceResult = sourceTable.get(get);
Result replicatedResult = replicatedTable.get(get);
boolean sourceMatches = matches(sourceResult, this.sourceResult,
VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED);
boolean replicatedMatches = matches(replicatedResult, this.replicatedResult,
VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED);
if (sourceMatches && replicatedMatches) {
return true;
}
this.sourceResult = sourceResult;
this.replicatedResult = replicatedResult;
return false;
}
private boolean matches(Result original, Result updated,
VerifyReplication.Verifier.Counters failCounter) {
try {
Result.compareResults(original, updated);
return true;
} catch (Exception e) {
if (failCounter != null) {
context.getCounter(failCounter).increment(1);
if (LOG.isDebugEnabled()) {
LOG.debug("{} for rowkey={}", failCounter, Bytes.toStringBinary(row));
}
}
return false;
}
}
private void incrementOriginalAndBadCounter() {
context.getCounter(originalCounter).increment(1);
context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).increment(1);
}
}

View File

@ -447,6 +447,32 @@ public class TestVerifyReplication extends TestReplicationBase {
checkRestoreTmpDir(CONF2, tmpPath2, 2); checkRestoreTmpDir(CONF2, tmpPath2, 2);
} }
@Test
public void testVerifyReplicationThreadedRecompares() throws Exception {
// Populate the tables with same data
runBatchCopyTest();
// ONLY_IN_PEER_TABLE_ROWS
Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
put.addColumn(noRepfamName, row, row);
htable3.put(put);
// CONTENT_DIFFERENT_ROWS
put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
htable3.put(put);
// ONLY_IN_SOURCE_TABLE_ROWS
put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
put.addColumn(noRepfamName, row, row);
htable1.put(put);
String[] args = new String[] { "--reCompareThreads=10", "--reCompareTries=3",
"--recomparesleep=1", "--peerTableName=" + peerTableName.getNameAsString(),
UTIL2.getClusterKey(), tableName.getNameAsString() };
runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
}
@AfterClass @AfterClass
public static void tearDownAfterClass() throws Exception { public static void tearDownAfterClass() throws Exception {
htable3.close(); htable3.close();

View File

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplicationRecompareRunnable;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.counters.GenericCounter;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@Category({ ReplicationTests.class, SmallTests.class })
@RunWith(MockitoJUnitRunner.class)
public class TestVerifyReplicationRecompareRunnable {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestVerifyReplicationRecompareRunnable.class);
@Mock
private Table sourceTable;
@Mock
private Table replicatedTable;
@Mock
private Mapper.Context context;
static Result genResult(int cols) {
KeyValue[] kvs = new KeyValue[cols];
for (int i = 0; i < cols; ++i) {
kvs[i] =
new KeyValue(genBytes(), genBytes(), genBytes(), System.currentTimeMillis(), genBytes());
}
return Result.create(kvs);
}
static byte[] genBytes() {
return Bytes.toBytes(ThreadLocalRandom.current().nextInt());
}
@Before
public void setUp() {
for (VerifyReplication.Verifier.Counters counter : VerifyReplication.Verifier.Counters
.values()) {
Counter emptyCounter = new GenericCounter(counter.name(), counter.name());
when(context.getCounter(counter)).thenReturn(emptyCounter);
}
}
@Test
public void itRecomparesGoodRow() throws IOException {
Result result = genResult(2);
when(sourceTable.get(any(Get.class))).thenReturn(result);
when(replicatedTable.get(any(Get.class))).thenReturn(result);
VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
genResult(5), null, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, "",
new Scan(), sourceTable, replicatedTable, 3, 1, 0, true);
runnable.run();
assertEquals(0, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
assertEquals(0,
context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
assertEquals(1,
context.getCounter(VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED).getValue());
assertEquals(1,
context.getCounter(VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED).getValue());
assertEquals(2, context.getCounter(VerifyReplication.Verifier.Counters.RE_COMPARES).getValue());
}
@Test
public void itRecomparesBadRow() throws IOException {
Result replicatedResult = genResult(1);
when(sourceTable.get(any(Get.class))).thenReturn(genResult(5));
when(replicatedTable.get(any(Get.class))).thenReturn(replicatedResult);
VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
genResult(5), replicatedResult, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS,
"", new Scan(), sourceTable, replicatedTable, 1, 1, 0, true);
runnable.run();
assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
assertEquals(1,
context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
assertEquals(0, context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
assertEquals(1,
context.getCounter(VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED).getValue());
assertEquals(0,
context.getCounter(VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED).getValue());
assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.RE_COMPARES).getValue());
}
@Test
public void itHandlesExceptionOnRecompare() throws IOException {
when(sourceTable.get(any(Get.class))).thenThrow(new IOException("Error!"));
when(replicatedTable.get(any(Get.class))).thenReturn(genResult(5));
VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
genResult(5), null, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, "",
new Scan(), sourceTable, replicatedTable, 1, 1, 0, true);
runnable.run();
assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
assertEquals(1,
context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
assertEquals(1,
context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue());
assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.RE_COMPARES).getValue());
}
}