HBASE-10500 Some tools OOM when BucketCache is enabled
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1567687 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8b99b91cb9
commit
e6fa0cf92d
|
@ -100,7 +100,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
|
private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
|
||||||
static final AtomicLong regionCount = new AtomicLong(0);
|
static final AtomicLong regionCount = new AtomicLong(0);
|
||||||
private HBaseAdmin hbAdmin;
|
private HBaseAdmin hbAdmin;
|
||||||
private Configuration cfg;
|
|
||||||
|
|
||||||
public static final String NAME = "completebulkload";
|
public static final String NAME = "completebulkload";
|
||||||
private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
|
private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
|
||||||
|
@ -113,7 +112,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
|
|
||||||
public LoadIncrementalHFiles(Configuration conf) throws Exception {
|
public LoadIncrementalHFiles(Configuration conf) throws Exception {
|
||||||
super(conf);
|
super(conf);
|
||||||
this.cfg = conf;
|
// make a copy, just to be sure we're not overriding someone else's config
|
||||||
|
setConf(HBaseConfiguration.create(getConf()));
|
||||||
|
// disable blockcache for tool invocation, see HBASE-10500
|
||||||
|
getConf().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
|
||||||
this.hbAdmin = new HBaseAdmin(conf);
|
this.hbAdmin = new HBaseAdmin(conf);
|
||||||
this.userProvider = UserProvider.instantiate(conf);
|
this.userProvider = UserProvider.instantiate(conf);
|
||||||
assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
|
assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
|
||||||
|
@ -204,7 +206,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// initialize thread pools
|
// initialize thread pools
|
||||||
int nrThreads = cfg.getInt("hbase.loadincremental.threads.max",
|
int nrThreads = getConf().getInt("hbase.loadincremental.threads.max",
|
||||||
Runtime.getRuntime().availableProcessors());
|
Runtime.getRuntime().availableProcessors());
|
||||||
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
|
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
|
||||||
builder.setNameFormat("LoadIncrementalHFiles-%1$d");
|
builder.setNameFormat("LoadIncrementalHFiles-%1$d");
|
||||||
|
@ -252,7 +254,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
//If using secure bulk load
|
//If using secure bulk load
|
||||||
//prepare staging directory and token
|
//prepare staging directory and token
|
||||||
if (userProvider.isHBaseSecurityEnabled()) {
|
if (userProvider.isHBaseSecurityEnabled()) {
|
||||||
FileSystem fs = FileSystem.get(cfg);
|
FileSystem fs = FileSystem.get(getConf());
|
||||||
//This condition is here for unit testing
|
//This condition is here for unit testing
|
||||||
//Since delegation token doesn't work in mini cluster
|
//Since delegation token doesn't work in mini cluster
|
||||||
if (userProvider.isHadoopSecurityEnabled()) {
|
if (userProvider.isHadoopSecurityEnabled()) {
|
||||||
|
@ -278,7 +280,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
+ count + " with " + queue.size() + " files remaining to group or split");
|
+ count + " with " + queue.size() + " files remaining to group or split");
|
||||||
}
|
}
|
||||||
|
|
||||||
int maxRetries = cfg.getInt("hbase.bulkload.retries.number", 0);
|
int maxRetries = getConf().getInt("hbase.bulkload.retries.number", 0);
|
||||||
if (maxRetries != 0 && count >= maxRetries) {
|
if (maxRetries != 0 && count >= maxRetries) {
|
||||||
LOG.error("Retry attempted " + count + " times without completing, bailing out");
|
LOG.error("Retry attempted " + count + " times without completing, bailing out");
|
||||||
return;
|
return;
|
||||||
|
@ -300,7 +302,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
if (userProvider.isHBaseSecurityEnabled()) {
|
if (userProvider.isHBaseSecurityEnabled()) {
|
||||||
if (userToken != null && !hasForwardedToken) {
|
if (userToken != null && !hasForwardedToken) {
|
||||||
try {
|
try {
|
||||||
userToken.cancel(cfg);
|
userToken.cancel(getConf());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.warn("Failed to cancel HDFS delegation token.", e);
|
LOG.warn("Failed to cancel HDFS delegation token.", e);
|
||||||
}
|
}
|
||||||
|
@ -579,7 +581,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
//from the staging directory back to original location
|
//from the staging directory back to original location
|
||||||
//in user directory
|
//in user directory
|
||||||
if(secureClient != null && !success) {
|
if(secureClient != null && !success) {
|
||||||
FileSystem fs = FileSystem.get(cfg);
|
FileSystem fs = FileSystem.get(getConf());
|
||||||
for(Pair<byte[], String> el : famPaths) {
|
for(Pair<byte[], String> el : famPaths) {
|
||||||
Path hfileStagingPath = null;
|
Path hfileStagingPath = null;
|
||||||
Path hfileOrigPath = new Path(el.getSecond());
|
Path hfileOrigPath = new Path(el.getSecond());
|
||||||
|
@ -818,14 +820,15 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
if (!tableExists) this.createTable(tableName,dirPath);
|
if (!tableExists) this.createTable(tableName,dirPath);
|
||||||
|
|
||||||
Path hfofDir = new Path(dirPath);
|
Path hfofDir = new Path(dirPath);
|
||||||
HTable table = new HTable(this.cfg, tableName);
|
HTable table = new HTable(getConf(), tableName);
|
||||||
|
|
||||||
doBulkLoad(hfofDir, table);
|
doBulkLoad(hfofDir, table);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
int ret = ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args);
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
int ret = ToolRunner.run(new LoadIncrementalHFiles(conf), args);
|
||||||
System.exit(ret);
|
System.exit(ret);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -270,6 +270,10 @@ public class HBaseFsck extends Configured {
|
||||||
public HBaseFsck(Configuration conf) throws MasterNotRunningException,
|
public HBaseFsck(Configuration conf) throws MasterNotRunningException,
|
||||||
ZooKeeperConnectionException, IOException, ClassNotFoundException {
|
ZooKeeperConnectionException, IOException, ClassNotFoundException {
|
||||||
super(conf);
|
super(conf);
|
||||||
|
// make a copy, just to be sure we're not overriding someone else's config
|
||||||
|
setConf(HBaseConfiguration.create(getConf()));
|
||||||
|
// disable blockcache for tool invocation, see HBASE-10500
|
||||||
|
getConf().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
|
||||||
errors = getErrorReporter(conf);
|
errors = getErrorReporter(conf);
|
||||||
|
|
||||||
int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
|
int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
|
||||||
|
|
Loading…
Reference in New Issue