From ceaf3ee3d7a0fb30862b227fa5985dc81f8716c1 Mon Sep 17 00:00:00 2001
From: Jean-Daniel Cryans
Date: Wed, 8 Dec 2010 21:38:25 +0000
Subject: [PATCH] HBASE-3308  SplitTransaction.splitStoreFiles slows splits a lot
 HBASE-3318  Split rollback leaves parent with writesEnabled=false

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1043688 13f79535-47bb-0310-9956-ffa450edef68
---
 CHANGES.txt                                      |  4 +
 .../hadoop/hbase/regionserver/HRegion.java       |  5 +-
 .../hbase/regionserver/SplitTransaction.java     | 83 ++++++++++++++++++-
 3 files changed, 85 insertions(+), 7 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 593e539d300..1032d74c65d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -30,6 +30,8 @@ Release 0.91.0 - Unreleased
    HBASE-3316  Add support for Java Serialization to HbaseObjectWritable
                (Ed Kohlwey via Stack)
    HBASE-1861  Multi-Family support for bulk upload tools
+   HBASE-3308  SplitTransaction.splitStoreFiles slows splits a lot
+
 
   NEW FEATURES
    HBASE-3287  Add option to cache blocks on hfile write and evict blocks on
@@ -759,6 +761,8 @@
    HBASE-3314  [shell] 'move' is broken
    HBASE-3315  Add debug output for when balancer makes bad balance
    HBASE-3278  AssertionError in LoadBalancer
+   HBASE-3318  Split rollback leaves parent with writesEnabled=false
+
 
   IMPROVEMENTS
 
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a60b62f44b6..cf9cad016c7 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -376,10 +376,7 @@ public class HRegion implements HeapSize { // , Writable{
     SplitTransaction.cleanupAnySplitDetritus(this);
     FSUtils.deleteDirectory(this.fs, new Path(regiondir, MERGEDIR));
 
-    // See if region is meant to run read-only.
-    if (this.regionInfo.getTableDesc().isReadOnly()) {
-      this.writestate.setReadOnly(true);
-    }
+    this.writestate.setReadOnly(this.regionInfo.getTableDesc().isReadOnly());
 
     this.writestate.compacting = false;
     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
index 4fe41406ada..58f9191158e 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
@@ -23,7 +23,15 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -75,6 +83,7 @@ public class SplitTransaction {
   private HRegionInfo hri_a;
   private HRegionInfo hri_b;
   private Path splitdir;
+  private long fileSplitTimeout = 30000;
 
   /*
    * Row to split around
@@ -186,6 +195,8 @@
       throw new IOException("Server is stopped or stopping");
     }
     assert !this.parent.lock.writeLock().isHeldByCurrentThread() : "Unsafe to hold write lock while performing RPCs";
+    this.fileSplitTimeout = server.getConfiguration().getLong(
+        "hbase.regionserver.fileSplitTimeout", this.fileSplitTimeout);
 
     // Coprocessor callback
     if (this.parent.getCoprocessorHost() != null) {
@@ -394,11 +405,52 @@
       // Could be null because close didn't succeed -- for now consider it fatal
       throw new IOException("Close returned empty list of StoreFiles");
     }
+    // The following code sets up a thread pool executor with as many slots as
+    // there's files to split. It then fires up everything, waits for
+    // completion and finally checks for any exception
+    int nbFiles = hstoreFilesToSplit.size();
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    builder.setNameFormat("StoreFileSplitter-%1$d");
+    ThreadFactory factory = builder.build();
+    ThreadPoolExecutor threadPool =
+      (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory);
+    List<Future<Void>> futures = new ArrayList<Future<Void>>(nbFiles);
 
     // Split each store file.
-    for (StoreFile sf: hstoreFilesToSplit) {
-      splitStoreFile(sf, splitdir);
-    }
+    for (StoreFile sf: hstoreFilesToSplit) {
+      //splitStoreFile(sf, splitdir);
+      StoreFileSplitter sfs = new StoreFileSplitter(sf, splitdir);
+      futures.add(threadPool.submit(sfs));
+    }
+    // Shutdown the pool
+    threadPool.shutdown();
+
+    // Wait for all the tasks to finish
+    try {
+      boolean stillRunning = !threadPool.awaitTermination(
+          this.fileSplitTimeout, TimeUnit.MILLISECONDS);
+      if (stillRunning) {
+        threadPool.shutdownNow();
+        throw new IOException("Took too long to split the" +
+            " files and create the references, aborting split");
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while waiting for file splitters", e);
+    }
+
+    // Look for any exception
+    for (Future<Void> future : futures) {
+      try {
+        future.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException(
+            "Interrupted while trying to get the results of file splitters", e);
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+    }
   }
 
   private void splitStoreFile(final StoreFile sf, final Path splitdir)
@@ -413,6 +465,31 @@
     StoreFile.split(fs, storedir, sf, this.splitrow, Range.top);
   }
 
+  /**
+   * Utility class used to do the file splitting / reference writing
+   * in parallel instead of sequentially.
+   */
+  class StoreFileSplitter implements Callable<Void> {
+
+    private final StoreFile sf;
+    private final Path splitdir;
+
+    /**
+     * Constructor that takes what it needs to split
+     * @param sf which file
+     * @param splitdir where the splitting is done
+     */
+    public StoreFileSplitter(final StoreFile sf, final Path splitdir) {
+      this.sf = sf;
+      this.splitdir = splitdir;
+    }
+
+    public Void call() throws IOException {
+      splitStoreFile(sf, splitdir);
+      return null;
+    }
+  }
+
   /**
    * @param hri Spec. for daughter region to open.
    * @param flusher Flusher this region should use.
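
The sketch below is not part of the commit; it is a self-contained illustration of the pattern the patch introduces in splitStoreFiles(): size a fixed thread pool to the number of store files, submit one Callable per file, bound the wait with the configurable timeout, and rethrow any worker failure from Future.get() so the split can be rolled back. The class and names used here (ParallelSplitSketch, splitOneFile, the cf/hfile-* strings) are invented for the example; only the structure mirrors the patch.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ParallelSplitSketch {

  // Stand-in for the per-file work; in the patch this is
  // SplitTransaction.splitStoreFile(sf, splitdir), which writes the
  // reference files for one store file.
  static Void splitOneFile(String file) throws IOException {
    System.out.println(Thread.currentThread().getName() + " splitting " + file);
    return null;
  }

  public static void main(String[] args) throws IOException {
    List<String> files = Arrays.asList("cf/hfile-1", "cf/hfile-2", "cf/hfile-3");
    long fileSplitTimeout = 30000; // default of hbase.regionserver.fileSplitTimeout, in ms

    // One worker per file, as in the patch; the real code additionally names
    // its threads "StoreFileSplitter-%1$d" via Guava's ThreadFactoryBuilder.
    ExecutorService pool = Executors.newFixedThreadPool(files.size());
    List<Future<Void>> futures = new ArrayList<Future<Void>>(files.size());
    for (final String f : files) {
      Callable<Void> task = new Callable<Void>() {
        public Void call() throws IOException {
          return splitOneFile(f);
        }
      };
      futures.add(pool.submit(task));
    }
    pool.shutdown();

    // Bound the wait so a hung split fails instead of blocking forever.
    try {
      if (!pool.awaitTermination(fileSplitTimeout, TimeUnit.MILLISECONDS)) {
        pool.shutdownNow();
        throw new IOException("Took too long to split the files, aborting");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while waiting for file splitters", e);
    }

    // Surface any worker failure so the caller can roll the split back.
    for (Future<Void> future : futures) {
      try {
        future.get();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted getting split results", e);
      } catch (ExecutionException e) {
        throw new IOException(e);
      }
    }
  }
}

The timeout in the sketch plays the role of the new hbase.regionserver.fileSplitTimeout property, which the patch reads from the server Configuration with a default of 30000 ms; an operator who needs a longer bound would typically override it in hbase-site.xml.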