HBASE-5033 Differential Revision: 933 Opening/Closing store in parallel to reduce region open/close time (Liyin)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1230347 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2012-01-12 01:19:26 +00:00
parent 06ca87ae87
commit f90f4971bd
4 changed files with 251 additions and 43 deletions

View File

@ -19,15 +19,15 @@
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.util.Bytes;
/**
* HConstants holds a bunch of HBase-related constants
*/
@ -218,6 +218,19 @@ public final class HConstants {
public static final String HREGION_MAX_FILESIZE =
"hbase.hregion.max.filesize";
/**
* The max number of threads used for opening and closing stores or store
* files in parallel
*/
public static final String HSTORE_OPEN_AND_CLOSE_THREADS_MAX =
"hbase.hstore.open.and.close.threads.max";
/**
* The default number for the max number of threads used for opening and
* closing stores or store files in parallel
*/
public static final int DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX = 1;
/** Default maximum file size */
public static final long DEFAULT_MAX_FILE_SIZE = 256 * 1024 * 1024;

View File

@ -40,10 +40,17 @@ import java.util.NavigableSet;
import java.util.Random;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -106,6 +113,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HashedBytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.Writable;
@ -114,6 +122,7 @@ import org.cliffc.high_scale_lib.Counter;
import com.google.common.base.Preconditions;
import com.google.common.collect.ClassToInstanceMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MutableClassToInstanceMap;
@ -433,7 +442,7 @@ public class HRegion implements HeapSize { // , Writable{
* @see HRegion#newHRegion(Path, HLog, FileSystem, Configuration, HRegionInfo, HTableDescriptor, RegionServerServices)
*/
public HRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
HRegionInfo regionInfo, final HTableDescriptor htd,
final HRegionInfo regionInfo, final HTableDescriptor htd,
RegionServerServices rsServices) {
this.tableDir = tableDir;
this.comparator = regionInfo.getComparator();
@ -542,10 +551,31 @@ public class HRegion implements HeapSize { // , Writable{
long maxSeqId = -1;
// initialized to -1 so that we pick up MemstoreTS from column families
long maxMemstoreTS = -1;
for (HColumnDescriptor c : this.htableDescriptor.getFamilies()) {
status.setStatus("Instantiating store for column family " + c);
Store store = instantiateHStore(this.tableDir, c);
this.stores.put(c.getName(), store);
if (this.htableDescriptor != null &&
!htableDescriptor.getFamilies().isEmpty()) {
// initialize the thread pool for opening stores in parallel.
ThreadPoolExecutor storeOpenerThreadPool =
getStoreOpenAndCloseThreadPool(
"StoreOpenerThread-" + this.regionInfo.getRegionNameAsString());
CompletionService<Store> completionService =
new ExecutorCompletionService<Store>(storeOpenerThreadPool);
// initialize each store in parallel
for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
status.setStatus("Instantiating store for column family " + family);
completionService.submit(new Callable<Store>() {
public Store call() throws IOException {
return instantiateHStore(tableDir, family);
}
});
}
try {
for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
Future<Store> future = completionService.take();
Store store = future.get();
this.stores.put(store.getColumnFamilyName().getBytes(), store);
long storeSeqId = store.getMaxSequenceId();
if (minSeqId == -1 || storeSeqId < minSeqId) {
minSeqId = storeSeqId;
@ -558,6 +588,14 @@ public class HRegion implements HeapSize { // , Writable{
maxMemstoreTS = maxStoreMemstoreTS;
}
}
} catch (InterruptedException e) {
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
} finally {
storeOpenerThreadPool.shutdownNow();
}
}
mvcc.initialize(maxMemstoreTS + 1);
// Recover any edits if available.
maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
@ -883,8 +921,38 @@ public class HRegion implements HeapSize { // , Writable{
}
List<StoreFile> result = new ArrayList<StoreFile>();
for (Store store : stores.values()) {
result.addAll(store.close());
if (!stores.isEmpty()) {
// initialize the thread pool for closing stores in parallel.
ThreadPoolExecutor storeCloserThreadPool =
getStoreOpenAndCloseThreadPool("StoreCloserThread-"
+ this.regionInfo.getRegionNameAsString());
CompletionService<ImmutableList<StoreFile>> completionService =
new ExecutorCompletionService<ImmutableList<StoreFile>>(
storeCloserThreadPool);
// close each store in parallel
for (final Store store : stores.values()) {
completionService
.submit(new Callable<ImmutableList<StoreFile>>() {
public ImmutableList<StoreFile> call() throws IOException {
return store.close();
}
});
}
try {
for (int i = 0; i < stores.size(); i++) {
Future<ImmutableList<StoreFile>> future = completionService
.take();
ImmutableList<StoreFile> storeFileList = future.get();
result.addAll(storeFileList);
}
} catch (InterruptedException e) {
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
} finally {
storeCloserThreadPool.shutdownNow();
}
}
this.closed.set(true);
@ -900,6 +968,40 @@ public class HRegion implements HeapSize { // , Writable{
}
}
protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
final String threadNamePrefix) {
int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
int maxThreads = Math.min(numStores,
conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
}
protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
final String threadNamePrefix) {
int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
int maxThreads = Math.max(1,
conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
/ numStores);
return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
}
private ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
final String threadNamePrefix) {
ThreadPoolExecutor openAndCloseThreadPool = Threads
.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
new ThreadFactory() {
private int count = 1;
public Thread newThread(Runnable r) {
Thread t = new Thread(r, threadNamePrefix + "-" + count++);
return t;
}
});
return openAndCloseThreadPool;
}
/**
* @return True if its worth doing a flush before we put up the close flag.
*/

View File

@ -27,7 +27,13 @@ import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.SortedSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -271,38 +277,73 @@ public class Store extends SchemaConfigured implements HeapSize {
return homedir;
}
/*
* Creates an unsorted list of StoreFile loaded from the given directory.
/**
* Creates an unsorted list of StoreFile loaded in parallel
* from the given directory.
* @throws IOException
*/
private List<StoreFile> loadStoreFiles()
throws IOException {
private List<StoreFile> loadStoreFiles() throws IOException {
ArrayList<StoreFile> results = new ArrayList<StoreFile>();
FileStatus files[] = FSUtils.listStatus(this.fs, this.homedir, null);
for (int i = 0; files != null && i < files.length; i++) {
if (files == null || files.length == 0) {
return results;
}
// initialize the thread pool for opening store files in parallel..
ThreadPoolExecutor storeFileOpenerThreadPool =
this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
this.family.getNameAsString());
CompletionService<StoreFile> completionService =
new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
int totalValidStoreFile = 0;
for (int i = 0; i < files.length; i++) {
// Skip directories.
if (files[i].isDir()) {
continue;
}
Path p = files[i].getPath();
final Path p = files[i].getPath();
// Check for empty file. Should never be the case but can happen
// after data loss in hdfs for whatever reason (upgrade, etc.): HBASE-646
if (this.fs.getFileStatus(p).getLen() <= 0) {
LOG.warn("Skipping " + p + " because its empty. HBASE-646 DATA LOSS?");
continue;
}
StoreFile curfile = new StoreFile(fs, p, this.conf, this.cacheConf,
this.family.getBloomFilterType());
passSchemaMetricsTo(curfile);
curfile.createReader();
long length = curfile.getReader().length();
// open each store file in parallel
completionService.submit(new Callable<StoreFile>() {
public StoreFile call() throws IOException {
StoreFile storeFile = new StoreFile(fs, p, conf, cacheConf,
family.getBloomFilterType());
passSchemaMetricsTo(storeFile);
storeFile.createReader();
return storeFile;
}
});
totalValidStoreFile++;
}
try {
for (int i = 0; i < totalValidStoreFile; i++) {
Future<StoreFile> future = completionService.take();
StoreFile storeFile = future.get();
long length = storeFile.getReader().length();
this.storeSize += length;
this.totalUncompressedBytes += curfile.getReader().getTotalUncompressedBytes();
this.totalUncompressedBytes +=
storeFile.getReader().getTotalUncompressedBytes();
if (LOG.isDebugEnabled()) {
LOG.debug("loaded " + curfile.toStringDetailed());
LOG.debug("loaded " + storeFile.toStringDetailed());
}
results.add(curfile);
results.add(storeFile);
}
} catch (InterruptedException e) {
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
} finally {
storeFileOpenerThreadPool.shutdownNow();
}
return results;
}
@ -499,8 +540,36 @@ public class Store extends SchemaConfigured implements HeapSize {
// Clear so metrics doesn't find them.
storefiles = ImmutableList.of();
for (StoreFile f: result) {
if (!result.isEmpty()) {
// initialize the thread pool for closing store files in parallel.
ThreadPoolExecutor storeFileCloserThreadPool = this.region
.getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
+ this.family.getNameAsString());
// close each store file in parallel
CompletionService<Void> completionService =
new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
for (final StoreFile f : result) {
completionService.submit(new Callable<Void>() {
public Void call() throws IOException {
f.closeReader(true);
return null;
}
});
}
try {
for (int i = 0; i < result.size(); i++) {
Future<Void> future = completionService.take();
future.get();
}
} catch (InterruptedException e) {
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
} finally {
storeFileCloserThreadPool.shutdownNow();
}
}
LOG.debug("closed " + this.storeNameStr);
return result;

View File

@ -19,14 +19,17 @@
*/
package org.apache.hadoop.hbase.util;
import java.io.PrintWriter;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.PrintWriter;
import org.apache.hadoop.util.ReflectionUtils;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.TimeUnit;
/**
* Thread Utility
*/
@ -152,4 +155,25 @@ public class Threads {
}
}
/**
* Create a new CachedThreadPool with a bounded number as the maximum
* thread size in the pool.
*
* @param maxCachedThread the maximum thread could be created in the pool
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @param threadFactory the factory to use when creating new threads
* @return threadPoolExecutor the cachedThreadPool with a bounded number
* as the maximum thread size in the pool.
*/
public static ThreadPoolExecutor getBoundedCachedThreadPool(
int maxCachedThread, long timeout, TimeUnit unit,
ThreadFactory threadFactory) {
ThreadPoolExecutor boundedCachedThreadPool =
new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
// allow the core pool threads timeout and terminate
boundedCachedThreadPool.allowCoreThreadTimeOut(true);
return boundedCachedThreadPool;
}
}