HBASE-16772 Add verbose option to VerifyReplication for logging good rows
This commit is contained in:
parent
8203e02a2f
commit
a8fe9ed64f
|
@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.HConnectable;
|
||||||
import org.apache.hadoop.hbase.client.HConnection;
|
import org.apache.hadoop.hbase.client.HConnection;
|
||||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
@ -46,8 +47,8 @@ import org.apache.hadoop.hbase.mapreduce.TableMapper;
|
||||||
import org.apache.hadoop.hbase.mapreduce.TableSplit;
|
import org.apache.hadoop.hbase.mapreduce.TableSplit;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
|
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
|
@ -85,6 +86,7 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
static String peerId = null;
|
static String peerId = null;
|
||||||
static String rowPrefixes = null;
|
static String rowPrefixes = null;
|
||||||
static int sleepMsBeforeReCompare = 0;
|
static int sleepMsBeforeReCompare = 0;
|
||||||
|
static boolean verbose = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Map-only comparator for 2 tables
|
* Map-only comparator for 2 tables
|
||||||
|
@ -100,6 +102,7 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
private Result currentCompareRowInPeerTable;
|
private Result currentCompareRowInPeerTable;
|
||||||
private Table replicatedTable;
|
private Table replicatedTable;
|
||||||
private int sleepMsBeforeReCompare;
|
private int sleepMsBeforeReCompare;
|
||||||
|
private boolean verbose = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Map method that compares every scanned row with the equivalent from
|
* Map method that compares every scanned row with the equivalent from
|
||||||
|
@ -116,6 +119,7 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
if (replicatedScanner == null) {
|
if (replicatedScanner == null) {
|
||||||
Configuration conf = context.getConfiguration();
|
Configuration conf = context.getConfiguration();
|
||||||
sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 0);
|
sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 0);
|
||||||
|
verbose = conf.getBoolean(NAME +".verbose", false);
|
||||||
final Scan scan = new Scan();
|
final Scan scan = new Scan();
|
||||||
scan.setBatch(batch);
|
scan.setBatch(batch);
|
||||||
scan.setCacheBlocks(false);
|
scan.setCacheBlocks(false);
|
||||||
|
@ -137,7 +141,8 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
if (versions >= 0) {
|
if (versions >= 0) {
|
||||||
scan.setMaxVersions(versions);
|
scan.setMaxVersions(versions);
|
||||||
}
|
}
|
||||||
sourceTable = new HTable(conf, tableName);
|
TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
|
||||||
|
sourceTable = ConnectionFactory.createConnection(conf).getTable(tableName);
|
||||||
|
|
||||||
final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
|
final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
|
||||||
HConnectionManager.execute(new HConnectable<Void>(conf) {
|
HConnectionManager.execute(new HConnectable<Void>(conf) {
|
||||||
|
@ -169,6 +174,9 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
try {
|
try {
|
||||||
Result.compareResults(value, currentCompareRowInPeerTable);
|
Result.compareResults(value, currentCompareRowInPeerTable);
|
||||||
context.getCounter(Counters.GOODROWS).increment(1);
|
context.getCounter(Counters.GOODROWS).increment(1);
|
||||||
|
if (verbose) {
|
||||||
|
LOG.info("Good row key: " + delimiter + Bytes.toString(value.getRow()) + delimiter);
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
|
logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
|
||||||
LOG.error("Exception while comparing row : " + e);
|
LOG.error("Exception while comparing row : " + e);
|
||||||
|
@ -195,6 +203,10 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
Result sourceResult = sourceTable.get(new Get(row.getRow()));
|
Result sourceResult = sourceTable.get(new Get(row.getRow()));
|
||||||
Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
|
Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
|
||||||
Result.compareResults(sourceResult, replicatedResult);
|
Result.compareResults(sourceResult, replicatedResult);
|
||||||
|
context.getCounter(Counters.GOODROWS).increment(1);
|
||||||
|
if (verbose) {
|
||||||
|
LOG.info("Good row key: " + delimiter + Bytes.toString(row.getRow()) + delimiter);
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("recompare fail after sleep, rowkey=" + delimiter +
|
LOG.error("recompare fail after sleep, rowkey=" + delimiter +
|
||||||
|
@ -224,7 +236,7 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (sourceTable != null) {
|
if (sourceTable != null) {
|
||||||
TableName tableName = replicatedTable.getName();
|
TableName tableName = sourceTable.getName();
|
||||||
try {
|
try {
|
||||||
sourceTable.close();
|
sourceTable.close();
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
|
@ -297,6 +309,7 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
conf.setLong(NAME+".startTime", startTime);
|
conf.setLong(NAME+".startTime", startTime);
|
||||||
conf.setLong(NAME+".endTime", endTime);
|
conf.setLong(NAME+".endTime", endTime);
|
||||||
conf.setInt(NAME +".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
|
conf.setInt(NAME +".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
|
||||||
|
conf.setBoolean(NAME +".verbose", verbose);
|
||||||
if (families != null) {
|
if (families != null) {
|
||||||
conf.set(NAME+".families", families);
|
conf.set(NAME+".families", families);
|
||||||
}
|
}
|
||||||
|
@ -437,6 +450,11 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
|
sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
final String verboseKey = "--verbose";
|
||||||
|
if (cmd.startsWith(verboseKey)) {
|
||||||
|
verbose = true;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (i == args.length-2) {
|
if (i == args.length-2) {
|
||||||
peerId = cmd;
|
peerId = cmd;
|
||||||
|
@ -473,7 +491,8 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
System.err.println("ERROR: " + errorMsg);
|
System.err.println("ERROR: " + errorMsg);
|
||||||
}
|
}
|
||||||
System.err.println("Usage: verifyrep [--starttime=X]" +
|
System.err.println("Usage: verifyrep [--starttime=X]" +
|
||||||
" [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] <peerid> <tablename>");
|
" [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " +
|
||||||
|
"[--verbose] <peerid> <tablename>");
|
||||||
System.err.println();
|
System.err.println();
|
||||||
System.err.println("Options:");
|
System.err.println("Options:");
|
||||||
System.err.println(" starttime beginning of the time range");
|
System.err.println(" starttime beginning of the time range");
|
||||||
|
@ -485,6 +504,7 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
System.err.println(" delimiter the delimiter used in display around rowkey");
|
System.err.println(" delimiter the delimiter used in display around rowkey");
|
||||||
System.err.println(" recomparesleep milliseconds to sleep before recompare row, " +
|
System.err.println(" recomparesleep milliseconds to sleep before recompare row, " +
|
||||||
"default value is 0 which disables the recompare.");
|
"default value is 0 which disables the recompare.");
|
||||||
|
System.err.println(" verbose logs row keys of good rows");
|
||||||
System.err.println();
|
System.err.println();
|
||||||
System.err.println("Args:");
|
System.err.println("Args:");
|
||||||
System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");
|
System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");
|
||||||
|
|
Loading…
Reference in New Issue