HBASE-27881 The sleep time in checkQuota of replication WAL reader should be controlled independently

This commit is contained in:
haxiaolin 2023-05-24 12:09:12 +08:00
parent 22526a6339
commit 87289066fa
2 changed files with 22 additions and 11 deletions

View File

@ -1195,7 +1195,7 @@ public class ReplicationSourceManager {
boolean checkBufferQuota(String peerId) {
// try not to go over total quota
if (totalBufferUsed.get() > totalBufferLimit) {
LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
LOG.debug("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
peerId, totalBufferUsed.get(), totalBufferLimit);
return false;
}

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@ -69,6 +70,9 @@ class ReplicationSourceWALReader extends Thread {
// position in the WAL to start reading at
private long currentPosition;
private final long sleepForRetries;
private final long sleepForQuotaCheck;
private final long logQuotaThrottleInterval;
private final int maxRetriesMultiplier;
// Indicates whether this particular worker is running
@ -102,6 +106,10 @@ class ReplicationSourceWALReader extends Thread {
int batchCount = conf.getInt("replication.source.nb.batches", 1);
// 1 second
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
// 300ms
this.sleepForQuotaCheck = this.conf.getLong("replication.source.sleepforquotacheck", 300);
this.logQuotaThrottleInterval =
this.conf.getLong("replication.source.logintervalforquotathrottle.ms", 3000);
// 5 minutes @ 1 sec per
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
@ -140,9 +148,7 @@ class ReplicationSourceWALReader extends Thread {
Threads.sleep(sleepForRetries);
continue;
}
if (!checkBufferQuota()) {
continue;
}
blockUntilFreeBufferQuota();
Path currentPath = entryStream.getCurrentPath();
WALEntryStream.HasNext hasNext = entryStream.hasNext();
if (hasNext == WALEntryStream.HasNext.NO) {
@ -266,14 +272,19 @@ class ReplicationSourceWALReader extends Thread {
return logQueue.getQueue(walGroupId).peek();
}
// returns false if we've already exceeded the global quota
private boolean checkBufferQuota() {
// try not to go over total quota
if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) {
Threads.sleep(sleepForRetries);
return false;
// blocking until buffer quota free
private void blockUntilFreeBufferQuota() {
long start = EnvironmentEdgeManager.currentTime();
while (
!this.getSourceManager().checkBufferQuota(this.source.getPeerId()) && isReaderRunning()
) {
if (EnvironmentEdgeManager.currentTime() - start >= logQuotaThrottleInterval) {
LOG.warn("peer={}, source reader is blocking until buffer quota free, current wal is {}",
this.source.getPeerId(), this.getCurrentPath());
start = EnvironmentEdgeManager.currentTime();
}
Threads.sleep(sleepForQuotaCheck);
}
return true;
}
private WALEntryBatch createBatch(WALEntryStream entryStream) {