From 8ea1c8ddd6ad9f1663921871a4a1d5df75e5830d Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Mon, 26 Mar 2012 19:21:42 +0000 Subject: [PATCH] HBASE-5533 Add more metrics to HBase git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1305499 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/hadoop/hbase/io/hfile/HFile.java | 101 +++++++- .../hadoop/hbase/io/hfile/HFileReaderV1.java | 15 +- .../hadoop/hbase/io/hfile/HFileReaderV2.java | 15 +- .../hadoop/hbase/io/hfile/HFileWriterV1.java | 5 +- .../hadoop/hbase/io/hfile/HFileWriterV2.java | 5 +- .../hbase/metrics/ExactCounterMetric.java | 154 ++++++++++++ .../ExponentiallyDecayingSample.java | 212 +++++++++++++++++ .../metrics/histogram/MetricsHistogram.java | 225 ++++++++++++++++++ .../hbase/metrics/histogram/Sample.java | 49 ++++ .../hbase/metrics/histogram/Snapshot.java | 166 +++++++++++++ .../metrics/histogram/UniformSample.java | 105 ++++++++ .../hbase/regionserver/HRegionServer.java | 23 +- .../metrics/RegionServerMetrics.java | 141 +++++++++-- .../hadoop/hbase/regionserver/wal/HLog.java | 2 +- .../org/apache/hadoop/hbase/util/Threads.java | 21 ++ .../hbase/metrics/TestExactCounterMetric.java | 47 ++++ .../TestExponentiallyDecayingSample.java | 63 +++++ .../hbase/metrics/TestMetricsHistogram.java | 98 ++++++++ 18 files changed, 1393 insertions(+), 54 deletions(-) create mode 100644 src/main/java/org/apache/hadoop/hbase/metrics/ExactCounterMetric.java create mode 100644 src/main/java/org/apache/hadoop/hbase/metrics/histogram/ExponentiallyDecayingSample.java create mode 100644 src/main/java/org/apache/hadoop/hbase/metrics/histogram/MetricsHistogram.java create mode 100644 src/main/java/org/apache/hadoop/hbase/metrics/histogram/Sample.java create mode 100644 src/main/java/org/apache/hadoop/hbase/metrics/histogram/Snapshot.java create mode 100644 src/main/java/org/apache/hadoop/hbase/metrics/histogram/UniformSample.java create mode 100644 src/test/java/org/apache/hadoop/hbase/metrics/TestExactCounterMetric.java create mode 100644 src/test/java/org/apache/hadoop/hbase/metrics/TestExponentiallyDecayingSample.java create mode 100644 src/test/java/org/apache/hadoop/hbase/metrics/TestMetricsHistogram.java diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index 3fde7bbd6b4..37bcabab8c5 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -24,8 +24,12 @@ import java.io.DataInput; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -48,14 +52,15 @@ import org.apache.hadoop.hbase.io.HbaseMapWritable; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.SchemaAware; -import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.Writable; import 
com.google.common.base.Preconditions; +import com.google.common.collect.Lists; /** * File format for hbase. @@ -165,18 +170,100 @@ public class HFile { public static final ChecksumType DEFAULT_CHECKSUM_TYPE = ChecksumType.CRC32; // For measuring latency of "sequential" reads and writes - static final AtomicInteger readOps = new AtomicInteger(); - static final AtomicLong readTimeNano = new AtomicLong(); - static final AtomicInteger writeOps = new AtomicInteger(); - static final AtomicLong writeTimeNano = new AtomicLong(); + private static final AtomicInteger readOps = new AtomicInteger(); + private static final AtomicLong readTimeNano = new AtomicLong(); + private static final AtomicInteger writeOps = new AtomicInteger(); + private static final AtomicLong writeTimeNano = new AtomicLong(); // For measuring latency of pread - static final AtomicInteger preadOps = new AtomicInteger(); - static final AtomicLong preadTimeNano = new AtomicLong(); + private static final AtomicInteger preadOps = new AtomicInteger(); + private static final AtomicLong preadTimeNano = new AtomicLong(); // For measuring number of checksum failures static final AtomicLong checksumFailures = new AtomicLong(); + // For getting more detailed stats on FS latencies + // If, for some reason, the metrics subsystem stops polling for latencies, + // I don't want data to pile up in a memory leak + // so, after LATENCY_BUFFER_SIZE items have been enqueued for processing, + // fs latency stats will be dropped (and this behavior will be logged) + private static final int LATENCY_BUFFER_SIZE = 5000; + private static final BlockingQueue fsReadLatenciesNanos = + new ArrayBlockingQueue(LATENCY_BUFFER_SIZE); + private static final BlockingQueue fsWriteLatenciesNanos = + new ArrayBlockingQueue(LATENCY_BUFFER_SIZE); + private static final BlockingQueue fsPreadLatenciesNanos = + new ArrayBlockingQueue(LATENCY_BUFFER_SIZE); + private static final AtomicLong lastLoggedDataDrop = new AtomicLong(0); + + // we don't want to fill up the logs with this message, so only log it + // once every 30 seconds at most + // I also want to avoid locks on the 'critical path' (the common case will be + // uncontended) - hence the CAS + private static void logDroppedLatencyStat() { + final long now = System.currentTimeMillis(); + final long earliestAcceptableLog = now - TimeUnit.SECONDS.toMillis(30L); + while (true) { + final long lastLog = lastLoggedDataDrop.get(); + if (lastLog < earliestAcceptableLog) { + if (lastLoggedDataDrop.compareAndSet(lastLog, now)) { + LOG.warn("Dropping fs latency stats since buffer is full"); + break; + } // otherwise (if the compaseAndSet failed) the while loop retries + } else { + break; + } + } + } + + public static final void offerReadLatency(long latencyNanos, boolean pread) { + boolean stored = false; + if (pread) { + stored = fsPreadLatenciesNanos.offer(latencyNanos); + preadOps.incrementAndGet(); + preadTimeNano.addAndGet(latencyNanos); + } else { + stored = fsReadLatenciesNanos.offer(latencyNanos); + readTimeNano.addAndGet(latencyNanos); + readOps.incrementAndGet(); + } + + if (!stored) { + logDroppedLatencyStat(); + } + } + + public static final void offerWriteLatency(long latencyNanos) { + final boolean stored = fsWriteLatenciesNanos.offer(latencyNanos); + if (!stored) { + logDroppedLatencyStat(); + } + + writeTimeNano.addAndGet(latencyNanos); + writeOps.incrementAndGet(); + } + + public static final Collection getReadLatenciesNanos() { + final List latencies = + 
Lists.newArrayListWithCapacity(fsReadLatenciesNanos.size()); + fsReadLatenciesNanos.drainTo(latencies); + return latencies; + } + + public static final Collection getPreadLatenciesNanos() { + final List latencies = + Lists.newArrayListWithCapacity(fsPreadLatenciesNanos.size()); + fsPreadLatenciesNanos.drainTo(latencies); + return latencies; + } + + public static final Collection getWriteLatenciesNanos() { + final List latencies = + Lists.newArrayListWithCapacity(fsWriteLatenciesNanos.size()); + fsWriteLatenciesNanos.drainTo(latencies); + return latencies; + } + // for test purpose public static volatile AtomicLong dataBlockReadCnt = new AtomicLong(0); diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java index efa4603236f..79ffee0fc67 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java @@ -249,9 +249,8 @@ public class HFileReaderV1 extends AbstractHFileReader { passSchemaMetricsTo(hfileBlock); hfileBlock.expectType(BlockType.META); - long delta = System.nanoTime() - startTimeNs; - HFile.preadTimeNano.addAndGet(delta); - HFile.preadOps.incrementAndGet(); + final long delta = System.nanoTime() - startTimeNs; + HFile.offerReadLatency(delta, true); getSchemaMetrics().updateOnCacheMiss(effectiveCategory, SchemaMetrics.NO_COMPACTION, delta); @@ -328,14 +327,8 @@ public class HFileReaderV1 extends AbstractHFileReader { passSchemaMetricsTo(hfileBlock); hfileBlock.expectType(BlockType.DATA); - long delta = System.nanoTime() - startTimeNs; - if (pread) { - HFile.preadTimeNano.addAndGet(delta); - HFile.preadOps.incrementAndGet(); - } else { - HFile.readTimeNano.addAndGet(delta); - HFile.readOps.incrementAndGet(); - } + final long delta = System.nanoTime() - startTimeNs; + HFile.offerReadLatency(delta, pread); getSchemaMetrics().updateOnCacheMiss(BlockCategory.DATA, isCompaction, delta); diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java index 8b780962337..aa99addf43f 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java @@ -236,9 +236,8 @@ public class HFileReaderV2 extends AbstractHFileReader { blockSize, -1, true); passSchemaMetricsTo(metaBlock); - long delta = System.nanoTime() - startTimeNs; - HFile.preadTimeNano.addAndGet(delta); - HFile.preadOps.incrementAndGet(); + final long delta = System.nanoTime() - startTimeNs; + HFile.offerReadLatency(delta, true); getSchemaMetrics().updateOnCacheMiss(BlockCategory.META, false, delta); // Cache the block @@ -335,14 +334,8 @@ public class HFileReaderV2 extends AbstractHFileReader { passSchemaMetricsTo(hfileBlock); BlockCategory blockCategory = hfileBlock.getBlockType().getCategory(); - long delta = System.nanoTime() - startTimeNs; - if (pread) { - HFile.preadTimeNano.addAndGet(delta); - HFile.preadOps.incrementAndGet(); - } else { - HFile.readTimeNano.addAndGet(delta); - HFile.readOps.incrementAndGet(); - } + final long delta = System.nanoTime() - startTimeNs; + HFile.offerReadLatency(delta, pread); getSchemaMetrics().updateOnCacheMiss(blockCategory, isCompaction, delta); // Cache the block if necessary diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java index 6d251d8337d..a9489075c19 
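The HFile changes above funnel per-operation read, pread, and write latencies into fixed-size ArrayBlockingQueues and fall back to a CAS-throttled warning if the metrics thread stops draining them. Below is a minimal, self-contained sketch of that pattern under the same 5000-entry / 30-second assumptions; the LatencyBuffer name and the System.err stand-in for LOG.warn are illustrative only, not part of the patch.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public final class LatencyBuffer {
  private static final int CAPACITY = 5000;  // mirrors LATENCY_BUFFER_SIZE in the patch
  private static final long LOG_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30);

  private final BlockingQueue<Long> latenciesNanos =
      new ArrayBlockingQueue<Long>(CAPACITY);
  private final AtomicLong lastDropLogMs = new AtomicLong(0);

  /** Producer side: never blocks; drops the sample (and maybe warns) when full. */
  public void offer(long latencyNanos) {
    if (!latenciesNanos.offer(latencyNanos)) {
      maybeLogDrop();
    }
  }

  /** Consumer side: the metrics thread drains whatever accumulated since its last poll. */
  public List<Long> drain() {
    final List<Long> out = new ArrayList<Long>(latenciesNanos.size());
    latenciesNanos.drainTo(out);
    return out;
  }

  // CAS keeps the hot path lock-free; at most one thread per 30-second window logs.
  private void maybeLogDrop() {
    final long now = System.currentTimeMillis();
    final long last = lastDropLogMs.get();
    if (now - last >= LOG_INTERVAL_MS && lastDropLogMs.compareAndSet(last, now)) {
      System.err.println("Dropping latency sample: buffer is full");  // stand-in for LOG.warn
    }
  }
}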
100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java @@ -141,9 +141,8 @@ public class HFileWriterV1 extends AbstractHFileWriter { blockDataSizes.add(Integer.valueOf(size)); this.totalUncompressedBytes += size; - HFile.writeTimeNano.addAndGet(System.nanoTime() - startTimeNs); - HFile.writeOps.incrementAndGet(); - + HFile.offerWriteLatency(System.nanoTime() - startTimeNs); + if (cacheConf.shouldCacheDataOnWrite()) { baosDos.flush(); // we do not do data block encoding on disk for HFile v1 diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java index 76b77c53a80..d78badb7319 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java @@ -189,9 +189,8 @@ public class HFileWriterV2 extends AbstractHFileWriter { onDiskSize); totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); - HFile.writeTimeNano.addAndGet(System.nanoTime() - startTimeNs); - HFile.writeOps.incrementAndGet(); - + HFile.offerWriteLatency(System.nanoTime() - startTimeNs); + if (cacheConf.shouldCacheDataOnWrite()) { doCacheOnWrite(lastDataBlockOffset); } diff --git a/src/main/java/org/apache/hadoop/hbase/metrics/ExactCounterMetric.java b/src/main/java/org/apache/hadoop/hbase/metrics/ExactCounterMetric.java new file mode 100644 index 00000000000..40e29eb69ef --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/metrics/ExactCounterMetric.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.metrics; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.metrics.MetricsRecord; +import org.apache.hadoop.metrics.util.MetricsBase; +import org.apache.hadoop.metrics.util.MetricsRegistry; +import org.cliffc.high_scale_lib.Counter; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.google.common.collect.MapMaker; + +public class ExactCounterMetric extends MetricsBase { + + private static final int DEFAULT_TOP_N = 5; + + // only publish stats on the topN items (default to DEFAULT_TOP_N) + private final int topN; + private final Map counts; + + // all access to the 'counts' map should use this lock. 
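The locking scheme this comment describes — a concurrent map of per-key counters, read-locked for increments and write-locked only to take a consistent top-N snapshot — can be sketched on its own. The sketch below substitutes ConcurrentHashMap plus AtomicLong for the Guava computing map and high-scale-lib Counter the patch actually uses; TopNCounter and its members are illustrative names.

import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public final class TopNCounter {
  private final ConcurrentMap<String, AtomicLong> counts =
      new ConcurrentHashMap<String, AtomicLong>();
  // Read lock for increments (they may run concurrently); the write lock is taken
  // only to copy a consistent snapshot of every key at once.
  private final ReadWriteLock lock = new ReentrantReadWriteLock();

  public void update(String key) {
    lock.readLock().lock();
    try {
      AtomicLong c = counts.get(key);
      if (c == null) {
        final AtomicLong fresh = new AtomicLong();
        final AtomicLong prev = counts.putIfAbsent(key, fresh);
        c = (prev == null) ? fresh : prev;
      }
      c.incrementAndGet();
    } finally {
      lock.readLock().unlock();
    }
  }

  public List<Map.Entry<String, Long>> top(int n) {
    final List<Map.Entry<String, Long>> snapshot =
        new ArrayList<Map.Entry<String, Long>>();
    lock.writeLock().lock();  // excludes updaters while the snapshot is copied
    try {
      for (Map.Entry<String, AtomicLong> e : counts.entrySet()) {
        snapshot.add(new SimpleEntry<String, Long>(e.getKey(), e.getValue().get()));
      }
    } finally {
      lock.writeLock().unlock();
    }
    Collections.sort(snapshot, new Comparator<Map.Entry<String, Long>>() {
      public int compare(Map.Entry<String, Long> a, Map.Entry<String, Long> b) {
        return b.getValue().compareTo(a.getValue());  // descending by count
      }
    });
    return snapshot.subList(0, Math.min(n, snapshot.size()));
  }
}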
+ // take a write lock iff you want to guarantee exclusive access + // (the map stripes locks internally, so it's already thread safe - + // this lock is just so you can take a consistent snapshot of data) + private final ReadWriteLock lock; + + + /** + * Constructor to create a new counter metric + * @param nam the name to publish this metric under + * @param registry where the metrics object will be registered + * @param description metrics description + * @param topN how many 'keys' to publish metrics on + */ + public ExactCounterMetric(final String nam, final MetricsRegistry registry, + final String description, int topN) { + super(nam, description); + + this.counts = new MapMaker().makeComputingMap( + new Function() { + @Override + public Counter apply(String input) { + return new Counter(); + } + }); + + this.lock = new ReentrantReadWriteLock(); + this.topN = topN; + + if (registry != null) { + registry.add(nam, this); + } + } + + /** + * Constructor creates a new ExactCounterMetric + * @param nam the name of the metrics to be used to publish the metric + * @param registry where the metrics object will be registered + */ + public ExactCounterMetric(final String nam, MetricsRegistry registry) { + this(nam, registry, NO_DESCRIPTION, DEFAULT_TOP_N); + } + + + public void update(String type) { + this.lock.readLock().lock(); + try { + this.counts.get(type).increment(); + } finally { + this.lock.readLock().unlock(); + } + } + + public void update(String type, long count) { + this.lock.readLock().lock(); + try { + this.counts.get(type).add(count); + } finally { + this.lock.readLock().unlock(); + } + } + + public List> getTop(int n) { + final List> countsSnapshot = + Lists.newArrayListWithCapacity(this.counts.size()); + + // no updates are allowed while I'm holding this lock, so move fast + this.lock.writeLock().lock(); + try { + for(Entry entry : this.counts.entrySet()) { + countsSnapshot.add(Pair.newPair(entry.getKey(), + entry.getValue().get())); + } + } finally { + this.lock.writeLock().unlock(); + } + + Collections.sort(countsSnapshot, new Comparator>() { + @Override + public int compare(Pair a, Pair b) { + return b.getSecond().compareTo(a.getSecond()); + } + }); + + return countsSnapshot.subList(0, Math.min(n, countsSnapshot.size())); + } + + @Override + public void pushMetric(MetricsRecord mr) { + final List> topKeys = getTop(Integer.MAX_VALUE); + int sum = 0; + + int counter = 0; + for (Pair keyCount : topKeys) { + counter++; + // only push stats on the topN keys + if (counter <= this.topN) { + mr.setMetric(getName() + "_" + keyCount.getFirst(), + keyCount.getSecond()); + } + sum += keyCount.getSecond(); + } + mr.setMetric(getName() + "_map_size", this.counts.size()); + mr.setMetric(getName() + "_total_count", sum); + } + +} diff --git a/src/main/java/org/apache/hadoop/hbase/metrics/histogram/ExponentiallyDecayingSample.java b/src/main/java/org/apache/hadoop/hbase/metrics/histogram/ExponentiallyDecayingSample.java new file mode 100644 index 00000000000..e615b2123eb --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/metrics/histogram/ExponentiallyDecayingSample.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.metrics.histogram; + +import java.util.ArrayList; +import java.util.Random; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.hbase.util.Threads; + +/** + * An exponentially-decaying random sample of {@code long}s. + * Uses Cormode et al's forward-decaying priority reservoir sampling method + * to produce a statistically representative sample, exponentially biased + * towards newer entries. + * + * see Cormode et al. + * Forward Decay: A Practical Time Decay Model for Streaming Systems. ICDE '09 + */ +public class ExponentiallyDecayingSample implements Sample { + + private static final Random RANDOM = new Random(); + private static final long RESCALE_THRESHOLD = TimeUnit.HOURS.toNanos(1); + + private static final ScheduledExecutorService TICK_SERVICE = + Executors.newScheduledThreadPool(1, + Threads.getNamedThreadFactory("decayingSampleTick")); + + private static volatile long CURRENT_TICK = + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + + static { + // sample at twice our signal's frequency (1Hz) per the Nyquist theorem + TICK_SERVICE.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + CURRENT_TICK = + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + } + }, 0, 500, TimeUnit.MILLISECONDS); + } + + private final ConcurrentSkipListMap values = + new ConcurrentSkipListMap(); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final AtomicLong count = new AtomicLong(0); + private final AtomicLong nextScaleTime = new AtomicLong(0); + + private final double alpha; + private final int reservoirSize; + private volatile long startTime; + + /** + * Constructor for an ExponentiallyDecayingSample. + * + * @param reservoirSize the number of samples to keep in the reservoir + * @param alpha the exponential decay factor; the higher this is, + * the more biased the sample will be towards newer + * values + */ + public ExponentiallyDecayingSample(int reservoirSize, double alpha) { + this.alpha = alpha; + this.reservoirSize = reservoirSize; + clear(); + } + + @Override + public void clear() { + lockForRescale(); + try { + values.clear(); + count.set(0); + this.startTime = CURRENT_TICK; + nextScaleTime.set(System.nanoTime() + RESCALE_THRESHOLD); + } finally { + unlockForRescale(); + } + } + + @Override + public int size() { + return (int) Math.min(reservoirSize, count.get()); + } + + @Override + public void update(long value) { + update(value, CURRENT_TICK); + } + + /** + * Adds an old value with a fixed timestamp to the sample. 
+ * + * @param value the value to be added + * @param timestamp the epoch timestamp of {@code value} in seconds + */ + public void update(long value, long timestamp) { + lockForRegularUsage(); + try { + final double priority = weight(timestamp - startTime) + / RANDOM.nextDouble(); + final long newCount = count.incrementAndGet(); + if (newCount <= reservoirSize) { + values.put(priority, value); + } else { + Double first = values.firstKey(); + if (first < priority) { + if (values.putIfAbsent(priority, value) == null) { + // ensure we always remove an item + while (values.remove(first) == null) { + first = values.firstKey(); + } + } + } + } + } finally { + unlockForRegularUsage(); + } + + final long now = System.nanoTime(); + final long next = nextScaleTime.get(); + if (now >= next) { + rescale(now, next); + } + } + + @Override + public Snapshot getSnapshot() { + lockForRegularUsage(); + try { + return new Snapshot(values.values()); + } finally { + unlockForRegularUsage(); + } + } + + private double weight(long t) { + return Math.exp(alpha * t); + } + + /* "A common feature of the above techniques—indeed, the key technique that + * allows us to track the decayed weights efficiently—is that they maintain + * counts and other quantities based on g(ti − L), and only scale by g(t − L) + * at query time. But while g(ti −L)/g(t−L) is guaranteed to lie between zero + * and one, the intermediate values of g(ti − L) could become very large. For + * polynomial functions, these values should not grow too large, and should + * be effectively represented in practice by floating point values without + * loss of precision. For exponential functions, these values could grow + * quite large as new values of (ti − L) become large, and potentially + * exceed the capacity of common floating point types. However, since the + * values stored by the algorithms are linear combinations of g values + * (scaled sums), they can be rescaled relative to a new landmark. That is, + * by the analysis of exponential decay in Section III-A, the choice of L + * does not affect the final result. We can therefore multiply each value + * based on L by a factor of exp(−α(L′ − L)), and obtain the correct value + * as if we had instead computed relative to a new landmark L′ (and then use + * this new L′ at query time). This can be done with a linear pass over + * whatever data structure is being used." 
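The quoted passage reduces to two formulas: a value arriving at time t gets priority exp(alpha * (t - L)) divided by a uniform random draw, and moving the landmark from L to L' multiplies every stored priority by exp(-alpha * (L' - L)), which preserves their relative order. A hedged, standalone sketch (ForwardDecay is an illustrative name, not a class in the patch):

import java.util.Random;

public final class ForwardDecay {
  private final double alpha;
  private long landmark;  // L, in seconds
  private final Random random = new Random();

  public ForwardDecay(double alpha, long landmarkSeconds) {
    this.alpha = alpha;
    this.landmark = landmarkSeconds;
  }

  /** Priority of a value arriving at time t (seconds); the reservoir keeps the largest. */
  public double priority(long t) {
    // weight grows exponentially with recency; dividing by a uniform draw randomises order
    return Math.exp(alpha * (t - landmark)) / random.nextDouble();
  }

  /** Factor every stored priority is multiplied by when the landmark moves to lPrime. */
  public double rescale(long lPrime) {
    final double factor = Math.exp(-alpha * (lPrime - landmark));
    this.landmark = lPrime;
    return factor;
  }
}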
+ */ + private void rescale(long now, long next) { + if (nextScaleTime.compareAndSet(next, now + RESCALE_THRESHOLD)) { + lockForRescale(); + try { + final long oldStartTime = startTime; + this.startTime = CURRENT_TICK; + final ArrayList keys = new ArrayList(values.keySet()); + for (Double key : keys) { + final Long value = values.remove(key); + values.put(key * Math.exp(-alpha * (startTime - oldStartTime)), + value); + } + } finally { + unlockForRescale(); + } + } + } + + private void unlockForRescale() { + lock.writeLock().unlock(); + } + + private void lockForRescale() { + lock.writeLock().lock(); + } + + private void lockForRegularUsage() { + lock.readLock().lock(); + } + + private void unlockForRegularUsage() { + lock.readLock().unlock(); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/metrics/histogram/MetricsHistogram.java b/src/main/java/org/apache/hadoop/hbase/metrics/histogram/MetricsHistogram.java new file mode 100644 index 00000000000..a78b0ce9256 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/metrics/histogram/MetricsHistogram.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.metrics.histogram; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.metrics.MetricsRecord; +import org.apache.hadoop.metrics.util.MetricsBase; +import org.apache.hadoop.metrics.util.MetricsRegistry; + +public class MetricsHistogram extends MetricsBase { + + // 1028 items implies 99.9% CI w/ 5% margin of error + // (assuming a normal distribution on the underlying data) + private static final int DEFAULT_SAMPLE_SIZE = 1028; + + // the bias towards sampling from more recent data. + // Per Cormode et al. an alpha of 0.015 strongly biases to the last 5 minutes + private static final double DEFAULT_ALPHA = 0.015; + + /** + * Constructor to create a new histogram metric + * @param nam the name to publish the metric under + * @param registry where the metrics object will be registered + * @param description the metric's description + * @param forwardBiased true if you want this histogram to give more + * weight to recent data, + * false if you want all data to have uniform weight + */ + public MetricsHistogram(final String nam, final MetricsRegistry registry, + final String description, boolean forwardBiased) { + super(nam, description); + + this.min = new AtomicLong(); + this.max = new AtomicLong(); + this.sum = new AtomicLong(); + this.sample = forwardBiased ? 
+ new ExponentiallyDecayingSample(DEFAULT_SAMPLE_SIZE, DEFAULT_ALPHA) + : new UniformSample(DEFAULT_SAMPLE_SIZE); + + this.variance = new AtomicReference(new double[]{-1, 0}); + this.count = new AtomicLong(); + + this.clear(); + + if (registry != null) { + registry.add(nam, this); + } + } + + /** + * Constructor create a new (forward biased) histogram metric + * @param nam the name to publish the metric under + * @param registry where the metrics object will be registered + * @param description the metric's description + */ + public MetricsHistogram(final String nam, MetricsRegistry registry, + final String description) { + this(nam, registry, NO_DESCRIPTION, true); + } + + /** + * Constructor - create a new (forward biased) histogram metric + * @param nam the name of the metrics to be used to publish the metric + * @param registry - where the metrics object will be registered + */ + public MetricsHistogram(final String nam, MetricsRegistry registry) { + this(nam, registry, NO_DESCRIPTION); + } + + private final Sample sample; + private final AtomicLong min; + private final AtomicLong max; + private final AtomicLong sum; + + // these are for computing a running-variance, + // without letting floating point errors accumulate via Welford's algorithm + private final AtomicReference variance; + private final AtomicLong count; + + /** + * Clears all recorded values. + */ + public void clear() { + this.sample.clear(); + this.count.set(0); + this.max.set(Long.MIN_VALUE); + this.min.set(Long.MAX_VALUE); + this.sum.set(0); + variance.set(new double[]{-1, 0}); + } + + public void update(int val) { + update((long) val); + } + + public void update(final long val) { + count.incrementAndGet(); + sample.update(val); + setMax(val); + setMin(val); + sum.getAndAdd(val); + updateVariance(val); + } + + private void setMax(final long potentialMax) { + boolean done = false; + while (!done) { + final long currentMax = max.get(); + done = currentMax >= potentialMax + || max.compareAndSet(currentMax, potentialMax); + } + } + + private void setMin(long potentialMin) { + boolean done = false; + while (!done) { + final long currentMin = min.get(); + done = currentMin <= potentialMin + || min.compareAndSet(currentMin, potentialMin); + } + } + + private void updateVariance(long value) { + boolean done = false; + while (!done) { + final double[] oldValues = variance.get(); + final double[] newValues = new double[2]; + if (oldValues[0] == -1) { + newValues[0] = value; + newValues[1] = 0; + } else { + final double oldM = oldValues[0]; + final double oldS = oldValues[1]; + + final double newM = oldM + ((value - oldM) / getCount()); + final double newS = oldS + ((value - oldM) * (value - newM)); + + newValues[0] = newM; + newValues[1] = newS; + } + done = variance.compareAndSet(oldValues, newValues); + } + } + + + public long getCount() { + return count.get(); + } + + public long getMax() { + if (getCount() > 0) { + return max.get(); + } + return 0L; + } + + public long getMin() { + if (getCount() > 0) { + return min.get(); + } + return 0L; + } + + public double getMean() { + if (getCount() > 0) { + return sum.get() / (double) getCount(); + } + return 0.0; + } + + public double getStdDev() { + if (getCount() > 0) { + return Math.sqrt(getVariance()); + } + return 0.0; + } + + public Snapshot getSnapshot() { + return sample.getSnapshot(); + } + + private double getVariance() { + if (getCount() <= 1) { + return 0.0; + } + return variance.get()[1] / (getCount() - 1); + } + + @Override + public void pushMetric(MetricsRecord mr) 
{ + final Snapshot s = this.getSnapshot(); + mr.setMetric(getName() + "_num_ops", this.getCount()); + mr.setMetric(getName() + "_min", this.getMin()); + mr.setMetric(getName() + "_max", this.getMax()); + + mr.setMetric(getName() + "_mean", (float) this.getMean()); + mr.setMetric(getName() + "_std_dev", (float) this.getStdDev()); + + mr.setMetric(getName() + "_median", (float) s.getMedian()); + mr.setMetric(getName() + "_75th_percentile", + (float) s.get75thPercentile()); + mr.setMetric(getName() + "_95th_percentile", + (float) s.get95thPercentile()); + mr.setMetric(getName() + "_99th_percentile", + (float) s.get99thPercentile()); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/metrics/histogram/Sample.java b/src/main/java/org/apache/hadoop/hbase/metrics/histogram/Sample.java new file mode 100644 index 00000000000..55b91d688b2 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/metrics/histogram/Sample.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.metrics.histogram; + +/** + * A statistically representative sample of items from a stream. + */ +public interface Sample { + /** + * Clears all recorded values. + */ + void clear(); + + /** + * Returns the number of values recorded. + * + * @return the number of values recorded + */ + int size(); + + /** + * Adds a new recorded value to the sample. + * + * @param value a new recorded value + */ + void update(long value); + + /** + * Returns a snapshot of the sample's values. + * + * @return a snapshot of the sample's values + */ + Snapshot getSnapshot(); +} diff --git a/src/main/java/org/apache/hadoop/hbase/metrics/histogram/Snapshot.java b/src/main/java/org/apache/hadoop/hbase/metrics/histogram/Snapshot.java new file mode 100644 index 00000000000..418b6fb1205 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/metrics/histogram/Snapshot.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.metrics.histogram; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Arrays; +import java.util.Collection; + +/** + * A snapshot of all the information seen in a Sample. + */ +public class Snapshot { + + private static final double MEDIAN_Q = 0.5; + private static final double P75_Q = 0.75; + private static final double P95_Q = 0.95; + private static final double P98_Q = 0.98; + private static final double P99_Q = 0.99; + private static final double P999_Q = 0.999; + + private final double[] values; + + /** + * Create a new {@link Snapshot} with the given values. + * + * @param values an unordered set of values in the sample + */ + public Snapshot(Collection values) { + final Object[] copy = values.toArray(); + this.values = new double[copy.length]; + for (int i = 0; i < copy.length; i++) { + this.values[i] = (Long) copy[i]; + } + Arrays.sort(this.values); + } + + /** + * Create a new {@link Snapshot} with the given values. + * + * @param values an unordered set of values in the sample + */ + public Snapshot(double[] values) { + this.values = new double[values.length]; + System.arraycopy(values, 0, this.values, 0, values.length); + Arrays.sort(this.values); + } + + /** + * Returns the value at the given quantile. + * + * @param quantile a given quantile, in [0..1] + * @return the value in the distribution at quantile + */ + public double getValue(double quantile) { + if (quantile < 0.0 || quantile > 1.0) { + throw new IllegalArgumentException(quantile + " is not in [0..1]"); + } + + if (values.length == 0) { + return 0.0; + } + + final double pos = quantile * (values.length + 1); + + if (pos < 1) { + return values[0]; + } + + if (pos >= values.length) { + return values[values.length - 1]; + } + + final double lower = values[(int) pos - 1]; + final double upper = values[(int) pos]; + return lower + (pos - Math.floor(pos)) * (upper - lower); + } + + /** + * Returns the number of values in the snapshot. + * + * @return the number of values in the snapshot + */ + public int size() { + return values.length; + } + + /** + * Returns the median value in the distribution. + * + * @return the median value in the distribution + */ + public double getMedian() { + return getValue(MEDIAN_Q); + } + + /** + * Returns the value at the 75th percentile in the distribution. + * + * @return the value at the 75th percentile in the distribution + */ + public double get75thPercentile() { + return getValue(P75_Q); + } + + /** + * Returns the value at the 95th percentile in the distribution. + * + * @return the value at the 95th percentile in the distribution + */ + public double get95thPercentile() { + return getValue(P95_Q); + } + + /** + * Returns the value at the 98th percentile in the distribution. + * + * @return the value at the 98th percentile in the distribution + */ + public double get98thPercentile() { + return getValue(P98_Q); + } + + /** + * Returns the value at the 99th percentile in the distribution. + * + * @return the value at the 99th percentile in the distribution + */ + public double get99thPercentile() { + return getValue(P99_Q); + } + + /** + * Returns the value at the 99.9th percentile in the distribution. + * + * @return the value at the 99.9th percentile in the distribution + */ + public double get999thPercentile() { + return getValue(P999_Q); + } + + /** + * Returns the entire set of values in the snapshot. 
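A small worked example of the interpolation in Snapshot.getValue: pos = quantile * (n + 1), and the result interpolates between the two neighbouring order statistics. This assumes the Snapshot class added by this patch is on the classpath; the demo class itself is illustrative.

import org.apache.hadoop.hbase.metrics.histogram.Snapshot;

public final class SnapshotInterpolationDemo {
  public static void main(String[] args) {
    // With sorted values {10, 20, 30, 40}:
    //   median: pos = 0.5  * 5 = 2.5  -> halfway between 20 and 30 -> 25.0
    //   75th:   pos = 0.75 * 5 = 3.75 -> 30 + 0.75 * (40 - 30)     -> 37.5
    final Snapshot s = new Snapshot(new double[] {10, 20, 30, 40});
    System.out.println(s.getMedian());          // 25.0
    System.out.println(s.get75thPercentile());  // 37.5
  }
}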
+ * + * @return the entire set of values in the snapshot + */ + public double[] getValues() { + return Arrays.copyOf(values, values.length); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/metrics/histogram/UniformSample.java b/src/main/java/org/apache/hadoop/hbase/metrics/histogram/UniformSample.java new file mode 100644 index 00000000000..74dd0f94567 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/metrics/histogram/UniformSample.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.metrics.histogram; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongArray; + +/** + * A random sample of a stream of longs. Uses Vitter's Algorithm R to produce a + * statistically representative sample. + * + * see: http://www.cs.umd.edu/~samir/498/vitter.pdf + */ +public class UniformSample implements Sample { + + private static final Random RANDOM = new Random(); + private static final int BITS_PER_LONG = 63; + + private final AtomicLong count = new AtomicLong(); + private final AtomicLongArray values; + + /** + * Creates a new UniformSample + * + * @param reservoirSize the number of samples to keep + */ + public UniformSample(int reservoirSize) { + this.values = new AtomicLongArray(reservoirSize); + clear(); + } + + @Override + public void clear() { + for (int i = 0; i < values.length(); i++) { + values.set(i, 0); + } + count.set(0); + } + + @Override + public int size() { + final long c = count.get(); + if (c > values.length()) { + return values.length(); + } + return (int) c; + } + + @Override + public void update(long value) { + final long c = count.incrementAndGet(); + if (c <= values.length()) { + values.set((int) c - 1, value); + } else { + final long r = nextLong(c); + if (r < values.length()) { + values.set((int) r, value); + } + } + } + + /** + * Get a pseudo-random long uniformly between 0 and n-1. Stolen from + * {@link java.util.Random#nextInt()}. + * + * @param n the bound + * @return a value select randomly from the range {@code [0..n)}. 
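UniformSample's update rule is Vitter's Algorithm R: fill the reservoir with the first k values, then for the n-th value overwrite a uniformly chosen slot with probability k/n. A standalone sketch of that rule, using ThreadLocalRandom in place of the hand-rolled nextLong shown here (ReservoirR is an illustrative name):

import java.util.concurrent.ThreadLocalRandom;

public final class ReservoirR {
  private final long[] reservoir;
  private long seen;

  public ReservoirR(int size) {
    this.reservoir = new long[size];
  }

  public void add(long value) {
    seen++;
    if (seen <= reservoir.length) {
      reservoir[(int) (seen - 1)] = value;  // fill phase: keep everything
    } else {
      // keep the n-th value with probability size/n by overwriting a random slot
      final long r = ThreadLocalRandom.current().nextLong(seen);
      if (r < reservoir.length) {
        reservoir[(int) r] = value;
      }
    }
  }
}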
+ */ + private static long nextLong(long n) { + long bits, val; + do { + bits = RANDOM.nextLong() & (~(1L << BITS_PER_LONG)); + val = bits % n; + } while (bits - val + (n - 1) < 0L); + return val; + } + + @Override + public Snapshot getSnapshot() { + final int s = size(); + final List copy = new ArrayList(s); + for (int i = 0; i < s; i++) { + copy.add(values.get(i)); + } + return new Snapshot(copy); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index e0af8fb2955..90cf3a3c92f 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1953,12 +1953,15 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, /** {@inheritDoc} */ public Result get(byte[] regionName, Get get) throws IOException { checkOpen(); + final long startTime = System.nanoTime(); requestCount.incrementAndGet(); try { HRegion region = getRegion(regionName); return region.get(get, getLockFromId(get.getLockId())); } catch (Throwable t) { throw convertThrowableToIOE(cleanup(t)); + } finally { + this.metrics.getLatencies.update(System.nanoTime() - startTime); } } @@ -1990,6 +1993,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, throw new IllegalArgumentException("update has null row"); } + final long startTime = System.nanoTime(); checkOpen(); this.requestCount.incrementAndGet(); HRegion region = getRegion(regionName); @@ -2001,6 +2005,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, region.put(put, getLockFromId(put.getLockId()), writeToWAL); } catch (Throwable t) { throw convertThrowableToIOE(cleanup(t)); + } finally { + this.metrics.putLatencies.update(System.nanoTime() - startTime); } } @@ -2008,6 +2014,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, throws IOException { checkOpen(); HRegion region = null; + int i = 0; + + final long startTime = System.nanoTime(); try { region = getRegion(regionName); if (!region.getRegionInfo().isMetaTable()) { @@ -2017,7 +2026,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, @SuppressWarnings("unchecked") Pair[] putsWithLocks = new Pair[puts.size()]; - int i = 0; for (Put p : puts) { Integer lock = getLockFromId(p.getLockId()); putsWithLocks[i++] = new Pair(p, lock); @@ -2033,6 +2041,14 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, return -1; } catch (Throwable t) { throw convertThrowableToIOE(cleanup(t)); + } finally { + // going to count this as puts.size() PUTs for latency calculations + final long totalTime = System.nanoTime() - startTime; + final long putCount = i; + final long perPutTime = totalTime / putCount; + for (int request = 0; request < putCount; request++) { + this.metrics.putLatencies.update(perPutTime); + } } } @@ -2494,6 +2510,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, public void delete(final byte[] regionName, final Delete delete) throws IOException { checkOpen(); + final long startTime = System.nanoTime(); try { boolean writeToWAL = delete.getWriteToWAL(); this.requestCount.incrementAndGet(); @@ -2505,6 +2522,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, region.delete(delete, lid, writeToWAL); } catch (Throwable t) { throw convertThrowableToIOE(cleanup(t)); + } finally { + 
this.metrics.deleteLatencies.update(System.nanoTime() - startTime); } } @@ -2522,10 +2541,12 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, int size = deletes.size(); Integer[] locks = new Integer[size]; for (Delete delete : deletes) { + final long startTime = System.nanoTime(); this.requestCount.incrementAndGet(); locks[i] = getLockFromId(delete.getLockId()); region.delete(delete, locks[i], delete.getWriteToWAL()); i++; + this.metrics.deleteLatencies.update(System.nanoTime() - startTime); } } catch (WrongRegionException ex) { LOG.debug("Batch deletes: " + i, ex); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java b/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java index e5e85b02bbd..da08532d973 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java @@ -19,13 +19,21 @@ */ package org.apache.hadoop.hbase.regionserver.metrics; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryUsage; +import java.util.List; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.metrics.ExactCounterMetric; import org.apache.hadoop.hbase.metrics.HBaseInfo; import org.apache.hadoop.hbase.metrics.MetricsRate; import org.apache.hadoop.hbase.metrics.PersistentMetricsTimeVaryingRate; +import org.apache.hadoop.hbase.metrics.histogram.MetricsHistogram; +import org.apache.hadoop.hbase.metrics.histogram.Snapshot; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Strings; @@ -42,11 +50,6 @@ import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate; import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong; import org.apache.hadoop.util.StringUtils; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryUsage; -import java.util.List; - /** * This class is for maintaining the various regionserver statistics * and publishing them through the metrics interfaces. @@ -78,43 +81,51 @@ public class RegionServerMetrics implements Updater { /** * Block cache size. */ - public final MetricsLongValue blockCacheSize = new MetricsLongValue("blockCacheSize", registry); + public final MetricsLongValue blockCacheSize = + new MetricsLongValue("blockCacheSize", registry); /** * Block cache free size. */ - public final MetricsLongValue blockCacheFree = new MetricsLongValue("blockCacheFree", registry); + public final MetricsLongValue blockCacheFree = + new MetricsLongValue("blockCacheFree", registry); /** * Block cache item count. */ - public final MetricsLongValue blockCacheCount = new MetricsLongValue("blockCacheCount", registry); + public final MetricsLongValue blockCacheCount = + new MetricsLongValue("blockCacheCount", registry); /** * Block cache hit count. */ - public final MetricsLongValue blockCacheHitCount = new MetricsLongValue("blockCacheHitCount", registry); + public final MetricsLongValue blockCacheHitCount = + new MetricsLongValue("blockCacheHitCount", registry); /** * Block cache miss count. 
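The HRegionServer changes above all follow one pattern: take System.nanoTime() before the operation and update the matching histogram in a finally block, so failed calls are measured too. A generic, hedged sketch of that pattern against the MetricsHistogram added by this patch (the Timed helper itself is hypothetical, not part of the patch):

import java.util.concurrent.Callable;

import org.apache.hadoop.hbase.metrics.histogram.MetricsHistogram;

public final class Timed {
  /** Run an operation and record its latency, in nanoseconds, even when it throws. */
  public static <T> T call(Callable<T> op, MetricsHistogram latencyHistogram)
      throws Exception {
    final long start = System.nanoTime();
    try {
      return op.call();
    } finally {
      latencyHistogram.update(System.nanoTime() - start);
    }
  }
}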
*/ - public final MetricsLongValue blockCacheMissCount = new MetricsLongValue("blockCacheMissCount", registry); + public final MetricsLongValue blockCacheMissCount = + new MetricsLongValue("blockCacheMissCount", registry); /** * Block cache evict count. */ - public final MetricsLongValue blockCacheEvictedCount = new MetricsLongValue("blockCacheEvictedCount", registry); + public final MetricsLongValue blockCacheEvictedCount = + new MetricsLongValue("blockCacheEvictedCount", registry); /** * Block hit ratio. */ - public final MetricsIntValue blockCacheHitRatio = new MetricsIntValue("blockCacheHitRatio", registry); + public final MetricsIntValue blockCacheHitRatio = + new MetricsIntValue("blockCacheHitRatio", registry); /** * Block hit caching ratio. This only includes the requests to the block * cache where caching was turned on. See HBASE-2253. */ - public final MetricsIntValue blockCacheHitCachingRatio = new MetricsIntValue("blockCacheHitCachingRatio", registry); + public final MetricsIntValue blockCacheHitCachingRatio = + new MetricsIntValue("blockCacheHitCachingRatio", registry); /** Block hit ratio for past N periods. */ public final MetricsIntValue blockCacheHitRatioPastNPeriods = new MetricsIntValue("blockCacheHitRatioPastNPeriods", registry); @@ -122,6 +133,25 @@ public class RegionServerMetrics implements Updater { /** Block hit caching ratio for past N periods */ public final MetricsIntValue blockCacheHitCachingRatioPastNPeriods = new MetricsIntValue("blockCacheHitCachingRatioPastNPeriods", registry); + /** + * a latency histogram on 'get' requests + */ + public final MetricsHistogram getLatencies = + new MetricsHistogram("getRequestLatency", registry); + + /** + * a latency histogram on 'delete' requests + */ + public final MetricsHistogram deleteLatencies = + new MetricsHistogram("deleteRequestLatency", registry); + + /** + * a latency histogram on 'put' requests + */ + public final MetricsHistogram putLatencies = + new MetricsHistogram("putRequestLatency", registry); + + /* * Count of requests to the regionservers since last call to metrics update */ @@ -135,17 +165,20 @@ public class RegionServerMetrics implements Updater { /** * Count of storefiles open on the regionserver. 
*/ - public final MetricsIntValue storefiles = new MetricsIntValue("storefiles", registry); + public final MetricsIntValue storefiles = + new MetricsIntValue("storefiles", registry); /** * Count of read requests */ - public final MetricsLongValue readRequestsCount = new MetricsLongValue("readRequestsCount", registry); + public final MetricsLongValue readRequestsCount = + new MetricsLongValue("readRequestsCount", registry); /** * Count of write requests */ - public final MetricsLongValue writeRequestsCount = new MetricsLongValue("writeRequestsCount", registry); + public final MetricsLongValue writeRequestsCount = + new MetricsLongValue("writeRequestsCount", registry); /** */ @@ -189,7 +222,26 @@ public class RegionServerMetrics implements Updater { new MetricsIntValue("flushQueueSize", registry); /** - * filesystem sequential read latency + * filesystem sequential read latency distribution + */ + public final MetricsHistogram fsReadLatencyHistogram = + new MetricsHistogram("fsReadLatencyHistogram", registry); + + /** + * filesystem pread latency distribution + */ + public final MetricsHistogram fsPreadLatencyHistogram = + new MetricsHistogram("fsPreadLatencyHistogram", registry); + + /** + * Metrics on the distribution of filesystem write latencies (improved version of fsWriteLatency) + */ + public final MetricsHistogram fsWriteLatencyHistogram = + new MetricsHistogram("fsWriteLatencyHistogram", registry); + + + /** + * filesystem read latency */ public final MetricsTimeVaryingRate fsReadLatency = new MetricsTimeVaryingRate("fsReadLatency", registry); @@ -218,6 +270,7 @@ public class RegionServerMetrics implements Updater { public final MetricsTimeVaryingRate fsSyncLatency = new MetricsTimeVaryingRate("fsSyncLatency", registry); + /** * time each scheduled compaction takes */ @@ -331,6 +384,10 @@ public class RegionServerMetrics implements Updater { this.blockCacheHitRatioPastNPeriods.pushMetric(this.metricsRecord); this.blockCacheHitCachingRatioPastNPeriods.pushMetric(this.metricsRecord); + this.putLatencies.pushMetric(this.metricsRecord); + this.deleteLatencies.pushMetric(this.metricsRecord); + this.getLatencies.pushMetric(this.metricsRecord); + // Mix in HFile and HLog metrics // Be careful. Here is code for MTVR from up in hadoop: // public synchronized void inc(final int numOps, final long time) { @@ -361,11 +418,27 @@ public class RegionServerMetrics implements Updater { * by compaction & flush metrics. 
*/ + for(Long latency : HFile.getReadLatenciesNanos()) { + this.fsReadLatencyHistogram.update(latency); + } + for(Long latency : HFile.getPreadLatenciesNanos()) { + this.fsPreadLatencyHistogram.update(latency); + } + for(Long latency : HFile.getWriteLatenciesNanos()) { + this.fsWriteLatencyHistogram.update(latency); + } + + // push the result this.fsPreadLatency.pushMetric(this.metricsRecord); this.fsReadLatency.pushMetric(this.metricsRecord); this.fsWriteLatency.pushMetric(this.metricsRecord); this.fsWriteSize.pushMetric(this.metricsRecord); + + this.fsReadLatencyHistogram.pushMetric(this.metricsRecord); + this.fsWriteLatencyHistogram.pushMetric(this.metricsRecord); + this.fsPreadLatencyHistogram.pushMetric(this.metricsRecord); + this.fsSyncLatency.pushMetric(this.metricsRecord); this.compactionTime.pushMetric(this.metricsRecord); this.compactionSize.pushMetric(this.metricsRecord); @@ -498,6 +571,40 @@ public class RegionServerMetrics implements Updater { Long.valueOf(this.hdfsBlocksLocalityIndex.get())); sb = Strings.appendKeyValue(sb, "slowHLogAppendCount", Long.valueOf(this.slowHLogAppendCount.get())); + sb = appendHistogram(sb, this.deleteLatencies); + sb = appendHistogram(sb, this.getLatencies); + sb = appendHistogram(sb, this.putLatencies); + sb = appendHistogram(sb, this.fsReadLatencyHistogram); + sb = appendHistogram(sb, this.fsPreadLatencyHistogram); + sb = appendHistogram(sb, this.fsWriteLatencyHistogram); + return sb.toString(); } + + private StringBuilder appendHistogram(StringBuilder sb, + MetricsHistogram histogram) { + sb = Strings.appendKeyValue(sb, + histogram.getName() + "Mean", + StringUtils.limitDecimalTo2(histogram.getMean())); + sb = Strings.appendKeyValue(sb, + histogram.getName() + "Count", + StringUtils.limitDecimalTo2(histogram.getCount())); + final Snapshot s = histogram.getSnapshot(); + sb = Strings.appendKeyValue(sb, + histogram.getName() + "Median", + StringUtils.limitDecimalTo2(s.getMedian())); + sb = Strings.appendKeyValue(sb, + histogram.getName() + "75th", + StringUtils.limitDecimalTo2(s.get75thPercentile())); + sb = Strings.appendKeyValue(sb, + histogram.getName() + "95th", + StringUtils.limitDecimalTo2(s.get95thPercentile())); + sb = Strings.appendKeyValue(sb, + histogram.getName() + "99th", + StringUtils.limitDecimalTo2(s.get99thPercentile())); + sb = Strings.appendKeyValue(sb, + histogram.getName() + "999th", + StringUtils.limitDecimalTo2(s.get999thPercentile())); + return sb; + } } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 1c9202ad1ae..c3d13c31c66 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -1877,4 +1877,4 @@ public class HLog implements Syncable { System.exit(-1); } } -} +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/src/main/java/org/apache/hadoop/hbase/util/Threads.java index 5d23df98990..f23ada76660 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/Threads.java +++ b/src/main/java/org/apache/hadoop/hbase/util/Threads.java @@ -25,6 +25,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -180,4 +181,24 @@ 
public class Threads { boundedCachedThreadPool.allowCoreThreadTimeOut(true); return boundedCachedThreadPool; } + + + /** + * Returns a {@link java.util.concurrent.ThreadFactory} that names each + * created thread uniquely, with a common prefix. + * + * @param prefix The prefix of every created Thread's name + * @return a {@link java.util.concurrent.ThreadFactory} that names threads + */ + public static ThreadFactory getNamedThreadFactory(final String prefix) { + return new ThreadFactory() { + + private final AtomicInteger threadNumber = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, prefix + threadNumber.getAndIncrement()); + } + }; + } } diff --git a/src/test/java/org/apache/hadoop/hbase/metrics/TestExactCounterMetric.java b/src/test/java/org/apache/hadoop/hbase/metrics/TestExactCounterMetric.java new file mode 100644 index 00000000000..c9e5b6e3af0 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/metrics/TestExactCounterMetric.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.metrics; + +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Test; + +public class TestExactCounterMetric { + + @Test + public void testBasic() { + final ExactCounterMetric counter = new ExactCounterMetric("testCounter", null); + for (int i = 1; i <= 10; i++) { + for (int j = 0; j < i; j++) { + counter.update(i + ""); + } + } + + List> topFive = counter.getTop(5); + Long i = 10L; + for (Pair entry : topFive) { + Assert.assertEquals(i + "", entry.getFirst()); + Assert.assertEquals(i, entry.getSecond()); + i--; + } + } +} diff --git a/src/test/java/org/apache/hadoop/hbase/metrics/TestExponentiallyDecayingSample.java b/src/test/java/org/apache/hadoop/hbase/metrics/TestExponentiallyDecayingSample.java new file mode 100644 index 00000000000..8b0153e1878 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/metrics/TestExponentiallyDecayingSample.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.metrics; + +import junit.framework.Assert; + +import org.apache.hadoop.hbase.metrics.histogram.ExponentiallyDecayingSample; +import org.apache.hadoop.hbase.metrics.histogram.Snapshot; +import org.junit.Test; + +public class TestExponentiallyDecayingSample { + + @Test + public void testBasic() { + final ExponentiallyDecayingSample sample = + new ExponentiallyDecayingSample(100, 0.99); + + for (int i = 0; i < 1000; i++) { + sample.update(i); + } + Assert.assertEquals(100, sample.size()); + + final Snapshot snapshot = sample.getSnapshot(); + Assert.assertEquals(100, snapshot.size()); + + for (double i : snapshot.getValues()) { + Assert.assertTrue(i >= 0.0 && i < 1000.0); + } + } + + @Test + public void testTooBig() throws Exception { + final ExponentiallyDecayingSample sample = + new ExponentiallyDecayingSample(100, 0.99); + for (int i = 0; i < 10; i++) { + sample.update(i); + } + Assert.assertEquals(10, sample.size()); + + final Snapshot snapshot = sample.getSnapshot(); + Assert.assertEquals(10, sample.size()); + + for (double i : snapshot.getValues()) { + Assert.assertTrue(i >= 0.0 && i < 1000.0); + } + } +} diff --git a/src/test/java/org/apache/hadoop/hbase/metrics/TestMetricsHistogram.java b/src/test/java/org/apache/hadoop/hbase/metrics/TestMetricsHistogram.java new file mode 100644 index 00000000000..4b1ae151d7e --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/metrics/TestMetricsHistogram.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.metrics; + +import java.util.Arrays; +import java.util.Random; + +import org.apache.hadoop.hbase.metrics.histogram.MetricsHistogram; +import org.apache.hadoop.hbase.metrics.histogram.Snapshot; +import org.junit.Assert; +import org.junit.Test; + +public class TestMetricsHistogram { + + @Test + public void testBasicUniform() { + MetricsHistogram h = new MetricsHistogram("testHistogram", null); + + for (int i = 0; i < 100; i++) { + h.update(i); + } + + Assert.assertEquals(100, h.getCount()); + Assert.assertEquals(0, h.getMin()); + Assert.assertEquals(99, h.getMax()); + } + + private static int safeIndex(int i, int len) { + if (i < len && i>= 0) { + return i; + } else if (i >= len) { + return len - 1; + } else { + return 0; + } + } + + @Test + public void testRandom() { + final Random r = new Random(); + final MetricsHistogram h = new MetricsHistogram("testHistogram", null); + + final long[] data = new long[1000]; + + for (int i = 0; i < data.length; i++) { + data[i] = (long) (r.nextGaussian() * 10000.0); + h.update(data[i]); + } + + final Snapshot s = h.getSnapshot(); + Arrays.sort(data); + + // as long as the histogram chooses an item with index N+/-slop, accept it + final int slop = 20; + + // make sure the median, 75th percentile and 95th percentile are good + final int medianIndex = data.length / 2; + final long minAcceptableMedian = data[safeIndex(medianIndex - slop, + data.length)]; + final long maxAcceptableMedian = data[safeIndex(medianIndex + slop, + data.length)]; + Assert.assertTrue(s.getMedian() >= minAcceptableMedian + && s.getMedian() <= maxAcceptableMedian); + + final int seventyFifthIndex = (int) (data.length * 0.75); + final long minAcceptableseventyFifth = data[safeIndex(seventyFifthIndex + - slop, data.length)]; + final long maxAcceptableseventyFifth = data[safeIndex(seventyFifthIndex + + slop, data.length)]; + Assert.assertTrue(s.get75thPercentile() >= minAcceptableseventyFifth + && s.get75thPercentile() <= maxAcceptableseventyFifth); + + final int ninetyFifthIndex = (int) (data.length * 0.95); + final long minAcceptableninetyFifth = data[safeIndex(ninetyFifthIndex + - slop, data.length)]; + final long maxAcceptableninetyFifth = data[safeIndex(ninetyFifthIndex + + slop, data.length)]; + Assert.assertTrue(s.get95thPercentile() >= minAcceptableninetyFifth + && s.get95thPercentile() <= maxAcceptableninetyFifth); + + } +}
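To close, a short usage sketch showing the new histogram end to end, mirroring the tests above by passing a null registry so nothing is actually published (HistogramDemo and the sample latencies are illustrative):

import org.apache.hadoop.hbase.metrics.histogram.MetricsHistogram;
import org.apache.hadoop.hbase.metrics.histogram.Snapshot;

public final class HistogramDemo {
  public static void main(String[] args) {
    // null registry: the metric is not registered for publishing, as in the tests above
    final MetricsHistogram h = new MetricsHistogram("rpcLatency", null);
    for (long latencyNanos : new long[] {120000L, 85000L, 310000L}) {
      h.update(latencyNanos);
    }
    final Snapshot snap = h.getSnapshot();
    System.out.println("count=" + h.getCount()
        + " mean=" + h.getMean()
        + " p99=" + snap.get99thPercentile());
  }
}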