HADOOP-14503. Make RollingAverages a mutable metric. Contributed by Hanisha Koneru.

Arpit Agarwal, 2017-06-12 20:43:43 -07:00
commit 8633ef8e10 (parent bec79ca249)
11 changed files with 213 additions and 111 deletions

View File: MetricsRegistry.java

@@ -313,6 +313,15 @@ public synchronized MutableRatesWithAggregation newRatesWithAggregation(
return rates;
}
public synchronized MutableRollingAverages newMutableRollingAverages(
String name, String valueName) {
checkMetricName(name);
MutableRollingAverages rollingAverages =
new MutableRollingAverages(valueName);
metricsMap.put(name, rollingAverages);
return rollingAverages;
}
synchronized void add(String name, MutableMetric metric) {
checkMetricName(name);
metricsMap.put(name, metric);
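A minimal sketch (not part of the commit) of driving the new factory method by hand; the registry name, metric names, and sample value below are made up for illustration:

import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableRollingAverages;

class RegistryRollingAveragesSketch {  // hypothetical example class
  static MutableRollingAverages createAndRecord() {
    MetricsRegistry registry = new MetricsRegistry("ExampleSource");
    // Registers the metric under "SendPacketDownstream" with value name "Time".
    MutableRollingAverages latency =
        registry.newMutableRollingAverages("SendPacketDownstream", "Time");
    latency.add("peer-1", 42L);  // one latency sample, keyed by peer name
    return latency;
  }
}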

View File: MutableMetricsFactory.java

@@ -78,6 +78,10 @@ MutableMetric newForField(Field field, Metric annotation,
annotation.sampleName(), annotation.valueName(),
annotation.always());
}
if (cls == MutableRollingAverages.class) {
return registry.newMutableRollingAverages(info.name(),
annotation.valueName());
}
throw new MetricsException("Unsupported metric field "+ field.getName() +
" of type "+ field.getType().getName());
}
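With the factory hook above, a field of type MutableRollingAverages annotated with @Metric is instantiated automatically when its source is registered, mirroring the DummyTestMetric added in the test below; a sketch with a hypothetical source class:

import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableRollingAverages;

@Metrics(name = "ExampleActivity", about = "example rolling averages", context = "example")
class ExampleRollingAveragesSource {  // hypothetical metrics source
  @Metric(valueName = "Time")
  private MutableRollingAverages sendLatency;  // built by MutableMetricsFactory

  static ExampleRollingAveragesSource register() {
    // Registration walks the annotated fields and creates the metric.
    return DefaultMetricsSystem.instance().register(
        "ExampleActivity", "example rolling averages",
        new ExampleRollingAveragesSource());
  }
}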

View File: MutableRollingAverages.java (renamed from RollingAverages.java)

@@ -31,6 +31,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -57,29 +58,30 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class RollingAverages extends MutableMetric implements Closeable {
public class MutableRollingAverages extends MutableMetric implements Closeable {
private final MutableRatesWithAggregation innerMetrics =
private MutableRatesWithAggregation innerMetrics =
new MutableRatesWithAggregation();
private static final ScheduledExecutorService SCHEDULER = Executors
@VisibleForTesting
static final ScheduledExecutorService SCHEDULER = Executors
.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("RollingAverages-%d").build());
.setNameFormat("MutableRollingAverages-%d").build());
private ScheduledFuture<?> scheduledTask = null;
@Nullable
private Map<String, MutableRate> currentSnapshot;
private final int numWindows;
private final String avgInfoNameTemplate;
private final String avgInfoDescTemplate;
private int numWindows;
private static class SumAndCount {
private final double sum;
private final long count;
public SumAndCount(final double sum, final long count) {
SumAndCount(final double sum, final long count) {
this.sum = sum;
this.count = count;
}
@@ -105,44 +107,36 @@ public long getCount() {
private Map<String, LinkedBlockingDeque<SumAndCount>> averages =
new ConcurrentHashMap<>();
private static final long WINDOW_SIZE_MS_DEFAULT = 300_000;
private static final int NUM_WINDOWS_DEFAULT = 36;
/**
* Constructor of {@link RollingAverages}.
* @param windowSizeMs
* The number of milliseconds of each window for which subset
* of samples are gathered to compute the rolling average, A.K.A.
* roll over interval.
* @param numWindows
* The number of windows maintained to compute the rolling average.
* @param valueName
* of the metric (e.g. "Time", "Latency")
* Constructor for {@link MutableRollingAverages}.
* @param metricValueName
*/
public RollingAverages(
final long windowSizeMs,
final int numWindows,
final String valueName) {
String uvName = StringUtils.capitalize(valueName);
String lvName = StringUtils.uncapitalize(valueName);
avgInfoNameTemplate = "[%s]" + "RollingAvg"+ uvName;
avgInfoDescTemplate = "Rolling average "+ lvName +" for "+ "%s";
this.numWindows = numWindows;
public MutableRollingAverages(String metricValueName) {
if (metricValueName == null) {
metricValueName = "";
}
avgInfoNameTemplate = "[%s]" + "RollingAvg" +
StringUtils.capitalize(metricValueName);
avgInfoDescTemplate = "Rolling average " +
StringUtils.uncapitalize(metricValueName) +" for "+ "%s";
numWindows = NUM_WINDOWS_DEFAULT;
scheduledTask = SCHEDULER.scheduleAtFixedRate(new RatesRoller(this),
windowSizeMs, windowSizeMs, TimeUnit.MILLISECONDS);
WINDOW_SIZE_MS_DEFAULT, WINDOW_SIZE_MS_DEFAULT, TimeUnit.MILLISECONDS);
}
/**
* Constructor of {@link RollingAverages}.
* @param windowSizeMs
* The number of seconds of each window for which sub set of samples
* are gathered to compute rolling average, also A.K.A roll over
* interval.
* @param numWindows
* The number of windows maintained in the same time to compute the
* average of the rolling averages.
* This method is for testing only to replace the scheduledTask.
*/
public RollingAverages(
final long windowSizeMs,
final int numWindows) {
this(windowSizeMs, numWindows, "Time");
@VisibleForTesting
synchronized void replaceScheduledTask(int windows, long interval,
TimeUnit timeUnit) {
numWindows = windows;
scheduledTask.cancel(true);
scheduledTask = SCHEDULER.scheduleAtFixedRate(new RatesRoller(this),
interval, interval, timeUnit);
}
@Override
@@ -190,9 +184,9 @@ public void add(final String name, final long value) {
}
private static class RatesRoller implements Runnable {
private final RollingAverages parent;
private final MutableRollingAverages parent;
public RatesRoller(final RollingAverages parent) {
RatesRoller(final MutableRollingAverages parent) {
this.parent = parent;
}
@@ -218,7 +212,7 @@ public void run() {
/**
* Iterates over snapshot to capture all Avg metrics into rolling structure
* {@link RollingAverages#averages}.
* {@link MutableRollingAverages#averages}.
*/
private synchronized void rollOverAvgs() {
if (currentSnapshot == null) {
@@ -232,7 +226,7 @@ private synchronized void rollOverAvgs() {
new Function<String, LinkedBlockingDeque<SumAndCount>>() {
@Override
public LinkedBlockingDeque<SumAndCount> apply(String k) {
return new LinkedBlockingDeque<SumAndCount>(numWindows);
return new LinkedBlockingDeque<>(numWindows);
}
});
final SumAndCount sumAndCount = new SumAndCount(
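Since the window length and window count are no longer constructor arguments, direct construction only names the value being averaged and falls back to the built-in defaults (300,000 ms windows, 36 of them); a rough sketch with illustrative metric names:

import java.util.Map;
import org.apache.hadoop.metrics2.lib.MutableRollingAverages;

class RollingAveragesDefaultsSketch {  // hypothetical example class
  static Map<String, Double> sample() {
    MutableRollingAverages averages = new MutableRollingAverages("Latency");
    averages.add("datanode-1", 25L);  // illustrative entry name and values
    averages.add("datanode-1", 35L);
    averages.collectThreadLocalStates();  // fold thread-local sums into the snapshot
    // Averages appear only after a window rolls over (every 5 minutes by default);
    // 0 disables the minimum-sample cutoff.
    return averages.getStats(0);
  }
}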

View File: MetricsTestHelper.java (new file)

@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.lib;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/**
* A helper class that can provide test cases access to package-private
* methods.
*/
public final class MetricsTestHelper {
public static final Logger LOG =
LoggerFactory.getLogger(MetricsTestHelper.class);
private MetricsTestHelper() {
//not called
}
/**
* Replace the rolling averages windows for a
* {@link MutableRollingAverages} metric.
*
*/
public static void replaceRollingAveragesScheduler(
MutableRollingAverages mutableRollingAverages,
int numWindows, long interval, TimeUnit timeUnit) {
mutableRollingAverages.replaceScheduledTask(
numWindows, interval, timeUnit);
}
}
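The helper lives in org.apache.hadoop.metrics2.lib so that tests in other packages (the HDFS ones below) can reach the package-private replaceScheduledTask. A sketch of the intended use, with illustrative window settings:

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.metrics2.lib.MetricsTestHelper;
import org.apache.hadoop.metrics2.lib.MutableRollingAverages;

class FastRollingAveragesSketch {  // hypothetical test utility
  static MutableRollingAverages newFastRollingAverages() {
    MutableRollingAverages averages = new MutableRollingAverages("Time");
    // Two 5-second windows instead of the 36 five-minute defaults,
    // so a unit test sees roll-overs quickly.
    MetricsTestHelper.replaceRollingAveragesScheduler(
        averages, 2, 5, TimeUnit.SECONDS);
    return averages;
  }
}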

View File: TestMutableRollingAverages.java (renamed from TestRollingAverages.java)

@@ -17,23 +17,30 @@
*/
package org.apache.hadoop.metrics2.lib;
import com.google.common.base.Supplier;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.metrics2.lib.Interns.info;
import static org.apache.hadoop.test.MetricsAsserts.mockMetricsRecordBuilder;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.apache.hadoop.test.MetricsAsserts.*;
import static org.mockito.Matchers.anyDouble;
import static org.mockito.Matchers.eq;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.util.Time;
import org.junit.Test;
import static org.mockito.Mockito.*;
/**
* This class tests various cases of the algorithms implemented in
* {@link RollingAverages}.
* {@link MutableRollingAverages}.
*/
public class TestRollingAverages {
public class TestMutableRollingAverages {
/**
* Tests if the results are correct if no samples are inserted, dry run of
* empty roll over.
@@ -42,8 +49,9 @@ public class TestRollingAverages {
public void testRollingAveragesEmptyRollover() throws Exception {
final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
/* 5s interval and 2 windows */
try (RollingAverages rollingAverages =
new RollingAverages(5000, 2)) {
try (MutableRollingAverages rollingAverages =
new MutableRollingAverages("Time")) {
rollingAverages.replaceScheduledTask(2, 5, TimeUnit.SECONDS);
/* Check it initially */
rollingAverages.snapshot(rb, true);
verify(rb, never()).addGauge(
@@ -78,9 +86,9 @@ public void testRollingAveragesRollover() throws Exception {
final int windowSizeMs = 5000; // 5s roll over interval
final int numWindows = 2;
final int numOpsPerIteration = 1000;
try (RollingAverages rollingAverages = new RollingAverages(windowSizeMs,
numWindows)) {
try (MutableRollingAverages rollingAverages =
new MutableRollingAverages("Time")) {
rollingAverages.replaceScheduledTask(2, 5000, TimeUnit.MILLISECONDS);
/* Push values for three intervals */
final long start = Time.monotonicNow();
for (int i = 1; i <= 3; i++) {
@@ -121,4 +129,64 @@ public void testRollingAveragesRollover() throws Exception {
}
}
}
/**
* Test that MutableRollingAverages gives expected results after
* initialization.
* @throws Exception
*/
@Test(timeout = 30000)
public void testMutableRollingAveragesMetric() throws Exception {
DummyTestMetric testMetric = new DummyTestMetric();
testMetric.create();
testMetric.add("metric1", 100);
testMetric.add("metric1", 900);
testMetric.add("metric2", 1000);
testMetric.add("metric2", 1000);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
testMetric.collectThreadLocalStates();
return testMetric.getStats().size() > 0;
}
}, 500, 5000);
MetricsRecordBuilder rb = getMetrics(DummyTestMetric.METRIC_NAME);
double metric1Avg = getDoubleGauge("[Metric1]RollingAvgTesting", rb);
double metric2Avg = getDoubleGauge("[Metric2]RollingAvgTesting", rb);
Assert.assertTrue("The rolling average of metric1 is not as expected",
metric1Avg == 500.0);
Assert.assertTrue("The rolling average of metric2 is not as expected",
metric2Avg == 1000.0);
}
class DummyTestMetric {
@Metric (valueName = "testing")
private MutableRollingAverages rollingAverages;
static final String METRIC_NAME = "RollingAveragesTestMetric";
protected void create() {
DefaultMetricsSystem.instance().register(METRIC_NAME,
"mutable rolling averages test", this);
rollingAverages.replaceScheduledTask(10, 1000, TimeUnit.MILLISECONDS);
}
void add(String name, long latency) {
rollingAverages.add(name, latency);
}
void collectThreadLocalStates() {
rollingAverages.collectThreadLocalStates();
}
Map<String, Double> getStats() {
return rollingAverages.getStats(0);
}
}
}

View File: DFSConfigKeys.java

@@ -465,18 +465,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
HdfsClientConfigKeys.DeprecatedKeys.DFS_METRICS_SESSION_ID_KEY;
public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
// The following setting is not meant to be changed by administrators.
public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY =
"dfs.metrics.rolling.averages.window.length";
public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_DEFAULT =
"5m";
// The following setting is not meant to be changed by administrators.
public static final String DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY =
"dfs.metrics.rolling.average.num.windows";
public static final int DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_DEFAULT =
36;
public static final String DFS_DATANODE_PEER_STATS_ENABLED_KEY =
"dfs.datanode.peer.stats.enabled";
public static final boolean DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT = false;

View File: DataNode.java

@@ -1415,7 +1415,7 @@ void startDataNode(List<StorageLocation> dataDirectories,
metrics = DataNodeMetrics.create(getConf(), getDisplayName());
peerMetrics = dnConf.peerStatsEnabled ?
DataNodePeerMetrics.create(getConf(), getDisplayName()) : null;
DataNodePeerMetrics.create(getDisplayName()) : null;
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
ecWorker = new ErasureCodingWorker(getConf(), this);

View File: DataNodePeerMetrics.java

@@ -22,16 +22,13 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.metrics2.MetricsJsonBuilder;
import org.apache.hadoop.metrics2.lib.RollingAverages;
import org.apache.hadoop.metrics2.lib.MutableRollingAverages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
* This class maintains DataNode peer metrics (e.g. numOps, AvgTime, etc.) for
@@ -44,7 +41,7 @@ public class DataNodePeerMetrics {
public static final Logger LOG = LoggerFactory.getLogger(
DataNodePeerMetrics.class);
private final RollingAverages sendPacketDownstreamRollingAvgerages;
private final MutableRollingAverages sendPacketDownstreamRollingAverages;
private final String name;
@@ -64,15 +61,11 @@ public class DataNodePeerMetrics {
@VisibleForTesting
static final long MIN_OUTLIER_DETECTION_SAMPLES = 1000;
public DataNodePeerMetrics(
final String name,
final long windowSizeMs,
final int numWindows) {
public DataNodePeerMetrics(final String name) {
this.name = name;
this.slowNodeDetector = new OutlierDetector(MIN_OUTLIER_DETECTION_NODES,
LOW_THRESHOLD_MS);
sendPacketDownstreamRollingAvgerages = new RollingAverages(
windowSizeMs, numWindows);
sendPacketDownstreamRollingAverages = new MutableRollingAverages("Time");
}
public String name() {
@@ -82,23 +75,12 @@ public String name() {
/**
* Creates an instance of DataNodePeerMetrics, used for registration.
*/
public static DataNodePeerMetrics create(Configuration conf, String dnName) {
public static DataNodePeerMetrics create(String dnName) {
final String name = "DataNodePeerActivity-" + (dnName.isEmpty()
? "UndefinedDataNodeName" + ThreadLocalRandom.current().nextInt()
: dnName.replace(':', '-'));
final long windowSizeMs = conf.getTimeDuration(
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY,
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_DEFAULT,
TimeUnit.MILLISECONDS);
final int numWindows = conf.getInt(
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY,
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_DEFAULT);
return new DataNodePeerMetrics(
name,
windowSizeMs,
numWindows);
return new DataNodePeerMetrics(name);
}
/**
@@ -112,7 +94,7 @@ public static DataNodePeerMetrics create(Configuration conf, String dnName) {
public void addSendPacketDownstream(
final String peerAddr,
final long elapsedMs) {
sendPacketDownstreamRollingAvgerages.add(peerAddr, elapsedMs);
sendPacketDownstreamRollingAverages.add(peerAddr, elapsedMs);
}
/**
@@ -120,7 +102,7 @@ public void addSendPacketDownstream(
*/
public String dumpSendPacketDownstreamAvgInfoAsJson() {
final MetricsJsonBuilder builder = new MetricsJsonBuilder(null);
sendPacketDownstreamRollingAvgerages.snapshot(builder, true);
sendPacketDownstreamRollingAverages.snapshot(builder, true);
return builder.toString();
}
@@ -128,7 +110,7 @@ public String dumpSendPacketDownstreamAvgInfoAsJson() {
* Collects states maintained in {@link ThreadLocal}, if any.
*/
public void collectThreadLocalStates() {
sendPacketDownstreamRollingAvgerages.collectThreadLocalStates();
sendPacketDownstreamRollingAverages.collectThreadLocalStates();
}
/**
@@ -139,10 +121,14 @@ public Map<String, Double> getOutliers() {
// This maps the metric name to the aggregate latency.
// The metric name is the datanode ID.
final Map<String, Double> stats =
sendPacketDownstreamRollingAvgerages.getStats(
sendPacketDownstreamRollingAverages.getStats(
MIN_OUTLIER_DETECTION_SAMPLES);
LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
return slowNodeDetector.getOutliers(stats);
}
public MutableRollingAverages getSendPacketDownstreamRollingAverages() {
return sendPacketDownstreamRollingAverages;
}
}
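A rough sketch of the simplified lifecycle: create() no longer consults the Configuration for window settings, and callers that need different windows go through the new getter plus MetricsTestHelper; the node and peer names and the latency below are made up:

import java.util.Map;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;

class PeerMetricsSketch {  // hypothetical example class
  static Map<String, Double> run() {
    DataNodePeerMetrics peerMetrics = DataNodePeerMetrics.create("example-dn");
    peerMetrics.addSendPacketDownstream("peer-1:50010", 30L);  // illustrative sample
    peerMetrics.collectThreadLocalStates();
    // Empty until enough samples have rolled over (MIN_OUTLIER_DETECTION_SAMPLES).
    return peerMetrics.getOutliers();
  }
}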

View File: TestDataNodePeerMetrics.java

@@ -23,6 +23,7 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
import org.apache.hadoop.metrics2.lib.MetricsTestHelper;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
@@ -42,16 +43,13 @@ public void testGetSendPacketDownstreamAvgInfo() throws Exception {
final int numOpsPerIteration = 1000;
final Configuration conf = new HdfsConfiguration();
conf.setTimeDuration(
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY,
windowSize, TimeUnit.SECONDS);
conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY,
numWindows);
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
final DataNodePeerMetrics peerMetrics = DataNodePeerMetrics.create(
conf,
"Sample-DataNode");
MetricsTestHelper.replaceRollingAveragesScheduler(
peerMetrics.getSendPacketDownstreamRollingAverages(),
numWindows, windowSize, TimeUnit.SECONDS);
final long start = Time.monotonicNow();
for (int i = 1; i <= iterations; i++) {
final String peerAddr = genPeerAddress();

View File: TestDataNodeOutlierDetection.java

@@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.server.datanode.metrics;
import com.google.common.base.Supplier;
import org.apache.hadoop.metrics2.lib.MetricsTestHelper;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Before;
@@ -30,6 +31,7 @@
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
@@ -73,8 +75,12 @@ public void testOutlierIsDetected() throws Exception {
final String slowNodeName = "SlowNode";
DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics(
"PeerMetrics-For-Test", WINDOW_INTERVAL_SECONDS,
ROLLING_AVERAGE_WINDOWS);
"PeerMetrics-For-Test");
MetricsTestHelper.replaceRollingAveragesScheduler(
peerMetrics.getSendPacketDownstreamRollingAverages(),
ROLLING_AVERAGE_WINDOWS,
WINDOW_INTERVAL_SECONDS, TimeUnit.SECONDS);
injectFastNodesSamples(peerMetrics);
injectSlowNodeSamples(peerMetrics, slowNodeName);
@@ -101,8 +107,12 @@ public Boolean get() {
@Test
public void testWithNoOutliers() throws Exception {
DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics(
"PeerMetrics-For-Test", WINDOW_INTERVAL_SECONDS,
ROLLING_AVERAGE_WINDOWS);
"PeerMetrics-For-Test");
MetricsTestHelper.replaceRollingAveragesScheduler(
peerMetrics.getSendPacketDownstreamRollingAverages(),
ROLLING_AVERAGE_WINDOWS,
WINDOW_INTERVAL_SECONDS, TimeUnit.SECONDS);
injectFastNodesSamples(peerMetrics);

View File: TestHdfsConfigFields.java

@@ -66,10 +66,6 @@ public void initializeMemberVariables() {
// Purposely hidden, based on comments in DFSConfigKeys
configurationPropsToSkipCompare
.add(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY);
configurationPropsToSkipCompare
.add(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY);
configurationPropsToSkipCompare
.add(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY);
// Fully deprecated properties?
configurationPropsToSkipCompare