From 866a2c6c42a1c9625c31cc759518e829c42aea6c Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Tue, 17 Sep 2013 03:34:51 +0000 Subject: [PATCH] HBASE-9467 write can be totally blocked temporarily by a write-heavy region git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1523881 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/regionserver/HRegion.java | 63 +++---------------- .../hadoop/hbase/HBaseTestingUtility.java | 15 ++++- 2 files changed, 21 insertions(+), 57 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 87d6d26e605..786dd97ceb7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2511,66 +2511,17 @@ public class HRegion implements HeapSize { // , Writable{ /* * Check if resources to support an update. * - * Here we synchronize on HRegion, a broad scoped lock. Its appropriate - * given we're figuring in here whether this region is able to take on - * writes. This is only method with a synchronize (at time of writing), - * this and the synchronize on 'this' inside in internalFlushCache to send - * the notify. - */ + * We throw RegionTooBusyException if above memstore limit + * and expect client to retry using some kind of backoff + */ private void checkResources() - throws RegionTooBusyException, InterruptedIOException { - + throws RegionTooBusyException { // If catalog region, do not impose resource constraints or block updates. if (this.getRegionInfo().isMetaRegion()) return; - boolean blocked = false; - long startTime = 0; - while (this.memstoreSize.get() > this.blockingMemStoreSize) { + if (this.memstoreSize.get() > this.blockingMemStoreSize) { requestFlush(); - if (!blocked) { - startTime = EnvironmentEdgeManager.currentTimeMillis(); - LOG.info("Blocking updates for '" + Thread.currentThread().getName() + - "' on region " + Bytes.toStringBinary(getRegionName()) + - ": memstore size " + - StringUtils.humanReadableInt(this.memstoreSize.get()) + - " is >= than blocking " + - StringUtils.humanReadableInt(this.blockingMemStoreSize) + " size"); - } - long now = EnvironmentEdgeManager.currentTimeMillis(); - long timeToWait = startTime + busyWaitDuration - now; - if (timeToWait <= 0L) { - final long totalTime = now - startTime; - this.updatesBlockedMs.add(totalTime); - LOG.info("Failed to unblock updates for region " + this + " '" - + Thread.currentThread().getName() + "' in " + totalTime - + "ms. The region is still busy."); - throw new RegionTooBusyException("region is flushing"); - } - blocked = true; - synchronized(this) { - try { - wait(Math.min(timeToWait, threadWakeFrequency)); - } catch (InterruptedException ie) { - final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime; - if (totalTime > 0) { - this.updatesBlockedMs.add(totalTime); - } - LOG.info("Interrupted while waiting to unblock updates for region " - + this + " '" + Thread.currentThread().getName() + "'"); - InterruptedIOException iie = new InterruptedIOException(); - iie.initCause(ie); - throw iie; - } - } - } - if (blocked) { - // Add in the blocked time if appropriate - final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime; - if(totalTime > 0 ){ - this.updatesBlockedMs.add(totalTime); - } - LOG.info("Unblocking updates for region " + this + " '" - + Thread.currentThread().getName() + "'"); + throw new RegionTooBusyException("above memstore limit"); } } @@ -4661,6 +4612,7 @@ public class HRegion implements HeapSize { // , Writable{ long txid = 0; checkReadOnly(); + checkResources(); // Lock row startRegionOperation(Operation.APPEND); this.writeRequestsCount.increment(); @@ -4835,6 +4787,7 @@ public class HRegion implements HeapSize { // , Writable{ long txid = 0; checkReadOnly(); + checkResources(); // Lock row startRegionOperation(Operation.INCREMENT); this.writeRequestsCount.increment(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index e23a29bd5ac..534d49a651c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -1655,8 +1655,19 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { Put put = new Put(k); put.add(f, null, k); if (r.getLog() == null) put.setDurability(Durability.SKIP_WAL); - r.put(put); - rowCount++; + + int preRowCount = rowCount; + int pause = 10; + int maxPause = 1000; + while (rowCount == preRowCount) { + try { + r.put(put); + rowCount++; + } catch (RegionTooBusyException e) { + pause = (pause * 2 >= maxPause) ? maxPause : pause * 2; + Threads.sleep(pause); + } + } } } if (flush) {