diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index e3499129ec7..fb5bd317ece 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1174,6 +1174,13 @@ public final class HConstants { "hbase.heap.occupancy.high_water_mark"; public static final float DEFAULT_HEAP_OCCUPANCY_HIGH_WATERMARK = 0.98f; + /** + * The max number of threads used for splitting storefiles in parallel during + * the region split process. + */ + public static final String REGION_SPLIT_THREADS_MAX = + "hbase.regionserver.region.split.threads.max"; + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java index 08212a3cd96..5c24eaf7bb1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java @@ -682,17 +682,29 @@ public class SplitTransactionImpl implements SplitTransaction { // The following code sets up a thread pool executor with as many slots as // there's files to split. It then fires up everything, waits for // completion and finally checks for any exception - int nbFiles = hstoreFilesToSplit.size(); + int nbFiles = 0; + for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) { + nbFiles += entry.getValue().size(); + } if (nbFiles == 0) { // no file needs to be splitted. return new Pair<Integer, Integer>(0,0); } - LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent); + // Default max #threads to use is the smaller of table's configured number of blocking store + // files or the available number of logical cores.
+ int defMaxThreads = Math.min(parent.conf.getInt(HStore.BLOCKING_STOREFILES_KEY, + HStore.DEFAULT_BLOCKING_STOREFILE_COUNT), + Runtime.getRuntime().availableProcessors()); + // Max #threads is the smaller of the number of storefiles or the default max determined above. + int maxThreads = Math.min(parent.conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX, + defMaxThreads), nbFiles); + LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent + + " using " + maxThreads + " threads"); ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); builder.setNameFormat("StoreFileSplitter-%1$d"); ThreadFactory factory = builder.build(); ThreadPoolExecutor threadPool = - (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory); + (ThreadPoolExecutor) Executors.newFixedThreadPool(maxThreads, factory); List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>> (nbFiles); // Split each store file. @@ -738,14 +750,18 @@ public class SplitTransactionImpl implements SplitTransaction { } if (LOG.isDebugEnabled()) { - LOG.debug("Split storefiles for region " + this.parent + " Daugther A: " + created_a - + " storefiles, Daugther B: " + created_b + " storefiles."); + LOG.debug("Split storefiles for region " + this.parent + " Daughter A: " + created_a + + " storefiles, Daughter B: " + created_b + " storefiles."); } return new Pair<Integer, Integer>(created_a, created_b); } private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Splitting started for store file: " + sf.getPath() + " for region: " + + this.parent); + } HRegionFileSystem fs = this.parent.getRegionFileSystem(); String familyName = Bytes.toString(family); @@ -755,6 +771,10 @@ public class SplitTransactionImpl implements SplitTransaction { Path path_b = fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true, this.parent.getSplitPolicy()); + if (LOG.isDebugEnabled()) { + LOG.debug("Splitting complete for store file: " + sf.getPath() + " for region:
" + + this.parent); + } return new Pair(path_a, path_b); }