HBASE-5533 Add more metrics to HBase
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1305499 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1692dde435
commit
8ea1c8ddd6
|
@ -24,8 +24,12 @@ import java.io.DataInput;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
@ -48,14 +52,15 @@ import org.apache.hadoop.hbase.io.HbaseMapWritable;
|
||||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||||
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
|
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
|
||||||
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.SchemaAware;
|
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.SchemaAware;
|
||||||
import org.apache.hadoop.hbase.util.ChecksumType;
|
|
||||||
import org.apache.hadoop.hbase.util.BloomFilterWriter;
|
import org.apache.hadoop.hbase.util.BloomFilterWriter;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.ChecksumType;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
import org.apache.hadoop.io.RawComparator;
|
import org.apache.hadoop.io.RawComparator;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* File format for hbase.
|
* File format for hbase.
|
||||||
|
@ -165,18 +170,100 @@ public class HFile {
|
||||||
public static final ChecksumType DEFAULT_CHECKSUM_TYPE = ChecksumType.CRC32;
|
public static final ChecksumType DEFAULT_CHECKSUM_TYPE = ChecksumType.CRC32;
|
||||||
|
|
||||||
// For measuring latency of "sequential" reads and writes
|
// For measuring latency of "sequential" reads and writes
|
||||||
static final AtomicInteger readOps = new AtomicInteger();
|
private static final AtomicInteger readOps = new AtomicInteger();
|
||||||
static final AtomicLong readTimeNano = new AtomicLong();
|
private static final AtomicLong readTimeNano = new AtomicLong();
|
||||||
static final AtomicInteger writeOps = new AtomicInteger();
|
private static final AtomicInteger writeOps = new AtomicInteger();
|
||||||
static final AtomicLong writeTimeNano = new AtomicLong();
|
private static final AtomicLong writeTimeNano = new AtomicLong();
|
||||||
|
|
||||||
// For measuring latency of pread
|
// For measuring latency of pread
|
||||||
static final AtomicInteger preadOps = new AtomicInteger();
|
private static final AtomicInteger preadOps = new AtomicInteger();
|
||||||
static final AtomicLong preadTimeNano = new AtomicLong();
|
private static final AtomicLong preadTimeNano = new AtomicLong();
|
||||||
|
|
||||||
// For measuring number of checksum failures
|
// For measuring number of checksum failures
|
||||||
static final AtomicLong checksumFailures = new AtomicLong();
|
static final AtomicLong checksumFailures = new AtomicLong();
|
||||||
|
|
||||||
|
// For getting more detailed stats on FS latencies
|
||||||
|
// If, for some reason, the metrics subsystem stops polling for latencies,
|
||||||
|
// I don't want data to pile up in a memory leak
|
||||||
|
// so, after LATENCY_BUFFER_SIZE items have been enqueued for processing,
|
||||||
|
// fs latency stats will be dropped (and this behavior will be logged)
|
||||||
|
private static final int LATENCY_BUFFER_SIZE = 5000;
|
||||||
|
private static final BlockingQueue<Long> fsReadLatenciesNanos =
|
||||||
|
new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
|
||||||
|
private static final BlockingQueue<Long> fsWriteLatenciesNanos =
|
||||||
|
new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
|
||||||
|
private static final BlockingQueue<Long> fsPreadLatenciesNanos =
|
||||||
|
new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
|
||||||
|
private static final AtomicLong lastLoggedDataDrop = new AtomicLong(0);
|
||||||
|
|
||||||
|
// we don't want to fill up the logs with this message, so only log it
|
||||||
|
// once every 30 seconds at most
|
||||||
|
// I also want to avoid locks on the 'critical path' (the common case will be
|
||||||
|
// uncontended) - hence the CAS
|
||||||
|
private static void logDroppedLatencyStat() {
|
||||||
|
final long now = System.currentTimeMillis();
|
||||||
|
final long earliestAcceptableLog = now - TimeUnit.SECONDS.toMillis(30L);
|
||||||
|
while (true) {
|
||||||
|
final long lastLog = lastLoggedDataDrop.get();
|
||||||
|
if (lastLog < earliestAcceptableLog) {
|
||||||
|
if (lastLoggedDataDrop.compareAndSet(lastLog, now)) {
|
||||||
|
LOG.warn("Dropping fs latency stats since buffer is full");
|
||||||
|
break;
|
||||||
|
} // otherwise (if the compaseAndSet failed) the while loop retries
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final void offerReadLatency(long latencyNanos, boolean pread) {
|
||||||
|
boolean stored = false;
|
||||||
|
if (pread) {
|
||||||
|
stored = fsPreadLatenciesNanos.offer(latencyNanos);
|
||||||
|
preadOps.incrementAndGet();
|
||||||
|
preadTimeNano.addAndGet(latencyNanos);
|
||||||
|
} else {
|
||||||
|
stored = fsReadLatenciesNanos.offer(latencyNanos);
|
||||||
|
readTimeNano.addAndGet(latencyNanos);
|
||||||
|
readOps.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!stored) {
|
||||||
|
logDroppedLatencyStat();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final void offerWriteLatency(long latencyNanos) {
|
||||||
|
final boolean stored = fsWriteLatenciesNanos.offer(latencyNanos);
|
||||||
|
if (!stored) {
|
||||||
|
logDroppedLatencyStat();
|
||||||
|
}
|
||||||
|
|
||||||
|
writeTimeNano.addAndGet(latencyNanos);
|
||||||
|
writeOps.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final Collection<Long> getReadLatenciesNanos() {
|
||||||
|
final List<Long> latencies =
|
||||||
|
Lists.newArrayListWithCapacity(fsReadLatenciesNanos.size());
|
||||||
|
fsReadLatenciesNanos.drainTo(latencies);
|
||||||
|
return latencies;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final Collection<Long> getPreadLatenciesNanos() {
|
||||||
|
final List<Long> latencies =
|
||||||
|
Lists.newArrayListWithCapacity(fsPreadLatenciesNanos.size());
|
||||||
|
fsPreadLatenciesNanos.drainTo(latencies);
|
||||||
|
return latencies;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final Collection<Long> getWriteLatenciesNanos() {
|
||||||
|
final List<Long> latencies =
|
||||||
|
Lists.newArrayListWithCapacity(fsWriteLatenciesNanos.size());
|
||||||
|
fsWriteLatenciesNanos.drainTo(latencies);
|
||||||
|
return latencies;
|
||||||
|
}
|
||||||
|
|
||||||
// for test purpose
|
// for test purpose
|
||||||
public static volatile AtomicLong dataBlockReadCnt = new AtomicLong(0);
|
public static volatile AtomicLong dataBlockReadCnt = new AtomicLong(0);
|
||||||
|
|
||||||
|
|
|
@ -249,9 +249,8 @@ public class HFileReaderV1 extends AbstractHFileReader {
|
||||||
passSchemaMetricsTo(hfileBlock);
|
passSchemaMetricsTo(hfileBlock);
|
||||||
hfileBlock.expectType(BlockType.META);
|
hfileBlock.expectType(BlockType.META);
|
||||||
|
|
||||||
long delta = System.nanoTime() - startTimeNs;
|
final long delta = System.nanoTime() - startTimeNs;
|
||||||
HFile.preadTimeNano.addAndGet(delta);
|
HFile.offerReadLatency(delta, true);
|
||||||
HFile.preadOps.incrementAndGet();
|
|
||||||
getSchemaMetrics().updateOnCacheMiss(effectiveCategory,
|
getSchemaMetrics().updateOnCacheMiss(effectiveCategory,
|
||||||
SchemaMetrics.NO_COMPACTION, delta);
|
SchemaMetrics.NO_COMPACTION, delta);
|
||||||
|
|
||||||
|
@ -328,14 +327,8 @@ public class HFileReaderV1 extends AbstractHFileReader {
|
||||||
passSchemaMetricsTo(hfileBlock);
|
passSchemaMetricsTo(hfileBlock);
|
||||||
hfileBlock.expectType(BlockType.DATA);
|
hfileBlock.expectType(BlockType.DATA);
|
||||||
|
|
||||||
long delta = System.nanoTime() - startTimeNs;
|
final long delta = System.nanoTime() - startTimeNs;
|
||||||
if (pread) {
|
HFile.offerReadLatency(delta, pread);
|
||||||
HFile.preadTimeNano.addAndGet(delta);
|
|
||||||
HFile.preadOps.incrementAndGet();
|
|
||||||
} else {
|
|
||||||
HFile.readTimeNano.addAndGet(delta);
|
|
||||||
HFile.readOps.incrementAndGet();
|
|
||||||
}
|
|
||||||
getSchemaMetrics().updateOnCacheMiss(BlockCategory.DATA, isCompaction,
|
getSchemaMetrics().updateOnCacheMiss(BlockCategory.DATA, isCompaction,
|
||||||
delta);
|
delta);
|
||||||
|
|
||||||
|
|
|
@ -236,9 +236,8 @@ public class HFileReaderV2 extends AbstractHFileReader {
|
||||||
blockSize, -1, true);
|
blockSize, -1, true);
|
||||||
passSchemaMetricsTo(metaBlock);
|
passSchemaMetricsTo(metaBlock);
|
||||||
|
|
||||||
long delta = System.nanoTime() - startTimeNs;
|
final long delta = System.nanoTime() - startTimeNs;
|
||||||
HFile.preadTimeNano.addAndGet(delta);
|
HFile.offerReadLatency(delta, true);
|
||||||
HFile.preadOps.incrementAndGet();
|
|
||||||
getSchemaMetrics().updateOnCacheMiss(BlockCategory.META, false, delta);
|
getSchemaMetrics().updateOnCacheMiss(BlockCategory.META, false, delta);
|
||||||
|
|
||||||
// Cache the block
|
// Cache the block
|
||||||
|
@ -335,14 +334,8 @@ public class HFileReaderV2 extends AbstractHFileReader {
|
||||||
passSchemaMetricsTo(hfileBlock);
|
passSchemaMetricsTo(hfileBlock);
|
||||||
BlockCategory blockCategory = hfileBlock.getBlockType().getCategory();
|
BlockCategory blockCategory = hfileBlock.getBlockType().getCategory();
|
||||||
|
|
||||||
long delta = System.nanoTime() - startTimeNs;
|
final long delta = System.nanoTime() - startTimeNs;
|
||||||
if (pread) {
|
HFile.offerReadLatency(delta, pread);
|
||||||
HFile.preadTimeNano.addAndGet(delta);
|
|
||||||
HFile.preadOps.incrementAndGet();
|
|
||||||
} else {
|
|
||||||
HFile.readTimeNano.addAndGet(delta);
|
|
||||||
HFile.readOps.incrementAndGet();
|
|
||||||
}
|
|
||||||
getSchemaMetrics().updateOnCacheMiss(blockCategory, isCompaction, delta);
|
getSchemaMetrics().updateOnCacheMiss(blockCategory, isCompaction, delta);
|
||||||
|
|
||||||
// Cache the block if necessary
|
// Cache the block if necessary
|
||||||
|
|
|
@ -141,9 +141,8 @@ public class HFileWriterV1 extends AbstractHFileWriter {
|
||||||
blockDataSizes.add(Integer.valueOf(size));
|
blockDataSizes.add(Integer.valueOf(size));
|
||||||
this.totalUncompressedBytes += size;
|
this.totalUncompressedBytes += size;
|
||||||
|
|
||||||
HFile.writeTimeNano.addAndGet(System.nanoTime() - startTimeNs);
|
HFile.offerWriteLatency(System.nanoTime() - startTimeNs);
|
||||||
HFile.writeOps.incrementAndGet();
|
|
||||||
|
|
||||||
if (cacheConf.shouldCacheDataOnWrite()) {
|
if (cacheConf.shouldCacheDataOnWrite()) {
|
||||||
baosDos.flush();
|
baosDos.flush();
|
||||||
// we do not do data block encoding on disk for HFile v1
|
// we do not do data block encoding on disk for HFile v1
|
||||||
|
|
|
@ -189,9 +189,8 @@ public class HFileWriterV2 extends AbstractHFileWriter {
|
||||||
onDiskSize);
|
onDiskSize);
|
||||||
totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
|
totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
|
||||||
|
|
||||||
HFile.writeTimeNano.addAndGet(System.nanoTime() - startTimeNs);
|
HFile.offerWriteLatency(System.nanoTime() - startTimeNs);
|
||||||
HFile.writeOps.incrementAndGet();
|
|
||||||
|
|
||||||
if (cacheConf.shouldCacheDataOnWrite()) {
|
if (cacheConf.shouldCacheDataOnWrite()) {
|
||||||
doCacheOnWrite(lastDataBlockOffset);
|
doCacheOnWrite(lastDataBlockOffset);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,154 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.metrics;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
|
import org.apache.hadoop.metrics.MetricsRecord;
|
||||||
|
import org.apache.hadoop.metrics.util.MetricsBase;
|
||||||
|
import org.apache.hadoop.metrics.util.MetricsRegistry;
|
||||||
|
import org.cliffc.high_scale_lib.Counter;
|
||||||
|
|
||||||
|
import com.google.common.base.Function;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.MapMaker;
|
||||||
|
|
||||||
|
public class ExactCounterMetric extends MetricsBase {
|
||||||
|
|
||||||
|
private static final int DEFAULT_TOP_N = 5;
|
||||||
|
|
||||||
|
// only publish stats on the topN items (default to DEFAULT_TOP_N)
|
||||||
|
private final int topN;
|
||||||
|
private final Map<String, Counter> counts;
|
||||||
|
|
||||||
|
// all access to the 'counts' map should use this lock.
|
||||||
|
// take a write lock iff you want to guarantee exclusive access
|
||||||
|
// (the map stripes locks internally, so it's already thread safe -
|
||||||
|
// this lock is just so you can take a consistent snapshot of data)
|
||||||
|
private final ReadWriteLock lock;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor to create a new counter metric
|
||||||
|
* @param nam the name to publish this metric under
|
||||||
|
* @param registry where the metrics object will be registered
|
||||||
|
* @param description metrics description
|
||||||
|
* @param topN how many 'keys' to publish metrics on
|
||||||
|
*/
|
||||||
|
public ExactCounterMetric(final String nam, final MetricsRegistry registry,
|
||||||
|
final String description, int topN) {
|
||||||
|
super(nam, description);
|
||||||
|
|
||||||
|
this.counts = new MapMaker().makeComputingMap(
|
||||||
|
new Function<String, Counter>() {
|
||||||
|
@Override
|
||||||
|
public Counter apply(String input) {
|
||||||
|
return new Counter();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
this.lock = new ReentrantReadWriteLock();
|
||||||
|
this.topN = topN;
|
||||||
|
|
||||||
|
if (registry != null) {
|
||||||
|
registry.add(nam, this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor creates a new ExactCounterMetric
|
||||||
|
* @param nam the name of the metrics to be used to publish the metric
|
||||||
|
* @param registry where the metrics object will be registered
|
||||||
|
*/
|
||||||
|
public ExactCounterMetric(final String nam, MetricsRegistry registry) {
|
||||||
|
this(nam, registry, NO_DESCRIPTION, DEFAULT_TOP_N);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void update(String type) {
|
||||||
|
this.lock.readLock().lock();
|
||||||
|
try {
|
||||||
|
this.counts.get(type).increment();
|
||||||
|
} finally {
|
||||||
|
this.lock.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void update(String type, long count) {
|
||||||
|
this.lock.readLock().lock();
|
||||||
|
try {
|
||||||
|
this.counts.get(type).add(count);
|
||||||
|
} finally {
|
||||||
|
this.lock.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Pair<String, Long>> getTop(int n) {
|
||||||
|
final List<Pair<String, Long>> countsSnapshot =
|
||||||
|
Lists.newArrayListWithCapacity(this.counts.size());
|
||||||
|
|
||||||
|
// no updates are allowed while I'm holding this lock, so move fast
|
||||||
|
this.lock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
for(Entry<String, Counter> entry : this.counts.entrySet()) {
|
||||||
|
countsSnapshot.add(Pair.newPair(entry.getKey(),
|
||||||
|
entry.getValue().get()));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
this.lock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
Collections.sort(countsSnapshot, new Comparator<Pair<String, Long>>() {
|
||||||
|
@Override
|
||||||
|
public int compare(Pair<String, Long> a, Pair<String, Long> b) {
|
||||||
|
return b.getSecond().compareTo(a.getSecond());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return countsSnapshot.subList(0, Math.min(n, countsSnapshot.size()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void pushMetric(MetricsRecord mr) {
|
||||||
|
final List<Pair<String, Long>> topKeys = getTop(Integer.MAX_VALUE);
|
||||||
|
int sum = 0;
|
||||||
|
|
||||||
|
int counter = 0;
|
||||||
|
for (Pair<String, Long> keyCount : topKeys) {
|
||||||
|
counter++;
|
||||||
|
// only push stats on the topN keys
|
||||||
|
if (counter <= this.topN) {
|
||||||
|
mr.setMetric(getName() + "_" + keyCount.getFirst(),
|
||||||
|
keyCount.getSecond());
|
||||||
|
}
|
||||||
|
sum += keyCount.getSecond();
|
||||||
|
}
|
||||||
|
mr.setMetric(getName() + "_map_size", this.counts.size());
|
||||||
|
mr.setMetric(getName() + "_total_count", sum);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,212 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.metrics.histogram;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An exponentially-decaying random sample of {@code long}s.
|
||||||
|
* Uses Cormode et al's forward-decaying priority reservoir sampling method
|
||||||
|
* to produce a statistically representative sample, exponentially biased
|
||||||
|
* towards newer entries.
|
||||||
|
*
|
||||||
|
* see Cormode et al.
|
||||||
|
* Forward Decay: A Practical Time Decay Model for Streaming Systems. ICDE '09
|
||||||
|
*/
|
||||||
|
public class ExponentiallyDecayingSample implements Sample {
|
||||||
|
|
||||||
|
private static final Random RANDOM = new Random();
|
||||||
|
private static final long RESCALE_THRESHOLD = TimeUnit.HOURS.toNanos(1);
|
||||||
|
|
||||||
|
private static final ScheduledExecutorService TICK_SERVICE =
|
||||||
|
Executors.newScheduledThreadPool(1,
|
||||||
|
Threads.getNamedThreadFactory("decayingSampleTick"));
|
||||||
|
|
||||||
|
private static volatile long CURRENT_TICK =
|
||||||
|
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
|
||||||
|
|
||||||
|
static {
|
||||||
|
// sample at twice our signal's frequency (1Hz) per the Nyquist theorem
|
||||||
|
TICK_SERVICE.scheduleAtFixedRate(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
CURRENT_TICK =
|
||||||
|
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
|
||||||
|
}
|
||||||
|
}, 0, 500, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final ConcurrentSkipListMap<Double, Long> values =
|
||||||
|
new ConcurrentSkipListMap<Double, Long>();
|
||||||
|
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||||
|
private final AtomicLong count = new AtomicLong(0);
|
||||||
|
private final AtomicLong nextScaleTime = new AtomicLong(0);
|
||||||
|
|
||||||
|
private final double alpha;
|
||||||
|
private final int reservoirSize;
|
||||||
|
private volatile long startTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor for an ExponentiallyDecayingSample.
|
||||||
|
*
|
||||||
|
* @param reservoirSize the number of samples to keep in the reservoir
|
||||||
|
* @param alpha the exponential decay factor; the higher this is,
|
||||||
|
* the more biased the sample will be towards newer
|
||||||
|
* values
|
||||||
|
*/
|
||||||
|
public ExponentiallyDecayingSample(int reservoirSize, double alpha) {
|
||||||
|
this.alpha = alpha;
|
||||||
|
this.reservoirSize = reservoirSize;
|
||||||
|
clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void clear() {
|
||||||
|
lockForRescale();
|
||||||
|
try {
|
||||||
|
values.clear();
|
||||||
|
count.set(0);
|
||||||
|
this.startTime = CURRENT_TICK;
|
||||||
|
nextScaleTime.set(System.nanoTime() + RESCALE_THRESHOLD);
|
||||||
|
} finally {
|
||||||
|
unlockForRescale();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int size() {
|
||||||
|
return (int) Math.min(reservoirSize, count.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void update(long value) {
|
||||||
|
update(value, CURRENT_TICK);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds an old value with a fixed timestamp to the sample.
|
||||||
|
*
|
||||||
|
* @param value the value to be added
|
||||||
|
* @param timestamp the epoch timestamp of {@code value} in seconds
|
||||||
|
*/
|
||||||
|
public void update(long value, long timestamp) {
|
||||||
|
lockForRegularUsage();
|
||||||
|
try {
|
||||||
|
final double priority = weight(timestamp - startTime)
|
||||||
|
/ RANDOM.nextDouble();
|
||||||
|
final long newCount = count.incrementAndGet();
|
||||||
|
if (newCount <= reservoirSize) {
|
||||||
|
values.put(priority, value);
|
||||||
|
} else {
|
||||||
|
Double first = values.firstKey();
|
||||||
|
if (first < priority) {
|
||||||
|
if (values.putIfAbsent(priority, value) == null) {
|
||||||
|
// ensure we always remove an item
|
||||||
|
while (values.remove(first) == null) {
|
||||||
|
first = values.firstKey();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
unlockForRegularUsage();
|
||||||
|
}
|
||||||
|
|
||||||
|
final long now = System.nanoTime();
|
||||||
|
final long next = nextScaleTime.get();
|
||||||
|
if (now >= next) {
|
||||||
|
rescale(now, next);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Snapshot getSnapshot() {
|
||||||
|
lockForRegularUsage();
|
||||||
|
try {
|
||||||
|
return new Snapshot(values.values());
|
||||||
|
} finally {
|
||||||
|
unlockForRegularUsage();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private double weight(long t) {
|
||||||
|
return Math.exp(alpha * t);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* "A common feature of the above techniques—indeed, the key technique that
|
||||||
|
* allows us to track the decayed weights efficiently—is that they maintain
|
||||||
|
* counts and other quantities based on g(ti − L), and only scale by g(t − L)
|
||||||
|
* at query time. But while g(ti −L)/g(t−L) is guaranteed to lie between zero
|
||||||
|
* and one, the intermediate values of g(ti − L) could become very large. For
|
||||||
|
* polynomial functions, these values should not grow too large, and should
|
||||||
|
* be effectively represented in practice by floating point values without
|
||||||
|
* loss of precision. For exponential functions, these values could grow
|
||||||
|
* quite large as new values of (ti − L) become large, and potentially
|
||||||
|
* exceed the capacity of common floating point types. However, since the
|
||||||
|
* values stored by the algorithms are linear combinations of g values
|
||||||
|
* (scaled sums), they can be rescaled relative to a new landmark. That is,
|
||||||
|
* by the analysis of exponential decay in Section III-A, the choice of L
|
||||||
|
* does not affect the final result. We can therefore multiply each value
|
||||||
|
* based on L by a factor of exp(−α(L′ − L)), and obtain the correct value
|
||||||
|
* as if we had instead computed relative to a new landmark L′ (and then use
|
||||||
|
* this new L′ at query time). This can be done with a linear pass over
|
||||||
|
* whatever data structure is being used."
|
||||||
|
*/
|
||||||
|
private void rescale(long now, long next) {
|
||||||
|
if (nextScaleTime.compareAndSet(next, now + RESCALE_THRESHOLD)) {
|
||||||
|
lockForRescale();
|
||||||
|
try {
|
||||||
|
final long oldStartTime = startTime;
|
||||||
|
this.startTime = CURRENT_TICK;
|
||||||
|
final ArrayList<Double> keys = new ArrayList<Double>(values.keySet());
|
||||||
|
for (Double key : keys) {
|
||||||
|
final Long value = values.remove(key);
|
||||||
|
values.put(key * Math.exp(-alpha * (startTime - oldStartTime)),
|
||||||
|
value);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
unlockForRescale();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void unlockForRescale() {
|
||||||
|
lock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void lockForRescale() {
|
||||||
|
lock.writeLock().lock();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void lockForRegularUsage() {
|
||||||
|
lock.readLock().lock();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void unlockForRegularUsage() {
|
||||||
|
lock.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,225 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.metrics.histogram;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import org.apache.hadoop.metrics.MetricsRecord;
|
||||||
|
import org.apache.hadoop.metrics.util.MetricsBase;
|
||||||
|
import org.apache.hadoop.metrics.util.MetricsRegistry;
|
||||||
|
|
||||||
|
public class MetricsHistogram extends MetricsBase {
|
||||||
|
|
||||||
|
// 1028 items implies 99.9% CI w/ 5% margin of error
|
||||||
|
// (assuming a normal distribution on the underlying data)
|
||||||
|
private static final int DEFAULT_SAMPLE_SIZE = 1028;
|
||||||
|
|
||||||
|
// the bias towards sampling from more recent data.
|
||||||
|
// Per Cormode et al. an alpha of 0.015 strongly biases to the last 5 minutes
|
||||||
|
private static final double DEFAULT_ALPHA = 0.015;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor to create a new histogram metric
|
||||||
|
* @param nam the name to publish the metric under
|
||||||
|
* @param registry where the metrics object will be registered
|
||||||
|
* @param description the metric's description
|
||||||
|
* @param forwardBiased true if you want this histogram to give more
|
||||||
|
* weight to recent data,
|
||||||
|
* false if you want all data to have uniform weight
|
||||||
|
*/
|
||||||
|
public MetricsHistogram(final String nam, final MetricsRegistry registry,
|
||||||
|
final String description, boolean forwardBiased) {
|
||||||
|
super(nam, description);
|
||||||
|
|
||||||
|
this.min = new AtomicLong();
|
||||||
|
this.max = new AtomicLong();
|
||||||
|
this.sum = new AtomicLong();
|
||||||
|
this.sample = forwardBiased ?
|
||||||
|
new ExponentiallyDecayingSample(DEFAULT_SAMPLE_SIZE, DEFAULT_ALPHA)
|
||||||
|
: new UniformSample(DEFAULT_SAMPLE_SIZE);
|
||||||
|
|
||||||
|
this.variance = new AtomicReference<double[]>(new double[]{-1, 0});
|
||||||
|
this.count = new AtomicLong();
|
||||||
|
|
||||||
|
this.clear();
|
||||||
|
|
||||||
|
if (registry != null) {
|
||||||
|
registry.add(nam, this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor create a new (forward biased) histogram metric
|
||||||
|
* @param nam the name to publish the metric under
|
||||||
|
* @param registry where the metrics object will be registered
|
||||||
|
* @param description the metric's description
|
||||||
|
*/
|
||||||
|
public MetricsHistogram(final String nam, MetricsRegistry registry,
|
||||||
|
final String description) {
|
||||||
|
this(nam, registry, NO_DESCRIPTION, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor - create a new (forward biased) histogram metric
|
||||||
|
* @param nam the name of the metrics to be used to publish the metric
|
||||||
|
* @param registry - where the metrics object will be registered
|
||||||
|
*/
|
||||||
|
public MetricsHistogram(final String nam, MetricsRegistry registry) {
|
||||||
|
this(nam, registry, NO_DESCRIPTION);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final Sample sample;
|
||||||
|
private final AtomicLong min;
|
||||||
|
private final AtomicLong max;
|
||||||
|
private final AtomicLong sum;
|
||||||
|
|
||||||
|
// these are for computing a running-variance,
|
||||||
|
// without letting floating point errors accumulate via Welford's algorithm
|
||||||
|
private final AtomicReference<double[]> variance;
|
||||||
|
private final AtomicLong count;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clears all recorded values.
|
||||||
|
*/
|
||||||
|
public void clear() {
|
||||||
|
this.sample.clear();
|
||||||
|
this.count.set(0);
|
||||||
|
this.max.set(Long.MIN_VALUE);
|
||||||
|
this.min.set(Long.MAX_VALUE);
|
||||||
|
this.sum.set(0);
|
||||||
|
variance.set(new double[]{-1, 0});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void update(int val) {
|
||||||
|
update((long) val);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void update(final long val) {
|
||||||
|
count.incrementAndGet();
|
||||||
|
sample.update(val);
|
||||||
|
setMax(val);
|
||||||
|
setMin(val);
|
||||||
|
sum.getAndAdd(val);
|
||||||
|
updateVariance(val);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setMax(final long potentialMax) {
|
||||||
|
boolean done = false;
|
||||||
|
while (!done) {
|
||||||
|
final long currentMax = max.get();
|
||||||
|
done = currentMax >= potentialMax
|
||||||
|
|| max.compareAndSet(currentMax, potentialMax);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setMin(long potentialMin) {
|
||||||
|
boolean done = false;
|
||||||
|
while (!done) {
|
||||||
|
final long currentMin = min.get();
|
||||||
|
done = currentMin <= potentialMin
|
||||||
|
|| min.compareAndSet(currentMin, potentialMin);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateVariance(long value) {
|
||||||
|
boolean done = false;
|
||||||
|
while (!done) {
|
||||||
|
final double[] oldValues = variance.get();
|
||||||
|
final double[] newValues = new double[2];
|
||||||
|
if (oldValues[0] == -1) {
|
||||||
|
newValues[0] = value;
|
||||||
|
newValues[1] = 0;
|
||||||
|
} else {
|
||||||
|
final double oldM = oldValues[0];
|
||||||
|
final double oldS = oldValues[1];
|
||||||
|
|
||||||
|
final double newM = oldM + ((value - oldM) / getCount());
|
||||||
|
final double newS = oldS + ((value - oldM) * (value - newM));
|
||||||
|
|
||||||
|
newValues[0] = newM;
|
||||||
|
newValues[1] = newS;
|
||||||
|
}
|
||||||
|
done = variance.compareAndSet(oldValues, newValues);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public long getCount() {
|
||||||
|
return count.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMax() {
|
||||||
|
if (getCount() > 0) {
|
||||||
|
return max.get();
|
||||||
|
}
|
||||||
|
return 0L;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMin() {
|
||||||
|
if (getCount() > 0) {
|
||||||
|
return min.get();
|
||||||
|
}
|
||||||
|
return 0L;
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getMean() {
|
||||||
|
if (getCount() > 0) {
|
||||||
|
return sum.get() / (double) getCount();
|
||||||
|
}
|
||||||
|
return 0.0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getStdDev() {
|
||||||
|
if (getCount() > 0) {
|
||||||
|
return Math.sqrt(getVariance());
|
||||||
|
}
|
||||||
|
return 0.0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Snapshot getSnapshot() {
|
||||||
|
return sample.getSnapshot();
|
||||||
|
}
|
||||||
|
|
||||||
|
private double getVariance() {
|
||||||
|
if (getCount() <= 1) {
|
||||||
|
return 0.0;
|
||||||
|
}
|
||||||
|
return variance.get()[1] / (getCount() - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void pushMetric(MetricsRecord mr) {
|
||||||
|
final Snapshot s = this.getSnapshot();
|
||||||
|
mr.setMetric(getName() + "_num_ops", this.getCount());
|
||||||
|
mr.setMetric(getName() + "_min", this.getMin());
|
||||||
|
mr.setMetric(getName() + "_max", this.getMax());
|
||||||
|
|
||||||
|
mr.setMetric(getName() + "_mean", (float) this.getMean());
|
||||||
|
mr.setMetric(getName() + "_std_dev", (float) this.getStdDev());
|
||||||
|
|
||||||
|
mr.setMetric(getName() + "_median", (float) s.getMedian());
|
||||||
|
mr.setMetric(getName() + "_75th_percentile",
|
||||||
|
(float) s.get75thPercentile());
|
||||||
|
mr.setMetric(getName() + "_95th_percentile",
|
||||||
|
(float) s.get95thPercentile());
|
||||||
|
mr.setMetric(getName() + "_99th_percentile",
|
||||||
|
(float) s.get99thPercentile());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,49 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.metrics.histogram;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A statistically representative sample of items from a stream.
|
||||||
|
*/
|
||||||
|
public interface Sample {
|
||||||
|
/**
|
||||||
|
* Clears all recorded values.
|
||||||
|
*/
|
||||||
|
void clear();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of values recorded.
|
||||||
|
*
|
||||||
|
* @return the number of values recorded
|
||||||
|
*/
|
||||||
|
int size();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds a new recorded value to the sample.
|
||||||
|
*
|
||||||
|
* @param value a new recorded value
|
||||||
|
*/
|
||||||
|
void update(long value);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a snapshot of the sample's values.
|
||||||
|
*
|
||||||
|
* @return a snapshot of the sample's values
|
||||||
|
*/
|
||||||
|
Snapshot getSnapshot();
|
||||||
|
}
|
|
@ -0,0 +1,166 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.metrics.histogram;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.PrintWriter;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A snapshot of all the information seen in a Sample.
|
||||||
|
*/
|
||||||
|
public class Snapshot {
|
||||||
|
|
||||||
|
private static final double MEDIAN_Q = 0.5;
|
||||||
|
private static final double P75_Q = 0.75;
|
||||||
|
private static final double P95_Q = 0.95;
|
||||||
|
private static final double P98_Q = 0.98;
|
||||||
|
private static final double P99_Q = 0.99;
|
||||||
|
private static final double P999_Q = 0.999;
|
||||||
|
|
||||||
|
private final double[] values;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new {@link Snapshot} with the given values.
|
||||||
|
*
|
||||||
|
* @param values an unordered set of values in the sample
|
||||||
|
*/
|
||||||
|
public Snapshot(Collection<Long> values) {
|
||||||
|
final Object[] copy = values.toArray();
|
||||||
|
this.values = new double[copy.length];
|
||||||
|
for (int i = 0; i < copy.length; i++) {
|
||||||
|
this.values[i] = (Long) copy[i];
|
||||||
|
}
|
||||||
|
Arrays.sort(this.values);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new {@link Snapshot} with the given values.
|
||||||
|
*
|
||||||
|
* @param values an unordered set of values in the sample
|
||||||
|
*/
|
||||||
|
public Snapshot(double[] values) {
|
||||||
|
this.values = new double[values.length];
|
||||||
|
System.arraycopy(values, 0, this.values, 0, values.length);
|
||||||
|
Arrays.sort(this.values);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the value at the given quantile.
|
||||||
|
*
|
||||||
|
* @param quantile a given quantile, in [0..1]
|
||||||
|
* @return the value in the distribution at quantile
|
||||||
|
*/
|
||||||
|
public double getValue(double quantile) {
|
||||||
|
if (quantile < 0.0 || quantile > 1.0) {
|
||||||
|
throw new IllegalArgumentException(quantile + " is not in [0..1]");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (values.length == 0) {
|
||||||
|
return 0.0;
|
||||||
|
}
|
||||||
|
|
||||||
|
final double pos = quantile * (values.length + 1);
|
||||||
|
|
||||||
|
if (pos < 1) {
|
||||||
|
return values[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pos >= values.length) {
|
||||||
|
return values[values.length - 1];
|
||||||
|
}
|
||||||
|
|
||||||
|
final double lower = values[(int) pos - 1];
|
||||||
|
final double upper = values[(int) pos];
|
||||||
|
return lower + (pos - Math.floor(pos)) * (upper - lower);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of values in the snapshot.
|
||||||
|
*
|
||||||
|
* @return the number of values in the snapshot
|
||||||
|
*/
|
||||||
|
public int size() {
|
||||||
|
return values.length;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the median value in the distribution.
|
||||||
|
*
|
||||||
|
* @return the median value in the distribution
|
||||||
|
*/
|
||||||
|
public double getMedian() {
|
||||||
|
return getValue(MEDIAN_Q);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the value at the 75th percentile in the distribution.
|
||||||
|
*
|
||||||
|
* @return the value at the 75th percentile in the distribution
|
||||||
|
*/
|
||||||
|
public double get75thPercentile() {
|
||||||
|
return getValue(P75_Q);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the value at the 95th percentile in the distribution.
|
||||||
|
*
|
||||||
|
* @return the value at the 95th percentile in the distribution
|
||||||
|
*/
|
||||||
|
public double get95thPercentile() {
|
||||||
|
return getValue(P95_Q);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the value at the 98th percentile in the distribution.
|
||||||
|
*
|
||||||
|
* @return the value at the 98th percentile in the distribution
|
||||||
|
*/
|
||||||
|
public double get98thPercentile() {
|
||||||
|
return getValue(P98_Q);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the value at the 99th percentile in the distribution.
|
||||||
|
*
|
||||||
|
* @return the value at the 99th percentile in the distribution
|
||||||
|
*/
|
||||||
|
public double get99thPercentile() {
|
||||||
|
return getValue(P99_Q);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the value at the 99.9th percentile in the distribution.
|
||||||
|
*
|
||||||
|
* @return the value at the 99.9th percentile in the distribution
|
||||||
|
*/
|
||||||
|
public double get999thPercentile() {
|
||||||
|
return getValue(P999_Q);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the entire set of values in the snapshot.
|
||||||
|
*
|
||||||
|
* @return the entire set of values in the snapshot
|
||||||
|
*/
|
||||||
|
public double[] getValues() {
|
||||||
|
return Arrays.copyOf(values, values.length);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,105 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.metrics.histogram;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.atomic.AtomicLongArray;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A random sample of a stream of longs. Uses Vitter's Algorithm R to produce a
|
||||||
|
* statistically representative sample.
|
||||||
|
*
|
||||||
|
* see: http://www.cs.umd.edu/~samir/498/vitter.pdf
|
||||||
|
*/
|
||||||
|
public class UniformSample implements Sample {
|
||||||
|
|
||||||
|
private static final Random RANDOM = new Random();
|
||||||
|
private static final int BITS_PER_LONG = 63;
|
||||||
|
|
||||||
|
private final AtomicLong count = new AtomicLong();
|
||||||
|
private final AtomicLongArray values;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new UniformSample
|
||||||
|
*
|
||||||
|
* @param reservoirSize the number of samples to keep
|
||||||
|
*/
|
||||||
|
public UniformSample(int reservoirSize) {
|
||||||
|
this.values = new AtomicLongArray(reservoirSize);
|
||||||
|
clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void clear() {
|
||||||
|
for (int i = 0; i < values.length(); i++) {
|
||||||
|
values.set(i, 0);
|
||||||
|
}
|
||||||
|
count.set(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int size() {
|
||||||
|
final long c = count.get();
|
||||||
|
if (c > values.length()) {
|
||||||
|
return values.length();
|
||||||
|
}
|
||||||
|
return (int) c;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void update(long value) {
|
||||||
|
final long c = count.incrementAndGet();
|
||||||
|
if (c <= values.length()) {
|
||||||
|
values.set((int) c - 1, value);
|
||||||
|
} else {
|
||||||
|
final long r = nextLong(c);
|
||||||
|
if (r < values.length()) {
|
||||||
|
values.set((int) r, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a pseudo-random long uniformly between 0 and n-1. Stolen from
|
||||||
|
* {@link java.util.Random#nextInt()}.
|
||||||
|
*
|
||||||
|
* @param n the bound
|
||||||
|
* @return a value select randomly from the range {@code [0..n)}.
|
||||||
|
*/
|
||||||
|
private static long nextLong(long n) {
|
||||||
|
long bits, val;
|
||||||
|
do {
|
||||||
|
bits = RANDOM.nextLong() & (~(1L << BITS_PER_LONG));
|
||||||
|
val = bits % n;
|
||||||
|
} while (bits - val + (n - 1) < 0L);
|
||||||
|
return val;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Snapshot getSnapshot() {
|
||||||
|
final int s = size();
|
||||||
|
final List<Long> copy = new ArrayList<Long>(s);
|
||||||
|
for (int i = 0; i < s; i++) {
|
||||||
|
copy.add(values.get(i));
|
||||||
|
}
|
||||||
|
return new Snapshot(copy);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1953,12 +1953,15 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
/** {@inheritDoc} */
|
/** {@inheritDoc} */
|
||||||
public Result get(byte[] regionName, Get get) throws IOException {
|
public Result get(byte[] regionName, Get get) throws IOException {
|
||||||
checkOpen();
|
checkOpen();
|
||||||
|
final long startTime = System.nanoTime();
|
||||||
requestCount.incrementAndGet();
|
requestCount.incrementAndGet();
|
||||||
try {
|
try {
|
||||||
HRegion region = getRegion(regionName);
|
HRegion region = getRegion(regionName);
|
||||||
return region.get(get, getLockFromId(get.getLockId()));
|
return region.get(get, getLockFromId(get.getLockId()));
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
throw convertThrowableToIOE(cleanup(t));
|
throw convertThrowableToIOE(cleanup(t));
|
||||||
|
} finally {
|
||||||
|
this.metrics.getLatencies.update(System.nanoTime() - startTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1990,6 +1993,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
throw new IllegalArgumentException("update has null row");
|
throw new IllegalArgumentException("update has null row");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final long startTime = System.nanoTime();
|
||||||
checkOpen();
|
checkOpen();
|
||||||
this.requestCount.incrementAndGet();
|
this.requestCount.incrementAndGet();
|
||||||
HRegion region = getRegion(regionName);
|
HRegion region = getRegion(regionName);
|
||||||
|
@ -2001,6 +2005,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
region.put(put, getLockFromId(put.getLockId()), writeToWAL);
|
region.put(put, getLockFromId(put.getLockId()), writeToWAL);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
throw convertThrowableToIOE(cleanup(t));
|
throw convertThrowableToIOE(cleanup(t));
|
||||||
|
} finally {
|
||||||
|
this.metrics.putLatencies.update(System.nanoTime() - startTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2008,6 +2014,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkOpen();
|
checkOpen();
|
||||||
HRegion region = null;
|
HRegion region = null;
|
||||||
|
int i = 0;
|
||||||
|
|
||||||
|
final long startTime = System.nanoTime();
|
||||||
try {
|
try {
|
||||||
region = getRegion(regionName);
|
region = getRegion(regionName);
|
||||||
if (!region.getRegionInfo().isMetaTable()) {
|
if (!region.getRegionInfo().isMetaTable()) {
|
||||||
|
@ -2017,7 +2026,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
Pair<Put, Integer>[] putsWithLocks = new Pair[puts.size()];
|
Pair<Put, Integer>[] putsWithLocks = new Pair[puts.size()];
|
||||||
|
|
||||||
int i = 0;
|
|
||||||
for (Put p : puts) {
|
for (Put p : puts) {
|
||||||
Integer lock = getLockFromId(p.getLockId());
|
Integer lock = getLockFromId(p.getLockId());
|
||||||
putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
|
putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
|
||||||
|
@ -2033,6 +2041,14 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
return -1;
|
return -1;
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
throw convertThrowableToIOE(cleanup(t));
|
throw convertThrowableToIOE(cleanup(t));
|
||||||
|
} finally {
|
||||||
|
// going to count this as puts.size() PUTs for latency calculations
|
||||||
|
final long totalTime = System.nanoTime() - startTime;
|
||||||
|
final long putCount = i;
|
||||||
|
final long perPutTime = totalTime / putCount;
|
||||||
|
for (int request = 0; request < putCount; request++) {
|
||||||
|
this.metrics.putLatencies.update(perPutTime);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2494,6 +2510,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
public void delete(final byte[] regionName, final Delete delete)
|
public void delete(final byte[] regionName, final Delete delete)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkOpen();
|
checkOpen();
|
||||||
|
final long startTime = System.nanoTime();
|
||||||
try {
|
try {
|
||||||
boolean writeToWAL = delete.getWriteToWAL();
|
boolean writeToWAL = delete.getWriteToWAL();
|
||||||
this.requestCount.incrementAndGet();
|
this.requestCount.incrementAndGet();
|
||||||
|
@ -2505,6 +2522,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
region.delete(delete, lid, writeToWAL);
|
region.delete(delete, lid, writeToWAL);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
throw convertThrowableToIOE(cleanup(t));
|
throw convertThrowableToIOE(cleanup(t));
|
||||||
|
} finally {
|
||||||
|
this.metrics.deleteLatencies.update(System.nanoTime() - startTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2522,10 +2541,12 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
int size = deletes.size();
|
int size = deletes.size();
|
||||||
Integer[] locks = new Integer[size];
|
Integer[] locks = new Integer[size];
|
||||||
for (Delete delete : deletes) {
|
for (Delete delete : deletes) {
|
||||||
|
final long startTime = System.nanoTime();
|
||||||
this.requestCount.incrementAndGet();
|
this.requestCount.incrementAndGet();
|
||||||
locks[i] = getLockFromId(delete.getLockId());
|
locks[i] = getLockFromId(delete.getLockId());
|
||||||
region.delete(delete, locks[i], delete.getWriteToWAL());
|
region.delete(delete, locks[i], delete.getWriteToWAL());
|
||||||
i++;
|
i++;
|
||||||
|
this.metrics.deleteLatencies.update(System.nanoTime() - startTime);
|
||||||
}
|
}
|
||||||
} catch (WrongRegionException ex) {
|
} catch (WrongRegionException ex) {
|
||||||
LOG.debug("Batch deletes: " + i, ex);
|
LOG.debug("Batch deletes: " + i, ex);
|
||||||
|
|
|
@ -19,13 +19,21 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver.metrics;
|
package org.apache.hadoop.hbase.regionserver.metrics;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.management.ManagementFactory;
|
||||||
|
import java.lang.management.MemoryUsage;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||||
|
import org.apache.hadoop.hbase.metrics.ExactCounterMetric;
|
||||||
import org.apache.hadoop.hbase.metrics.HBaseInfo;
|
import org.apache.hadoop.hbase.metrics.HBaseInfo;
|
||||||
import org.apache.hadoop.hbase.metrics.MetricsRate;
|
import org.apache.hadoop.hbase.metrics.MetricsRate;
|
||||||
import org.apache.hadoop.hbase.metrics.PersistentMetricsTimeVaryingRate;
|
import org.apache.hadoop.hbase.metrics.PersistentMetricsTimeVaryingRate;
|
||||||
|
import org.apache.hadoop.hbase.metrics.histogram.MetricsHistogram;
|
||||||
|
import org.apache.hadoop.hbase.metrics.histogram.Snapshot;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.util.Strings;
|
import org.apache.hadoop.hbase.util.Strings;
|
||||||
|
@ -42,11 +50,6 @@ import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
|
||||||
import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
|
import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.lang.management.ManagementFactory;
|
|
||||||
import java.lang.management.MemoryUsage;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class is for maintaining the various regionserver statistics
|
* This class is for maintaining the various regionserver statistics
|
||||||
* and publishing them through the metrics interfaces.
|
* and publishing them through the metrics interfaces.
|
||||||
|
@ -78,43 +81,51 @@ public class RegionServerMetrics implements Updater {
|
||||||
/**
|
/**
|
||||||
* Block cache size.
|
* Block cache size.
|
||||||
*/
|
*/
|
||||||
public final MetricsLongValue blockCacheSize = new MetricsLongValue("blockCacheSize", registry);
|
public final MetricsLongValue blockCacheSize =
|
||||||
|
new MetricsLongValue("blockCacheSize", registry);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Block cache free size.
|
* Block cache free size.
|
||||||
*/
|
*/
|
||||||
public final MetricsLongValue blockCacheFree = new MetricsLongValue("blockCacheFree", registry);
|
public final MetricsLongValue blockCacheFree =
|
||||||
|
new MetricsLongValue("blockCacheFree", registry);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Block cache item count.
|
* Block cache item count.
|
||||||
*/
|
*/
|
||||||
public final MetricsLongValue blockCacheCount = new MetricsLongValue("blockCacheCount", registry);
|
public final MetricsLongValue blockCacheCount =
|
||||||
|
new MetricsLongValue("blockCacheCount", registry);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Block cache hit count.
|
* Block cache hit count.
|
||||||
*/
|
*/
|
||||||
public final MetricsLongValue blockCacheHitCount = new MetricsLongValue("blockCacheHitCount", registry);
|
public final MetricsLongValue blockCacheHitCount =
|
||||||
|
new MetricsLongValue("blockCacheHitCount", registry);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Block cache miss count.
|
* Block cache miss count.
|
||||||
*/
|
*/
|
||||||
public final MetricsLongValue blockCacheMissCount = new MetricsLongValue("blockCacheMissCount", registry);
|
public final MetricsLongValue blockCacheMissCount =
|
||||||
|
new MetricsLongValue("blockCacheMissCount", registry);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Block cache evict count.
|
* Block cache evict count.
|
||||||
*/
|
*/
|
||||||
public final MetricsLongValue blockCacheEvictedCount = new MetricsLongValue("blockCacheEvictedCount", registry);
|
public final MetricsLongValue blockCacheEvictedCount =
|
||||||
|
new MetricsLongValue("blockCacheEvictedCount", registry);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Block hit ratio.
|
* Block hit ratio.
|
||||||
*/
|
*/
|
||||||
public final MetricsIntValue blockCacheHitRatio = new MetricsIntValue("blockCacheHitRatio", registry);
|
public final MetricsIntValue blockCacheHitRatio =
|
||||||
|
new MetricsIntValue("blockCacheHitRatio", registry);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Block hit caching ratio. This only includes the requests to the block
|
* Block hit caching ratio. This only includes the requests to the block
|
||||||
* cache where caching was turned on. See HBASE-2253.
|
* cache where caching was turned on. See HBASE-2253.
|
||||||
*/
|
*/
|
||||||
public final MetricsIntValue blockCacheHitCachingRatio = new MetricsIntValue("blockCacheHitCachingRatio", registry);
|
public final MetricsIntValue blockCacheHitCachingRatio =
|
||||||
|
new MetricsIntValue("blockCacheHitCachingRatio", registry);
|
||||||
|
|
||||||
/** Block hit ratio for past N periods. */
|
/** Block hit ratio for past N periods. */
|
||||||
public final MetricsIntValue blockCacheHitRatioPastNPeriods = new MetricsIntValue("blockCacheHitRatioPastNPeriods", registry);
|
public final MetricsIntValue blockCacheHitRatioPastNPeriods = new MetricsIntValue("blockCacheHitRatioPastNPeriods", registry);
|
||||||
|
@ -122,6 +133,25 @@ public class RegionServerMetrics implements Updater {
|
||||||
/** Block hit caching ratio for past N periods */
|
/** Block hit caching ratio for past N periods */
|
||||||
public final MetricsIntValue blockCacheHitCachingRatioPastNPeriods = new MetricsIntValue("blockCacheHitCachingRatioPastNPeriods", registry);
|
public final MetricsIntValue blockCacheHitCachingRatioPastNPeriods = new MetricsIntValue("blockCacheHitCachingRatioPastNPeriods", registry);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* a latency histogram on 'get' requests
|
||||||
|
*/
|
||||||
|
public final MetricsHistogram getLatencies =
|
||||||
|
new MetricsHistogram("getRequestLatency", registry);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* a latency histogram on 'delete' requests
|
||||||
|
*/
|
||||||
|
public final MetricsHistogram deleteLatencies =
|
||||||
|
new MetricsHistogram("deleteRequestLatency", registry);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* a latency histogram on 'put' requests
|
||||||
|
*/
|
||||||
|
public final MetricsHistogram putLatencies =
|
||||||
|
new MetricsHistogram("putRequestLatency", registry);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Count of requests to the regionservers since last call to metrics update
|
* Count of requests to the regionservers since last call to metrics update
|
||||||
*/
|
*/
|
||||||
|
@ -135,17 +165,20 @@ public class RegionServerMetrics implements Updater {
|
||||||
/**
|
/**
|
||||||
* Count of storefiles open on the regionserver.
|
* Count of storefiles open on the regionserver.
|
||||||
*/
|
*/
|
||||||
public final MetricsIntValue storefiles = new MetricsIntValue("storefiles", registry);
|
public final MetricsIntValue storefiles =
|
||||||
|
new MetricsIntValue("storefiles", registry);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Count of read requests
|
* Count of read requests
|
||||||
*/
|
*/
|
||||||
public final MetricsLongValue readRequestsCount = new MetricsLongValue("readRequestsCount", registry);
|
public final MetricsLongValue readRequestsCount =
|
||||||
|
new MetricsLongValue("readRequestsCount", registry);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Count of write requests
|
* Count of write requests
|
||||||
*/
|
*/
|
||||||
public final MetricsLongValue writeRequestsCount = new MetricsLongValue("writeRequestsCount", registry);
|
public final MetricsLongValue writeRequestsCount =
|
||||||
|
new MetricsLongValue("writeRequestsCount", registry);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
|
@ -189,7 +222,26 @@ public class RegionServerMetrics implements Updater {
|
||||||
new MetricsIntValue("flushQueueSize", registry);
|
new MetricsIntValue("flushQueueSize", registry);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* filesystem sequential read latency
|
* filesystem sequential read latency distribution
|
||||||
|
*/
|
||||||
|
public final MetricsHistogram fsReadLatencyHistogram =
|
||||||
|
new MetricsHistogram("fsReadLatencyHistogram", registry);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* filesystem pread latency distribution
|
||||||
|
*/
|
||||||
|
public final MetricsHistogram fsPreadLatencyHistogram =
|
||||||
|
new MetricsHistogram("fsPreadLatencyHistogram", registry);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Metrics on the distribution of filesystem write latencies (improved version of fsWriteLatency)
|
||||||
|
*/
|
||||||
|
public final MetricsHistogram fsWriteLatencyHistogram =
|
||||||
|
new MetricsHistogram("fsWriteLatencyHistogram", registry);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* filesystem read latency
|
||||||
*/
|
*/
|
||||||
public final MetricsTimeVaryingRate fsReadLatency =
|
public final MetricsTimeVaryingRate fsReadLatency =
|
||||||
new MetricsTimeVaryingRate("fsReadLatency", registry);
|
new MetricsTimeVaryingRate("fsReadLatency", registry);
|
||||||
|
@ -218,6 +270,7 @@ public class RegionServerMetrics implements Updater {
|
||||||
public final MetricsTimeVaryingRate fsSyncLatency =
|
public final MetricsTimeVaryingRate fsSyncLatency =
|
||||||
new MetricsTimeVaryingRate("fsSyncLatency", registry);
|
new MetricsTimeVaryingRate("fsSyncLatency", registry);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* time each scheduled compaction takes
|
* time each scheduled compaction takes
|
||||||
*/
|
*/
|
||||||
|
@ -331,6 +384,10 @@ public class RegionServerMetrics implements Updater {
|
||||||
this.blockCacheHitRatioPastNPeriods.pushMetric(this.metricsRecord);
|
this.blockCacheHitRatioPastNPeriods.pushMetric(this.metricsRecord);
|
||||||
this.blockCacheHitCachingRatioPastNPeriods.pushMetric(this.metricsRecord);
|
this.blockCacheHitCachingRatioPastNPeriods.pushMetric(this.metricsRecord);
|
||||||
|
|
||||||
|
this.putLatencies.pushMetric(this.metricsRecord);
|
||||||
|
this.deleteLatencies.pushMetric(this.metricsRecord);
|
||||||
|
this.getLatencies.pushMetric(this.metricsRecord);
|
||||||
|
|
||||||
// Mix in HFile and HLog metrics
|
// Mix in HFile and HLog metrics
|
||||||
// Be careful. Here is code for MTVR from up in hadoop:
|
// Be careful. Here is code for MTVR from up in hadoop:
|
||||||
// public synchronized void inc(final int numOps, final long time) {
|
// public synchronized void inc(final int numOps, final long time) {
|
||||||
|
@ -361,11 +418,27 @@ public class RegionServerMetrics implements Updater {
|
||||||
* by compaction & flush metrics.
|
* by compaction & flush metrics.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
for(Long latency : HFile.getReadLatenciesNanos()) {
|
||||||
|
this.fsReadLatencyHistogram.update(latency);
|
||||||
|
}
|
||||||
|
for(Long latency : HFile.getPreadLatenciesNanos()) {
|
||||||
|
this.fsPreadLatencyHistogram.update(latency);
|
||||||
|
}
|
||||||
|
for(Long latency : HFile.getWriteLatenciesNanos()) {
|
||||||
|
this.fsWriteLatencyHistogram.update(latency);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// push the result
|
// push the result
|
||||||
this.fsPreadLatency.pushMetric(this.metricsRecord);
|
this.fsPreadLatency.pushMetric(this.metricsRecord);
|
||||||
this.fsReadLatency.pushMetric(this.metricsRecord);
|
this.fsReadLatency.pushMetric(this.metricsRecord);
|
||||||
this.fsWriteLatency.pushMetric(this.metricsRecord);
|
this.fsWriteLatency.pushMetric(this.metricsRecord);
|
||||||
this.fsWriteSize.pushMetric(this.metricsRecord);
|
this.fsWriteSize.pushMetric(this.metricsRecord);
|
||||||
|
|
||||||
|
this.fsReadLatencyHistogram.pushMetric(this.metricsRecord);
|
||||||
|
this.fsWriteLatencyHistogram.pushMetric(this.metricsRecord);
|
||||||
|
this.fsPreadLatencyHistogram.pushMetric(this.metricsRecord);
|
||||||
|
|
||||||
this.fsSyncLatency.pushMetric(this.metricsRecord);
|
this.fsSyncLatency.pushMetric(this.metricsRecord);
|
||||||
this.compactionTime.pushMetric(this.metricsRecord);
|
this.compactionTime.pushMetric(this.metricsRecord);
|
||||||
this.compactionSize.pushMetric(this.metricsRecord);
|
this.compactionSize.pushMetric(this.metricsRecord);
|
||||||
|
@ -498,6 +571,40 @@ public class RegionServerMetrics implements Updater {
|
||||||
Long.valueOf(this.hdfsBlocksLocalityIndex.get()));
|
Long.valueOf(this.hdfsBlocksLocalityIndex.get()));
|
||||||
sb = Strings.appendKeyValue(sb, "slowHLogAppendCount",
|
sb = Strings.appendKeyValue(sb, "slowHLogAppendCount",
|
||||||
Long.valueOf(this.slowHLogAppendCount.get()));
|
Long.valueOf(this.slowHLogAppendCount.get()));
|
||||||
|
sb = appendHistogram(sb, this.deleteLatencies);
|
||||||
|
sb = appendHistogram(sb, this.getLatencies);
|
||||||
|
sb = appendHistogram(sb, this.putLatencies);
|
||||||
|
sb = appendHistogram(sb, this.fsReadLatencyHistogram);
|
||||||
|
sb = appendHistogram(sb, this.fsPreadLatencyHistogram);
|
||||||
|
sb = appendHistogram(sb, this.fsWriteLatencyHistogram);
|
||||||
|
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private StringBuilder appendHistogram(StringBuilder sb,
|
||||||
|
MetricsHistogram histogram) {
|
||||||
|
sb = Strings.appendKeyValue(sb,
|
||||||
|
histogram.getName() + "Mean",
|
||||||
|
StringUtils.limitDecimalTo2(histogram.getMean()));
|
||||||
|
sb = Strings.appendKeyValue(sb,
|
||||||
|
histogram.getName() + "Count",
|
||||||
|
StringUtils.limitDecimalTo2(histogram.getCount()));
|
||||||
|
final Snapshot s = histogram.getSnapshot();
|
||||||
|
sb = Strings.appendKeyValue(sb,
|
||||||
|
histogram.getName() + "Median",
|
||||||
|
StringUtils.limitDecimalTo2(s.getMedian()));
|
||||||
|
sb = Strings.appendKeyValue(sb,
|
||||||
|
histogram.getName() + "75th",
|
||||||
|
StringUtils.limitDecimalTo2(s.get75thPercentile()));
|
||||||
|
sb = Strings.appendKeyValue(sb,
|
||||||
|
histogram.getName() + "95th",
|
||||||
|
StringUtils.limitDecimalTo2(s.get95thPercentile()));
|
||||||
|
sb = Strings.appendKeyValue(sb,
|
||||||
|
histogram.getName() + "99th",
|
||||||
|
StringUtils.limitDecimalTo2(s.get99thPercentile()));
|
||||||
|
sb = Strings.appendKeyValue(sb,
|
||||||
|
histogram.getName() + "999th",
|
||||||
|
StringUtils.limitDecimalTo2(s.get999thPercentile()));
|
||||||
|
return sb;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1877,4 +1877,4 @@ public class HLog implements Syncable {
|
||||||
System.exit(-1);
|
System.exit(-1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -25,6 +25,7 @@ import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -180,4 +181,24 @@ public class Threads {
|
||||||
boundedCachedThreadPool.allowCoreThreadTimeOut(true);
|
boundedCachedThreadPool.allowCoreThreadTimeOut(true);
|
||||||
return boundedCachedThreadPool;
|
return boundedCachedThreadPool;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a {@link java.util.concurrent.ThreadFactory} that names each
|
||||||
|
* created thread uniquely, with a common prefix.
|
||||||
|
*
|
||||||
|
* @param prefix The prefix of every created Thread's name
|
||||||
|
* @return a {@link java.util.concurrent.ThreadFactory} that names threads
|
||||||
|
*/
|
||||||
|
public static ThreadFactory getNamedThreadFactory(final String prefix) {
|
||||||
|
return new ThreadFactory() {
|
||||||
|
|
||||||
|
private final AtomicInteger threadNumber = new AtomicInteger(1);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Thread newThread(Runnable r) {
|
||||||
|
return new Thread(r, prefix + threadNumber.getAndIncrement());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,47 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.metrics;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestExactCounterMetric {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBasic() {
|
||||||
|
final ExactCounterMetric counter = new ExactCounterMetric("testCounter", null);
|
||||||
|
for (int i = 1; i <= 10; i++) {
|
||||||
|
for (int j = 0; j < i; j++) {
|
||||||
|
counter.update(i + "");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Pair<String, Long>> topFive = counter.getTop(5);
|
||||||
|
Long i = 10L;
|
||||||
|
for (Pair<String, Long> entry : topFive) {
|
||||||
|
Assert.assertEquals(i + "", entry.getFirst());
|
||||||
|
Assert.assertEquals(i, entry.getSecond());
|
||||||
|
i--;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,63 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.metrics;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.metrics.histogram.ExponentiallyDecayingSample;
|
||||||
|
import org.apache.hadoop.hbase.metrics.histogram.Snapshot;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestExponentiallyDecayingSample {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBasic() {
|
||||||
|
final ExponentiallyDecayingSample sample =
|
||||||
|
new ExponentiallyDecayingSample(100, 0.99);
|
||||||
|
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
sample.update(i);
|
||||||
|
}
|
||||||
|
Assert.assertEquals(100, sample.size());
|
||||||
|
|
||||||
|
final Snapshot snapshot = sample.getSnapshot();
|
||||||
|
Assert.assertEquals(100, snapshot.size());
|
||||||
|
|
||||||
|
for (double i : snapshot.getValues()) {
|
||||||
|
Assert.assertTrue(i >= 0.0 && i < 1000.0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTooBig() throws Exception {
|
||||||
|
final ExponentiallyDecayingSample sample =
|
||||||
|
new ExponentiallyDecayingSample(100, 0.99);
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
sample.update(i);
|
||||||
|
}
|
||||||
|
Assert.assertEquals(10, sample.size());
|
||||||
|
|
||||||
|
final Snapshot snapshot = sample.getSnapshot();
|
||||||
|
Assert.assertEquals(10, sample.size());
|
||||||
|
|
||||||
|
for (double i : snapshot.getValues()) {
|
||||||
|
Assert.assertTrue(i >= 0.0 && i < 1000.0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,98 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.metrics;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.metrics.histogram.MetricsHistogram;
|
||||||
|
import org.apache.hadoop.hbase.metrics.histogram.Snapshot;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestMetricsHistogram {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBasicUniform() {
|
||||||
|
MetricsHistogram h = new MetricsHistogram("testHistogram", null);
|
||||||
|
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
h.update(i);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(100, h.getCount());
|
||||||
|
Assert.assertEquals(0, h.getMin());
|
||||||
|
Assert.assertEquals(99, h.getMax());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int safeIndex(int i, int len) {
|
||||||
|
if (i < len && i>= 0) {
|
||||||
|
return i;
|
||||||
|
} else if (i >= len) {
|
||||||
|
return len - 1;
|
||||||
|
} else {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRandom() {
|
||||||
|
final Random r = new Random();
|
||||||
|
final MetricsHistogram h = new MetricsHistogram("testHistogram", null);
|
||||||
|
|
||||||
|
final long[] data = new long[1000];
|
||||||
|
|
||||||
|
for (int i = 0; i < data.length; i++) {
|
||||||
|
data[i] = (long) (r.nextGaussian() * 10000.0);
|
||||||
|
h.update(data[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
final Snapshot s = h.getSnapshot();
|
||||||
|
Arrays.sort(data);
|
||||||
|
|
||||||
|
// as long as the histogram chooses an item with index N+/-slop, accept it
|
||||||
|
final int slop = 20;
|
||||||
|
|
||||||
|
// make sure the median, 75th percentile and 95th percentile are good
|
||||||
|
final int medianIndex = data.length / 2;
|
||||||
|
final long minAcceptableMedian = data[safeIndex(medianIndex - slop,
|
||||||
|
data.length)];
|
||||||
|
final long maxAcceptableMedian = data[safeIndex(medianIndex + slop,
|
||||||
|
data.length)];
|
||||||
|
Assert.assertTrue(s.getMedian() >= minAcceptableMedian
|
||||||
|
&& s.getMedian() <= maxAcceptableMedian);
|
||||||
|
|
||||||
|
final int seventyFifthIndex = (int) (data.length * 0.75);
|
||||||
|
final long minAcceptableseventyFifth = data[safeIndex(seventyFifthIndex
|
||||||
|
- slop, data.length)];
|
||||||
|
final long maxAcceptableseventyFifth = data[safeIndex(seventyFifthIndex
|
||||||
|
+ slop, data.length)];
|
||||||
|
Assert.assertTrue(s.get75thPercentile() >= minAcceptableseventyFifth
|
||||||
|
&& s.get75thPercentile() <= maxAcceptableseventyFifth);
|
||||||
|
|
||||||
|
final int ninetyFifthIndex = (int) (data.length * 0.95);
|
||||||
|
final long minAcceptableninetyFifth = data[safeIndex(ninetyFifthIndex
|
||||||
|
- slop, data.length)];
|
||||||
|
final long maxAcceptableninetyFifth = data[safeIndex(ninetyFifthIndex
|
||||||
|
+ slop, data.length)];
|
||||||
|
Assert.assertTrue(s.get95thPercentile() >= minAcceptableninetyFifth
|
||||||
|
&& s.get95thPercentile() <= maxAcceptableninetyFifth);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue