HBASE-21642 CopyTable by reading snapshot and bulkloading will save a lot of time

This commit is contained in:
huzheng 2018-12-26 16:17:55 +08:00
parent c552088877
commit c2d5991b82
4 changed files with 187 additions and 48 deletions

View File

@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.UUID;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -29,6 +29,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
import org.apache.hadoop.hbase.mapreduce.Import.Importer;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -70,8 +72,34 @@ public class CopyTable extends Configured implements Tool {
boolean bulkload = false; boolean bulkload = false;
Path bulkloadDir = null; Path bulkloadDir = null;
boolean readingSnapshot = false;
String snapshot = null;
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
Path dir = new Path(fs.getWorkingDirectory(), NAME);
if (!fs.exists(dir)) {
fs.mkdirs(dir);
}
Path newDir = new Path(dir, UUID.randomUUID().toString());
if (withDirCreated) {
fs.mkdirs(newDir);
}
return newDir;
}
private void initCopyTableMapperReducerJob(Job job, Scan scan) throws IOException {
Class<? extends TableMapper> mapper = bulkload ? CellImporter.class : Importer.class;
if (readingSnapshot) {
TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
generateUniqTempDir(true));
} else {
TableMapReduceUtil.initTableMapperJob(tableName, scan, mapper, null, null, job);
}
}
/** /**
* Sets up the actual job. * Sets up the actual job.
* *
@ -79,13 +107,13 @@ public class CopyTable extends Configured implements Tool {
* @return The newly created job. * @return The newly created job.
* @throws IOException When setting up the job fails. * @throws IOException When setting up the job fails.
*/ */
public Job createSubmittableJob(String[] args) public Job createSubmittableJob(String[] args) throws IOException {
throws IOException {
if (!doCommandLine(args)) { if (!doCommandLine(args)) {
return null; return null;
} }
Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); String jobName = NAME + "_" + (tableName == null ? snapshot : tableName);
Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, jobName));
job.setJarByClass(CopyTable.class); job.setJarByClass(CopyTable.class);
Scan scan = new Scan(); Scan scan = new Scan();
@ -107,15 +135,15 @@ public class CopyTable extends Configured implements Tool {
job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true"); job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
} }
if (versions >= 0) { if (versions >= 0) {
scan.setMaxVersions(versions); scan.readVersions(versions);
} }
if (startRow != null) { if (startRow != null) {
scan.setStartRow(Bytes.toBytesBinary(startRow)); scan.withStartRow(Bytes.toBytesBinary(startRow));
} }
if (stopRow != null) { if (stopRow != null) {
scan.setStopRow(Bytes.toBytesBinary(stopRow)); scan.withStopRow(Bytes.toBytesBinary(stopRow));
} }
if(families != null) { if(families != null) {
@ -140,24 +168,13 @@ public class CopyTable extends Configured implements Tool {
job.setNumReduceTasks(0); job.setNumReduceTasks(0);
if (bulkload) { if (bulkload) {
TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.CellImporter.class, null, null, initCopyTableMapperReducerJob(job, scan);
job);
// We need to split the inputs by destination tables so that output of Map can be bulk-loaded. // We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName)); TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
FileSystem fs = FSUtils.getCurrentFileSystem(getConf()); bulkloadDir = generateUniqTempDir(false);
Random rand = new Random(); LOG.info("HFiles will be stored at " + this.bulkloadDir);
Path root = new Path(fs.getWorkingDirectory(), "copytable");
fs.mkdirs(root);
while (true) {
bulkloadDir = new Path(root, "" + rand.nextLong());
if (!fs.exists(bulkloadDir)) {
break;
}
}
System.out.println("HFiles will be stored at " + this.bulkloadDir);
HFileOutputFormat2.setOutputPath(job, bulkloadDir); HFileOutputFormat2.setOutputPath(job, bulkloadDir);
try (Connection conn = ConnectionFactory.createConnection(getConf()); try (Connection conn = ConnectionFactory.createConnection(getConf());
Admin admin = conn.getAdmin()) { Admin admin = conn.getAdmin()) {
@ -165,9 +182,7 @@ public class CopyTable extends Configured implements Tool {
admin.getDescriptor((TableName.valueOf(dstTableName)))); admin.getDescriptor((TableName.valueOf(dstTableName))));
} }
} else { } else {
TableMapReduceUtil.initTableMapperJob(tableName, scan, initCopyTableMapperReducerJob(job, scan);
Import.Importer.class, null, null, job);
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null, TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
null); null);
} }
@ -183,7 +198,7 @@ public class CopyTable extends Configured implements Tool {
System.err.println("ERROR: " + errorMsg); System.err.println("ERROR: " + errorMsg);
} }
System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " + System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
"[--new.name=NEW] [--peer.adr=ADR] <tablename>"); "[--new.name=NEW] [--peer.adr=ADR] <tablename | snapshotName>");
System.err.println(); System.err.println();
System.err.println("Options:"); System.err.println("Options:");
System.err.println(" rs.class hbase.regionserver.class of the peer cluster"); System.err.println(" rs.class hbase.regionserver.class of the peer cluster");
@ -205,6 +220,7 @@ public class CopyTable extends Configured implements Tool {
System.err.println(" all.cells also copy delete markers and deleted cells"); System.err.println(" all.cells also copy delete markers and deleted cells");
System.err.println(" bulkload Write input into HFiles and bulk load to the destination " System.err.println(" bulkload Write input into HFiles and bulk load to the destination "
+ "table"); + "table");
System.err.println(" snapshot Copy the data from snapshot to destination table.");
System.err.println(); System.err.println();
System.err.println("Args:"); System.err.println("Args:");
System.err.println(" tablename Name of the table to copy"); System.err.println(" tablename Name of the table to copy");
@ -214,6 +230,12 @@ public class CopyTable extends Configured implements Tool {
System.err.println(" $ hbase " + System.err.println(" $ hbase " +
"org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " + "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
"--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable "); "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
System.err.println(" To copy data from 'sourceTableSnapshot' to 'destTable': ");
System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
+ "--snapshot --new.name=destTable sourceTableSnapshot");
System.err.println(" To copy data from 'sourceTableSnapshot' and bulk load to 'destTable': ");
System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable "
+ "--new.name=destTable --snapshot --bulkload sourceTableSnapshot");
System.err.println("For performance consider the following general option:\n" System.err.println("For performance consider the following general option:\n"
+ " It is recommended that you set the following to >=100. A higher value uses more memory but\n" + " It is recommended that you set the following to >=100. A higher value uses more memory but\n"
+ " decreases the round trip time to the server and may increase performance.\n" + " decreases the round trip time to the server and may increase performance.\n"
@ -224,8 +246,6 @@ public class CopyTable extends Configured implements Tool {
} }
private boolean doCommandLine(final String[] args) { private boolean doCommandLine(final String[] args) {
// Process command-line args. TODO: Better cmd-line processing
// (but hopefully something not as painful as cli options).
if (args.length < 1) { if (args.length < 1) {
printUsage(null); printUsage(null);
return false; return false;
@ -313,16 +333,24 @@ public class CopyTable extends Configured implements Tool {
continue; continue;
} }
if (i == args.length-1) { if(cmd.startsWith("--snapshot")){
tableName = cmd; readingSnapshot = true;
continue;
}
if (i == args.length - 1) {
if (readingSnapshot) {
snapshot = cmd;
} else {
tableName = cmd;
}
} else { } else {
printUsage("Invalid argument '" + cmd + "'"); printUsage("Invalid argument '" + cmd + "'");
return false; return false;
} }
} }
if (dstTableName == null && peerAddress == null) { if (dstTableName == null && peerAddress == null) {
printUsage("At least a new table name or a " + printUsage("At least a new table name or a peer address must be specified");
"peer address must be specified");
return false; return false;
} }
if ((endTime != 0) && (startTime > endTime)) { if ((endTime != 0) && (startTime > endTime)) {
@ -335,6 +363,22 @@ public class CopyTable extends Configured implements Tool {
return false; return false;
} }
if (readingSnapshot && peerAddress != null) {
printUsage("Loading data from snapshot to remote peer cluster is not supported.");
return false;
}
if (readingSnapshot && dstTableName == null) {
printUsage("The --new.name=<table> for destination table should be "
+ "provided when copying data from snapshot .");
return false;
}
if (readingSnapshot && snapshot == null) {
printUsage("Snapshot shouldn't be null when --snapshot is enabled.");
return false;
}
// set dstTableName if necessary // set dstTableName if necessary
if (dstTableName == null) { if (dstTableName == null) {
dstTableName = tableName; dstTableName = tableName;
@ -371,6 +415,9 @@ public class CopyTable extends Configured implements Tool {
} }
int code = 0; int code = 0;
if (bulkload) { if (bulkload) {
LOG.info("Trying to bulk load data to destination table: " + dstTableName);
LOG.info("command: ./bin/hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles {} {}",
this.bulkloadDir.toString(), this.dstTableName);
code = new LoadIncrementalHFiles(this.getConf()) code = new LoadIncrementalHFiles(this.getConf())
.run(new String[] { this.bulkloadDir.toString(), this.dstTableName }); .run(new String[] { this.bulkloadDir.toString(), this.dstTableName });
if (code == 0) { if (code == 0) {

View File

@ -24,12 +24,15 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
@ -45,6 +48,7 @@ import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Rule; import org.junit.Rule;
@ -93,14 +97,9 @@ public class TestCopyTable {
try (Table t1 = TEST_UTIL.createTable(tableName1, FAMILY); try (Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
Table t2 = TEST_UTIL.createTable(tableName2, FAMILY)) { Table t2 = TEST_UTIL.createTable(tableName2, FAMILY)) {
// put rows into the first table // put rows into the first table
for (int i = 0; i < 10; i++) { loadData(t1, FAMILY, COLUMN1);
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(FAMILY, COLUMN1, COLUMN1);
t1.put(p);
}
CopyTable copy = new CopyTable(); CopyTable copy = new CopyTable();
int code; int code;
if (bulkload) { if (bulkload) {
code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()), code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()),
@ -114,12 +113,7 @@ public class TestCopyTable {
assertEquals("copy job failed", 0, code); assertEquals("copy job failed", 0, code);
// verify the data was copied into table 2 // verify the data was copied into table 2
for (int i = 0; i < 10; i++) { verifyRows(t2, FAMILY, COLUMN1);
Get g = new Get(Bytes.toBytes("row" + i));
Result r = t2.get(g);
assertEquals(1, r.size());
assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN1));
}
} finally { } finally {
TEST_UTIL.deleteTable(tableName1); TEST_UTIL.deleteTable(tableName1);
TEST_UTIL.deleteTable(tableName2); TEST_UTIL.deleteTable(tableName2);
@ -185,7 +179,6 @@ public class TestCopyTable {
t2.getDescriptor().getValues().size()); t2.getDescriptor().getValues().size());
assertTrue("The mob row count is 0 but should be > 0", assertTrue("The mob row count is 0 but should be > 0",
MobTestUtil.countMobRows(t2) > 0); MobTestUtil.countMobRows(t2) > 0);
} finally { } finally {
TEST_UTIL.deleteTable(tableName1); TEST_UTIL.deleteTable(tableName1);
TEST_UTIL.deleteTable(tableName2); TEST_UTIL.deleteTable(tableName2);
@ -349,4 +342,95 @@ public class TestCopyTable {
args); args);
return status == 0; return status == 0;
} }
private void loadData(Table t, byte[] family, byte[] column) throws IOException {
for (int i = 0; i < 10; i++) {
byte[] row = Bytes.toBytes("row" + i);
Put p = new Put(row);
p.addColumn(family, column, row);
t.put(p);
}
}
private void verifyRows(Table t, byte[] family, byte[] column) throws IOException {
for (int i = 0; i < 10; i++) {
byte[] row = Bytes.toBytes("row" + i);
Get g = new Get(row).addFamily(family);
Result r = t.get(g);
Assert.assertNotNull(r);
Assert.assertEquals(1, r.size());
Cell cell = r.rawCells()[0];
Assert.assertTrue(CellUtil.matchingQualifier(cell, column));
Assert.assertEquals(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(),
cell.getValueLength(), row, 0, row.length), 0);
}
}
private Table createTable(TableName tableName, byte[] family, boolean isMob) throws IOException {
if (isMob) {
ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(family)
.setMobEnabled(true).setMobThreshold(1).build();
TableDescriptor desc =
TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(cfd).build();
return TEST_UTIL.createTable(desc, null);
} else {
return TEST_UTIL.createTable(tableName, family);
}
}
private void testCopyTableBySnapshot(String tablePrefix, boolean bulkLoad, boolean isMob)
throws Exception {
TableName table1 = TableName.valueOf(tablePrefix + 1);
TableName table2 = TableName.valueOf(tablePrefix + 2);
Table t1 = createTable(table1, FAMILY_A, isMob);
Table t2 = createTable(table2, FAMILY_A, isMob);
loadData(t1, FAMILY_A, Bytes.toBytes("qualifier"));
String snapshot = tablePrefix + "_snapshot";
TEST_UTIL.getAdmin().snapshot(snapshot, table1);
boolean success;
if (bulkLoad) {
success =
runCopy(new String[] { "--snapshot", "--new.name=" + table2, "--bulkload", snapshot });
} else {
success = runCopy(new String[] { "--snapshot", "--new.name=" + table2, snapshot });
}
Assert.assertTrue(success);
verifyRows(t2, FAMILY_A, Bytes.toBytes("qualifier"));
}
@Test
public void testLoadingSnapshotToTable() throws Exception {
testCopyTableBySnapshot("testLoadingSnapshotToTable", false, false);
}
@Test
public void tsetLoadingSnapshotToMobTable() throws Exception {
testCopyTableBySnapshot("testLoadingSnapshotToMobTable", false, true);
}
@Test
public void testLoadingSnapshotAndBulkLoadToTable() throws Exception {
testCopyTableBySnapshot("testLoadingSnapshotAndBulkLoadToTable", true, false);
}
@Test
public void testLoadingSnapshotAndBulkLoadToMobTable() throws Exception {
testCopyTableBySnapshot("testLoadingSnapshotAndBulkLoadToMobTable", true, true);
}
@Test
public void testLoadingSnapshotToRemoteCluster() throws Exception {
Assert.assertFalse(runCopy(
new String[] { "--snapshot", "--peerAdr=hbase://remoteHBase", "sourceSnapshotName" }));
}
@Test
public void testLoadingSnapshotWithoutSnapshotName() throws Exception {
Assert.assertFalse(runCopy(new String[] { "--snapshot", "--peerAdr=hbase://remoteHBase" }));
}
@Test
public void testLoadingSnapshotWithoutDestTable() throws Exception {
Assert.assertFalse(runCopy(new String[] { "--snapshot", "sourceSnapshotName" }));
}
} }

View File

@ -26,11 +26,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -50,14 +51,21 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
public ClientSideRegionScanner(Configuration conf, FileSystem fs, public ClientSideRegionScanner(Configuration conf, FileSystem fs,
Path rootDir, TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics) Path rootDir, TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics)
throws IOException { throws IOException {
// region is immutable, set isolation level // region is immutable, set isolation level
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
htd = TableDescriptorBuilder.newBuilder(htd).setReadOnly(true).build(); htd = TableDescriptorBuilder.newBuilder(htd).setReadOnly(true).build();
// open region from the snapshot directory // open region from the snapshot directory
this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null); region = HRegion.newHRegion(FSUtils.getTableDir(rootDir, htd.getTableName()), null, fs, conf,
hri, htd, null);
// we won't initialize the MobFileCache when not running in RS process. so provided an
// initialized cache. Consider the case: an CF was set from an mob to non-mob. if we only
// initialize cache for MOB region, NPE from HMobStore will still happen. So Initialize the
// cache for every region although it may hasn't any mob CF, BTW the cache is very light-weight.
region.setMobFileCache(new MobFileCache(conf));
region.initialize();
// create an internal region scanner // create an internal region scanner
this.scanner = region.getScanner(scan); this.scanner = region.getScanner(scan);

View File

@ -7055,7 +7055,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param htd the table descriptor * @param htd the table descriptor
* @return the new instance * @return the new instance
*/ */
static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs, public static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,
Configuration conf, RegionInfo regionInfo, final TableDescriptor htd, Configuration conf, RegionInfo regionInfo, final TableDescriptor htd,
RegionServerServices rsServices) { RegionServerServices rsServices) {
try { try {