HBASE-13959 Region splitting uses a single thread in most common cases. (Hari Krishna Dara)
This commit is contained in:
parent
254ef1624e
commit
163ddbf03c
|
@ -1174,6 +1174,13 @@ public final class HConstants {
|
||||||
"hbase.heap.occupancy.high_water_mark";
|
"hbase.heap.occupancy.high_water_mark";
|
||||||
public static final float DEFAULT_HEAP_OCCUPANCY_HIGH_WATERMARK = 0.98f;
|
public static final float DEFAULT_HEAP_OCCUPANCY_HIGH_WATERMARK = 0.98f;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The max number of threads used for splitting storefiles in parallel during
|
||||||
|
* the region split process.
|
||||||
|
*/
|
||||||
|
public static final String REGION_SPLIT_THREADS_MAX =
|
||||||
|
"hbase.regionserver.region.split.threads.max";
|
||||||
|
|
||||||
private HConstants() {
|
private HConstants() {
|
||||||
// Can't be instantiated with this ctor.
|
// Can't be instantiated with this ctor.
|
||||||
}
|
}
|
||||||
|
|
|
@ -682,17 +682,29 @@ public class SplitTransactionImpl implements SplitTransaction {
|
||||||
// The following code sets up a thread pool executor with as many slots as
|
// The following code sets up a thread pool executor with as many slots as
|
||||||
// there's files to split. It then fires up everything, waits for
|
// there's files to split. It then fires up everything, waits for
|
||||||
// completion and finally checks for any exception
|
// completion and finally checks for any exception
|
||||||
int nbFiles = hstoreFilesToSplit.size();
|
int nbFiles = 0;
|
||||||
|
for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
|
||||||
|
nbFiles += entry.getValue().size();
|
||||||
|
}
|
||||||
if (nbFiles == 0) {
|
if (nbFiles == 0) {
|
||||||
// no file needs to be splitted.
|
// no file needs to be splitted.
|
||||||
return new Pair<Integer, Integer>(0,0);
|
return new Pair<Integer, Integer>(0,0);
|
||||||
}
|
}
|
||||||
LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent);
|
// Default max #threads to use is the smaller of table's configured number of blocking store
|
||||||
|
// files or the available number of logical cores.
|
||||||
|
int defMaxThreads = Math.min(parent.conf.getInt(HStore.BLOCKING_STOREFILES_KEY,
|
||||||
|
HStore.DEFAULT_BLOCKING_STOREFILE_COUNT),
|
||||||
|
Runtime.getRuntime().availableProcessors());
|
||||||
|
// Max #threads is the smaller of the number of storefiles or the default max determined above.
|
||||||
|
int maxThreads = Math.min(parent.conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX,
|
||||||
|
defMaxThreads), nbFiles);
|
||||||
|
LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent +
|
||||||
|
" using " + maxThreads + " threads");
|
||||||
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
|
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
|
||||||
builder.setNameFormat("StoreFileSplitter-%1$d");
|
builder.setNameFormat("StoreFileSplitter-%1$d");
|
||||||
ThreadFactory factory = builder.build();
|
ThreadFactory factory = builder.build();
|
||||||
ThreadPoolExecutor threadPool =
|
ThreadPoolExecutor threadPool =
|
||||||
(ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory);
|
(ThreadPoolExecutor) Executors.newFixedThreadPool(maxThreads, factory);
|
||||||
List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>> (nbFiles);
|
List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>> (nbFiles);
|
||||||
|
|
||||||
// Split each store file.
|
// Split each store file.
|
||||||
|
@ -738,14 +750,18 @@ public class SplitTransactionImpl implements SplitTransaction {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Split storefiles for region " + this.parent + " Daugther A: " + created_a
|
LOG.debug("Split storefiles for region " + this.parent + " Daughter A: " + created_a
|
||||||
+ " storefiles, Daugther B: " + created_b + " storefiles.");
|
+ " storefiles, Daughter B: " + created_b + " storefiles.");
|
||||||
}
|
}
|
||||||
return new Pair<Integer, Integer>(created_a, created_b);
|
return new Pair<Integer, Integer>(created_a, created_b);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf)
|
private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Splitting started for store file: " + sf.getPath() + " for region: " +
|
||||||
|
this.parent);
|
||||||
|
}
|
||||||
HRegionFileSystem fs = this.parent.getRegionFileSystem();
|
HRegionFileSystem fs = this.parent.getRegionFileSystem();
|
||||||
String familyName = Bytes.toString(family);
|
String familyName = Bytes.toString(family);
|
||||||
|
|
||||||
|
@ -755,6 +771,10 @@ public class SplitTransactionImpl implements SplitTransaction {
|
||||||
Path path_b =
|
Path path_b =
|
||||||
fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true,
|
fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true,
|
||||||
this.parent.getSplitPolicy());
|
this.parent.getSplitPolicy());
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Splitting complete for store file: " + sf.getPath() + " for region: " +
|
||||||
|
this.parent);
|
||||||
|
}
|
||||||
return new Pair<Path,Path>(path_a, path_b);
|
return new Pair<Path,Path>(path_a, path_b);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue