Replication source: wait for free buffer quota inside the WAL reader, and quieten quota logging.

What this patch changes:
  * ReplicationSourceManager.checkBufferQuota(String) reports "buffer usage exceeds limit" at
    DEBUG instead of WARN.
  * ReplicationSourceWALReader reads two new settings in its constructor:
      replication.source.sleepforquotacheck              (default 300, sleep between quota checks)
      replication.source.logintervalforquotathrottle.ms  (default 3000, minimum gap between warnings)
  * The reader loop calls the new blockUntilFreeBufferQuota() in place of the removed
    checkBufferQuota()-then-continue step. The new method polls the manager's checkBufferQuota()
    while the reader is running, sleeps sleepForQuotaCheck between polls, and emits a WARN once at
    least logQuotaThrottleInterval has elapsed since the wait started or since the previous warning.

NOTE(review): this patch was recovered from a whitespace-collapsed copy. Record boundaries were
rebuilt so that every hunk matches its header line counts; leading indentation and blank context
lines are assumed to follow 2-space indentation. Confirm against the target tree before applying.

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index d54cda92d90..87f129ebed7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -1195,7 +1195,7 @@ public class ReplicationSourceManager {
   boolean checkBufferQuota(String peerId) {
     // try not to go over total quota
     if (totalBufferUsed.get() > totalBufferLimit) {
-      LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
+      LOG.debug("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
         peerId, totalBufferUsed.get(), totalBufferLimit);
       return false;
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index bd5b7736f3b..eb3658a2c2e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -69,6 +70,9 @@ class ReplicationSourceWALReader extends Thread {
   // position in the WAL to start reading at
   private long currentPosition;
   private final long sleepForRetries;
+  private final long sleepForQuotaCheck;
+
+  private final long logQuotaThrottleInterval;
   private final int maxRetriesMultiplier;
 
   // Indicates whether this particular worker is running
@@ -102,6 +106,10 @@ class ReplicationSourceWALReader extends Thread {
     int batchCount = conf.getInt("replication.source.nb.batches", 1);
     // 1 second
     this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
+    // 300ms
+    this.sleepForQuotaCheck = this.conf.getLong("replication.source.sleepforquotacheck", 300);
+    this.logQuotaThrottleInterval =
+      this.conf.getLong("replication.source.logintervalforquotathrottle.ms", 3000);
     // 5 minutes @ 1 sec per
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
     this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
@@ -140,9 +148,7 @@ class ReplicationSourceWALReader extends Thread {
             Threads.sleep(sleepForRetries);
             continue;
           }
-          if (!checkBufferQuota()) {
-            continue;
-          }
+          blockUntilFreeBufferQuota();
           Path currentPath = entryStream.getCurrentPath();
           WALEntryStream.HasNext hasNext = entryStream.hasNext();
           if (hasNext == WALEntryStream.HasNext.NO) {
@@ -266,14 +272,19 @@ class ReplicationSourceWALReader extends Thread {
     return logQueue.getQueue(walGroupId).peek();
   }
 
-  // returns false if we've already exceeded the global quota
-  private boolean checkBufferQuota() {
-    // try not to go over total quota
-    if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) {
-      Threads.sleep(sleepForRetries);
-      return false;
+  // blocking until buffer quota free
+  private void blockUntilFreeBufferQuota() {
+    long start = EnvironmentEdgeManager.currentTime();
+    while (
+      !this.getSourceManager().checkBufferQuota(this.source.getPeerId()) && isReaderRunning()
+    ) {
+      if (EnvironmentEdgeManager.currentTime() - start >= logQuotaThrottleInterval) {
+        LOG.warn("peer={}, source reader is blocking until buffer quota free, current wal is {}",
+          this.source.getPeerId(), this.getCurrentPath());
+        start = EnvironmentEdgeManager.currentTime();
+      }
+      Threads.sleep(sleepForQuotaCheck);
     }
-    return true;
   }
 
   private WALEntryBatch createBatch(WALEntryStream entryStream) {