HBASE-24351 Backport HBASE-24302 to branch-1 (#1693)
Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
29c24e7257
commit
f57b3d68c7
|
@ -72,6 +72,7 @@ public class HashTable extends Configured implements Tool {
|
||||||
final static String MANIFEST_FILE_NAME = "manifest";
|
final static String MANIFEST_FILE_NAME = "manifest";
|
||||||
final static String HASH_DATA_DIR = "hashes";
|
final static String HASH_DATA_DIR = "hashes";
|
||||||
final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
|
final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
|
||||||
|
final static String IGNORE_TIMESTAMPS = "ignoreTimestamps";
|
||||||
private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";
|
private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";
|
||||||
|
|
||||||
TableHash tableHash = new TableHash();
|
TableHash tableHash = new TableHash();
|
||||||
|
@ -95,6 +96,7 @@ public class HashTable extends Configured implements Tool {
|
||||||
int versions = -1;
|
int versions = -1;
|
||||||
long startTime = 0;
|
long startTime = 0;
|
||||||
long endTime = 0;
|
long endTime = 0;
|
||||||
|
boolean ignoreTimestamps;
|
||||||
|
|
||||||
List<ImmutableBytesWritable> partitions;
|
List<ImmutableBytesWritable> partitions;
|
||||||
|
|
||||||
|
@ -433,6 +435,7 @@ public class HashTable extends Configured implements Tool {
|
||||||
getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
|
getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
|
||||||
Configuration jobConf = job.getConfiguration();
|
Configuration jobConf = job.getConfiguration();
|
||||||
jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
|
jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
|
||||||
|
jobConf.setBoolean(IGNORE_TIMESTAMPS, tableHash.ignoreTimestamps);
|
||||||
job.setJarByClass(HashTable.class);
|
job.setJarByClass(HashTable.class);
|
||||||
|
|
||||||
TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
|
TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
|
||||||
|
@ -470,6 +473,7 @@ public class HashTable extends Configured implements Tool {
|
||||||
private ImmutableBytesWritable batchStartKey;
|
private ImmutableBytesWritable batchStartKey;
|
||||||
private ImmutableBytesWritable batchHash;
|
private ImmutableBytesWritable batchHash;
|
||||||
private long batchSize = 0;
|
private long batchSize = 0;
|
||||||
|
boolean ignoreTimestamps;
|
||||||
|
|
||||||
|
|
||||||
public ResultHasher() {
|
public ResultHasher() {
|
||||||
|
@ -503,9 +507,11 @@ public class HashTable extends Configured implements Tool {
|
||||||
digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
|
digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
|
||||||
digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength);
|
digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength);
|
||||||
long ts = cell.getTimestamp();
|
long ts = cell.getTimestamp();
|
||||||
for (int i = 8; i > 0; i--) {
|
if(!ignoreTimestamps) {
|
||||||
digest.update((byte) ts);
|
for (int i = 8; i > 0; i--) {
|
||||||
ts >>>= 8;
|
digest.update((byte) ts);
|
||||||
|
ts >>>= 8;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength);
|
digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength);
|
||||||
|
|
||||||
|
@ -551,7 +557,8 @@ public class HashTable extends Configured implements Tool {
|
||||||
targetBatchSize = context.getConfiguration()
|
targetBatchSize = context.getConfiguration()
|
||||||
.getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
|
.getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
|
||||||
hasher = new ResultHasher();
|
hasher = new ResultHasher();
|
||||||
|
hasher.ignoreTimestamps = context.getConfiguration().
|
||||||
|
getBoolean(IGNORE_TIMESTAMPS, false);
|
||||||
TableSplit split = (TableSplit) context.getInputSplit();
|
TableSplit split = (TableSplit) context.getInputSplit();
|
||||||
hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
|
hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
|
||||||
}
|
}
|
||||||
|
@ -602,21 +609,24 @@ public class HashTable extends Configured implements Tool {
|
||||||
System.err.println("Usage: HashTable [options] <tablename> <outputpath>");
|
System.err.println("Usage: HashTable [options] <tablename> <outputpath>");
|
||||||
System.err.println();
|
System.err.println();
|
||||||
System.err.println("Options:");
|
System.err.println("Options:");
|
||||||
System.err.println(" batchsize the target amount of bytes to hash in each batch");
|
System.err.println(" batchsize the target amount of bytes to hash in each batch");
|
||||||
System.err.println(" rows are added to the batch until this size is reached");
|
System.err.println(" rows are added to the batch until this size is reached");
|
||||||
System.err.println(" (defaults to " + DEFAULT_BATCH_SIZE + " bytes)");
|
System.err.println(" (defaults to " + DEFAULT_BATCH_SIZE + " bytes)");
|
||||||
System.err.println(" numhashfiles the number of hash files to create");
|
System.err.println(" numhashfiles the number of hash files to create");
|
||||||
System.err.println(" if set to fewer than number of regions then");
|
System.err.println(" if set to fewer than number of regions then");
|
||||||
System.err.println(" the job will create this number of reducers");
|
System.err.println(" the job will create this number of reducers");
|
||||||
System.err.println(" (defaults to 1/100 of regions -- at least 1)");
|
System.err.println(" (defaults to 1/100 of regions -- at least 1)");
|
||||||
System.err.println(" startrow the start row");
|
System.err.println(" startrow the start row");
|
||||||
System.err.println(" stoprow the stop row");
|
System.err.println(" stoprow the stop row");
|
||||||
System.err.println(" starttime beginning of the time range (unixtime in millis)");
|
System.err.println(" starttime beginning of the time range (unixtime in millis)");
|
||||||
System.err.println(" without endtime means from starttime to forever");
|
System.err.println(" without endtime means from starttime to forever");
|
||||||
System.err.println(" endtime end of the time range. Ignored if no starttime specified.");
|
System.err.println(" endtime end of the time range.");
|
||||||
System.err.println(" scanbatch scanner batch size to support intra row scans");
|
System.err.println(" Ignored if no starttime specified.");
|
||||||
System.err.println(" versions number of cell versions to include");
|
System.err.println(" scanbatch scanner batch size to support intra row scans");
|
||||||
System.err.println(" families comma-separated list of families to include");
|
System.err.println(" versions number of cell versions to include");
|
||||||
|
System.err.println(" families comma-separated list of families to include");
|
||||||
|
System.err.println(" ignoreTimestamps if true, ignores cell timestamps");
|
||||||
|
System.err.println(" when calculating hashes");
|
||||||
System.err.println();
|
System.err.println();
|
||||||
System.err.println("Args:");
|
System.err.println("Args:");
|
||||||
System.err.println(" tablename Name of the table to hash");
|
System.err.println(" tablename Name of the table to hash");
|
||||||
|
@ -701,6 +711,13 @@ public class HashTable extends Configured implements Tool {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final String ignoreTimestampsKey = "--ignoreTimestamps=";
|
||||||
|
if (cmd.startsWith(ignoreTimestampsKey)) {
|
||||||
|
tableHash.ignoreTimestamps = Boolean.
|
||||||
|
parseBoolean(cmd.substring(ignoreTimestampsKey.length()));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
printUsage("Invalid argument '" + cmd + "'");
|
printUsage("Invalid argument '" + cmd + "'");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellComparator;
|
import org.apache.hadoop.hbase.CellComparator;
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
|
@ -66,6 +67,7 @@ public class SyncTable extends Configured implements Tool {
|
||||||
static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";
|
static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";
|
||||||
static final String DO_DELETES_CONF_KEY = "sync.table.do.deletes";
|
static final String DO_DELETES_CONF_KEY = "sync.table.do.deletes";
|
||||||
static final String DO_PUTS_CONF_KEY = "sync.table.do.puts";
|
static final String DO_PUTS_CONF_KEY = "sync.table.do.puts";
|
||||||
|
static final String IGNORE_TIMESTAMPS = "sync.table.ignore.timestamps";
|
||||||
|
|
||||||
Path sourceHashDir;
|
Path sourceHashDir;
|
||||||
String sourceTableName;
|
String sourceTableName;
|
||||||
|
@ -76,6 +78,7 @@ public class SyncTable extends Configured implements Tool {
|
||||||
boolean dryRun;
|
boolean dryRun;
|
||||||
boolean doDeletes = true;
|
boolean doDeletes = true;
|
||||||
boolean doPuts = true;
|
boolean doPuts = true;
|
||||||
|
boolean ignoreTimestamps;
|
||||||
|
|
||||||
Counters counters;
|
Counters counters;
|
||||||
|
|
||||||
|
@ -149,6 +152,7 @@ public class SyncTable extends Configured implements Tool {
|
||||||
jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
|
jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
|
||||||
jobConf.setBoolean(DO_DELETES_CONF_KEY, doDeletes);
|
jobConf.setBoolean(DO_DELETES_CONF_KEY, doDeletes);
|
||||||
jobConf.setBoolean(DO_PUTS_CONF_KEY, doPuts);
|
jobConf.setBoolean(DO_PUTS_CONF_KEY, doPuts);
|
||||||
|
jobConf.setBoolean(IGNORE_TIMESTAMPS, ignoreTimestamps);
|
||||||
|
|
||||||
TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
|
TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
|
||||||
SyncMapper.class, null, null, job);
|
SyncMapper.class, null, null, job);
|
||||||
|
@ -185,6 +189,7 @@ public class SyncTable extends Configured implements Tool {
|
||||||
boolean dryRun;
|
boolean dryRun;
|
||||||
boolean doDeletes = true;
|
boolean doDeletes = true;
|
||||||
boolean doPuts = true;
|
boolean doPuts = true;
|
||||||
|
boolean ignoreTimestamp;
|
||||||
|
|
||||||
HashTable.TableHash sourceTableHash;
|
HashTable.TableHash sourceTableHash;
|
||||||
HashTable.TableHash.Reader sourceHashReader;
|
HashTable.TableHash.Reader sourceHashReader;
|
||||||
|
@ -211,6 +216,7 @@ public class SyncTable extends Configured implements Tool {
|
||||||
dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
|
dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
|
||||||
doDeletes = conf.getBoolean(DO_DELETES_CONF_KEY, true);
|
doDeletes = conf.getBoolean(DO_DELETES_CONF_KEY, true);
|
||||||
doPuts = conf.getBoolean(DO_PUTS_CONF_KEY, true);
|
doPuts = conf.getBoolean(DO_PUTS_CONF_KEY, true);
|
||||||
|
ignoreTimestamp = conf.getBoolean(IGNORE_TIMESTAMPS, false);
|
||||||
|
|
||||||
sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
|
sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
|
||||||
LOG.info("Read source hash manifest: " + sourceTableHash);
|
LOG.info("Read source hash manifest: " + sourceTableHash);
|
||||||
|
@ -226,6 +232,7 @@ public class SyncTable extends Configured implements Tool {
|
||||||
// instead, find the first hash batch at or after the start row
|
// instead, find the first hash batch at or after the start row
|
||||||
// and skip any rows that come before. they will be caught by the previous task
|
// and skip any rows that come before. they will be caught by the previous task
|
||||||
targetHasher = new HashTable.ResultHasher();
|
targetHasher = new HashTable.ResultHasher();
|
||||||
|
targetHasher.ignoreTimestamps = ignoreTimestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Connection openConnection(Configuration conf, String zkClusterConfKey,
|
private static Connection openConnection(Configuration conf, String zkClusterConfKey,
|
||||||
|
@ -473,6 +480,14 @@ public class SyncTable extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Cell checkAndResetTimestamp(Cell sourceCell){
|
||||||
|
if (ignoreTimestamp) {
|
||||||
|
sourceCell = new KeyValue(sourceCell);
|
||||||
|
((KeyValue) sourceCell).setTimestamp(System.currentTimeMillis());
|
||||||
|
}
|
||||||
|
return sourceCell;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Compare the cells for the given row from the source and target tables.
|
* Compare the cells for the given row from the source and target tables.
|
||||||
* Count and log any differences.
|
* Count and log any differences.
|
||||||
|
@ -501,6 +516,7 @@ public class SyncTable extends Configured implements Tool {
|
||||||
if (put == null) {
|
if (put == null) {
|
||||||
put = new Put(rowKey);
|
put = new Put(rowKey);
|
||||||
}
|
}
|
||||||
|
sourceCell = checkAndResetTimestamp(sourceCell);
|
||||||
put.add(sourceCell);
|
put.add(sourceCell);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -544,6 +560,7 @@ public class SyncTable extends Configured implements Tool {
|
||||||
if (put == null) {
|
if (put == null) {
|
||||||
put = new Put(rowKey);
|
put = new Put(rowKey);
|
||||||
}
|
}
|
||||||
|
sourceCell = checkAndResetTimestamp(sourceCell);
|
||||||
put.add(sourceCell);
|
put.add(sourceCell);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -604,7 +621,7 @@ public class SyncTable extends Configured implements Tool {
|
||||||
* They are assumed to be of the same row.
|
* They are assumed to be of the same row.
|
||||||
* Nulls are after non-nulls.
|
* Nulls are after non-nulls.
|
||||||
*/
|
*/
|
||||||
private static int compareCellKeysWithinRow(Cell c1, Cell c2) {
|
private int compareCellKeysWithinRow(Cell c1, Cell c2) {
|
||||||
if (c1 == null) {
|
if (c1 == null) {
|
||||||
return 1; // source missing cell
|
return 1; // source missing cell
|
||||||
}
|
}
|
||||||
|
@ -621,9 +638,12 @@ public class SyncTable extends Configured implements Tool {
|
||||||
if (result != 0) {
|
if (result != 0) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
if (this.ignoreTimestamp) {
|
||||||
// note timestamp comparison is inverted - more recent cells first
|
return 0;
|
||||||
return CellComparator.compareTimestamps(c1, c2);
|
} else{
|
||||||
|
// note timestamp comparison is inverted - more recent cells first
|
||||||
|
return CellComparator.compareTimestamps(c1, c2);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -723,6 +743,10 @@ public class SyncTable extends Configured implements Tool {
|
||||||
System.err.println(" (defaults to true)");
|
System.err.println(" (defaults to true)");
|
||||||
System.err.println(" doPuts if false, does not perform puts ");
|
System.err.println(" doPuts if false, does not perform puts ");
|
||||||
System.err.println(" (defaults to true)");
|
System.err.println(" (defaults to true)");
|
||||||
|
System.err.println(" ignoreTimestamps if true, ignores cells timestamps while comparing ");
|
||||||
|
System.err.println(" cell values. Any missing cell on target then gets");
|
||||||
|
System.err.println(" added with current time as timestamp ");
|
||||||
|
System.err.println(" (defaults to false)");
|
||||||
System.err.println();
|
System.err.println();
|
||||||
System.err.println("Args:");
|
System.err.println("Args:");
|
||||||
System.err.println(" sourcehashdir path to HashTable output dir for source table");
|
System.err.println(" sourcehashdir path to HashTable output dir for source table");
|
||||||
|
@ -786,6 +810,12 @@ public class SyncTable extends Configured implements Tool {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final String ignoreTimestampsKey = "--ignoreTimestamps=";
|
||||||
|
if (cmd.startsWith(ignoreTimestampsKey)) {
|
||||||
|
ignoreTimestamps = Boolean.parseBoolean(cmd.substring(ignoreTimestampsKey.length()));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
printUsage("Invalid argument '" + cmd + "'");
|
printUsage("Invalid argument '" + cmd + "'");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.ArrayUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
@ -91,7 +90,7 @@ public class TestSyncTable {
|
||||||
writeTestData(sourceTableName, targetTableName);
|
writeTestData(sourceTableName, targetTableName);
|
||||||
hashSourceTable(sourceTableName, testDir);
|
hashSourceTable(sourceTableName, testDir);
|
||||||
Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir);
|
Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir);
|
||||||
assertEqualTables(90, sourceTableName, targetTableName);
|
assertEqualTables(90, sourceTableName, targetTableName, false);
|
||||||
|
|
||||||
assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
|
assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
|
||||||
assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
|
assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
|
||||||
|
@ -150,8 +149,31 @@ public class TestSyncTable {
|
||||||
TEST_UTIL.deleteTable(targetTableName);
|
TEST_UTIL.deleteTable(targetTableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSyncTableIgnoreTimestampsTrue() throws Exception {
|
||||||
|
final TableName sourceTableName = TableName.valueOf("testSyncTableIgnoreTimestampsTrue_source");
|
||||||
|
final TableName targetTableName = TableName.valueOf("testSyncTableIgnoreTimestampsTrue_target");
|
||||||
|
Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableIgnoreTimestampsTrue");
|
||||||
|
long current = System.currentTimeMillis();
|
||||||
|
writeTestData(sourceTableName, targetTableName, current - 1000, current);
|
||||||
|
hashSourceTable(sourceTableName, testDir, "--ignoreTimestamps=true");
|
||||||
|
Counters syncCounters = syncTables(sourceTableName, targetTableName,
|
||||||
|
testDir, "--ignoreTimestamps=true");
|
||||||
|
assertEqualTables(90, sourceTableName, targetTableName, true);
|
||||||
|
|
||||||
|
assertEquals(50, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
|
||||||
|
assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
|
||||||
|
assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
|
||||||
|
assertEquals(30, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
|
||||||
|
assertEquals(30, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
|
||||||
|
assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
|
||||||
|
|
||||||
|
TEST_UTIL.deleteTable(sourceTableName);
|
||||||
|
TEST_UTIL.deleteTable(targetTableName);
|
||||||
|
}
|
||||||
|
|
||||||
private void assertEqualTables(int expectedRows, TableName sourceTableName,
|
private void assertEqualTables(int expectedRows, TableName sourceTableName,
|
||||||
TableName targetTableName) throws Exception {
|
TableName targetTableName, boolean ignoreTimestamps) throws Exception {
|
||||||
Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
|
Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
|
||||||
Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
|
Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
|
||||||
|
|
||||||
|
@ -198,7 +220,7 @@ public class TestSyncTable {
|
||||||
if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
|
if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
|
||||||
Assert.fail("Qualifiers don't match");
|
Assert.fail("Qualifiers don't match");
|
||||||
}
|
}
|
||||||
if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
|
if (!ignoreTimestamps && !CellUtil.matchingTimestamp(sourceCell, targetCell)) {
|
||||||
Assert.fail("Timestamps don't match");
|
Assert.fail("Timestamps don't match");
|
||||||
}
|
}
|
||||||
if (!CellUtil.matchingValue(sourceCell, targetCell)) {
|
if (!CellUtil.matchingValue(sourceCell, targetCell)) {
|
||||||
|
@ -428,18 +450,19 @@ public class TestSyncTable {
|
||||||
return syncTable.counters;
|
return syncTable.counters;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void hashSourceTable(TableName sourceTableName, Path testDir)
|
private void hashSourceTable(TableName sourceTableName, Path testDir, String... options)
|
||||||
throws Exception, IOException {
|
throws Exception {
|
||||||
int numHashFiles = 3;
|
int numHashFiles = 3;
|
||||||
long batchSize = 100; // should be 2 batches per region
|
long batchSize = 100; // should be 2 batches per region
|
||||||
int scanBatch = 1;
|
int scanBatch = 1;
|
||||||
HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
|
HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
|
||||||
int code = hashTable.run(new String[] {
|
String[] args = Arrays.copyOf(options, options.length + 5);
|
||||||
"--batchsize=" + batchSize,
|
args[options.length] = "--batchsize=" + batchSize;
|
||||||
"--numhashfiles=" + numHashFiles,
|
args[options.length + 1] = "--numhashfiles=" + numHashFiles;
|
||||||
"--scanbatch=" + scanBatch,
|
args[options.length + 2] = "--scanbatch=" + scanBatch;
|
||||||
sourceTableName.getNameAsString(),
|
args[options.length + 3] = sourceTableName.getNameAsString();
|
||||||
testDir.toString()});
|
args[options.length + 4] = testDir.toString();
|
||||||
|
int code = hashTable.run(args);
|
||||||
assertEquals("hash table job failed", 0, code);
|
assertEquals("hash table job failed", 0, code);
|
||||||
|
|
||||||
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||||
|
@ -453,8 +476,8 @@ public class TestSyncTable {
|
||||||
LOG.info("Hash table completed");
|
LOG.info("Hash table completed");
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writeTestData(TableName sourceTableName, TableName targetTableName)
|
private void writeTestData(TableName sourceTableName, TableName targetTableName,
|
||||||
throws Exception {
|
long... timestamps) throws Exception {
|
||||||
final byte[] family = Bytes.toBytes("family");
|
final byte[] family = Bytes.toBytes("family");
|
||||||
final byte[] column1 = Bytes.toBytes("c1");
|
final byte[] column1 = Bytes.toBytes("c1");
|
||||||
final byte[] column2 = Bytes.toBytes("c2");
|
final byte[] column2 = Bytes.toBytes("c2");
|
||||||
|
@ -465,26 +488,27 @@ public class TestSyncTable {
|
||||||
int numRows = 100;
|
int numRows = 100;
|
||||||
int sourceRegions = 10;
|
int sourceRegions = 10;
|
||||||
int targetRegions = 6;
|
int targetRegions = 6;
|
||||||
|
if (ArrayUtils.isEmpty(timestamps)) {
|
||||||
|
long current = System.currentTimeMillis();
|
||||||
|
timestamps = new long[]{current,current};
|
||||||
|
}
|
||||||
Table sourceTable = TEST_UTIL.createTable(sourceTableName,
|
Table sourceTable = TEST_UTIL.createTable(sourceTableName,
|
||||||
family, generateSplits(numRows, sourceRegions));
|
family, generateSplits(numRows, sourceRegions));
|
||||||
|
|
||||||
Table targetTable = TEST_UTIL.createTable(targetTableName,
|
Table targetTable = TEST_UTIL.createTable(targetTableName,
|
||||||
family, generateSplits(numRows, targetRegions));
|
family, generateSplits(numRows, targetRegions));
|
||||||
|
|
||||||
long timestamp = 1430764183454L;
|
|
||||||
|
|
||||||
int rowIndex = 0;
|
int rowIndex = 0;
|
||||||
// a bunch of identical rows
|
// a bunch of identical rows
|
||||||
for (; rowIndex < 40; rowIndex++) {
|
for (; rowIndex < 40; rowIndex++) {
|
||||||
Put sourcePut = new Put(Bytes.toBytes(rowIndex));
|
Put sourcePut = new Put(Bytes.toBytes(rowIndex));
|
||||||
sourcePut.addColumn(family, column1, timestamp, value1);
|
sourcePut.addColumn(family, column1, timestamps[0], value1);
|
||||||
sourcePut.addColumn(family, column2, timestamp, value2);
|
sourcePut.addColumn(family, column2, timestamps[0], value2);
|
||||||
sourceTable.put(sourcePut);
|
sourceTable.put(sourcePut);
|
||||||
|
|
||||||
Put targetPut = new Put(Bytes.toBytes(rowIndex));
|
Put targetPut = new Put(Bytes.toBytes(rowIndex));
|
||||||
targetPut.addColumn(family, column1, timestamp, value1);
|
targetPut.addColumn(family, column1, timestamps[1], value1);
|
||||||
targetPut.addColumn(family, column2, timestamp, value2);
|
targetPut.addColumn(family, column2, timestamps[1], value2);
|
||||||
targetTable.put(targetPut);
|
targetTable.put(targetPut);
|
||||||
}
|
}
|
||||||
// some rows only in the source table
|
// some rows only in the source table
|
||||||
|
@ -493,8 +517,8 @@ public class TestSyncTable {
|
||||||
// TARGETMISSINGCELLS: 20
|
// TARGETMISSINGCELLS: 20
|
||||||
for (; rowIndex < 50; rowIndex++) {
|
for (; rowIndex < 50; rowIndex++) {
|
||||||
Put put = new Put(Bytes.toBytes(rowIndex));
|
Put put = new Put(Bytes.toBytes(rowIndex));
|
||||||
put.addColumn(family, column1, timestamp, value1);
|
put.addColumn(family, column1, timestamps[0], value1);
|
||||||
put.addColumn(family, column2, timestamp, value2);
|
put.addColumn(family, column2, timestamps[0], value2);
|
||||||
sourceTable.put(put);
|
sourceTable.put(put);
|
||||||
}
|
}
|
||||||
// some rows only in the target table
|
// some rows only in the target table
|
||||||
|
@ -503,8 +527,8 @@ public class TestSyncTable {
|
||||||
// SOURCEMISSINGCELLS: 20
|
// SOURCEMISSINGCELLS: 20
|
||||||
for (; rowIndex < 60; rowIndex++) {
|
for (; rowIndex < 60; rowIndex++) {
|
||||||
Put put = new Put(Bytes.toBytes(rowIndex));
|
Put put = new Put(Bytes.toBytes(rowIndex));
|
||||||
put.addColumn(family, column1, timestamp, value1);
|
put.addColumn(family, column1, timestamps[1], value1);
|
||||||
put.addColumn(family, column2, timestamp, value2);
|
put.addColumn(family, column2, timestamps[1], value2);
|
||||||
targetTable.put(put);
|
targetTable.put(put);
|
||||||
}
|
}
|
||||||
// some rows with 1 missing cell in target table
|
// some rows with 1 missing cell in target table
|
||||||
|
@ -512,12 +536,12 @@ public class TestSyncTable {
|
||||||
// TARGETMISSINGCELLS: 10
|
// TARGETMISSINGCELLS: 10
|
||||||
for (; rowIndex < 70; rowIndex++) {
|
for (; rowIndex < 70; rowIndex++) {
|
||||||
Put sourcePut = new Put(Bytes.toBytes(rowIndex));
|
Put sourcePut = new Put(Bytes.toBytes(rowIndex));
|
||||||
sourcePut.addColumn(family, column1, timestamp, value1);
|
sourcePut.addColumn(family, column1, timestamps[0], value1);
|
||||||
sourcePut.addColumn(family, column2, timestamp, value2);
|
sourcePut.addColumn(family, column2, timestamps[0], value2);
|
||||||
sourceTable.put(sourcePut);
|
sourceTable.put(sourcePut);
|
||||||
|
|
||||||
Put targetPut = new Put(Bytes.toBytes(rowIndex));
|
Put targetPut = new Put(Bytes.toBytes(rowIndex));
|
||||||
targetPut.addColumn(family, column1, timestamp, value1);
|
targetPut.addColumn(family, column1, timestamps[1], value1);
|
||||||
targetTable.put(targetPut);
|
targetTable.put(targetPut);
|
||||||
}
|
}
|
||||||
// some rows with 1 missing cell in source table
|
// some rows with 1 missing cell in source table
|
||||||
|
@ -525,12 +549,12 @@ public class TestSyncTable {
|
||||||
// SOURCEMISSINGCELLS: 10
|
// SOURCEMISSINGCELLS: 10
|
||||||
for (; rowIndex < 80; rowIndex++) {
|
for (; rowIndex < 80; rowIndex++) {
|
||||||
Put sourcePut = new Put(Bytes.toBytes(rowIndex));
|
Put sourcePut = new Put(Bytes.toBytes(rowIndex));
|
||||||
sourcePut.addColumn(family, column1, timestamp, value1);
|
sourcePut.addColumn(family, column1, timestamps[0], value1);
|
||||||
sourceTable.put(sourcePut);
|
sourceTable.put(sourcePut);
|
||||||
|
|
||||||
Put targetPut = new Put(Bytes.toBytes(rowIndex));
|
Put targetPut = new Put(Bytes.toBytes(rowIndex));
|
||||||
targetPut.addColumn(family, column1, timestamp, value1);
|
targetPut.addColumn(family, column1, timestamps[1], value1);
|
||||||
targetPut.addColumn(family, column2, timestamp, value2);
|
targetPut.addColumn(family, column2, timestamps[1], value2);
|
||||||
targetTable.put(targetPut);
|
targetTable.put(targetPut);
|
||||||
}
|
}
|
||||||
// some rows differing only in timestamp
|
// some rows differing only in timestamp
|
||||||
|
@ -539,13 +563,13 @@ public class TestSyncTable {
|
||||||
// TARGETMISSINGCELLS: 20
|
// TARGETMISSINGCELLS: 20
|
||||||
for (; rowIndex < 90; rowIndex++) {
|
for (; rowIndex < 90; rowIndex++) {
|
||||||
Put sourcePut = new Put(Bytes.toBytes(rowIndex));
|
Put sourcePut = new Put(Bytes.toBytes(rowIndex));
|
||||||
sourcePut.addColumn(family, column1, timestamp, column1);
|
sourcePut.addColumn(family, column1, timestamps[0], column1);
|
||||||
sourcePut.addColumn(family, column2, timestamp, value2);
|
sourcePut.addColumn(family, column2, timestamps[0], value2);
|
||||||
sourceTable.put(sourcePut);
|
sourceTable.put(sourcePut);
|
||||||
|
|
||||||
Put targetPut = new Put(Bytes.toBytes(rowIndex));
|
Put targetPut = new Put(Bytes.toBytes(rowIndex));
|
||||||
targetPut.addColumn(family, column1, timestamp+1, column1);
|
targetPut.addColumn(family, column1, timestamps[1]+1, column1);
|
||||||
targetPut.addColumn(family, column2, timestamp-1, value2);
|
targetPut.addColumn(family, column2, timestamps[1]-1, value2);
|
||||||
targetTable.put(targetPut);
|
targetTable.put(targetPut);
|
||||||
}
|
}
|
||||||
// some rows with different values
|
// some rows with different values
|
||||||
|
@ -553,13 +577,13 @@ public class TestSyncTable {
|
||||||
// DIFFERENTCELLVALUES: 20
|
// DIFFERENTCELLVALUES: 20
|
||||||
for (; rowIndex < numRows; rowIndex++) {
|
for (; rowIndex < numRows; rowIndex++) {
|
||||||
Put sourcePut = new Put(Bytes.toBytes(rowIndex));
|
Put sourcePut = new Put(Bytes.toBytes(rowIndex));
|
||||||
sourcePut.addColumn(family, column1, timestamp, value1);
|
sourcePut.addColumn(family, column1, timestamps[0], value1);
|
||||||
sourcePut.addColumn(family, column2, timestamp, value2);
|
sourcePut.addColumn(family, column2, timestamps[0], value2);
|
||||||
sourceTable.put(sourcePut);
|
sourceTable.put(sourcePut);
|
||||||
|
|
||||||
Put targetPut = new Put(Bytes.toBytes(rowIndex));
|
Put targetPut = new Put(Bytes.toBytes(rowIndex));
|
||||||
targetPut.addColumn(family, column1, timestamp, value3);
|
targetPut.addColumn(family, column1, timestamps[1], value3);
|
||||||
targetPut.addColumn(family, column2, timestamp, value3);
|
targetPut.addColumn(family, column2, timestamps[1], value3);
|
||||||
targetTable.put(targetPut);
|
targetTable.put(targetPut);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue