HBASE-7911 Remove duplicated code from CreateTableHandler
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1449400 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
78b8109fdc
commit
23ad43710f
|
@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.master.TableLockManager;
|
||||||
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
|
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||||
|
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
|
||||||
|
@ -255,59 +256,7 @@ public class CreateTableHandler extends EventHandler {
|
||||||
protected List<HRegionInfo> handleCreateHdfsRegions(final Path tableRootDir,
|
protected List<HRegionInfo> handleCreateHdfsRegions(final Path tableRootDir,
|
||||||
final String tableName)
|
final String tableName)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
int regionNumber = newRegions.length;
|
return ModifyRegionUtils.createRegions(conf, tableRootDir,
|
||||||
ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(
|
hTableDescriptor, newRegions, null);
|
||||||
"RegionOpenAndInitThread-" + tableName, regionNumber);
|
|
||||||
CompletionService<HRegion> completionService = new ExecutorCompletionService<HRegion>(
|
|
||||||
regionOpenAndInitThreadPool);
|
|
||||||
|
|
||||||
List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
|
|
||||||
for (final HRegionInfo newRegion : newRegions) {
|
|
||||||
completionService.submit(new Callable<HRegion>() {
|
|
||||||
public HRegion call() throws IOException {
|
|
||||||
|
|
||||||
// 1. Create HRegion
|
|
||||||
HRegion region = HRegion.createHRegion(newRegion,
|
|
||||||
tableRootDir, conf, hTableDescriptor, null,
|
|
||||||
false, true);
|
|
||||||
// 2. Close the new region to flush to disk. Close log file too.
|
|
||||||
region.close();
|
|
||||||
return region;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
// 3. wait for all regions to finish creation
|
|
||||||
for (int i = 0; i < regionNumber; i++) {
|
|
||||||
Future<HRegion> future = completionService.take();
|
|
||||||
HRegion region = future.get();
|
|
||||||
regionInfos.add(region.getRegionInfo());
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
throw new InterruptedIOException(e.getMessage());
|
|
||||||
} catch (ExecutionException e) {
|
|
||||||
throw new IOException(e.getCause());
|
|
||||||
} finally {
|
|
||||||
regionOpenAndInitThreadPool.shutdownNow();
|
|
||||||
}
|
|
||||||
|
|
||||||
return regionInfos;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected ThreadPoolExecutor getRegionOpenAndInitThreadPool(
|
|
||||||
final String threadNamePrefix, int regionNumber) {
|
|
||||||
int maxThreads = Math.min(regionNumber, conf.getInt(
|
|
||||||
"hbase.hregion.open.and.init.threads.max", 10));
|
|
||||||
ThreadPoolExecutor openAndInitializeThreadPool = Threads
|
|
||||||
.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
|
|
||||||
new ThreadFactory() {
|
|
||||||
private int count = 1;
|
|
||||||
|
|
||||||
public Thread newThread(Runnable r) {
|
|
||||||
Thread t = new Thread(r, threadNamePrefix + "-" + count++);
|
|
||||||
return t;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return openAndInitializeThreadPool;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,8 +106,6 @@ public abstract class ModifyRegionUtils {
|
||||||
HRegion region = HRegion.createHRegion(newRegion,
|
HRegion region = HRegion.createHRegion(newRegion,
|
||||||
rootDir, conf, hTableDescriptor, null,
|
rootDir, conf, hTableDescriptor, null,
|
||||||
false, true);
|
false, true);
|
||||||
HRegion.writeRegioninfoOnFilesystem(region.getRegionInfo(), region.getRegionDir(),
|
|
||||||
region.getFilesystem(), conf);
|
|
||||||
try {
|
try {
|
||||||
// 2. Custom user code to interact with the created region
|
// 2. Custom user code to interact with the created region
|
||||||
if (task != null) {
|
if (task != null) {
|
||||||
|
@ -176,26 +174,4 @@ public abstract class ModifyRegionUtils {
|
||||||
throw new InterruptedIOException(e.getMessage());
|
throw new InterruptedIOException(e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Remove specified regions by removing them from file-system and .META.
|
|
||||||
* (The regions must be offline).
|
|
||||||
*
|
|
||||||
* @param fs {@link FileSystem} on which to delete the region directory
|
|
||||||
* @param catalogTracker the catalog tracker
|
|
||||||
* @param regions list of {@link HRegionInfo} to delete.
|
|
||||||
*/
|
|
||||||
public static void deleteRegions(final Configuration conf, final FileSystem fs,
|
|
||||||
final CatalogTracker catalogTracker, final List<HRegionInfo> regions) throws IOException {
|
|
||||||
if (regions != null && regions.size() > 0) {
|
|
||||||
List<Delete> deletes = new ArrayList<Delete>(regions.size());
|
|
||||||
for (HRegionInfo hri: regions) {
|
|
||||||
deletes.add(new Delete(hri.getRegionName()));
|
|
||||||
|
|
||||||
// "Delete" region from FS
|
|
||||||
HFileArchiver.archiveRegion(conf, fs, hri);
|
|
||||||
}
|
|
||||||
MetaEditor.deleteFromMetaTable(catalogTracker, deletes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue