HBASE-6459 improve speed of table creation (Zhou Wenjian)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1400152 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c951cfbd31
commit
b6ca623286
|
@ -19,9 +19,18 @@
|
|||
package org.apache.hadoop.hbase.master.handler;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletionService;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -42,6 +51,7 @@ import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
|||
import org.apache.hadoop.hbase.master.MasterFileSystem;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
/**
|
||||
|
@ -122,7 +132,7 @@ public class CreateTableHandler extends EventHandler {
|
|||
if (cpHost != null) {
|
||||
cpHost.preCreateTableHandler(this.hTableDescriptor, this.newRegions);
|
||||
}
|
||||
handleCreateTable();
|
||||
handleCreateTable(tableName);
|
||||
if (cpHost != null) {
|
||||
cpHost.postCreateTableHandler(this.hTableDescriptor, this.newRegions);
|
||||
}
|
||||
|
@ -133,33 +143,47 @@ public class CreateTableHandler extends EventHandler {
|
|||
}
|
||||
}
|
||||
|
||||
private void handleCreateTable() throws IOException, KeeperException {
|
||||
|
||||
private void handleCreateTable(String tableName) throws IOException,
|
||||
KeeperException {
|
||||
int regionNumber = newRegions.length;
|
||||
ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(
|
||||
"RegionOpenAndInitThread-" + tableName, regionNumber);
|
||||
CompletionService<HRegion> completionService = new ExecutorCompletionService<HRegion>(
|
||||
regionOpenAndInitThreadPool);
|
||||
// TODO: Currently we make the table descriptor and as side-effect the
|
||||
// tableDir is created. Should we change below method to be createTable
|
||||
// where we create table in tmp dir with its table descriptor file and then
|
||||
// do rename to move it into place?
|
||||
FSTableDescriptors.createTableDescriptor(this.hTableDescriptor, this.conf);
|
||||
|
||||
List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
|
||||
final int batchSize =
|
||||
this.conf.getInt("hbase.master.createtable.batchsize", 100);
|
||||
for (int regionIdx = 0; regionIdx < this.newRegions.length; regionIdx++) {
|
||||
HRegionInfo newRegion = this.newRegions[regionIdx];
|
||||
// 1. Create HRegion
|
||||
HRegion region = HRegion.createHRegion(newRegion,
|
||||
this.fileSystemManager.getRootDir(), this.conf,
|
||||
this.hTableDescriptor, null, false, true);
|
||||
for (final HRegionInfo newRegion : newRegions) {
|
||||
completionService.submit(new Callable<HRegion>() {
|
||||
public HRegion call() throws IOException {
|
||||
|
||||
regionInfos.add(region.getRegionInfo());
|
||||
if (regionIdx % batchSize == 0) {
|
||||
// 2. Insert into META
|
||||
MetaEditor.addRegionsToMeta(this.catalogTracker, regionInfos);
|
||||
regionInfos.clear();
|
||||
// 1. Create HRegion
|
||||
HRegion region = HRegion.createHRegion(newRegion,
|
||||
fileSystemManager.getRootDir(), conf, hTableDescriptor, null,
|
||||
false, true);
|
||||
|
||||
// 2. Close the new region to flush to disk. Close log file too.
|
||||
region.close();
|
||||
return region;
|
||||
}
|
||||
});
|
||||
}
|
||||
try {
|
||||
// 3. wait for all regions to finish creation
|
||||
for (int i = 0; i < regionNumber; i++) {
|
||||
Future<HRegion> future = completionService.take();
|
||||
HRegion region = future.get();
|
||||
regionInfos.add(region.getRegionInfo());
|
||||
}
|
||||
|
||||
// 3. Close the new region to flush to disk. Close log file too.
|
||||
region.close();
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException(e.getMessage());
|
||||
} catch (ExecutionException e) {
|
||||
throw new IOException(e.getCause());
|
||||
} finally {
|
||||
regionOpenAndInitThreadPool.shutdownNow();
|
||||
}
|
||||
if (regionInfos.size() > 0) {
|
||||
MetaEditor.addRegionsToMeta(this.catalogTracker, regionInfos);
|
||||
|
@ -184,4 +208,21 @@ public class CreateTableHandler extends EventHandler {
|
|||
" enabled because of a ZooKeeper issue", e);
|
||||
}
|
||||
}
|
||||
|
||||
protected ThreadPoolExecutor getRegionOpenAndInitThreadPool(
|
||||
final String threadNamePrefix, int regionNumber) {
|
||||
int maxThreads = Math.min(regionNumber, conf.getInt(
|
||||
"hbase.hregion.open.and.init.threads.max", 10));
|
||||
ThreadPoolExecutor openAndInitializeThreadPool = Threads
|
||||
.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
|
||||
new ThreadFactory() {
|
||||
private int count = 1;
|
||||
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(r, threadNamePrefix + "-" + count++);
|
||||
return t;
|
||||
}
|
||||
});
|
||||
return openAndInitializeThreadPool;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue