HBASE-6409 Create histogram class for metrics 2
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1387358 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6bcd00145d
commit
053fc8e7ba
|
@ -55,6 +55,12 @@ public interface MasterMetricsSource extends BaseMetricsSource {
|
||||||
public static final String SERVER_NAME_NAME = "serverName";
|
public static final String SERVER_NAME_NAME = "serverName";
|
||||||
public static final String CLUSTER_ID_NAME = "clusterId";
|
public static final String CLUSTER_ID_NAME = "clusterId";
|
||||||
public static final String IS_ACTIVE_MASTER_NAME = "isActiveMaster";
|
public static final String IS_ACTIVE_MASTER_NAME = "isActiveMaster";
|
||||||
|
public static final String SPLIT_TIME_NAME = "hlogSplitTime";
|
||||||
|
public static final String SPLIT_SIZE_NAME = "hlogSplitSize";
|
||||||
|
public static final String CLUSTER_REQUESTS_NAME = "clusterRequests";
|
||||||
|
public static final String RIT_COUNT_NAME = "ritCount";
|
||||||
|
public static final String RIT_COUNT_OVER_THRESHOLD_NAME = "ritCountOverThreshold";
|
||||||
|
public static final String RIT_OLDEST_AGE_NAME = "ritOldestAge";
|
||||||
public static final String MASTER_ACTIVE_TIME_DESC = "Master Active Time";
|
public static final String MASTER_ACTIVE_TIME_DESC = "Master Active Time";
|
||||||
public static final String MASTER_START_TIME_DESC = "Master Start Time";
|
public static final String MASTER_START_TIME_DESC = "Master Start Time";
|
||||||
public static final String AVERAGE_LOAD_DESC = "AverageLoad";
|
public static final String AVERAGE_LOAD_DESC = "AverageLoad";
|
||||||
|
@ -64,6 +70,8 @@ public interface MasterMetricsSource extends BaseMetricsSource {
|
||||||
public static final String SERVER_NAME_DESC = "Server Name";
|
public static final String SERVER_NAME_DESC = "Server Name";
|
||||||
public static final String CLUSTER_ID_DESC = "Cluster Id";
|
public static final String CLUSTER_ID_DESC = "Cluster Id";
|
||||||
public static final String IS_ACTIVE_MASTER_DESC = "Is Active Master";
|
public static final String IS_ACTIVE_MASTER_DESC = "Is Active Master";
|
||||||
|
public static final String SPLIT_TIME_DESC = "Time it takes to finish HLog.splitLog()";
|
||||||
|
public static final String SPLIT_SIZE_DESC = "Size of HLog files being split";
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -90,4 +98,8 @@ public interface MasterMetricsSource extends BaseMetricsSource {
|
||||||
*/
|
*/
|
||||||
public void setRITOldestAge(long age);
|
public void setRITOldestAge(long age);
|
||||||
|
|
||||||
|
public void updateSplitTime(long time);
|
||||||
|
|
||||||
|
public void updateSplitSize(long size);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,6 +67,24 @@ public interface BaseMetricsSource {
|
||||||
*/
|
*/
|
||||||
public void incCounters(String counterName, long delta);
|
public void incCounters(String counterName, long delta);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add some value to a histogram.
|
||||||
|
*
|
||||||
|
* @param name the name of the histogram
|
||||||
|
* @param value the value to add to the histogram
|
||||||
|
*/
|
||||||
|
public void updateHistogram(String name, long value);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add some value to a Quantile (An accurate histogram).
|
||||||
|
*
|
||||||
|
* @param name the name of the quantile
|
||||||
|
* @param value the value to add to the quantile
|
||||||
|
*/
|
||||||
|
public void updateQuantile(String name, long value);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove a counter and stop announcing it to metrics2.
|
* Remove a counter and stop announcing it to metrics2.
|
||||||
*
|
*
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.metrics;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public interface MetricHistogram {
|
||||||
|
|
||||||
|
public static final String NUM_OPS_METRIC_NAME = "_num_ops";
|
||||||
|
public static final String MIN_METRIC_NAME = "_min";
|
||||||
|
public static final String MAX_METRIC_NAME = "_max";
|
||||||
|
public static final String MEAN_METRIC_NAME = "_mean";
|
||||||
|
public static final String STD_DEV_METRIC_NAME = "_std_dev";
|
||||||
|
public static final String MEDIAN_METRIC_NAME = "_median";
|
||||||
|
public static final String SEVENTY_FIFTH_PERCENTILE_METRIC_NAME = "_75th_percentile";
|
||||||
|
public static final String NINETY_FIFTH_PERCENTILE_METRIC_NAME = "_95th_percentile";
|
||||||
|
public static final String NINETY_NINETH_PERCENTILE_METRIC_NAME = "_99th_percentile";
|
||||||
|
|
||||||
|
public void add(long value);
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,32 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.metrics;
|
||||||
|
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public interface MetricsExecutor {
|
||||||
|
|
||||||
|
public ScheduledExecutorService getExecutor();
|
||||||
|
|
||||||
|
public void stop();
|
||||||
|
|
||||||
|
}
|
|
@ -93,6 +93,10 @@ limitations under the License.
|
||||||
</exclusion>
|
</exclusion>
|
||||||
</exclusions>
|
</exclusions>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.yammer.metrics</groupId>
|
||||||
|
<artifactId>metrics-core</artifactId>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>hadoop-test</artifactId>
|
<artifactId>hadoop-test</artifactId>
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.metrics2.MetricsBuilder;
|
||||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
|
import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
|
||||||
import org.apache.hadoop.metrics2.lib.MetricMutableGaugeLong;
|
import org.apache.hadoop.metrics2.lib.MetricMutableGaugeLong;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MetricMutableHistogram;
|
||||||
|
|
||||||
/** Hadoop1 implementation of MasterMetricsSource. */
|
/** Hadoop1 implementation of MasterMetricsSource. */
|
||||||
public class MasterMetricsSourceImpl
|
public class MasterMetricsSourceImpl
|
||||||
|
@ -38,6 +39,8 @@ public class MasterMetricsSourceImpl
|
||||||
MetricMutableGaugeLong ritOldestAgeGauge;
|
MetricMutableGaugeLong ritOldestAgeGauge;
|
||||||
|
|
||||||
private final MasterMetricsWrapper masterWrapper;
|
private final MasterMetricsWrapper masterWrapper;
|
||||||
|
private MetricMutableHistogram splitTimeHisto;
|
||||||
|
private MetricMutableHistogram splitSizeHisto;
|
||||||
|
|
||||||
public MasterMetricsSourceImpl(MasterMetricsWrapper masterWrapper) {
|
public MasterMetricsSourceImpl(MasterMetricsWrapper masterWrapper) {
|
||||||
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT, masterWrapper);
|
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT, masterWrapper);
|
||||||
|
@ -50,6 +53,12 @@ public class MasterMetricsSourceImpl
|
||||||
MasterMetricsWrapper masterWrapper) {
|
MasterMetricsWrapper masterWrapper) {
|
||||||
super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
|
super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
|
||||||
this.masterWrapper = masterWrapper;
|
this.masterWrapper = masterWrapper;
|
||||||
|
clusterRequestsCounter = metricsRegistry.newCounter(CLUSTER_REQUESTS_NAME, "", 0l);
|
||||||
|
ritGauge = metricsRegistry.newGauge(RIT_COUNT_NAME, "", 0l);
|
||||||
|
ritCountOverThresholdGauge = metricsRegistry.newGauge(RIT_COUNT_OVER_THRESHOLD_NAME, "", 0l);
|
||||||
|
ritOldestAgeGauge = metricsRegistry.newGauge(RIT_OLDEST_AGE_NAME, "", 0l);
|
||||||
|
splitTimeHisto = metricsRegistry.newHistogram(SPLIT_SIZE_NAME, SPLIT_SIZE_DESC);
|
||||||
|
splitSizeHisto = metricsRegistry.newHistogram(SPLIT_TIME_NAME, SPLIT_TIME_DESC);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -77,6 +86,16 @@ public class MasterMetricsSourceImpl
|
||||||
ritCountOverThresholdGauge.set(ritCount);
|
ritCountOverThresholdGauge.set(ritCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateSplitTime(long time) {
|
||||||
|
splitTimeHisto.add(time);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateSplitSize(long size) {
|
||||||
|
splitSizeHisto.add(size);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Method to export all the metrics.
|
* Method to export all the metrics.
|
||||||
*
|
*
|
||||||
|
|
|
@ -27,6 +27,8 @@ import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
|
||||||
import org.apache.hadoop.metrics2.lib.MetricMutable;
|
import org.apache.hadoop.metrics2.lib.MetricMutable;
|
||||||
import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
|
import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
|
||||||
import org.apache.hadoop.metrics2.lib.MetricMutableGaugeLong;
|
import org.apache.hadoop.metrics2.lib.MetricMutableGaugeLong;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MetricMutableHistogram;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MetricMutableQuantiles;
|
||||||
import org.apache.hadoop.metrics2.source.JvmMetricsSource;
|
import org.apache.hadoop.metrics2.source.JvmMetricsSource;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -128,6 +130,18 @@ public class BaseMetricsSourceImpl implements BaseMetricsSource, MetricsSource {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateHistogram(String name, long value) {
|
||||||
|
MetricMutableHistogram histo = metricsRegistry.getHistogram(name);
|
||||||
|
histo.add(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateQuantile(String name, long value) {
|
||||||
|
MetricMutableQuantiles histo = metricsRegistry.getQuantile(name);
|
||||||
|
histo.add(value);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove a named gauge.
|
* Remove a named gauge.
|
||||||
*
|
*
|
||||||
|
|
|
@ -177,6 +177,46 @@ public class DynamicMetricsRegistry {
|
||||||
return newStat(name, "", "ops", "time", false);
|
return newStat(name, "", "ops", "time", false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new histogram.
|
||||||
|
* @param name Name of the histogram.
|
||||||
|
* @return A new MutableHistogram
|
||||||
|
*/
|
||||||
|
public MetricMutableHistogram newHistogram(String name) {
|
||||||
|
return newHistogram(name, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new histogram.
|
||||||
|
* @param name The name of the histogram
|
||||||
|
* @param desc The description of the data in the histogram.
|
||||||
|
* @return A new MutableHistogram
|
||||||
|
*/
|
||||||
|
public MetricMutableHistogram newHistogram(String name, String desc) {
|
||||||
|
MetricMutableHistogram histo = new MetricMutableHistogram(name, desc);
|
||||||
|
return addNewMetricIfAbsent(name, histo, MetricMutableHistogram.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new MutableQuantile(A more accurate histogram).
|
||||||
|
* @param name The name of the histogram
|
||||||
|
* @return a new MutableQuantile
|
||||||
|
*/
|
||||||
|
public MetricMutableQuantiles newQuantile(String name) {
|
||||||
|
return newQuantile(name, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new MutableQuantile(A more accurate histogram).
|
||||||
|
* @param name The name of the histogram
|
||||||
|
* @param desc Description of the data.
|
||||||
|
* @return a new MutableQuantile
|
||||||
|
*/
|
||||||
|
public MetricMutableQuantiles newQuantile(String name, String desc) {
|
||||||
|
MetricMutableQuantiles histo = new MetricMutableQuantiles(name, desc);
|
||||||
|
return addNewMetricIfAbsent(name, histo, MetricMutableQuantiles.class);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the metrics context tag
|
* Set the metrics context tag
|
||||||
* @param name of the context
|
* @param name of the context
|
||||||
|
@ -277,7 +317,7 @@ public class DynamicMetricsRegistry {
|
||||||
if (metric == null) {
|
if (metric == null) {
|
||||||
|
|
||||||
//Create the potential new gauge.
|
//Create the potential new gauge.
|
||||||
MetricMutableGaugeLong newGauge = new MetricMutableGaugeLong(gaugeName, "",
|
MetricMutableGaugeLong newGauge = mf.newGauge(gaugeName, "",
|
||||||
potentialStartingValue);
|
potentialStartingValue);
|
||||||
|
|
||||||
// Try and put the gauge in. This is atomic.
|
// Try and put the gauge in. This is atomic.
|
||||||
|
@ -313,7 +353,7 @@ public class DynamicMetricsRegistry {
|
||||||
MetricMutable counter = metricsMap.get(counterName);
|
MetricMutable counter = metricsMap.get(counterName);
|
||||||
if (counter == null) {
|
if (counter == null) {
|
||||||
MetricMutableCounterLong newCounter =
|
MetricMutableCounterLong newCounter =
|
||||||
new MetricMutableCounterLong(counterName, "", potentialStartingValue);
|
mf.newCounter(counterName, "", potentialStartingValue);
|
||||||
counter = metricsMap.putIfAbsent(counterName, newCounter);
|
counter = metricsMap.putIfAbsent(counterName, newCounter);
|
||||||
if (counter == null) {
|
if (counter == null) {
|
||||||
return newCounter;
|
return newCounter;
|
||||||
|
@ -328,6 +368,46 @@ public class DynamicMetricsRegistry {
|
||||||
return (MetricMutableCounterLong) counter;
|
return (MetricMutableCounterLong) counter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public MetricMutableHistogram getHistogram(String histoName) {
|
||||||
|
//See getLongGauge for description on how this works.
|
||||||
|
MetricMutable histo = metricsMap.get(histoName);
|
||||||
|
if (histo == null) {
|
||||||
|
MetricMutableHistogram newHisto =
|
||||||
|
new MetricMutableHistogram(histoName, "");
|
||||||
|
histo = metricsMap.putIfAbsent(histoName, newHisto);
|
||||||
|
if (histo == null) {
|
||||||
|
return newHisto;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(histo instanceof MetricMutableHistogram)) {
|
||||||
|
throw new MetricsException("Metric already exists in registry for metric name: " +
|
||||||
|
name + "and not of type MetricMutableHistogram");
|
||||||
|
}
|
||||||
|
|
||||||
|
return (MetricMutableHistogram) histo;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MetricMutableQuantiles getQuantile(String histoName) {
|
||||||
|
//See getLongGauge for description on how this works.
|
||||||
|
MetricMutable histo = metricsMap.get(histoName);
|
||||||
|
if (histo == null) {
|
||||||
|
MetricMutableQuantiles newHisto =
|
||||||
|
new MetricMutableQuantiles(histoName, "");
|
||||||
|
histo = metricsMap.putIfAbsent(histoName, newHisto);
|
||||||
|
if (histo == null) {
|
||||||
|
return newHisto;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(histo instanceof MetricMutableQuantiles)) {
|
||||||
|
throw new MetricsException("Metric already exists in registry for metric name: " +
|
||||||
|
name + "and not of type MetricMutableQuantiles");
|
||||||
|
}
|
||||||
|
|
||||||
|
return (MetricMutableQuantiles) histo;
|
||||||
|
}
|
||||||
|
|
||||||
private<T extends MetricMutable> T
|
private<T extends MetricMutable> T
|
||||||
addNewMetricIfAbsent(String name,
|
addNewMetricIfAbsent(String name,
|
||||||
T ret,
|
T ret,
|
||||||
|
|
|
@ -0,0 +1,122 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.metrics2.lib;
|
||||||
|
|
||||||
|
import com.yammer.metrics.stats.ExponentiallyDecayingSample;
|
||||||
|
import com.yammer.metrics.stats.Sample;
|
||||||
|
import com.yammer.metrics.stats.Snapshot;
|
||||||
|
import org.apache.hadoop.metrics.MetricHistogram;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MetricMutable;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A histogram implementation that runs in constant space, and exports to hadoop's metrics2 system.
|
||||||
|
*/
|
||||||
|
public class MetricMutableHistogram extends MetricMutable implements MetricHistogram {
|
||||||
|
|
||||||
|
private static final int DEFAULT_SAMPLE_SIZE = 2046;
|
||||||
|
// the bias towards sampling from more recent data.
|
||||||
|
// Per Cormode et al. an alpha of 0.015 strongly biases to the last 5 minutes
|
||||||
|
private static final double DEFAULT_ALPHA = 0.015;
|
||||||
|
|
||||||
|
private final Sample sample;
|
||||||
|
private final AtomicLong min;
|
||||||
|
private final AtomicLong max;
|
||||||
|
private final AtomicLong sum;
|
||||||
|
private final AtomicLong count;
|
||||||
|
|
||||||
|
|
||||||
|
public MetricMutableHistogram(String name, String description) {
|
||||||
|
super(name, description);
|
||||||
|
sample = new ExponentiallyDecayingSample(DEFAULT_SAMPLE_SIZE, DEFAULT_ALPHA);
|
||||||
|
count = new AtomicLong();
|
||||||
|
min = new AtomicLong(Long.MAX_VALUE);
|
||||||
|
max = new AtomicLong(Long.MIN_VALUE);
|
||||||
|
sum = new AtomicLong();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void add(final long val) {
|
||||||
|
setChanged();
|
||||||
|
count.incrementAndGet();
|
||||||
|
sample.update(val);
|
||||||
|
setMax(val);
|
||||||
|
setMin(val);
|
||||||
|
sum.getAndAdd(val);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setMax(final long potentialMax) {
|
||||||
|
boolean done = false;
|
||||||
|
while (!done) {
|
||||||
|
final long currentMax = max.get();
|
||||||
|
done = currentMax >= potentialMax
|
||||||
|
|| max.compareAndSet(currentMax, potentialMax);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setMin(long potentialMin) {
|
||||||
|
boolean done = false;
|
||||||
|
while (!done) {
|
||||||
|
final long currentMin = min.get();
|
||||||
|
done = currentMin <= potentialMin
|
||||||
|
|| min.compareAndSet(currentMin, potentialMin);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMax() {
|
||||||
|
if (count.get() > 0) {
|
||||||
|
return max.get();
|
||||||
|
}
|
||||||
|
return 0L;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMin() {
|
||||||
|
if (count.get() > 0) {
|
||||||
|
return min.get();
|
||||||
|
}
|
||||||
|
return 0L;
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getMean() {
|
||||||
|
long cCount = count.get();
|
||||||
|
if (cCount > 0) {
|
||||||
|
return sum.get() / (double) cCount;
|
||||||
|
}
|
||||||
|
return 0.0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void snapshot(MetricsRecordBuilder metricsRecordBuilder, boolean all) {
|
||||||
|
if (all || changed()) {
|
||||||
|
clearChanged();
|
||||||
|
final Snapshot s = sample.getSnapshot();
|
||||||
|
metricsRecordBuilder.addCounter(name + NUM_OPS_METRIC_NAME, "", count.get());
|
||||||
|
metricsRecordBuilder.addGauge(name + MIN_METRIC_NAME, "", getMin());
|
||||||
|
metricsRecordBuilder.addGauge(name + MAX_METRIC_NAME, "", getMax());
|
||||||
|
metricsRecordBuilder.addGauge(name + MEAN_METRIC_NAME, "", getMean());
|
||||||
|
|
||||||
|
metricsRecordBuilder.addGauge(name + MEDIAN_METRIC_NAME, "", s.getMedian());
|
||||||
|
metricsRecordBuilder.addGauge(name + SEVENTY_FIFTH_PERCENTILE_METRIC_NAME, "", s.get75thPercentile());
|
||||||
|
metricsRecordBuilder.addGauge(name + NINETY_FIFTH_PERCENTILE_METRIC_NAME, "", s.get95thPercentile());
|
||||||
|
metricsRecordBuilder.addGauge(name + NINETY_NINETH_PERCENTILE_METRIC_NAME, "", s.get99thPercentile());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,139 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.metrics2.lib;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.metrics.MetricHistogram;
|
||||||
|
import org.apache.hadoop.metrics.MetricsExecutor;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
|
import org.apache.hadoop.metrics2.util.MetricQuantile;
|
||||||
|
import org.apache.hadoop.metrics2.util.MetricSampleQuantiles;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Watches a stream of long values, maintaining online estimates of specific quantiles with provably
|
||||||
|
* low error bounds. This is particularly useful for accurate high-percentile (e.g. 95th, 99th)
|
||||||
|
* latency metrics.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class MetricMutableQuantiles extends MetricMutable implements MetricHistogram {
|
||||||
|
|
||||||
|
static final MetricQuantile[] quantiles = {new MetricQuantile(0.50, 0.050),
|
||||||
|
new MetricQuantile(0.75, 0.025), new MetricQuantile(0.90, 0.010),
|
||||||
|
new MetricQuantile(0.95, 0.005), new MetricQuantile(0.99, 0.001)};
|
||||||
|
|
||||||
|
static final String[] quantilesSuffix = {"_Median",
|
||||||
|
"_75th_percentile", "_90th_percentile",
|
||||||
|
"_95th_percentile", "_99th_percentile"};
|
||||||
|
|
||||||
|
private final int interval;
|
||||||
|
|
||||||
|
private MetricSampleQuantiles estimator;
|
||||||
|
private long previousCount = 0;
|
||||||
|
private MetricsExecutor executor;
|
||||||
|
|
||||||
|
protected Map<MetricQuantile, Long> previousSnapshot = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Instantiates a new {@link MetricMutableQuantiles} for a metric that rolls itself over on the
|
||||||
|
* specified time interval.
|
||||||
|
*
|
||||||
|
* @param name of the metric
|
||||||
|
* @param description long-form textual description of the metric
|
||||||
|
* @param sampleName type of items in the stream (e.g., "Ops")
|
||||||
|
* @param valueName type of the values
|
||||||
|
* @param interval rollover interval (in seconds) of the estimator
|
||||||
|
*/
|
||||||
|
public MetricMutableQuantiles(String name, String description, String sampleName,
|
||||||
|
String valueName, int interval) {
|
||||||
|
super(name, description);
|
||||||
|
|
||||||
|
estimator = new MetricSampleQuantiles(quantiles);
|
||||||
|
|
||||||
|
executor = new MetricsExecutorImpl();
|
||||||
|
|
||||||
|
this.interval = interval;
|
||||||
|
executor.getExecutor().scheduleAtFixedRate(new RolloverSample(this),
|
||||||
|
interval,
|
||||||
|
interval,
|
||||||
|
TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MetricMutableQuantiles(String name, String description) {
|
||||||
|
this(name, description, "Ops", "", 60);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) {
|
||||||
|
if (all || changed()) {
|
||||||
|
builder.addCounter(name + "NumOps", description, previousCount);
|
||||||
|
for (int i = 0; i < quantiles.length; i++) {
|
||||||
|
long newValue = 0;
|
||||||
|
// If snapshot is null, we failed to update since the window was empty
|
||||||
|
if (previousSnapshot != null) {
|
||||||
|
newValue = previousSnapshot.get(quantiles[i]);
|
||||||
|
}
|
||||||
|
builder.addGauge(name + quantilesSuffix[i], description, newValue);
|
||||||
|
}
|
||||||
|
if (changed()) {
|
||||||
|
clearChanged();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void add(long value) {
|
||||||
|
estimator.insert(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getInterval() {
|
||||||
|
return interval;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Runnable used to periodically roll over the internal {@link org.apache.hadoop.metrics2.util.MetricSampleQuantiles} every interval. */
|
||||||
|
private static class RolloverSample implements Runnable {
|
||||||
|
|
||||||
|
MetricMutableQuantiles parent;
|
||||||
|
|
||||||
|
public RolloverSample(MetricMutableQuantiles parent) {
|
||||||
|
this.parent = parent;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
synchronized (parent) {
|
||||||
|
try {
|
||||||
|
parent.previousCount = parent.estimator.getCount();
|
||||||
|
parent.previousSnapshot = parent.estimator.snapshot();
|
||||||
|
} catch (IOException e) {
|
||||||
|
// Couldn't get a new snapshot because the window was empty
|
||||||
|
parent.previousCount = 0;
|
||||||
|
parent.previousSnapshot = null;
|
||||||
|
}
|
||||||
|
parent.estimator.clear();
|
||||||
|
}
|
||||||
|
parent.setChanged();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,66 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.metrics2.lib;
|
||||||
|
|
||||||
|
import org.apache.hadoop.metrics.MetricsExecutor;
|
||||||
|
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class to handle the ScheduledExecutorService{@link ScheduledExecutorService} used by MetricMutableQuantiles{@link MetricMutableQuantiles}
|
||||||
|
*/
|
||||||
|
public class MetricsExecutorImpl implements MetricsExecutor {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ScheduledExecutorService getExecutor() {
|
||||||
|
return ExecutorSingleton.INSTANCE.scheduler;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
if (!getExecutor().isShutdown()) {
|
||||||
|
getExecutor().shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private enum ExecutorSingleton {
|
||||||
|
INSTANCE;
|
||||||
|
|
||||||
|
private final ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1, new ThreadPoolExecutorThreadFactory("HBase-Metrics2-"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class ThreadPoolExecutorThreadFactory implements ThreadFactory {
|
||||||
|
private final String name;
|
||||||
|
private final AtomicInteger threadNumber = new AtomicInteger(1);
|
||||||
|
|
||||||
|
private ThreadPoolExecutorThreadFactory(String name) {
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Thread newThread(Runnable runnable) {
|
||||||
|
Thread t = new Thread(runnable, name + threadNumber.getAndIncrement());
|
||||||
|
t.setDaemon(true);
|
||||||
|
return t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,60 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.metrics2.util;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Specifies a quantile (with error bounds) to be watched by a
|
||||||
|
* {@link MetricSampleQuantiles} object.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class MetricQuantile {
|
||||||
|
public final double quantile;
|
||||||
|
public final double error;
|
||||||
|
|
||||||
|
public MetricQuantile(double quantile, double error) {
|
||||||
|
this.quantile = quantile;
|
||||||
|
this.error = error;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object aThat) {
|
||||||
|
if (this == aThat) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (!(aThat instanceof MetricQuantile)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
MetricQuantile that = (MetricQuantile) aThat;
|
||||||
|
|
||||||
|
long qbits = Double.doubleToLongBits(quantile);
|
||||||
|
long ebits = Double.doubleToLongBits(error);
|
||||||
|
|
||||||
|
return qbits == Double.doubleToLongBits(that.quantile)
|
||||||
|
&& ebits == Double.doubleToLongBits(that.error);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return (int) (Double.doubleToLongBits(quantile) ^ Double
|
||||||
|
.doubleToLongBits(error));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,307 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.metrics2.util;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.ListIterator;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementation of the Cormode, Korn, Muthukrishnan, and Srivastava algorithm
|
||||||
|
* for streaming calculation of targeted high-percentile epsilon-approximate
|
||||||
|
* quantiles.
|
||||||
|
*
|
||||||
|
* This is a generalization of the earlier work by Greenwald and Khanna (GK),
|
||||||
|
* which essentially allows different error bounds on the targeted quantiles,
|
||||||
|
* which allows for far more efficient calculation of high-percentiles.
|
||||||
|
*
|
||||||
|
* See: Cormode, Korn, Muthukrishnan, and Srivastava
|
||||||
|
* "Effective Computation of Biased Quantiles over Data Streams" in ICDE 2005
|
||||||
|
*
|
||||||
|
* Greenwald and Khanna,
|
||||||
|
* "Space-efficient online computation of quantile summaries" in SIGMOD 2001
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class MetricSampleQuantiles {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Total number of items in stream
|
||||||
|
*/
|
||||||
|
private long count = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Current list of sampled items, maintained in sorted order with error bounds
|
||||||
|
*/
|
||||||
|
private LinkedList<SampleItem> samples;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Buffers incoming items to be inserted in batch. Items are inserted into
|
||||||
|
* the buffer linearly. When the buffer fills, it is flushed into the samples
|
||||||
|
* array in its entirety.
|
||||||
|
*/
|
||||||
|
private long[] buffer = new long[500];
|
||||||
|
private int bufferCount = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Array of Quantiles that we care about, along with desired error.
|
||||||
|
*/
|
||||||
|
private final MetricQuantile quantiles[];
|
||||||
|
|
||||||
|
public MetricSampleQuantiles(MetricQuantile[] quantiles) {
|
||||||
|
this.quantiles = quantiles;
|
||||||
|
this.samples = new LinkedList<SampleItem>();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Specifies the allowable error for this rank, depending on which quantiles
|
||||||
|
* are being targeted.
|
||||||
|
*
|
||||||
|
* This is the f(r_i, n) function from the CKMS paper. It's basically how wide
|
||||||
|
* the range of this rank can be.
|
||||||
|
*
|
||||||
|
* @param rank
|
||||||
|
* the index in the list of samples
|
||||||
|
*/
|
||||||
|
private double allowableError(int rank) {
|
||||||
|
int size = samples.size();
|
||||||
|
double minError = size + 1;
|
||||||
|
for (MetricQuantile q : quantiles) {
|
||||||
|
double error;
|
||||||
|
if (rank <= q.quantile * size) {
|
||||||
|
error = (2.0 * q.error * (size - rank)) / (1.0 - q.quantile);
|
||||||
|
} else {
|
||||||
|
error = (2.0 * q.error * rank) / q.quantile;
|
||||||
|
}
|
||||||
|
if (error < minError) {
|
||||||
|
minError = error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return minError;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a new value from the stream.
|
||||||
|
*
|
||||||
|
* @param v
|
||||||
|
*/
|
||||||
|
synchronized public void insert(long v) {
|
||||||
|
buffer[bufferCount] = v;
|
||||||
|
bufferCount++;
|
||||||
|
|
||||||
|
count++;
|
||||||
|
|
||||||
|
if (bufferCount == buffer.length) {
|
||||||
|
insertBatch();
|
||||||
|
compress();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merges items from buffer into the samples array in one pass.
|
||||||
|
* This is more efficient than doing an insert on every item.
|
||||||
|
*/
|
||||||
|
private void insertBatch() {
|
||||||
|
if (bufferCount == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Arrays.sort(buffer, 0, bufferCount);
|
||||||
|
|
||||||
|
// Base case: no samples
|
||||||
|
int start = 0;
|
||||||
|
if (samples.size() == 0) {
|
||||||
|
SampleItem newItem = new SampleItem(buffer[0], 1, 0);
|
||||||
|
samples.add(newItem);
|
||||||
|
start++;
|
||||||
|
}
|
||||||
|
|
||||||
|
ListIterator<SampleItem> it = samples.listIterator();
|
||||||
|
SampleItem item = it.next();
|
||||||
|
for (int i = start; i < bufferCount; i++) {
|
||||||
|
long v = buffer[i];
|
||||||
|
while (it.nextIndex() < samples.size() && item.value < v) {
|
||||||
|
item = it.next();
|
||||||
|
}
|
||||||
|
// If we found that bigger item, back up so we insert ourselves before it
|
||||||
|
if (item.value > v) {
|
||||||
|
it.previous();
|
||||||
|
}
|
||||||
|
// We use different indexes for the edge comparisons, because of the above
|
||||||
|
// if statement that adjusts the iterator
|
||||||
|
int delta;
|
||||||
|
if (it.previousIndex() == 0 || it.nextIndex() == samples.size()) {
|
||||||
|
delta = 0;
|
||||||
|
} else {
|
||||||
|
delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1;
|
||||||
|
}
|
||||||
|
SampleItem newItem = new SampleItem(v, 1, delta);
|
||||||
|
it.add(newItem);
|
||||||
|
item = newItem;
|
||||||
|
}
|
||||||
|
|
||||||
|
bufferCount = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Try to remove extraneous items from the set of sampled items. This checks
|
||||||
|
* if an item is unnecessary based on the desired error bounds, and merges it
|
||||||
|
* with the adjacent item if it is.
|
||||||
|
*/
|
||||||
|
private void compress() {
|
||||||
|
if (samples.size() < 2) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ListIterator<SampleItem> it = samples.listIterator();
|
||||||
|
SampleItem prev = null;
|
||||||
|
SampleItem next = it.next();
|
||||||
|
|
||||||
|
while (it.hasNext()) {
|
||||||
|
prev = next;
|
||||||
|
next = it.next();
|
||||||
|
if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) {
|
||||||
|
next.g += prev.g;
|
||||||
|
// Remove prev. it.remove() kills the last thing returned.
|
||||||
|
it.previous();
|
||||||
|
it.previous();
|
||||||
|
it.remove();
|
||||||
|
// it.next() is now equal to next, skip it back forward again
|
||||||
|
it.next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the estimated value at the specified quantile.
|
||||||
|
*
|
||||||
|
* @param quantile Queried quantile, e.g. 0.50 or 0.99.
|
||||||
|
* @return Estimated value at that quantile.
|
||||||
|
*/
|
||||||
|
private long query(double quantile) throws IOException {
|
||||||
|
if (samples.size() == 0) {
|
||||||
|
throw new IOException("No samples present");
|
||||||
|
}
|
||||||
|
|
||||||
|
int rankMin = 0;
|
||||||
|
int desired = (int) (quantile * count);
|
||||||
|
|
||||||
|
for (int i = 1; i < samples.size(); i++) {
|
||||||
|
SampleItem prev = samples.get(i - 1);
|
||||||
|
SampleItem cur = samples.get(i);
|
||||||
|
|
||||||
|
rankMin += prev.g;
|
||||||
|
|
||||||
|
if (rankMin + cur.g + cur.delta > desired + (allowableError(i) / 2)) {
|
||||||
|
return prev.value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// edge case of wanting max value
|
||||||
|
return samples.get(samples.size() - 1).value;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a snapshot of the current values of all the tracked quantiles.
|
||||||
|
*
|
||||||
|
* @return snapshot of the tracked quantiles
|
||||||
|
* @throws IOException
|
||||||
|
* if no items have been added to the estimator
|
||||||
|
*/
|
||||||
|
synchronized public Map<MetricQuantile, Long> snapshot() throws IOException {
|
||||||
|
// flush the buffer first for best results
|
||||||
|
insertBatch();
|
||||||
|
Map<MetricQuantile, Long> values = new HashMap<MetricQuantile, Long>(quantiles.length);
|
||||||
|
for (int i = 0; i < quantiles.length; i++) {
|
||||||
|
values.put(quantiles[i], query(quantiles[i].quantile));
|
||||||
|
}
|
||||||
|
|
||||||
|
return values;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of items that the estimator has processed
|
||||||
|
*
|
||||||
|
* @return count total number of items processed
|
||||||
|
*/
|
||||||
|
synchronized public long getCount() {
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of samples kept by the estimator
|
||||||
|
*
|
||||||
|
* @return count current number of samples
|
||||||
|
*/
|
||||||
|
synchronized public int getSampleCount() {
|
||||||
|
return samples.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resets the estimator, clearing out all previously inserted items
|
||||||
|
*/
|
||||||
|
synchronized public void clear() {
|
||||||
|
count = 0;
|
||||||
|
bufferCount = 0;
|
||||||
|
samples.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Describes a measured value passed to the estimator, tracking additional
|
||||||
|
* metadata required by the CKMS algorithm.
|
||||||
|
*/
|
||||||
|
private static class SampleItem {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Value of the sampled item (e.g. a measured latency value)
|
||||||
|
*/
|
||||||
|
public final long value;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Difference between the lowest possible rank of the previous item, and
|
||||||
|
* the lowest possible rank of this item.
|
||||||
|
*
|
||||||
|
* The sum of the g of all previous items yields this item's lower bound.
|
||||||
|
*/
|
||||||
|
public int g;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Difference between the item's greatest possible rank and lowest possible
|
||||||
|
* rank.
|
||||||
|
*/
|
||||||
|
public final int delta;
|
||||||
|
|
||||||
|
public SampleItem(long value, int lowerDelta, int delta) {
|
||||||
|
this.value = value;
|
||||||
|
this.g = lowerDelta;
|
||||||
|
this.delta = delta;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return String.format("%d, %d, %d", value, g, delta);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -134,6 +134,10 @@ limitations under the License.
|
||||||
<artifactId>hadoop-common</artifactId>
|
<artifactId>hadoop-common</artifactId>
|
||||||
<version>${hadoop-two.version}</version>
|
<version>${hadoop-two.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.yammer.metrics</groupId>
|
||||||
|
<artifactId>metrics-core</artifactId>
|
||||||
|
</dependency>
|
||||||
<!-- This was marked as test dep in earlier pom, but was scoped compile. Where
|
<!-- This was marked as test dep in earlier pom, but was scoped compile. Where
|
||||||
do we actually need it? -->
|
do we actually need it? -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|
|
@ -24,16 +24,20 @@ import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
import org.apache.hadoop.metrics2.lib.Interns;
|
import org.apache.hadoop.metrics2.lib.Interns;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableHistogram;
|
||||||
|
|
||||||
/** Hadoop2 implementation of MasterMetricsSource. */
|
/** Hadoop2 implementation of MasterMetricsSource. */
|
||||||
public class MasterMetricsSourceImpl
|
public class MasterMetricsSourceImpl
|
||||||
extends BaseMetricsSourceImpl implements MasterMetricsSource {
|
extends BaseMetricsSourceImpl implements MasterMetricsSource {
|
||||||
|
|
||||||
|
|
||||||
MutableCounterLong clusterRequestsCounter;
|
MutableCounterLong clusterRequestsCounter;
|
||||||
MutableGaugeLong ritGauge;
|
MutableGaugeLong ritGauge;
|
||||||
MutableGaugeLong ritCountOverThresholdGauge;
|
MutableGaugeLong ritCountOverThresholdGauge;
|
||||||
MutableGaugeLong ritOldestAgeGauge;
|
MutableGaugeLong ritOldestAgeGauge;
|
||||||
private final MasterMetricsWrapper masterWrapper;
|
private final MasterMetricsWrapper masterWrapper;
|
||||||
|
private MutableHistogram splitTimeHisto;
|
||||||
|
private MutableHistogram splitSizeHisto;
|
||||||
|
|
||||||
public MasterMetricsSourceImpl(MasterMetricsWrapper masterMetricsWrapper) {
|
public MasterMetricsSourceImpl(MasterMetricsWrapper masterMetricsWrapper) {
|
||||||
this(METRICS_NAME,
|
this(METRICS_NAME,
|
||||||
|
@ -50,6 +54,12 @@ public class MasterMetricsSourceImpl
|
||||||
MasterMetricsWrapper masterWrapper) {
|
MasterMetricsWrapper masterWrapper) {
|
||||||
super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
|
super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
|
||||||
this.masterWrapper = masterWrapper;
|
this.masterWrapper = masterWrapper;
|
||||||
|
clusterRequestsCounter = metricsRegistry.newCounter(CLUSTER_REQUESTS_NAME, "", 0l);
|
||||||
|
ritGauge = metricsRegistry.newGauge(RIT_COUNT_NAME, "", 0l);
|
||||||
|
ritCountOverThresholdGauge = metricsRegistry.newGauge(RIT_COUNT_OVER_THRESHOLD_NAME, "", 0l);
|
||||||
|
ritOldestAgeGauge = metricsRegistry.newGauge(RIT_OLDEST_AGE_NAME, "", 0l);
|
||||||
|
splitTimeHisto = metricsRegistry.newHistogram(SPLIT_SIZE_NAME, SPLIT_SIZE_DESC);
|
||||||
|
splitSizeHisto = metricsRegistry.newHistogram(SPLIT_TIME_NAME, SPLIT_TIME_DESC);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -77,6 +87,16 @@ public class MasterMetricsSourceImpl
|
||||||
ritCountOverThresholdGauge.set(ritCount);
|
ritCountOverThresholdGauge.set(ritCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateSplitTime(long time) {
|
||||||
|
splitTimeHisto.add(time);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateSplitSize(long size) {
|
||||||
|
splitSizeHisto.add(size);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void getMetrics(MetricsCollector metricsCollector, boolean all) {
|
public void getMetrics(MetricsCollector metricsCollector, boolean all) {
|
||||||
|
|
||||||
|
|
|
@ -22,8 +22,10 @@ import org.apache.hadoop.metrics2.MetricsCollector;
|
||||||
import org.apache.hadoop.metrics2.MetricsSource;
|
import org.apache.hadoop.metrics2.MetricsSource;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
|
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MetricMutableQuantiles;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableHistogram;
|
||||||
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -117,6 +119,18 @@ public class BaseMetricsSourceImpl implements BaseMetricsSource, MetricsSource {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateHistogram(String name, long value) {
|
||||||
|
MutableHistogram histo = metricsRegistry.getHistogram(name);
|
||||||
|
histo.add(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateQuantile(String name, long value) {
|
||||||
|
MetricMutableQuantiles histo = metricsRegistry.getQuantile(name);
|
||||||
|
histo.add(value);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove a named gauge.
|
* Remove a named gauge.
|
||||||
*
|
*
|
||||||
|
|
|
@ -250,8 +250,41 @@ public class DynamicMetricsRegistry {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
MutableRate ret = new MutableRate(name, desc, extended);
|
MutableRate ret = new MutableRate(name, desc, extended);
|
||||||
metricsMap.put(name, ret);
|
return addNewMetricIfAbsent(name, ret, MutableRate.class);
|
||||||
return ret;
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new histogram.
|
||||||
|
* @param name Name of the histogram.
|
||||||
|
* @return A new MutableHistogram
|
||||||
|
*/
|
||||||
|
public MutableHistogram newHistogram(String name) {
|
||||||
|
return newHistogram(name, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new histogram.
|
||||||
|
* @param name The name of the histogram
|
||||||
|
* @param desc The description of the data in the histogram.
|
||||||
|
* @return A new MutableHistogram
|
||||||
|
*/
|
||||||
|
public MutableHistogram newHistogram(String name, String desc) {
|
||||||
|
MutableHistogram histo = new MutableHistogram(name, desc);
|
||||||
|
return addNewMetricIfAbsent(name, histo, MutableHistogram.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new MutableQuantile(A more accurate histogram).
|
||||||
|
* @param name The name of the histogram
|
||||||
|
* @return a new MutableQuantile
|
||||||
|
*/
|
||||||
|
public MetricMutableQuantiles newQuantile(String name) {
|
||||||
|
return newQuantile(name, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
public MetricMutableQuantiles newQuantile(String name, String desc) {
|
||||||
|
MetricMutableQuantiles histo = new MetricMutableQuantiles(name, desc, "Ops", "", 60);
|
||||||
|
return addNewMetricIfAbsent(name, histo, MetricMutableQuantiles.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void add(String name, MutableMetric metric) {
|
synchronized void add(String name, MutableMetric metric) {
|
||||||
|
@ -440,6 +473,48 @@ public class DynamicMetricsRegistry {
|
||||||
return (MutableCounterLong) counter;
|
return (MutableCounterLong) counter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public MutableHistogram getHistogram(String histoName) {
|
||||||
|
//See getLongGauge for description on how this works.
|
||||||
|
MutableMetric histo = metricsMap.get(histoName);
|
||||||
|
if (histo == null) {
|
||||||
|
MutableHistogram newCounter =
|
||||||
|
new MutableHistogram(Interns.info(histoName, ""));
|
||||||
|
histo = metricsMap.putIfAbsent(histoName, newCounter);
|
||||||
|
if (histo == null) {
|
||||||
|
return newCounter;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (!(histo instanceof MutableHistogram)) {
|
||||||
|
throw new MetricsException("Metric already exists in registry for metric name: " +
|
||||||
|
histoName + " and not of type MutableHistogram");
|
||||||
|
}
|
||||||
|
|
||||||
|
return (MutableHistogram) histo;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MetricMutableQuantiles getQuantile(String histoName) {
|
||||||
|
//See getLongGauge for description on how this works.
|
||||||
|
MutableMetric histo = metricsMap.get(histoName);
|
||||||
|
if (histo == null) {
|
||||||
|
MetricMutableQuantiles newCounter =
|
||||||
|
new MetricMutableQuantiles(histoName, "", "Ops", "", 60);
|
||||||
|
histo = metricsMap.putIfAbsent(histoName, newCounter);
|
||||||
|
if (histo == null) {
|
||||||
|
return newCounter;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (!(histo instanceof MetricMutableQuantiles)) {
|
||||||
|
throw new MetricsException("Metric already exists in registry for metric name: " +
|
||||||
|
histoName + " and not of type MutableHistogram");
|
||||||
|
}
|
||||||
|
|
||||||
|
return (MetricMutableQuantiles) histo;
|
||||||
|
}
|
||||||
|
|
||||||
private<T extends MutableMetric> T
|
private<T extends MutableMetric> T
|
||||||
addNewMetricIfAbsent(String name,
|
addNewMetricIfAbsent(String name,
|
||||||
T ret,
|
T ret,
|
||||||
|
|
|
@ -0,0 +1,157 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.metrics2.lib;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.metrics.MetricHistogram;
|
||||||
|
import org.apache.hadoop.metrics.MetricsExecutor;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsInfo;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
|
import org.apache.hadoop.metrics2.util.MetricQuantile;
|
||||||
|
import org.apache.hadoop.metrics2.util.MetricSampleQuantiles;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Watches a stream of long values, maintaining online estimates of specific quantiles with provably
|
||||||
|
* low error bounds. This is particularly useful for accurate high-percentile (e.g. 95th, 99th)
|
||||||
|
* latency metrics.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class MetricMutableQuantiles extends MutableMetric implements MetricHistogram {
|
||||||
|
|
||||||
|
static final MetricQuantile[] quantiles = {new MetricQuantile(0.50, 0.050),
|
||||||
|
new MetricQuantile(0.75, 0.025), new MetricQuantile(0.90, 0.010),
|
||||||
|
new MetricQuantile(0.95, 0.005), new MetricQuantile(0.99, 0.001)};
|
||||||
|
|
||||||
|
private final MetricsInfo numInfo;
|
||||||
|
private final MetricsInfo[] quantileInfos;
|
||||||
|
private final int interval;
|
||||||
|
|
||||||
|
private MetricSampleQuantiles estimator;
|
||||||
|
private long previousCount = 0;
|
||||||
|
private MetricsExecutor executor;
|
||||||
|
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected Map<MetricQuantile, Long> previousSnapshot = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Instantiates a new {@link MetricMutableQuantiles} for a metric that rolls itself over on the
|
||||||
|
* specified time interval.
|
||||||
|
*
|
||||||
|
* @param name of the metric
|
||||||
|
* @param description long-form textual description of the metric
|
||||||
|
* @param sampleName type of items in the stream (e.g., "Ops")
|
||||||
|
* @param valueName type of the values
|
||||||
|
* @param interval rollover interval (in seconds) of the estimator
|
||||||
|
*/
|
||||||
|
public MetricMutableQuantiles(String name, String description, String sampleName,
|
||||||
|
String valueName, int interval) {
|
||||||
|
String ucName = StringUtils.capitalize(name);
|
||||||
|
String usName = StringUtils.capitalize(sampleName);
|
||||||
|
String uvName = StringUtils.capitalize(valueName);
|
||||||
|
String desc = StringUtils.uncapitalize(description);
|
||||||
|
String lsName = StringUtils.uncapitalize(sampleName);
|
||||||
|
String lvName = StringUtils.uncapitalize(valueName);
|
||||||
|
|
||||||
|
numInfo = info(ucName + "Num" + usName, String.format(
|
||||||
|
"Number of %s for %s with %ds interval", lsName, desc, interval));
|
||||||
|
// Construct the MetricsInfos for the quantiles, converting to percentiles
|
||||||
|
quantileInfos = new MetricsInfo[quantiles.length];
|
||||||
|
String nameTemplate = ucName + "%dthPercentile" + interval + "sInterval"
|
||||||
|
+ uvName;
|
||||||
|
String descTemplate = "%d percentile " + lvName + " with " + interval
|
||||||
|
+ " second interval for " + desc;
|
||||||
|
for (int i = 0; i < quantiles.length; i++) {
|
||||||
|
int percentile = (int) (100 * quantiles[i].quantile);
|
||||||
|
quantileInfos[i] = info(String.format(nameTemplate, percentile),
|
||||||
|
String.format(descTemplate, percentile));
|
||||||
|
}
|
||||||
|
|
||||||
|
estimator = new MetricSampleQuantiles(quantiles);
|
||||||
|
executor = new MetricsExecutorImpl();
|
||||||
|
this.interval = interval;
|
||||||
|
executor.getExecutor().scheduleAtFixedRate(new RolloverSample(this),
|
||||||
|
interval,
|
||||||
|
interval,
|
||||||
|
TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) {
|
||||||
|
if (all || changed()) {
|
||||||
|
builder.addGauge(numInfo, previousCount);
|
||||||
|
for (int i = 0; i < quantiles.length; i++) {
|
||||||
|
long newValue = 0;
|
||||||
|
// If snapshot is null, we failed to update since the window was empty
|
||||||
|
if (previousSnapshot != null) {
|
||||||
|
newValue = previousSnapshot.get(quantiles[i]);
|
||||||
|
}
|
||||||
|
builder.addGauge(quantileInfos[i], newValue);
|
||||||
|
}
|
||||||
|
if (changed()) {
|
||||||
|
clearChanged();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void add(long value) {
|
||||||
|
estimator.insert(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getInterval() {
|
||||||
|
return interval;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Runnable used to periodically roll over the internal {@link org.apache.hadoop.metrics2.util.MetricSampleQuantiles} every interval. */
|
||||||
|
private static class RolloverSample implements Runnable {
|
||||||
|
|
||||||
|
MetricMutableQuantiles parent;
|
||||||
|
|
||||||
|
public RolloverSample(MetricMutableQuantiles parent) {
|
||||||
|
this.parent = parent;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
synchronized (parent) {
|
||||||
|
try {
|
||||||
|
parent.previousCount = parent.estimator.getCount();
|
||||||
|
parent.previousSnapshot = parent.estimator.snapshot();
|
||||||
|
} catch (IOException e) {
|
||||||
|
// Couldn't get a new snapshot because the window was empty
|
||||||
|
parent.previousCount = 0;
|
||||||
|
parent.previousSnapshot = null;
|
||||||
|
}
|
||||||
|
parent.estimator.clear();
|
||||||
|
}
|
||||||
|
parent.setChanged();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,66 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.metrics2.lib;
|
||||||
|
|
||||||
|
import org.apache.hadoop.metrics.MetricsExecutor;
|
||||||
|
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class to handle the ScheduledExecutorService{@link ScheduledExecutorService} used by MetricMutableQuantiles{@link MetricMutableQuantiles}
|
||||||
|
*/
|
||||||
|
public class MetricsExecutorImpl implements MetricsExecutor {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ScheduledExecutorService getExecutor() {
|
||||||
|
return ExecutorSingleton.INSTANCE.scheduler;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
if (!getExecutor().isShutdown()) {
|
||||||
|
getExecutor().shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private enum ExecutorSingleton {
|
||||||
|
INSTANCE;
|
||||||
|
|
||||||
|
private final ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1, new ThreadPoolExecutorThreadFactory("HBase-Metrics2-"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class ThreadPoolExecutorThreadFactory implements ThreadFactory {
|
||||||
|
private final String name;
|
||||||
|
private final AtomicInteger threadNumber = new AtomicInteger(1);
|
||||||
|
|
||||||
|
private ThreadPoolExecutorThreadFactory(String name) {
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Thread newThread(Runnable runnable) {
|
||||||
|
Thread t = new Thread(runnable, name + threadNumber.getAndIncrement());
|
||||||
|
t.setDaemon(true);
|
||||||
|
return t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,132 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.metrics2.lib;
|
||||||
|
|
||||||
|
import com.yammer.metrics.stats.ExponentiallyDecayingSample;
|
||||||
|
import com.yammer.metrics.stats.Sample;
|
||||||
|
import com.yammer.metrics.stats.Snapshot;
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
|
import org.apache.hadoop.metrics.MetricHistogram;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsInfo;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A histogram implementation that runs in constant space, and exports to hadoop2's metrics2 system.
|
||||||
|
*/
|
||||||
|
public class MutableHistogram extends MutableMetric implements MetricHistogram {
|
||||||
|
|
||||||
|
private static final int DEFAULT_SAMPLE_SIZE = 2046;
|
||||||
|
// the bias towards sampling from more recent data.
|
||||||
|
// Per Cormode et al. an alpha of 0.015 strongly biases to the last 5 minutes
|
||||||
|
private static final double DEFAULT_ALPHA = 0.015;
|
||||||
|
|
||||||
|
private final String name;
|
||||||
|
private final String desc;
|
||||||
|
private final Sample sample;
|
||||||
|
private final AtomicLong min;
|
||||||
|
private final AtomicLong max;
|
||||||
|
private final AtomicLong sum;
|
||||||
|
private final AtomicLong count;
|
||||||
|
|
||||||
|
public MutableHistogram(MetricsInfo info) {
|
||||||
|
this(info.name(), info.description());
|
||||||
|
}
|
||||||
|
|
||||||
|
public MutableHistogram(String name, String description) {
|
||||||
|
this.name = StringUtils.capitalize(name);
|
||||||
|
this.desc = StringUtils.uncapitalize(description);
|
||||||
|
sample = new ExponentiallyDecayingSample(DEFAULT_SAMPLE_SIZE, DEFAULT_ALPHA);
|
||||||
|
count = new AtomicLong();
|
||||||
|
min = new AtomicLong(Long.MAX_VALUE);
|
||||||
|
max = new AtomicLong(Long.MIN_VALUE);
|
||||||
|
sum = new AtomicLong();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void add(final long val) {
|
||||||
|
setChanged();
|
||||||
|
count.incrementAndGet();
|
||||||
|
sample.update(val);
|
||||||
|
setMax(val);
|
||||||
|
setMin(val);
|
||||||
|
sum.getAndAdd(val);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setMax(final long potentialMax) {
|
||||||
|
boolean done = false;
|
||||||
|
while (!done) {
|
||||||
|
final long currentMax = max.get();
|
||||||
|
done = currentMax >= potentialMax
|
||||||
|
|| max.compareAndSet(currentMax, potentialMax);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setMin(long potentialMin) {
|
||||||
|
boolean done = false;
|
||||||
|
while (!done) {
|
||||||
|
final long currentMin = min.get();
|
||||||
|
done = currentMin <= potentialMin
|
||||||
|
|| min.compareAndSet(currentMin, potentialMin);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMax() {
|
||||||
|
if (count.get() > 0) {
|
||||||
|
return max.get();
|
||||||
|
}
|
||||||
|
return 0L;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMin() {
|
||||||
|
if (count.get() > 0) {
|
||||||
|
return min.get();
|
||||||
|
}
|
||||||
|
return 0L;
|
||||||
|
}
|
||||||
|
|
||||||
|
public double getMean() {
|
||||||
|
long cCount = count.get();
|
||||||
|
if (cCount > 0) {
|
||||||
|
return sum.get() / (double) cCount;
|
||||||
|
}
|
||||||
|
return 0.0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void snapshot(MetricsRecordBuilder metricsRecordBuilder, boolean all) {
|
||||||
|
if (all || changed()) {
|
||||||
|
clearChanged();
|
||||||
|
final Snapshot s = sample.getSnapshot();
|
||||||
|
metricsRecordBuilder.addCounter(Interns.info(name + NUM_OPS_METRIC_NAME, desc), count.get());
|
||||||
|
|
||||||
|
metricsRecordBuilder.addGauge(Interns.info(name + MIN_METRIC_NAME, desc), getMin());
|
||||||
|
metricsRecordBuilder.addGauge(Interns.info(name + MAX_METRIC_NAME, desc), getMax());
|
||||||
|
metricsRecordBuilder.addGauge(Interns.info(name + MEAN_METRIC_NAME, desc), getMean());
|
||||||
|
|
||||||
|
metricsRecordBuilder.addGauge(Interns.info(name + MEDIAN_METRIC_NAME, desc), s.getMedian());
|
||||||
|
metricsRecordBuilder.addGauge(Interns.info(name + SEVENTY_FIFTH_PERCENTILE_METRIC_NAME, desc),
|
||||||
|
s.get75thPercentile());
|
||||||
|
metricsRecordBuilder.addGauge(Interns.info(name + NINETY_FIFTH_PERCENTILE_METRIC_NAME, desc),
|
||||||
|
s.get95thPercentile());
|
||||||
|
metricsRecordBuilder.addGauge(Interns.info(name + NINETY_NINETH_PERCENTILE_METRIC_NAME, desc),
|
||||||
|
s.get99thPercentile());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,60 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.metrics2.util;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Specifies a quantile (with error bounds) to be watched by a
|
||||||
|
* {@link MetricSampleQuantiles} object.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class MetricQuantile {
|
||||||
|
public final double quantile;
|
||||||
|
public final double error;
|
||||||
|
|
||||||
|
public MetricQuantile(double quantile, double error) {
|
||||||
|
this.quantile = quantile;
|
||||||
|
this.error = error;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object aThat) {
|
||||||
|
if (this == aThat) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (!(aThat instanceof MetricQuantile)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
MetricQuantile that = (MetricQuantile) aThat;
|
||||||
|
|
||||||
|
long qbits = Double.doubleToLongBits(quantile);
|
||||||
|
long ebits = Double.doubleToLongBits(error);
|
||||||
|
|
||||||
|
return qbits == Double.doubleToLongBits(that.quantile)
|
||||||
|
&& ebits == Double.doubleToLongBits(that.error);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return (int) (Double.doubleToLongBits(quantile) ^ Double
|
||||||
|
.doubleToLongBits(error));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,310 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.metrics2.util;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.ListIterator;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementation of the Cormode, Korn, Muthukrishnan, and Srivastava algorithm
|
||||||
|
* for streaming calculation of targeted high-percentile epsilon-approximate
|
||||||
|
* quantiles.
|
||||||
|
*
|
||||||
|
* This is a generalization of the earlier work by Greenwald and Khanna (GK),
|
||||||
|
* which essentially allows different error bounds on the targeted quantiles,
|
||||||
|
* which allows for far more efficient calculation of high-percentiles.
|
||||||
|
*
|
||||||
|
* See: Cormode, Korn, Muthukrishnan, and Srivastava
|
||||||
|
* "Effective Computation of Biased Quantiles over Data Streams" in ICDE 2005
|
||||||
|
*
|
||||||
|
* Greenwald and Khanna,
|
||||||
|
* "Space-efficient online computation of quantile summaries" in SIGMOD 2001
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class MetricSampleQuantiles {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Total number of items in stream
|
||||||
|
*/
|
||||||
|
private long count = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Current list of sampled items, maintained in sorted order with error bounds
|
||||||
|
*/
|
||||||
|
private LinkedList<SampleItem> samples;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Buffers incoming items to be inserted in batch. Items are inserted into
|
||||||
|
* the buffer linearly. When the buffer fills, it is flushed into the samples
|
||||||
|
* array in its entirety.
|
||||||
|
*/
|
||||||
|
private long[] buffer = new long[500];
|
||||||
|
private int bufferCount = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Array of Quantiles that we care about, along with desired error.
|
||||||
|
*/
|
||||||
|
private final MetricQuantile quantiles[];
|
||||||
|
|
||||||
|
public MetricSampleQuantiles(MetricQuantile[] quantiles) {
|
||||||
|
this.quantiles = quantiles;
|
||||||
|
this.samples = new LinkedList<SampleItem>();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Specifies the allowable error for this rank, depending on which quantiles
|
||||||
|
* are being targeted.
|
||||||
|
*
|
||||||
|
* This is the f(r_i, n) function from the CKMS paper. It's basically how wide
|
||||||
|
* the range of this rank can be.
|
||||||
|
*
|
||||||
|
* @param rank
|
||||||
|
* the index in the list of samples
|
||||||
|
*/
|
||||||
|
private double allowableError(int rank) {
|
||||||
|
int size = samples.size();
|
||||||
|
double minError = size + 1;
|
||||||
|
for (MetricQuantile q : quantiles) {
|
||||||
|
double error;
|
||||||
|
if (rank <= q.quantile * size) {
|
||||||
|
error = (2.0 * q.error * (size - rank)) / (1.0 - q.quantile);
|
||||||
|
} else {
|
||||||
|
error = (2.0 * q.error * rank) / q.quantile;
|
||||||
|
}
|
||||||
|
if (error < minError) {
|
||||||
|
minError = error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return minError;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a new value from the stream.
|
||||||
|
*
|
||||||
|
* @param v
|
||||||
|
*/
|
||||||
|
synchronized public void insert(long v) {
|
||||||
|
buffer[bufferCount] = v;
|
||||||
|
bufferCount++;
|
||||||
|
|
||||||
|
count++;
|
||||||
|
|
||||||
|
if (bufferCount == buffer.length) {
|
||||||
|
insertBatch();
|
||||||
|
compress();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merges items from buffer into the samples array in one pass.
|
||||||
|
* This is more efficient than doing an insert on every item.
|
||||||
|
*/
|
||||||
|
private void insertBatch() {
|
||||||
|
if (bufferCount == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Arrays.sort(buffer, 0, bufferCount);
|
||||||
|
|
||||||
|
// Base case: no samples
|
||||||
|
int start = 0;
|
||||||
|
if (samples.size() == 0) {
|
||||||
|
SampleItem newItem = new SampleItem(buffer[0], 1, 0);
|
||||||
|
samples.add(newItem);
|
||||||
|
start++;
|
||||||
|
}
|
||||||
|
|
||||||
|
ListIterator<SampleItem> it = samples.listIterator();
|
||||||
|
SampleItem item = it.next();
|
||||||
|
for (int i = start; i < bufferCount; i++) {
|
||||||
|
long v = buffer[i];
|
||||||
|
while (it.nextIndex() < samples.size() && item.value < v) {
|
||||||
|
item = it.next();
|
||||||
|
}
|
||||||
|
// If we found that bigger item, back up so we insert ourselves before it
|
||||||
|
if (item.value > v) {
|
||||||
|
it.previous();
|
||||||
|
}
|
||||||
|
// We use different indexes for the edge comparisons, because of the above
|
||||||
|
// if statement that adjusts the iterator
|
||||||
|
int delta;
|
||||||
|
if (it.previousIndex() == 0 || it.nextIndex() == samples.size()) {
|
||||||
|
delta = 0;
|
||||||
|
} else {
|
||||||
|
delta = ((int) Math.floor(allowableError(it.nextIndex()))) - 1;
|
||||||
|
}
|
||||||
|
SampleItem newItem = new SampleItem(v, 1, delta);
|
||||||
|
it.add(newItem);
|
||||||
|
item = newItem;
|
||||||
|
}
|
||||||
|
|
||||||
|
bufferCount = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Try to remove extraneous items from the set of sampled items. This checks
|
||||||
|
* if an item is unnecessary based on the desired error bounds, and merges it
|
||||||
|
* with the adjacent item if it is.
|
||||||
|
*/
|
||||||
|
private void compress() {
|
||||||
|
if (samples.size() < 2) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ListIterator<SampleItem> it = samples.listIterator();
|
||||||
|
SampleItem prev = null;
|
||||||
|
SampleItem next = it.next();
|
||||||
|
|
||||||
|
while (it.hasNext()) {
|
||||||
|
prev = next;
|
||||||
|
next = it.next();
|
||||||
|
if (prev.g + next.g + next.delta <= allowableError(it.previousIndex())) {
|
||||||
|
next.g += prev.g;
|
||||||
|
// Remove prev. it.remove() kills the last thing returned.
|
||||||
|
it.previous();
|
||||||
|
it.previous();
|
||||||
|
it.remove();
|
||||||
|
// it.next() is now equal to next, skip it back forward again
|
||||||
|
it.next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the estimated value at the specified quantile.
|
||||||
|
*
|
||||||
|
* @param quantile Queried quantile, e.g. 0.50 or 0.99.
|
||||||
|
* @return Estimated value at that quantile.
|
||||||
|
*/
|
||||||
|
private long query(double quantile) throws IOException {
|
||||||
|
if (samples.size() == 0) {
|
||||||
|
throw new IOException("No samples present");
|
||||||
|
}
|
||||||
|
|
||||||
|
int rankMin = 0;
|
||||||
|
int desired = (int) (quantile * count);
|
||||||
|
|
||||||
|
for (int i = 1; i < samples.size(); i++) {
|
||||||
|
SampleItem prev = samples.get(i - 1);
|
||||||
|
SampleItem cur = samples.get(i);
|
||||||
|
|
||||||
|
rankMin += prev.g;
|
||||||
|
|
||||||
|
if (rankMin + cur.g + cur.delta > desired + (allowableError(i) / 2)) {
|
||||||
|
return prev.value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// edge case of wanting max value
|
||||||
|
return samples.get(samples.size() - 1).value;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a snapshot of the current values of all the tracked quantiles.
|
||||||
|
*
|
||||||
|
* @return snapshot of the tracked quantiles
|
||||||
|
* @throws IOException
|
||||||
|
* if no items have been added to the estimator
|
||||||
|
*/
|
||||||
|
synchronized public Map<MetricQuantile, Long> snapshot() throws IOException {
|
||||||
|
// flush the buffer first for best results
|
||||||
|
insertBatch();
|
||||||
|
Map<MetricQuantile, Long> values = new HashMap<MetricQuantile, Long>(quantiles.length);
|
||||||
|
for (int i = 0; i < quantiles.length; i++) {
|
||||||
|
values.put(quantiles[i], query(quantiles[i].quantile));
|
||||||
|
}
|
||||||
|
|
||||||
|
return values;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of items that the estimator has processed
|
||||||
|
*
|
||||||
|
* @return count total number of items processed
|
||||||
|
*/
|
||||||
|
synchronized public long getCount() {
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of samples kept by the estimator
|
||||||
|
*
|
||||||
|
* @return count current number of samples
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
synchronized public int getSampleCount() {
|
||||||
|
return samples.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resets the estimator, clearing out all previously inserted items
|
||||||
|
*/
|
||||||
|
synchronized public void clear() {
|
||||||
|
count = 0;
|
||||||
|
bufferCount = 0;
|
||||||
|
samples.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Describes a measured value passed to the estimator, tracking additional
|
||||||
|
* metadata required by the CKMS algorithm.
|
||||||
|
*/
|
||||||
|
private static class SampleItem {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Value of the sampled item (e.g. a measured latency value)
|
||||||
|
*/
|
||||||
|
public final long value;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Difference between the lowest possible rank of the previous item, and
|
||||||
|
* the lowest possible rank of this item.
|
||||||
|
*
|
||||||
|
* The sum of the g of all previous items yields this item's lower bound.
|
||||||
|
*/
|
||||||
|
public int g;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Difference between the item's greatest possible rank and lowest possible
|
||||||
|
* rank.
|
||||||
|
*/
|
||||||
|
public final int delta;
|
||||||
|
|
||||||
|
public SampleItem(long value, int lowerDelta, int delta) {
|
||||||
|
this.value = value;
|
||||||
|
this.g = lowerDelta;
|
||||||
|
this.delta = delta;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return String.format("%d, %d, %d", value, g, delta);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -51,8 +51,8 @@ public class MasterMetrics {
|
||||||
* @param size length of original HLogs that were split
|
* @param size length of original HLogs that were split
|
||||||
*/
|
*/
|
||||||
public synchronized void addSplit(long time, long size) {
|
public synchronized void addSplit(long time, long size) {
|
||||||
//TODO use new metrics histogram
|
masterMetricsSource.updateSplitTime(time);
|
||||||
|
masterMetricsSource.updateSplitSize(size);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue