HBASE-21445 CopyTable by bulkload will write hfile into yarn's HDFS
This commit is contained in:
parent
c6090d4f04
commit
189122f3fe
|
@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -139,13 +140,13 @@ public class CopyTable extends Configured implements Tool {
|
||||||
job.setNumReduceTasks(0);
|
job.setNumReduceTasks(0);
|
||||||
|
|
||||||
if (bulkload) {
|
if (bulkload) {
|
||||||
TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.CellImporter.class, null,
|
TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.CellImporter.class, null, null,
|
||||||
null, job);
|
job);
|
||||||
|
|
||||||
// We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
|
// We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
|
||||||
TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
|
TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
|
||||||
|
|
||||||
FileSystem fs = FileSystem.get(getConf());
|
FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
|
||||||
Random rand = new Random();
|
Random rand = new Random();
|
||||||
Path root = new Path(fs.getWorkingDirectory(), "copytable");
|
Path root = new Path(fs.getWorkingDirectory(), "copytable");
|
||||||
fs.mkdirs(root);
|
fs.mkdirs(root);
|
||||||
|
@ -161,7 +162,7 @@ public class CopyTable extends Configured implements Tool {
|
||||||
try (Connection conn = ConnectionFactory.createConnection(getConf());
|
try (Connection conn = ConnectionFactory.createConnection(getConf());
|
||||||
Admin admin = conn.getAdmin()) {
|
Admin admin = conn.getAdmin()) {
|
||||||
HFileOutputFormat2.configureIncrementalLoadMap(job,
|
HFileOutputFormat2.configureIncrementalLoadMap(job,
|
||||||
admin.getDescriptor((TableName.valueOf(dstTableName))));
|
admin.getDescriptor((TableName.valueOf(dstTableName))));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
TableMapReduceUtil.initTableMapperJob(tableName, scan,
|
TableMapReduceUtil.initTableMapperJob(tableName, scan,
|
||||||
|
@ -370,12 +371,12 @@ public class CopyTable extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
int code = 0;
|
int code = 0;
|
||||||
if (bulkload) {
|
if (bulkload) {
|
||||||
code = new LoadIncrementalHFiles(this.getConf()).run(new String[]{this.bulkloadDir.toString(),
|
code = new LoadIncrementalHFiles(this.getConf())
|
||||||
this.dstTableName});
|
.run(new String[] { this.bulkloadDir.toString(), this.dstTableName });
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
// bulkloadDir is deleted only LoadIncrementalHFiles was successful so that one can rerun
|
// bulkloadDir is deleted only LoadIncrementalHFiles was successful so that one can rerun
|
||||||
// LoadIncrementalHFiles.
|
// LoadIncrementalHFiles.
|
||||||
FileSystem fs = FileSystem.get(this.getConf());
|
FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
|
||||||
if (!fs.delete(this.bulkloadDir, true)) {
|
if (!fs.delete(this.bulkloadDir, true)) {
|
||||||
LOG.error("Deleting folder " + bulkloadDir + " failed!");
|
LOG.error("Deleting folder " + bulkloadDir + " failed!");
|
||||||
code = 1;
|
code = 1;
|
||||||
|
|
Loading…
Reference in New Issue