diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index ae690f8551c..0ad3613af2f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.StringUtils; +import org.apache.htrace.core.TraceScope; /** * The request processing logic, which is usually executed in thread pools provided by an @@ -115,6 +116,7 @@ public class CallRunner { String error = null; Pair resultPair = null; RpcServer.CurCall.set(call); + TraceScope traceScope = null; try { if (!this.rpcServer.isStarted()) { InetSocketAddress address = rpcServer.getListenerAddress(); @@ -125,7 +127,7 @@ public class CallRunner { call.getService() != null ? call.getService().getDescriptorForType().getName() : ""; String methodName = (call.getMethod() != null) ? call.getMethod().getName() : ""; String traceString = serviceName + "." + methodName; - TraceUtil.createTrace(traceString); + traceScope = TraceUtil.createTrace(traceString); // make the call resultPair = this.rpcServer.call(call, this.status); } catch (TimeoutIOException e){ @@ -147,6 +149,9 @@ public class CallRunner { throw (Error)e; } } finally { + if (traceScope != null) { + traceScope.close(); + } RpcServer.CurCall.set(null); if (resultPair != null) { this.rpcServer.addCallSize(call.getSize() * -1); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index b8c80515acd..a53008de3d9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -693,79 +693,81 @@ class MemStoreFlusher implements FlushRequester { * amount of memstore consumption. */ public void reclaimMemStoreMemory() { - TraceScope scope = TraceUtil.createTrace("MemStoreFluser.reclaimMemStoreMemory"); - FlushType flushType = isAboveHighWaterMark(); - if (flushType != FlushType.NORMAL) { - TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark."); - long start = EnvironmentEdgeManager.currentTime(); - synchronized (this.blockSignal) { - boolean blocked = false; - long startTime = 0; - boolean interrupted = false; - try { - flushType = isAboveHighWaterMark(); - while (flushType != FlushType.NORMAL && !server.isStopped()) { - server.cacheFlusher.setFlushType(flushType); - if (!blocked) { - startTime = EnvironmentEdgeManager.currentTime(); - if (!server.getRegionServerAccounting().isOffheap()) { - logMsg("global memstore heapsize", - server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), - server.getRegionServerAccounting().getGlobalMemStoreLimit()); - } else { - switch (flushType) { - case ABOVE_OFFHEAP_HIGHER_MARK: - logMsg("the global offheap memstore datasize", - server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), - server.getRegionServerAccounting().getGlobalMemStoreLimit()); - break; - case ABOVE_ONHEAP_HIGHER_MARK: + try (TraceScope scope = TraceUtil.createTrace("MemStoreFluser.reclaimMemStoreMemory")) { + FlushType flushType = isAboveHighWaterMark(); + if (flushType != FlushType.NORMAL) { + TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark."); + long start = EnvironmentEdgeManager.currentTime(); + long nextLogTimeMs = start; + synchronized (this.blockSignal) { + boolean blocked = false; + long startTime = 0; + boolean interrupted = false; + try { + flushType = isAboveHighWaterMark(); + while (flushType != FlushType.NORMAL && !server.isStopped()) { + server.cacheFlusher.setFlushType(flushType); + if (!blocked) { + startTime = EnvironmentEdgeManager.currentTime(); + if (!server.getRegionServerAccounting().isOffheap()) { logMsg("global memstore heapsize", server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), - server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit()); - break; - default: - break; + server.getRegionServerAccounting().getGlobalMemStoreLimit()); + } else { + switch (flushType) { + case ABOVE_OFFHEAP_HIGHER_MARK: + logMsg("the global offheap memstore datasize", + server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), + server.getRegionServerAccounting().getGlobalMemStoreLimit()); + break; + case ABOVE_ONHEAP_HIGHER_MARK: + logMsg("global memstore heapsize", + server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), + server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit()); + break; + default: + break; + } } } + blocked = true; + wakeupFlushThread(); + try { + // we should be able to wait forever, but we've seen a bug where + // we miss a notify, so put a 5 second bound on it at least. + blockSignal.wait(5 * 1000); + } catch (InterruptedException ie) { + LOG.warn("Interrupted while waiting"); + interrupted = true; + } + long nowMs = EnvironmentEdgeManager.currentTime(); + if (nowMs >= nextLogTimeMs) { + LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start); + nextLogTimeMs = nowMs + 1000; + } + flushType = isAboveHighWaterMark(); } - blocked = true; - wakeupFlushThread(); - try { - // we should be able to wait forever, but we've seen a bug where - // we miss a notify, so put a 5 second bound on it at least. - blockSignal.wait(5 * 1000); - } catch (InterruptedException ie) { - LOG.warn("Interrupted while waiting"); - interrupted = true; + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); } - long took = EnvironmentEdgeManager.currentTime() - start; - LOG.warn("Memstore is above high water mark and block " + took + "ms"); - flushType = isAboveHighWaterMark(); } - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - if(blocked){ - final long totalTime = EnvironmentEdgeManager.currentTime() - startTime; - if(totalTime > 0){ - this.updatesBlockedMsHighWater.add(totalTime); + if(blocked){ + final long totalTime = EnvironmentEdgeManager.currentTime() - startTime; + if(totalTime > 0){ + this.updatesBlockedMsHighWater.add(totalTime); + } + LOG.info("Unblocking updates for server " + server.toString()); } - LOG.info("Unblocking updates for server " + server.toString()); + } + } else { + flushType = isAboveLowWaterMark(); + if (flushType != FlushType.NORMAL) { + server.cacheFlusher.setFlushType(flushType); + wakeupFlushThread(); } } - } else { - flushType = isAboveLowWaterMark(); - if (flushType != FlushType.NORMAL) { - server.cacheFlusher.setFlushType(flushType); - wakeupFlushThread(); - } - } - if(scope!= null) { - scope.close(); } }