HBASE-22115 HBase RPC aspires to grow an infinite tree of trace scopes; some other places are also unsafe

Signed-off-by: Andrew Purtell <apurtell@apache.org>, stack <stack@apache.org>
This commit is contained in:
Sergey Shelukhin 2019-03-29 13:38:11 -07:00 committed by Duo Zhang
parent 46d0e5a05d
commit 78c4e4996a
2 changed files with 70 additions and 63 deletions

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.core.TraceScope;
/** /**
* The request processing logic, which is usually executed in thread pools provided by an * The request processing logic, which is usually executed in thread pools provided by an
@ -115,6 +116,7 @@ public class CallRunner {
String error = null; String error = null;
Pair<Message, CellScanner> resultPair = null; Pair<Message, CellScanner> resultPair = null;
RpcServer.CurCall.set(call); RpcServer.CurCall.set(call);
TraceScope traceScope = null;
try { try {
if (!this.rpcServer.isStarted()) { if (!this.rpcServer.isStarted()) {
InetSocketAddress address = rpcServer.getListenerAddress(); InetSocketAddress address = rpcServer.getListenerAddress();
@ -125,7 +127,7 @@ public class CallRunner {
call.getService() != null ? call.getService().getDescriptorForType().getName() : ""; call.getService() != null ? call.getService().getDescriptorForType().getName() : "";
String methodName = (call.getMethod() != null) ? call.getMethod().getName() : ""; String methodName = (call.getMethod() != null) ? call.getMethod().getName() : "";
String traceString = serviceName + "." + methodName; String traceString = serviceName + "." + methodName;
TraceUtil.createTrace(traceString); traceScope = TraceUtil.createTrace(traceString);
// make the call // make the call
resultPair = this.rpcServer.call(call, this.status); resultPair = this.rpcServer.call(call, this.status);
} catch (TimeoutIOException e){ } catch (TimeoutIOException e){
@ -147,6 +149,9 @@ public class CallRunner {
throw (Error)e; throw (Error)e;
} }
} finally { } finally {
if (traceScope != null) {
traceScope.close();
}
RpcServer.CurCall.set(null); RpcServer.CurCall.set(null);
if (resultPair != null) { if (resultPair != null) {
this.rpcServer.addCallSize(call.getSize() * -1); this.rpcServer.addCallSize(call.getSize() * -1);

View File

@ -693,79 +693,81 @@ class MemStoreFlusher implements FlushRequester {
* amount of memstore consumption. * amount of memstore consumption.
*/ */
public void reclaimMemStoreMemory() { public void reclaimMemStoreMemory() {
TraceScope scope = TraceUtil.createTrace("MemStoreFluser.reclaimMemStoreMemory"); try (TraceScope scope = TraceUtil.createTrace("MemStoreFluser.reclaimMemStoreMemory")) {
FlushType flushType = isAboveHighWaterMark(); FlushType flushType = isAboveHighWaterMark();
if (flushType != FlushType.NORMAL) { if (flushType != FlushType.NORMAL) {
TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark."); TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark.");
long start = EnvironmentEdgeManager.currentTime(); long start = EnvironmentEdgeManager.currentTime();
synchronized (this.blockSignal) { long nextLogTimeMs = start;
boolean blocked = false; synchronized (this.blockSignal) {
long startTime = 0; boolean blocked = false;
boolean interrupted = false; long startTime = 0;
try { boolean interrupted = false;
flushType = isAboveHighWaterMark(); try {
while (flushType != FlushType.NORMAL && !server.isStopped()) { flushType = isAboveHighWaterMark();
server.cacheFlusher.setFlushType(flushType); while (flushType != FlushType.NORMAL && !server.isStopped()) {
if (!blocked) { server.cacheFlusher.setFlushType(flushType);
startTime = EnvironmentEdgeManager.currentTime(); if (!blocked) {
if (!server.getRegionServerAccounting().isOffheap()) { startTime = EnvironmentEdgeManager.currentTime();
logMsg("global memstore heapsize", if (!server.getRegionServerAccounting().isOffheap()) {
server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
server.getRegionServerAccounting().getGlobalMemStoreLimit());
} else {
switch (flushType) {
case ABOVE_OFFHEAP_HIGHER_MARK:
logMsg("the global offheap memstore datasize",
server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(),
server.getRegionServerAccounting().getGlobalMemStoreLimit());
break;
case ABOVE_ONHEAP_HIGHER_MARK:
logMsg("global memstore heapsize", logMsg("global memstore heapsize",
server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit()); server.getRegionServerAccounting().getGlobalMemStoreLimit());
break; } else {
default: switch (flushType) {
break; case ABOVE_OFFHEAP_HIGHER_MARK:
logMsg("the global offheap memstore datasize",
server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(),
server.getRegionServerAccounting().getGlobalMemStoreLimit());
break;
case ABOVE_ONHEAP_HIGHER_MARK:
logMsg("global memstore heapsize",
server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit());
break;
default:
break;
}
} }
} }
blocked = true;
wakeupFlushThread();
try {
// we should be able to wait forever, but we've seen a bug where
// we miss a notify, so put a 5 second bound on it at least.
blockSignal.wait(5 * 1000);
} catch (InterruptedException ie) {
LOG.warn("Interrupted while waiting");
interrupted = true;
}
long nowMs = EnvironmentEdgeManager.currentTime();
if (nowMs >= nextLogTimeMs) {
LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start);
nextLogTimeMs = nowMs + 1000;
}
flushType = isAboveHighWaterMark();
} }
blocked = true; } finally {
wakeupFlushThread(); if (interrupted) {
try { Thread.currentThread().interrupt();
// we should be able to wait forever, but we've seen a bug where
// we miss a notify, so put a 5 second bound on it at least.
blockSignal.wait(5 * 1000);
} catch (InterruptedException ie) {
LOG.warn("Interrupted while waiting");
interrupted = true;
} }
long took = EnvironmentEdgeManager.currentTime() - start;
LOG.warn("Memstore is above high water mark and block " + took + "ms");
flushType = isAboveHighWaterMark();
} }
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
if(blocked){ if(blocked){
final long totalTime = EnvironmentEdgeManager.currentTime() - startTime; final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
if(totalTime > 0){ if(totalTime > 0){
this.updatesBlockedMsHighWater.add(totalTime); this.updatesBlockedMsHighWater.add(totalTime);
}
LOG.info("Unblocking updates for server " + server.toString());
} }
LOG.info("Unblocking updates for server " + server.toString()); }
} else {
flushType = isAboveLowWaterMark();
if (flushType != FlushType.NORMAL) {
server.cacheFlusher.setFlushType(flushType);
wakeupFlushThread();
} }
} }
} else {
flushType = isAboveLowWaterMark();
if (flushType != FlushType.NORMAL) {
server.cacheFlusher.setFlushType(flushType);
wakeupFlushThread();
}
}
if(scope!= null) {
scope.close();
} }
} }