HBASE-4765 Enhance HLog Metrics
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1200619 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
373c98b0df
commit
6c57076f7e
|
@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -142,15 +143,15 @@ public class HFile {
|
|||
static final char CACHE_KEY_SEPARATOR = '_';
|
||||
|
||||
// For measuring latency of "typical" reads and writes
|
||||
static volatile AtomicLong readOps = new AtomicLong();
|
||||
static volatile AtomicInteger readOps = new AtomicInteger();
|
||||
static volatile AtomicLong readTimeNano = new AtomicLong();
|
||||
static volatile AtomicLong writeOps = new AtomicLong();
|
||||
static volatile AtomicInteger writeOps = new AtomicInteger();
|
||||
static volatile AtomicLong writeTimeNano = new AtomicLong();
|
||||
|
||||
// for test purpose
|
||||
public static volatile AtomicLong dataBlockReadCnt = new AtomicLong(0);
|
||||
|
||||
public static final long getReadOps() {
|
||||
public static final int getReadOps() {
|
||||
return readOps.getAndSet(0);
|
||||
}
|
||||
|
||||
|
@ -158,7 +159,7 @@ public class HFile {
|
|||
return readTimeNano.getAndSet(0) / 1000000;
|
||||
}
|
||||
|
||||
public static final long getWriteOps() {
|
||||
public static final int getWriteOps() {
|
||||
return writeOps.getAndSet(0);
|
||||
}
|
||||
|
||||
|
|
|
@ -191,6 +191,12 @@ public class RegionServerMetrics implements Updater {
|
|||
public final MetricsTimeVaryingRate fsWriteLatency =
|
||||
new MetricsTimeVaryingRate("fsWriteLatency", registry);
|
||||
|
||||
/**
|
||||
* size (in bytes) of data in HLog append calls
|
||||
*/
|
||||
public final MetricsTimeVaryingRate fsWriteSize =
|
||||
new MetricsTimeVaryingRate("fsWriteSize", registry);
|
||||
|
||||
/**
|
||||
* filesystem sync latency
|
||||
*/
|
||||
|
@ -299,19 +305,24 @@ public class RegionServerMetrics implements Updater {
|
|||
// minMax.update(timePerOps);
|
||||
// }
|
||||
// Means you can't pass a numOps of zero or get a ArithmeticException / by zero.
|
||||
int ops = (int)HFile.getReadOps();
|
||||
// HLog metrics
|
||||
addHLogMetric(HLog.getWriteTime(), this.fsWriteLatency);
|
||||
addHLogMetric(HLog.getWriteSize(), this.fsWriteSize);
|
||||
addHLogMetric(HLog.getSyncTime(), this.fsSyncLatency);
|
||||
// HFile metrics
|
||||
int ops = HFile.getReadOps();
|
||||
if (ops != 0) this.fsReadLatency.inc(ops, HFile.getReadTimeMs());
|
||||
ops = (int)HFile.getWriteOps();
|
||||
if (ops != 0) this.fsWriteLatency.inc(ops, HFile.getWriteTimeMs());
|
||||
// mix in HLog metrics
|
||||
ops = (int)HLog.getWriteOps();
|
||||
if (ops != 0) this.fsWriteLatency.inc(ops, HLog.getWriteTime());
|
||||
ops = (int)HLog.getSyncOps();
|
||||
if (ops != 0) this.fsSyncLatency.inc(ops, HLog.getSyncTime());
|
||||
/* NOTE: removed HFile write latency. 2 reasons:
|
||||
* 1) Mixing HLog latencies are far higher priority since they're
|
||||
* on-demand and HFile is used in background (compact/flush)
|
||||
* 2) HFile metrics are being handled at a higher level
|
||||
* by compaction & flush metrics.
|
||||
*/
|
||||
|
||||
// push the result
|
||||
this.fsReadLatency.pushMetric(this.metricsRecord);
|
||||
this.fsWriteLatency.pushMetric(this.metricsRecord);
|
||||
this.fsWriteSize.pushMetric(this.metricsRecord);
|
||||
this.fsSyncLatency.pushMetric(this.metricsRecord);
|
||||
this.compactionTime.pushMetric(this.metricsRecord);
|
||||
this.compactionSize.pushMetric(this.metricsRecord);
|
||||
|
@ -321,10 +332,23 @@ public class RegionServerMetrics implements Updater {
|
|||
this.metricsRecord.update();
|
||||
}
|
||||
|
||||
private void addHLogMetric(HLog.Metric logMetric,
|
||||
MetricsTimeVaryingRate hadoopMetric) {
|
||||
if (logMetric.count > 0)
|
||||
hadoopMetric.inc(logMetric.min);
|
||||
if (logMetric.count > 1)
|
||||
hadoopMetric.inc(logMetric.max);
|
||||
if (logMetric.count > 2) {
|
||||
int ops = logMetric.count - 2;
|
||||
hadoopMetric.inc(ops, logMetric.total - logMetric.max - logMetric.min);
|
||||
}
|
||||
}
|
||||
|
||||
public void resetAllMinMax() {
|
||||
this.atomicIncrementTime.resetMinMax();
|
||||
this.fsReadLatency.resetMinMax();
|
||||
this.fsWriteLatency.resetMinMax();
|
||||
this.fsWriteSize.resetMinMax();
|
||||
this.fsSyncLatency.resetMinMax();
|
||||
}
|
||||
|
||||
|
|
|
@ -262,32 +262,50 @@ public class HLog implements Syncable {
|
|||
}
|
||||
}
|
||||
|
||||
public static class Metric {
|
||||
public long min = Long.MAX_VALUE;
|
||||
public long max = 0;
|
||||
public long total = 0;
|
||||
public int count = 0;
|
||||
|
||||
synchronized void inc(final long val) {
|
||||
min = Math.min(min, val);
|
||||
max = Math.max(max, val);
|
||||
total += val;
|
||||
++count;
|
||||
}
|
||||
|
||||
synchronized Metric get() {
|
||||
Metric copy = new Metric();
|
||||
copy.min = min;
|
||||
copy.max = max;
|
||||
copy.total = total;
|
||||
copy.count = count;
|
||||
this.min = Long.MAX_VALUE;
|
||||
this.max = 0;
|
||||
this.total = 0;
|
||||
this.count = 0;
|
||||
return copy;
|
||||
}
|
||||
}
|
||||
|
||||
// For measuring latency of writes
|
||||
private static volatile long writeOps;
|
||||
private static volatile long writeTime;
|
||||
private static Metric writeTime = new Metric();
|
||||
private static Metric writeSize = new Metric();
|
||||
// For measuring latency of syncs
|
||||
private static AtomicLong syncOps = new AtomicLong();
|
||||
private static AtomicLong syncTime = new AtomicLong();
|
||||
private static Metric syncTime = new Metric();
|
||||
private static AtomicLong syncBatchSize = new AtomicLong();
|
||||
|
||||
public static long getWriteOps() {
|
||||
long ret = writeOps;
|
||||
writeOps = 0;
|
||||
return ret;
|
||||
public static Metric getWriteTime() {
|
||||
return writeTime.get();
|
||||
}
|
||||
|
||||
public static long getWriteTime() {
|
||||
long ret = writeTime;
|
||||
writeTime = 0;
|
||||
return ret;
|
||||
public static Metric getWriteSize() {
|
||||
return writeSize.get();
|
||||
}
|
||||
|
||||
public static long getSyncOps() {
|
||||
return syncOps.getAndSet(0);
|
||||
}
|
||||
|
||||
public static long getSyncTime() {
|
||||
return syncTime.getAndSet(0);
|
||||
public static Metric getSyncTime() {
|
||||
return syncTime.get();
|
||||
}
|
||||
|
||||
public static long getSyncBatchSize() {
|
||||
|
@ -1247,8 +1265,7 @@ public class HLog implements Syncable {
|
|||
}
|
||||
// We try to not acquire the updateLock just to update statistics.
|
||||
// Make these statistics as AtomicLong.
|
||||
syncTime.addAndGet(System.currentTimeMillis() - now);
|
||||
syncOps.incrementAndGet();
|
||||
syncTime.inc(System.currentTimeMillis() - now);
|
||||
if (!this.logRollRunning) {
|
||||
checkLowReplication();
|
||||
if (this.writer.getLength() > this.logrollsize) {
|
||||
|
@ -1379,13 +1396,13 @@ public class HLog implements Syncable {
|
|||
}
|
||||
long took = System.currentTimeMillis() - now;
|
||||
coprocessorHost.postWALWrite(info, logKey, logEdit);
|
||||
writeTime += took;
|
||||
writeOps++;
|
||||
writeTime.inc(took);
|
||||
long len = 0;
|
||||
for (KeyValue kv : logEdit.getKeyValues()) {
|
||||
len += kv.getLength();
|
||||
}
|
||||
writeSize.inc(len);
|
||||
if (took > 1000) {
|
||||
long len = 0;
|
||||
for(KeyValue kv : logEdit.getKeyValues()) {
|
||||
len += kv.getLength();
|
||||
}
|
||||
LOG.warn(String.format(
|
||||
"%s took %d ms appending an edit to hlog; editcount=%d, len~=%s",
|
||||
Thread.currentThread().getName(), took, this.numEntries.get(),
|
||||
|
@ -1504,8 +1521,12 @@ public class HLog implements Syncable {
|
|||
System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
|
||||
logSyncerThread.append(new Entry(key, edit));
|
||||
txid = this.unflushedEntries.incrementAndGet();
|
||||
writeTime += System.currentTimeMillis() - now;
|
||||
writeOps++;
|
||||
writeTime.inc(System.currentTimeMillis() - now);
|
||||
long len = 0;
|
||||
for (KeyValue kv : edit.getKeyValues()) {
|
||||
len += kv.getLength();
|
||||
}
|
||||
writeSize.inc(len);
|
||||
this.numEntries.incrementAndGet();
|
||||
}
|
||||
// sync txn to file system
|
||||
|
|
|
@ -348,8 +348,8 @@ public class TestHRegion extends HBaseTestCase {
|
|||
byte[] val = Bytes.toBytes("val");
|
||||
initHRegion(b, getName(), cf);
|
||||
|
||||
HLog.getSyncOps(); // clear counter from prior tests
|
||||
assertEquals(0, HLog.getSyncOps());
|
||||
HLog.getSyncTime(); // clear counter from prior tests
|
||||
assertEquals(0, HLog.getSyncTime().count);
|
||||
|
||||
LOG.info("First a batch put with all valid puts");
|
||||
final Put[] puts = new Put[10];
|
||||
|
@ -364,7 +364,7 @@ public class TestHRegion extends HBaseTestCase {
|
|||
assertEquals(OperationStatusCode.SUCCESS, codes[i]
|
||||
.getOperationStatusCode());
|
||||
}
|
||||
assertEquals(1, HLog.getSyncOps());
|
||||
assertEquals(1, HLog.getSyncTime().count);
|
||||
|
||||
LOG.info("Next a batch put with one invalid family");
|
||||
puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
|
||||
|
@ -374,7 +374,7 @@ public class TestHRegion extends HBaseTestCase {
|
|||
assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
|
||||
OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
|
||||
}
|
||||
assertEquals(1, HLog.getSyncOps());
|
||||
assertEquals(1, HLog.getSyncTime().count);
|
||||
|
||||
LOG.info("Next a batch put that has to break into two batches to avoid a lock");
|
||||
Integer lockedRow = region.obtainRowLock(Bytes.toBytes("row_2"));
|
||||
|
@ -395,7 +395,7 @@ public class TestHRegion extends HBaseTestCase {
|
|||
|
||||
LOG.info("...waiting for put thread to sync first time");
|
||||
long startWait = System.currentTimeMillis();
|
||||
while (HLog.getSyncOps() == 0) {
|
||||
while (HLog.getSyncTime().count == 0) {
|
||||
Thread.sleep(100);
|
||||
if (System.currentTimeMillis() - startWait > 10000) {
|
||||
fail("Timed out waiting for thread to sync first minibatch");
|
||||
|
@ -406,7 +406,7 @@ public class TestHRegion extends HBaseTestCase {
|
|||
LOG.info("...joining on thread");
|
||||
ctx.stop();
|
||||
LOG.info("...checking that next batch was synced");
|
||||
assertEquals(1, HLog.getSyncOps());
|
||||
assertEquals(1, HLog.getSyncTime().count);
|
||||
codes = retFromThread.get();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
|
||||
|
@ -430,7 +430,7 @@ public class TestHRegion extends HBaseTestCase {
|
|||
OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
|
||||
}
|
||||
// Make sure we didn't do an extra batch
|
||||
assertEquals(1, HLog.getSyncOps());
|
||||
assertEquals(1, HLog.getSyncTime().count);
|
||||
|
||||
// Make sure we still hold lock
|
||||
assertTrue(region.isRowLocked(lockedRow));
|
||||
|
|
Loading…
Reference in New Issue