HBASE-6974 Metric for blocked updates (Michael Drzal)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1399880 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
952ec5521d
commit
c951cfbd31
|
@ -245,6 +245,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
final Counter checkAndMutateChecksFailed = new Counter();
|
||||
final Counter readRequestsCount = new Counter();
|
||||
final Counter writeRequestsCount = new Counter();
|
||||
final Counter updatesBlockedMs = new Counter();
|
||||
|
||||
/**
|
||||
* The directory for the table this region is part of.
|
||||
|
@ -2493,9 +2494,11 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
if (this.getRegionInfo().isMetaRegion()) return;
|
||||
|
||||
boolean blocked = false;
|
||||
long startTime = 0;
|
||||
while (this.memstoreSize.get() > this.blockingMemStoreSize) {
|
||||
requestFlush();
|
||||
if (!blocked) {
|
||||
startTime = EnvironmentEdgeManager.currentTimeMillis();
|
||||
LOG.info("Blocking updates for '" + Thread.currentThread().getName() +
|
||||
"' on region " + Bytes.toStringBinary(getRegionName()) +
|
||||
": memstore size " +
|
||||
|
@ -2508,11 +2511,16 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
try {
|
||||
wait(threadWakeFrequency);
|
||||
} catch (InterruptedException e) {
|
||||
// continue;
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
if (blocked) {
|
||||
// Add in the blocked time if appropriate
|
||||
final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
|
||||
if(totalTime > 0 ){
|
||||
this.updatesBlockedMs.add(totalTime);
|
||||
}
|
||||
LOG.info("Unblocking updates for region " + this + " '"
|
||||
+ Thread.currentThread().getName() + "'");
|
||||
}
|
||||
|
@ -4932,7 +4940,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
public static final long FIXED_OVERHEAD = ClassSize.align(
|
||||
ClassSize.OBJECT +
|
||||
ClassSize.ARRAY +
|
||||
39 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
|
||||
40 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
|
||||
(7 * Bytes.SIZEOF_LONG) +
|
||||
Bytes.SIZEOF_BOOLEAN);
|
||||
|
||||
|
|
|
@ -1465,6 +1465,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
long totalStaticBloomSize = 0;
|
||||
long numPutsWithoutWAL = 0;
|
||||
long dataInMemoryWithoutWAL = 0;
|
||||
long updatesBlockedMs = 0;
|
||||
|
||||
// Note that this is a map of Doubles instead of Longs. This is because we
|
||||
// do effective integer division, which would perhaps truncate more than it
|
||||
|
@ -1483,6 +1484,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
writeRequestsCount += r.writeRequestsCount.get();
|
||||
checkAndMutateChecksFailed += r.checkAndMutateChecksFailed.get();
|
||||
checkAndMutateChecksPassed += r.checkAndMutateChecksPassed.get();
|
||||
updatesBlockedMs += r.updatesBlockedMs.get();
|
||||
synchronized (r.stores) {
|
||||
stores += r.stores.size();
|
||||
for (Map.Entry<byte[], Store> ee : r.stores.entrySet()) {
|
||||
|
@ -1562,6 +1564,11 @@ public class HRegionServer implements ClientProtocol,
|
|||
.getCompactionQueueSize());
|
||||
this.metrics.flushQueueSize.set(cacheFlusher
|
||||
.getFlushQueueSize());
|
||||
this.metrics.updatesBlockedSeconds.update(updatesBlockedMs > 0 ?
|
||||
updatesBlockedMs/1000: 0);
|
||||
final long updatesBlockedMsHigherWater = cacheFlusher.getUpdatesBlockedMsHighWater().get();
|
||||
this.metrics.updatesBlockedSecondsHighWater.update(updatesBlockedMsHigherWater > 0 ?
|
||||
updatesBlockedMsHigherWater/1000: 0);
|
||||
|
||||
BlockCache blockCache = cacheConfig.getBlockCache();
|
||||
if (blockCache != null) {
|
||||
|
|
|
@ -42,8 +42,10 @@ import org.apache.hadoop.hbase.DroppedSnapshotException;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.HasThread;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.cliffc.high_scale_lib.Counter;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
|
@ -83,6 +85,7 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
|
|||
"hbase.regionserver.global.memstore.lowerLimit";
|
||||
private long blockingStoreFilesNumber;
|
||||
private long blockingWaitTime;
|
||||
private final Counter updatesBlockedMsHighWater = new Counter();
|
||||
|
||||
/**
|
||||
* @param conf
|
||||
|
@ -141,6 +144,10 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
|
|||
return (long)(max * effectiveLimit);
|
||||
}
|
||||
|
||||
/**
 * Returns the counter that accumulates, in milliseconds, the time callers
 * spent blocked in reclaimMemStoreMemory() while the global memstore was
 * above its high water mark.
 *
 * @return the live counter instance (not a snapshot of its value)
 */
public Counter getUpdatesBlockedMsHighWater() {
|
||||
return this.updatesBlockedMsHighWater;
|
||||
}
|
||||
|
||||
/**
|
||||
* The memstore across all regions has exceeded the low water mark. Pick
|
||||
* one region to flush and flush it synchronously (this is called from the
|
||||
|
@ -450,11 +457,22 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
|
|||
* to the lower limit. This method blocks callers until we're down to a safe
|
||||
* amount of memstore consumption.
|
||||
*/
|
||||
public synchronized void reclaimMemStoreMemory() {
|
||||
public void reclaimMemStoreMemory() {
|
||||
if (isAboveHighWaterMark()) {
|
||||
lock.lock();
|
||||
try {
|
||||
boolean blocked = false;
|
||||
long startTime = 0;
|
||||
while (isAboveHighWaterMark() && !server.isStopped()) {
|
||||
if(!blocked){
|
||||
startTime = EnvironmentEdgeManager.currentTimeMillis();
|
||||
LOG.info("Blocking updates on " + server.toString() +
|
||||
": the global memstore size " +
|
||||
StringUtils.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) +
|
||||
" is >= than blocking " +
|
||||
StringUtils.humanReadableInt(globalMemStoreLimit) + " size");
|
||||
}
|
||||
blocked = true;
|
||||
wakeupFlushThread();
|
||||
try {
|
||||
// we should be able to wait forever, but we've seen a bug where
|
||||
|
@ -464,6 +482,13 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
|
|||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
if(blocked){
|
||||
final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
|
||||
if(totalTime > 0){
|
||||
this.updatesBlockedMsHighWater.add(totalTime);
|
||||
}
|
||||
LOG.info("Unblocking updates for server " + server.toString());
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
|
|
@ -310,6 +310,18 @@ public class RegionServerMetrics implements Updater {
|
|||
public final MetricsLongValue checksumFailuresCount =
|
||||
new MetricsLongValue("checksumFailuresCount", registry);
|
||||
|
||||
/**
|
||||
* Time, in seconds, that updates spent blocked waiting for a region's memstore to drop below its blocking size.
|
||||
*/
|
||||
public final MetricsHistogram updatesBlockedSeconds = new MetricsHistogram(
|
||||
"updatesBlockedSeconds", registry);
|
||||
|
||||
/**
|
||||
* Time, in seconds, that updates spent blocked while the global memstore was above its high water mark.
|
||||
*/
|
||||
public final MetricsHistogram updatesBlockedSecondsHighWater = new MetricsHistogram(
|
||||
"updatesBlockedSecondsHighWater",registry);
|
||||
|
||||
public RegionServerMetrics() {
|
||||
MetricsContext context = MetricsUtil.getContext("hbase");
|
||||
metricsRecord = MetricsUtil.createRecord(context, "regionserver");
|
||||
|
@ -449,6 +461,8 @@ public class RegionServerMetrics implements Updater {
|
|||
this.regionSplitSuccessCount.pushMetric(this.metricsRecord);
|
||||
this.regionSplitFailureCount.pushMetric(this.metricsRecord);
|
||||
this.checksumFailuresCount.pushMetric(this.metricsRecord);
|
||||
this.updatesBlockedSeconds.pushMetric(this.metricsRecord);
|
||||
this.updatesBlockedSecondsHighWater.pushMetric(this.metricsRecord);
|
||||
}
|
||||
this.metricsRecord.update();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue