HBASE-2752 Don't retry forever when waiting on too many store files

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@956183 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2010-06-19 05:39:47 +00:00
parent 13cf63cac5
commit 9e73f76604
4 changed files with 146 additions and 135 deletions

View File

@ -395,6 +395,7 @@ Release 0.21.0 - Unreleased
same timestamp in MemStore same timestamp in MemStore
HBASE-2725 Shutdown hook management is gone in trunk; restore HBASE-2725 Shutdown hook management is gone in trunk; restore
HBASE-2740 NPE in ReadWriteConsistencyControl HBASE-2740 NPE in ReadWriteConsistencyControl
HBASE-2752 Don't retry forever when waiting on too many store files
IMPROVEMENTS IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable HBASE-1760 Cleanup TODOs in HTable

View File

@ -1080,7 +1080,7 @@ public class HRegion implements HeapSize { // , Writable{
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
long now = EnvironmentEdgeManager.currentTimeMillis(); long now = EnvironmentEdgeManager.currentTimeMillis();
LOG.debug("Finished memstore flush of ~" + LOG.info("Finished memstore flush of ~" +
StringUtils.humanReadableInt(currentMemStoreSize) + " for region " + StringUtils.humanReadableInt(currentMemStoreSize) + " for region " +
this + " in " + (now - startTime) + "ms, sequence id=" + sequenceId + this + " in " + (now - startTime) + "ms, sequence id=" + sequenceId +
", compaction requested=" + compactionRequested); ", compaction requested=" + compactionRequested);

View File

@ -32,10 +32,12 @@ import java.io.IOException;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.ConcurrentModificationException; import java.util.ConcurrentModificationException;
import java.util.HashSet; import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -50,10 +52,12 @@ import java.util.concurrent.locks.ReentrantLock;
*/ */
class MemStoreFlusher extends Thread implements FlushRequester { class MemStoreFlusher extends Thread implements FlushRequester {
static final Log LOG = LogFactory.getLog(MemStoreFlusher.class); static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
private final BlockingQueue<HRegion> flushQueue = // These two data members go together. Any entry in the one must have
new LinkedBlockingQueue<HRegion>(); // a corresponding entry in the other.
private final BlockingQueue<FlushQueueEntry> flushQueue =
private final HashSet<HRegion> regionsInQueue = new HashSet<HRegion>(); new DelayQueue<FlushQueueEntry>();
private final Map<HRegion, FlushQueueEntry> regionsInQueue =
new HashMap<HRegion, FlushQueueEntry>();
private final long threadWakeFrequency; private final long threadWakeFrequency;
private final HRegionServer server; private final HRegionServer server;
@ -98,7 +102,7 @@ class MemStoreFlusher extends Thread implements FlushRequester {
conf.getInt("hbase.hstore.compactionThreshold", 3); conf.getInt("hbase.hstore.compactionThreshold", 3);
} }
this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
90000); // default of 180 seconds 90000);
LOG.info("globalMemStoreLimit=" + LOG.info("globalMemStoreLimit=" +
StringUtils.humanReadableInt(this.globalMemStoreLimit) + StringUtils.humanReadableInt(this.globalMemStoreLimit) +
", globalMemStoreLimitLowMark=" + ", globalMemStoreLimitLowMark=" +
@ -133,13 +137,13 @@ class MemStoreFlusher extends Thread implements FlushRequester {
@Override @Override
public void run() { public void run() {
while (!this.server.isStopRequested()) { while (!this.server.isStopRequested()) {
HRegion r = null; FlushQueueEntry fqe = null;
try { try {
r = this.flushQueue.poll(this.threadWakeFrequency, TimeUnit.MILLISECONDS); fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
if (r == null) { if (fqe == null) {
continue; continue;
} }
if (!flushRegion(r, false)) { if (!flushRegion(fqe)) {
break; break;
} }
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
@ -148,7 +152,7 @@ class MemStoreFlusher extends Thread implements FlushRequester {
continue; continue;
} catch (Exception ex) { } catch (Exception ex) {
LOG.error("Cache flush failed" + LOG.error("Cache flush failed" +
(r != null ? (" for region " + Bytes.toString(r.getRegionName())) : ""), (fqe != null ? (" for region " + Bytes.toString(fqe.region.getRegionName())) : ""),
ex); ex);
if (!server.checkFileSystem()) { if (!server.checkFileSystem()) {
break; break;
@ -162,9 +166,12 @@ class MemStoreFlusher extends Thread implements FlushRequester {
public void request(HRegion r) { public void request(HRegion r) {
synchronized (regionsInQueue) { synchronized (regionsInQueue) {
if (!regionsInQueue.contains(r)) { if (!regionsInQueue.containsKey(r)) {
regionsInQueue.add(r); // This entry has no delay so it will be added at the top of the flush
flushQueue.add(r); // queue. It'll come out near immediately.
FlushQueueEntry fqe = new FlushQueueEntry(r);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
} }
} }
} }
@ -181,78 +188,65 @@ class MemStoreFlusher extends Thread implements FlushRequester {
} }
} }
/*
* A flushRegion that checks store file count. If too many, puts the flush
* on delay queue to retry later.
* @param fqe
* @return true if the region was successfully flushed, false otherwise. If
* false, there will be accompanying log messages explaining why the log was
* not flushed.
*/
private boolean flushRegion(final FlushQueueEntry fqe) {
HRegion region = fqe.region;
if (!fqe.region.getRegionInfo().isMetaRegion() &&
isTooManyStoreFiles(region)) {
if (fqe.isMaximumWait(this.blockingWaitTime)) {
LOG.info("Waited " + (System.currentTimeMillis() - fqe.createTime) +
"ms on a compaction to clean up 'too many store files'; waited " +
"long enough... proceeding with flush of " +
region.getRegionNameAsString());
} else {
// If this is first time we've been put off, then emit a log message.
if (fqe.getRequeueCount() <= 0) {
// Note: We don't impose blockingStoreFiles constraint on meta regions
LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
"store files; delaying flush up to " + this.blockingWaitTime + "ms");
}
this.server.compactSplitThread.compactionRequested(region, getName());
// Put back on the queue. Have it come back out of the queue
// after a delay of this.blockingWaitTime / 100 ms.
this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
// Tell a lie, it's not flushed but it's ok
return true;
}
}
return flushRegion(region, false);
}
/* /*
* Flush a region. * Flush a region.
* * @param region Region to flush.
* @param region the region to be flushed * @param emergencyFlush Set if we are being force flushed. If true the region
* @param removeFromQueue True if the region needs to be removed from the * needs to be removed from the flush queue. If false, when we were called
* flush queue. False if called from the main flusher run loop and true if * from the main flusher run loop and we got the entry to flush by calling
* called from flushSomeRegions to relieve memory pressure from the region * poll on the flush queue (which removed it).
* server. If <code>true</code>, we are in a state of emergency; we are not
* taking on updates regionserver-wide, not until memory is flushed. In this
* case, do not let a compaction run inline with blocked updates. Compactions
* can take a long time. Stopping compactions, there is a danger that number
* of flushes will overwhelm compaction on a busy server; we'll have to see.
* That compactions do not run when called out of flushSomeRegions means that
* compactions can be reported by the historian without danger of deadlock
* (HBASE-670).
*
* <p>In the main run loop, regions have already been removed from the flush
* queue, and if this method is called for the relief of memory pressure,
* this may not be necessarily true. We want to avoid trying to remove
* region from the queue because if it has already been removed, it requires a
* sequential scan of the queue to determine that it is not in the queue.
*
* <p>If called from flushSomeRegions, the region may be in the queue but
* it may have been determined that the region had a significant amount of
* memory in use and needed to be flushed to relieve memory pressure. In this
* case, its flush may preempt the pending request in the queue, and if so,
* it needs to be removed from the queue to avoid flushing the region
* multiple times.
* *
* @return true if the region was successfully flushed, false otherwise. If * @return true if the region was successfully flushed, false otherwise. If
* false, there will be accompanying log messages explaining why the log was * false, there will be accompanying log messages explaining why the log was
* not flushed. * not flushed.
*/ */
private boolean flushRegion(HRegion region, boolean removeFromQueue) { private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
// if removeFromQueue, then we come from flushSomeRegions and we need synchronized (this.regionsInQueue) {
// to block if there's too many store files. Else, we don't want to hang FlushQueueEntry fqe = this.regionsInQueue.remove(region);
// the main flushing thread so we'll just the region at the end of the if (fqe != null && emergencyFlush) {
// queue if there's too many files. // Need to remove from region from delay queue. When NOT an
if (removeFromQueue) { // emergencyFlush, then item was removed via a flushQueue.poll.
checkStoreFileCount(region); flushQueue.remove(fqe);
} else if ((!region.getRegionInfo().isMetaRegion()) && }
isTooManyStoreFiles(region)) { lock.lock();
// Note: We don't impose blockingStoreFiles constraint on meta regions
LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
"store files, putting it back at the end of the flush queue.");
server.compactSplitThread.compactionRequested(region, getName());
// If there's only this item in the queue or they are all in this
// situation, we will loop at lot. Sleep a bit.
try {
Thread.sleep(1000);
} catch (InterruptedException e) { } // just continue
flushQueue.add(region);
// Tell a lie, it's not flushed but it's ok
return true;
}
synchronized (regionsInQueue) {
// See comment above for removeFromQueue on why we do not
// take the region out of the set. If removeFromQueue is true, remove it
// from the queue too if it is there. This didn't used to be a
// constraint, but now that HBASE-512 is in play, we need to try and
// limit double-flushing of regions.
if (regionsInQueue.remove(region) && removeFromQueue) {
flushQueue.remove(region);
}
lock.lock();
} }
try { try {
// See comment above for removeFromQueue on why we do not if (region.flushcache()) {
// compact if removeFromQueue is true. Note that region.flushCache()
// only returns true if a flush is done and if a compaction is needed.
if (region.flushcache() && !removeFromQueue) {
server.compactSplitThread.compactionRequested(region, getName()); server.compactSplitThread.compactionRequested(region, getName());
} }
} catch (DroppedSnapshotException ex) { } catch (DroppedSnapshotException ex) {
@ -264,65 +258,18 @@ class MemStoreFlusher extends Thread implements FlushRequester {
server.abort("Replay of HLog required. Forcing server shutdown", ex); server.abort("Replay of HLog required. Forcing server shutdown", ex);
return false; return false;
} catch (IOException ex) { } catch (IOException ex) {
LOG.error("Cache flush failed" LOG.error("Cache flush failed" +
+ (region != null ? (" for region " + Bytes.toString(region.getRegionName())) : ""), (region != null ? (" for region " + Bytes.toString(region.getRegionName())) : ""),
RemoteExceptionHandler.checkIOException(ex)); RemoteExceptionHandler.checkIOException(ex));
if (!server.checkFileSystem()) { if (!server.checkFileSystem()) {
return false; return false;
} }
} finally { } finally {
lock.unlock(); lock.unlock();
} }
return true; return true;
} }
/*
* If too many store files already, schedule a compaction and pause a while
* before going on with compaction.
* @param region Region to check.
*/
private void checkStoreFileCount(final HRegion region) {
// If catalog region, do not ever hold up writes (isMetaRegion returns
// true if ROOT or META region).
if (region.getRegionInfo().isMetaRegion()) return;
int count = 0;
boolean triggered = false;
boolean finished = false;
while (count++ < (blockingWaitTime / 500)) {
finished = true;
for (Store hstore: region.stores.values()) {
if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
// only log once
if (!triggered) {
LOG.info("Too many store files for region " + region + ": " +
hstore.getStorefilesCount() + ", requesting compaction and " +
"waiting");
this.server.compactSplitThread.compactionRequested(region, getName());
triggered = true;
}
// pending compaction, not finished
finished = false;
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// ignore
}
}
}
if (triggered && finished) {
LOG.info("Compaction has completed, we waited " + (count * 500) + "ms, "
+ "finishing flush of region " + region);
break;
}
}
if (triggered && !finished) {
LOG.warn("Tried to hold up flushing for compactions of region " + region +
" but have waited longer than " + blockingWaitTime + "ms, continuing");
}
}
private boolean isTooManyStoreFiles(HRegion region) { private boolean isTooManyStoreFiles(HRegion region) {
for (Store hstore: region.stores.values()) { for (Store hstore: region.stores.values()) {
if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) { if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
@ -381,4 +328,65 @@ class MemStoreFlusher extends Thread implements FlushRequester {
server.compactSplitThread.compactionRequested(region, getName()); server.compactSplitThread.compactionRequested(region, getName());
} }
} }
/**
* Datastructure used in the flush queue. Holds region and retry count.
* Keeps tabs on how old this object is. Implements {@link Delayed}. On
* construction, the delay is zero. When added to a delay queue, we'll come
* out near immediately. Call {@link #requeue(long)} passing delay in
* milliseconds before readding to delay queue if you want it to stay there
* a while.
*/
static class FlushQueueEntry implements Delayed {
private final HRegion region;
private final long createTime;
private long whenToExpire;
private int requeueCount = 0;
FlushQueueEntry(final HRegion r) {
this.region = r;
this.createTime = System.currentTimeMillis();
this.whenToExpire = this.createTime;
}
/**
* @param maximumWait
* @return True if we have been delayed > <code>maximumWait</code> milliseconds.
*/
public boolean isMaximumWait(final long maximumWait) {
return (System.currentTimeMillis() - this.createTime) > maximumWait;
}
/**
* @return Count of times {@link #resetDelay()} was called; i.e this is
* number of times we've been requeued.
*/
public int getRequeueCount() {
return this.requeueCount;
}
/**
* @param when When to expire, when to come up out of the queue.
* Specify in milliseconds. This method adds System.currentTimeMillis()
* to whatever you pass.
* @return This.
*/
public FlushQueueEntry requeue(final long when) {
this.whenToExpire = System.currentTimeMillis() + when;
this.requeueCount++;
return this;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.whenToExpire - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
other.getDelay(TimeUnit.MILLISECONDS)).intValue();
}
}
} }

View File

@ -629,8 +629,8 @@ public class Store implements HeapSize {
this.conf, this.family.getBloomFilterType(), this.inMemory); this.conf, this.family.getBloomFilterType(), this.inMemory);
Reader r = sf.createReader(); Reader r = sf.createReader();
this.storeSize += r.length(); this.storeSize += r.length();
if(LOG.isDebugEnabled()) { if(LOG.isInfoEnabled()) {
LOG.debug("Added " + sf + ", entries=" + r.getEntries() + LOG.info("Added " + sf + ", entries=" + r.getEntries() +
", sequenceid=" + logCacheFlushId + ", sequenceid=" + logCacheFlushId +
", memsize=" + StringUtils.humanReadableInt(flushed) + ", memsize=" + StringUtils.humanReadableInt(flushed) +
", filesize=" + StringUtils.humanReadableInt(r.length()) + ", filesize=" + StringUtils.humanReadableInt(r.length()) +
@ -822,15 +822,17 @@ public class Store implements HeapSize {
} }
// Ready to go. Have list of files to compact. // Ready to go. Have list of files to compact.
LOG.debug("Started compaction of " + filesToCompact.size() + " file(s)" + LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in " +
this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() +
(references? ", hasReferences=true,": " ") + " into " + (references? ", hasReferences=true,": " ") + " into " +
FSUtils.getPath(this.regionCompactionDir) + ", seqid=" + maxId); FSUtils.getPath(this.regionCompactionDir) + ", seqid=" + maxId);
HFile.Writer writer = compact(filesToCompact, majorcompaction, maxId); HFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
// Move the compaction into place. // Move the compaction into place.
StoreFile sf = completeCompaction(filesToCompact, writer); StoreFile sf = completeCompaction(filesToCompact, writer);
if (LOG.isDebugEnabled()) { if (LOG.isInfoEnabled()) {
LOG.debug("Completed" + (majorcompaction? " major ": " ") + LOG.info("Completed" + (majorcompaction? " major ": " ") +
"compaction of " + this.storeNameStr + "compaction of " + filesToCompact.size() + " file(s) in " +
this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() +
"; new storefile is " + (sf == null? "none": sf.toString()) + "; new storefile is " + (sf == null? "none": sf.toString()) +
"; store size is " + StringUtils.humanReadableInt(storeSize)); "; store size is " + StringUtils.humanReadableInt(storeSize));
} }