HBASE-6466 Enable multi-thread for memstore flush (Chunhui)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1437252 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e488304992
commit
538798fc95
|
@ -208,16 +208,30 @@ public class Threads {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a named {@link ThreadFactory} that just builds daemon threads
|
* Same as {#newDaemonThreadFactory(String, UncaughtExceptionHandler)},
|
||||||
* @param prefix name prefix for all threads created from the factory
|
* without setting the exception handler.
|
||||||
* @return a thread factory that creates named, daemon threads
|
|
||||||
*/
|
*/
|
||||||
public static ThreadFactory newDaemonThreadFactory(final String prefix) {
|
public static ThreadFactory newDaemonThreadFactory(final String prefix) {
|
||||||
|
return newDaemonThreadFactory(prefix, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a named {@link ThreadFactory} that just builds daemon threads.
|
||||||
|
* @param prefix name prefix for all threads created from the factory
|
||||||
|
* @param handler unhandles exception handler to set for all threads
|
||||||
|
* @return a thread factory that creates named, daemon threads with
|
||||||
|
* the supplied exception handler and normal priority
|
||||||
|
*/
|
||||||
|
public static ThreadFactory newDaemonThreadFactory(final String prefix,
|
||||||
|
final UncaughtExceptionHandler handler) {
|
||||||
final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
|
final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
|
||||||
return new ThreadFactory() {
|
return new ThreadFactory() {
|
||||||
@Override
|
@Override
|
||||||
public Thread newThread(Runnable r) {
|
public Thread newThread(Runnable r) {
|
||||||
Thread t = namedFactory.newThread(r);
|
Thread t = namedFactory.newThread(r);
|
||||||
|
if (handler != null) {
|
||||||
|
t.setUncaughtExceptionHandler(handler);
|
||||||
|
}
|
||||||
if (!t.isDaemon()) {
|
if (!t.isDaemon()) {
|
||||||
t.setDaemon(true);
|
t.setDaemon(true);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1531,8 +1531,7 @@ public class HRegionServer implements ClientProtocol,
|
||||||
|
|
||||||
Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
|
Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
|
||||||
uncaughtExceptionHandler);
|
uncaughtExceptionHandler);
|
||||||
Threads.setDaemonThreadRunning(this.cacheFlusher.getThread(), n + ".cacheFlusher",
|
this.cacheFlusher.start(uncaughtExceptionHandler);
|
||||||
uncaughtExceptionHandler);
|
|
||||||
Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
|
Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
|
||||||
".compactionChecker", uncaughtExceptionHandler);
|
".compactionChecker", uncaughtExceptionHandler);
|
||||||
if (this.healthCheckChore != null) {
|
if (this.healthCheckChore != null) {
|
||||||
|
@ -1790,7 +1789,7 @@ public class HRegionServer implements ClientProtocol,
|
||||||
*/
|
*/
|
||||||
protected void join() {
|
protected void join() {
|
||||||
Threads.shutdown(this.compactionChecker.getThread());
|
Threads.shutdown(this.compactionChecker.getThread());
|
||||||
Threads.shutdown(this.cacheFlusher.getThread());
|
this.cacheFlusher.join();
|
||||||
if (this.healthCheckChore != null) {
|
if (this.healthCheckChore != null) {
|
||||||
Threads.shutdown(this.healthCheckChore.getThread());
|
Threads.shutdown(this.healthCheckChore.getThread());
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.Thread.UncaughtExceptionHandler;
|
||||||
import java.lang.management.ManagementFactory;
|
import java.lang.management.ManagementFactory;
|
||||||
import java.util.ConcurrentModificationException;
|
import java.util.ConcurrentModificationException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -29,10 +30,10 @@ import java.util.SortedMap;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.DelayQueue;
|
import java.util.concurrent.DelayQueue;
|
||||||
import java.util.concurrent.Delayed;
|
import java.util.concurrent.Delayed;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.locks.Condition;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.HasThread;
|
import org.apache.hadoop.hbase.util.HasThread;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.cliffc.high_scale_lib.Counter;
|
import org.cliffc.high_scale_lib.Counter;
|
||||||
|
|
||||||
|
@ -59,7 +61,7 @@ import com.google.common.base.Preconditions;
|
||||||
* @see FlushRequester
|
* @see FlushRequester
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
class MemStoreFlusher extends HasThread implements FlushRequester {
|
class MemStoreFlusher implements FlushRequester {
|
||||||
static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
|
static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
|
||||||
// These two data members go together. Any entry in the one must have
|
// These two data members go together. Any entry in the one must have
|
||||||
// a corresponding entry in the other.
|
// a corresponding entry in the other.
|
||||||
|
@ -71,8 +73,8 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
|
||||||
|
|
||||||
private final long threadWakeFrequency;
|
private final long threadWakeFrequency;
|
||||||
private final HRegionServer server;
|
private final HRegionServer server;
|
||||||
private final ReentrantLock lock = new ReentrantLock();
|
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||||
private final Condition flushOccurred = lock.newCondition();
|
private final Object blockSignal = new Object();
|
||||||
|
|
||||||
protected final long globalMemStoreLimit;
|
protected final long globalMemStoreLimit;
|
||||||
protected final long globalMemStoreLimitLowMark;
|
protected final long globalMemStoreLimitLowMark;
|
||||||
|
@ -87,6 +89,9 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
|
||||||
private long blockingWaitTime;
|
private long blockingWaitTime;
|
||||||
private final Counter updatesBlockedMsHighWater = new Counter();
|
private final Counter updatesBlockedMsHighWater = new Counter();
|
||||||
|
|
||||||
|
private FlushHandler[] flushHandlers = null;
|
||||||
|
private int handlerCount;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param conf
|
* @param conf
|
||||||
* @param server
|
* @param server
|
||||||
|
@ -111,6 +116,7 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
|
||||||
conf.getInt("hbase.hstore.blockingStoreFiles", 7);
|
conf.getInt("hbase.hstore.blockingStoreFiles", 7);
|
||||||
this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
|
this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
|
||||||
90000);
|
90000);
|
||||||
|
this.handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
|
||||||
LOG.info("globalMemStoreLimit=" +
|
LOG.info("globalMemStoreLimit=" +
|
||||||
StringUtils.humanReadableInt(this.globalMemStoreLimit) +
|
StringUtils.humanReadableInt(this.globalMemStoreLimit) +
|
||||||
", globalMemStoreLimitLowMark=" +
|
", globalMemStoreLimitLowMark=" +
|
||||||
|
@ -213,37 +219,33 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class FlushHandler extends HasThread {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
while (!this.server.isStopped()) {
|
while (!server.isStopped()) {
|
||||||
FlushQueueEntry fqe = null;
|
FlushQueueEntry fqe = null;
|
||||||
try {
|
try {
|
||||||
wakeupPending.set(false); // allow someone to wake us up again
|
wakeupPending.set(false); // allow someone to wake us up again
|
||||||
fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
|
fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
|
||||||
if (fqe == null || fqe instanceof WakeupFlushThread) {
|
if (fqe == null || fqe instanceof WakeupFlushThread) {
|
||||||
if (isAboveLowWaterMark()) {
|
if (isAboveLowWaterMark()) {
|
||||||
LOG.debug("Flush thread woke up because memory above low water=" +
|
LOG.debug("Flush thread woke up because memory above low water="
|
||||||
StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
|
+ StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
|
||||||
if (!flushOneForGlobalPressure()) {
|
if (!flushOneForGlobalPressure()) {
|
||||||
// Wasn't able to flush any region, but we're above low water mark
|
// Wasn't able to flush any region, but we're above low water mark
|
||||||
// This is unlikely to happen, but might happen when closing the
|
// This is unlikely to happen, but might happen when closing the
|
||||||
// entire server - another thread is flushing regions. We'll just
|
// entire server - another thread is flushing regions. We'll just
|
||||||
// sleep a little bit to avoid spinning, and then pretend that
|
// sleep a little bit to avoid spinning, and then pretend that
|
||||||
// we flushed one, so anyone blocked will check again
|
// we flushed one, so anyone blocked will check again
|
||||||
lock.lock();
|
|
||||||
try {
|
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
flushOccurred.signalAll();
|
wakeUpIfBlocking();
|
||||||
} finally {
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// Enqueue another one of these tokens so we'll wake up again
|
// Enqueue another one of these tokens so we'll wake up again
|
||||||
wakeupFlushThread();
|
wakeupFlushThread();
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
FlushRegionEntry fre = (FlushRegionEntry)fqe;
|
FlushRegionEntry fre = (FlushRegionEntry) fqe;
|
||||||
if (!flushRegion(fre)) {
|
if (!flushRegion(fre)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -258,18 +260,17 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.regionsInQueue.clear();
|
synchronized (regionsInQueue) {
|
||||||
this.flushQueue.clear();
|
regionsInQueue.clear();
|
||||||
|
flushQueue.clear();
|
||||||
|
}
|
||||||
|
|
||||||
// Signal anyone waiting, so they see the close flag
|
// Signal anyone waiting, so they see the close flag
|
||||||
lock.lock();
|
wakeUpIfBlocking();
|
||||||
try {
|
|
||||||
flushOccurred.signalAll();
|
|
||||||
} finally {
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
LOG.info(getName() + " exiting");
|
LOG.info(getName() + " exiting");
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private void wakeupFlushThread() {
|
private void wakeupFlushThread() {
|
||||||
if (wakeupPending.compareAndSet(false, true)) {
|
if (wakeupPending.compareAndSet(false, true)) {
|
||||||
|
@ -287,6 +288,10 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (region.writestate.flushing || !region.writestate.writesEnabled) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (checkStoreFileCount && isTooManyStoreFiles(region)) {
|
if (checkStoreFileCount && isTooManyStoreFiles(region)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -332,11 +337,41 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
|
||||||
* Only interrupt once it's done with a run through the work loop.
|
* Only interrupt once it's done with a run through the work loop.
|
||||||
*/
|
*/
|
||||||
void interruptIfNecessary() {
|
void interruptIfNecessary() {
|
||||||
lock.lock();
|
lock.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
this.interrupt();
|
for (FlushHandler flushHander : flushHandlers) {
|
||||||
|
if (flushHander != null) flushHander.interrupt();
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized void start(UncaughtExceptionHandler eh) {
|
||||||
|
ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
|
||||||
|
server.getServerName().toString() + "-MemStoreFlusher", eh);
|
||||||
|
flushHandlers = new FlushHandler[handlerCount];
|
||||||
|
for (int i = 0; i < flushHandlers.length; i++) {
|
||||||
|
flushHandlers[i] = new FlushHandler();
|
||||||
|
flusherThreadFactory.newThread(flushHandlers[i]);
|
||||||
|
flushHandlers[i].start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isAlive() {
|
||||||
|
for (FlushHandler flushHander : flushHandlers) {
|
||||||
|
if (flushHander != null && flushHander.isAlive()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void join() {
|
||||||
|
for (FlushHandler flushHander : flushHandlers) {
|
||||||
|
if (flushHander != null) {
|
||||||
|
Threads.shutdown(flushHander.getThread());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -365,7 +400,8 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
|
||||||
"store files; delaying flush up to " + this.blockingWaitTime + "ms");
|
"store files; delaying flush up to " + this.blockingWaitTime + "ms");
|
||||||
if (!this.server.compactSplitThread.requestSplit(region)) {
|
if (!this.server.compactSplitThread.requestSplit(region)) {
|
||||||
try {
|
try {
|
||||||
this.server.compactSplitThread.requestCompaction(region, getName());
|
this.server.compactSplitThread.requestCompaction(region, Thread
|
||||||
|
.currentThread().getName());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error(
|
LOG.error(
|
||||||
"Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
|
"Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
|
||||||
|
@ -404,8 +440,8 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
|
||||||
// emergencyFlush, then item was removed via a flushQueue.poll.
|
// emergencyFlush, then item was removed via a flushQueue.poll.
|
||||||
flushQueue.remove(fqe);
|
flushQueue.remove(fqe);
|
||||||
}
|
}
|
||||||
lock.lock();
|
|
||||||
}
|
}
|
||||||
|
lock.readLock().lock();
|
||||||
try {
|
try {
|
||||||
boolean shouldCompact = region.flushcache();
|
boolean shouldCompact = region.flushcache();
|
||||||
// We just want to check the size
|
// We just want to check the size
|
||||||
|
@ -413,7 +449,7 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
|
||||||
if (shouldSplit) {
|
if (shouldSplit) {
|
||||||
this.server.compactSplitThread.requestSplit(region);
|
this.server.compactSplitThread.requestSplit(region);
|
||||||
} else if (shouldCompact) {
|
} else if (shouldCompact) {
|
||||||
server.compactSplitThread.requestCompaction(region, getName());
|
server.compactSplitThread.requestCompaction(region, Thread.currentThread().getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (DroppedSnapshotException ex) {
|
} catch (DroppedSnapshotException ex) {
|
||||||
|
@ -432,15 +468,18 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
try {
|
lock.readLock().unlock();
|
||||||
flushOccurred.signalAll();
|
wakeUpIfBlocking();
|
||||||
} finally {
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void wakeUpIfBlocking() {
|
||||||
|
synchronized (blockSignal) {
|
||||||
|
blockSignal.notifyAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private boolean isTooManyStoreFiles(HRegion region) {
|
private boolean isTooManyStoreFiles(HRegion region) {
|
||||||
for (Store hstore : region.stores.values()) {
|
for (Store hstore : region.stores.values()) {
|
||||||
if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
|
if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
|
||||||
|
@ -458,12 +497,12 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
|
||||||
*/
|
*/
|
||||||
public void reclaimMemStoreMemory() {
|
public void reclaimMemStoreMemory() {
|
||||||
if (isAboveHighWaterMark()) {
|
if (isAboveHighWaterMark()) {
|
||||||
lock.lock();
|
long start = System.currentTimeMillis();
|
||||||
try {
|
synchronized (this.blockSignal) {
|
||||||
boolean blocked = false;
|
boolean blocked = false;
|
||||||
long startTime = 0;
|
long startTime = 0;
|
||||||
while (isAboveHighWaterMark() && !server.isStopped()) {
|
while (isAboveHighWaterMark() && !server.isStopped()) {
|
||||||
if(!blocked){
|
if (!blocked) {
|
||||||
startTime = EnvironmentEdgeManager.currentTimeMillis();
|
startTime = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
LOG.info("Blocking updates on " + server.toString() +
|
LOG.info("Blocking updates on " + server.toString() +
|
||||||
": the global memstore size " +
|
": the global memstore size " +
|
||||||
|
@ -476,10 +515,12 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
|
||||||
try {
|
try {
|
||||||
// we should be able to wait forever, but we've seen a bug where
|
// we should be able to wait forever, but we've seen a bug where
|
||||||
// we miss a notify, so put a 5 second bound on it at least.
|
// we miss a notify, so put a 5 second bound on it at least.
|
||||||
flushOccurred.await(5, TimeUnit.SECONDS);
|
blockSignal.wait(5 * 1000);
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
|
long took = System.currentTimeMillis() - start;
|
||||||
|
LOG.warn("Memstore is above high water mark and block " + took + "ms");
|
||||||
}
|
}
|
||||||
if(blocked){
|
if(blocked){
|
||||||
final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
|
final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
|
||||||
|
@ -488,8 +529,6 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
|
||||||
}
|
}
|
||||||
LOG.info("Unblocking updates for server " + server.toString());
|
LOG.info("Unblocking updates for server " + server.toString());
|
||||||
}
|
}
|
||||||
} finally {
|
|
||||||
lock.unlock();
|
|
||||||
}
|
}
|
||||||
} else if (isAboveLowWaterMark()) {
|
} else if (isAboveLowWaterMark()) {
|
||||||
wakeupFlushThread();
|
wakeupFlushThread();
|
||||||
|
|
|
@ -42,6 +42,9 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
Loading…
Reference in New Issue