HBASE-21675 Port HBASE-21642 (CopyTable by reading snapshot and bulkloading will save a lot of time) to branch-1
HBASE-21642 CopyTable by reading snapshot and bulkloading will save a lot of time Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
cf87877fcf
commit
cd0645a5f6
|
@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
|
|||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.client.Connection;
|
|||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter;
|
||||
import org.apache.hadoop.hbase.mapreduce.Import.Importer;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
|
@ -74,11 +76,39 @@ public class CopyTable extends Configured implements Tool {
|
|||
// When true, the map output is written as HFiles and bulk loaded into the destination table
// instead of being written through the table reducer.
boolean bulkload = false;
|
||||
// Directory the HFiles are written to when bulkload is enabled; assigned while the job is set up.
Path bulkloadDir = null;
|
||||
|
||||
// Set when the --snapshot option is given: the source is then read from a snapshot.
boolean readingSnapshot = false;
|
||||
// Name of the source snapshot; taken from the last command-line argument when readingSnapshot
// is true.
String snapshot = null;
|
||||
|
||||
// Configuration key that, when set, overrides the generated job name.
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
|
||||
|
||||
/**
 * Creates the tool on top of the given configuration.
 *
 * @param conf configuration handed to the {@code Configured} superclass
 */
public CopyTable(Configuration conf) {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
|
||||
FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
|
||||
Path dir = new Path(getConf().get(HConstants.TEMPORARY_FS_DIRECTORY_KEY), NAME);
|
||||
if (!fs.exists(dir)) {
|
||||
fs.mkdirs(dir);
|
||||
}
|
||||
Path newDir = new Path(dir, UUID.randomUUID().toString());
|
||||
if (withDirCreated) {
|
||||
fs.mkdirs(newDir);
|
||||
}
|
||||
return newDir;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"rawtypes","unchecked"})
|
||||
private void initCopyTableMapperReducerJob(Job job, Scan scan) throws IOException {
|
||||
Class mapper = bulkload ? KeyValueImporter.class : Importer.class;
|
||||
if (readingSnapshot) {
|
||||
TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
|
||||
generateUniqTempDir(true));
|
||||
} else {
|
||||
TableMapReduceUtil.initTableMapperJob(tableName, scan, mapper, null, null, job);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets up the actual job.
|
||||
*
|
||||
|
@ -86,14 +116,13 @@ public class CopyTable extends Configured implements Tool {
|
|||
* @return The newly created job.
|
||||
* @throws IOException When setting up the job fails.
|
||||
*/
|
||||
public Job createSubmittableJob(String[] args)
|
||||
throws IOException {
|
||||
public Job createSubmittableJob(String[] args) throws IOException {
|
||||
if (!doCommandLine(args)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
|
||||
|
||||
String jobName = NAME + "_" + (tableName == null ? snapshot : tableName);
|
||||
Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, jobName));
|
||||
job.setJarByClass(CopyTable.class);
|
||||
Scan scan = new Scan();
|
||||
scan.setBatch(batch);
|
||||
|
@ -146,33 +175,20 @@ public class CopyTable extends Configured implements Tool {
|
|||
job.setNumReduceTasks(0);
|
||||
|
||||
if (bulkload) {
|
||||
TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null,
|
||||
null, job);
|
||||
initCopyTableMapperReducerJob(job, scan);
|
||||
|
||||
// We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
|
||||
TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
|
||||
|
||||
FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
|
||||
Random rand = new Random();
|
||||
Path root = new Path(fs.getWorkingDirectory(), "copytable");
|
||||
fs.mkdirs(root);
|
||||
while (true) {
|
||||
bulkloadDir = new Path(root, "" + rand.nextLong());
|
||||
if (!fs.exists(bulkloadDir)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
System.out.println("HFiles will be stored at " + this.bulkloadDir);
|
||||
bulkloadDir = generateUniqTempDir(false);
|
||||
LOG.info("HFiles will be stored at " + this.bulkloadDir);
|
||||
HFileOutputFormat2.setOutputPath(job, bulkloadDir);
|
||||
try (Connection conn = ConnectionFactory.createConnection(getConf());
|
||||
Table htable = conn.getTable(TableName.valueOf(dstTableName))) {
|
||||
HFileOutputFormat2.configureIncrementalLoadMap(job, htable);
|
||||
}
|
||||
} else {
|
||||
TableMapReduceUtil.initTableMapperJob(tableName, scan,
|
||||
Import.Importer.class, null, null, job);
|
||||
|
||||
initCopyTableMapperReducerJob(job, scan);
|
||||
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
|
||||
null);
|
||||
}
|
||||
|
@ -188,7 +204,7 @@ public class CopyTable extends Configured implements Tool {
|
|||
System.err.println("ERROR: " + errorMsg);
|
||||
}
|
||||
System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
|
||||
"[--new.name=NEW] [--peer.adr=ADR] <tablename>");
|
||||
"[--new.name=NEW] [--peer.adr=ADR] <tablename | snapshotName>");
|
||||
System.err.println();
|
||||
System.err.println("Options:");
|
||||
System.err.println(" rs.class hbase.regionserver.class of the peer cluster");
|
||||
|
@ -209,6 +225,7 @@ public class CopyTable extends Configured implements Tool {
|
|||
System.err.println(" all.cells also copy delete markers and deleted cells");
|
||||
System.err.println(" bulkload Write input into HFiles and bulk load to the destination "
|
||||
+ "table");
|
||||
System.err.println(" snapshot Copy the data from snapshot to destination table.");
|
||||
System.err.println();
|
||||
System.err.println("Args:");
|
||||
System.err.println(" tablename Name of the table to copy");
|
||||
|
@ -218,6 +235,12 @@ public class CopyTable extends Configured implements Tool {
|
|||
System.err.println(" $ bin/hbase " +
|
||||
"org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
|
||||
"--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
|
||||
System.err.println(" To copy data from 'sourceTableSnapshot' to 'destTable': ");
|
||||
System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
|
||||
+ "--snapshot --new.name=destTable sourceTableSnapshot");
|
||||
System.err.println(" To copy data from 'sourceTableSnapshot' and bulk load to 'destTable': ");
|
||||
System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
|
||||
+ "--new.name=destTable --snapshot --bulkload sourceTableSnapshot");
|
||||
System.err.println("For performance consider the following general option:\n"
|
||||
+ " It is recommended that you set the following to >=100. A higher value uses more memory but\n"
|
||||
+ " decreases the round trip time to the server and may increase performance.\n"
|
||||
|
@ -228,8 +251,6 @@ public class CopyTable extends Configured implements Tool {
|
|||
}
|
||||
|
||||
private boolean doCommandLine(final String[] args) {
|
||||
// Process command-line args. TODO: Better cmd-line processing
|
||||
// (but hopefully something not as painful as cli options).
|
||||
if (args.length < 1) {
|
||||
printUsage(null);
|
||||
return false;
|
||||
|
@ -317,16 +338,24 @@ public class CopyTable extends Configured implements Tool {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (i == args.length-1) {
|
||||
tableName = cmd;
|
||||
if(cmd.startsWith("--snapshot")){
|
||||
readingSnapshot = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (i == args.length - 1) {
|
||||
if (readingSnapshot) {
|
||||
snapshot = cmd;
|
||||
} else {
|
||||
tableName = cmd;
|
||||
}
|
||||
} else {
|
||||
printUsage("Invalid argument '" + cmd + "'" );
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if (dstTableName == null && peerAddress == null) {
|
||||
printUsage("At least a new table name or a " +
|
||||
"peer address must be specified");
|
||||
printUsage("At least a new table name or a peer address must be specified");
|
||||
return false;
|
||||
}
|
||||
if ((endTime != 0) && (startTime > endTime)) {
|
||||
|
@ -339,6 +368,22 @@ public class CopyTable extends Configured implements Tool {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (readingSnapshot && peerAddress != null) {
|
||||
printUsage("Loading data from snapshot to remote peer cluster is not supported.");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (readingSnapshot && dstTableName == null) {
|
||||
printUsage("The --new.name=<table> for destination table should be "
|
||||
+ "provided when copying data from snapshot .");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (readingSnapshot && snapshot == null) {
|
||||
printUsage("Snapshot shouldn't be null when --snapshot is enabled.");
|
||||
return false;
|
||||
}
|
||||
|
||||
// set dstTableName if necessary
|
||||
if (dstTableName == null) {
|
||||
dstTableName = tableName;
|
||||
|
@ -376,8 +421,11 @@ public class CopyTable extends Configured implements Tool {
|
|||
}
|
||||
int code = 0;
|
||||
if (bulkload) {
|
||||
code = new LoadIncrementalHFiles(this.getConf()).run(new String[]{this.bulkloadDir.toString(),
|
||||
this.dstTableName});
|
||||
LOG.info("Trying to bulk load data to destination table: " + dstTableName
|
||||
+ ", command: ./bin/hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles "
|
||||
+ bulkloadDir.toString() + " " + dstTableName);
|
||||
code = new LoadIncrementalHFiles(this.getConf())
|
||||
.run(new String[] { this.bulkloadDir.toString(), this.dstTableName });
|
||||
if (code == 0) {
|
||||
// bulkloadDir is deleted only LoadIncrementalHFiles was successful so that one can rerun
|
||||
// LoadIncrementalHFiles.
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.io.IOException;
|
|||
import java.io.PrintStream;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
|
@ -38,9 +39,9 @@ import org.apache.hadoop.hbase.client.Result;
|
|||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
@ -77,40 +78,30 @@ public class TestCopyTable {
|
|||
final byte[] FAMILY = Bytes.toBytes("family");
|
||||
final byte[] COLUMN1 = Bytes.toBytes("c1");
|
||||
|
||||
Table t1 = TEST_UTIL.createTable(TABLENAME1, FAMILY);
|
||||
Table t2 = TEST_UTIL.createTable(TABLENAME2, FAMILY);
|
||||
try (Table t1 = TEST_UTIL.createTable(TABLENAME1, FAMILY);
|
||||
Table t2 = TEST_UTIL.createTable(TABLENAME2, FAMILY);) {
|
||||
// put rows into the first table
|
||||
loadData(t1, FAMILY, COLUMN1);
|
||||
|
||||
// put rows into the first table
|
||||
for (int i = 0; i < 10; i++) {
|
||||
Put p = new Put(Bytes.toBytes("row" + i));
|
||||
p.add(FAMILY, COLUMN1, COLUMN1);
|
||||
t1.put(p);
|
||||
CopyTable copy = new CopyTable(TEST_UTIL.getConfiguration());
|
||||
int code;
|
||||
if (bulkload) {
|
||||
code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()),
|
||||
copy, new String[] { "--new.name=" + TABLENAME2.getNameAsString(),
|
||||
"--bulkload", TABLENAME1.getNameAsString() });
|
||||
} else {
|
||||
code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()),
|
||||
copy, new String[] { "--new.name=" + TABLENAME2.getNameAsString(),
|
||||
TABLENAME1.getNameAsString() });
|
||||
}
|
||||
assertEquals("copy job failed", 0, code);
|
||||
|
||||
// verify the data was copied into table 2
|
||||
verifyRows(t2, FAMILY, COLUMN1);
|
||||
} finally {
|
||||
TEST_UTIL.deleteTable(TABLENAME1);
|
||||
TEST_UTIL.deleteTable(TABLENAME2);
|
||||
}
|
||||
|
||||
CopyTable copy = new CopyTable(TEST_UTIL.getConfiguration());
|
||||
|
||||
int code;
|
||||
if (bulkload) {
|
||||
code = copy.run(new String[] { "--new.name=" + TABLENAME2.getNameAsString(),
|
||||
"--bulkload", TABLENAME1.getNameAsString() });
|
||||
} else {
|
||||
code = copy.run(new String[] { "--new.name=" + TABLENAME2.getNameAsString(),
|
||||
TABLENAME1.getNameAsString() });
|
||||
}
|
||||
assertEquals("copy job failed", 0, code);
|
||||
|
||||
// verify the data was copied into table 2
|
||||
for (int i = 0; i < 10; i++) {
|
||||
Get g = new Get(Bytes.toBytes("row" + i));
|
||||
Result r = t2.get(g);
|
||||
assertEquals(1, r.size());
|
||||
assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN1));
|
||||
}
|
||||
|
||||
t1.close();
|
||||
t2.close();
|
||||
TEST_UTIL.deleteTable(TABLENAME1);
|
||||
TEST_UTIL.deleteTable(TABLENAME2);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -251,14 +242,78 @@ public class TestCopyTable {
|
|||
assertTrue(data.toString().contains("Usage:"));
|
||||
}
|
||||
|
||||
private boolean runCopy(String[] args) throws IOException, InterruptedException,
|
||||
ClassNotFoundException {
|
||||
GenericOptionsParser opts = new GenericOptionsParser(
|
||||
new Configuration(TEST_UTIL.getConfiguration()), args);
|
||||
Configuration configuration = opts.getConfiguration();
|
||||
args = opts.getRemainingArgs();
|
||||
Job job = new CopyTable(configuration).createSubmittableJob(args);
|
||||
job.waitForCompletion(false);
|
||||
return job.isSuccessful();
|
||||
private boolean runCopy(String[] args) throws Exception {
|
||||
CopyTable copy = new CopyTable(TEST_UTIL.getConfiguration());
|
||||
int code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()), copy, args);
|
||||
return code == 0;
|
||||
}
|
||||
|
||||
private void loadData(Table t, byte[] family, byte[] column) throws IOException {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
byte[] row = Bytes.toBytes("row" + i);
|
||||
Put p = new Put(row);
|
||||
p.addColumn(family, column, row);
|
||||
t.put(p);
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyRows(Table t, byte[] family, byte[] column) throws IOException {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
byte[] row = Bytes.toBytes("row" + i);
|
||||
Get g = new Get(row).addFamily(family);
|
||||
Result r = t.get(g);
|
||||
Assert.assertNotNull(r);
|
||||
Assert.assertEquals(1, r.size());
|
||||
Cell cell = r.rawCells()[0];
|
||||
Assert.assertTrue(CellUtil.matchingQualifier(cell, column));
|
||||
Assert.assertEquals(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(),
|
||||
cell.getValueLength(), row, 0, row.length), 0);
|
||||
}
|
||||
}
|
||||
|
||||
private void testCopyTableBySnapshot(String tablePrefix, boolean bulkLoad)
|
||||
throws Exception {
|
||||
TableName table1 = TableName.valueOf(tablePrefix + 1);
|
||||
TableName table2 = TableName.valueOf(tablePrefix + 2);
|
||||
Table t1 = TEST_UTIL.createTable(table1, FAMILY_A);
|
||||
Table t2 = TEST_UTIL.createTable(table2, FAMILY_A);
|
||||
loadData(t1, FAMILY_A, Bytes.toBytes("qualifier"));
|
||||
String snapshot = tablePrefix + "_snapshot";
|
||||
TEST_UTIL.getHBaseAdmin().snapshot(snapshot, table1);
|
||||
boolean success;
|
||||
if (bulkLoad) {
|
||||
success =
|
||||
runCopy(new String[] { "--snapshot", "--new.name=" + table2, "--bulkload", snapshot });
|
||||
} else {
|
||||
success = runCopy(new String[] { "--snapshot", "--new.name=" + table2, snapshot });
|
||||
}
|
||||
Assert.assertTrue(success);
|
||||
verifyRows(t2, FAMILY_A, Bytes.toBytes("qualifier"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadingSnapshotToTable() throws Exception {
|
||||
testCopyTableBySnapshot("testLoadingSnapshotToTable", false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadingSnapshotAndBulkLoadToTable() throws Exception {
|
||||
testCopyTableBySnapshot("testLoadingSnapshotAndBulkLoadToTable", true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadingSnapshotToRemoteCluster() throws Exception {
|
||||
Assert.assertFalse(runCopy(
|
||||
new String[] { "--snapshot", "--peerAdr=hbase://remoteHBase", "sourceSnapshotName" }));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadingSnapshotWithoutSnapshotName() throws Exception {
|
||||
Assert.assertFalse(runCopy(new String[] { "--snapshot", "--peerAdr=hbase://remoteHBase" }));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadingSnapshotWithoutDestTable() throws Exception {
|
||||
Assert.assertFalse(runCopy(new String[] { "--snapshot", "sourceSnapshotName" }));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue