From 4da8209a4d689611d777fc169c6bc7001df09615 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Mon, 4 May 2020 11:24:43 +0100 Subject: [PATCH] =?UTF-8?q?HBASE-24302=20Add=20an=20"ignoreTimestamps"=20o?= =?UTF-8?q?ption=20(defaulted=20to=20false)=20to=20=E2=80=A6=20(#1623)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jan Hentschel Signed-off-by: Josh Elser (cherry-picked from commit 3d59e328c8d467edfbd19c76d4081f15dde55bf8) --- .../hadoop/hbase/mapreduce/HashTable.java | 58 ++++++---- .../hadoop/hbase/mapreduce/SyncTable.java | 49 ++++++++- .../hadoop/hbase/mapreduce/TestSyncTable.java | 102 +++++++++++------- 3 files changed, 148 insertions(+), 61 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java index b67225e70d6..b3e212280ba 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java @@ -73,6 +73,7 @@ public class HashTable extends Configured implements Tool { final static String MANIFEST_FILE_NAME = "manifest"; final static String HASH_DATA_DIR = "hashes"; final static String OUTPUT_DATA_FILE_PREFIX = "part-r-"; + final static String IGNORE_TIMESTAMPS = "ignoreTimestamps"; private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp"; TableHash tableHash = new TableHash(); @@ -96,6 +97,7 @@ public class HashTable extends Configured implements Tool { int versions = -1; long startTime = 0; long endTime = 0; + boolean ignoreTimestamps; List partitions; @@ -434,6 +436,7 @@ public class HashTable extends Configured implements Tool { getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName)); Configuration jobConf = job.getConfiguration(); jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize); + jobConf.setBoolean(IGNORE_TIMESTAMPS, tableHash.ignoreTimestamps); job.setJarByClass(HashTable.class); TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(), @@ -471,6 +474,7 @@ public class HashTable extends Configured implements Tool { private ImmutableBytesWritable batchStartKey; private ImmutableBytesWritable batchHash; private long batchSize = 0; + boolean ignoreTimestamps; public ResultHasher() { @@ -503,10 +507,13 @@ public class HashTable extends Configured implements Tool { digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength); digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength); digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength); - long ts = cell.getTimestamp(); - for (int i = 8; i > 0; i--) { - digest.update((byte) ts); - ts >>>= 8; + + if (!ignoreTimestamps) { + long ts = cell.getTimestamp(); + for (int i = 8; i > 0; i--) { + digest.update((byte) ts); + ts >>>= 8; + } } digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength); @@ -552,7 +559,8 @@ public class HashTable extends Configured implements Tool { targetBatchSize = context.getConfiguration() .getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE); hasher = new ResultHasher(); - + hasher.ignoreTimestamps = context.getConfiguration(). + getBoolean(IGNORE_TIMESTAMPS, false); TableSplit split = (TableSplit) context.getInputSplit(); hasher.startBatch(new ImmutableBytesWritable(split.getStartRow())); } @@ -603,21 +611,24 @@ public class HashTable extends Configured implements Tool { System.err.println("Usage: HashTable [options] "); System.err.println(); System.err.println("Options:"); - System.err.println(" batchsize the target amount of bytes to hash in each batch"); - System.err.println(" rows are added to the batch until this size is reached"); - System.err.println(" (defaults to " + DEFAULT_BATCH_SIZE + " bytes)"); - System.err.println(" numhashfiles the number of hash files to create"); - System.err.println(" if set to fewer than number of regions then"); - System.err.println(" the job will create this number of reducers"); - System.err.println(" (defaults to 1/100 of regions -- at least 1)"); - System.err.println(" startrow the start row"); - System.err.println(" stoprow the stop row"); - System.err.println(" starttime beginning of the time range (unixtime in millis)"); - System.err.println(" without endtime means from starttime to forever"); - System.err.println(" endtime end of the time range. Ignored if no starttime specified."); - System.err.println(" scanbatch scanner batch size to support intra row scans"); - System.err.println(" versions number of cell versions to include"); - System.err.println(" families comma-separated list of families to include"); + System.err.println(" batchsize the target amount of bytes to hash in each batch"); + System.err.println(" rows are added to the batch until this size is reached"); + System.err.println(" (defaults to " + DEFAULT_BATCH_SIZE + " bytes)"); + System.err.println(" numhashfiles the number of hash files to create"); + System.err.println(" if set to fewer than number of regions then"); + System.err.println(" the job will create this number of reducers"); + System.err.println(" (defaults to 1/100 of regions -- at least 1)"); + System.err.println(" startrow the start row"); + System.err.println(" stoprow the stop row"); + System.err.println(" starttime beginning of the time range (unixtime in millis)"); + System.err.println(" without endtime means from starttime to forever"); + System.err.println(" endtime end of the time range."); + System.err.println(" Ignored if no starttime specified."); + System.err.println(" scanbatch scanner batch size to support intra row scans"); + System.err.println(" versions number of cell versions to include"); + System.err.println(" families comma-separated list of families to include"); + System.err.println(" ignoreTimestamps if true, ignores cell timestamps"); + System.err.println(" when calculating hashes"); System.err.println(); System.err.println("Args:"); System.err.println(" tablename Name of the table to hash"); @@ -702,6 +713,13 @@ public class HashTable extends Configured implements Tool { continue; } + final String ignoreTimestampsKey = "--ignoreTimestamps="; + if (cmd.startsWith(ignoreTimestampsKey)) { + tableHash.ignoreTimestamps = Boolean. + parseBoolean(cmd.substring(ignoreTimestampsKey.length())); + continue; + } + printUsage("Invalid argument '" + cmd + "'"); return false; } diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java index 1bb9969712c..e092f9002af 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java @@ -26,6 +26,8 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -67,6 +69,7 @@ public class SyncTable extends Configured implements Tool { static final String DRY_RUN_CONF_KEY = "sync.table.dry.run"; static final String DO_DELETES_CONF_KEY = "sync.table.do.deletes"; static final String DO_PUTS_CONF_KEY = "sync.table.do.puts"; + static final String IGNORE_TIMESTAMPS = "sync.table.ignore.timestamps"; Path sourceHashDir; String sourceTableName; @@ -77,6 +80,7 @@ public class SyncTable extends Configured implements Tool { boolean dryRun; boolean doDeletes = true; boolean doPuts = true; + boolean ignoreTimestamps; Counters counters; @@ -150,6 +154,7 @@ public class SyncTable extends Configured implements Tool { jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun); jobConf.setBoolean(DO_DELETES_CONF_KEY, doDeletes); jobConf.setBoolean(DO_PUTS_CONF_KEY, doPuts); + jobConf.setBoolean(IGNORE_TIMESTAMPS, ignoreTimestamps); TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(), SyncMapper.class, null, null, job); @@ -186,6 +191,7 @@ public class SyncTable extends Configured implements Tool { boolean dryRun; boolean doDeletes = true; boolean doPuts = true; + boolean ignoreTimestamp; HashTable.TableHash sourceTableHash; HashTable.TableHash.Reader sourceHashReader; @@ -212,6 +218,7 @@ public class SyncTable extends Configured implements Tool { dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false); doDeletes = conf.getBoolean(DO_DELETES_CONF_KEY, true); doPuts = conf.getBoolean(DO_PUTS_CONF_KEY, true); + ignoreTimestamp = conf.getBoolean(IGNORE_TIMESTAMPS, false); sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir); LOG.info("Read source hash manifest: " + sourceTableHash); @@ -227,6 +234,7 @@ public class SyncTable extends Configured implements Tool { // instead, find the first hash batch at or after the start row // and skip any rows that come before. they will be caught by the previous task targetHasher = new HashTable.ResultHasher(); + targetHasher.ignoreTimestamps = ignoreTimestamp; } private static Connection openConnection(Configuration conf, String zkClusterConfKey, @@ -474,6 +482,23 @@ public class SyncTable extends Configured implements Tool { } } + private Cell checkAndResetTimestamp(Cell sourceCell){ + if (ignoreTimestamp) { + sourceCell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) + .setType(sourceCell.getType()) + .setRow(sourceCell.getRowArray(), + sourceCell.getRowOffset(), sourceCell.getRowLength()) + .setFamily(sourceCell.getFamilyArray(), + sourceCell.getFamilyOffset(), sourceCell.getFamilyLength()) + .setQualifier(sourceCell.getQualifierArray(), + sourceCell.getQualifierOffset(), sourceCell.getQualifierLength()) + .setTimestamp(System.currentTimeMillis()) + .setValue(sourceCell.getValueArray(), + sourceCell.getValueOffset(), sourceCell.getValueLength()).build(); + } + return sourceCell; + } + /** * Compare the cells for the given row from the source and target tables. * Count and log any differences. @@ -502,6 +527,7 @@ public class SyncTable extends Configured implements Tool { if (put == null) { put = new Put(rowKey); } + sourceCell = checkAndResetTimestamp(sourceCell); put.add(sourceCell); } @@ -545,6 +571,7 @@ public class SyncTable extends Configured implements Tool { if (put == null) { put = new Put(rowKey); } + sourceCell = checkAndResetTimestamp(sourceCell); put.add(sourceCell); } } @@ -606,7 +633,7 @@ public class SyncTable extends Configured implements Tool { * They are assumed to be of the same row. * Nulls are after non-nulls. */ - private static int compareCellKeysWithinRow(Cell c1, Cell c2) { + private int compareCellKeysWithinRow(Cell c1, Cell c2) { if (c1 == null) { return 1; // source missing cell } @@ -624,8 +651,12 @@ public class SyncTable extends Configured implements Tool { return result; } - // note timestamp comparison is inverted - more recent cells first - return CellComparator.getInstance().compareTimestamps(c1, c2); + if (this.ignoreTimestamp) { + return 0; + } else { + // note timestamp comparison is inverted - more recent cells first + return CellComparator.getInstance().compareTimestamps(c1, c2); + } } @Override @@ -723,8 +754,12 @@ public class SyncTable extends Configured implements Tool { System.err.println(" (defaults to false)"); System.err.println(" doDeletes if false, does not perform deletes"); System.err.println(" (defaults to true)"); - System.err.println(" doPuts if false, does not perform puts "); + System.err.println(" doPuts if false, does not perform puts"); System.err.println(" (defaults to true)"); + System.err.println(" ignoreTimestamps if true, ignores cells timestamps while comparing "); + System.err.println(" cell values. Any missing cell on target then gets"); + System.err.println(" added with current time as timestamp "); + System.err.println(" (defaults to false)"); System.err.println(); System.err.println("Args:"); System.err.println(" sourcehashdir path to HashTable output dir for source table"); @@ -788,6 +823,12 @@ public class SyncTable extends Configured implements Tool { continue; } + final String ignoreTimestampsKey = "--ignoreTimestamps="; + if (cmd.startsWith(ignoreTimestampsKey)) { + ignoreTimestamps = Boolean.parseBoolean(cmd.substring(ignoreTimestampsKey.length())); + continue; + } + printUsage("Invalid argument '" + cmd + "'"); return false; } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java index 4ff8892f8d9..d84f80dcc1c 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.mapreduce; import static org.junit.Assert.assertEquals; import java.util.Arrays; + +import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -93,7 +95,7 @@ public class TestSyncTable { writeTestData(sourceTableName, targetTableName); hashSourceTable(sourceTableName, testDir); Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir); - assertEqualTables(90, sourceTableName, targetTableName); + assertEqualTables(90, sourceTableName, targetTableName, false); assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); @@ -152,8 +154,31 @@ public class TestSyncTable { TEST_UTIL.deleteTable(targetTableName); } + @Test + public void testSyncTableIgnoreTimestampsTrue() throws Exception { + final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source"); + final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target"); + Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableIgnoreTimestampsTrue"); + long current = System.currentTimeMillis(); + writeTestData(sourceTableName, targetTableName, current - 1000, current); + hashSourceTable(sourceTableName, testDir, "--ignoreTimestamps=true"); + Counters syncCounters = syncTables(sourceTableName, targetTableName, + testDir, "--ignoreTimestamps=true"); + assertEqualTables(90, sourceTableName, targetTableName, true); + + assertEquals(50, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); + assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); + assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue()); + assertEquals(30, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue()); + assertEquals(30, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); + assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); + + TEST_UTIL.deleteTable(sourceTableName); + TEST_UTIL.deleteTable(targetTableName); + } + private void assertEqualTables(int expectedRows, TableName sourceTableName, - TableName targetTableName) throws Exception { + TableName targetTableName, boolean ignoreTimestamps) throws Exception { Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName); Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName); @@ -200,7 +225,7 @@ public class TestSyncTable { if (!CellUtil.matchingQualifier(sourceCell, targetCell)) { Assert.fail("Qualifiers don't match"); } - if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) { + if (!ignoreTimestamps && !CellUtil.matchingTimestamp(sourceCell, targetCell)) { Assert.fail("Timestamps don't match"); } if (!CellUtil.matchingValue(sourceCell, targetCell)) { @@ -426,18 +451,19 @@ public class TestSyncTable { return syncTable.counters; } - private void hashSourceTable(TableName sourceTableName, Path testDir) throws Exception { + private void hashSourceTable(TableName sourceTableName, Path testDir, String... options) + throws Exception { int numHashFiles = 3; long batchSize = 100; // should be 2 batches per region int scanBatch = 1; HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration()); - int code = hashTable.run(new String[] { - "--batchsize=" + batchSize, - "--numhashfiles=" + numHashFiles, - "--scanbatch=" + scanBatch, - sourceTableName.getNameAsString(), - testDir.toString() - }); + String[] args = Arrays.copyOf(options, options.length+5); + args[options.length] = "--batchsize=" + batchSize; + args[options.length + 1] = "--numhashfiles=" + numHashFiles; + args[options.length + 2] = "--scanbatch=" + scanBatch; + args[options.length + 3] = sourceTableName.getNameAsString(); + args[options.length + 4] = testDir.toString(); + int code = hashTable.run(args); assertEquals("hash table job failed", 0, code); FileSystem fs = TEST_UTIL.getTestFileSystem(); @@ -451,8 +477,8 @@ public class TestSyncTable { LOG.info("Hash table completed"); } - private void writeTestData(TableName sourceTableName, TableName targetTableName) - throws Exception { + private void writeTestData(TableName sourceTableName, TableName targetTableName, + long... timestamps) throws Exception { final byte[] family = Bytes.toBytes("family"); final byte[] column1 = Bytes.toBytes("c1"); final byte[] column2 = Bytes.toBytes("c2"); @@ -463,6 +489,10 @@ public class TestSyncTable { int numRows = 100; int sourceRegions = 10; int targetRegions = 6; + if (ArrayUtils.isEmpty(timestamps)) { + long current = System.currentTimeMillis(); + timestamps = new long[]{current,current}; + } Table sourceTable = TEST_UTIL.createTable(sourceTableName, family, generateSplits(numRows, sourceRegions)); @@ -470,19 +500,17 @@ public class TestSyncTable { Table targetTable = TEST_UTIL.createTable(targetTableName, family, generateSplits(numRows, targetRegions)); - long timestamp = 1430764183454L; - int rowIndex = 0; // a bunch of identical rows for (; rowIndex < 40; rowIndex++) { Put sourcePut = new Put(Bytes.toBytes(rowIndex)); - sourcePut.addColumn(family, column1, timestamp, value1); - sourcePut.addColumn(family, column2, timestamp, value2); + sourcePut.addColumn(family, column1, timestamps[0], value1); + sourcePut.addColumn(family, column2, timestamps[0], value2); sourceTable.put(sourcePut); Put targetPut = new Put(Bytes.toBytes(rowIndex)); - targetPut.addColumn(family, column1, timestamp, value1); - targetPut.addColumn(family, column2, timestamp, value2); + targetPut.addColumn(family, column1, timestamps[1], value1); + targetPut.addColumn(family, column2, timestamps[1], value2); targetTable.put(targetPut); } // some rows only in the source table @@ -491,8 +519,8 @@ public class TestSyncTable { // TARGETMISSINGCELLS: 20 for (; rowIndex < 50; rowIndex++) { Put put = new Put(Bytes.toBytes(rowIndex)); - put.addColumn(family, column1, timestamp, value1); - put.addColumn(family, column2, timestamp, value2); + put.addColumn(family, column1, timestamps[0], value1); + put.addColumn(family, column2, timestamps[0], value2); sourceTable.put(put); } // some rows only in the target table @@ -501,8 +529,8 @@ public class TestSyncTable { // SOURCEMISSINGCELLS: 20 for (; rowIndex < 60; rowIndex++) { Put put = new Put(Bytes.toBytes(rowIndex)); - put.addColumn(family, column1, timestamp, value1); - put.addColumn(family, column2, timestamp, value2); + put.addColumn(family, column1, timestamps[1], value1); + put.addColumn(family, column2, timestamps[1], value2); targetTable.put(put); } // some rows with 1 missing cell in target table @@ -510,12 +538,12 @@ public class TestSyncTable { // TARGETMISSINGCELLS: 10 for (; rowIndex < 70; rowIndex++) { Put sourcePut = new Put(Bytes.toBytes(rowIndex)); - sourcePut.addColumn(family, column1, timestamp, value1); - sourcePut.addColumn(family, column2, timestamp, value2); + sourcePut.addColumn(family, column1, timestamps[0], value1); + sourcePut.addColumn(family, column2, timestamps[0], value2); sourceTable.put(sourcePut); Put targetPut = new Put(Bytes.toBytes(rowIndex)); - targetPut.addColumn(family, column1, timestamp, value1); + targetPut.addColumn(family, column1, timestamps[1], value1); targetTable.put(targetPut); } // some rows with 1 missing cell in source table @@ -523,12 +551,12 @@ public class TestSyncTable { // SOURCEMISSINGCELLS: 10 for (; rowIndex < 80; rowIndex++) { Put sourcePut = new Put(Bytes.toBytes(rowIndex)); - sourcePut.addColumn(family, column1, timestamp, value1); + sourcePut.addColumn(family, column1, timestamps[0], value1); sourceTable.put(sourcePut); Put targetPut = new Put(Bytes.toBytes(rowIndex)); - targetPut.addColumn(family, column1, timestamp, value1); - targetPut.addColumn(family, column2, timestamp, value2); + targetPut.addColumn(family, column1, timestamps[1], value1); + targetPut.addColumn(family, column2, timestamps[1], value2); targetTable.put(targetPut); } // some rows differing only in timestamp @@ -537,13 +565,13 @@ public class TestSyncTable { // TARGETMISSINGCELLS: 20 for (; rowIndex < 90; rowIndex++) { Put sourcePut = new Put(Bytes.toBytes(rowIndex)); - sourcePut.addColumn(family, column1, timestamp, column1); - sourcePut.addColumn(family, column2, timestamp, value2); + sourcePut.addColumn(family, column1, timestamps[0], column1); + sourcePut.addColumn(family, column2, timestamps[0], value2); sourceTable.put(sourcePut); Put targetPut = new Put(Bytes.toBytes(rowIndex)); - targetPut.addColumn(family, column1, timestamp+1, column1); - targetPut.addColumn(family, column2, timestamp-1, value2); + targetPut.addColumn(family, column1, timestamps[1]+1, column1); + targetPut.addColumn(family, column2, timestamps[1]-1, value2); targetTable.put(targetPut); } // some rows with different values @@ -551,13 +579,13 @@ public class TestSyncTable { // DIFFERENTCELLVALUES: 20 for (; rowIndex < numRows; rowIndex++) { Put sourcePut = new Put(Bytes.toBytes(rowIndex)); - sourcePut.addColumn(family, column1, timestamp, value1); - sourcePut.addColumn(family, column2, timestamp, value2); + sourcePut.addColumn(family, column1, timestamps[0], value1); + sourcePut.addColumn(family, column2, timestamps[0], value2); sourceTable.put(sourcePut); Put targetPut = new Put(Bytes.toBytes(rowIndex)); - targetPut.addColumn(family, column1, timestamp, value3); - targetPut.addColumn(family, column2, timestamp, value3); + targetPut.addColumn(family, column1, timestamps[1], value3); + targetPut.addColumn(family, column2, timestamps[1], value3); targetTable.put(targetPut); }