HBASE-16946 Provide Raw scan as an option in VerifyReplication (Sreekar Pallapothu)
This commit is contained in:
parent
79073cd40c
commit
c3bb3b35c5
|
@ -86,6 +86,7 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
static String rowPrefixes = null;
|
static String rowPrefixes = null;
|
||||||
static int sleepMsBeforeReCompare = 0;
|
static int sleepMsBeforeReCompare = 0;
|
||||||
static boolean verbose = false;
|
static boolean verbose = false;
|
||||||
|
static boolean includeDeletedCells = false;
|
||||||
|
|
||||||
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
|
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
|
||||||
|
|
||||||
|
@ -140,6 +141,8 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
scan.addFamily(Bytes.toBytes(fam));
|
scan.addFamily(Bytes.toBytes(fam));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false);
|
||||||
|
scan.setRaw(includeDeletedCells);
|
||||||
String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
|
String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
|
||||||
setRowPrefixFilter(scan, rowPrefixes);
|
setRowPrefixFilter(scan, rowPrefixes);
|
||||||
scan.setTimeRange(startTime, endTime);
|
scan.setTimeRange(startTime, endTime);
|
||||||
|
@ -325,6 +328,7 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
conf.setInt(NAME +".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
|
conf.setInt(NAME +".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
|
||||||
conf.set(NAME + ".delimiter", delimiter);
|
conf.set(NAME + ".delimiter", delimiter);
|
||||||
conf.setBoolean(NAME +".verbose", verbose);
|
conf.setBoolean(NAME +".verbose", verbose);
|
||||||
|
conf.setBoolean(NAME +".includeDeletedCells", includeDeletedCells);
|
||||||
if (families != null) {
|
if (families != null) {
|
||||||
conf.set(NAME+".families", families);
|
conf.set(NAME+".families", families);
|
||||||
}
|
}
|
||||||
|
@ -349,6 +353,7 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
|
|
||||||
Scan scan = new Scan();
|
Scan scan = new Scan();
|
||||||
scan.setTimeRange(startTime, endTime);
|
scan.setTimeRange(startTime, endTime);
|
||||||
|
scan.setRaw(includeDeletedCells);
|
||||||
if (versions >= 0) {
|
if (versions >= 0) {
|
||||||
scan.setMaxVersions(versions);
|
scan.setMaxVersions(versions);
|
||||||
LOG.info("Number of versions set to " + versions);
|
LOG.info("Number of versions set to " + versions);
|
||||||
|
@ -426,6 +431,12 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final String includeDeletedCellsArgKey = "--raw";
|
||||||
|
if (cmd.equals(includeDeletedCellsArgKey)) {
|
||||||
|
includeDeletedCells = true;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
final String versionsArgKey = "--versions=";
|
final String versionsArgKey = "--versions=";
|
||||||
if (cmd.startsWith(versionsArgKey)) {
|
if (cmd.startsWith(versionsArgKey)) {
|
||||||
versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
|
versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
|
||||||
|
@ -495,6 +506,7 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
families = null;
|
families = null;
|
||||||
peerId = null;
|
peerId = null;
|
||||||
rowPrefixes = null;
|
rowPrefixes = null;
|
||||||
|
includeDeletedCells = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -513,6 +525,7 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
System.err.println(" without endtime means from starttime to forever");
|
System.err.println(" without endtime means from starttime to forever");
|
||||||
System.err.println(" endtime end of the time range");
|
System.err.println(" endtime end of the time range");
|
||||||
System.err.println(" versions number of cell versions to verify");
|
System.err.println(" versions number of cell versions to verify");
|
||||||
|
System.err.println(" raw includes raw scan if given in options");
|
||||||
System.err.println(" families comma-separated list of families to copy");
|
System.err.println(" families comma-separated list of families to copy");
|
||||||
System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
|
System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
|
||||||
System.err.println(" delimiter the delimiter used in display around rowkey");
|
System.err.println(" delimiter the delimiter used in display around rowkey");
|
||||||
|
|
|
@ -35,29 +35,30 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
import org.apache.hadoop.hbase.ClusterStatus;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.ServerLoad;
|
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
import org.apache.hadoop.hbase.client.Delete;
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
||||||
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
|
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||||
import org.apache.hadoop.hbase.replication.regionserver.Replication;
|
import org.apache.hadoop.hbase.replication.regionserver.Replication;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
@ -490,6 +491,102 @@ public class TestReplicationSmallTests extends TestReplicationBase {
|
||||||
runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
|
runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Load a row into a table, make sure the data is really the same,
|
||||||
|
* delete the row, make sure the delete marker is replicated,
|
||||||
|
* run verify replication with and without raw to check the results.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test(timeout=300000)
|
||||||
|
public void testVerifyRepJobWithRawOptions() throws Exception {
|
||||||
|
LOG.info("testVerifyRepJobWithRawOptions");
|
||||||
|
|
||||||
|
TableName tablename = TableName.valueOf("test_raw");
|
||||||
|
byte[] familyname = Bytes.toBytes("fam_raw");
|
||||||
|
byte[] row = Bytes.toBytes("row_raw");
|
||||||
|
|
||||||
|
Table lHtable1 = null;
|
||||||
|
Table lHtable2 = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
HTableDescriptor table = new HTableDescriptor(tablename);
|
||||||
|
HColumnDescriptor fam = new HColumnDescriptor(familyname);
|
||||||
|
fam.setMaxVersions(100);
|
||||||
|
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
|
||||||
|
table.addFamily(fam);
|
||||||
|
scopes = new TreeMap<byte[], Integer>(
|
||||||
|
Bytes.BYTES_COMPARATOR);
|
||||||
|
for (HColumnDescriptor f : table.getColumnFamilies()) {
|
||||||
|
scopes.put(f.getName(), f.getScope());
|
||||||
|
}
|
||||||
|
|
||||||
|
Connection connection1 = ConnectionFactory.createConnection(conf1);
|
||||||
|
Connection connection2 = ConnectionFactory.createConnection(conf2);
|
||||||
|
try (Admin admin1 = connection1.getAdmin()) {
|
||||||
|
admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
|
||||||
|
}
|
||||||
|
try (Admin admin2 = connection2.getAdmin()) {
|
||||||
|
admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
|
||||||
|
}
|
||||||
|
utility1.waitUntilAllRegionsAssigned(tablename);
|
||||||
|
utility2.waitUntilAllRegionsAssigned(tablename);
|
||||||
|
|
||||||
|
lHtable1 = utility1.getConnection().getTable(tablename);
|
||||||
|
lHtable2 = utility2.getConnection().getTable(tablename);
|
||||||
|
|
||||||
|
Put put = new Put(row);
|
||||||
|
put.addColumn(familyname, row, row);
|
||||||
|
lHtable1.put(put);
|
||||||
|
|
||||||
|
Get get = new Get(row);
|
||||||
|
for (int i = 0; i < NB_RETRIES; i++) {
|
||||||
|
if (i==NB_RETRIES-1) {
|
||||||
|
fail("Waited too much time for put replication");
|
||||||
|
}
|
||||||
|
Result res = lHtable2.get(get);
|
||||||
|
if (res.size() == 0) {
|
||||||
|
LOG.info("Row not available");
|
||||||
|
Thread.sleep(SLEEP_TIME);
|
||||||
|
} else {
|
||||||
|
assertArrayEquals(res.value(), row);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Delete del = new Delete(row);
|
||||||
|
lHtable1.delete(del);
|
||||||
|
|
||||||
|
get = new Get(row);
|
||||||
|
for (int i = 0; i < NB_RETRIES; i++) {
|
||||||
|
if (i==NB_RETRIES-1) {
|
||||||
|
fail("Waited too much time for del replication");
|
||||||
|
}
|
||||||
|
Result res = lHtable2.get(get);
|
||||||
|
if (res.size() >= 1) {
|
||||||
|
LOG.info("Row not deleted");
|
||||||
|
Thread.sleep(SLEEP_TIME);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Checking verifyReplication for the default behavior.
|
||||||
|
String[] argsWithoutRaw = new String[] {PEER_ID, tablename.getNameAsString()};
|
||||||
|
runVerifyReplication(argsWithoutRaw, 0, 0);
|
||||||
|
|
||||||
|
// Checking verifyReplication with raw
|
||||||
|
String[] argsWithRawAsTrue = new String[] {"--raw", PEER_ID, tablename.getNameAsString()};
|
||||||
|
runVerifyReplication(argsWithRawAsTrue, 1, 0);
|
||||||
|
} finally {
|
||||||
|
if (lHtable1 != null) {
|
||||||
|
lHtable1.close();
|
||||||
|
}
|
||||||
|
if (lHtable2 != null) {
|
||||||
|
lHtable2.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
|
private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
|
||||||
throws IOException, InterruptedException, ClassNotFoundException {
|
throws IOException, InterruptedException, ClassNotFoundException {
|
||||||
Job job = VerifyReplication.createSubmittableJob(new Configuration(CONF_WITH_LOCALFS), args);
|
Job job = VerifyReplication.createSubmittableJob(new Configuration(CONF_WITH_LOCALFS), args);
|
||||||
|
|
Loading…
Reference in New Issue