HBASE-20305 adding options to skip deletes/puts on target when running SyncTable
Signed-off-by: tedyu <yuzhihong@gmail.com> Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
696298bfac
commit
5a987ec682
|
@ -64,7 +64,9 @@ public class SyncTable extends Configured implements Tool {
|
||||||
static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
|
static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
|
||||||
static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
|
static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
|
||||||
static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
|
static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
|
||||||
static final String DRY_RUN_CONF_KEY="sync.table.dry.run";
|
static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";
|
||||||
|
static final String DO_DELETES_CONF_KEY = "sync.table.do.deletes";
|
||||||
|
static final String DO_PUTS_CONF_KEY = "sync.table.do.puts";
|
||||||
|
|
||||||
Path sourceHashDir;
|
Path sourceHashDir;
|
||||||
String sourceTableName;
|
String sourceTableName;
|
||||||
|
@ -73,6 +75,8 @@ public class SyncTable extends Configured implements Tool {
|
||||||
String sourceZkCluster;
|
String sourceZkCluster;
|
||||||
String targetZkCluster;
|
String targetZkCluster;
|
||||||
boolean dryRun;
|
boolean dryRun;
|
||||||
|
boolean doDeletes = true;
|
||||||
|
boolean doPuts = true;
|
||||||
|
|
||||||
Counters counters;
|
Counters counters;
|
||||||
|
|
||||||
|
@ -144,6 +148,8 @@ public class SyncTable extends Configured implements Tool {
|
||||||
initCredentialsForHBase(targetZkCluster, job);
|
initCredentialsForHBase(targetZkCluster, job);
|
||||||
}
|
}
|
||||||
jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
|
jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
|
||||||
|
jobConf.setBoolean(DO_DELETES_CONF_KEY, doDeletes);
|
||||||
|
jobConf.setBoolean(DO_PUTS_CONF_KEY, doPuts);
|
||||||
|
|
||||||
TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
|
TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
|
||||||
SyncMapper.class, null, null, job);
|
SyncMapper.class, null, null, job);
|
||||||
|
@ -178,6 +184,8 @@ public class SyncTable extends Configured implements Tool {
|
||||||
Table sourceTable;
|
Table sourceTable;
|
||||||
Table targetTable;
|
Table targetTable;
|
||||||
boolean dryRun;
|
boolean dryRun;
|
||||||
|
boolean doDeletes = true;
|
||||||
|
boolean doPuts = true;
|
||||||
|
|
||||||
HashTable.TableHash sourceTableHash;
|
HashTable.TableHash sourceTableHash;
|
||||||
HashTable.TableHash.Reader sourceHashReader;
|
HashTable.TableHash.Reader sourceHashReader;
|
||||||
|
@ -201,7 +209,9 @@ public class SyncTable extends Configured implements Tool {
|
||||||
TableOutputFormat.OUTPUT_CONF_PREFIX);
|
TableOutputFormat.OUTPUT_CONF_PREFIX);
|
||||||
sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
|
sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
|
||||||
targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
|
targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
|
||||||
dryRun = conf.getBoolean(SOURCE_TABLE_CONF_KEY, false);
|
dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
|
||||||
|
doDeletes = conf.getBoolean(DO_DELETES_CONF_KEY, true);
|
||||||
|
doPuts = conf.getBoolean(DO_PUTS_CONF_KEY, true);
|
||||||
|
|
||||||
sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
|
sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
|
||||||
LOG.info("Read source hash manifest: " + sourceTableHash);
|
LOG.info("Read source hash manifest: " + sourceTableHash);
|
||||||
|
@ -488,7 +498,7 @@ public class SyncTable extends Configured implements Tool {
|
||||||
context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
|
context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
|
||||||
matchingRow = false;
|
matchingRow = false;
|
||||||
|
|
||||||
if (!dryRun) {
|
if (!dryRun && doPuts) {
|
||||||
if (put == null) {
|
if (put == null) {
|
||||||
put = new Put(rowKey);
|
put = new Put(rowKey);
|
||||||
}
|
}
|
||||||
|
@ -503,7 +513,7 @@ public class SyncTable extends Configured implements Tool {
|
||||||
context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
|
context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
|
||||||
matchingRow = false;
|
matchingRow = false;
|
||||||
|
|
||||||
if (!dryRun) {
|
if (!dryRun && doDeletes) {
|
||||||
if (delete == null) {
|
if (delete == null) {
|
||||||
delete = new Delete(rowKey);
|
delete = new Delete(rowKey);
|
||||||
}
|
}
|
||||||
|
@ -530,7 +540,7 @@ public class SyncTable extends Configured implements Tool {
|
||||||
context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
|
context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
|
||||||
matchingRow = false;
|
matchingRow = false;
|
||||||
|
|
||||||
if (!dryRun) {
|
if (!dryRun && doPuts) {
|
||||||
// overwrite target cell
|
// overwrite target cell
|
||||||
if (put == null) {
|
if (put == null) {
|
||||||
put = new Put(rowKey);
|
put = new Put(rowKey);
|
||||||
|
@ -711,6 +721,10 @@ public class SyncTable extends Configured implements Tool {
|
||||||
System.err.println(" (defaults to cluster in classpath's config)");
|
System.err.println(" (defaults to cluster in classpath's config)");
|
||||||
System.err.println(" dryrun if true, output counters but no writes");
|
System.err.println(" dryrun if true, output counters but no writes");
|
||||||
System.err.println(" (defaults to false)");
|
System.err.println(" (defaults to false)");
|
||||||
|
System.err.println(" doDeletes if false, does not perform deletes");
|
||||||
|
System.err.println(" (defaults to true)");
|
||||||
|
System.err.println(" doPuts if false, does not perform puts ");
|
||||||
|
System.err.println(" (defaults to true)");
|
||||||
System.err.println();
|
System.err.println();
|
||||||
System.err.println("Args:");
|
System.err.println("Args:");
|
||||||
System.err.println(" sourcehashdir path to HashTable output dir for source table");
|
System.err.println(" sourcehashdir path to HashTable output dir for source table");
|
||||||
|
@ -762,6 +776,18 @@ public class SyncTable extends Configured implements Tool {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final String doDeletesKey = "--doDeletes=";
|
||||||
|
if (cmd.startsWith(doDeletesKey)) {
|
||||||
|
doDeletes = Boolean.parseBoolean(cmd.substring(doDeletesKey.length()));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
final String doPutsKey = "--doPuts=";
|
||||||
|
if (cmd.startsWith(doPutsKey)) {
|
||||||
|
doPuts = Boolean.parseBoolean(cmd.substring(doPutsKey.length()));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
printUsage("Invalid argument '" + cmd + "'");
|
printUsage("Invalid argument '" + cmd + "'");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,6 +74,7 @@ public class TestSyncTable {
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void afterClass() throws Exception {
|
public static void afterClass() throws Exception {
|
||||||
|
TEST_UTIL.cleanupDataTestDirOnTestFS();
|
||||||
TEST_UTIL.shutdownMiniCluster();
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,7 +106,52 @@ public class TestSyncTable {
|
||||||
|
|
||||||
TEST_UTIL.deleteTable(sourceTableName);
|
TEST_UTIL.deleteTable(sourceTableName);
|
||||||
TEST_UTIL.deleteTable(targetTableName);
|
TEST_UTIL.deleteTable(targetTableName);
|
||||||
TEST_UTIL.cleanupDataTestDirOnTestFS();
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSyncTableDoDeletesFalse() throws Exception {
|
||||||
|
final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
|
||||||
|
final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
|
||||||
|
Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableDoDeletesFalse");
|
||||||
|
|
||||||
|
writeTestData(sourceTableName, targetTableName);
|
||||||
|
hashSourceTable(sourceTableName, testDir);
|
||||||
|
Counters syncCounters = syncTables(sourceTableName, targetTableName,
|
||||||
|
testDir, "--doDeletes=false");
|
||||||
|
assertTargetDoDeletesFalse(100, sourceTableName, targetTableName);
|
||||||
|
|
||||||
|
assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
|
||||||
|
assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
|
||||||
|
assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
|
||||||
|
assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
|
||||||
|
assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
|
||||||
|
assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
|
||||||
|
|
||||||
|
TEST_UTIL.deleteTable(sourceTableName);
|
||||||
|
TEST_UTIL.deleteTable(targetTableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSyncTableDoPutsFalse() throws Exception {
|
||||||
|
final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
|
||||||
|
final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
|
||||||
|
Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableDoPutsFalse");
|
||||||
|
|
||||||
|
writeTestData(sourceTableName, targetTableName);
|
||||||
|
hashSourceTable(sourceTableName, testDir);
|
||||||
|
Counters syncCounters = syncTables(sourceTableName, targetTableName,
|
||||||
|
testDir, "--doPuts=false");
|
||||||
|
assertTargetDoPutsFalse(70, sourceTableName, targetTableName);
|
||||||
|
|
||||||
|
assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
|
||||||
|
assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
|
||||||
|
assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
|
||||||
|
assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
|
||||||
|
assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
|
||||||
|
assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
|
||||||
|
|
||||||
|
TEST_UTIL.deleteTable(sourceTableName);
|
||||||
|
TEST_UTIL.deleteTable(targetTableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertEqualTables(int expectedRows, TableName sourceTableName,
|
private void assertEqualTables(int expectedRows, TableName sourceTableName,
|
||||||
|
@ -184,14 +230,202 @@ public class TestSyncTable {
|
||||||
targetTable.close();
|
targetTable.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void assertTargetDoDeletesFalse(int expectedRows, TableName
|
||||||
|
sourceTableName,
|
||||||
|
TableName targetTableName)
|
||||||
|
throws Exception {
|
||||||
|
Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
|
||||||
|
Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
|
||||||
|
|
||||||
|
ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
|
||||||
|
ResultScanner targetScanner = targetTable.getScanner(new Scan());
|
||||||
|
Result targetRow = targetScanner.next();
|
||||||
|
Result sourceRow = sourceScanner.next();
|
||||||
|
int rowsCount = 0;
|
||||||
|
while (targetRow!=null) {
|
||||||
|
rowsCount++;
|
||||||
|
//only compares values for existing rows, skipping rows existing on
|
||||||
|
//target only that were not deleted given --doDeletes=false
|
||||||
|
if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) {
|
||||||
|
targetRow = targetScanner.next();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.debug("SOURCE row: " + (sourceRow == null ? "null"
|
||||||
|
: Bytes.toInt(sourceRow.getRow()))
|
||||||
|
+ " cells:" + sourceRow);
|
||||||
|
LOG.debug("TARGET row: " + (targetRow == null ? "null"
|
||||||
|
: Bytes.toInt(targetRow.getRow()))
|
||||||
|
+ " cells:" + targetRow);
|
||||||
|
|
||||||
|
Cell[] sourceCells = sourceRow.rawCells();
|
||||||
|
Cell[] targetCells = targetRow.rawCells();
|
||||||
|
int targetRowKey = Bytes.toInt(targetRow.getRow());
|
||||||
|
if (targetRowKey >= 70 && targetRowKey < 80) {
|
||||||
|
if (sourceCells.length == targetCells.length) {
|
||||||
|
LOG.debug("Source cells: " + Arrays.toString(sourceCells));
|
||||||
|
LOG.debug("Target cells: " + Arrays.toString(targetCells));
|
||||||
|
Assert.fail("Row " + targetRowKey + " should have more cells in "
|
||||||
|
+ "target than in source");
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
if (sourceCells.length != targetCells.length) {
|
||||||
|
LOG.debug("Source cells: " + Arrays.toString(sourceCells));
|
||||||
|
LOG.debug("Target cells: " + Arrays.toString(targetCells));
|
||||||
|
Assert.fail("Row " + Bytes.toInt(sourceRow.getRow())
|
||||||
|
+ " has " + sourceCells.length
|
||||||
|
+ " cells in source table but " + targetCells.length
|
||||||
|
+ " cells in target table");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (int j = 0; j < sourceCells.length; j++) {
|
||||||
|
Cell sourceCell = sourceCells[j];
|
||||||
|
Cell targetCell = targetCells[j];
|
||||||
|
try {
|
||||||
|
if (!CellUtil.matchingRow(sourceCell, targetCell)) {
|
||||||
|
Assert.fail("Rows don't match");
|
||||||
|
}
|
||||||
|
if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
|
||||||
|
Assert.fail("Families don't match");
|
||||||
|
}
|
||||||
|
if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
|
||||||
|
Assert.fail("Qualifiers don't match");
|
||||||
|
}
|
||||||
|
if(targetRowKey < 80 && targetRowKey >= 90){
|
||||||
|
if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
|
||||||
|
Assert.fail("Timestamps don't match");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!CellUtil.matchingValue(sourceCell, targetCell)) {
|
||||||
|
Assert.fail("Values don't match");
|
||||||
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOG.debug("Source cell: " + sourceCell + " target cell: "
|
||||||
|
+ targetCell);
|
||||||
|
Throwables.propagate(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
targetRow = targetScanner.next();
|
||||||
|
sourceRow = sourceScanner.next();
|
||||||
|
}
|
||||||
|
assertEquals("Target expected rows does not match.",expectedRows,
|
||||||
|
rowsCount);
|
||||||
|
sourceScanner.close();
|
||||||
|
targetScanner.close();
|
||||||
|
sourceTable.close();
|
||||||
|
targetTable.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertTargetDoPutsFalse(int expectedRows, TableName
|
||||||
|
sourceTableName,
|
||||||
|
TableName targetTableName)
|
||||||
|
throws Exception {
|
||||||
|
Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
|
||||||
|
Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
|
||||||
|
|
||||||
|
ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
|
||||||
|
ResultScanner targetScanner = targetTable.getScanner(new Scan());
|
||||||
|
Result targetRow = targetScanner.next();
|
||||||
|
Result sourceRow = sourceScanner.next();
|
||||||
|
int rowsCount = 0;
|
||||||
|
|
||||||
|
while (targetRow!=null) {
|
||||||
|
//only compares values for existing rows, skipping rows existing on
|
||||||
|
//source only that were not added to target given --doPuts=false
|
||||||
|
if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) {
|
||||||
|
sourceRow = sourceScanner.next();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.debug("SOURCE row: " + (sourceRow == null ?
|
||||||
|
"null" :
|
||||||
|
Bytes.toInt(sourceRow.getRow()))
|
||||||
|
+ " cells:" + sourceRow);
|
||||||
|
LOG.debug("TARGET row: " + (targetRow == null ?
|
||||||
|
"null" :
|
||||||
|
Bytes.toInt(targetRow.getRow()))
|
||||||
|
+ " cells:" + targetRow);
|
||||||
|
|
||||||
|
LOG.debug("rowsCount: " + rowsCount);
|
||||||
|
|
||||||
|
Cell[] sourceCells = sourceRow.rawCells();
|
||||||
|
Cell[] targetCells = targetRow.rawCells();
|
||||||
|
int targetRowKey = Bytes.toInt(targetRow.getRow());
|
||||||
|
if (targetRowKey >= 40 && targetRowKey < 60) {
|
||||||
|
LOG.debug("Source cells: " + Arrays.toString(sourceCells));
|
||||||
|
LOG.debug("Target cells: " + Arrays.toString(targetCells));
|
||||||
|
Assert.fail("There shouldn't exist any rows between 40 and 60, since "
|
||||||
|
+ "Puts are disabled and Deletes are enabled.");
|
||||||
|
} else if (targetRowKey >= 60 && targetRowKey < 70) {
|
||||||
|
if (sourceCells.length == targetCells.length) {
|
||||||
|
LOG.debug("Source cells: " + Arrays.toString(sourceCells));
|
||||||
|
LOG.debug("Target cells: " + Arrays.toString(targetCells));
|
||||||
|
Assert.fail("Row " + Bytes.toInt(sourceRow.getRow())
|
||||||
|
+ " shouldn't have same number of cells.");
|
||||||
|
}
|
||||||
|
} else if (targetRowKey >= 80 && targetRowKey < 90) {
|
||||||
|
LOG.debug("Source cells: " + Arrays.toString(sourceCells));
|
||||||
|
LOG.debug("Target cells: " + Arrays.toString(targetCells));
|
||||||
|
Assert.fail("There should be no rows between 80 and 90 on target, as "
|
||||||
|
+ "these had different timestamps and should had been deleted.");
|
||||||
|
} else if (targetRowKey >= 90 && targetRowKey < 100) {
|
||||||
|
for (int j = 0; j < sourceCells.length; j++) {
|
||||||
|
Cell sourceCell = sourceCells[j];
|
||||||
|
Cell targetCell = targetCells[j];
|
||||||
|
if (CellUtil.matchingValue(sourceCell, targetCell)) {
|
||||||
|
Assert.fail("Cells values should not match for rows between "
|
||||||
|
+ "90 and 100. Target row id: " + (Bytes.toInt(targetRow
|
||||||
|
.getRow())));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (int j = 0; j < sourceCells.length; j++) {
|
||||||
|
Cell sourceCell = sourceCells[j];
|
||||||
|
Cell targetCell = targetCells[j];
|
||||||
|
try {
|
||||||
|
if (!CellUtil.matchingRow(sourceCell, targetCell)) {
|
||||||
|
Assert.fail("Rows don't match");
|
||||||
|
}
|
||||||
|
if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
|
||||||
|
Assert.fail("Families don't match");
|
||||||
|
}
|
||||||
|
if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
|
||||||
|
Assert.fail("Qualifiers don't match");
|
||||||
|
}
|
||||||
|
if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
|
||||||
|
Assert.fail("Timestamps don't match");
|
||||||
|
}
|
||||||
|
if (!CellUtil.matchingValue(sourceCell, targetCell)) {
|
||||||
|
Assert.fail("Values don't match");
|
||||||
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOG.debug(
|
||||||
|
"Source cell: " + sourceCell + " target cell: " + targetCell);
|
||||||
|
Throwables.propagate(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rowsCount++;
|
||||||
|
targetRow = targetScanner.next();
|
||||||
|
sourceRow = sourceScanner.next();
|
||||||
|
}
|
||||||
|
assertEquals("Target expected rows does not match.",expectedRows,
|
||||||
|
rowsCount);
|
||||||
|
sourceScanner.close();
|
||||||
|
targetScanner.close();
|
||||||
|
sourceTable.close();
|
||||||
|
targetTable.close();
|
||||||
|
}
|
||||||
|
|
||||||
private Counters syncTables(TableName sourceTableName, TableName targetTableName,
|
private Counters syncTables(TableName sourceTableName, TableName targetTableName,
|
||||||
Path testDir) throws Exception {
|
Path testDir, String... options) throws Exception {
|
||||||
SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration());
|
SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration());
|
||||||
int code = syncTable.run(new String[] {
|
String[] args = Arrays.copyOf(options, options.length+3);
|
||||||
testDir.toString(),
|
args[options.length] = testDir.toString();
|
||||||
sourceTableName.getNameAsString(),
|
args[options.length+1] = sourceTableName.getNameAsString();
|
||||||
targetTableName.getNameAsString()
|
args[options.length+2] = targetTableName.getNameAsString();
|
||||||
});
|
int code = syncTable.run(args);
|
||||||
assertEquals("sync table job failed", 0, code);
|
assertEquals("sync table job failed", 0, code);
|
||||||
|
|
||||||
LOG.info("Sync tables completed");
|
LOG.info("Sync tables completed");
|
||||||
|
|
Loading…
Reference in New Issue