diff --git a/CHANGES.txt b/CHANGES.txt
index a9828f56c62..cec907be6fd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -395,6 +395,7 @@ Release 0.21.0 - Unreleased
               same timestamp in MemStore
    HBASE-2725  Shutdown hook management is gone in trunk; restore
    HBASE-2740  NPE in ReadWriteConsistencyControl
+   HBASE-2752  Don't retry forever when waiting on too many store files

  IMPROVEMENTS
    HBASE-1760  Cleanup TODOs in HTable
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 61b8197f2e6..de417384b11 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1080,7 +1080,7 @@ public class HRegion implements HeapSize { // , Writable{

     if (LOG.isDebugEnabled()) {
       long now = EnvironmentEdgeManager.currentTimeMillis();
-      LOG.debug("Finished memstore flush of ~" +
+      LOG.info("Finished memstore flush of ~" +
         StringUtils.humanReadableInt(currentMemStoreSize) + " for region " +
         this + " in " + (now - startTime) + "ms, sequence id=" + sequenceId +
         ", compaction requested=" + compactionRequested);
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 124fac8ed83..b45ed0259a7 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -32,10 +32,12 @@
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
-import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.SortedMap;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
@@ -50,10 +52,12 @@ import java.util.concurrent.locks.ReentrantLock;
  */
 class MemStoreFlusher extends Thread implements FlushRequester {
   static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
-  private final BlockingQueue<HRegion> flushQueue =
-    new LinkedBlockingQueue<HRegion>();
-
-  private final HashSet<HRegion> regionsInQueue = new HashSet<HRegion>();
+  // These two data members go together. Any entry in the one must have
+  // a corresponding entry in the other.
+  private final BlockingQueue<FlushQueueEntry> flushQueue =
+    new DelayQueue<FlushQueueEntry>();
+  private final Map<HRegion, FlushQueueEntry> regionsInQueue =
+    new HashMap<HRegion, FlushQueueEntry>();

   private final long threadWakeFrequency;
   private final HRegionServer server;
@@ -98,7 +102,7 @@ class MemStoreFlusher extends Thread implements FlushRequester {
         conf.getInt("hbase.hstore.compactionThreshold", 3);
     }
     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
-      90000); // default of 180 seconds
+      90000);
     LOG.info("globalMemStoreLimit=" +
       StringUtils.humanReadableInt(this.globalMemStoreLimit) +
       ", globalMemStoreLimitLowMark=" +
@@ -133,13 +137,13 @@ class MemStoreFlusher extends Thread implements FlushRequester {
   @Override
   public void run() {
     while (!this.server.isStopRequested()) {
-      HRegion r = null;
+      FlushQueueEntry fqe = null;
       try {
-        r = this.flushQueue.poll(this.threadWakeFrequency, TimeUnit.MILLISECONDS);
-        if (r == null) {
+        fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+        if (fqe == null) {
           continue;
         }
-        if (!flushRegion(r, false)) {
+        if (!flushRegion(fqe)) {
           break;
         }
       } catch (InterruptedException ex) {
@@ -148,7 +152,7 @@ class MemStoreFlusher extends Thread implements FlushRequester {
         continue;
       } catch (Exception ex) {
         LOG.error("Cache flush failed" +
-          (r != null ? (" for region " + Bytes.toString(r.getRegionName())) : ""),
+          (fqe != null ? (" for region " + Bytes.toString(fqe.region.getRegionName())) : ""),
           ex);
         if (!server.checkFileSystem()) {
           break;
@@ -162,9 +166,12 @@ class MemStoreFlusher extends Thread implements FlushRequester {
   public void request(HRegion r) {
     synchronized (regionsInQueue) {
-      if (!regionsInQueue.contains(r)) {
-        regionsInQueue.add(r);
-        flushQueue.add(r);
+      if (!regionsInQueue.containsKey(r)) {
+        // This entry has no delay so it will be added at the top of the flush
+        // queue. It'll come out near immediately.
+        FlushQueueEntry fqe = new FlushQueueEntry(r);
+        this.regionsInQueue.put(r, fqe);
+        this.flushQueue.add(fqe);
       }
     }
   }

@@ -181,78 +188,65 @@ class MemStoreFlusher extends Thread implements FlushRequester {
     }
   }

+  /*
+   * A flushRegion that checks store file count. If too many, puts the flush
+   * on delay queue to retry later.
+   * @param fqe
+   * @return true if the region was successfully flushed, false otherwise. If
+   * false, there will be accompanying log messages explaining why the log was
+   * not flushed.
+   */
+  private boolean flushRegion(final FlushQueueEntry fqe) {
+    HRegion region = fqe.region;
+    if (!fqe.region.getRegionInfo().isMetaRegion() &&
+        isTooManyStoreFiles(region)) {
+      if (fqe.isMaximumWait(this.blockingWaitTime)) {
+        LOG.info("Waited " + (System.currentTimeMillis() - fqe.createTime) +
+          "ms on a compaction to clean up 'too many store files'; waited " +
+          "long enough... proceeding with flush of " +
+          region.getRegionNameAsString());
+      } else {
+        // If this is first time we've been put off, then emit a log message.
+        if (fqe.getRequeueCount() <= 0) {
+          // Note: We don't impose blockingStoreFiles constraint on meta regions
+          LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
+            "store files; delaying flush up to " + this.blockingWaitTime + "ms");
+        }
+        this.server.compactSplitThread.compactionRequested(region, getName());
+        // Put back on the queue. Have it come back out of the queue
+        // after a delay of this.blockingWaitTime / 100 ms.
+        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
+        // Tell a lie, it's not flushed but it's ok
+        return true;
+      }
+    }
+    return flushRegion(region, false);
+  }
+
   /*
    * Flush a region.
-   *
-   * @param region the region to be flushed
-   * @param removeFromQueue True if the region needs to be removed from the
-   * flush queue. False if called from the main flusher run loop and true if
-   * called from flushSomeRegions to relieve memory pressure from the region
-   * server. If true, we are in a state of emergency; we are not
-   * taking on updates regionserver-wide, not until memory is flushed. In this
-   * case, do not let a compaction run inline with blocked updates. Compactions
-   * can take a long time. Stopping compactions, there is a danger that number
-   * of flushes will overwhelm compaction on a busy server; we'll have to see.
-   * That compactions do not run when called out of flushSomeRegions means that
-   * compactions can be reported by the historian without danger of deadlock
-   * (HBASE-670).
-   *
-   * <p>In the main run loop, regions have already been removed from the flush
-   * queue, and if this method is called for the relief of memory pressure,
-   * this may not be necessarily true. We want to avoid trying to remove
-   * region from the queue because if it has already been removed, it requires a
-   * sequential scan of the queue to determine that it is not in the queue.
-   *
-   * <p>If called from flushSomeRegions, the region may be in the queue but
-   * it may have been determined that the region had a significant amount of
-   * memory in use and needed to be flushed to relieve memory pressure. In this
-   * case, its flush may preempt the pending request in the queue, and if so,
-   * it needs to be removed from the queue to avoid flushing the region
-   * multiple times.
+   * @param region Region to flush.
+   * @param emergencyFlush Set if we are being force flushed. If true the region
+   * needs to be removed from the flush queue. If false, we were called
+   * from the main flusher run loop and got the entry to flush by calling
+   * poll on the flush queue (which removed it).
    *
    * @return true if the region was successfully flushed, false otherwise. If
    * false, there will be accompanying log messages explaining why the log was
    * not flushed.
    */
-  private boolean flushRegion(HRegion region, boolean removeFromQueue) {
-    // if removeFromQueue, then we come from flushSomeRegions and we need
-    // to block if there's too many store files. Else, we don't want to hang
-    // the main flushing thread so we'll just the region at the end of the
-    // queue if there's too many files.
-    if (removeFromQueue) {
-      checkStoreFileCount(region);
-    } else if ((!region.getRegionInfo().isMetaRegion()) &&
-        isTooManyStoreFiles(region)) {
-      // Note: We don't impose blockingStoreFiles constraint on meta regions
-
-      LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
-        "store files, putting it back at the end of the flush queue.");
-      server.compactSplitThread.compactionRequested(region, getName());
-      // If there's only this item in the queue or they are all in this
-      // situation, we will loop at lot. Sleep a bit.
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) { } // just continue
-      flushQueue.add(region);
-      // Tell a lie, it's not flushed but it's ok
-      return true;
-    }
-    synchronized (regionsInQueue) {
-      // See comment above for removeFromQueue on why we do not
-      // take the region out of the set. If removeFromQueue is true, remove it
-      // from the queue too if it is there. This didn't used to be a
-      // constraint, but now that HBASE-512 is in play, we need to try and
-      // limit double-flushing of regions.
-      if (regionsInQueue.remove(region) && removeFromQueue) {
-        flushQueue.remove(region);
-      }
-      lock.lock();
+  private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
+    synchronized (this.regionsInQueue) {
+      FlushQueueEntry fqe = this.regionsInQueue.remove(region);
+      if (fqe != null && emergencyFlush) {
+        // Need to remove region from the delay queue. When NOT an
+        // emergencyFlush, the item was removed via a flushQueue.poll.
+        flushQueue.remove(fqe);
+      }
+      lock.lock();
     }
     try {
-      // See comment above for removeFromQueue on why we do not
-      // compact if removeFromQueue is true. Note that region.flushCache()
-      // only returns true if a flush is done and if a compaction is needed.
-      if (region.flushcache() && !removeFromQueue) {
+      if (region.flushcache()) {
         server.compactSplitThread.compactionRequested(region, getName());
       }
     } catch (DroppedSnapshotException ex) {
@@ -264,65 +258,18 @@ class MemStoreFlusher extends Thread implements FlushRequester {
       server.abort("Replay of HLog required. Forcing server shutdown", ex);
       return false;
     } catch (IOException ex) {
-      LOG.error("Cache flush failed"
-        + (region != null ? (" for region " + Bytes.toString(region.getRegionName())) : ""),
-        RemoteExceptionHandler.checkIOException(ex));
+      LOG.error("Cache flush failed" +
+        (region != null ? (" for region " + Bytes.toString(region.getRegionName())) : ""),
+        RemoteExceptionHandler.checkIOException(ex));
       if (!server.checkFileSystem()) {
         return false;
       }
     } finally {
       lock.unlock();
     }
-
     return true;
   }

-  /*
-   * If too many store files already, schedule a compaction and pause a while
-   * before going on with compaction.
-   * @param region Region to check.
-   */
-  private void checkStoreFileCount(final HRegion region) {
-    // If catalog region, do not ever hold up writes (isMetaRegion returns
-    // true if ROOT or META region).
-    if (region.getRegionInfo().isMetaRegion()) return;
-
-    int count = 0;
-    boolean triggered = false;
-    boolean finished = false;
-    while (count++ < (blockingWaitTime / 500)) {
-      finished = true;
-      for (Store hstore: region.stores.values()) {
-        if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
-          // only log once
-          if (!triggered) {
-            LOG.info("Too many store files for region " + region + ": " +
-              hstore.getStorefilesCount() + ", requesting compaction and " +
-              "waiting");
-            this.server.compactSplitThread.compactionRequested(region, getName());
-            triggered = true;
-          }
-          // pending compaction, not finished
-          finished = false;
-          try {
-            Thread.sleep(500);
-          } catch (InterruptedException e) {
-            // ignore
-          }
-        }
-      }
-      if (triggered && finished) {
-        LOG.info("Compaction has completed, we waited " + (count * 500) + "ms, "
-          + "finishing flush of region " + region);
-        break;
-      }
-    }
-    if (triggered && !finished) {
-      LOG.warn("Tried to hold up flushing for compactions of region " + region +
-        " but have waited longer than " + blockingWaitTime + "ms, continuing");
-    }
-  }
-
   private boolean isTooManyStoreFiles(HRegion region) {
     for (Store hstore: region.stores.values()) {
       if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
@@ -381,4 +328,65 @@ class MemStoreFlusher extends Thread implements FlushRequester {
       server.compactSplitThread.compactionRequested(region, getName());
     }
   }
+
+  /**
+   * Datastructure used in the flush queue. Holds region and retry count.
+   * Keeps tabs on how old this object is. Implements {@link Delayed}. On
+   * construction, the delay is zero. When added to a delay queue, we'll come
+   * out near immediately. Call {@link #requeue(long)} passing delay in
+   * milliseconds before readding to delay queue if you want it to stay there
+   * a while.
+   */
+  static class FlushQueueEntry implements Delayed {
+    private final HRegion region;
+    private final long createTime;
+    private long whenToExpire;
+    private int requeueCount = 0;
+
+    FlushQueueEntry(final HRegion r) {
+      this.region = r;
+      this.createTime = System.currentTimeMillis();
+      this.whenToExpire = this.createTime;
+    }
+
+    /**
+     * @param maximumWait
+     * @return True if we have been delayed > maximumWait milliseconds.
+     */
+    public boolean isMaximumWait(final long maximumWait) {
+      return (System.currentTimeMillis() - this.createTime) > maximumWait;
+    }
+
+    /**
+     * @return Count of times {@link #requeue(long)} was called; i.e. this is
+     * the number of times we've been requeued.
+     */
+    public int getRequeueCount() {
+      return this.requeueCount;
+    }
+
+    /**
+     * @param when When to expire, when to come up out of the queue.
+     * Specify in milliseconds. This method adds System.currentTimeMillis()
+     * to whatever you pass.
+     * @return This.
+     */
+    public FlushQueueEntry requeue(final long when) {
+      this.whenToExpire = System.currentTimeMillis() + when;
+      this.requeueCount++;
+      return this;
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(this.whenToExpire - System.currentTimeMillis(),
+        TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public int compareTo(Delayed other) {
+      return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
+        other.getDelay(TimeUnit.MILLISECONDS)).intValue();
+    }
+  }
 }
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 1a604da1464..bf70e69ec21 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -629,8 +629,8 @@ public class Store implements HeapSize {
       this.conf, this.family.getBloomFilterType(), this.inMemory);
     Reader r = sf.createReader();
     this.storeSize += r.length();
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Added " + sf + ", entries=" + r.getEntries() +
+    if(LOG.isInfoEnabled()) {
+      LOG.info("Added " + sf + ", entries=" + r.getEntries() +
         ", sequenceid=" + logCacheFlushId +
         ", memsize=" + StringUtils.humanReadableInt(flushed) +
         ", filesize=" + StringUtils.humanReadableInt(r.length()) +
@@ -822,15 +822,17 @@ public class Store implements HeapSize {
     }

     // Ready to go. Have list of files to compact.
-    LOG.debug("Started compaction of " + filesToCompact.size() + " file(s)" +
+    LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in " +
+        this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() +
       (references? ", hasReferences=true,": " ") + " into " +
       FSUtils.getPath(this.regionCompactionDir) + ", seqid=" + maxId);
     HFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
     // Move the compaction into place.
     StoreFile sf = completeCompaction(filesToCompact, writer);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Completed" + (majorcompaction? " major ": " ") +
-        "compaction of " + this.storeNameStr +
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Completed" + (majorcompaction? " major ": " ") +
+        "compaction of " + filesToCompact.size() + " file(s) in " +
+        this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() +
         "; new storefile is " + (sf == null? "none": sf.toString()) +
         "; store size is " + StringUtils.humanReadableInt(storeSize));
     }
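
For anyone unfamiliar with java.util.concurrent.DelayQueue, the standalone sketch below (illustrative only, not code from this patch) shows the mechanics the new FlushQueueEntry depends on: an entry whose delay is zero at construction is returned by poll() almost immediately, requeue(delay) keeps it in the queue for the given number of milliseconds, and a cap like isMaximumWait bounds the total time spent backing off. The Entry class, its names, and the main() driver here are assumptions made for the demo; the 90000 ms value stands in for the default hbase.hstore.blockingWaitTime, so each requeue delays the retry by 90000 / 100 = 900 ms.

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueSketch {
  // Illustrative stand-in for FlushQueueEntry: zero delay on construction,
  // re-armed by requeue(delayMs).
  static class Entry implements Delayed {
    final String name;
    final long createTime = System.currentTimeMillis();
    long whenToExpire = createTime;          // expires immediately at first

    Entry(String name) { this.name = name; }

    // Keep the entry in the queue for delayMs more milliseconds.
    Entry requeue(long delayMs) {
      whenToExpire = System.currentTimeMillis() + delayMs;
      return this;
    }

    // True once we have been waiting longer than maximumWaitMs since creation.
    boolean isMaximumWait(long maximumWaitMs) {
      return System.currentTimeMillis() - createTime > maximumWaitMs;
    }

    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(whenToExpire - System.currentTimeMillis(),
          TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
      return Long.compare(getDelay(TimeUnit.MILLISECONDS),
          other.getDelay(TimeUnit.MILLISECONDS));
    }
  }

  public static void main(String[] args) throws InterruptedException {
    long blockingWaitTime = 90000;           // default hbase.hstore.blockingWaitTime
    DelayQueue<Entry> queue = new DelayQueue<Entry>();

    // A fresh entry has zero delay, so a short poll returns it right away.
    queue.add(new Entry("region-a"));
    Entry e = queue.poll(250, TimeUnit.MILLISECONDS);

    // Pretend the region still has too many store files: requeue with a
    // delay of blockingWaitTime / 100 (900 ms by default) and retry later.
    if (e != null && !e.isMaximumWait(blockingWaitTime)) {
      queue.add(e.requeue(blockingWaitTime / 100));
    }

    // The requeued entry is invisible to poll() until its delay expires.
    System.out.println("immediate poll: " + queue.poll());            // null
    Entry again = queue.poll(2, TimeUnit.SECONDS);                     // ~900 ms later
    System.out.println("after delay: " + (again != null ? again.name : "none"));
  }
}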