HBASE-3483 Memstore lower limit should trigger asynchronous flushes

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1065918 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-02-01 06:20:21 +00:00
parent c8529f0cab
commit 9b0a004017
2 changed files with 138 additions and 50 deletions

View File

@ -69,6 +69,11 @@ Release 0.91.0 - Unreleased
HBASE-3256 Coprocessors: Coprocessor host and observer for HMaster
HBASE-3448 RegionSplitter, utility class to manually split tables
Release 0.90.1 - Unreleased
BUG FIXES
HBASE-3483 Memstore lower limit should trigger asynchronous flushes
Release 0.90.0 - January 19th, 2011
INCOMPATIBLE CHANGES

View File

@ -28,10 +28,12 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
@ -39,6 +41,8 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
@ -56,12 +60,14 @@ class MemStoreFlusher extends Thread implements FlushRequester {
// a corresponding entry in the other.
private final BlockingQueue<FlushQueueEntry> flushQueue =
new DelayQueue<FlushQueueEntry>();
private final Map<HRegion, FlushQueueEntry> regionsInQueue =
new HashMap<HRegion, FlushQueueEntry>();
private final Map<HRegion, FlushRegionEntry> regionsInQueue =
new HashMap<HRegion, FlushRegionEntry>();
private AtomicBoolean wakeupPending = new AtomicBoolean();
private final long threadWakeFrequency;
private final HRegionServer server;
private final ReentrantLock lock = new ReentrantLock();
private final Condition flushOccurred = lock.newCondition();
protected final long globalMemStoreLimit;
protected final long globalMemStoreLimitLowMark;
@ -133,17 +139,66 @@ class MemStoreFlusher extends Thread implements FlushRequester {
}
return (long)(max * limit);
}
/**
* The memstore across all regions has exceeded the low water mark. Pick
* one region to flush and flush it synchronously (this is called from the
* flush thread)
* @return true if successful
*/
private boolean flushOneForGlobalPressure() {
SortedMap<Long, HRegion> regionsBySize =
server.getCopyOfOnlineRegionsSortedBySize();
// Find the biggest region that doesn't have too many storefiles
HRegion bestFlushableRegion = getBiggestMemstoreRegion(regionsBySize, true);
// Find the biggest region, total, even if it might have too many flushes.
HRegion bestAnyRegion = getBiggestMemstoreRegion(regionsBySize, false);
Preconditions.checkState(bestAnyRegion != null,
"Above memory mark but there are no regions!");
HRegion regionToFlush;
if (bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
// Even if it's not supposed to be flushed, pick a region if it's more than twice
// as big as the best flushable one - otherwise when we're under pressure we make
// lots of little flushes and cause lots of compactions, etc, which just makes
// life worse!
LOG.info("Under global heap pressure: " +
"Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
"store files, but is " +
StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
" vs best flushable region's " +
StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
". Choosing the bigger.");
regionToFlush = bestAnyRegion;
} else {
regionToFlush = bestFlushableRegion;
}
Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
return flushRegion(regionToFlush, true);
}
@Override
public void run() {
while (!this.server.isStopped()) {
FlushQueueEntry fqe = null;
try {
wakeupPending.set(false); // allow someone to wake us up again
fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
if (fqe == null) {
if (fqe == null || fqe instanceof WakeupFlushThread) {
if (isAboveLowWaterMark()) {
LOG.info("Flush thread woke up with memory above low water.");
flushOneForGlobalPressure();
// Enqueue another one of these tokens so we'll wake up again
wakeupFlushThread();
}
continue;
}
if (!flushRegion(fqe)) {
FlushRegionEntry fre = (FlushRegionEntry)fqe;
if (!flushRegion(fre)) {
break;
}
} catch (InterruptedException ex) {
@ -151,9 +206,7 @@ class MemStoreFlusher extends Thread implements FlushRequester {
} catch (ConcurrentModificationException ex) {
continue;
} catch (Exception ex) {
LOG.error("Cache flush failed" +
(fqe != null ? (" for region " + Bytes.toString(fqe.region.getRegionName())) : ""),
ex);
LOG.error("Cache flusher failed for entry " + fqe);
if (!server.checkFileSystem()) {
break;
}
@ -164,12 +217,46 @@ class MemStoreFlusher extends Thread implements FlushRequester {
LOG.info(getName() + " exiting");
}
private void wakeupFlushThread() {
if (wakeupPending.compareAndSet(false, true)) {
flushQueue.add(new WakeupFlushThread());
}
}
private HRegion getBiggestMemstoreRegion(
SortedMap<Long, HRegion> regionsBySize,
boolean checkStoreFileCount) {
synchronized (regionsInQueue) {
for (HRegion region : regionsBySize.values()) {
if (checkStoreFileCount && isTooManyStoreFiles(region)) {
continue;
}
return region;
}
}
return null;
}
/**
* Return true if global memory usage is above the high watermark
*/
private boolean isAboveHighWaterMark() {
return server.getGlobalMemStoreSize() >= globalMemStoreLimit;
}
/**
* Return true if we're above the high watermark
*/
private boolean isAboveLowWaterMark() {
return server.getGlobalMemStoreSize() >= globalMemStoreLimitLowMark;
}
public void requestFlush(HRegion r) {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
// This entry has no delay so it will be added at the top of the flush
// queue. It'll come out near immediately.
FlushQueueEntry fqe = new FlushQueueEntry(r);
FlushRegionEntry fqe = new FlushRegionEntry(r);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
}
@ -196,7 +283,7 @@ class MemStoreFlusher extends Thread implements FlushRequester {
* false, there will be accompanying log messages explaining why the log was
* not flushed.
*/
private boolean flushRegion(final FlushQueueEntry fqe) {
private boolean flushRegion(final FlushRegionEntry fqe) {
HRegion region = fqe.region;
if (!fqe.region.getRegionInfo().isMetaRegion() &&
isTooManyStoreFiles(region)) {
@ -237,7 +324,7 @@ class MemStoreFlusher extends Thread implements FlushRequester {
*/
private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
synchronized (this.regionsInQueue) {
FlushQueueEntry fqe = this.regionsInQueue.remove(region);
FlushRegionEntry fqe = this.regionsInQueue.remove(region);
if (fqe != null && emergencyFlush) {
// Need to remove from region from delay queue. When NOT an
// emergencyFlush, then item was removed via a flushQueue.poll.
@ -266,6 +353,7 @@ class MemStoreFlusher extends Thread implements FlushRequester {
return false;
}
} finally {
flushOccurred.signalAll();
lock.unlock();
}
return true;
@ -287,49 +375,38 @@ class MemStoreFlusher extends Thread implements FlushRequester {
* amount of memstore consumption.
*/
public synchronized void reclaimMemStoreMemory() {
if (server.getGlobalMemStoreSize() >= globalMemStoreLimit) {
flushSomeRegions();
if (isAboveHighWaterMark()) {
lock.lock();
try {
while (isAboveHighWaterMark()) {
wakeupFlushThread();
flushOccurred.awaitUninterruptibly();
}
} finally {
lock.unlock();
}
} else if (isAboveLowWaterMark()) {
wakeupFlushThread();
}
}
/*
* Emergency! Need to flush memory.
interface FlushQueueEntry extends Delayed {}
/**
* Token to insert into the flush queue that ensures that the flusher does not sleep
*/
private synchronized void flushSomeRegions() {
// keep flushing until we hit the low water mark
long globalMemStoreSize = -1;
ArrayList<HRegion> regionsToCompact = new ArrayList<HRegion>();
for (SortedMap<Long, HRegion> m =
this.server.getCopyOfOnlineRegionsSortedBySize();
(globalMemStoreSize = server.getGlobalMemStoreSize()) >=
this.globalMemStoreLimitLowMark;) {
// flush the region with the biggest memstore
if (m.size() <= 0) {
LOG.info("No online regions to flush though we've been asked flush " +
"some; globalMemStoreSize=" +
StringUtils.humanReadableInt(globalMemStoreSize) +
", globalMemStoreLimitLowMark=" +
StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
break;
}
HRegion biggestMemStoreRegion = m.remove(m.firstKey());
LOG.info("Forced flushing of " + biggestMemStoreRegion.toString() +
" because global memstore limit of " +
StringUtils.humanReadableInt(this.globalMemStoreLimit) +
" exceeded; currently " +
StringUtils.humanReadableInt(globalMemStoreSize) + " and flushing till " +
StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
if (!flushRegion(biggestMemStoreRegion, true)) {
LOG.warn("Flush failed");
break;
}
regionsToCompact.add(biggestMemStoreRegion);
static class WakeupFlushThread implements FlushQueueEntry {
@Override
public long getDelay(TimeUnit unit) {
return 0;
}
for (HRegion region : regionsToCompact) {
server.compactSplitThread.requestCompaction(region, getName());
@Override
public int compareTo(Delayed o) {
return -1;
}
}
/**
* Datastructure used in the flush queue. Holds region and retry count.
* Keeps tabs on how old this object is. Implements {@link Delayed}. On
@ -338,13 +415,14 @@ class MemStoreFlusher extends Thread implements FlushRequester {
* milliseconds before readding to delay queue if you want it to stay there
* a while.
*/
static class FlushQueueEntry implements Delayed {
static class FlushRegionEntry implements FlushQueueEntry {
private final HRegion region;
private final long createTime;
private long whenToExpire;
private int requeueCount = 0;
FlushQueueEntry(final HRegion r) {
FlushRegionEntry(final HRegion r) {
this.region = r;
this.createTime = System.currentTimeMillis();
this.whenToExpire = this.createTime;
@ -372,7 +450,7 @@ class MemStoreFlusher extends Thread implements FlushRequester {
* to whatever you pass.
* @return This.
*/
public FlushQueueEntry requeue(final long when) {
public FlushRegionEntry requeue(final long when) {
this.whenToExpire = System.currentTimeMillis() + when;
this.requeueCount++;
return this;
@ -389,5 +467,10 @@ class MemStoreFlusher extends Thread implements FlushRequester {
return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
other.getDelay(TimeUnit.MILLISECONDS)).intValue();
}
@Override
public String toString() {
return "[flush region " + Bytes.toString(region.getRegionName()) + "]";
}
}
}