diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java index dab84c4f14b..1bb9969712c 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java @@ -64,7 +64,9 @@ public class SyncTable extends Configured implements Tool { static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name"; static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster"; static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster"; - static final String DRY_RUN_CONF_KEY="sync.table.dry.run"; + static final String DRY_RUN_CONF_KEY = "sync.table.dry.run"; + static final String DO_DELETES_CONF_KEY = "sync.table.do.deletes"; + static final String DO_PUTS_CONF_KEY = "sync.table.do.puts"; Path sourceHashDir; String sourceTableName; @@ -73,6 +75,8 @@ public class SyncTable extends Configured implements Tool { String sourceZkCluster; String targetZkCluster; boolean dryRun; + boolean doDeletes = true; + boolean doPuts = true; Counters counters; @@ -144,6 +148,8 @@ public class SyncTable extends Configured implements Tool { initCredentialsForHBase(targetZkCluster, job); } jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun); + jobConf.setBoolean(DO_DELETES_CONF_KEY, doDeletes); + jobConf.setBoolean(DO_PUTS_CONF_KEY, doPuts); TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(), SyncMapper.class, null, null, job); @@ -178,6 +184,8 @@ public class SyncTable extends Configured implements Tool { Table sourceTable; Table targetTable; boolean dryRun; + boolean doDeletes = true; + boolean doPuts = true; HashTable.TableHash sourceTableHash; HashTable.TableHash.Reader sourceHashReader; @@ -201,7 +209,9 @@ public class SyncTable extends Configured implements Tool { TableOutputFormat.OUTPUT_CONF_PREFIX); sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY); targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY); - dryRun = conf.getBoolean(SOURCE_TABLE_CONF_KEY, false); + dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false); + doDeletes = conf.getBoolean(DO_DELETES_CONF_KEY, true); + doPuts = conf.getBoolean(DO_PUTS_CONF_KEY, true); sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir); LOG.info("Read source hash manifest: " + sourceTableHash); @@ -488,7 +498,7 @@ public class SyncTable extends Configured implements Tool { context.getCounter(Counter.TARGETMISSINGCELLS).increment(1); matchingRow = false; - if (!dryRun) { + if (!dryRun && doPuts) { if (put == null) { put = new Put(rowKey); } @@ -503,7 +513,7 @@ public class SyncTable extends Configured implements Tool { context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1); matchingRow = false; - if (!dryRun) { + if (!dryRun && doDeletes) { if (delete == null) { delete = new Delete(rowKey); } @@ -530,7 +540,7 @@ public class SyncTable extends Configured implements Tool { context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1); matchingRow = false; - if (!dryRun) { + if (!dryRun && doPuts) { // overwrite target cell if (put == null) { put = new Put(rowKey); @@ -711,6 +721,10 @@ public class SyncTable extends Configured implements Tool { System.err.println(" (defaults to cluster in classpath's config)"); System.err.println(" dryrun if true, output counters but no writes"); System.err.println(" (defaults to false)"); + System.err.println(" doDeletes if false, does not perform deletes"); + System.err.println(" (defaults to true)"); + System.err.println(" doPuts if false, does not perform puts "); + System.err.println(" (defaults to true)"); System.err.println(); System.err.println("Args:"); System.err.println(" sourcehashdir path to HashTable output dir for source table"); @@ -762,6 +776,18 @@ public class SyncTable extends Configured implements Tool { continue; } + final String doDeletesKey = "--doDeletes="; + if (cmd.startsWith(doDeletesKey)) { + doDeletes = Boolean.parseBoolean(cmd.substring(doDeletesKey.length())); + continue; + } + + final String doPutsKey = "--doPuts="; + if (cmd.startsWith(doPutsKey)) { + doPuts = Boolean.parseBoolean(cmd.substring(doPutsKey.length())); + continue; + } + printUsage("Invalid argument '" + cmd + "'"); return false; } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java index 543a169764a..ad02039fb92 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java @@ -74,6 +74,7 @@ public class TestSyncTable { @AfterClass public static void afterClass() throws Exception { + TEST_UTIL.cleanupDataTestDirOnTestFS(); TEST_UTIL.shutdownMiniCluster(); } @@ -105,7 +106,52 @@ public class TestSyncTable { TEST_UTIL.deleteTable(sourceTableName); TEST_UTIL.deleteTable(targetTableName); - TEST_UTIL.cleanupDataTestDirOnTestFS(); + } + + @Test + public void testSyncTableDoDeletesFalse() throws Exception { + final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source"); + final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target"); + Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableDoDeletesFalse"); + + writeTestData(sourceTableName, targetTableName); + hashSourceTable(sourceTableName, testDir); + Counters syncCounters = syncTables(sourceTableName, targetTableName, + testDir, "--doDeletes=false"); + assertTargetDoDeletesFalse(100, sourceTableName, targetTableName); + + assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); + assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); + assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue()); + assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue()); + assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); + assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); + + TEST_UTIL.deleteTable(sourceTableName); + TEST_UTIL.deleteTable(targetTableName); + } + + @Test + public void testSyncTableDoPutsFalse() throws Exception { + final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source"); + final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target"); + Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableDoPutsFalse"); + + writeTestData(sourceTableName, targetTableName); + hashSourceTable(sourceTableName, testDir); + Counters syncCounters = syncTables(sourceTableName, targetTableName, + testDir, "--doPuts=false"); + assertTargetDoPutsFalse(70, sourceTableName, targetTableName); + + assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); + assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); + assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue()); + assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue()); + assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); + assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); + + TEST_UTIL.deleteTable(sourceTableName); + TEST_UTIL.deleteTable(targetTableName); } private void assertEqualTables(int expectedRows, TableName sourceTableName, @@ -184,14 +230,202 @@ public class TestSyncTable { targetTable.close(); } + private void assertTargetDoDeletesFalse(int expectedRows, TableName + sourceTableName, + TableName targetTableName) + throws Exception { + Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName); + Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName); + + ResultScanner sourceScanner = sourceTable.getScanner(new Scan()); + ResultScanner targetScanner = targetTable.getScanner(new Scan()); + Result targetRow = targetScanner.next(); + Result sourceRow = sourceScanner.next(); + int rowsCount = 0; + while (targetRow!=null) { + rowsCount++; + //only compares values for existing rows, skipping rows existing on + //target only that were not deleted given --doDeletes=false + if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) { + targetRow = targetScanner.next(); + continue; + } + + LOG.debug("SOURCE row: " + (sourceRow == null ? "null" + : Bytes.toInt(sourceRow.getRow())) + + " cells:" + sourceRow); + LOG.debug("TARGET row: " + (targetRow == null ? "null" + : Bytes.toInt(targetRow.getRow())) + + " cells:" + targetRow); + + Cell[] sourceCells = sourceRow.rawCells(); + Cell[] targetCells = targetRow.rawCells(); + int targetRowKey = Bytes.toInt(targetRow.getRow()); + if (targetRowKey >= 70 && targetRowKey < 80) { + if (sourceCells.length == targetCells.length) { + LOG.debug("Source cells: " + Arrays.toString(sourceCells)); + LOG.debug("Target cells: " + Arrays.toString(targetCells)); + Assert.fail("Row " + targetRowKey + " should have more cells in " + + "target than in source"); + } + + } else { + if (sourceCells.length != targetCells.length) { + LOG.debug("Source cells: " + Arrays.toString(sourceCells)); + LOG.debug("Target cells: " + Arrays.toString(targetCells)); + Assert.fail("Row " + Bytes.toInt(sourceRow.getRow()) + + " has " + sourceCells.length + + " cells in source table but " + targetCells.length + + " cells in target table"); + } + } + for (int j = 0; j < sourceCells.length; j++) { + Cell sourceCell = sourceCells[j]; + Cell targetCell = targetCells[j]; + try { + if (!CellUtil.matchingRow(sourceCell, targetCell)) { + Assert.fail("Rows don't match"); + } + if (!CellUtil.matchingFamily(sourceCell, targetCell)) { + Assert.fail("Families don't match"); + } + if (!CellUtil.matchingQualifier(sourceCell, targetCell)) { + Assert.fail("Qualifiers don't match"); + } + if(targetRowKey < 80 && targetRowKey >= 90){ + if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) { + Assert.fail("Timestamps don't match"); + } + } + if (!CellUtil.matchingValue(sourceCell, targetCell)) { + Assert.fail("Values don't match"); + } + } catch (Throwable t) { + LOG.debug("Source cell: " + sourceCell + " target cell: " + + targetCell); + Throwables.propagate(t); + } + } + targetRow = targetScanner.next(); + sourceRow = sourceScanner.next(); + } + assertEquals("Target expected rows does not match.",expectedRows, + rowsCount); + sourceScanner.close(); + targetScanner.close(); + sourceTable.close(); + targetTable.close(); + } + + private void assertTargetDoPutsFalse(int expectedRows, TableName + sourceTableName, + TableName targetTableName) + throws Exception { + Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName); + Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName); + + ResultScanner sourceScanner = sourceTable.getScanner(new Scan()); + ResultScanner targetScanner = targetTable.getScanner(new Scan()); + Result targetRow = targetScanner.next(); + Result sourceRow = sourceScanner.next(); + int rowsCount = 0; + + while (targetRow!=null) { + //only compares values for existing rows, skipping rows existing on + //source only that were not added to target given --doPuts=false + if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) { + sourceRow = sourceScanner.next(); + continue; + } + + LOG.debug("SOURCE row: " + (sourceRow == null ? + "null" : + Bytes.toInt(sourceRow.getRow())) + + " cells:" + sourceRow); + LOG.debug("TARGET row: " + (targetRow == null ? + "null" : + Bytes.toInt(targetRow.getRow())) + + " cells:" + targetRow); + + LOG.debug("rowsCount: " + rowsCount); + + Cell[] sourceCells = sourceRow.rawCells(); + Cell[] targetCells = targetRow.rawCells(); + int targetRowKey = Bytes.toInt(targetRow.getRow()); + if (targetRowKey >= 40 && targetRowKey < 60) { + LOG.debug("Source cells: " + Arrays.toString(sourceCells)); + LOG.debug("Target cells: " + Arrays.toString(targetCells)); + Assert.fail("There shouldn't exist any rows between 40 and 60, since " + + "Puts are disabled and Deletes are enabled."); + } else if (targetRowKey >= 60 && targetRowKey < 70) { + if (sourceCells.length == targetCells.length) { + LOG.debug("Source cells: " + Arrays.toString(sourceCells)); + LOG.debug("Target cells: " + Arrays.toString(targetCells)); + Assert.fail("Row " + Bytes.toInt(sourceRow.getRow()) + + " shouldn't have same number of cells."); + } + } else if (targetRowKey >= 80 && targetRowKey < 90) { + LOG.debug("Source cells: " + Arrays.toString(sourceCells)); + LOG.debug("Target cells: " + Arrays.toString(targetCells)); + Assert.fail("There should be no rows between 80 and 90 on target, as " + + "these had different timestamps and should had been deleted."); + } else if (targetRowKey >= 90 && targetRowKey < 100) { + for (int j = 0; j < sourceCells.length; j++) { + Cell sourceCell = sourceCells[j]; + Cell targetCell = targetCells[j]; + if (CellUtil.matchingValue(sourceCell, targetCell)) { + Assert.fail("Cells values should not match for rows between " + + "90 and 100. Target row id: " + (Bytes.toInt(targetRow + .getRow()))); + } + } + } else { + for (int j = 0; j < sourceCells.length; j++) { + Cell sourceCell = sourceCells[j]; + Cell targetCell = targetCells[j]; + try { + if (!CellUtil.matchingRow(sourceCell, targetCell)) { + Assert.fail("Rows don't match"); + } + if (!CellUtil.matchingFamily(sourceCell, targetCell)) { + Assert.fail("Families don't match"); + } + if (!CellUtil.matchingQualifier(sourceCell, targetCell)) { + Assert.fail("Qualifiers don't match"); + } + if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) { + Assert.fail("Timestamps don't match"); + } + if (!CellUtil.matchingValue(sourceCell, targetCell)) { + Assert.fail("Values don't match"); + } + } catch (Throwable t) { + LOG.debug( + "Source cell: " + sourceCell + " target cell: " + targetCell); + Throwables.propagate(t); + } + } + } + rowsCount++; + targetRow = targetScanner.next(); + sourceRow = sourceScanner.next(); + } + assertEquals("Target expected rows does not match.",expectedRows, + rowsCount); + sourceScanner.close(); + targetScanner.close(); + sourceTable.close(); + targetTable.close(); + } + private Counters syncTables(TableName sourceTableName, TableName targetTableName, - Path testDir) throws Exception { + Path testDir, String... options) throws Exception { SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration()); - int code = syncTable.run(new String[] { - testDir.toString(), - sourceTableName.getNameAsString(), - targetTableName.getNameAsString() - }); + String[] args = Arrays.copyOf(options, options.length+3); + args[options.length] = testDir.toString(); + args[options.length+1] = sourceTableName.getNameAsString(); + args[options.length+2] = targetTableName.getNameAsString(); + int code = syncTable.run(args); assertEquals("sync table job failed", 0, code); LOG.info("Sync tables completed");