HBASE-22132 Backport HBASE-22115 to branch-1 (#395)
Signed-off-by: Reid Chan <reidchan@apache.org> Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
cbb9614433
commit
2be03e130b
|
@ -579,63 +579,64 @@ class MemStoreFlusher implements FlushRequester {
|
|||
* amount of memstore consumption.
|
||||
*/
|
||||
public void reclaimMemStoreMemory() {
|
||||
TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory");
|
||||
if (isAboveHighWaterMark()) {
|
||||
if (Trace.isTracing()) {
|
||||
scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark.");
|
||||
}
|
||||
long start = EnvironmentEdgeManager.currentTime();
|
||||
long nextLogTimeMs = start;
|
||||
synchronized (this.blockSignal) {
|
||||
boolean blocked = false;
|
||||
long startTime = 0;
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
while (isAboveHighWaterMark() && !server.isStopped()) {
|
||||
if (!blocked) {
|
||||
startTime = EnvironmentEdgeManager.currentTime();
|
||||
LOG.info("Blocking updates on "
|
||||
+ server.toString()
|
||||
+ ": the global memstore size "
|
||||
+ TraditionalBinaryPrefix.long2String(server.getRegionServerAccounting()
|
||||
.getGlobalMemstoreSize(), "", 1) + " is >= than blocking "
|
||||
+ TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1) + " size");
|
||||
}
|
||||
blocked = true;
|
||||
wakeupFlushThread();
|
||||
try {
|
||||
// we should be able to wait forever, but we've seen a bug where
|
||||
// we miss a notify, so put a 5 second bound on it at least.
|
||||
blockSignal.wait(5 * 1000);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.warn("Interrupted while waiting");
|
||||
interrupted = true;
|
||||
}
|
||||
long nowMs = EnvironmentEdgeManager.currentTime();
|
||||
if (nowMs >= nextLogTimeMs) {
|
||||
LOG.warn("Memstore is above high water mark and block " + (nowMs - start) + " ms");
|
||||
nextLogTimeMs = nowMs + 1000;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (interrupted) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
try (TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory")) {
|
||||
if (isAboveHighWaterMark()) {
|
||||
if (Trace.isTracing()) {
|
||||
scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark.");
|
||||
}
|
||||
long start = EnvironmentEdgeManager.currentTime();
|
||||
long nextLogTimeMs = start;
|
||||
synchronized (this.blockSignal) {
|
||||
boolean blocked = false;
|
||||
long startTime = 0;
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
while (isAboveHighWaterMark() && !server.isStopped()) {
|
||||
if (!blocked) {
|
||||
startTime = EnvironmentEdgeManager.currentTime();
|
||||
LOG.info("Blocking updates on "
|
||||
+ server.toString()
|
||||
+ ": the global memstore size "
|
||||
+ TraditionalBinaryPrefix.long2String(server.getRegionServerAccounting()
|
||||
.getGlobalMemstoreSize(), "", 1) + " is >= than blocking "
|
||||
+ TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1) + " size");
|
||||
}
|
||||
blocked = true;
|
||||
wakeupFlushThread();
|
||||
try {
|
||||
// we should be able to wait forever, but we've seen a bug where
|
||||
// we miss a notify, so put a 5 second bound on it at least.
|
||||
blockSignal.wait(5 * 1000);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.warn("Interrupted while waiting");
|
||||
interrupted = true;
|
||||
}
|
||||
long nowMs = EnvironmentEdgeManager.currentTime();
|
||||
if (nowMs >= nextLogTimeMs) {
|
||||
LOG.warn("Memstore is above high water mark and block " + (nowMs - start) + " ms");
|
||||
nextLogTimeMs = nowMs + 1000;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (interrupted) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
if(blocked){
|
||||
final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
|
||||
if(totalTime > 0){
|
||||
this.updatesBlockedMsHighWater.add(totalTime);
|
||||
if (blocked) {
|
||||
final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
|
||||
if (totalTime > 0) {
|
||||
this.updatesBlockedMsHighWater.add(totalTime);
|
||||
}
|
||||
LOG.info("Unblocking updates for server " + server.toString());
|
||||
}
|
||||
LOG.info("Unblocking updates for server " + server.toString());
|
||||
}
|
||||
} else if (isAboveLowWaterMark()) {
|
||||
wakeupFlushThread();
|
||||
}
|
||||
} else if (isAboveLowWaterMark()) {
|
||||
wakeupFlushThread();
|
||||
}
|
||||
scope.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "flush_queue="
|
||||
|
|
Loading…
Reference in New Issue