From 9b0a00401714426057caa47bf92d46ad7a5411f5 Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Tue, 1 Feb 2011 06:20:21 +0000 Subject: [PATCH] HBASE-3483 Memstore lower limit should trigger asynchronous flushes git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1065918 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 5 + .../hbase/regionserver/MemStoreFlusher.java | 183 +++++++++++++----- 2 files changed, 138 insertions(+), 50 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 75527ecd361..e229f41cdfe 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -69,6 +69,11 @@ Release 0.91.0 - Unreleased HBASE-3256 Coprocessors: Coprocessor host and observer for HMaster HBASE-3448 RegionSplitter, utility class to manually split tables +Release 0.90.1 - Unreleased + + BUG FIXES + HBASE-3483 Memstore lower limit should trigger asynchronous flushes + Release 0.90.0 - January 19th, 2011 INCOMPATIBLE CHANGES diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 46239e9a63a..2a9e564a951 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -28,10 +28,12 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.StringUtils; +import com.google.common.base.Preconditions; + import java.io.IOException; import java.lang.management.ManagementFactory; -import java.util.ArrayList; import java.util.ConcurrentModificationException; +import java.util.EnumSet; import java.util.HashMap; import java.util.Map; import java.util.SortedMap; @@ -39,6 +41,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import 
java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** @@ -56,12 +60,14 @@ class MemStoreFlusher extends Thread implements FlushRequester { // a corresponding entry in the other. private final BlockingQueue flushQueue = new DelayQueue(); - private final Map regionsInQueue = - new HashMap(); + private final Map regionsInQueue = + new HashMap(); + private AtomicBoolean wakeupPending = new AtomicBoolean(); private final long threadWakeFrequency; private final HRegionServer server; private final ReentrantLock lock = new ReentrantLock(); + private final Condition flushOccurred = lock.newCondition(); protected final long globalMemStoreLimit; protected final long globalMemStoreLimitLowMark; @@ -133,17 +139,66 @@ class MemStoreFlusher extends Thread implements FlushRequester { } return (long)(max * limit); } + + /** + * The memstore across all regions has exceeded the low water mark. Pick + * one region to flush and flush it synchronously (this is called from the + * flush thread) + * @return true if successful + */ + private boolean flushOneForGlobalPressure() { + SortedMap regionsBySize = + server.getCopyOfOnlineRegionsSortedBySize(); + // Find the biggest region that doesn't have too many storefiles + HRegion bestFlushableRegion = getBiggestMemstoreRegion(regionsBySize, true); + // Find the biggest region, total, even if it might have too many store files. + HRegion bestAnyRegion = getBiggestMemstoreRegion(regionsBySize, false); + + Preconditions.checkState(bestAnyRegion != null, + "Above memory mark but there are no regions!"); + + HRegion regionToFlush; + if (bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) { + // Even if it's not supposed to be flushed, pick a region if it's more than twice + // as big as the best flushable one - otherwise when we're under pressure we make + // lots of little flushes and cause lots of compactions, etc, which just makes + // life worse!
+ LOG.info("Under global heap pressure: " + + "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " + + "store files, but is " + + StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) + + " vs best flushable region's " + + StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) + + ". Choosing the bigger."); + regionToFlush = bestAnyRegion; + } else { + regionToFlush = bestFlushableRegion; + } + + Preconditions.checkState(regionToFlush.memstoreSize.get() > 0); + + LOG.info("Flush of region " + regionToFlush + " due to global heap pressure"); + return flushRegion(regionToFlush, true); + } @Override public void run() { while (!this.server.isStopped()) { FlushQueueEntry fqe = null; try { + wakeupPending.set(false); // allow someone to wake us up again fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); - if (fqe == null) { + if (fqe == null || fqe instanceof WakeupFlushThread) { + if (isAboveLowWaterMark()) { + LOG.info("Flush thread woke up with memory above low water."); + flushOneForGlobalPressure(); + // Enqueue another one of these tokens so we'll wake up again + wakeupFlushThread(); + } continue; } - if (!flushRegion(fqe)) { + FlushRegionEntry fre = (FlushRegionEntry)fqe; + if (!flushRegion(fre)) { break; } } catch (InterruptedException ex) { @@ -151,9 +206,7 @@ class MemStoreFlusher extends Thread implements FlushRequester { } catch (ConcurrentModificationException ex) { continue; } catch (Exception ex) { - LOG.error("Cache flush failed" + - (fqe != null ? 
(" for region " + Bytes.toString(fqe.region.getRegionName())) : ""), - ex); + LOG.error("Cache flusher failed for entry " + fqe); if (!server.checkFileSystem()) { break; } @@ -164,12 +217,46 @@ class MemStoreFlusher extends Thread implements FlushRequester { LOG.info(getName() + " exiting"); } + private void wakeupFlushThread() { + if (wakeupPending.compareAndSet(false, true)) { + flushQueue.add(new WakeupFlushThread()); + } + } + + private HRegion getBiggestMemstoreRegion( + SortedMap regionsBySize, + boolean checkStoreFileCount) { + synchronized (regionsInQueue) { + for (HRegion region : regionsBySize.values()) { + if (checkStoreFileCount && isTooManyStoreFiles(region)) { + continue; + } + return region; + } + } + return null; + } + + /** + * Return true if global memory usage is above the high watermark + */ + private boolean isAboveHighWaterMark() { + return server.getGlobalMemStoreSize() >= globalMemStoreLimit; + } + + /** + * Return true if global memory usage is above the low watermark + */ + private boolean isAboveLowWaterMark() { + return server.getGlobalMemStoreSize() >= globalMemStoreLimitLowMark; + } + public void requestFlush(HRegion r) { synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { // This entry has no delay so it will be added at the top of the flush // queue. It'll come out near immediately. - FlushQueueEntry fqe = new FlushQueueEntry(r); + FlushRegionEntry fqe = new FlushRegionEntry(r); this.regionsInQueue.put(r, fqe); this.flushQueue.add(fqe); } @@ -196,7 +283,7 @@ class MemStoreFlusher extends Thread implements FlushRequester { * false, there will be accompanying log messages explaining why the log was * not flushed.
*/ - private boolean flushRegion(final FlushQueueEntry fqe) { + private boolean flushRegion(final FlushRegionEntry fqe) { HRegion region = fqe.region; if (!fqe.region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) { @@ -237,7 +324,7 @@ class MemStoreFlusher extends Thread implements FlushRequester { */ private boolean flushRegion(final HRegion region, final boolean emergencyFlush) { synchronized (this.regionsInQueue) { - FlushQueueEntry fqe = this.regionsInQueue.remove(region); + FlushRegionEntry fqe = this.regionsInQueue.remove(region); if (fqe != null && emergencyFlush) { // Need to remove from region from delay queue. When NOT an // emergencyFlush, then item was removed via a flushQueue.poll. @@ -266,6 +353,7 @@ class MemStoreFlusher extends Thread implements FlushRequester { return false; } } finally { + flushOccurred.signalAll(); lock.unlock(); } return true; @@ -287,49 +375,38 @@ class MemStoreFlusher extends Thread implements FlushRequester { * amount of memstore consumption. */ public synchronized void reclaimMemStoreMemory() { - if (server.getGlobalMemStoreSize() >= globalMemStoreLimit) { - flushSomeRegions(); + if (isAboveHighWaterMark()) { + lock.lock(); + try { + while (isAboveHighWaterMark()) { + wakeupFlushThread(); + flushOccurred.awaitUninterruptibly(); + } + } finally { + lock.unlock(); + } + } else if (isAboveLowWaterMark()) { + wakeupFlushThread(); } } - /* - * Emergency! Need to flush memory. 
+ interface FlushQueueEntry extends Delayed {} + + /** + * Token to insert into the flush queue that ensures that the flusher does not sleep */ - private synchronized void flushSomeRegions() { - // keep flushing until we hit the low water mark - long globalMemStoreSize = -1; - ArrayList regionsToCompact = new ArrayList(); - for (SortedMap m = - this.server.getCopyOfOnlineRegionsSortedBySize(); - (globalMemStoreSize = server.getGlobalMemStoreSize()) >= - this.globalMemStoreLimitLowMark;) { - // flush the region with the biggest memstore - if (m.size() <= 0) { - LOG.info("No online regions to flush though we've been asked flush " + - "some; globalMemStoreSize=" + - StringUtils.humanReadableInt(globalMemStoreSize) + - ", globalMemStoreLimitLowMark=" + - StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark)); - break; - } - HRegion biggestMemStoreRegion = m.remove(m.firstKey()); - LOG.info("Forced flushing of " + biggestMemStoreRegion.toString() + - " because global memstore limit of " + - StringUtils.humanReadableInt(this.globalMemStoreLimit) + - " exceeded; currently " + - StringUtils.humanReadableInt(globalMemStoreSize) + " and flushing till " + - StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark)); - if (!flushRegion(biggestMemStoreRegion, true)) { - LOG.warn("Flush failed"); - break; - } - regionsToCompact.add(biggestMemStoreRegion); + static class WakeupFlushThread implements FlushQueueEntry { + @Override + public long getDelay(TimeUnit unit) { + return 0; } - for (HRegion region : regionsToCompact) { - server.compactSplitThread.requestCompaction(region, getName()); + + @Override + public int compareTo(Delayed o) { + return -1; } } - + /** * Datastructure used in the flush queue. Holds region and retry count. * Keeps tabs on how old this object is. Implements {@link Delayed}. 
On @@ -338,13 +415,14 @@ class MemStoreFlusher extends Thread implements FlushRequester { * milliseconds before readding to delay queue if you want it to stay there * a while. */ - static class FlushQueueEntry implements Delayed { + static class FlushRegionEntry implements FlushQueueEntry { private final HRegion region; + private final long createTime; private long whenToExpire; private int requeueCount = 0; - FlushQueueEntry(final HRegion r) { + FlushRegionEntry(final HRegion r) { this.region = r; this.createTime = System.currentTimeMillis(); this.whenToExpire = this.createTime; @@ -372,7 +450,7 @@ class MemStoreFlusher extends Thread implements FlushRequester { * to whatever you pass. * @return This. */ - public FlushQueueEntry requeue(final long when) { + public FlushRegionEntry requeue(final long when) { this.whenToExpire = System.currentTimeMillis() + when; this.requeueCount++; return this; @@ -389,5 +467,10 @@ class MemStoreFlusher extends Thread implements FlushRequester { return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS)).intValue(); } + + @Override + public String toString() { + return "[flush region " + Bytes.toString(region.getRegionName()) + "]"; + } } }