HBASE-21445 CopyTable by bulkload will write hfile into yarn's HDFS

huzheng 2018-11-07 11:25:43 +08:00
parent ccabf7310d
commit 8135285506
1 changed file with 8 additions and 7 deletions

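The change in one sentence: in bulkload mode, CopyTable used FileSystem.get(conf) to pick the filesystem for its HFile staging directory; that call follows fs.defaultFS, which for a job submitted through YARN can be the YARN cluster's HDFS rather than the HDFS backing HBase, so the staged HFiles landed on the wrong cluster. The patch switches to FSUtils.getCurrentFileSystem(conf), which resolves the filesystem of hbase.rootdir. A minimal sketch of the distinction, assuming a standard HBase client classpath (class and variable names here are illustrative, not from the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.util.FSUtils;

    public class StagingFsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Old behaviour: follows fs.defaultFS, which for a YARN-submitted job
        // may be the YARN cluster's HDFS rather than the one backing HBase.
        FileSystem defaultFs = FileSystem.get(conf);
        // New behaviour: resolves the filesystem of hbase.rootdir, so staged
        // HFiles end up where the subsequent bulk load expects them.
        FileSystem hbaseFs = FSUtils.getCurrentFileSystem(conf);
        System.out.println(defaultFs.getUri() + " vs " + hbaseFs.getUri());
      }
    }

When both clusters share one HDFS the two URIs coincide, which is presumably why the bug only surfaces in split deployments.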

@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -139,13 +140,13 @@ public class CopyTable extends Configured implements Tool {
     job.setNumReduceTasks(0);
     if (bulkload) {
-      TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.CellImporter.class, null,
-        null, job);
+      TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.CellImporter.class, null, null,
+        job);
       // We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
       TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
-      FileSystem fs = FileSystem.get(getConf());
+      FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
       Random rand = new Random();
       Path root = new Path(fs.getWorkingDirectory(), "copytable");
       fs.mkdirs(root);
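With this hunk, fs (and therefore the "copytable" staging root derived from its working directory) lives on HBase's filesystem. For reference, a sketch of what FSUtils.getCurrentFileSystem(conf) boils down to, assuming it resolves the configured HBase root directory (a paraphrase, not the utility's exact code):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HConstants;

    public class RootDirFsSketch {
      // Resolve the filesystem that backs hbase.rootdir rather than fs.defaultFS.
      static FileSystem currentFileSystem(Configuration conf) throws IOException {
        Path rootDir = new Path(conf.get(HConstants.HBASE_DIR)); // "hbase.rootdir"
        return rootDir.getFileSystem(conf);
      }
    }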
@@ -161,7 +162,7 @@ public class CopyTable extends Configured implements Tool {
       try (Connection conn = ConnectionFactory.createConnection(getConf());
           Admin admin = conn.getAdmin()) {
         HFileOutputFormat2.configureIncrementalLoadMap(job,
-            admin.getDescriptor((TableName.valueOf(dstTableName))));
+          admin.getDescriptor((TableName.valueOf(dstTableName))));
       }
     } else {
       TableMapReduceUtil.initTableMapperJob(tableName, scan,
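This hunk only reformats the call, but it is the piece that ties the output format to the destination table: the descriptor fetched through Admin drives HFileOutputFormat2, so the generated HFiles match the destination table's column families and their settings. A self-contained sketch of the same call pattern (table name and job name are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.mapreduce.Job;

    public class IncrementalLoadSetupSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "copytable-sketch");
        // Fetch the destination table's descriptor and configure the map-only
        // job to write HFiles that match its layout.
        try (Connection conn = ConnectionFactory.createConnection(conf);
            Admin admin = conn.getAdmin()) {
          HFileOutputFormat2.configureIncrementalLoadMap(job,
              admin.getDescriptor(TableName.valueOf("destTable")));
        }
      }
    }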
@@ -370,12 +371,12 @@ public class CopyTable extends Configured implements Tool {
     }
     int code = 0;
     if (bulkload) {
-      code = new LoadIncrementalHFiles(this.getConf()).run(new String[]{this.bulkloadDir.toString(),
-        this.dstTableName});
+      code = new LoadIncrementalHFiles(this.getConf())
+          .run(new String[] { this.bulkloadDir.toString(), this.dstTableName });
       if (code == 0) {
         // bulkloadDir is deleted only LoadIncrementalHFiles was successful so that one can rerun
         // LoadIncrementalHFiles.
-        FileSystem fs = FileSystem.get(this.getConf());
+        FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
         if (!fs.delete(this.bulkloadDir, true)) {
           LOG.error("Deleting folder " + bulkloadDir + " failed!");
           code = 1;
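The surrounding logic explains the deletion guard: the staging directory is removed only when LoadIncrementalHFiles returns success, so after a failed load the HFiles survive and the load can be retried. A hedged sketch of such a manual rerun, reusing the run(String[]) entry point seen above (the staging path and table name are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;

    public class RerunBulkLoadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Point the tool at the surviving staging dir and the destination
        // table; both values here are placeholders, not from the patch.
        int code = new LoadIncrementalHFiles(conf)
            .run(new String[] { "/user/hbase/copytable/1234567890", "destTable" });
        System.exit(code);
      }
    }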