HBASE-9467 write can be totally blocked temporarily by a write-heavy region
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1523881 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8fd9db6d6c
commit
866a2c6c42
|
@ -2511,66 +2511,17 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
/*
|
||||
* Check if resources to support an update.
|
||||
*
|
||||
* Here we synchronize on HRegion, a broad scoped lock. Its appropriate
|
||||
* given we're figuring in here whether this region is able to take on
|
||||
* writes. This is only method with a synchronize (at time of writing),
|
||||
* this and the synchronize on 'this' inside in internalFlushCache to send
|
||||
* the notify.
|
||||
*/
|
||||
* We throw RegionTooBusyException if above memstore limit
|
||||
* and expect client to retry using some kind of backoff
|
||||
*/
|
||||
private void checkResources()
|
||||
throws RegionTooBusyException, InterruptedIOException {
|
||||
|
||||
throws RegionTooBusyException {
|
||||
// If catalog region, do not impose resource constraints or block updates.
|
||||
if (this.getRegionInfo().isMetaRegion()) return;
|
||||
|
||||
boolean blocked = false;
|
||||
long startTime = 0;
|
||||
while (this.memstoreSize.get() > this.blockingMemStoreSize) {
|
||||
if (this.memstoreSize.get() > this.blockingMemStoreSize) {
|
||||
requestFlush();
|
||||
if (!blocked) {
|
||||
startTime = EnvironmentEdgeManager.currentTimeMillis();
|
||||
LOG.info("Blocking updates for '" + Thread.currentThread().getName() +
|
||||
"' on region " + Bytes.toStringBinary(getRegionName()) +
|
||||
": memstore size " +
|
||||
StringUtils.humanReadableInt(this.memstoreSize.get()) +
|
||||
" is >= than blocking " +
|
||||
StringUtils.humanReadableInt(this.blockingMemStoreSize) + " size");
|
||||
}
|
||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||
long timeToWait = startTime + busyWaitDuration - now;
|
||||
if (timeToWait <= 0L) {
|
||||
final long totalTime = now - startTime;
|
||||
this.updatesBlockedMs.add(totalTime);
|
||||
LOG.info("Failed to unblock updates for region " + this + " '"
|
||||
+ Thread.currentThread().getName() + "' in " + totalTime
|
||||
+ "ms. The region is still busy.");
|
||||
throw new RegionTooBusyException("region is flushing");
|
||||
}
|
||||
blocked = true;
|
||||
synchronized(this) {
|
||||
try {
|
||||
wait(Math.min(timeToWait, threadWakeFrequency));
|
||||
} catch (InterruptedException ie) {
|
||||
final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
|
||||
if (totalTime > 0) {
|
||||
this.updatesBlockedMs.add(totalTime);
|
||||
}
|
||||
LOG.info("Interrupted while waiting to unblock updates for region "
|
||||
+ this + " '" + Thread.currentThread().getName() + "'");
|
||||
InterruptedIOException iie = new InterruptedIOException();
|
||||
iie.initCause(ie);
|
||||
throw iie;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (blocked) {
|
||||
// Add in the blocked time if appropriate
|
||||
final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
|
||||
if(totalTime > 0 ){
|
||||
this.updatesBlockedMs.add(totalTime);
|
||||
}
|
||||
LOG.info("Unblocking updates for region " + this + " '"
|
||||
+ Thread.currentThread().getName() + "'");
|
||||
throw new RegionTooBusyException("above memstore limit");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4661,6 +4612,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
long txid = 0;
|
||||
|
||||
checkReadOnly();
|
||||
checkResources();
|
||||
// Lock row
|
||||
startRegionOperation(Operation.APPEND);
|
||||
this.writeRequestsCount.increment();
|
||||
|
@ -4835,6 +4787,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
long txid = 0;
|
||||
|
||||
checkReadOnly();
|
||||
checkResources();
|
||||
// Lock row
|
||||
startRegionOperation(Operation.INCREMENT);
|
||||
this.writeRequestsCount.increment();
|
||||
|
|
|
@ -1655,8 +1655,19 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
Put put = new Put(k);
|
||||
put.add(f, null, k);
|
||||
if (r.getLog() == null) put.setDurability(Durability.SKIP_WAL);
|
||||
r.put(put);
|
||||
rowCount++;
|
||||
|
||||
int preRowCount = rowCount;
|
||||
int pause = 10;
|
||||
int maxPause = 1000;
|
||||
while (rowCount == preRowCount) {
|
||||
try {
|
||||
r.put(put);
|
||||
rowCount++;
|
||||
} catch (RegionTooBusyException e) {
|
||||
pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
|
||||
Threads.sleep(pause);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (flush) {
|
||||
|
|
Loading…
Reference in New Issue