From b5b0ac64a18ccefe1626ce985adf576549172911 Mon Sep 17 00:00:00 2001 From: Aaron Myers Date: Thu, 12 Jul 2012 01:31:40 +0000 Subject: [PATCH] HADOOP-8541. Better high-percentile latency metrics. Contributed by Andrew Wang. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1360501 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 2 + .../dev-support/findbugsExcludeFile.xml | 9 + .../hadoop/metrics2/lib/MetricsRegistry.java | 18 + .../hadoop/metrics2/lib/MutableQuantiles.java | 165 ++++++++++ .../apache/hadoop/metrics2/util/Quantile.java | 60 ++++ .../hadoop/metrics2/util/SampleQuantiles.java | 310 ++++++++++++++++++ .../metrics2/lib/TestMutableMetrics.java | 140 +++++++- .../metrics2/util/TestSampleQuantiles.java | 125 +++++++ 8 files changed, 824 insertions(+), 5 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/Quantile.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/SampleQuantiles.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/util/TestSampleQuantiles.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 9666156ae07..c6289846c25 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -252,6 +252,8 @@ Branch-2 ( Unreleased changes ) EOFException on Snappy or LZO block-compressed data (todd via harsh) + HADOOP-8541. Better high-percentile latency metrics. (Andrew Wang via atm) + BUG FIXES HADOOP-8372. NetUtils.normalizeHostName() incorrectly handles hostname diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml index c1c3cd28b17..0244955865a 100644 --- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml +++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml @@ -295,4 +295,13 @@ + + + + + + + diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsRegistry.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsRegistry.java index 20793e508f5..1c0d30e234f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsRegistry.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsRegistry.java @@ -180,6 +180,24 @@ public class MetricsRegistry { return ret; } + /** + * Create a mutable metric that estimates quantiles of a stream of values + * @param name of the metric + * @param desc metric description + * @param sampleName of the metric (e.g., "Ops") + * @param valueName of the metric (e.g., "Time" or "Latency") + * @param interval rollover interval of estimator in seconds + * @return a new quantile estimator object + */ + public synchronized MutableQuantiles newQuantiles(String name, String desc, + String sampleName, String valueName, int interval) { + checkMetricName(name); + MutableQuantiles ret = + new MutableQuantiles(name, desc, sampleName, valueName, interval); + metricsMap.put(name, ret); + return ret; + } + /** * Create a mutable metric with stats * @param name of the metric diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java new file mode 100644 index 00000000000..6830084c30b --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.lib; + +import static org.apache.hadoop.metrics2.lib.Interns.info; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.util.Quantile; +import org.apache.hadoop.metrics2.util.SampleQuantiles; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Watches a stream of long values, maintaining online estimates of specific + * quantiles with provably low error bounds. This is particularly useful for + * accurate high-percentile (e.g. 95th, 99th) latency metrics. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class MutableQuantiles extends MutableMetric { + + static final Quantile[] quantiles = { new Quantile(0.50, 0.050), + new Quantile(0.75, 0.025), new Quantile(0.90, 0.010), + new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) }; + + private final MetricsInfo numInfo; + private final MetricsInfo[] quantileInfos; + private final int interval; + + private SampleQuantiles estimator; + private long previousCount = 0; + + @VisibleForTesting + protected Map previousSnapshot = null; + + private final ScheduledExecutorService scheduler = Executors + .newScheduledThreadPool(1); + + /** + * Instantiates a new {@link MutableQuantiles} for a metric that rolls itself + * over on the specified time interval. + * + * @param name + * of the metric + * @param description + * long-form textual description of the metric + * @param sampleName + * type of items in the stream (e.g., "Ops") + * @param valueName + * type of the values + * @param interval + * rollover interval (in seconds) of the estimator + */ + public MutableQuantiles(String name, String description, String sampleName, + String valueName, int interval) { + String ucName = StringUtils.capitalize(name); + String usName = StringUtils.capitalize(sampleName); + String uvName = StringUtils.capitalize(valueName); + String desc = StringUtils.uncapitalize(description); + String lsName = StringUtils.uncapitalize(sampleName); + String lvName = StringUtils.uncapitalize(valueName); + + numInfo = info(ucName + "Num" + usName, String.format( + "Number of %s for %s with %ds interval", lsName, desc, interval)); + // Construct the MetricsInfos for the quantiles, converting to percentiles + quantileInfos = new MetricsInfo[quantiles.length]; + String nameTemplate = ucName + "%dthPercentile" + interval + "sInterval" + + uvName; + String descTemplate = "%d percentile " + lvName + " with " + interval + + " second interval for " + desc; + for (int i = 0; i < quantiles.length; i++) { + int percentile = (int) (100 * quantiles[i].quantile); + quantileInfos[i] = info(String.format(nameTemplate, percentile), + String.format(descTemplate, percentile)); + } + + estimator = new SampleQuantiles(quantiles); + + this.interval = interval; + scheduler.scheduleAtFixedRate(new RolloverSample(this), interval, interval, + TimeUnit.SECONDS); + } + + @Override + public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) { + if (all || changed()) { + builder.addGauge(numInfo, previousCount); + for (int i = 0; i < quantiles.length; i++) { + long newValue = 0; + // If snapshot is null, we failed to update since the window was empty + if (previousSnapshot != null) { + newValue = previousSnapshot.get(quantiles[i]); + } + builder.addGauge(quantileInfos[i], newValue); + } + if (changed()) { + clearChanged(); + } + } + } + + public synchronized void add(long value) { + estimator.insert(value); + } + + public int getInterval() { + return interval; + } + + /** + * Runnable used to periodically roll over the internal + * {@link SampleQuantiles} every interval. + */ + private static class RolloverSample implements Runnable { + + MutableQuantiles parent; + + public RolloverSample(MutableQuantiles parent) { + this.parent = parent; + } + + @Override + public void run() { + synchronized (parent) { + try { + parent.previousCount = parent.estimator.getCount(); + parent.previousSnapshot = parent.estimator.snapshot(); + } catch (IOException e) { + // Couldn't get a new snapshot because the window was empty + parent.previousCount = 0; + parent.previousSnapshot = null; + } + parent.estimator.clear(); + } + parent.setChanged(); + } + + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/Quantile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/Quantile.java new file mode 100644 index 00000000000..18290557f0a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/Quantile.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.util; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Specifies a quantile (with error bounds) to be watched by a + * {@link SampleQuantiles} object. + */ +@InterfaceAudience.Private +public class Quantile { + public final double quantile; + public final double error; + + public Quantile(double quantile, double error) { + this.quantile = quantile; + this.error = error; + } + + @Override + public boolean equals(Object aThat) { + if (this == aThat) { + return true; + } + if (!(aThat instanceof Quantile)) { + return false; + } + + Quantile that = (Quantile) aThat; + + long qbits = Double.doubleToLongBits(quantile); + long ebits = Double.doubleToLongBits(error); + + return qbits == Double.doubleToLongBits(that.quantile) + && ebits == Double.doubleToLongBits(that.error); + } + + @Override + public int hashCode() { + return (int) (Double.doubleToLongBits(quantile) ^ Double + .doubleToLongBits(error)); + } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/SampleQuantiles.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/SampleQuantiles.java new file mode 100644 index 00000000000..c0b14cc21e6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/SampleQuantiles.java @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.util; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.ListIterator; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Implementation of the Cormode, Korn, Muthukrishnan, and Srivastava algorithm + * for streaming calculation of targeted high-percentile epsilon-approximate + * quantiles. + * + * This is a generalization of the earlier work by Greenwald and Khanna (GK), + * which essentially allows different error bounds on the targeted quantiles, + * which allows for far more efficient calculation of high-percentiles. + * + * See: Cormode, Korn, Muthukrishnan, and Srivastava + * "Effective Computation of Biased Quantiles over Data Streams" in ICDE 2005 + * + * Greenwald and Khanna, + * "Space-efficient online computation of quantile summaries" in SIGMOD 2001 + * + */ +@InterfaceAudience.Private +public class SampleQuantiles { + + /** + * Total number of items in stream + */ + private long count = 0; + + /** + * Current list of sampled items, maintained in sorted order with error bounds + */ + private LinkedList samples; + + /** + * Buffers incoming items to be inserted in batch. Items are inserted into + * the buffer linearly. When the buffer fills, it is flushed into the samples + * array in its entirety. + */ + private long[] buffer = new long[500]; + private int bufferCount = 0; + + /** + * Array of Quantiles that we care about, along with desired error. + */ + private final Quantile quantiles[]; + + public SampleQuantiles(Quantile[] quantiles) { + this.quantiles = quantiles; + this.samples = new LinkedList(); + } + + /** + * Specifies the allowable error for this rank, depending on which quantiles + * are being targeted. + * + * This is the f(r_i, n) function from the CKMS paper. It's basically how wide + * the range of this rank can be. + * + * @param rank + * the index in the list of samples + */ + private double allowableError(int rank) { + int size = samples.size(); + double minError = size + 1; + for (Quantile q : quantiles) { + double error; + if (rank <= q.quantile * size) { + error = (2.0 * q.error * (size - rank)) / (1.0 - q.quantile); + } else { + error = (2.0 * q.error * rank) / q.quantile; + } + if (error < minError) { + minError = error; + } + } + + return minError; + } + + /** + * Add a new value from the stream. + * + * @param v + */ + synchronized public void insert(long v) { + buffer[bufferCount] = v; + bufferCount++; + + count++; + + if (bufferCount == buffer.length) { + insertBatch(); + compress(); + } + } + + /** + * Merges items from buffer into the samples array in one pass. + * This is more efficient than doing an insert on every item. + */ + private void insertBatch() { + if (bufferCount == 0) { + return; + } + + Arrays.sort(buffer, 0, bufferCount); + + // Base case: no samples + int start = 0; + if (samples.size() == 0) { + SampleItem newItem = new SampleItem(buffer[0], 1, 0); + samples.add(newItem); + start++; + } + + ListIterator it = samples.listIterator(); + SampleItem item = it.next(); + for (int i = start; i < bufferCount; i++) { + long v = buffer[i]; + while (it.nextIndex() < samples.size() && item.value < v) { + item = it.next(); + } + // If we found that bigger item, back up so we insert ourselves before it + if (item.value > v) { + it.previous(); + } + // We use different indexes for the edge comparisons, because of the above + // if statement that adjusts the iterator + int delta; + if (it.previousIndex() == 0 || it.nextIndex() == samples.size()) { + delta = 0; + } else { + delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1; + } + SampleItem newItem = new SampleItem(v, 1, delta); + it.add(newItem); + item = newItem; + } + + bufferCount = 0; + } + + /** + * Try to remove extraneous items from the set of sampled items. This checks + * if an item is unnecessary based on the desired error bounds, and merges it + * with the adjacent item if it is. + */ + private void compress() { + if (samples.size() < 2) { + return; + } + + ListIterator it = samples.listIterator(); + SampleItem prev = null; + SampleItem next = it.next(); + + while (it.hasNext()) { + prev = next; + next = it.next(); + if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) { + next.g += prev.g; + // Remove prev. it.remove() kills the last thing returned. + it.previous(); + it.previous(); + it.remove(); + // it.next() is now equal to next, skip it back forward again + it.next(); + } + } + } + + /** + * Get the estimated value at the specified quantile. + * + * @param quantile Queried quantile, e.g. 0.50 or 0.99. + * @return Estimated value at that quantile. + */ + private long query(double quantile) throws IOException { + if (samples.size() == 0) { + throw new IOException("No samples present"); + } + + int rankMin = 0; + int desired = (int) (quantile * count); + + for (int i = 1; i < samples.size(); i++) { + SampleItem prev = samples.get(i - 1); + SampleItem cur = samples.get(i); + + rankMin += prev.g; + + if (rankMin + cur.g + cur.delta > desired + (allowableError(i) / 2)) { + return prev.value; + } + } + + // edge case of wanting max value + return samples.get(samples.size() - 1).value; + } + + /** + * Get a snapshot of the current values of all the tracked quantiles. + * + * @return snapshot of the tracked quantiles + * @throws IOException + * if no items have been added to the estimator + */ + synchronized public Map snapshot() throws IOException { + // flush the buffer first for best results + insertBatch(); + Map values = new HashMap(quantiles.length); + for (int i = 0; i < quantiles.length; i++) { + values.put(quantiles[i], query(quantiles[i].quantile)); + } + + return values; + } + + /** + * Returns the number of items that the estimator has processed + * + * @return count total number of items processed + */ + synchronized public long getCount() { + return count; + } + + /** + * Returns the number of samples kept by the estimator + * + * @return count current number of samples + */ + @VisibleForTesting + synchronized public int getSampleCount() { + return samples.size(); + } + + /** + * Resets the estimator, clearing out all previously inserted items + */ + synchronized public void clear() { + count = 0; + bufferCount = 0; + samples.clear(); + } + + /** + * Describes a measured value passed to the estimator, tracking additional + * metadata required by the CKMS algorithm. + */ + private static class SampleItem { + + /** + * Value of the sampled item (e.g. a measured latency value) + */ + public final long value; + + /** + * Difference between the lowest possible rank of the previous item, and + * the lowest possible rank of this item. + * + * The sum of the g of all previous items yields this item's lower bound. + */ + public int g; + + /** + * Difference between the item's greatest possible rank and lowest possible + * rank. + */ + public final int delta; + + public SampleItem(long value, int lowerDelta, int delta) { + this.value = value; + this.g = lowerDelta; + this.delta = delta; + } + + @Override + public String toString() { + return String.format("%d, %d, %d", value, g, delta); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java index 7718d0fd861..023facffe89 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java @@ -18,13 +18,24 @@ package org.apache.hadoop.metrics2.lib; -import org.junit.Test; -import static org.mockito.Mockito.*; -import static org.mockito.AdditionalMatchers.*; +import static org.apache.hadoop.metrics2.lib.Interns.info; +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.assertGauge; +import static org.apache.hadoop.test.MetricsAsserts.mockMetricsRecordBuilder; +import static org.mockito.AdditionalMatchers.eq; +import static org.mockito.AdditionalMatchers.geq; +import static org.mockito.AdditionalMatchers.leq; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.Map; +import java.util.Map.Entry; import org.apache.hadoop.metrics2.MetricsRecordBuilder; -import static org.apache.hadoop.metrics2.lib.Interns.*; -import static org.apache.hadoop.test.MetricsAsserts.*; +import org.apache.hadoop.metrics2.util.Quantile; +import org.junit.Test; /** * Test metrics record builder interface and mutable metrics @@ -103,4 +114,123 @@ public class TestMutableMetrics { assertCounter("BarNumOps", 0L, rb); assertGauge("BarAvgTime", 0.0, rb); } + + /** + * Ensure that quantile estimates from {@link MutableQuantiles} are within + * specified error bounds. + */ + @Test(timeout = 30000) + public void testMutableQuantilesError() throws Exception { + MetricsRecordBuilder mb = mockMetricsRecordBuilder(); + MetricsRegistry registry = new MetricsRegistry("test"); + // Use a 5s rollover period + MutableQuantiles quantiles = registry.newQuantiles("foo", "stat", "Ops", + "Latency", 5); + // Push some values in and wait for it to publish + long start = System.nanoTime() / 1000000; + for (long i = 1; i <= 1000; i++) { + quantiles.add(i); + quantiles.add(1001 - i); + } + long end = System.nanoTime() / 1000000; + + Thread.sleep(6000 - (end - start)); + + registry.snapshot(mb, false); + + // Print out the snapshot + Map previousSnapshot = quantiles.previousSnapshot; + for (Entry item : previousSnapshot.entrySet()) { + System.out.println(String.format("Quantile %.2f has value %d", + item.getKey().quantile, item.getValue())); + } + + // Verify the results are within our requirements + verify(mb).addGauge( + info("FooNumOps", "Number of ops for stat with 5s interval"), + (long) 2000); + Quantile[] quants = MutableQuantiles.quantiles; + String name = "Foo%dthPercentile5sIntervalLatency"; + String desc = "%d percentile latency with 5 second interval for stat"; + for (Quantile q : quants) { + int percentile = (int) (100 * q.quantile); + int error = (int) (1000 * q.error); + String n = String.format(name, percentile); + String d = String.format(desc, percentile); + long expected = (long) (q.quantile * 1000); + verify(mb).addGauge(eq(info(n, d)), leq(expected + error)); + verify(mb).addGauge(eq(info(n, d)), geq(expected - error)); + } + } + + /** + * Test that {@link MutableQuantiles} rolls the window over at the specified + * interval. + */ + @Test(timeout = 30000) + public void testMutableQuantilesRollover() throws Exception { + MetricsRecordBuilder mb = mockMetricsRecordBuilder(); + MetricsRegistry registry = new MetricsRegistry("test"); + // Use a 5s rollover period + MutableQuantiles quantiles = registry.newQuantiles("foo", "stat", "Ops", + "Latency", 5); + + Quantile[] quants = MutableQuantiles.quantiles; + String name = "Foo%dthPercentile5sIntervalLatency"; + String desc = "%d percentile latency with 5 second interval for stat"; + + // Push values for three intervals + long start = System.nanoTime() / 1000000; + for (int i = 1; i <= 3; i++) { + // Insert the values + for (long j = 1; j <= 1000; j++) { + quantiles.add(i); + } + // Sleep until 1s after the next 5s interval, to let the metrics + // roll over + long sleep = (start + (5000 * i) + 1000) - (System.nanoTime() / 1000000); + Thread.sleep(sleep); + // Verify that the window reset, check it has the values we pushed in + registry.snapshot(mb, false); + for (Quantile q : quants) { + int percentile = (int) (100 * q.quantile); + String n = String.format(name, percentile); + String d = String.format(desc, percentile); + verify(mb).addGauge(info(n, d), (long) i); + } + } + + // Verify the metrics were added the right number of times + verify(mb, times(3)).addGauge( + info("FooNumOps", "Number of ops for stat with 5s interval"), + (long) 1000); + for (Quantile q : quants) { + int percentile = (int) (100 * q.quantile); + String n = String.format(name, percentile); + String d = String.format(desc, percentile); + verify(mb, times(3)).addGauge(eq(info(n, d)), anyLong()); + } + } + + /** + * Test that {@link MutableQuantiles} rolls over correctly even if no items + * have been added to the window + */ + @Test(timeout = 30000) + public void testMutableQuantilesEmptyRollover() throws Exception { + MetricsRecordBuilder mb = mockMetricsRecordBuilder(); + MetricsRegistry registry = new MetricsRegistry("test"); + // Use a 5s rollover period + MutableQuantiles quantiles = registry.newQuantiles("foo", "stat", "Ops", + "Latency", 5); + + // Check it initially + quantiles.snapshot(mb, true); + verify(mb).addGauge( + info("FooNumOps", "Number of ops for stat with 5s interval"), (long) 0); + Thread.sleep(6000); + quantiles.snapshot(mb, false); + verify(mb, times(2)).addGauge( + info("FooNumOps", "Number of ops for stat with 5s interval"), (long) 0); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/util/TestSampleQuantiles.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/util/TestSampleQuantiles.java new file mode 100644 index 00000000000..39abb0b1ec2 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/util/TestSampleQuantiles.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Random; + +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Before; +import org.junit.Test; + +public class TestSampleQuantiles { + + static final Quantile[] quantiles = { new Quantile(0.50, 0.050), + new Quantile(0.75, 0.025), new Quantile(0.90, 0.010), + new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) }; + + SampleQuantiles estimator; + + @Before + public void init() { + estimator = new SampleQuantiles(quantiles); + } + + /** + * Check that the counts of the number of items in the window and sample are + * incremented correctly as items are added. + */ + @Test + public void testCount() throws IOException { + // Counts start off zero + assertEquals(estimator.getCount(), 0); + assertEquals(estimator.getSampleCount(), 0); + try { + estimator.snapshot(); + fail("Expected IOException from empty window"); + } catch (IOException e) { + GenericTestUtils.assertExceptionContains("No samples", e); + } + + // Count increment correctly by 1 + estimator.insert(1337); + assertEquals(estimator.getCount(), 1); + estimator.snapshot(); + assertEquals(estimator.getSampleCount(), 1); + } + + /** + * Check that counts and quantile estimates are correctly reset after a call + * to {@link SampleQuantiles#clear()}. + */ + @Test + public void testClear() throws IOException { + for (int i = 0; i < 1000; i++) { + estimator.insert(i); + } + estimator.clear(); + assertEquals(estimator.getCount(), 0); + assertEquals(estimator.getSampleCount(), 0); + try { + estimator.snapshot(); + fail("Expected IOException for an empty window."); + } catch (IOException e) { + GenericTestUtils.assertExceptionContains("No samples", e); + } + } + + /** + * Correctness test that checks that absolute error of the estimate is within + * specified error bounds for some randomly permuted streams of items. + */ + @Test + public void testQuantileError() throws IOException { + final int count = 100000; + Random r = new Random(0xDEADDEAD); + Long[] values = new Long[count]; + for (int i = 0; i < count; i++) { + values[i] = (long) (i + 1); + } + // Do 10 shuffle/insert/check cycles + for (int i = 0; i < 10; i++) { + System.out.println("Starting run " + i); + Collections.shuffle(Arrays.asList(values), r); + estimator.clear(); + for (int j = 0; j < count; j++) { + estimator.insert(values[j]); + } + Map snapshot; + snapshot = estimator.snapshot(); + for (Quantile q : quantiles) { + long actual = (long) (q.quantile * count); + long error = (long) (q.error * count); + long estimate = snapshot.get(q); + System.out + .println(String.format("Expected %d with error %d, estimated %d", + actual, error, estimate)); + assertTrue(estimate <= actual + error); + assertTrue(estimate >= actual - error); + } + } + } +}